philiprehberger-data-pipeline

Composable data transformation pipeline with lazy evaluation.

Install

pip install philiprehberger-data-pipeline

Usage

from philiprehberger_data_pipeline import Pipeline

# Sample records: the first and third differ only by whitespace in "name".
data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")        # keep active records
    .map(lambda r: {**r, "name": r["name"].strip()})  # trim whitespace from names
    .unique_by("email")                               # drop records with a repeated email
    .sort_by("name")                                  # order by name
    .collect()                                        # execute and return a list
)
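To illustrate what lazy evaluation means for a chain like the one above, here is a minimal sketch in plain Python. It does not use this package and makes no claim about how `Pipeline` is implemented internally. It assumes only that transform steps are deferred until a terminal step runs; the helper names `is_active` and `strip_name` and the `calls` list are hypothetical.

```python
# Plain-Python illustration of deferred execution (not the package's code).
calls = []  # hypothetical log of every record the predicate has seen


def is_active(record: dict) -> bool:
    calls.append(record["name"])
    return record["status"] == "active"


def strip_name(record: dict) -> dict:
    return {**record, "name": record["name"].strip()}


records = [
    {"name": " Alice ", "status": "active"},
    {"name": "Bob", "status": "inactive"},
]

# Building the chain does no work: filter() and map() return lazy iterators.
lazy = map(strip_name, filter(is_active, records))
calls_before = list(calls)  # still empty at this point

# Consuming the iterator plays the role of a terminal step such as .collect().
result = list(lazy)
```

The predicate only runs once the iterator is consumed, which is the property a lazy pipeline relies on to avoid building intermediate lists.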

Reusable Pipelines

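# Define the steps once, without binding them to any data.
clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))                   # drop records without an email
    .map(lambda r: {**r, "email": r["email"].lower()})  # normalize email case
    .unique_by("email")                                 # drop records with a repeated email
)

# active_users and archived_users are lists of dicts defined elsewhere.
active = clean_users.run(active_users)
archived = clean_users.run(archived_users)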
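One way a reusable definition like this could work is to store the steps and replay them against each dataset. The sketch below is a hypothetical stand-in written for illustration, not the package's implementation; the class name `DeferredPipeline` and its internals are assumptions, and keeping the first record per key in `unique_by` is also an assumption.

```python
from typing import Any, Callable, Iterable, Iterator, List, Tuple

Step = Callable[[Iterable[Any]], Iterator[Any]]


class DeferredPipeline:
    """Hypothetical stand-in: records steps now, applies them to data later."""

    def __init__(self, steps: Tuple[Step, ...] = ()) -> None:
        self._steps = steps

    def _with(self, step: Step) -> "DeferredPipeline":
        # Return a new object so a shared definition is never mutated.
        return DeferredPipeline(self._steps + (step,))

    def filter(self, fn: Callable[[Any], Any]) -> "DeferredPipeline":
        return self._with(lambda items: (x for x in items if fn(x)))

    def map(self, fn: Callable[[Any], Any]) -> "DeferredPipeline":
        return self._with(lambda items: (fn(x) for x in items))

    def unique_by(self, key: str) -> "DeferredPipeline":
        def step(items: Iterable[Any]) -> Iterator[Any]:
            seen = set()  # created per run, so runs do not share state
            for item in items:
                if item[key] not in seen:
                    seen.add(item[key])
                    yield item  # assumption: the first record per key wins
        return self._with(step)

    def run(self, data: Iterable[Any]) -> List[Any]:
        items: Iterable[Any] = iter(data)
        for step in self._steps:
            items = step(items)
        return list(items)


clean = (
    DeferredPipeline()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

first = clean.run([{"email": "A@Example.com"}, {"email": "a@example.com"}, {"name": "x"}])
second = clean.run([{"email": "b@example.com"}])
```

Because the deduplication state is created inside each run, the same definition can be applied to several datasets without one run affecting the next.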

Aggregations

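# sales_data is a list of dicts with "amount" and "category" keys, defined elsewhere.
p = Pipeline(sales_data)
total = p.sum("amount")          # sum of the "amount" values
average = p.avg("amount")        # mean of the "amount" values
grouped = p.group_by("category") # dict keyed by "category"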
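For reference, the same three aggregations can be written with the standard library alone. This sketch does not call the package; the sample `sales_data` is made up for illustration, and the assumption that grouping yields a dict mapping each key to a list of records is mine, not something the README confirms.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical sample data, made up for this illustration.
sales_data = [
    {"category": "books", "amount": 10.0},
    {"category": "games", "amount": 30.0},
    {"category": "books", "amount": 20.0},
]

# Sum and average over one key.
total = sum(row["amount"] for row in sales_data)
average = mean(row["amount"] for row in sales_data)

# Group records by a key (assumed shape: key -> list of records).
grouped = defaultdict(list)
for row in sales_data:
    grouped[row["category"]].append(row)
grouped = dict(grouped)
```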

Export

# `...` stands in for a predicate function of your choice.
Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
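As a rough picture of what exporting a list of dicts involves, the sketch below uses only the standard `csv` and `json` modules and writes to in-memory text rather than files. It does not reflect the package's actual output format; the helper `rows_to_csv` is hypothetical, and taking the column order from the first record is an assumption.

```python
import csv
import io
import json
from typing import Dict, List, TextIO

rows = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]


def rows_to_csv(rows: List[Dict], stream: TextIO) -> None:
    """Hypothetical helper: write dict records as CSV with a header row."""
    # Assumption: column order comes from the first record's keys.
    fieldnames = list(rows[0].keys()) if rows else []
    writer = csv.DictWriter(stream, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)


csv_buffer = io.StringIO()
rows_to_csv(rows, csv_buffer)
csv_text = csv_buffer.getvalue()

json_text = json.dumps(rows, indent=2)
```

When writing to a real file with `csv`, open it with `newline=""` so the module controls line endings itself.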

Operations

| Transform | Description |
| --- | --- |
| .filter(fn) | Keep items where fn returns True |
| .map(fn) | Transform each item |
| .flat_map(fn) | Transform and flatten |
| .sort_by(key) | Sort by key (string or callable) |
| .unique_by(key) | Remove duplicates by key |
| .take(n) | Take first n items |
| .skip(n) | Skip first n items |
| .chunk(size) | Split into chunks |
| .flatten() | Flatten one level of nesting |

| Terminal | Description |
| --- | --- |
| .collect() | Execute and return list |
| .first() | Return first item |
| .count() | Count items |
| .sum(key) | Sum values |
| .avg(key) | Average values |
| .group_by(key) | Group into dict |
| .to_csv(path) | Export as CSV |
| .to_json(path) | Export as JSON |
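The slicing and flattening transforms listed above correspond to familiar `itertools` patterns. The free functions below are plain-Python equivalents written for illustration, not the package's methods. Whether the library's `.chunk(size)` keeps a final partial chunk is not stated in this README; the sketch assumes it does.

```python
from itertools import chain, islice
from typing import Any, Callable, Iterable, Iterator, List


def take(items: Iterable[Any], n: int) -> Iterator[Any]:
    """Yield at most the first n items."""
    return islice(items, n)


def skip(items: Iterable[Any], n: int) -> Iterator[Any]:
    """Yield everything after the first n items."""
    return islice(items, n, None)


def chunk(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield lists of up to `size` items (assumption: last chunk may be short)."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def flatten(items: Iterable[Iterable[Any]]) -> Iterator[Any]:
    """Flatten exactly one level of nesting."""
    return chain.from_iterable(items)


def flat_map(fn: Callable[[Any], Iterable[Any]], items: Iterable[Any]) -> Iterator[Any]:
    """Apply fn to each item, then flatten the results by one level."""
    return chain.from_iterable(map(fn, items))


pages = list(chunk(range(5), 2))
expanded = list(flat_map(lambda x: [x, x * 10], [1, 2]))
```

All five helpers return iterators, so they compose without materializing intermediate lists until something like `list()` consumes them.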

License

MIT
