Composable data transformation pipeline with lazy evaluation

Project description

philiprehberger-data-pipeline

Composable data transformation pipeline with lazy evaluation.

Install

pip install philiprehberger-data-pipeline

Usage

from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
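
With the sample data above, the filter keeps the two active records, the map strips the stray whitespace from the first name, and unique_by drops the duplicate email, so result should come out as a single cleaned record:

[{"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30}]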

Reusable Pipelines

clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
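
active_users and archived_users are not defined in the snippet; they are assumed here to be ordinary lists of user dicts, for example:

active_users = [
    {"name": "Carol", "email": "Carol@Example.com", "status": "active"},
    {"name": "Dan", "email": "dan@example.com", "status": "active"},
]
archived_users = [
    {"name": "Erin", "email": "erin@example.com", "status": "archived"},
]

Each run() call applies the same definition to its own input, so the two results are independent of each other.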

Aggregations

p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
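
sales_data is likewise just a list of dicts. A small hypothetical example, with the values these aggregations would return (assuming group_by maps each category to its matching records, as described in the operations table below):

sales_data = [
    {"category": "books", "amount": 10.0},
    {"category": "books", "amount": 30.0},
    {"category": "toys", "amount": 20.0},
]

p = Pipeline(sales_data)
total = p.sum("amount")           # 60.0
average = p.avg("amount")         # 20.0
grouped = p.group_by("category")  # {"books": [two book records], "toys": [one toy record]}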

Export

Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
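
The ... above stands in for any chain of transforms; a concrete version, reusing the sample data from the Usage section:

Pipeline(data).filter(lambda r: r["status"] == "active").to_csv("output.csv")
Pipeline(data).filter(lambda r: r["status"] == "active").to_json("output.json")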

Operations

Transforms

.filter(fn)           Keep items where fn returns True
.map(fn)              Transform each item
.flat_map(fn)         Transform and flatten
.sort_by(key)         Sort by key (string or callable)
.unique_by(key)       Remove duplicates by key
.take(n)              Take first n items
.skip(n)              Skip first n items
.chunk(size)          Split into chunks
.flatten()            Flatten one level of nesting

Terminals

.collect()            Execute and return list
.first()              Return first item
.count()              Count items
.sum(key)             Sum values
.avg(key)             Average values
.min(key)             Find minimum value
.max(key)             Find maximum value
.reduce(fn, initial)  Reduce to single value
.group_by(key)        Group into dict
.to_csv(path)         Export as CSV
.to_json(path)        Export as JSON
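
The split in this table is also where the lazy evaluation happens: chaining transforms only builds a description of the work, and nothing runs until a terminal operation is called. A minimal sketch of that behavior (assuming the lazy semantics described above; the numbers list is hypothetical):

numbers = list(range(1_000_000))

doubled_multiples_of_three = (
    Pipeline(numbers)
    .map(lambda x: x * 2)
    .filter(lambda x: x % 3 == 0)
)
# Assuming lazy evaluation, nothing has been computed yet: the pipeline only
# records the transforms to apply.

first_ten = doubled_multiples_of_three.take(10).collect()
# .collect() is the terminal operation, so evaluation is triggered here.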

License

MIT

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

philiprehberger_data_pipeline-0.2.0.tar.gz (6.2 kB)

Uploaded: Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

philiprehberger_data_pipeline-0.2.0-py3-none-any.whl (5.6 kB)

Uploaded: Python 3

File details

Details for the file philiprehberger_data_pipeline-0.2.0.tar.gz.

File metadata

File hashes

Hashes for philiprehberger_data_pipeline-0.2.0.tar.gz
Algorithm Hash digest
SHA256 39fda636f936b2c5b68da4cc4a91b2d8d3c0ae7f71a2e89abbbdffd020be8e4a
MD5 e034ca8e07e60bfa975cca9103c5a72e
BLAKE2b-256 87631cef2ffa73fd604501b11f938bc2c633fea6b7cb5c250058936fd516741f

See more details on using hashes here.

File details

Details for the file philiprehberger_data_pipeline-0.2.0-py3-none-any.whl.

File metadata

File hashes

Hashes for philiprehberger_data_pipeline-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 7590f8918e525a157879f5e37aa8573b544d0948c177afcb26d6711f08535946
MD5 39dea870aab425559c8ba4d8570ff1fb
BLAKE2b-256 671611476a898800c6c1e7239944fcdafd10d9d9ce48debbc3bd17aba832a424

See more details on using hashes here.
