# philiprehberger-data-pipeline

Composable data transformation pipeline with lazy evaluation.
## Installation

```bash
pip install philiprehberger-data-pipeline
```
## Usage

```python
from philiprehberger_data_pipeline import Pipeline

data = [
    {"name": " Alice ", "email": "alice@example.com", "status": "active", "age": 30},
    {"name": "Bob", "email": "bob@example.com", "status": "inactive", "age": 25},
    {"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30},
]

result = (
    Pipeline(data)
    .filter(lambda r: r["status"] == "active")
    .map(lambda r: {**r, "name": r["name"].strip()})
    .unique_by("email")
    .sort_by("name")
    .collect()
)
# [{"name": "Alice", "email": "alice@example.com", "status": "active", "age": 30}]
```
## Reusable Pipelines

Define a pipeline once, without data, and run it against any number of inputs:

```python
from philiprehberger_data_pipeline import Pipeline

clean_users = (
    Pipeline.define()
    .filter(lambda r: r.get("email"))                   # drop records without an email
    .map(lambda r: {**r, "email": r["email"].lower()})
    .unique_by("email")
)

active = clean_users.run(active_users)
archived = clean_users.run(archived_users)
```
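Here `active_users` and `archived_users` stand in for your own data. A minimal self-contained run, with hypothetical records:

```python
active_users = [
    {"name": "Ann", "email": "Ann@example.com"},  # hypothetical sample data
    {"name": "Ann", "email": "ann@example.com"},
    {"name": "Bob", "email": None},               # dropped: no email
]
clean_users.run(active_users)
# [{"name": "Ann", "email": "ann@example.com"}]
```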
## Tap (Side Effects)

```python
from philiprehberger_data_pipeline import Pipeline

result = (
    Pipeline([1, 2, 3])
    .tap(lambda x: print(f"Processing: {x}"))
    .map(lambda x: x * 2)
    .collect()
)
# Prints each item without altering the data
```
## Branch (Parallel Splits)

```python
from philiprehberger_data_pipeline import Pipeline

result = (
    Pipeline([1, 2, 3])
    .branch(
        lambda p: p.map(lambda x: x * 2).collect(),
        lambda p: p.filter(lambda x: x > 1).collect(),
    )
    .collect()
)
# [2, 4, 6, 2, 3]
```
## Retry Wrapper

```python
import requests

from philiprehberger_data_pipeline import Pipeline, retry

def fetch_url(url):
    # might fail transiently
    return requests.get(url).text

# `urls` is an iterable of URL strings
result = Pipeline(urls).map(retry(fetch_url, attempts=3, delay=1.0)).collect()
```
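The API table below also lists an `on_error` hook for `retry`. A sketch of how it might be wired up, assuming the callback receives the raised exception (the exact signature is not documented here):

```python
result = (
    Pipeline(urls)
    .map(retry(
        fetch_url,
        attempts=3,
        delay=1.0,
        on_error=lambda exc: print(f"retrying after error: {exc}"),  # assumed signature
    ))
    .collect()
)
```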
## Pipeline Composition

```python
from philiprehberger_data_pipeline import Pipeline

clean = Pipeline.define().filter(lambda x: x > 0).map(lambda x: x * 2)
limit = Pipeline.define().take(3)

combined = clean + limit
combined.run([-1, 5, 0, 3, 7, 2])
# [10, 6, 14]
```
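Composition is ordered: `clean + limit` applies `clean`'s steps before `limit`'s, which is why the result keeps the first three doubled positives. Assuming `+` always runs the left-hand pipeline first (consistent with the result above), reversing the operands would give a different answer:

```python
# Assumed: `+` applies the left-hand pipeline's steps first
reversed_combined = limit + clean
reversed_combined.run([-1, 5, 0, 3, 7, 2])
# [10]  -- take(3) keeps [-1, 5, 0]; filter/map then leave only 5 -> 10
```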
## Dry Run

```python
from philiprehberger_data_pipeline import Pipeline

log = (
    Pipeline([1, 2, 3, 4])
    .filter(lambda x: x > 2)
    .map(lambda x: x * 10)
    .dry_run()
)
# [{"step": 0, "name": "filter", "input": [1, 2, 3, 4], "output": [3, 4]},
#  {"step": 1, "name": "map", "input": [3, 4], "output": [30, 40]}]
```
## Sliding Window

```python
from philiprehberger_data_pipeline import Pipeline

Pipeline([1, 2, 3, 4, 5]).window(3, 1).collect()
# [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```
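A larger step skips start positions. Assuming trailing partial windows are dropped, as the step-1 example above suggests:

```python
Pipeline([1, 2, 3, 4, 5]).window(3, 2).collect()
# [[1, 2, 3], [3, 4, 5]]  (assumed: no partial trailing window)
```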
## Aggregations

```python
from philiprehberger_data_pipeline import Pipeline

# `sales_data` is a list of dicts with "category" and "amount" fields
p = Pipeline(sales_data)

total = p.sum("amount")
average = p.avg("amount")
grouped = p.group_by("category")
```
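With hypothetical records, the aggregations evaluate like this (the `group_by` return shape is inferred from the API table's "Group into a dict"):

```python
sales_data = [
    {"category": "books", "amount": 12.0},
    {"category": "toys", "amount": 7.5},
    {"category": "books", "amount": 3.0},
]

p = Pipeline(sales_data)
p.sum("amount")   # 22.5
p.avg("amount")   # 7.5
p.group_by("category")
# {"books": [{"category": "books", "amount": 12.0},
#            {"category": "books", "amount": 3.0}],
#  "toys":  [{"category": "toys", "amount": 7.5}]}
```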
## Export

```python
from philiprehberger_data_pipeline import Pipeline

Pipeline(data).filter(lambda x: x["active"]).to_csv("output.csv")
Pipeline(data).filter(lambda x: x["active"]).to_json("output.json")
```
## API

| Function / Class | Description |
|---|---|
| `Pipeline(data)` | Composable, lazy data transformation pipeline |
| `.filter(fn)` | Keep items where `fn` returns `True` |
| `.map(fn)` | Transform each item |
| `.flat_map(fn)` | Transform and flatten |
| `.flatten()` | Flatten one level of nesting |
| `.sort_by(key)` | Sort by key (string or callable) |
| `.unique_by(key)` | Remove duplicates by key |
| `.take(n)` | Take the first `n` items |
| `.skip(n)` | Skip the first `n` items |
| `.chunk(size)` | Split into chunks |
| `.each(fn)` | Execute a side effect for each item |
| `.tap(fn)` | Side effect without altering data; skipped in dry run |
| `.window(size, step)` | Sliding window grouping |
| `.deduplicate()` | Remove duplicate items, preserving order |
| `.branch(*fns)` | Split into parallel branches and merge results |
| `.dry_run(data)` | Log each step's input/output without side effects |
| `pipeline_a + pipeline_b` | Compose two pipelines into one |
| `.collect()` | Execute and return a list |
| `.first()` | Return the first item |
| `.count()` | Count items |
| `.sum(key)` | Sum values |
| `.avg(key)` | Average values |
| `.min(key)` | Find the minimum value |
| `.max(key)` | Find the maximum value |
| `.reduce(fn, initial)` | Reduce to a single value |
| `.group_by(key)` | Group into a dict |
| `.to_csv(path)` | Export as CSV |
| `.to_json(path)` | Export as JSON |
| `.enumerate(start)` | Pair each item with its index |
| `.zip_with(other)` | Pair items with another iterable |
| `.take_while(fn)` | Take items while the predicate is `True` |
| `.skip_while(fn)` | Skip items while the predicate is `True` |
| `retry(fn, attempts, delay, on_error)` | Wrap a step function with configurable retry logic |
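A few of the combinators not shown in the examples above, in one sketch. Return shapes (e.g. tuples from `enumerate` and `zip_with`) are assumptions inferred from the descriptions, not verified output:

```python
from philiprehberger_data_pipeline import Pipeline

Pipeline([1, 2, 3, 4, 5]).take_while(lambda x: x < 4).collect()  # [1, 2, 3]
Pipeline([1, 2, 3, 4, 5]).skip_while(lambda x: x < 4).collect()  # [4, 5]
Pipeline([1, 2, 3, 4]).chunk(2).collect()                        # [[1, 2], [3, 4]]
Pipeline(["a", "b"]).enumerate(1).collect()                      # [(1, "a"), (2, "b")] (assumed)
Pipeline([1, 2]).zip_with(["x", "y"]).collect()                  # [(1, "x"), (2, "y")] (assumed)
Pipeline([1, 2, 3]).reduce(lambda acc, x: acc + x, 0)            # 6
```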
## Development

```bash
pip install -e .
python -m pytest tests/ -v
```
## Support

If you find this project useful:

## License