Skip to main content

将数据源中的数据送到机器学习/深度学习模型中进行训练

Project description

功能

将数据源中的数据送到机器学习/深度学习模型中进行训练。 无论数据存在于磁盘(mysql,mongodb,hive等)、内存(pandas,dask,spark,koalas等)或者显存(rapids), 无论数据大小, 无论数据是格式化还是非格式化。

实现数据与模型分离,不会在模型中出现数据操作。数据与模型分离(算法与数据结构分离)的一般形式如下示例:

class Model(object):
    ...
    def train(self,dataset):
        """ Model 依赖于一个数据集,或者说,一个算法依赖于一个数据结构。"""
        pass


model = Model()
model.train(dataset) # all 

数据与模型分离的具体示例如下:

import pandas as pd 
from dataset import DataSet

pdf = pd.DataFrame(...) # 数据源

ds = DataSet(pdf) # 数据集

trainDataSet,testDataSet = ds.split_dataset(frac) # 划分训练集、测试集

for epochs:
    for outputDict in trainDataSet: # trainDataSet 是可迭代的形式
        # batch
        x = outputDict["columnsx"]
        y = outputDict["columnsy"]
        feed_dict = {
            columnsx:x,
            columnsy:y
        }
        sess.run(train_op,loss_op,feed_dict=feed_dict)

原理介绍

数据集

  1. 数据集中样本个体的特点1:样本是随机的,没有大小顺序的。
  2. 数据集中样本个体的特点2:多一个少一个无所谓。
  3. 数据集中样本个体的特点3:独立同分布的 iid 。
  4. 数据集的子集性:数据集分割后,仍然是数据集。即便是一个样本,也是一个数据集。
  5. 数据集中的样本/个体要有唯一标识符(索引 idxs)。
  6. 不需要数据清洗和数据分析(describe/mean/max)等操作。因为 mean/max 本身就是一种算法。
  7. 不需要观察全貌,分批读取即可,观察全貌是数据清洗的事情。
  8. 很多操作(merge/append/split等)在索引 idxs 上操作即可,不需要真实到数据上操作。
  9. 所有数据操作(数据清洗等)都应该用其他包来处理,而不是dataset。当最终形成df的时候,才转换为 dataset。

索引

  1. idxs 是样本在数据集中的唯一标识,可以随意更改,只要能区分不同个体。
  2. 索引的数据类型:原理上,只要能 hash 的数据都可以作为索引,但同一个数据集中的索引应该尽可能的保持一致。
  3. 索引的数据性:样本索引 idxs 不可以重复,但样本的数值是可以重复的。
  4. 索引映射:由于对原始数据只读不写,所以在更改索引时,只是对原始数据的索引进行了一对一映射。
  5. 尽可能的用数值来做索引,并且由于索引映射的存在,不应该在使用 dataset 时太多的关注索引的值,只需要能够按照索引找到相应的数据即可。
  6. 索引的独立性:数据集中的任何属性/字段都不应该和 idxs 有关联(属性值=f(idxs))。比如:图片名称就可能是用 idxs 作为名称的,可以用时间等具有唯一性的量作为图片名称。将 fileName = f(id) 改为 fileName = f(time)
  7. 索引的本质:要能够依据 idxs 取出数据。

数据本身

  • 对数据源 text,sql,pandas 中的数据,只读不写!

迭代 batch

  1. 样本数量:原理上,每batch的样本数量 <= batchSize,如果需要,可以设置 batch 的样本数量严格等于 batchSize。
  2. batch 的数据操作1:所有变换/操作是在 batch data 上进行,而不是先变换后 划分 batch !!!
  3. batch 的数据操作2:由于2的存在,append,merge,fillna,dropna,select_columns 等等几乎所有的数据操作,以及更一般的,像convert_to_numpy(本质是apply)这样的,都是在 batch data 上进行的。
  4. batch 的输出数据类型:输出是 dict 类型,{"columnname1":batch data, "columnname2":batch data}
  5. batch 的输出数据特点1:batch 的输出只能是数值类型。
  6. batch 的输出数据特点2:batch 的输出只能是数值类型,且不能有 nan。
  7. batch 输出中的 nan 需要在batch data上进行操作,将其去掉或换掉。
  8. batch 速度性:不需要实时(磁盘内存无所谓)。
  9. batch 高可用性:数据集在迭代 iter 时需要有高可用性,不能在训练中途出现故障。

数值转换 convert fun

  1. 进入到模型f(x)中的数据只能是数值数据,而不是字符串str/json/字节数组/树tree/图graph。某个字段/属性的值可能是很复杂的数据结构:树/图等等,要把这些复杂的数据结构转化为数值数据。

  2. 数值转换函数的形式:数值转换函数 convert_fun 可以是任意形式的。比如,输入是一张图片,转换函数可以是一个神经网络,输出是从图片中提取的特征向量,或者特征图。

  3. 数值转换函数的输入:数值转换函数f只应该有一个输入值b=f(a)。如果有其他的参数,应该将参数记录到静态量config中,并在函数体内,直接引用。

  4. 数值转换函数的输出:数值转换函数f只应该有一个输出,且输出的类型是numpy.ndarray

  5. convert 函数中不能计算全局量 mean 等。例如:不能在 convert 函数中,调用该列的平均值等。

  6. 逆函数:对每个数值转换函数都应该尽可能的提供其反方向的函数,比如对文本的onehot编码 array=one_hot(str) 而言,提供其反函数 str=one_hot_inv(array).

  7. 数值转换函数是将数据集中某列的数据,由非数值类型,转换到最终的数值类型,比如:将图片路径转换为图片数据,如下:

def convert_fun(image_path:str) -> np.ndarray:
    
    a = imread(image_path)

    b = imresize(a)

    c = other(b)

    tensor = resnet_fc7(c) # resnet 神经网络从图片中提取特征向量

    return tensor

# 然后,指定 dataset 的 image_path 列关联这个转换函数(convert_fun)即可,如下示例:
b = {
    "image_path" : convert_fun,
}

ds.convertFunDict = b

config

  1. 数据集中的其他超参数放到 config 中,作为全局变量存在。
  2. 如果有其他的参数,很可能是数值转换函数f的参数。
  3. 有些参数可能在 model 中用到,在 dataset 中也用到。
  4. dataset 本来只有一个参数batchsize,

安装

平台

win linux

依赖

可选依赖

  • spark :大数据操作尽可能选择之!
  • koalas :大数据操作尽可能选择之!
  • dask :中等数据操作尽可能选择之!

pip 安装命令

only core

不会安装 dask spark 的扩展依赖,只有当 You can also install only the dataset library. spark,dask,koalas等数据源 won’t work until you also install pyspark,koalas,dask,django, respectively. This is common for downstream library maintainers:

pip install dataset # Install only core parts of dask

big data

pip install dataset[koalas] pip install dataset[pyspark] pip install dataset[dask] pip install dataset[bigdata]

orm

pip install dataset[django]

gpu

TODO pip install dataset

示例

  • easy_demo
""" ds 基本功能
创建
查看 show、idxs、nSamples
简单迭代
划分训练集和测试集
iter --> tf、keras、pytorch、sklearn
iterepoch
严格模式

"""

import pandas as pd 
import numpy as np 
from datasets import DataSet




# ----------------------------------------------------
# 创建

df = pd.DataFrame(np.random.uniform(size=(20,3)),columns=list("abc"))
df.reset_index(inplace=True) # --> index a b c

ds = DataSet(df,batchSize=7)




# ----------------------------------------------------
# 查看

ds.sampleNum # 数据集大小
# >>> 
# 20

idxs = ds.get_idxs() # 数据集中每个样本的 id
# >>> 
# [ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19]

ds.show() # 打印1批次的数值数据,以及每个变量的形状
# >>> 
#           a          b         c index
# 0   0.62919  0.0851551  0.225027     0
# 1  0.848418   0.708781  0.440574     1
# 2  0.521759   0.620585  0.932888     2
# 3  0.727612   0.920076  0.154726     3
# 4   0.90353   0.243451  0.881314     4
# 5  0.102216    0.80945  0.200341     5
# 6  0.750237   0.332719  0.618396     6
# shape   (7,)       (7,)      (7,)  (7,)





# ----------------------------------------------------
# 迭代

for outputDict in ds:
    print(outputDict.keys())
    # >>> 
    # dict_keys(['c', 'index', 'a', 'b'])
    index = outputDict["index"] # 提取该 batch 某一变量的数值数据,用于模型输入
    # >>> 
    # array([0, 1, 2, 3, 4, 5, 6])
    # array([ 7,  8,  9, 10, 11, 12, 13])
    # array([14, 15, 16, 17, 18, 19])

for outputDict in ds.iter():
    # 使用 iter() 和不使用二者效果是一样的
    print(outputDict.keys())
    index = outputDict["index"]

    
for outputDict in ds.iter_epochs(3):
    # iter() 只遍历整个数据集一次
    # iter_epochs(3) 遍历整个数据集 3 次
    print(outputDict.keys())
    index = outputDict["index"]




# ----------------------------------------------------
# 迭代的严格模式 isStrictBatchSize
# 
# 说明:
# 非严格模式下,每个 batch 的样本数量是不确定的,只能保证样本数量是<=batchSize的。
# 严格模式可以让每 batch 的样本数量严格等于 batchSize
#
# 严格模式的使用很简单,只需要设置属性 isStrictBatchSize=True 即可.
# 

ds = DataSet(...)
ds.isStrictBatchSize = True

# 或者
ds = DataSet(...,isStrictBatchSize=True)

for outputDict in ds.iter_epochs(3):
    # iter() 只遍历整个数据集一次
    # iter_epochs(3) 遍历整个数据集 3 次
    print(outputDict.keys())
    index = outputDict["index"]




# ----------------------------------------------------
# 划分训练集和测试集
# 
# 说明:
# 1、划分数据集时,内部深度 copy 了原数据集的idxs,
# 但数据集本身的数据没有变化,相当于该数据集有多个 idxs 。
#
# 2、验证集说明:
# isHasValidate:bool=True 是否有验证集
# strategyValidate:int=1 验证集的生成方式:验证集的生成方式 default=1 1表示验证集在训练集中,参加训练,2表示验证集不在训练集中,不参加训练
# 如果有验证集:
#     如果验证集不参与训练:
#         将训练集再次按 1-frac 划分成训练集和验证集
#     如果验证集参与训练:
#         将验证集按 1-frac 挑选部分做为验证集
# 

frac=0.8 # 训练集所占比例
trainDataSet, validDataSet, testDataSet = ds.split_dataset(frac)




# ----------------------------------------------------
# 训练

# tensorflow 训练版本
for epoch:
    for outputDict in ds:
        # batch
        x = outputDict["columnsx"]
        y = outputDict["columnsy"]
        feed_dict = {
                columnsx:x,
                columnsy:y
        }
        sess.run(train_op,loss_op,feed_dict=feed_dict)
    ds.shuffle_idxs() # 每一 epoch 都打乱一下 idxs


# tf epoch训练版本
for outputDict in ds.iter_epochs(12):
    # iter_epochs 自动每一 epoch 都打乱一下 idxs
    x = outputDict["columnsx"]
    y = outputDict["columnsy"]
    feed_dict = {
        columnsx:x,
        columnsy:y
    }
    sess.run(train_op,loss_op,feed_dict=feed_dict)

# pytorch 训练版本
for epoch :
    for outputDict in ds:
        # batch
        x = outputDict["columnsx"]
        y = outputDict["columnsy"] 
        
        optimizer.zero_grad()
        yhat = model(x)
        loss = criterien(y,yhat)
        loss.backend()
        optimizer.step()
    ds.shuffle_idxs() # 每一 epoch 都打乱一下 idxs


# pytorch epoch 训练版本:
for outputDict in ds.iter_epochs(12):
    x = outputDict["columnsx"]
    y = outputDict["columnsy"]

    optimizer.zero_grad()
    yhat = model(x)
    loss = criterien(y,yhat)
    loss.backend()
    optimizer.step()


# keras 训练版本
for epochs:
    for outputDict in ds:
        # batch
        x = outputDict["columnsx"]
        y = outputDict["columnsy"] 
        model.train_on_batch(x,y)
    ds.shuffle_idxs() # 每一 epoch 都打乱一下 idxs

# keras epochs 训练版本
model.fit_generator(ds.iter_epochs(12))
  • 更多例子参考:example

支持的数据源/内存数据

  • PandasDataManager : 接管 pandas 中的数据,只读不写

  • DaskDataManager : 接管 dask 中的数据,只读不写

  • SparkDataManager : 接管 spark 中的数据,只读不写

  • KoalasDataManager : 接管 koalas 中的数据,只读不写

  • DjangoOrmDataManager :接管 django model 中的数据,只读不写

  • SqlDataManager :接管 mysql 中的数据,只读不写

  • 用户自定义!

  • RapidsDataManager :TODO : 接管 Rapids

  • RayDataManager :TODO : 接管 Ray

  • DparkDataManager :TODO : 接管 Dpark

  • MarsDataManager :TODO : 接管 Mars

示例:

import dataset
from dataset import DataSet

import pandas as pd 
from dataset import PandasDataManager

pdf = pd.DataFrame(...)
pdm = PandasDataManager(pdf)
pds = DataSet(pdf)
from dataset import DataSet
from dataset import DjangoOrmDataManager

from django.db import models

class OrmModel(models.Model):
    pass

inputClass = Person
ormdm = DjangoOrmDataManager(inputClass,idColumnsName="id")

ormds = DataSet(ormdm)

支持的磁盘数据源/IO

  • csv
  • excel
  • json
  • txt
  • hdf5
  • parquet
  • orc
  • hive
  • mongodb
  • elasticearch
  • solr

示例:

import dataset
from dataset import DataSet

ds = dataset.read_csv(path_str)

功能

新样本

数据集划分

回收

convert

有好的数值转换操作可以提过给我,十分感谢!

convert 函数设计规则:

  1. 尽可能只有一个输入参数。
  2. 只有一个输出参数,且输出类型是 np.ndarray.
  3. 尽可能提供逆函数.

io

有其他格式文件的io可以提供给我,十分感谢!

datamanager

实现了其他数据源的 datamanager 类的,可以提供给我,十分感谢!

datamanager 模板如下:

""" datamanager 模板 """

import numpy as np 
import pandas as pd 
from typing import Optional

from datamanager import DataManager
from datamanager import PYInterIterDataManager

from idxsutils import _drop_idxs_nan


class ADataManager(DataManager):
    def __init__(self, df:?.DataFrame, idColumnsName:Optional[str]=None):
        self._dataFrame = df

        if idColumnsName is None:
            self._isIndex = True
        else:
            self._isIndex = False

        self._idColumnsName = idColumnsName

    @property
    def idColumnsName(self):
        return self._idColumnsName
    

    def get_idxs(self) -> np.ndarray:
        if self._isIndex:
            seriesIdxs = self._dataFrame.index # --> cudf.Index
        else:
            seriesIdxs = self._dataFrame[self._idColumnsName] # --> cudf.Series

        arrayIdxs = seriesIdxs.?.values
        arrayIdxs = _drop_idxs_nan(arrayIdxs) # -> np.ndarray(int64)

        return arrayIdxs


    def get_data_by_idxs(self,idxs:np.ndarray) -> PYInterIterDataManager:
        
        # NOTE: Index/Series 是否都有 isin ? 
        # NOTE: isin 输入参数类型 ?
        
        if self._isIndex:
            _tableGetData = self._dataFrame[self._dataFrame.index.isin(idxs)] # --> ?.DataFrame
        else:
            _tableGetData = self._dataFrame[self._dataFrame[self._idColumnsName].isin(idxs)] 

        tableGetData = _tableGetData.to_pandas() ?
        
        interIterDataManager = PYInterIterDataManager(self._idColumnsName,tableGetData)
        
        return interIterDataManager

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Files for dataset-xy, version 0.1.4
Filename, size File type Python version Upload date Hashes
Filename, size dataset-xy-0.1.4.tar.gz (68.8 kB) File type Source Python version None Upload date Hashes View hashes

Supported by

Elastic Elastic Search Pingdom Pingdom Monitoring Google Google BigQuery Sentry Sentry Error logging AWS AWS Cloud computing DataDog DataDog Monitoring Fastly Fastly CDN SignalFx SignalFx Supporter DigiCert DigiCert EV certificate StatusPage StatusPage Status page