Out-of-core data processing framework for pandas

Project description

oocas


oocas is an out-of-core data processing framework built for pandas. Its main purpose is local ETL on large datasets, which it handles by processing many smaller files sequentially.
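
For illustration, this is roughly the manual pattern such a pipeline replaces; a plain pandas sketch, in which the paths and the per-file aggregation are made up for the example:

import pathlib
import pandas as pd

results = []
for path in sorted(pathlib.Path('data/raw').glob('*.parquet')):
    df = pd.read_parquet(path)                  # read one small file
    results.append(df.mean(numeric_only=True))  # transform it
pd.DataFrame(results).to_parquet('data/summary.parquet')  # combine and write

oocas wraps this read-transform-write loop in reusable callable components, described under Usage.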

Installation

pip install oocas
conda install -c conda-forge oocas 
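
Either command installs the same package; a quick import confirms it is available:

python -c "import oocas"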

Usage

The data, specified by a list of file paths, are read, transformed, and written or returned. During a transform step, data can be cached to make it available to the transform steps of later files. A processing pipeline is built out of callable components:

  • Process
  • Read, FileRead, ParquetRead, ParquetMetadataRead, MultiRead
  • Transform, CacheTransform
  • Cache, IndexCache, TimeIndexCache
  • Write, FileWrite, ParquetWrite, MultiWrite

The high-level components (Process, Read, Transform, Cache, Write) support plugin callables in addition to a variety of other useful arguments. The low-level components derive their behavior from them by passing a predefined method as the plugin callable.
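
To make the wiring concrete, here is a minimal sketch in the style of the examples below; the dropna transform and the 'clean' path segment are illustrative only, and the component arguments are assumed to behave as in those examples:

import oocas as oc

paths = oc.find_paths('data/raw')

sqp = oc.Process(
    oc.ParquetRead(),              # read component
    lambda df: df.dropna(),        # any callable can act as the transform
    oc.ParquetWrite(               # write component
        path_part_replace=('raw', 'clean'),
        mkdirs=True))

clean_paths = sqp(paths)           # paths of the written files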

Examples

Examples can be found in the docs.

Read files to dataframe

import oocas as oc
import pandas as pd

paths = oc.find_paths('data/raw')
sqp = oc.Process(oc.ParquetRead(), progress_bar=False)
df = pd.concat(sqp(paths), axis=0)

Random sampling

import oocas as oc
import numpy as np
import pandas as pd

# Count the rows of each file from its parquet metadata, then draw sample indices
sqp = oc.Process(
    oc.ParquetMetaDataRead(),
    lambda x: x.num_rows,
    progress_bar=False)

nrows = sum(sqp(paths))
sample_idxs = np.random.choice(nrows, 10**3)

def sample(df, cache):
    # cache.data holds the running row offset of the files processed so far
    adjusted_idxs = sample_idxs - cache.data
    cache(cache.data + len(df))
    return df.iloc[[
        idx for idx in adjusted_idxs
        if 0 <= idx < len(df)
    ]].copy()

sqp = oc.Process(
    oc.ParquetRead(),
    oc.CacheTransform(sample, cache=0))

sample_df = pd.concat(sqp(paths), axis=0)

Time series moving average

import oocas as oc
import pandas as pd

def moving_average(df, cache):
    nrows = len(df)
    # Prepend the cached lookback window, refresh the cache,
    # and return only the rows belonging to the current file
    df = pd.concat([cache.data, df])
    cache(df)
    return df.rolling(cache.lookback).mean().iloc[-nrows:]

sqp = oc.Process(
    oc.ParquetRead(),
    oc.CacheTransform(
        moving_average,
        oc.TimeIndexCache('1H')),
    oc.ParquetWrite(
        path_part_replace=('raw', 'mavg'),
        mkdirs=True))

mavg_paths = sqp(paths)

N-to-1 file write

import oocas as oc

pickle_multi_to_one_write = oc.FileWrite(
    lambda x, path, **kwargs: x.to_pickle(path, **kwargs),
    path_transform=lambda x: x[0],  # derive the output path from the first input path
    path_part_replace=('raw', 'pickle'),
    name_transform=lambda x: '_'.join(x.split('_')[1:]),
    suffix='.pkl.gz',
    mkdirs=True,
    overwrite=True,
    compression='infer')
