Readable, lazy data-transformation pipelines for Python — no more nested function calls. pip install streamchain
pipeflow
Readable, lazy data-transformation pipelines for Python — no more nested function calls.
# Before pipeflow
result = list(filter(lambda x: x > 5, map(lambda x: x ** 2, range(10))))
# With pipeflow
from pipeflow import Stream
result = (
    Stream(range(10))
    .map(lambda x: x ** 2)
    .filter(lambda x: x > 5)
    .to(list)
)
Installation
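pip install streamchain
The PyPI distribution is named streamchain; the examples in this README import the package as pipeflow.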
Quick Start
from pipeflow import Stream, pipe, to
# Method chaining style
result = (
    Stream([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x ** 2)
    .take(3)
    .to(list)
)
# [4, 16, 36]
# Pipe operator style (works on plain iterables such as range, lists, and generators)
result = (
    range(10)
    | pipe(filter, lambda x: x % 2 == 0)
    | pipe(map, lambda x: x ** 2)
    | to(list)
)
# [0, 4, 16, 36, 64]
Why pipeflow?
Python lacks a native pipe operator (|>). The workaround — nesting function calls — is hard to read:
# Hard to follow — you read inside-out
list(filter(pred, map(func, filter(other_pred, data))))
# pipeflow — you read top-to-bottom, left-to-right
Stream(data).filter(other_pred).map(func).filter(pred).to(list)
Zero dependencies. pipeflow uses only the Python standard library.
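Lazy by default. Elements are computed on demand, with no intermediate lists; only operations that need the whole input, such as .sort() and .reverse(), force evaluation.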
Fully typed. Every method ships with precise type annotations.
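To make the laziness claim concrete, here is a minimal sketch that uses only the standard library (generators and itertools), not pipeflow itself. The names square and calls are illustrative. It shows that pulling three results from a lazy pipeline over an unbounded source evaluates only the elements that are actually needed.

```python
from itertools import count, islice

calls = []  # records every input that actually gets squared


def square(x):
    calls.append(x)
    return x ** 2


# An unbounded source: a lazy pipeline never tries to exhaust it.
evens = (x for x in count(1) if x % 2 == 0)
squares = map(square, evens)

# Nothing has been computed yet; work starts only when results are pulled.
assert calls == []

first_three = list(islice(squares, 3))
print(first_three)  # [4, 16, 36]
print(calls)        # [2, 4, 6]
```

An eager version would have to build the full list of squares first, which is impossible for an unbounded source.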
API Reference
Stream(iterable)
Wrap any iterable in a lazy pipeline.
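For readers curious how a chainable lazy wrapper can work, the following is a simplified sketch. MiniStream is a hypothetical stand-in written for this illustration; it is not pipeflow's implementation and covers only four methods. Each method wraps the underlying iterator in another lazy iterator and returns a new wrapper, so nothing runs until a collector consumes it.

```python
from itertools import islice
from typing import Callable, Generic, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")
R = TypeVar("R")


class MiniStream(Generic[T]):
    """Hypothetical, simplified stand-in for Stream (illustration only)."""

    def __init__(self, iterable: Iterable[T]) -> None:
        self._it: Iterator[T] = iter(iterable)

    def __iter__(self) -> Iterator[T]:
        return self._it

    def map(self, func: Callable[[T], U]) -> "MiniStream[U]":
        # map() is lazy: func runs only when elements are pulled.
        return MiniStream(map(func, self._it))

    def filter(self, func: Callable[[T], object]) -> "MiniStream[T]":
        return MiniStream(filter(func, self._it))

    def take(self, n: int) -> "MiniStream[T]":
        return MiniStream(islice(self._it, n))

    def to(self, collector: Callable[[Iterator[T]], R]) -> R:
        # The collector (list, set, tuple, ...) drives the evaluation.
        return collector(self._it)


result = (
    MiniStream(range(10))
    .map(lambda x: x ** 2)
    .filter(lambda x: x > 5)
    .to(list)
)
print(result)  # [9, 16, 25, 36, 49, 64, 81]
```

Because the wrapper holds a single iterator, a stream built this way can be consumed only once; whether Stream behaves the same way is not stated in this README.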
Transformations
| Method | Description |
|---|---|
| .map(func) | Apply func to every element |
| .filter(func) | Keep elements where func is True |
| .flatmap(func) | Map then flatten one level |
| .flatten() | Flatten one level of nesting |
| .take(n) | Keep the first n elements |
| .skip(n) | Drop the first n elements |
| .take_while(func) | Take while func is True |
| .drop_while(func) | Drop while func is True |
| .enumerate(start=0) | Pair elements with their index |
| .zip(*others) | Zip with other iterables |
| .chain(*others) | Append other iterables |
| .batch(n) | Yield lists of n elements |
| .unique(key=None) | Deduplicate (order-preserving) |
| .sort(key=None, reverse=False) | Sort (forces evaluation) |
| .reverse() | Reverse (forces evaluation) |
| .tap(func) | Side-effects without changing elements |
| .starmap(func) | Map with tuple unpacking |
| .group_by(key) | Group consecutive elements |
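A few of the descriptions above are terse, so the sketch below spells out what they would mean in plain standard-library terms. These helper functions are illustrative equivalents that assume the semantics in the table; they are not pipeflow's code, and details such as how each group is represented are assumptions.

```python
from itertools import chain, groupby


def flatmap(func, iterable):
    """Map, then flatten one level."""
    return chain.from_iterable(map(func, iterable))


def unique(iterable, key=None):
    """Drop duplicates, keeping the first occurrence (keys must be hashable)."""
    seen = set()
    for item in iterable:
        k = item if key is None else key(item)
        if k not in seen:
            seen.add(k)
            yield item


def group_consecutive(iterable, key):
    """Group runs of adjacent elements that share a key."""
    for k, group in groupby(iterable, key):
        yield k, list(group)


print(list(flatmap(lambda x: [x, x * 10], [1, 2])))  # [1, 10, 2, 20]
print(list(unique([3, 1, 3, 2, 1])))                 # [3, 1, 2]
print(list(group_consecutive("aabba", key=lambda c: c)))
# [('a', ['a', 'a']), ('b', ['b', 'b']), ('a', ['a'])]
```

Note the last result: grouping consecutive elements yields two separate groups for "a". To get one group per key, the input has to be sorted by that key first.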
Terminals
| Method | Description |
|---|---|
| .to(collector) | Collect into list, set, dict, tuple… |
| .reduce(func, initial=None) | Fold to a single value |
| .fold(func, initial) | Fold with explicit initial value |
| .count() | Count elements |
| .first(default=None) | First element |
| .last(default=None) | Last element |
| .any(func=None) | Any element satisfies func |
| .all(func=None) | All elements satisfy func |
| .sum() | Sum of elements |
| .min(key=None) | Minimum element |
| .max(key=None) | Maximum element |
| .for_each(func) | Call func for side-effects |
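The difference between folding with and without an initial value, and the role of a default in first and last, can be sketched with the standard library. The first and last helpers below are hypothetical illustrations; how pipeflow's own terminals treat an empty stream is not specified in this README.

```python
from collections import deque
from functools import reduce
from operator import add

numbers = [1, 2, 3, 4]

# Without an initial value, the first element seeds the fold.
total = reduce(add, numbers)
# With an explicit initial value, the fold is also defined for empty input.
total_from_100 = reduce(add, numbers, 100)
empty_total = reduce(add, [], 0)
print(total, total_from_100, empty_total)  # 10 110 0


def first(iterable, default=None):
    """Return the first element, or default if there is none."""
    return next(iter(iterable), default)


def last(iterable, default=None):
    """Return the last element, or default if there is none."""
    tail = deque(iterable, maxlen=1)  # keeps only the most recent element
    return tail[0] if tail else default


print(first(numbers), last(numbers))      # 1 4
print(first([], "n/a"), last([], "n/a"))  # n/a n/a
```

functools.reduce raises TypeError on an empty iterable when no initial value is given, which is one reason to prefer an explicit initial value for input that may be empty.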
pipe(func, *args) and to(collector)
Use the | operator on plain lists or generators:
from pipeflow import pipe, to
result = [1, 2, 3, 4, 5] | pipe(filter, lambda x: x % 2 == 0) | to(list)
# [2, 4]
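The | syntax can work on plain lists because Python falls back to the right operand's reflected method, __ror__, when the left operand does not handle the operator. The sketch below shows one way this could be done; the Pipe and To classes are hypothetical and are not pipeflow's source. It assumes, as the examples above suggest, that the piped data is passed as the last positional argument.

```python
class Pipe:
    """Hypothetical sketch of a pipe step (illustration only)."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __ror__(self, iterable):
        # Invoked for `iterable | Pipe(...)` when the left operand does not
        # handle `|` itself. The data goes last, matching filter(func, iterable)
        # and map(func, iterable).
        return self.func(*self.args, iterable)


class To:
    """Hypothetical sketch of a terminal collector step."""

    def __init__(self, collector):
        self.collector = collector

    def __ror__(self, iterable):
        return self.collector(iterable)


result = [1, 2, 3, 4, 5] | Pipe(filter, lambda x: x % 2 == 0) | To(list)
print(result)  # [2, 4]
```

The intermediate value between the two steps is an ordinary filter iterator, so the chain stays lazy until the collector runs.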
Real-world Examples
ETL Pipeline
users = [
    {"name": "Alice", "age": 30, "active": True},
    {"name": "Bob", "age": 17, "active": True},
    {"name": "Carol", "age": 25, "active": False},
]
result = (
    Stream(users)
    .filter(lambda u: u["active"])
    .filter(lambda u: u["age"] >= 18)
    .map(lambda u: u["name"].upper())
    .sort()
    .to(list)
)
# ["ALICE"]
Word Count
text = "hello world hello python world hello"
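counts = (
    Stream(text.split())
    .sort()  # group_by groups consecutive elements, so equal words must be adjacent
    .group_by(lambda w: w)
    .map(lambda kv: (kv[0], len(kv[1])))  # assumes each group is a sized collection
    .sort(key=lambda kv: kv[1], reverse=True)
    .to(dict)
)
# {"hello": 3, "world": 2, "python": 1}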
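As a cross-check on the expected counts, the same tally can be produced with collections.Counter from the standard library. This is an independent reference for the result, not a description of how pipeflow computes it.

```python
from collections import Counter

text = "hello world hello python world hello"

# most_common() returns (word, count) pairs ordered by descending count.
counts = dict(Counter(text.split()).most_common())
print(counts)  # {'hello': 3, 'world': 2, 'python': 1}
```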
Debugging mid-pipeline
result = (
    Stream(range(20))
    .filter(lambda x: x % 3 == 0)
    .tap(lambda x: print(f"after filter: {x}"))
    .map(lambda x: x * 10)
    .to(list)
)
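The idea behind a tap step is small enough to sketch as a plain generator. The tap function below is a hypothetical standard-library equivalent, assuming the behaviour described in the API table (run a side effect, pass each element through unchanged).

```python
def tap(func, iterable):
    """Call func on each element for its side effect, then pass it on."""
    for item in iterable:
        func(item)
        yield item


seen = []  # stands in for print(); collects what passed the filter
multiples = (x for x in range(20) if x % 3 == 0)
result = [x * 10 for x in tap(seen.append, multiples)]

print(seen)    # [0, 3, 6, 9, 12, 15, 18]
print(result)  # [0, 30, 60, 90, 120, 150, 180]
```

In a lazy pipeline the side effect runs element by element as results are pulled, so debug output is interleaved with downstream work rather than printed up front.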
Batch processing
# records: any iterable of rows; bulk_insert: your own function, called once per batch
Stream(records).batch(100).for_each(bulk_insert)
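To show what batching buys you, here is a standard-library sketch of the same pattern. The batches helper, the records generator, and the bulk_insert sink are all hypothetical placeholders, and the short final batch is an assumption about how leftover elements are handled.

```python
from itertools import islice


def batches(iterable, size):
    """Yield lists of up to `size` elements without loading everything at once."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


inserted_sizes = []


def bulk_insert(rows):
    # Hypothetical sink: a real one might issue one database write per batch.
    inserted_sizes.append(len(rows))


records = ({"id": i} for i in range(250))  # placeholder data source
for chunk in batches(records, 100):
    bulk_insert(chunk)

print(inserted_sizes)  # [100, 100, 50]
```

Only one batch is held in memory at a time, which is the main reason to batch a lazy stream before handing it to a bulk API.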
License
MIT