
Project description

philiprehberger-data-pipeline

Composable data transformation pipeline with lazy evaluation.

Install

pip install philiprehberger-data-pipeline

Usage

from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
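
The transforms above only describe the work; per the package description, evaluation is lazy, so nothing runs until a terminal operation such as .collect() is called. A small sketch of that behaviour (assuming transforms really are deferred until a terminal call, as the description states):

seen = []

# The map below has a side effect only so we can observe when it runs;
# list.append returns None, so "... or r" passes each record through unchanged.
pending = Pipeline(data).map(lambda r: seen.append(r) or r)

assert seen == []       # nothing has executed yet: the pipeline is only defined
pending.collect()
assert len(seen) == 3   # the map ran once .collect() was called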

Reusable Pipelines

clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
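
Here, active_users and archived_users are placeholders. A hypothetical run, assuming unique_by keeps the first record seen for each key:

active_users = [
    {"name": "Alice", "email": "Alice@Example.com"},
    {"name": "Bob", "email": ""},                      # dropped: empty email fails the filter
    {"name": "Alicia", "email": "alice@example.com"},  # dropped: duplicate email after lowercasing
]

active = clean_users.run(active_users)
# [{"name": "Alice", "email": "alice@example.com"}]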

Aggregations

p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
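
For illustration only, since sales_data is not defined above, a hypothetical input might be:

sales_data = [
    {"category": "books", "amount": 12.0},
    {"category": "books", "amount": 8.0},
    {"category": "toys", "amount": 5.0},
]

With that input, the calls above would give total == 25.0, average == 25.0 / 3 (about 8.33), and grouped == {"books": [the two book records], "toys": [the toy record]}.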

Export

Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
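
Both exports are terminal operations, so the pipeline executes when the file is written. With the filter from the Usage example above (the output file name is arbitrary):

Pipeline(data).filter(lambda r: r["status"] == "active").to_csv("active_users.csv")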

Operations

Transform operations:

  .filter(fn)       Keep items where fn returns True
  .map(fn)          Transform each item
  .flat_map(fn)     Transform and flatten
  .sort_by(key)     Sort by key (string or callable)
  .unique_by(key)   Remove duplicates by key
  .take(n)          Take first n items
  .skip(n)          Skip first n items
  .chunk(size)      Split into chunks
  .flatten()        Flatten one level of nesting

Terminal operations:

  .collect()            Execute and return list
  .first()              Return first item
  .count()              Count items
  .sum(key)             Sum values
  .avg(key)             Average values
  .min(key)             Find minimum value
  .max(key)             Find maximum value
  .reduce(fn, initial)  Reduce to single value
  .group_by(key)        Group into dict
  .to_csv(path)         Export as CSV
  .to_json(path)        Export as JSON
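
A few of the less common operations, sketched from the descriptions above (the Pipeline import from the Usage section is reused, and how .chunk() handles a final short chunk is an assumption):

Pipeline([[1, 2], [3, 4, 5]]).flatten().take(3).collect()
# [1, 2, 3]

Pipeline([1, 2, 3, 4, 5]).chunk(2).collect()
# [[1, 2], [3, 4], [5]]   assuming the trailing short chunk is kept

Pipeline(["a b", "c"]).flat_map(lambda s: s.split()).collect()
# ["a", "b", "c"]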

License

MIT

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

philiprehberger_data_pipeline-0.2.2.tar.gz (6.3 kB)

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

philiprehberger_data_pipeline-0.2.2-py3-none-any.whl (5.6 kB)

File details

Details for the file philiprehberger_data_pipeline-0.2.2.tar.gz.

File metadata

File hashes

Hashes for philiprehberger_data_pipeline-0.2.2.tar.gz
Algorithm     Hash digest
SHA256        5d93135d1a7c074a66ddfdcb67633393c8797d0e42c4af84a3d32fdc74c71533
MD5           afd057a52b781e49730d9f1c45a0aa25
BLAKE2b-256   f55ae98b9bb591e1b4a703e226409782583236f906285a8dc27a0885bbef1e3e


File details

Details for the file philiprehberger_data_pipeline-0.2.2-py3-none-any.whl.

File metadata

File hashes

Hashes for philiprehberger_data_pipeline-0.2.2-py3-none-any.whl
Algorithm     Hash digest
SHA256        836310ea50bd76ec52890219c1d1fd319ea84dbadff49a0874b75c0f06c66d2a
MD5           f82f3c385575d25668b38876d214f47e
BLAKE2b-256   117ad20784615ec6aa22a334f9c290c35dd36fc6e15340df0da6559f7ae6b193

