dpipe

Data pipelines with lazy computation and caching

Installation

pip install dpipe

Example

import dpipe
import pandas as pd
import numpy as np
from loguru import logger

# --- Define data transformations via step functions (similar to dask.delayed)

@dpipe.delayed_cached()  # lazy computation + caching on disk
def load_1():
    df = pd.DataFrame({'a': [1., 2.], 'b': [0.1, np.nan]})
    logger.info('Loaded {} records'.format(len(df)))
    return df

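@dpipe.delayed_cached()  # lazy computation + caching on disk
def load_2(timestamp):
    # `timestamp` is not used in the body; presumably it only serves to
    # distinguish cached results for different dates.
    df = pd.DataFrame({'a': [0.9, 3.], 'b': [0.001, 1.]})
    logger.info('Loaded {} records'.format(len(df)))
    return df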

@dpipe.delayed_cached()  # lazy computation + caching on disk
def compute(x, y, eps):
    assert x.shape == y.shape
    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()
    logger.info('Difference is computed')
    return diff

# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)

# --- Trigger pipeline execution
print('diff: {:.3f}'.format(dpipe.delayed_compute((diff, ))[0]))
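
To make the idea behind the example concrete, here is a minimal standard-library sketch of lazy computation combined with on-disk caching. It is not dpipe's implementation and does not use its API. The names `Lazy`, `lazy_cached`, `CACHE_DIR` and `CALLS` are hypothetical and exist only for this illustration, as is the choice of pickle files keyed by a hash of the function name and arguments.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path

CACHE_DIR = Path(tempfile.mkdtemp())  # hypothetical cache location
CALLS = []  # records which step bodies actually ran


class Lazy:
    """A deferred call: stores the function and arguments, runs nothing yet."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def key(self):
        # Cache key: function name plus the keys (or reprs) of all arguments.
        parts = [a.key() if isinstance(a, Lazy) else repr(a) for a in self.args]
        raw = self.func.__name__ + "(" + ",".join(parts) + ")"
        return hashlib.sha256(raw.encode()).hexdigest()

    def compute(self):
        path = CACHE_DIR / (self.key() + ".pkl")
        if path.exists():  # cache hit: skip this step and everything upstream
            return pickle.loads(path.read_bytes())
        # Cache miss: resolve upstream steps first, then run this one.
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        result = self.func(*args)
        path.write_bytes(pickle.dumps(result))
        return result


def lazy_cached(func):
    """Decorator: calling the function builds a Lazy node instead of running it."""
    def wrapper(*args):
        return Lazy(func, args)
    return wrapper


@lazy_cached
def load(n):
    CALLS.append("load")
    return list(range(n))


@lazy_cached
def total(xs, offset):
    CALLS.append("total")
    return sum(xs) + offset


node = total(load(4), 10)     # builds the graph; nothing has run yet
calls_before = list(CALLS)    # still empty at this point
first = node.compute()        # runs load, then total, and caches both results
second = node.compute()       # served from disk; no step body runs again
```

In this sketch the cache key depends only on the function name and its arguments, so editing a function body does not invalidate old results. A real library may handle invalidation differently.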

Download files

Source distribution: apipe-0.1.0.tar.gz (11.4 kB)

Built distribution: apipe-0.1.0-py3-none-any.whl (15.5 kB, Python 3)
