Data pipelines with lazy computation and caching

A-Pipe

A-Pipe lets you build data pipelines with lazy computation and caching.

Features:

  • Lazy computation and cache loading
  • Pickle and parquet serialization
  • Support for hashing of numpy arrays and pandas DataFrames
  • Support for Delayed objects
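
To make the first feature concrete, here is a minimal sketch of how lazy computation with on-disk caching can work in principle. It uses only the standard library and is not A-Pipe's implementation: `lazy_cached`, `Delayed`, `CACHE_DIR` and `CALLS` are hypothetical names introduced for illustration, and the cache key is built from `repr` of the arguments, which is an assumption that would not be adequate for numpy arrays or pandas DataFrames.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

CACHE_DIR = Path(tempfile.mkdtemp())  # hypothetical cache location
CALLS = []  # records which steps actually ran


class Delayed:
    """A step that has been declared but not yet executed."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def key(self):
        # Cache key: function name plus the keys/values of all arguments.
        parts = [self.func.__name__]
        for a in self.args:
            parts.append(a.key() if isinstance(a, Delayed) else repr(a))
        return hashlib.sha256("|".join(parts).encode()).hexdigest()

    def compute(self):
        path = CACHE_DIR / (self.key() + ".pkl")
        if path.exists():  # cache hit: load instead of recomputing
            return pickle.loads(path.read_bytes())
        # Resolve upstream steps first, then run this one.
        args = [a.compute() if isinstance(a, Delayed) else a for a in self.args]
        result = self.func(*args)
        CALLS.append(self.func.__name__)
        path.write_bytes(pickle.dumps(result))
        return result


def lazy_cached(func):
    """Calling the decorated function only builds a Delayed node."""
    def wrapper(*args):
        return Delayed(func, args)
    return wrapper


@lazy_cached
def load():
    return [1.0, 2.0, 3.0]


@lazy_cached
def total(values, scale):
    return scale * sum(values)


node = total(load(), 2)   # nothing runs yet
first = node.compute()    # runs load, then total
second = node.compute()   # served from the pickle cache
```

In this sketch, calling a decorated function only records the step, and work happens when `compute()` is called. Because the key of a step includes the keys of its upstream steps, changing an argument anywhere in the chain produces a new key and forces recomputation of everything downstream.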

Installation

pip install apipe

Example

import apipe
import pandas as pd
import numpy as np
from loguru import logger

# --- Define data transformations via step functions (similar to dask.delayed)

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_1():
    df = pd.DataFrame({'a': [1., 2.], 'b': [0.1, np.nan]})
    logger.info('Loaded {} records'.format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_2(timestamp):
    # `timestamp` is not used in the body; it only parameterizes the step
    df = pd.DataFrame({'a': [0.9, 3.], 'b': [0.001, 1.]})
    logger.info('Loaded {} records'.format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def compute(x, y, eps):
    assert x.shape == y.shape
    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()
    logger.info('Difference is computed')
    return diff

# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)

# --- Trigger pipeline execution
print('diff: {:.3f}'.format(apipe.delayed_compute((diff, ))[0]))

Download files

Source Distribution

apipe-0.1.1.tar.gz (11.3 kB)

Uploaded Source

Built Distribution

apipe-0.1.1-py3-none-any.whl (15.6 kB)

Uploaded Python 3
