A lightweight, pythonic library for building repeatable data pipelines
simplypipe
Chainable data pipelines for Python iterables. Zero dependencies.
stats = (
    pipe(records)
    .filter(is_valid)
    .map(normalize)
    .catch(enrich_from_api, on_error=lambda item, exc: dead_letters.append(item))
    .batch(100)
    .run(sink=write_to_db)
)
.run() returns a RunStats object with processed/emitted counts, error counts, and wall-clock duration.
pip install simplypipe
Quick start
from simplypipe import pipe
stats = (
    pipe(range(1000))
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x * 3)
    .batch(100)
    .run(sink=print)
)
print(stats.processed, stats.emitted, stats.duration)
Operators
map(fn)
Applies fn to each item and passes the result downstream. The original item is replaced by the return value of fn.
(
    pipe(["hello", "world"])
    .map(str.upper)
    .run(sink=print)
)
# HELLO
# WORLD
flat_map(fn)
Applies fn to each item and flattens the result. Use this when fn returns an iterable and you want each element of that iterable to continue as a separate item.
(
    pipe(["hello world", "foo bar"])
    .flat_map(str.split)
    .run(sink=print)
)
# hello
# world
# foo
# bar
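The flattening described above corresponds to mapping and then chaining the results. The following is a minimal standard-library sketch of that behaviour, not simplypipe's implementation; flat_map_sketch is a hypothetical helper name introduced only for illustration.

```python
from itertools import chain
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def flat_map_sketch(fn: Callable[[T], Iterable[U]], items: Iterable[T]) -> Iterator[U]:
    """Hypothetical stand-in: apply fn to each item and yield every element of each result."""
    return chain.from_iterable(map(fn, items))


words = list(flat_map_sketch(str.split, ["hello world", "foo bar"]))
print(words)  # ['hello', 'world', 'foo', 'bar']
```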
filter(fn)
Keeps only items for which fn returns a truthy value. Dropped items are counted in RunStats.dropped.
(
    pipe(range(10))
    .filter(lambda x: x % 2 == 0)
    .run(sink=print)
)
# 0
# 2
# 4
# 6
# 8
tap(fn)
Calls fn for its side-effect on each item, then passes the item through unchanged. Useful for logging or debugging mid-pipeline.
(
    pipe(range(3))
    .tap(lambda x: print(f"processing {x}"))
    .map(lambda x: x * 10)
    .run(sink=print)
)
# processing 0
# 0
# processing 1
# 10
# processing 2
# 20
batch(size)
Collects items into lists of up to size elements. The last batch may be smaller if the source is exhausted first. Each emitted batch increments RunStats.batches.
(
    pipe(range(7))
    .batch(3)
    .run(sink=print)
)
# [0, 1, 2]
# [3, 4, 5]
# [6]
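The grouping rule above, including the smaller final batch, can be sketched with a plain generator. This is an illustration of the described semantics under the assumption that batches are built eagerly in order; batch_sketch is a hypothetical name and not part of simplypipe.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch_sketch(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Hypothetical stand-in: group items into lists of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    current: List[T] = []
    for item in items:
        current.append(item)
        if len(current) == size:
            yield current
            current = []
    if current:  # flush the final, possibly smaller, batch
        yield current


batches = list(batch_sketch(range(7), 3))
print(batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```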
rate_limit(rate, per=1.0)
Throttles throughput to at most rate items every per seconds, sleeping between items as needed.
# Process at most 5 items per second
(
    pipe(range(20))
    .rate_limit(5, per=1.0)
    .run(sink=print)
)
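One way to throttle by sleeping between items is to space them evenly, per / rate seconds apart. The sketch below assumes that even-spacing strategy; the library may schedule its sleeps differently, and rate_limit_sketch is a hypothetical name used only here.

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def rate_limit_sketch(items: Iterable[T], rate: float, per: float = 1.0) -> Iterator[T]:
    """Hypothetical stand-in: yield items no faster than `rate` per `per` seconds."""
    interval = per / rate  # assumed minimum spacing between consecutive items
    next_allowed = time.monotonic()
    for item in items:
        delay = next_allowed - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        # Schedule the next slot relative to whichever is later: the plan or now.
        next_allowed = max(next_allowed, time.monotonic()) + interval
        yield item


start = time.monotonic()
out = list(rate_limit_sketch(range(5), rate=50, per=1.0))
elapsed = time.monotonic() - start
print(out, f"{elapsed:.2f}s")
```

With rate=50 the first item passes immediately and the remaining four wait roughly 0.02 seconds each, so the run takes about 0.08 seconds.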
dedupe(key=None, max_size=None)
Drops duplicate items, keeping only the first occurrence. Use key to extract the comparison value from each item. Use max_size to bound memory: when the seen-set exceeds max_size, the least recently used entry is evicted (LRU), so a value that was evicted and later reappears passes through again. Dropped duplicates are counted in RunStats.dropped.
(
    pipe([1, 2, 2, 3, 1, 4])
    .dedupe()
    .run(sink=print)
)
# 1
# 2
# 3
# 4

# With a key function
(
    pipe([{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"id": 2, "v": "c"}])
    .dedupe(key=lambda x: x["id"])
    .run(sink=print)
)
# {'id': 1, 'v': 'a'}
# {'id': 2, 'v': 'c'}
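To make the max_size trade-off concrete, here is a minimal sketch of a bounded seen-set built on collections.OrderedDict. It assumes LRU eviction as the description states; dedupe_sketch is a hypothetical name and this is not the library's code.

```python
from collections import OrderedDict
from typing import Callable, Hashable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def dedupe_sketch(
    items: Iterable[T],
    key: Optional[Callable[[T], Hashable]] = None,
    max_size: Optional[int] = None,
) -> Iterator[T]:
    """Hypothetical stand-in: drop repeats, remembering at most `max_size` keys."""
    seen: "OrderedDict[Hashable, None]" = OrderedDict()
    for item in items:
        k = key(item) if key is not None else item
        if k in seen:
            seen.move_to_end(k)  # refresh recency so this key is evicted last
            continue
        seen[k] = None
        if max_size is not None and len(seen) > max_size:
            seen.popitem(last=False)  # evict the least recently used key
        yield item


unbounded = list(dedupe_sketch([1, 2, 2, 3, 1, 4]))
bounded = list(dedupe_sketch([1, 2, 3, 1], max_size=2))
print(unbounded)  # [1, 2, 3, 4]
print(bounded)    # [1, 2, 3, 1]
```

In the bounded run, 1 has already been evicted by the time it reappears, so it is emitted a second time. A small max_size therefore trades memory for the possibility of late duplicates.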
retry_map(fn, retries=3, backoff=1.0, exceptions=(Exception,))
Like map, but retries fn up to retries times if it raises one of the specified exceptions. Backoff between attempts is exponential: backoff * 2 ** attempt seconds. If all retries are exhausted, the last exception is re-raised. Each failed attempt increments RunStats.errors.
import random

def flaky(x):
    if random.random() < 0.5:
        raise ValueError("transient error")
    return x * 2

(
    pipe(range(5))
    .retry_map(flaky, retries=3, backoff=0.1)
    .run(sink=print)
)
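The backoff formula can be traced with a small per-item retry loop. This sketch makes two assumptions the description does not spell out: that retries counts additional attempts after the first call, and that attempt starts at 0. call_with_retries and its sleep parameter are hypothetical, introduced so the delays can be recorded instead of waited for.

```python
import time
from typing import Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def call_with_retries(
    fn: Callable[[T], U],
    item: T,
    retries: int = 3,
    backoff: float = 1.0,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> U:
    """Hypothetical stand-in for a per-item retry loop.

    Assumptions: one initial call plus up to `retries` retries, and
    `attempt` counted from 0 in the delay formula.
    """
    for attempt in range(retries + 1):
        try:
            return fn(item)
        except exceptions:
            if attempt == retries:
                raise  # retries exhausted: re-raise the last exception
            sleep(backoff * 2 ** attempt)
    raise AssertionError("unreachable when retries >= 0")


delays: List[float] = []
call_count = [0]


def fails_twice(x: int) -> int:
    """Fail on the first two calls, then succeed."""
    call_count[0] += 1
    if call_count[0] <= 2:
        raise ValueError("transient error")
    return x * 2


result = call_with_retries(fails_twice, 21, retries=3, backoff=0.1, sleep=delays.append)
print(result, delays)  # 42 [0.1, 0.2]
```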
take(n)
Emits at most n items, then stops. Useful for previewing a pipeline, limiting output, or processing only a slice of a large source.
(
    pipe(range(1_000_000))
    .map(expensive_transform)
    .take(10)
    .run(sink=print)
)
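Previewing is only cheap if upstream work stops once enough items have been emitted. The standard-library sketch below shows that lazy pattern with map and itertools.islice. Whether simplypipe stops pulling from the source at exactly n items is an assumption here, and this expensive_transform is a hypothetical stand-in that records its calls.

```python
from itertools import islice
from typing import List

call_log: List[int] = []


def expensive_transform(x: int) -> int:
    """Hypothetical stand-in for a costly per-item function; records each call."""
    call_log.append(x)
    return x * x


# map() is lazy in Python 3, so islice pulls only the items it needs.
first_ten = list(islice(map(expensive_transform, range(1_000_000)), 10))
print(first_ten)      # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(len(call_log))  # 10
```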
catch(fn, on_error, exceptions=(Exception,))
Like map, but handles errors per item instead of crashing the pipeline. If fn raises one of the specified exceptions, on_error(item, exc) is called and the item is dropped. Processing continues with the next item. Each caught error increments RunStats.errors.
dead_letters = []

(
    pipe(records)
    .catch(
        enrich_from_api,
        on_error=lambda item, exc: dead_letters.append(item),
        exceptions=(IOError, TimeoutError),
    )
    .run(sink=write_to_db)
)
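The per-item error handling described above can be sketched as a generator that diverts failures to a callback and carries on. This is a self-contained illustration of the documented on_error(item, exc) contract, not the library's implementation; catch_sketch is a hypothetical name.

```python
from typing import Callable, Iterable, Iterator, List, Tuple, Type, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def catch_sketch(
    items: Iterable[T],
    fn: Callable[[T], U],
    on_error: Callable[[T, BaseException], None],
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
) -> Iterator[U]:
    """Hypothetical stand-in: map fn over items, diverting failures to on_error."""
    for item in items:
        try:
            result = fn(item)
        except exceptions as exc:
            on_error(item, exc)  # report the failing item, then drop it
            continue
        yield result


dead_letters: List[Tuple[str, str]] = []
parsed = list(
    catch_sketch(
        ["1", "two", "3"],
        int,
        on_error=lambda item, exc: dead_letters.append((item, type(exc).__name__)),
        exceptions=(ValueError,),
    )
)
print(parsed)        # [1, 3]
print(dead_letters)  # [('two', 'ValueError')]
```

Exceptions outside the listed types are not caught and would still stop the run, which is why narrowing exceptions to the failures you expect is usually the safer choice.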
RunStats
.run() returns a RunStats dataclass:
@dataclass
class RunStats:
    processed: int    # items from source
    emitted: int      # items delivered to sink
    dropped: int      # items removed by filter or dedupe
    batches: int      # batches produced by batch()
    errors: int       # exceptions caught by retry_map or catch
    duration: float   # wall-clock time in seconds
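These fields are enough to derive a throughput figure for logging. The sketch below redefines a local dataclass with the same documented fields so it runs without the library. summarize is a hypothetical helper, and the numbers in the example are invented for illustration.

```python
from dataclasses import dataclass


@dataclass
class RunStats:
    """Local mirror of the documented fields, so the sketch runs without the library."""
    processed: int
    emitted: int
    dropped: int
    batches: int
    errors: int
    duration: float


def summarize(stats: RunStats) -> str:
    """Hypothetical helper: format a one-line run summary with throughput."""
    rate = stats.processed / stats.duration if stats.duration > 0 else 0.0
    return (
        f"processed={stats.processed} emitted={stats.emitted} "
        f"dropped={stats.dropped} errors={stats.errors} rate={rate:.1f}/s"
    )


# Illustrative values only, not measured output.
example = RunStats(processed=1000, emitted=5, dropped=500, batches=5, errors=0, duration=0.5)
summary = summarize(example)
print(summary)
# processed=1000 emitted=5 dropped=500 errors=0 rate=2000.0/s
```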
Development
git clone https://github.com/janmarkuslanger/simplypipe.git
cd simplypipe
pip install -e ".[dev]"
Run tests:
pytest
Lint and format:
ruff check .
ruff format .
Type-check:
mypy simplypipe
License
MIT