Data pipelines with lazy computation and caching
Project description
dpipe
Data pipelines featuring lazy computation and caching
Installation
pip install dpipe
Example
import dpipe
import pandas as pd
import numpy as np
from loguru import logger
# --- Define data transformations via step functions (similar to dask.delayed)
@dpipe.delayed_cached()  # lazy computation + caching on disk
def load_1():
    """Build the first toy input frame and log how many rows it holds.

    NOTE(review): because of the decorator, the body presumably runs only when
    the pipeline is triggered (see ``dpipe.delayed_compute`` below) — confirm
    against the dpipe docs.

    Returns:
        pd.DataFrame: two rows with float columns ``a`` and ``b``; column
        ``b`` contains one missing value (``np.nan``).
    """
    frame = pd.DataFrame(
        {
            'a': [1., 2.],
            'b': [0.1, np.nan],
        }
    )
    n_rows = len(frame)
    logger.info(f'Loaded {n_rows} records')
    return frame
@dpipe.delayed_cached()  # lazy computation + caching on disk
def load_2(timestamp):
    """Build the second toy input frame and log how many rows it holds.

    Args:
        timestamp: Not read by the body. NOTE(review): presumably accepted so
            the cached result is keyed on it (e.g. one cache entry per load
            date) — confirm against how dpipe builds cache keys.

    Returns:
        pd.DataFrame: two rows with float columns ``a`` and ``b``.
    """
    frame = pd.DataFrame(dict(a=[0.9, 3.], b=[0.001, 1.]))
    logger.info(f'Loaded {len(frame)} records')
    return frame
@dpipe.delayed_cached()  # lazy computation + caching on disk
def compute(x, y, eps):
    """Return the mean relative absolute difference between two frames.

    Each cell contributes ``|x - y| / (|y| + eps)``. The per-column means of
    those values are then averaged into a single scalar.

    Args:
        x: DataFrame to compare (``load_1``'s output in this example).
        y: Reference DataFrame; must have the same shape as ``x``.
        eps: Added to ``|y|`` in the denominator so that zero reference
            values do not cause a division by zero.

    Returns:
        Scalar mean of the per-column means. Cells that are NaN (for example
        the missing value in ``x['b']``) are skipped by pandas' default
        ``skipna=True``.

    Raises:
        ValueError: If ``x`` and ``y`` have different shapes.
    """
    # Explicit check instead of `assert`, which `python -O` strips. Without it,
    # pandas would silently align mismatched frames and fill gaps with NaN.
    if x.shape != y.shape:
        raise ValueError(
            'shape mismatch: x has shape {}, y has shape {}'.format(x.shape, y.shape))
    rel_err = (x - y).abs() / (y.abs() + eps)
    # First .mean() -> one value per column; second .mean() -> single scalar.
    diff = rel_err.mean().mean()
    logger.info('Difference is computed')
    return diff
# --- Define pipeline dependencies
# Calling the decorated steps wires them together. NOTE(review): given the
# "lazy computation" decorator comment, nothing is presumably computed until
# dpipe.delayed_compute is called below — confirm against the dpipe docs.
ts = pd.Timestamp(year=2019, month=1, day=1)
eps = 1e-2
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)

# --- Trigger pipeline execution
# delayed_compute takes a tuple of nodes; the first result corresponds to `diff`.
results = dpipe.delayed_compute((diff, ))
print(f'diff: {results[0]:.3f}')
Project details
Release history | Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
apipe-0.1.0.tar.gz
(11.4 kB, view hashes)
Built Distribution
apipe-0.1.0-py3-none-any.whl
(15.5 kB, view hashes)