Composable data transformation pipeline with lazy evaluation

philiprehberger-data-pipeline

Installation

pip install philiprehberger-data-pipeline

Usage

Basic Pipeline

from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
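To make "lazy evaluation" concrete, here is a minimal sketch built on plain generators. It is not the library's implementation; `lazy_filter`, `lazy_map`, and the `log` list are hypothetical names used only for illustration. The point is that building the chain does no work, and items flow through the steps one at a time once the result is consumed.

```python
from typing import Any, Callable, Iterable, Iterator, List

log: List[str] = []  # records the order in which steps actually run


def lazy_filter(fn: Callable[[Any], Any], items: Iterable[Any]) -> Iterator[Any]:
    """Yield only the items for which fn returns a truthy value."""
    for item in items:
        log.append(f"filter {item}")
        if fn(item):
            yield item


def lazy_map(fn: Callable[[Any], Any], items: Iterable[Any]) -> Iterator[Any]:
    """Yield fn(item) for each item, one at a time."""
    for item in items:
        log.append(f"map {item}")
        yield fn(item)


# Building the chain only creates generator objects; no step has run yet.
chain = lazy_map(lambda n: n * 10, lazy_filter(lambda n: n % 2 == 0, [1, 2, 3, 4]))
steps_before = list(log)

# Consuming the chain (the analogue of a terminal call) triggers the work.
result = list(chain)
```

After `list(chain)` runs, `log` shows the filter and map steps interleaved per item rather than one full pass per step.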

Reusable Pipelines

clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

# active_users and archived_users are iterables of user dicts defined elsewhere.
active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
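One way to picture a reusable definition is as a stored list of steps that is replayed against each new input. The sketch below is an assumption about the general idea, not the package's code; `StepList` is a hypothetical class and supports only `filter`, `map`, and `run`.

```python
from typing import Any, Callable, Iterable, List, Tuple

Step = Callable[[Iterable[Any]], Iterable[Any]]


class StepList:
    """Hypothetical stand-in for a reusable pipeline definition."""

    def __init__(self, steps: Tuple[Step, ...] = ()) -> None:
        self._steps = steps

    def filter(self, fn: Callable[[Any], Any]) -> "StepList":
        # Return a new definition so the original stays reusable.
        return StepList(self._steps + (lambda items: (i for i in items if fn(i)),))

    def map(self, fn: Callable[[Any], Any]) -> "StepList":
        return StepList(self._steps + (lambda items: (fn(i) for i in items),))

    def run(self, data: Iterable[Any]) -> List[Any]:
        # Replay every stored step against this particular input.
        items: Iterable[Any] = data
        for step in self._steps:
            items = step(items)
        return list(items)


keep_with_email = (
    StepList()
    .filter(lambda r: r.get("email"))
    .map(lambda r: {**r, "email": r["email"].lower()})
)

batch_a = keep_with_email.run([{"email": "A@Example.com"}, {"email": ""}])
batch_b = keep_with_email.run([{"email": "B@Example.com"}])
```

Because each method returns a new `StepList`, the same definition can be run against any number of inputs without carrying state between runs.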

Sliding Window

data = [1, 2, 3, 4, 5]

# Window of size 3, step 1
Pipeline(data).window(3, 1).collect()
# [[1, 2, 3], [2, 3, 4], [3, 4, 5]]

# Window of size 3, step 2
Pipeline(data).window(3, 2).collect()
# [[1, 2, 3], [3, 4, 5]]
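The window semantics shown above can be reproduced with a few lines of plain Python. This is a sketch, not the library's implementation: `sliding_window` is a hypothetical helper, and it assumes that trailing windows shorter than `size` are dropped, which matches the two outputs listed.

```python
from typing import Any, List, Sequence


def sliding_window(items: Sequence[Any], size: int, step: int = 1) -> List[List[Any]]:
    """Return full windows of `size` items, advancing `step` items each time."""
    if size < 1 or step < 1:
        raise ValueError("size and step must be positive")
    last_start = len(items) - size  # last index where a full window still fits
    return [list(items[i:i + size]) for i in range(0, last_start + 1, step)]


step_one = sliding_window([1, 2, 3, 4, 5], 3, 1)
step_two = sliding_window([1, 2, 3, 4, 5], 3, 2)
```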

Deduplication

Pipeline([1, 2, 3, 2, 1, 4]).deduplicate().collect()
# [1, 2, 3, 4]

# Works with unhashable items too
Pipeline([{"a": 1}, {"a": 1}, {"b": 2}]).deduplicate().collect()
# [{"a": 1}, {"b": 2}]
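Order-preserving deduplication that also accepts unhashable items can be sketched as follows. This is one possible approach and not necessarily what the package does; `deduplicate` here is a hypothetical standalone function. Hashable items are tracked in a set, and items that cannot be hashed fall back to a slower equality scan over a list.

```python
from typing import Any, Iterable, List


def deduplicate(items: Iterable[Any]) -> List[Any]:
    """Drop repeated items, keeping the first occurrence of each."""
    seen_hashable = set()
    seen_unhashable: List[Any] = []
    result: List[Any] = []
    for item in items:
        try:
            if item in seen_hashable:
                continue
            seen_hashable.add(item)
        except TypeError:
            # Unhashable (for example a dict): compare by equality instead.
            if item in seen_unhashable:
                continue
            seen_unhashable.append(item)
        result.append(item)
    return result


numbers = deduplicate([1, 2, 3, 2, 1, 4])
records = deduplicate([{"a": 1}, {"a": 1}, {"b": 2}])
```

The fallback path is quadratic in the number of unhashable items, so it suits small inputs best.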

Aggregations

# sales_data: records with "amount" and "category" keys, defined elsewhere.
p = Pipeline(sales_data)
total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
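For comparison, the same three aggregations can be written with the standard library alone. The `sales_data` records below are made-up sample values, and the grouped result assumes a dict that maps each key to a list of matching records; the package's exact return shape is not stated here.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical sample records for illustration only.
sales_data = [
    {"category": "books", "amount": 12.0},
    {"category": "games", "amount": 30.0},
    {"category": "books", "amount": 18.0},
]

total = sum(r["amount"] for r in sales_data)
average = mean(r["amount"] for r in sales_data)

# Group records by category, preserving their original order within each group.
buckets = defaultdict(list)
for record in sales_data:
    buckets[record["category"]].append(record)
grouped = dict(buckets)
```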

Export

Pipeline(data).filter(...).to_csv("output.csv")
Pipeline(data).filter(...).to_json("output.json")
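As a rough picture of what exporting a list of dicts involves, the sketch below writes the same rows to CSV and JSON with the standard `csv` and `json` modules. It does not show how `to_csv` or `to_json` are implemented. The sample `rows` and the temporary output directory are assumptions made for the example.

```python
import csv
import json
import os
import tempfile

# Hypothetical rows standing in for a pipeline's collected output.
rows = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]

out_dir = tempfile.mkdtemp()
csv_path = os.path.join(out_dir, "output.csv")
json_path = os.path.join(out_dir, "output.json")

# CSV: the header comes from the keys of the first row.
with open(csv_path, "w", newline="", encoding="utf-8") as fh:
    writer = csv.DictWriter(fh, fieldnames=list(rows[0]))
    writer.writeheader()
    writer.writerows(rows)

# JSON: the whole list is serialized as one array.
with open(json_path, "w", encoding="utf-8") as fh:
    json.dump(rows, fh, indent=2)
```

Note that CSV stores every value as text, so numeric fields come back as strings when the file is read again.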

API

Function / Class      Description
Pipeline(data)        Composable, lazy data transformation pipeline with chainable operations and terminal methods
.filter(fn)           Keep items where fn returns True
.map(fn)              Transform each item
.flat_map(fn)         Transform and flatten
.flatten()            Flatten one level of nesting
.sort_by(key)         Sort by key (string or callable)
.unique_by(key)       Remove duplicates by key
.take(n)              Take first n items
.skip(n)              Skip first n items
.chunk(size)          Split into chunks
.each(fn)             Execute side effect for each item
.window(size, step)   Sliding window grouping
.deduplicate()        Remove duplicate items preserving order
.collect()            Execute and return list
.first()              Return first item
.count()              Count items
.sum(key)             Sum values
.avg(key)             Average values
.min(key)             Find minimum value
.max(key)             Find maximum value
.reduce(fn, initial)  Reduce to single value
.group_by(key)        Group into dict
.to_csv(path)         Export as CSV
.to_json(path)        Export as JSON
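Several of the operations listed above have close standard-library analogues, which can help when reasoning about what a chain should produce. The mapping below is a sketch based only on the descriptions in the table. In particular, keeping a short final chunk is an assumption, since the table does not say how `.chunk(size)` treats leftovers.

```python
from functools import reduce
from itertools import chain, islice

numbers = [1, 2, 3, 4, 5, 6, 7]

# Roughly take(3): the first three items.
first_three = list(islice(numbers, 3))

# Roughly skip(2): everything after the first two items.
after_two = list(islice(numbers, 2, None))

# Roughly chunk(3): consecutive groups of three (short last group kept here).
chunks = [numbers[i:i + 3] for i in range(0, len(numbers), 3)]

# Roughly flat_map(fn): map each item to several values, then flatten one level.
pairs = list(chain.from_iterable((n, -n) for n in numbers[:2]))

# Roughly reduce(fn, initial): fold all items into a single value.
product = reduce(lambda acc, n: acc * n, numbers, 1)
```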

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this package useful, consider giving it a star on GitHub. It helps motivate continued maintenance and development.

License

MIT

Download files

Source Distribution

philiprehberger_data_pipeline-0.3.0.tar.gz (8.5 kB)

Algorithm     Hash digest
SHA256        fd29b35ec6dd319fc87b37749cb1354a55f727a95a76f84105a992e2340df84c
MD5           6a7e30170051b3f51715a4f5d53f05f2
BLAKE2b-256   0d3b1ffb38d0956983e65013715d53b782107c89eeb11e63e5fa174bf20c95bf

Built Distribution

philiprehberger_data_pipeline-0.3.0-py3-none-any.whl (6.9 kB, Python 3)

Algorithm     Hash digest
SHA256        353dd4b86bf1cbe90112eb44ec4272dd39fc06bb93ab39616dbdbd7322abd6a9
MD5           3a5f8b9746b2710f7b8621e8d8e289c6
BLAKE2b-256   c2d971ac836235458e2faccb1a6fc02a131381a4bb7b8e27d9d9b5b7a906e11b
