
Out-of-core data processing framework for pandas


oocas


oocas is an out-of-core data processing framework built for pandas. Its main purpose is local ETL processing of large data by sequentially processing smaller files.

Installation

pip install oocas
conda install -c conda-forge oocas 

Usage

The data, specified as a list of file paths, are read, transformed, and written or returned. Within a transform substep, data can be cached so that it is available in later transform substeps. A processing pipeline is built out of callable components:

  • Process
  • Read, FileRead, ParquetRead, ParquetMetadataRead, MultiRead
  • Transform, CacheTransform
  • Cache, IndexCache, TimeIndexCache
  • Write, FileWrite, ParquetWrite, MultiWrite

High-level components (set in bold in the rendered project description) accept plugin callables in addition to a variety of other useful arguments. Low-level components inherit their behavior and pass a predefined method as the plugin callable.
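To make the component model concrete, here is a minimal sketch of the plugin-callable idea in plain Python. It is not the oocas implementation: `FileReadSketch`, `JsonReadSketch`, `ProcessSketch`, and `load_json` are hypothetical names, and JSON files with plain lists stand in for Parquet files and DataFrames so that the sketch has no dependencies.

```python
import json
import tempfile
from pathlib import Path


def load_json(path):
    """Plugin callable: knows how to read one file format."""
    return json.loads(Path(path).read_text())


class FileReadSketch:
    """Hypothetical high-level reader: parsing is delegated to a plugin callable."""

    def __init__(self, read_fn):
        self.read_fn = read_fn

    def __call__(self, path):
        return self.read_fn(path)


class JsonReadSketch(FileReadSketch):
    """Hypothetical low-level reader: inherits the behavior, predefines the plugin."""

    def __init__(self):
        super().__init__(load_json)


class ProcessSketch:
    """Hypothetical pipeline: applies each callable step to one file at a time."""

    def __init__(self, *steps):
        self.steps = steps

    def __call__(self, paths):
        results = []
        for path in paths:
            data = path
            for step in self.steps:
                data = step(data)
            results.append(data)
        return results


with tempfile.TemporaryDirectory() as tmp:
    # Write two small input files to process sequentially.
    paths = []
    for i, rows in enumerate([[1, 2, 3], [4, 5]]):
        path = Path(tmp) / f"part_{i}.json"
        path.write_text(json.dumps(rows))
        paths.append(path)

    # Read each file, then reduce it with an ordinary callable.
    process = ProcessSketch(JsonReadSketch(), sum)
    totals = process(paths)
```

Only one file's data is held in memory per iteration, which is the point of processing the file list sequentially.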

Examples

Examples can be found in the docs.

Read files to dataframe

import pandas as pd
import oocas as oc

# Read the files one at a time and concatenate the results.
paths = oc.find_paths('data/raw')
sqp = oc.Process(oc.ParquetRead(), progress_bar=False)
df = pd.concat(sqp(paths), axis=0)

Random sampling

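import numpy as np
import pandas as pd
import oocas as oc

paths = oc.find_paths('data/raw')

# First pass: read only the Parquet metadata to count rows per file.
sqp = oc.Process(
    oc.ParquetMetaDataRead(),
    lambda x: x.num_rows,
    progress_bar=False)

nrows = sum(sqp(paths))
# Draw 1000 global row positions (with replacement).
sample_idxs = np.random.choice(nrows, 10**3)

def sample(df, cache):
    # cache.data holds the number of rows seen in earlier files.
    adjusted_idxs = sample_idxs - cache.data
    cache(cache.data + len(df))
    # Keep only the positions that fall inside the current file.
    return df.iloc[[
        idx for idx in adjusted_idxs
        if 0 <= idx < len(df)
    ]].copy()

# Second pass: read each file and pick its sampled rows.
sqp = oc.Process(
    oc.ParquetRead(),
    oc.CacheTransform(sample, cache=0))

sample_df = pd.concat(sqp(paths), axis=0)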
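The running-offset trick used in `sample` can be checked in isolation. The following is a dependency-free sketch, not oocas code: `OffsetCache` is a hypothetical stand-in for the cache object, and plain lists stand in for DataFrames. Each value equals its global row position, so the sampled values should match the drawn indices.

```python
import random

# Three "files"; each value equals its global row position.
chunks = [list(range(0, 4)), list(range(4, 9)), list(range(9, 12))]
nrows = sum(len(chunk) for chunk in chunks)

rng = random.Random(0)
# Draw global positions with replacement.
sample_idxs = [rng.randrange(nrows) for _ in range(5)]


class OffsetCache:
    """Hypothetical cache: stores one value and is updated by calling it."""

    def __init__(self, data=0):
        self.data = data

    def __call__(self, new_data):
        self.data = new_data


def sample(chunk, cache):
    # Shift global positions by the number of rows seen so far.
    adjusted = [idx - cache.data for idx in sample_idxs]
    cache(cache.data + len(chunk))
    # Keep only positions that fall inside this chunk.
    return [chunk[i] for i in adjusted if 0 <= i < len(chunk)]


cache = OffsetCache(0)
sampled = [value for chunk in chunks for value in sample(chunk, cache)]
```

Note that the result is ordered by file rather than by draw order, and that a position drawn twice is returned twice.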

Time series moving average

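import pandas as pd
import oocas as oc

paths = oc.find_paths('data/raw')

def moving_average(df, cache):
    nrows = len(df)
    # Prepend the rows cached from earlier files so the window can look back.
    df = pd.concat([cache.data, df])
    cache(df)
    # Return only the rows that belong to the current file.
    return df.rolling(cache.lookback)\
        .mean().iloc[-nrows:]

sqp = oc.Process(
    oc.ParquetRead(),
    oc.CacheTransform(
        moving_average,
        oc.TimeIndexCache('1H')),
    oc.ParquetWrite(
        path_part_replace=('raw', 'mavg'),
        mkdirs=True))

mavg_paths = sqp(paths)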
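The carry-over idea behind `moving_average` can be illustrated without pandas. This sketch is a simplification under stated assumptions: it uses a fixed row-count window instead of the `'1H'` time window, `LookbackCache` and `rolling_mean` are hypothetical helpers, and the trimming done inside the cache is a guess at what a lookback cache needs to do rather than a description of `TimeIndexCache`.

```python
def rolling_mean(values, window):
    """Trailing mean over `window` rows; None until the window is full."""
    out = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window:i + 1]) / window)
    return out


class LookbackCache:
    """Hypothetical cache that keeps only the rows the next chunk needs."""

    def __init__(self, lookback):
        self.lookback = lookback
        self.data = []

    def __call__(self, values):
        keep = self.lookback - 1
        self.data = values[-keep:] if keep > 0 else []


def moving_average(chunk, cache):
    nrows = len(chunk)
    # Prepend the cached tail of the previous chunks.
    combined = cache.data + chunk
    cache(combined)
    # Return only the rows that belong to the current chunk.
    return rolling_mean(combined, cache.lookback)[len(combined) - nrows:]


chunks = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0, 7.0, 8.0, 9.0]]
cache = LookbackCache(lookback=3)

# Chunk-by-chunk result, concatenated.
chunked = [value for chunk in chunks for value in moving_average(chunk, cache)]

# Reference result computed on all rows at once.
full = rolling_mean([value for chunk in chunks for value in chunk], 3)
```

The chunked result equals the in-memory result because each chunk sees exactly the `lookback - 1` preceding rows it needs.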

N-to-1 file write

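import oocas as oc

pickle_multi_to_one_write = oc.FileWrite(
    # Plugin callable that writes the data to the target path.
    lambda x, path, **kwargs: x.to_pickle(path, **kwargs),
    # Derive the output path from the first of the N input paths.
    path_transform=lambda x: x[0],
    path_part_replace=('raw', 'pickle'),
    # Drop the leading underscore-separated part of the file name.
    name_transform=lambda x: '_'.join(x.split('_')[1:]),
    suffix='.pkl.gz',
    mkdirs=True,
    overwrite=True,
    compression='infer')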
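The path-related arguments above can be read as a small recipe for deriving one output path from N input paths. The sketch below shows one plausible interpretation and is not taken from the oocas source: `derive_output_path` is a hypothetical helper, the input file names are made up, and applying `name_transform` to the file name without its extension is an assumption.

```python
from pathlib import PurePosixPath


def derive_output_path(paths, path_transform, path_part_replace,
                       name_transform, suffix):
    """Hypothetical helper: build one output path from a list of input paths."""
    # Pick the path that the output is derived from.
    path = PurePosixPath(path_transform(paths))
    # Swap one directory component, e.g. 'raw' -> 'pickle'.
    old, new = path_part_replace
    parts = [new if part == old else part for part in path.parts]
    # Assumption: the name transform sees the file name without any extension.
    stem = path.name.split('.')[0]
    return PurePosixPath(*parts[:-1]) / (name_transform(stem) + suffix)


out = derive_output_path(
    ["data/raw/0_sensor.parquet", "data/raw/1_sensor.parquet"],
    path_transform=lambda x: x[0],
    path_part_replace=("raw", "pickle"),
    name_transform=lambda x: "_".join(x.split("_")[1:]),
    suffix=".pkl.gz",
)
```

Under these assumptions, the two numbered inputs map to a single file, `data/pickle/sensor.pkl.gz`.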
