
philiprehberger-data-pipeline

Composable data transformation pipeline with lazy evaluation.

Install

pip install philiprehberger-data-pipeline

Usage

from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
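Nothing in the chain above runs until .collect() is called. As an illustration of how such lazy chaining can work (a minimal sketch using generator composition, not the package's actual implementation; the class name LazyPipeline is ours):

```python
class LazyPipeline:
    """Each step wraps the previous iterator; collect() drains the chain."""

    def __init__(self, items):
        self._items = items  # any iterable; consumed lazily

    def filter(self, fn):
        return LazyPipeline(x for x in self._items if fn(x))

    def map(self, fn):
        return LazyPipeline(fn(x) for x in self._items)

    def unique_by(self, key):
        def gen():
            seen = set()
            for x in self._items:
                if x[key] not in seen:
                    seen.add(x[key])
                    yield x
        return LazyPipeline(gen())

    def sort_by(self, key):
        # Sorting must materialize the data, but even that is
        # deferred until the pipeline is actually iterated.
        def gen():
            yield from sorted(self._items, key=lambda x: x[key])
        return LazyPipeline(gen())

    def collect(self):
        return list(self._items)
```

Because each step only wraps an iterator, building a long chain is cheap; work happens once, during the final iteration.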

Reusable Pipelines

clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
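Pipeline.define() presumably records the steps without binding them to a data source, so the same definition can be replayed against different inputs. A hedged sketch of that deferred-definition pattern (the class name PipelineDef and the _steps attribute are illustrative, not the package's internals):

```python
class PipelineDef:
    """Records transformation steps; run(data) applies them to any iterable."""

    def __init__(self, steps=None):
        self._steps = steps or []

    def _add(self, step):
        # Return a new definition so each one stays immutable and reusable.
        return PipelineDef(self._steps + [step])

    def filter(self, pred):
        return self._add(lambda it: (x for x in it if pred(x)))

    def map(self, fn):
        return self._add(lambda it: (fn(x) for x in it))

    def unique_by(self, key):
        def step(it):
            seen = set()
            for x in it:
                if x[key] not in seen:
                    seen.add(x[key])
                    yield x
        return self._add(step)

    def run(self, data):
        it = iter(data)
        for step in self._steps:
            it = step(it)
        return list(it)
```

Since each builder call returns a fresh definition, a shared base pipeline can be extended in different directions without the variants interfering with each other.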

Aggregations

p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
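Assuming the usual semantics for these names, the three aggregations correspond to the following plain-Python equivalents (a sketch of what the calls likely compute, with made-up sample data; not the library's code):

```python
from collections import defaultdict

sales_data = [
    {"category": "books", "amount": 10.0},
    {"category": "books", "amount": 5.0},
    {"category": "toys", "amount": 7.5},
]

# .sum("amount"): total of the field across all records
total = sum(r["amount"] for r in sales_data)

# .avg("amount"): arithmetic mean of the field
average = total / len(sales_data)

# .group_by("category"): dict mapping each key value to its records
grouped = defaultdict(list)
for r in sales_data:
    grouped[r["category"]].append(r)
```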

Export

Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
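The exporters likely boil down to the standard library's csv and json modules. A sketch of what to_csv and to_json plausibly do (standalone functions for illustration; the real methods operate on the pipeline's collected records):

```python
import csv
import json

def to_csv(records, path):
    # Assumes all records share the keys of the first record.
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(records[0].keys()))
        writer.writeheader()
        writer.writerows(records)

def to_json(records, path):
    with open(path, "w") as f:
        json.dump(records, f, indent=2)
```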

Operations

Transform          Description
.filter(fn)        Keep items where fn returns True
.map(fn)           Transform each item
.flat_map(fn)      Transform each item and flatten the results
.sort_by(key)      Sort by key (string or callable)
.unique_by(key)    Remove duplicates by key
.take(n)           Take the first n items
.skip(n)           Skip the first n items
.chunk(size)       Split into chunks of the given size
.flatten()         Flatten one level of nesting

Terminal           Description
.collect()         Execute the pipeline and return a list
.first()           Return the first item
.count()           Count items
.sum(key)          Sum the values of key
.avg(key)          Average the values of key
.group_by(key)     Group items into a dict keyed by key
.to_csv(path)      Export as CSV
.to_json(path)     Export as JSON
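For reference, the chunk and flatten semantics described above correspond to the following (assumed behavior, shown with plain lists rather than the library's API):

```python
def chunk(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def flatten(nested):
    """Flatten exactly one level of nesting."""
    return [x for sub in nested for x in sub]
```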

License

MIT

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

philiprehberger_data_pipeline-0.1.1.tar.gz (4.8 kB)

Built Distribution

philiprehberger_data_pipeline-0.1.1-py3-none-any.whl (5.5 kB, Python 3)

File details

Details for the file philiprehberger_data_pipeline-0.1.1.tar.gz.

File metadata

File hashes

Hashes for philiprehberger_data_pipeline-0.1.1.tar.gz
Algorithm Hash digest
SHA256 8a720bc3dd0fe0f1877949ad5eb5a9de9f4a2ab3d447e5281168165484073b3b
MD5 ebfc2692e228dd587aedec48f792ee01
BLAKE2b-256 28224259c83c01c233f83b3dea809c59ec5ee68a270c5c9151672ae6dbf097e5


File details

Details for the file philiprehberger_data_pipeline-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for philiprehberger_data_pipeline-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c3cf430925a9a0b5d435b8796b1e223044cf633ab95a6c9170648a5846d464b2
MD5 7dcfc8e679560e67124682cac9b48976
BLAKE2b-256 52356e7d75c20756d9aead1999b12979274b3d6c66ea3ec82ae54348c6b62a2b

