Out-of-core data processing framework for pandas
Project description
oocas
oocas is an out-of-core data processing framework built for pandas. Its main purpose is local ETL processing of large data by sequentially processing smaller files.
Installation
pip install oocas
conda install -c conda-forge oocas
Usage
The data, which are specified by a list of file paths, are read, transformed, and written/returned. In a transform substep data can be cached to be available in later transform substeps. A processing pipeline is build out of callable components.
- Process
- Read, FileRead, ParquetRead, ParquetMetadataRead, MultiRead
- Transform, CacheTransform
- Cache, IndexCache, TimeIndexCache
- Write, FileWrite, ParquetWrite, MultiWrite
High level components (bold) support plugin callables in addition to a variety of other useful arguments. Low level components inherit their behavior by passing a predefined method this way.
Examples
Examples can be found in the docs.
Read files to dataframe
import oocas as oc
paths = oc.find_paths('data/raw')
sqp = oc.Process(oc.ParquetRead(), progress_bar=False)
df = pd.concat(sqp(paths), axis=0)
Random sampling
import oocas as oc
sqp = oc.Process(
oc.ParquetMetaDataRead(),
lambda x: x.num_rows,
progress_bar=False)
nrows = sum(sqp(paths))
sample_idxs = np.random.choice(nrows, 10**3)
def sample(df, cache):
adjusted_idxs = sample_idxs - cache.data
cache(cache.data + len(df))
return df.iloc[[
idx for idx in adjusted_idxs\
if idx >= 0 and idx < len(df)
]].copy()
sqp = oc.Process(
oc.ParquetRead(),
oc.CacheTransform(sample, cache=0))
sample_df = pd.concat(sqp(paths), axis=0)
Time series moving average
import oocas as oc
def moving_average(df, cache):
nrows = len(df)
df = pd.concat([cache.data, df])
cache(df)
return df.rolling(cache.lookback)\
.mean().iloc[-nrows:]
sqp = oc.Process(
oc.ParquetRead(),
oc.CacheTransform(
moving_average,
oc.TimeIndexCache('1H')),
oc.ParquetWrite(
path_part_replace=('raw', 'mavg'),
mkdirs=True))
mavg_paths = sqp(paths)
N-to-1 file write
import oocas as oc
pickle_multi_to_one_write = oc.FileWrite(
lambda x, path, **kwargs:\
x.to_pickle(path, **kwargs),
path_transform=lambda x: x[0],
path_part_replace=('raw', 'pickle'),
name_transform=lambda x:\
'_'.join(x.split('_')[1:]),
suffix='.pkl.gz',
mkdirs=True,
overwrite=True,
compression='infer')
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file oocas-1.0.1.tar.gz
.
File metadata
- Download URL: oocas-1.0.1.tar.gz
- Upload date:
- Size: 4.0 MB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.9.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 244dfe7278c912b4395e41fcaa0ee2fbd95775ef7869db83ade9b5e3314ce979 |
|
MD5 | 172ebe0baedab2f2d84ec1a4f8c652b4 |
|
BLAKE2b-256 | c64d87724ce709015c9d0a269a2c9123482aa7154bffe7ea79e0e2d7d6c27540 |