
philiprehberger-data-pipeline

Composable data transformation pipeline with lazy evaluation.

Installation

pip install philiprehberger-data-pipeline

Usage

Basic Pipeline

from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
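# result == [{"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30}]

Evaluation is lazy: chainable operations only build up a plan, and nothing runs until a terminal method such as .collect() executes it. A minimal sketch of that behavior, assuming .each() is chainable and execution is deferred until a terminal method, as the API table below suggests:

seen = []
lazy = (
    Pipeline([1, 2, 3])
    .each(seen.append)       # side effect, listed as chainable in the API table
    .map(lambda x: x * 2)
)
print(seen)                  # expected: [] (nothing has run yet)
result = lazy.collect()      # terminal method triggers execution
print(seen)                  # expected: [1, 2, 3]
print(result)                # expected: [2, 4, 6]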

Reusable Pipelines

clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

# active_users / archived_users: hypothetical lists of user dicts
active_users = [{"email": "Alice@Example.com"}, {"email": "alice@example.com"}]
archived_users = [{"email": "bob@example.com"}, {"name": "carol"}]  # no email: filtered out

active = clean_users.run(active_users)
archived = clean_users.run(archived_users)

Sliding Window

data = [1, 2, 3, 4, 5]

# Window of size 3, step 1
Pipeline(data).window(3, 1).collect()
# [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

# Window of size 3, step 2
Pipeline(data).window(3, 2).collect()
# [[1, 2, 3], [3, 4, 5]]

Deduplication

Pipeline([1, 2, 3, 2, 1, 4]).deduplicate().collect()
# [1, 2, 3, 4]

# Works with unhashable items too
Pipeline([{"a": 1}, {"a": 1}, {"b": 2}]).deduplicate().collect()
# [{"a": 1}, {"b": 2}]

Aggregations

# hypothetical sample data
sales_data = [{"category": "books", "amount": 10.0},
              {"category": "toys", "amount": 5.0}]

p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")

Export

Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
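For example, filtering the records from the Basic Pipeline example before export (that dict keys become CSV columns and JSON fields is an assumption, not documented behavior):

# hypothetical file names; assumes dict keys map to CSV columns / JSON fields
Pipeline(data).filter(lambda r: r["status"] == "active").to_csv("active_users.csv")
Pipeline(data).filter(lambda r: r["status"] == "active").to_json("active_users.json")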

API

Function / Class      Description
Pipeline(data)        Composable, lazy data transformation pipeline with chainable operations and terminal methods
.filter(fn)           Keep items where fn returns True
.map(fn)              Transform each item
.flat_map(fn)         Transform each item and flatten the result
.flatten()            Flatten one level of nesting
.sort_by(key)         Sort by key (string or callable)
.unique_by(key)       Remove duplicates by key
.take(n)              Take the first n items
.skip(n)              Skip the first n items
.chunk(size)          Split into chunks of the given size
.each(fn)             Execute a side effect for each item
.window(size, step)   Sliding window grouping
.deduplicate()        Remove duplicate items, preserving order
.collect()            Execute the pipeline and return a list
.first()              Return the first item
.count()              Count items
.sum(key)             Sum values by key
.avg(key)             Average values by key
.min(key)             Find the minimum value by key
.max(key)             Find the maximum value by key
.reduce(fn, initial)  Reduce to a single value
.group_by(key)        Group items into a dict by key
.to_csv(path)         Export as CSV
.to_json(path)        Export as JSON
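A few of these combined (a minimal sketch; the commented results are assumptions inferred from the descriptions above):

Pipeline([1, 2, 3, 4, 5]).skip(1).take(3).collect()    # assumed: [2, 3, 4]
Pipeline([[1, 2], [3, 4]]).flatten().collect()         # assumed: [1, 2, 3, 4]
Pipeline([1, 2, 3, 4]).chunk(2).collect()              # assumed: [[1, 2], [3, 4]]
Pipeline([1, 2, 3]).reduce(lambda acc, x: acc + x, 0)  # assumed: 6
Pipeline([3, 1, 2]).first()                            # assumed: 3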

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this project useful:

⭐ Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT
