anyiostream
Composable async pipelines with structured concurrency
anyiostream provides lazy, composable async pipelines with true inter-stage concurrency, backpressure, and Rust-inspired error handling — all built on anyio for seamless asyncio + trio support.
Why anyiostream?
Python has excellent async primitives, but a gap exists between raw concurrency tools and declarative pipeline APIs:
- aiostream pioneered composable `|` pipe syntax, but it uses nested async generators in a single task. Stages execute sequentially via `__anext__()` pull chains, not as concurrent tasks. It is asyncio-only.
- anyio provides the right primitives (`TaskGroup`, `MemoryObjectStream`, `.clone()`) but no pipeline abstraction. Wiring a 3-stage concurrent pipeline requires ~30 lines of boilerplate.
anyiostream bridges this gap with the CSP (Communicating Sequential Processes) pattern: each stage runs as an independent task, connected by bounded channels — the same model as Go channels.
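To make the task-per-stage model concrete, here is a minimal sketch of the CSP pattern. It is not anyiostream's implementation: it uses only the standard library's `asyncio.Queue` in place of anyio memory object streams so that it runs without dependencies, and the names `source`, `stage`, `run_pipeline`, and the `_DONE` sentinel are illustrative assumptions.

```python
import asyncio

_DONE = object()  # illustrative end-of-stream sentinel (an assumption, not a library API)


async def source(out: asyncio.Queue, items):
    for item in items:
        await out.put(item)  # suspends while the bounded queue is full
    await out.put(_DONE)


async def stage(inp: asyncio.Queue, out: asyncio.Queue, fn):
    # Each stage is its own task: it pulls from one channel and pushes to the next.
    while (item := await inp.get()) is not _DONE:
        await out.put(fn(item))
    await out.put(_DONE)


async def run_pipeline(items):
    # Three bounded channels connect source -> stage 1 -> stage 2 -> sink.
    q0, q1, q2 = (asyncio.Queue(maxsize=2) for _ in range(3))
    results = []

    async def sink():
        while (item := await q2.get()) is not _DONE:
            results.append(item)

    # All four coroutines run concurrently as separate tasks.
    await asyncio.gather(
        source(q0, items),
        stage(q0, q1, lambda x: x + 1),
        stage(q1, q2, lambda x: x * 10),
        sink(),
    )
    return results


print(asyncio.run(run_pipeline(range(5))))  # [10, 20, 30, 40, 50]
```

With one worker per stage, item order is preserved because each queue is FIFO. A real implementation would use a task group rather than `asyncio.gather` so that a failure in one stage cancels the others.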
| Feature | aiostream | anyio (raw) | anyiostream |
|---|---|---|---|
| Inter-stage concurrency | No: single-task generator pull | Manual (~30 LOC per pipeline) | Yes: task-per-stage in `TaskGroup` |
| Fan-out `workers=N` | No (`task_limit` within one stage) | Manual (clone streams yourself) | Yes: load-balanced via `clone()` |
| Backpressure | No (pull-based) | Yes, with manual wiring | Yes: `buffer_size` per stage |
| `Result` `Ok`/`Err` types | No | No | Yes: railway-oriented error handling |
| Backend | asyncio only | asyncio + trio | asyncio + trio |
| Pipe `\|` syntax | Yes | No | Yes |
| Structured concurrency | Partial | Yes | Yes: automatic cleanup |
30 lines of raw anyio → 4 lines of anyiostream
```python
import anyio
from anyiostream import Stream

# Raw anyio: manual channel wiring
async def manual_pipeline():
    s0, r0 = anyio.create_memory_object_stream(10)
    s1, r1 = anyio.create_memory_object_stream(10)
    s2, r2 = anyio.create_memory_object_stream(10)

    async def source(send):
        async with send:
            for url in urls:
                await send.send(url)

    async def worker(recv, send):
        async with recv, send:
            async for url in recv:
                await send.send(await fetch(url))

    async with anyio.create_task_group() as tg:
        tg.start_soon(source, s0)
        for _ in range(3):
            tg.start_soon(worker, r0.clone(), s1.clone())
        # Close the originals so the channel ends once every clone is closed
        r0.close(); s1.close()
        # ... repeat for stage 2 ...

# anyiostream: same behavior
result = await (
    Stream.from_iterable(urls)
    .map(fetch, workers=3, buffer_size=10)
    .map(parse)
    .collect()
)
```
Features
- Lazy pipelines: nothing runs until a terminal operation (`collect`, `count`, `reduce`, `first`, `take`)
- True inter-stage concurrency: each stage runs in its own task, and items flow between stages via bounded channels
- Backpressure: bounded memory object streams prevent fast producers from overwhelming slow consumers
- Fan-out workers: scale any stage horizontally with `workers=N`
- Two composition styles: method chaining or aiostream-inspired `|` pipe syntax
- Rust-inspired `Ok`/`Err`: `try_map`, `try_filter`, `recover`, `collect_split` for railway-oriented error handling
- Backend-portable: runs on both asyncio and trio via anyio
- Structured concurrency: automatic cleanup via `TaskGroup`, no leaked tasks
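The backpressure feature can be illustrated with a small experiment. The sketch below is an assumption-laden stand-in, not anyiostream code: it uses the standard library's `asyncio.Queue` as the bounded channel, and `measure_peak_buffer` is a hypothetical helper that records how many items a fast producer ever manages to queue ahead of a slow consumer. Note one difference in conventions: `asyncio.Queue(maxsize=0)` is unbounded, whereas this README documents `buffer_size=0` as a rendezvous channel.

```python
import asyncio


async def measure_peak_buffer(maxsize: int, n_items: int = 20) -> int:
    """Return the largest queue depth seen while a fast producer feeds a slow consumer."""
    queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer():
        nonlocal peak
        for i in range(n_items):
            await queue.put(i)  # suspends here whenever the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # end-of-stream marker

    async def consumer():
        while (await queue.get()) is not None:
            await asyncio.sleep(0.001)  # simulate slow downstream work

    await asyncio.gather(producer(), consumer())
    return peak


peak = asyncio.run(measure_peak_buffer(maxsize=3))
print(f"peak buffered items: {peak}")  # never exceeds maxsize
```

However fast the producer is, the number of in-flight items is capped by the channel size, which is what keeps memory bounded in a long-running pipeline.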
Installation
pip install anyiostream
Or with uv:
uv add anyiostream
Quick Start
Method Chaining
```python
from anyiostream import Stream

result = await (
    Stream.from_iterable(range(100))
    .map(lambda x: x * 2, workers=4)
    .filter(lambda x: x > 50)
    .collect()
)
```
Pipe Operator (aiostream-style)
```python
from anyiostream import Stream, pipe

result = await (
    Stream.from_iterable(urls)
    | pipe.map(fetch, workers=10)
    | pipe.flat_map(extract_links, workers=5)
    | pipe.filter(is_valid)
    | pipe.map(normalize)
    | pipe.collect()
)
```
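For readers curious how `|` composition can work in Python, the following is a minimal synchronous sketch based on the `__or__` hook. Everything in it (`MiniStream`, `pipe_map`, `pipe_filter`, `pipe_collect`) is hypothetical and exists only for illustration; anyiostream's actual classes are asynchronous, and its `pipe.collect()` yields an awaitable rather than a list.

```python
from typing import Callable, Iterable


class MiniStream:
    """Hypothetical synchronous stand-in used only to show `|` composition."""

    def __init__(self, items: Iterable, steps: tuple = ()):
        self._items = items
        self._steps = steps  # recorded transformations, applied lazily in run()

    def __or__(self, op: Callable[["MiniStream"], object]):
        # `stream | op` hands the stream to the operator object.
        return op(self)

    def with_step(self, step):
        return MiniStream(self._items, self._steps + (step,))

    def run(self) -> list:
        data = iter(self._items)
        for step in self._steps:
            data = step(data)
        return list(data)


def pipe_map(fn):
    return lambda s: s.with_step(lambda it: (fn(x) for x in it))


def pipe_filter(pred):
    return lambda s: s.with_step(lambda it: (x for x in it if pred(x)))


def pipe_collect():
    # Terminal operator: runs the recorded steps and returns a list.
    return lambda s: s.run()


result = (
    MiniStream(range(10))
    | pipe_map(lambda x: x * 2)
    | pipe_filter(lambda x: x > 10)
    | pipe_collect()
)
print(result)  # [12, 14, 16, 18]
```

Each operator is just a callable that takes a stream and returns either a new stream or, for a terminal operator, the final value, which is why method chaining and pipe syntax can share one set of stage definitions.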
Error Handling (Rust-style Ok/Err)
```python
from anyiostream import Stream, pipe, Ok, Err

# Exceptions become Err(PipelineError(...)) instead of crashing
oks, errs = await (
    Stream.from_iterable(urls)
    | pipe.try_map(fetch, workers=5)   # Ok(response) or Err(PipelineError)
    | pipe.try_map(parse)              # chains on Ok, passes Err through
    | pipe.collect_split()             # partition into (successes, failures)
)

# Custom error handler: transform Err items instead of passing through
results = await (
    Stream.from_iterable(urls)
    | pipe.try_map(fetch, workers=5, err=lambda e: log_and_rewrap(e))
    | pipe.try_map(parse, err=lambda e: e)  # pass Err unchanged explicitly
    | pipe.collect()
)

# Or recover from errors
results = await (
    Stream.from_iterable(urls)
    | pipe.try_map(fetch, workers=5)
    | pipe.recover(lambda err: fallback(err.item))  # Err → fallback value
    | pipe.collect()
)
```
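The railway-oriented idea behind these stages can be sketched in plain synchronous Python. The `Ok` and `Err` classes and the `try_map`, `collect_split`, and `recover` functions below are simplified stand-ins written for this illustration; their fields and exact return shapes (for example, whether `collect_split` unwraps `Ok` values) are assumptions, not anyiostream's definitions.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass(frozen=True)
class Ok:
    value: Any


@dataclass(frozen=True)
class Err:
    error: Exception
    item: Any  # the input that caused the failure (assumed field)


def try_map(fn: Callable, results: Iterable) -> List:
    out = []
    for r in results:
        if isinstance(r, Err):
            out.append(r)  # errors ride along untouched
            continue
        try:
            out.append(Ok(fn(r.value)))
        except Exception as exc:
            out.append(Err(exc, r.value))  # exception becomes a value
    return out


def collect_split(results: Iterable) -> Tuple[List, List]:
    results = list(results)
    oks = [r.value for r in results if isinstance(r, Ok)]
    errs = [r for r in results if isinstance(r, Err)]
    return oks, errs


def recover(fn: Callable, results: Iterable) -> List:
    # Unwrap Ok values and replace each Err with a fallback value.
    return [r.value if isinstance(r, Ok) else fn(r) for r in results]


raw = [Ok(s) for s in ["1", "x", "3"]]
parsed = try_map(int, raw)                         # "x" fails to parse
divided = try_map(lambda n: 10 // (n - 1), parsed)  # 1 triggers ZeroDivisionError
oks, errs = collect_split(divided)
print(oks)                                # [5]
print([type(e.error).__name__ for e in errs])  # ['ZeroDivisionError', 'ValueError']
print(recover(lambda e: -1, divided))     # [-1, -1, 5]
```

Because failures are carried as ordinary values, one bad item does not stop the remaining items from being processed, and the caller decides at the end whether to inspect, drop, or replace them.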
Context Manager (manual iteration)
```python
from anyiostream import Stream

pipeline = (
    Stream.from_iterable(events)
    .map(process, workers=4)
    .filter(is_important)
)

async with pipeline.open() as items:
    async for item in items:
        await handle(item)
```
API Overview
Stream Constructors
| Method | Description |
|---|---|
| `Stream.from_iterable(items)` | Create from sync or async iterable |
| `Stream.from_callable(factory)` | Lazy: factory called at execution time |
Transform Stages
| Method | Pipe Syntax | Description |
|---|---|---|
| `.map(fn)` | `\| pipe.map(fn)` | 1:1 transform (sync or async) |
| `.flat_map(fn)` | `\| pipe.flat_map(fn)` | 1:N transform |
| `.filter(pred)` | `\| pipe.filter(pred)` | Keep items where `pred` is truthy |
| `.foreach(fn)` | `\| pipe.foreach(fn)` | Side-effect, passes items through |
Result-Aware Stages
| Method | Pipe Syntax | Description |
|---|---|---|
| `.try_map(fn, err=handler)` | `\| pipe.try_map(fn, err=handler)` | Map with `Ok`/`Err` wrapping |
| `.try_flat_map(fn, err=handler)` | `\| pipe.try_flat_map(fn, err=handler)` | Flat map with `Ok`/`Err` wrapping |
| `.try_filter(pred)` | `\| pipe.try_filter(pred)` | Filter `Ok` values, `Err` passes through |
| `.try_foreach(fn, err=handler)` | `\| pipe.try_foreach(fn, err=handler)` | Side-effect on `Ok` values |
| `.recover(fn)` | `\| pipe.recover(fn)` | Convert `Err` → value, unwrap `Ok` |
| `.ok_only()` | `\| pipe.ok_only()` | Keep `Ok` values, drop `Err` |
| `.errors_only()` | `\| pipe.errors_only()` | Keep `Err` values, drop `Ok` |
`err=handler` (optional): When provided, `Err` items are transformed by `handler(error)` instead of passing through unchanged. Omit it to let errors flow downstream as-is.
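The `err=handler` rule can be sketched as a single function. This is an illustrative model only: it represents results as tagged tuples (`("ok", value)` or `("err", error)`) rather than the library's types, `try_stage` is a hypothetical name, and it applies the handler only to incoming errors as described above. Whether the real handler also sees errors raised by `fn` in the same stage is not stated here, so the sketch leaves those unwrapped.

```python
from typing import Any, Callable, Optional, Tuple

Tagged = Tuple[str, Any]  # ("ok", value) or ("err", error): an illustrative encoding


def try_stage(
    tagged: Tagged,
    fn: Callable[[Any], Any],
    err: Optional[Callable[[Any], Any]] = None,
) -> Tagged:
    tag, payload = tagged
    if tag == "err":
        # With a handler the error is transformed; without one it passes through as-is.
        return ("err", err(payload)) if err is not None else tagged
    try:
        return ("ok", fn(payload))
    except Exception as exc:
        return ("err", exc)


incoming = [("ok", "4"), ("err", ValueError("bad input"))]

passthrough = [try_stage(t, int) for t in incoming]
handled = [
    try_stage(t, int, err=lambda e: RuntimeError(f"wrapped: {e}")) for t in incoming
]

print(passthrough[0])             # ('ok', 4)
print(passthrough[1] is incoming[1])  # True
print(handled[1][1])              # wrapped: bad input
```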
Terminal Operations
| Method | Pipe Syntax | Description |
|---|---|---|
| `.collect()` | `\| pipe.collect()` | Collect all items into a list |
| `.count()` | `\| pipe.count()` | Consume all, return count |
| `.collect_split()` | `\| pipe.collect_split()` | Partition into `(oks, errs)` |
| `.reduce(fn, init)` | - | Fold into single value |
| `.first()` | - | Return first item or `None` |
| `.take(n)` | - | Collect at most `n` items |
| `.open()` | - | Context manager for manual iteration |
Stage Options
Every stage accepts these keyword arguments:
| Option | Default | Description |
|---|---|---|
| `workers` | `1` | Number of concurrent workers |
| `buffer_size` | `0` | Backpressure buffer (`0` = rendezvous, `math.inf` = unbounded) |
| `name` | `None` | Human-readable label for debugging |
How It Works
```text
Source → [channel] → Stage 1 → [channel] → Stage 2 → [channel] → Terminal
         ↑ bounded   workers=N             workers=M
       backpressure  (fan-out)             (fan-out)
```
- Lazy recipe: `Stream` holds a list of `Process` descriptors. Nothing runs yet.
- Terminal triggers execution: `collect()`, `count()`, etc. materialize the pipeline.
- Channel chain: anyio `MemoryObjectStream` pairs connect each stage with bounded backpressure.
- Structured concurrency: all tasks run inside a single `TaskGroup`. Cleanup is automatic.
- Fan-out: `workers=N` clones the receive stream so N workers pull from the same channel (first-available-wins).
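The steps above can be sketched end to end in a few dozen lines. This is a toy model under stated assumptions, not anyiostream's source: `LazyPipeline` is a hypothetical class, `asyncio.Queue` stands in for anyio memory object streams, a shared queue stands in for cloned receive streams, and `asyncio.gather` stands in for a task group (so it lacks the automatic cancellation a real `TaskGroup` provides).

```python
import asyncio
import inspect

_DONE = object()  # illustrative end-of-stream sentinel


async def _worker(inp, out, fn, remaining):
    while True:
        item = await inp.get()
        if item is _DONE:
            remaining[0] -= 1
            if remaining[0] > 0:
                await inp.put(_DONE)  # pass the sentinel on to a sibling worker
            else:
                await out.put(_DONE)  # last worker closes the downstream channel
            return
        result = fn(item)
        if inspect.isawaitable(result):  # accept sync or async functions
            result = await result
        await out.put(result)


class LazyPipeline:
    """Hypothetical mini-pipeline: records stages and runs them only on collect()."""

    def __init__(self, items, stages=()):
        self._items = items
        self._stages = tuple(stages)  # the "recipe": (fn, workers, buffer_size)

    def map(self, fn, workers=1, buffer_size=1):
        # Returns a new recipe; nothing is executed here.
        return LazyPipeline(self._items, self._stages + ((fn, workers, buffer_size),))

    async def _feed(self, out):
        for item in self._items:
            await out.put(item)
        await out.put(_DONE)

    async def collect(self):
        channel = asyncio.Queue(maxsize=1)
        coros = [self._feed(channel)]
        for fn, workers, buffer_size in self._stages:
            out = asyncio.Queue(maxsize=buffer_size)
            remaining = [workers]  # shared countdown of live workers in this stage
            # Fan-out: every worker pulls from the same input channel.
            coros += [_worker(channel, out, fn, remaining) for _ in range(workers)]
            channel = out
        results = []

        async def sink(q):
            while (item := await q.get()) is not _DONE:
                results.append(item)

        await asyncio.gather(*coros, sink(channel))
        return results


async def slow_square(x):
    await asyncio.sleep(0.01)
    return x * x


pipeline = LazyPipeline(range(6)).map(slow_square, workers=3).map(lambda x: x + 1)
result = asyncio.run(pipeline.collect())  # execution starts only here
print(sorted(result))  # [1, 2, 5, 10, 17, 26]
```

The output is sorted before printing because, with more than one worker in a stage, whichever worker finishes first sends first, so item order is not guaranteed in this sketch.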
Development
```shell
# Install dependencies
uv sync --all-extras

# Run tests (both asyncio and trio backends)
uv run pytest

# Lint & format
uv run ruff check .
uv run ruff format .
```
Inspiration & Acknowledgements
anyiostream stands on the shoulders of excellent projects:
- aiostream by Vincent Michel: pioneered composable async stream operators with pipe syntax for asyncio. anyiostream's `| pipe.map(fn)` API is directly inspired by aiostream's elegant design.
- anyio by Alex Grönholm: the structured concurrency foundation that makes anyiostream backend-portable. Memory object streams and task groups from anyio are the core execution primitives.
- trio by Nathaniel J. Smith: pioneered structured concurrency in Python and inspired anyio's design. Trio's philosophy of "make concurrency correct by default" deeply influences anyiostream's automatic cleanup guarantees.
- Rust's `Result<T, E>`: the `Ok`/`Err` pattern for railway-oriented error handling. anyiostream's `try_map`, `recover`, and `collect_split` bring this pattern to async Python pipelines, letting errors flow as values instead of crashing the pipeline.
License