
philiprehberger-data-pipeline


Composable data transformation pipeline with lazy evaluation.

Install

pip install philiprehberger-data-pipeline

Usage

from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
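Because evaluation is lazy, none of these steps run until .collect() is called. For reference, the chain above can be approximated with plain stdlib Python; one assumption here is that unique_by keeps a single record per key (this sketch keeps the last occurrence, which happens not to matter for this data):

```python
# Stdlib-only approximation of the chain above; Pipeline itself defers
# all of this work until .collect() is called.
data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

active = (r for r in data if r["status"] == "active")          # .filter
stripped = ({**r, "name": r["name"].strip()} for r in active)  # .map
deduped = {r["email"]: r for r in stripped}.values()           # .unique_by
result = sorted(deduped, key=lambda r: r["name"])              # .sort_by + .collect

# result == [{"name": "Alice", "email": "alice@example.com",
#             "status": "active", "age": 30}]
```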

Reusable Pipelines

clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
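A deferred pipeline like this can be mimicked with plain function composition. The sketch below is illustrative only: define, run, and the sample records are local stand-ins, not the library's internals.

```python
from functools import reduce

# Illustrative helper: compose a list of step functions into one
# reusable "pipeline" that can be run against many datasets.
def define(*steps):
    def run(items):
        return reduce(lambda acc, step: step(acc), steps, list(items))
    return run

clean_users = define(
    lambda rs: [r for r in rs if r.get("email")],                 # .filter
    lambda rs: [{**r, "email": r["email"].lower()} for r in rs],  # .map
    lambda rs: list({r["email"]: r for r in rs}.values()),        # .unique_by
)

active_users = [{"email": "A@x.com"}, {"email": "a@x.com"}, {"name": "no-mail"}]
print(clean_users(active_users))  # [{'email': 'a@x.com'}]
```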

Aggregations

p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
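The same aggregations can be written with the stdlib. This also pins down what .group_by presumably returns (a dict mapping each key value to a list of its records); sales_data below is made-up sample data:

```python
from collections import defaultdict

# Made-up sample data standing in for sales_data.
sales_data = [
    {"category": "books", "amount": 10.0},
    {"category": "books", "amount": 20.0},
    {"category": "toys", "amount": 5.0},
]

total = sum(r["amount"] for r in sales_data)   # .sum("amount")  -> 35.0
average = total / len(sales_data)              # .avg("amount")

grouped = defaultdict(list)                    # .group_by("category")
for r in sales_data:
    grouped[r["category"]].append(r)
# grouped["books"] holds 2 records, grouped["toys"] holds 1
```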

Export

Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
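Assuming .to_csv takes its column names from the record keys and .to_json writes a plain list of objects (both assumptions about the exporters), they behave roughly like this stdlib sketch:

```python
import csv
import json
import os
import tempfile

rows = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
out_dir = tempfile.mkdtemp()

# CSV: column names taken from the first record's keys (an assumption).
csv_path = os.path.join(out_dir, "output.csv")
with open(csv_path, "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=rows[0].keys())
    writer.writeheader()
    writer.writerows(rows)

# JSON: a plain list of objects (also an assumption).
json_path = os.path.join(out_dir, "output.json")
with open(json_path, "w") as f:
    json.dump(rows, f)

# Read both files back to check the round trip.
with open(json_path) as f:
    round_trip = json.load(f)
with open(csv_path, newline="") as f:
    csv_header = f.readline().strip()
```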

Operations

Transform             Description
.filter(fn)           Keep items where fn returns True
.map(fn)              Transform each item
.flat_map(fn)         Transform each item and flatten the results
.sort_by(key)         Sort by key (string or callable)
.unique_by(key)       Remove duplicates by key
.take(n)              Take the first n items
.skip(n)              Skip the first n items
.chunk(size)          Split into chunks of size items
.flatten()            Flatten one level of nesting

Terminal              Description
.collect()            Execute the pipeline and return a list
.first()              Return the first item
.count()              Count items
.sum(key)             Sum the values at key
.avg(key)             Average the values at key
.min(key)             Find the minimum value at key
.max(key)             Find the maximum value at key
.reduce(fn, initial)  Reduce to a single value
.group_by(key)        Group items into a dict by key
.to_csv(path)         Export as CSV
.to_json(path)        Export as JSON
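To pin down the assumed semantics of two of the less obvious transforms, .flat_map and .chunk, here is a plain-Python sketch (the chunk helper below is illustrative, not the library's implementation):

```python
from itertools import islice

items = [1, 2, 3, 4, 5]

# .flat_map(fn): apply fn (returning an iterable) to each item,
# then flatten one level.
flat_mapped = [y for x in items for y in (x, x * 10)]
# -> [1, 10, 2, 20, 3, 30, 4, 40, 5, 50]

# .chunk(size): split the stream into lists of at most `size` items.
def chunk(seq, size):
    it = iter(seq)
    while piece := list(islice(it, size)):
        yield piece

print(list(chunk(items, 2)))  # [[1, 2], [3, 4], [5]]
```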

License

MIT

Download files

Download the file for your platform.

Source Distribution

philiprehberger_data_pipeline-0.2.1.tar.gz (6.4 kB)

Built Distribution


philiprehberger_data_pipeline-0.2.1-py3-none-any.whl (5.7 kB)

File details

Details for the file philiprehberger_data_pipeline-0.2.1.tar.gz.

File hashes

Hashes for philiprehberger_data_pipeline-0.2.1.tar.gz
Algorithm Hash digest
SHA256 6439cbf87f46a539dccedaea97148e405b1f65f814136ce2e77ab439cad8a916
MD5 22b2d6153e4088923ad651b9ea5afe79
BLAKE2b-256 a13dc40cb510203fd07f0f1d25634eac37d7ffd51a68041c4ba8eec2b236746c


File details

Details for the file philiprehberger_data_pipeline-0.2.1-py3-none-any.whl.

File hashes

Hashes for philiprehberger_data_pipeline-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 55f21b9dd2c994f5547c3e44d14cfb7f5414259a2ed6493eafa8444b11a9e85e
MD5 6db0f5902cebee662efda6502db5ee5e
BLAKE2b-256 01206575cee4e6be6b1c4a2f6c0839400e9d4d9b9858776d5144100b6f6bd044

