Data pipelines with lazy computation and caching
Project description
A-Pipe
A-Pipe allows to create data pipelines with lazy computation and caching.
Features:
- Lazy computation and cache loading
- Pickle and parquet serialization
- Support for hashing of
numpy
arrays andpandas
DataFrames - Support for
dask.Delayed
objects
Installation
pip install apipe
Examples
Simple function caching
import time
import apipe
import numpy as np
from loguru import logger
@apipe.eager_cached()
def load_data(table: str):
time.sleep(1)
arr = np.ones(5)
logger.debug(f"transferred array data from table={table}")
return arr
logger.info("start loading data")
# --- First pass: transfer data and save on disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")
# --- Second pass: load data from disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")
Data pipeline with lazy execution and caching
import apipe
import pandas as pd
import numpy as np
from loguru import logger
# --- Define data transformations via step functions (similar to dask.delayed)
@apipe.delayed_cached() # lazy computation + caching on disk
def load_1():
df = pd.DataFrame({"a": [1., 2.], "b": [0.1, np.nan]})
logger.debug("Loaded {} records".format(len(df)))
return df
@apipe.delayed_cached() # lazy computation + caching on disk
def load_2(timestamp):
df = pd.DataFrame({"a": [0.9, 3.], "b": [0.001, 1.]})
logger.debug("Loaded {} records".format(len(df)))
return df
@apipe.delayed_cached() # lazy computation + caching on disk
def compute(x, y, eps):
assert x.shape == y.shape
diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()
logger.debug("Difference is computed")
return diff
# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)
# --- Trigger pipeline execution (first pass: compute everything and save on disk)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
# --- Trigger pipeline execution (second pass: load from disk the end result only)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
See more examples in a notebook.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
apipe-0.1.9.tar.gz
(12.2 kB
view details)
Built Distribution
apipe-0.1.9-py3-none-any.whl
(16.2 kB
view details)
File details
Details for the file apipe-0.1.9.tar.gz
.
File metadata
- Download URL: apipe-0.1.9.tar.gz
- Upload date:
- Size: 12.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.2 CPython/3.9.19 Linux/6.5.0-1018-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6fb8916f1d09878801daad17a58019c8ee8e220021085eb55f7393e4e88cf1b3 |
|
MD5 | 360c43718cdc855848904b93f0c77a9e |
|
BLAKE2b-256 | 2bc96a8be4aa4929e817b741ae82062e957f4aa073127f7177f1d7d95658c815 |
File details
Details for the file apipe-0.1.9-py3-none-any.whl
.
File metadata
- Download URL: apipe-0.1.9-py3-none-any.whl
- Upload date:
- Size: 16.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.2 CPython/3.9.19 Linux/6.5.0-1018-azure
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | ff84fc3fcbd652b7d97403cbcac8969d59c954b4e3ae0f481c170bb29249f39e |
|
MD5 | 4bf5a008f41ad7e8d8a7d4d5cd9e9475 |
|
BLAKE2b-256 | ece08628d9a07755418ad35e84317b7f5d3bcd50c285ec2ff20b09c602943fb3 |