A coroutine composition framework for declarative async pipelines in Python

CorStream

A coroutine composition framework for Python that enables declarative, streaming-style async pipelines.

🧐 About

CorStream is a Python library for building composable pipelines from asynchronous iterables and coroutine-based operators. Think of it as a blend of asyncio, RxPy, and Unix pipes, designed for readability, type safety, and modern Python development.

With CorStream, you can easily:

  • Transform and filter async data flows
  • Batch, throttle, or log stream data
  • Apply async functions with concurrency control
  • Collect or reduce outputs with simple syntax
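
To make the idea concrete, the operations listed above can be pictured as small async generators chained together. The sketch below uses only the standard library; the helper names (from_iterable, filter_items, map_items, batch, collect) are illustrative stand-ins chosen for this example and are not CorStream's API or its implementation.

```python
import asyncio
from typing import AsyncIterator, Callable, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def from_iterable(items: Iterable[T]) -> AsyncIterator[T]:
    """Lift a plain iterable into an async iterator."""
    for item in items:
        yield item


async def filter_items(
    source: AsyncIterator[T], predicate: Callable[[T], bool]
) -> AsyncIterator[T]:
    """Pass through only the items for which predicate(item) is true."""
    async for item in source:
        if predicate(item):
            yield item


async def map_items(
    source: AsyncIterator[T], fn: Callable[[T], U]
) -> AsyncIterator[U]:
    """Apply a synchronous function to each item."""
    async for item in source:
        yield fn(item)


async def batch(source: AsyncIterator[T], size: int) -> AsyncIterator[List[T]]:
    """Group items into lists of at most `size`, flushing any remainder."""
    buffer: List[T] = []
    async for item in source:
        buffer.append(item)
        if len(buffer) == size:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


async def collect(source: AsyncIterator[T]) -> List[T]:
    """Drain an async iterator into a list."""
    return [item async for item in source]


def run_demo() -> List[List[int]]:
    # Each stage wraps the previous one, much like a Unix pipe.
    evens = filter_items(from_iterable(range(1, 11)), lambda x: x % 2 == 0)
    squares = map_items(evens, lambda x: x * x)
    return asyncio.run(collect(batch(squares, 2)))


if __name__ == "__main__":
    print(run_demo())  # [[4, 16], [36, 64], [100]]
```

Nesting the calls by hand like this gets hard to read as stages are added, which is the problem a fluent, chainable interface is meant to address.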

🏁 Getting Started

These instructions will get you a copy of the project up and running on your local machine for development and testing purposes.

Prerequisites

You’ll need Python 3.9 or higher and Poetry installed:

python3 --version
# Should be 3.9+

curl -sSL https://install.python-poetry.org | python3 -

Installing

Clone the repo and install dependencies with Poetry:

git clone https://github.com/shivkun/corstream.git
cd corstream
poetry install

CorStream is also published on PyPI, so you can install it directly:

pip install corstream
# or use Poetry
poetry add corstream

You can now run tests, examples, or start building pipelines!

🔧 Running the tests

To run all tests:

poetry run pytest

Per-operator tests

Each operator and sink has its own test file under tests/.

Example:

tests/test_map.py          # Tests for .map()
tests/test_batch.py        # Tests for .batch()
tests/test_to_list.py      # Tests for .to_list()

Coding style checks

CorStream enforces formatting with black, linting with ruff, and static type checking with mypy:

poetry run black corstream/
poetry run ruff check corstream/
poetry run mypy corstream/

🎈 Usage

Here's a simple pipeline:

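import asyncio

from corstream import Stream


async def get_email(user_id: int) -> str:
    # Stand-in for an async lookup such as a database or HTTP call
    return f"user{user_id}@example.com"


async def send_batch(batch: list[str]) -> None:
    print("Sending:", batch)


async def main() -> None:
    await (
        Stream
        .from_iterable(range(1, 11))
        .filter(lambda x: x % 2 == 0)  # keep the even IDs: 2, 4, 6, 8, 10
        .map_async(get_email, max_concurrency=3)
        .batch(5)
        .log("batch")
        .for_each(send_batch)
    )


# await is only valid inside a coroutine, so start the pipeline with asyncio.run
asyncio.run(main())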
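
For readers curious what a max_concurrency limit means in practice, here is a minimal standard-library sketch of bounded concurrent mapping using asyncio.Semaphore. It is an assumption about the general technique, not a description of how CorStream implements map_async; the names bounded_map and demo are hypothetical, and unlike a streaming operator this version collects all results into a list before returning.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def bounded_map(
    items: Iterable[T],
    fn: Callable[[T], Awaitable[U]],
    max_concurrency: int,
) -> List[U]:
    """Apply `fn` to every item with at most `max_concurrency` calls in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(item: T) -> U:
        # Each call waits here until one of the permits is free.
        async with semaphore:
            return await fn(item)

    # gather returns results in the same order as the inputs.
    return list(await asyncio.gather(*(run_one(item) for item in items)))


async def demo() -> Tuple[List[str], int]:
    in_flight = 0
    peak = 0

    async def get_email(user_id: int) -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)  # record the highest observed concurrency
        await asyncio.sleep(0.01)    # simulate I/O latency
        in_flight -= 1
        return f"user{user_id}@example.com"

    emails = await bounded_map([2, 4, 6, 8, 10], get_email, max_concurrency=3)
    return emails, peak


if __name__ == "__main__":
    emails, peak = asyncio.run(demo())
    print(emails)
    print("peak concurrency:", peak)
```

The semaphore caps how many coroutines are inside fn at once, while gather keeps the output order aligned with the input order regardless of which call finishes first.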

✍️ Authors

See also the list of contributors.

🎉 Acknowledgements

  • Inspiration from functional programming and reactive streams
  • Thanks to the maintainers of asyncio, RxPy, and toolz
