
Data pipelines with lazy computation and caching

Project description

A-Pipe

A-Pipe allows you to create data pipelines with lazy computation and caching.

Features:

  • Lazy computation and cache loading
  • Pickle and parquet serialization
  • Support for hashing of numpy arrays and pandas DataFrames
  • Support for dask.Delayed objects
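The core idea behind the caching can be pictured as a minimal pure-Python sketch (the `disk_cached` helper below is hypothetical, not apipe's actual internals): a decorator pickles a function's result to disk under a key derived from a hash of its arguments, and subsequent calls with the same arguments load the cached copy instead of recomputing.

```python
import hashlib
import pickle
import tempfile
from functools import wraps
from pathlib import Path

# Fresh cache directory per run (apipe itself persists its cache across runs).
CACHE_DIR = Path(tempfile.mkdtemp(prefix="apipe-sketch-"))

def disk_cached(func):
    """Cache a function's pickled result on disk, keyed by its arguments."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        key = hashlib.sha256(
            pickle.dumps((func.__name__, args, sorted(kwargs.items())))
        ).hexdigest()
        path = CACHE_DIR / f"{key}.pickle"
        if path.exists():                        # second pass: load from disk
            return pickle.loads(path.read_bytes())
        result = func(*args, **kwargs)           # first pass: compute and save
        path.write_bytes(pickle.dumps(result))
        return result
    return wrapper

calls = []

@disk_cached
def load_data(table):
    calls.append(table)                          # record real computations
    return [1.0] * 5

first = load_data("weather-ldn")
second = load_data("weather-ldn")                # served from the cache
assert first == second
assert calls == ["weather-ldn"]                  # the body ran only once
```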

Installation

pip install apipe

Examples

Simple function caching

import time
import apipe
import numpy as np
from loguru import logger

@apipe.eager_cached()
def load_data(table: str):
    time.sleep(1)
    arr = np.ones(5)
    logger.debug(f"transferred array data from table={table}")
    return arr

logger.info("start loading data")

# --- First pass: transfer data and save on disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {data}")

# --- Second pass: load data from disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {data}")
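The "hashing of numpy arrays" feature implies a deterministic digest of array contents for cache keys. A minimal sketch of such a digest (the `array_digest` helper is hypothetical, not apipe's actual implementation) using only numpy and hashlib:

```python
import hashlib
import numpy as np

def array_digest(arr: np.ndarray) -> str:
    """Deterministic digest of an array's dtype, shape, and raw bytes."""
    h = hashlib.sha256()
    h.update(str(arr.dtype).encode())
    h.update(str(arr.shape).encode())
    h.update(np.ascontiguousarray(arr).tobytes())
    return h.hexdigest()

a = np.ones(5)
b = np.ones(5)
assert array_digest(a) == array_digest(b)                      # equal contents -> equal key
assert array_digest(a) != array_digest(a.astype(np.float32))   # dtype matters
```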

Data pipeline with lazy execution and caching

import apipe
import pandas as pd
import numpy as np
from loguru import logger

# --- Define data transformations via step functions (similar to dask.delayed)

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_1():
    df = pd.DataFrame({"a": [1., 2.], "b": [0.1, np.nan]})
    logger.debug("Loaded {} records".format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_2(timestamp):
    df = pd.DataFrame({"a": [0.9, 3.], "b": [0.001, 1.]})
    logger.debug("Loaded {} records".format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def compute(x, y, eps):
    assert x.shape == y.shape
    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()
    logger.debug("Difference is computed")
    return diff

# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)

# --- Trigger pipeline execution (first pass: compute everything and save on disk)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))

# --- Trigger pipeline execution (second pass: load from disk the end result only)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
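The delayed-pipeline idea above can be sketched in plain Python (the `Delayed` class and `delayed` wrapper below are illustrative stand-ins, not apipe's internals): each decorated call returns a lightweight node recording the function and its inputs, and nothing runs until compute is triggered, which walks the dependency graph and evaluates each node at most once.

```python
from typing import Any, Callable

class Delayed:
    """A pipeline node: a function plus its (possibly delayed) arguments."""
    _UNSET = object()

    def __init__(self, func: Callable, args: tuple):
        self.func = func
        self.args = args
        self._value: Any = Delayed._UNSET

    def compute(self) -> Any:
        if self._value is Delayed._UNSET:        # evaluate each node at most once
            resolved = [a.compute() if isinstance(a, Delayed) else a
                        for a in self.args]
            self._value = self.func(*resolved)
        return self._value

def delayed(func: Callable) -> Callable:
    """Calling the wrapped function builds a graph node instead of running it."""
    return lambda *args: Delayed(func, args)

log = []

@delayed
def load(x):
    log.append("load")
    return [x, x + 1]

@delayed
def combine(u, v):
    log.append("combine")
    return sum(u) + sum(v)

s1 = load(1.0)
s2 = load(10.0)
out = combine(s1, s2)
assert log == []                  # nothing has run yet: only the graph exists
assert out.compute() == 24.0      # triggers load, load, combine
assert out.compute() == 24.0      # second call: cached, no recomputation
assert log == ["load", "load", "combine"]
```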

See more examples in a notebook.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

apipe-0.1.9.tar.gz (12.2 kB)

Uploaded Source

Built Distribution

apipe-0.1.9-py3-none-any.whl (16.2 kB)

Uploaded Python 3

File details

Details for the file apipe-0.1.9.tar.gz.

File metadata

  • Download URL: apipe-0.1.9.tar.gz
  • Upload date:
  • Size: 12.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.9.19 Linux/6.5.0-1018-azure

File hashes

Hashes for apipe-0.1.9.tar.gz
  • SHA256: 6fb8916f1d09878801daad17a58019c8ee8e220021085eb55f7393e4e88cf1b3
  • MD5: 360c43718cdc855848904b93f0c77a9e
  • BLAKE2b-256: 2bc96a8be4aa4929e817b741ae82062e957f4aa073127f7177f1d7d95658c815

See more details on using hashes here.

File details

Details for the file apipe-0.1.9-py3-none-any.whl.

File metadata

  • Download URL: apipe-0.1.9-py3-none-any.whl
  • Upload date:
  • Size: 16.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.2 CPython/3.9.19 Linux/6.5.0-1018-azure

File hashes

Hashes for apipe-0.1.9-py3-none-any.whl
  • SHA256: ff84fc3fcbd652b7d97403cbcac8969d59c954b4e3ae0f481c170bb29249f39e
  • MD5: 4bf5a008f41ad7e8d8a7d4d5cd9e9475
  • BLAKE2b-256: ece08628d9a07755418ad35e84317b7f5d3bcd50c285ec2ff20b09c602943fb3

See more details on using hashes here.
