A coroutine composition framework for declarative async pipelines in Python
CorStream
A coroutine composition framework for Python that enables declarative, streaming-style async pipelines.
🧐 About
CorStream is a Python library for building composable pipelines from asynchronous iterables and coroutine-based operators. Think of it as a blend of asyncio, RxPy, and Unix pipes, designed for readability, type safety, and modern Python development.
With CorStream, you can easily:
- Transform and filter async data flows
- Batch, throttle, or log stream data
- Apply async functions with concurrency control
- Collect or reduce outputs with simple syntax
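To make the operator idea concrete, here is a minimal sketch of how transform, filter, and collect stages can be composed from plain async generators. It does not use CorStream and is not its implementation; the helper names `from_iterable`, `map_`, `filter_`, and `to_list` are hypothetical stand-ins chosen for illustration.

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def from_iterable(items: Iterable[T]) -> AsyncIterator[T]:
    """Lift a regular iterable into an async iterator."""
    for item in items:
        yield item


async def map_(source: AsyncIterator[T], fn: Callable[[T], U]) -> AsyncIterator[U]:
    """Apply fn to every item as it flows through."""
    async for item in source:
        yield fn(item)


async def filter_(source: AsyncIterator[T], pred: Callable[[T], bool]) -> AsyncIterator[T]:
    """Pass through only the items for which pred is true."""
    async for item in source:
        if pred(item):
            yield item


async def to_list(source: AsyncIterator[T]) -> list[T]:
    """Sink: drain the stream into a list."""
    return [item async for item in source]


async def demo() -> list[int]:
    # Each stage wraps the previous one, like commands joined by Unix pipes.
    evens = filter_(from_iterable(range(1, 11)), lambda x: x % 2 == 0)
    squares = map_(evens, lambda x: x * x)
    return await to_list(squares)


result = asyncio.run(demo())
print(result)  # [4, 16, 36, 64, 100]
```

Nothing runs until the sink starts pulling items, so each value passes through every stage before the next one is read. A fluent, method-chaining interface can be layered on top of generators like these.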
🏁 Getting Started
These instructions will get you a copy of the project up and running on your local machine for development and testing purposes.
Prerequisites
You’ll need Python 3.9 or higher and Poetry installed:
python3 --version
# Should be 3.9+
curl -sSL https://install.python-poetry.org | python3 -
Installing
Clone the repo and install dependencies with Poetry:
git clone https://github.com/shivkun/corstream.git
cd corstream
poetry install
CorStream is also published on PyPI, so you can install it directly with:
pip install corstream
# or use Poetry
poetry add corstream
You can now run tests, examples, or start building pipelines!
🔧 Running the tests
To run all tests:
poetry run pytest
Per-operator tests
Each operator and sink has its own test file under tests/.
Example:
tests/test_map.py # Tests for .map()
tests/test_batch.py # Tests for .batch()
tests/test_to_list.py # Tests for .to_list()
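As an illustration of what a per-operator test can look like, the sketch below tests a batching operator, including the trailing partial batch and empty input. The `batch` function here is a hypothetical stand-in written for this example, not CorStream's `.batch()`, and the tests drive the event loop with `asyncio.run` so that no pytest plugin is assumed.

```python
import asyncio
from typing import AsyncIterator, Iterable


async def batch(items: Iterable[int], size: int) -> AsyncIterator[list[int]]:
    """Hypothetical stand-in operator: group items into lists of up to `size`."""
    chunk: list[int] = []
    for item in items:
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []
    if chunk:
        # Emit the final, possibly shorter, batch.
        yield chunk


def test_batch_emits_trailing_partial_batch() -> None:
    async def collect() -> list[list[int]]:
        return [b async for b in batch(range(7), 3)]

    assert asyncio.run(collect()) == [[0, 1, 2], [3, 4, 5], [6]]


def test_batch_of_empty_input_emits_nothing() -> None:
    async def collect() -> list[list[int]]:
        return [b async for b in batch([], 3)]

    assert asyncio.run(collect()) == []
```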
Coding style checks
CorStream enforces formatting, linting, and type checking with black, ruff, and mypy:
poetry run black corstream/
poetry run ruff check corstream/
poetry run mypy corstream/
🎈 Usage
Here's a simple pipeline:
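import asyncio

from corstream import Stream

async def get_email(user_id: int) -> str:
    return f"user{user_id}@example.com"

async def send_batch(batch: list[str]) -> None:
    print("Sending:", batch)

async def main() -> None:
    # Keep the even IDs, resolve emails with at most 3 concurrent calls,
    # then pass them to send_batch in groups of up to 5.
    await (
        Stream
        .from_iterable(range(1, 11))
        .filter(lambda x: x % 2 == 0)
        .map_async(get_email, max_concurrency=3)
        .batch(5)
        .log("batch")
        .for_each(send_batch)
    )

asyncio.run(main())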
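For comparison, here is a rough plain-asyncio sketch of the same data flow, showing one way a concurrency limit like `max_concurrency=3` can be enforced with `asyncio.Semaphore`. It is an assumption-laden illustration, not CorStream's implementation: it collects all results before batching instead of streaming them, it assumes results keep their input order, and the `active` and `peak` counters exist only to observe the limit.

```python
import asyncio

MAX_CONCURRENCY = 3
active = 0  # number of get_email calls currently running
peak = 0    # highest value `active` reached


async def get_email(user_id: int) -> str:
    global active, peak
    active += 1
    peak = max(peak, active)
    await asyncio.sleep(0.01)  # simulate I/O
    active -= 1
    return f"user{user_id}@example.com"


async def main() -> list[list[str]]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)

    async def limited(user_id: int) -> str:
        # At most MAX_CONCURRENCY coroutines get past this point at once.
        async with semaphore:
            return await get_email(user_id)

    even_ids = [x for x in range(1, 11) if x % 2 == 0]
    # gather returns results in input order, regardless of completion order.
    emails = await asyncio.gather(*(limited(i) for i in even_ids))
    # Group into batches of up to 5.
    return [list(emails[i:i + 5]) for i in range(0, len(emails), 5)]


batches = asyncio.run(main())
print(batches)
print("peak concurrency:", peak)
```

With five even IDs and a batch size of 5, this produces a single batch of five addresses, and the observed peak never exceeds the limit of 3.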
✍️ Authors
- @shivkun — Design & Implementation
See also the list of contributors.
🎉 Acknowledgements
- Inspiration from functional programming and reactive streams
- Thanks to the maintainers of asyncio, RxPy, and toolz