Skip to main content

Lightweight Data Pipeline for with code-based stage cacheing

Project description

Lightweight Data Pipeline (LWDP)

LWDP attempts to fill the niche for structuring pure-Python data transformations, with robust data- and code-based-cacheing across a few locales.

Because sometimes Spark or Dask or AWS Glue or anything other than a 5kb library and some dumbly hashed files is just too much.

LWDP is meant for the case where you're doing a few data transformations, possibly across multiple input file types ( csvs, Excel, parquet, etc.). Each of these files can generally (although not strictly) be held in memory. 25 csvs with structured transformations that you'd like to keep organized and possibly streamline with cacheing?

LWDP could be the answer.

If the data changes or your code changes, you want to be able to refresh the data pipeline once - and, ideally, only those parts of the data pipeline who need to be refreshed.

Installation

You should be able to install from PyPi with pip install lwdp

Usage

Decorate functions to represent a stage; and chain together functions to make a pipeline.

# read from a raw file and cache
from lwdp import stage
import pandas as pd


@stage(some_raw_file="raw/input.csv", cache=True)
def stg_read_format_raw(**kwargs) -> pd.DataFrame:
    pdf = pd.read_csv(kwargs.get('some_raw'))
    # some stuff to clean it
    return pdf


# read from a previous stage and cache
@stage(basic_raw=stg_read_format_raw, cache=True, cache_format='parquet')
def stg_format_more(**kwargs) -> pd.DataFrame:
    raw = kwargs.get('basic_raw')
    raw['new_analysis_column'] = 3
    return raw


# read from a previous stage without cacheing
@stage(formatted_src=stg_format_more)
def stg_final_process(**kwargs) -> pd.DataFrame:
    result = kwargs.get("formatted_src")
    result['wizard'] = 5
    return result


stg_final_process()

Just call the last stage in the pipeline (as you would any other function) to run all ancestors, reading/writing from cached stages as needed.

How it works

Each stage has a hash computed based on its code (excluding white space and docstrings), its "raw" ancestors, and its stage ancestors. Hash computation for a stage is recursive; and, if any stages change their code, all child stages will have new hashes.

Stages can optionally be cached, and, if so, a format supported by pandas (to_<format> and read_<format> can be specified). If a stage is cached and there exists a file with the specified hash, we read the file instead of recomputing the stage.

Ideally we could do this using distributed persistent storage (e.g. on S3), which is what I'd like to work on next. Then teams who are working on a data pipeline can read from a common source of "raw" files (and cached computations!).

TODO

  • Deleting cached files after some TTL
  • Using S3
  • Hashing the actual data in raw files and using that as part of "raw" data hash (instead of just the filename)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lwdp-0.0.5.tar.gz (5.7 kB view details)

Uploaded Source

Built Distribution

lwdp-0.0.5-py3-none-any.whl (6.6 kB view details)

Uploaded Python 3

File details

Details for the file lwdp-0.0.5.tar.gz.

File metadata

  • Download URL: lwdp-0.0.5.tar.gz
  • Upload date:
  • Size: 5.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.13

File hashes

Hashes for lwdp-0.0.5.tar.gz
Algorithm Hash digest
SHA256 0bef008b0eaa071f9ea829e1cc7cb3c2dc1e5a64ee6bf4bd269f892b7ab579d0
MD5 d072478df7df7ae3346ccdb7668fd9d4
BLAKE2b-256 916386661d2155238b9cf8d08da0f1423627aa0f10781137645b88798f88d2cc

See more details on using hashes here.

File details

Details for the file lwdp-0.0.5-py3-none-any.whl.

File metadata

  • Download URL: lwdp-0.0.5-py3-none-any.whl
  • Upload date:
  • Size: 6.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/4.0.1 CPython/3.9.13

File hashes

Hashes for lwdp-0.0.5-py3-none-any.whl
Algorithm Hash digest
SHA256 058d016620843fb4bab8bc403b46e59a063b121e78498dacc385bf7f962b56bb
MD5 801c828da7d20a041847bdf31992ca5a
BLAKE2b-256 f9b2f38dce2faaf0cf7ae7ab08422675827aba34207a985245f465e0b73fff20

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page