sync/async iterable streams for Python

streamable

stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress logging, and error handling.

Python 3.8+

1. install

pip install streamable

2. import

from streamable import stream

3. init

Create a stream[T] from an Iterable[T] (or AsyncIterable[T]):

ints: stream[int] = stream(range(10))

4. operate

Chain lazy operations:

import logging
from datetime import timedelta
import httpx
from httpx import Response, HTTPStatusError
from streamable import stream

pokemons: stream[str] = (
    stream(range(10))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .throttle(5, per=timedelta(seconds=1))
    .map(httpx.get, concurrency=2)
    .do(Response.raise_for_status)
    .catch(HTTPStatusError, do=logging.warning)
    .map(lambda poke: poke.json()["name"])
)

Source elements will be processed on-the-fly during iteration.

Operations accept both sync and async functions.

5. iterate

A stream[T] is an Iterable[T] (and an AsyncIterable[T]):

>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']

📒 Operations (docs)

Operations accept both sync and async functions, and the two can be mixed within the same stream, which can then be consumed as an Iterable or as an AsyncIterable. Async functions run in the current event loop; one is created if needed.

Operations are implemented so that the iteration can resume after an exception.
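
Resumability is a deliberate design choice: a plain Python generator is finalized as soon as an exception escapes it, so every later next() call raises StopIteration. The sketch below contrasts such a generator with a hypothetical class-based ResumableMap (an illustration, not streamable's implementation) whose __next__ lets the error propagate without ending the iteration.

```python
from typing import Callable, Generic, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def generator_map(func: Callable[[T], U], source: Iterable[T]) -> Iterator[U]:
    # A generator: once an exception escapes, it is closed for good.
    for elem in source:
        yield func(elem)


class ResumableMap(Generic[T, U]):
    """Hypothetical class-based map iterator that survives exceptions."""

    def __init__(self, func: Callable[[T], U], source: Iterable[T]) -> None:
        self.func = func
        self.source = iter(source)

    def __iter__(self) -> "ResumableMap[T, U]":
        return self

    def __next__(self) -> U:
        # The upstream element is consumed before func is called, so if func
        # raises, the next call simply moves on to the following element.
        return self.func(next(self.source))


def first_two_after_error(it: Iterator[float]) -> list:
    try:
        next(it)  # 1 / 0 raises ZeroDivisionError
    except ZeroDivisionError:
        pass
    # next(it, None) returns None once the iterator is exhausted
    return [next(it, None), next(it, None)]


print(first_two_after_error(generator_map(lambda n: 1 / n, range(3))))  # [None, None]
print(first_two_after_error(ResumableMap(lambda n: 1 / n, range(3))))   # [1.0, 0.5]
```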

A stream can be iterated several times if its source allows it.
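
The "if its source allows it" caveat can be illustrated with a hypothetical Reiterable wrapper (not streamable's class) whose __iter__ asks its source for a fresh iterator every time: a range can be replayed, while a generator is exhausted after its first pass.

```python
from typing import Generic, Iterable, Iterator, TypeVar

T = TypeVar("T")


class Reiterable(Generic[T]):
    """Hypothetical wrapper that restarts iteration from its source each time."""

    def __init__(self, source: Iterable[T]) -> None:
        self.source = source

    def __iter__(self) -> Iterator[T]:
        # A fresh iterator is requested from the source on every iteration.
        return iter(self.source)


from_range = Reiterable(range(3))
from_generator = Reiterable(n for n in range(3))

print(list(from_range), list(from_range))          # [0, 1, 2] [0, 1, 2]
print(list(from_generator), list(from_generator))  # [0, 1, 2] []
```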

A stream exposes operations to manipulate its elements, but I/O is not its responsibility: it is meant to be combined with dedicated libraries such as pyarrow, psycopg2, boto3, or dlt (see the ETL example below).

.map

Transform elements:

int_chars: stream[str] = stream(range(10)).map(str)

assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

concurrency

Set the concurrency param to apply the transformation concurrently.

At most concurrency upstream elements are in flight at any given time.

Upstream order is preserved unless you set as_completed=True, in which case results are yielded in completion order.
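
Both guarantees can be sketched with the standard library alone. The hypothetical concurrent_map below (an illustration, not streamable's implementation) keeps at most concurrency futures pending: a FIFO deque yields results in upstream order, while as_completed=True switches to concurrent.futures.wait with FIRST_COMPLETED to yield whichever result is ready first.

```python
import time
from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def concurrent_map(
    func: Callable[[T], U],
    source: Iterable[T],
    concurrency: int,
    as_completed: bool = False,
) -> Iterator[U]:
    """Hypothetical sketch: apply func via threads, at most `concurrency` in flight."""
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        if not as_completed:
            # FIFO of futures: results come out in upstream order.
            pending = deque()
            for elem in source:
                pending.append(executor.submit(func, elem))
                if len(pending) == concurrency:
                    yield pending.popleft().result()
            while pending:
                yield pending.popleft().result()
        else:
            # Set of futures: results come out in completion order.
            in_flight = set()
            for elem in source:
                in_flight.add(executor.submit(func, elem))
                if len(in_flight) == concurrency:
                    done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                    for future in done:
                        yield future.result()
            while in_flight:
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                for future in done:
                    yield future.result()


def slow_identity(n: int) -> int:
    # Earlier elements sleep longer, so they tend to finish last.
    time.sleep(0.05 * (3 - n))
    return n


print(list(concurrent_map(slow_identity, range(3), concurrency=3)))  # [0, 1, 2]
print(list(concurrent_map(slow_identity, range(3), concurrency=3, as_completed=True)))
```

The exact as_completed order depends on thread scheduling, which is why only the ordered call shows an expected output.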

via threads

If concurrency > 1, the transformation will be applied via concurrency threads:

pokemons: stream[str] = (
    stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(httpx.get, concurrency=2)
    .map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']

via async

If concurrency > 1 and the transformation is async, it will be applied via concurrency async tasks:

# async context
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']
# sync context (asyncio.Runner requires Python 3.11+)
import asyncio
with asyncio.Runner() as runner:
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    runner.run(http_client.aclose())

via processes

concurrency can also be a concurrent.futures.Executor: pass a ProcessPoolExecutor to apply the transformation via processes:

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as processes:
        state: list[int] = []
        # each worker process appends to its own pickled copy of `state` (returning None)
        assert list(
            stream(range(10))
            .map(state.append, concurrency=processes)
        ) == [None] * 10
        # the `state` of the main process is not mutated
        assert state == []

.do

Perform side effects:

state: list[int] = []
storing_ints: stream[int] = stream(range(10)).do(state.append)

assert list(storing_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

concurrency

Same as .map.

.group

Group elements into batches...

... up_to a given batch size:

int_batches: stream[list[int]] = stream(range(10)).group(5)

assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

... within a given time interval:

from datetime import timedelta

int_1s_batches: stream[list[int]] = (
    stream(range(10))
    .throttle(2, per=timedelta(seconds=1))
    .group(within=timedelta(seconds=0.99))
)

assert list(int_1s_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

... by a given key, yielding (key, elements) pairs:

ints_by_parity: stream[tuple[str, list[int]]] = (
    stream(range(10))
    .group(by=lambda n: "odd" if n % 2 else "even")
)

assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]
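
Grouping by key alone can be sketched with an insertion-ordered dict, which explains why keys come out in first-seen order. The hypothetical group_by_key below is an illustration that ignores the up_to and within limits; in this sketch, groups are emitted only once the source is exhausted.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")
K = TypeVar("K", bound=Hashable)


def group_by_key(source: Iterable[T], by: Callable[[T], K]) -> Iterator[Tuple[K, List[T]]]:
    """Hypothetical sketch of grouping by key only (no size or time limit)."""
    groups: Dict[K, List[T]] = {}
    for elem in source:
        # dicts preserve insertion order, so keys come out in first-seen order.
        groups.setdefault(by(elem), []).append(elem)
    # In this sketch, groups are emitted only once the source is exhausted.
    yield from groups.items()


print(list(group_by_key(range(10), by=lambda n: "odd" if n % 2 else "even")))
# [('even', [0, 2, 4, 6, 8]), ('odd', [1, 3, 5, 7, 9])]
```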

You can combine these parameters.

.flatten

Explode upstream elements (Iterable or AsyncIterable):

chars: stream[str] = stream(["hel", "lo!"]).flatten()

assert list(chars) == ["h", "e", "l", "l", "o", "!"]

concurrency

Explode up to concurrency iterables at a time, interleaving their elements:

chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)

assert list(chars) == ["h", "l", "e", "o", "l", "!"]
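
The output above is consistent with a round-robin over at most concurrency open iterables, where an exhausted iterable frees its slot for the next upstream one. The hypothetical interleaved_flatten below sketches that assumption; it is not streamable's implementation.

```python
from collections import deque
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

_EXHAUSTED = object()  # sentinel for "no more upstream iterables"


def interleaved_flatten(iterables: Iterable[Iterable[T]], concurrency: int) -> Iterator[T]:
    """Hypothetical sketch: round-robin over at most `concurrency` iterables."""
    upstream = iter(iterables)
    active = deque()
    # Open the first `concurrency` iterables.
    for iterable in upstream:
        active.append(iter(iterable))
        if len(active) == concurrency:
            break
    while active:
        current = active.popleft()
        try:
            elem = next(current)
        except StopIteration:
            # The exhausted iterable frees a slot for the next upstream one.
            following = next(upstream, _EXHAUSTED)
            if following is not _EXHAUSTED:
                active.append(iter(following))
            continue
        yield elem
        # Rotate: the current iterable goes to the back of the queue.
        active.append(current)


print(list(interleaved_flatten(["hel", "lo", "!"], concurrency=2)))
# ['h', 'l', 'e', 'o', 'l', '!']
```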

.filter

Filter elements satisfying a predicate:

even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)

assert list(even_ints) == [0, 2, 4, 6, 8]

.take

Take a given number of elements:

first_5_ints: stream[int] = stream(range(10)).take(5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]

... or take until a predicate is satisfied:

first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]
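
On these examples, both forms of .take produce the same result as standard itertools building blocks: a count corresponds to itertools.islice, and until=pred corresponds to itertools.takewhile with the predicate negated. This is an equivalence on the examples above, not a statement about how streamable implements .take.

```python
from itertools import islice, takewhile

# .take(5): stop after 5 elements
first_5_ints = list(islice(range(10), 5))

# .take(until=lambda n: n == 5): stop at, and exclude, the first element satisfying the predicate
first_5_ints_until = list(takewhile(lambda n: n != 5, range(10)))

print(first_5_ints)        # [0, 1, 2, 3, 4]
print(first_5_ints_until)  # [0, 1, 2, 3, 4]
```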

.skip

Skip a given number of elements:

ints_after_5: stream[int] = stream(range(10)).skip(5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]

... or skip until a predicate is satisfied:

ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]
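
Similarly, on these examples .skip matches itertools.islice with a start index, and skip(until=pred) matches itertools.dropwhile with the predicate negated (dropwhile discards elements while its predicate holds, then yields everything else). Again, this is an equivalence on the examples, not streamable's implementation.

```python
from itertools import dropwhile, islice

# .skip(5): discard the first 5 elements
ints_after_5 = list(islice(range(10), 5, None))

# .skip(until=lambda n: n >= 5): discard elements until the predicate first holds
ints_after_5_until = list(dropwhile(lambda n: n < 5, range(10)))

print(ints_after_5)        # [5, 6, 7, 8, 9]
print(ints_after_5_until)  # [5, 6, 7, 8, 9]
```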

.catch

Catch exceptions of a given type:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... where a predicate is satisfied:

from http import HTTPStatus

urls = [
    "https://github.com/ebonnal",
    "https://github.com/ebonnal/streamable",
    "https://github.com/ebonnal/foo",
]
responses: stream[httpx.Response] = (
    stream(urls)
    .map(httpx.get)
    .do(httpx.Response.raise_for_status)
    .catch(
        httpx.HTTPStatusError,
        where=lambda e: e.response.status_code == HTTPStatus.NOT_FOUND,
    )
)
assert len(list(responses)) == 2

... do a side effect on catch:

errors: list[Exception] = []
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1

... replace with a value:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replace=lambda e: float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... stop=True to stop the iteration if an exception is caught:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, stop=True)
)

assert list(inverses) == []

You can combine these parameters.
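
The combined behavior of these parameters can be sketched as a hypothetical catching generator wrapped around an upstream that, like streamable's operations, can resume after raising. The order in which do, stop, and replace are applied here is an assumption of this sketch, not documented behavior.

```python
from typing import Any, Callable, Iterable, Iterator, Optional, Type


class ResumableMap:
    """Hypothetical upstream that can be resumed after it raises."""

    def __init__(self, func: Callable[[Any], Any], source: Iterable[Any]) -> None:
        self.func = func
        self.source = iter(source)

    def __iter__(self) -> "ResumableMap":
        return self

    def __next__(self) -> Any:
        return self.func(next(self.source))


def catching(
    upstream: Iterator[Any],
    error_type: Type[Exception],
    where: Callable[[Exception], bool] = lambda e: True,
    do: Optional[Callable[[Exception], object]] = None,
    replace: Optional[Callable[[Exception], Any]] = None,
    stop: bool = False,
) -> Iterator[Any]:
    """Hypothetical sketch of .catch semantics (assumed order: where, do, stop, replace)."""
    while True:
        try:
            elem = next(upstream)
        except StopIteration:
            return
        except error_type as e:
            if not where(e):
                raise  # not matching the predicate: let it propagate
            if do is not None:
                do(e)
            if stop:
                return
            if replace is not None:
                yield replace(e)
            continue  # otherwise the failing element is skipped
        yield elem


errors = []
inverses = catching(
    ResumableMap(lambda n: round(1 / n, 2), range(10)),
    ZeroDivisionError,
    do=errors.append,
    replace=lambda e: float("inf"),
)
print(list(inverses))  # [inf, 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
print(len(errors))     # 1
```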

.throttle

Limit the number of emissions per time interval (sliding window):

from datetime import timedelta

throttled_ints: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))

# takes 3 seconds
assert list(throttled_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
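
A sliding-window limit can be sketched by remembering the timestamps of the last count emissions and sleeping until the oldest one leaves the window. The hypothetical throttled generator below illustrates the idea; it is not streamable's implementation. With 3 emissions per 0.1 second, 7 elements are emitted as 3 + 3 + 1, so iteration takes roughly 0.2 seconds.

```python
import time
from collections import deque
from datetime import timedelta
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttled(source: Iterable[T], count: int, per: timedelta) -> Iterator[T]:
    """Hypothetical sketch: at most `count` emissions in any sliding window of length `per`."""
    window = per.total_seconds()
    emission_times = deque()  # monotonic timestamps of the last `count` emissions
    for elem in source:
        if len(emission_times) == count:
            # Wait until the oldest emission leaves the window.
            wait_for = emission_times[0] + window - time.monotonic()
            if wait_for > 0:
                time.sleep(wait_for)
            emission_times.popleft()
        emission_times.append(time.monotonic())
        yield elem


start = time.monotonic()
print(list(throttled(range(7), 3, per=timedelta(seconds=0.1))))  # [0, 1, 2, 3, 4, 5, 6]
print(f"elapsed: {time.monotonic() - start:.2f}s")
```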

.buffer

Buffer upstream elements into a bounded queue via a background task (decoupling upstream production rate from downstream consumption rate):

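import time

pulled: list[int] = []
buffered_ints = iter(
    stream(range(10))
    .do(pulled.append)
    .buffer(5)
)
assert next(buffered_ints) == 0
# give the background task a moment to fill the buffer
time.sleep(1e-3)
# 1 element consumed + 5 elements buffered
assert pulled == [0, 1, 2, 3, 4, 5]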
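
The buffering behavior can be sketched with a producer thread and a semaphore holding size slots: the thread acquires a slot before pulling each upstream element, and the consumer releases one per element it takes. The hypothetical buffered generator below is such a sketch, not streamable's implementation; for brevity it does not forward upstream exceptions, and its daemon thread stays blocked if iteration is abandoned.

```python
import queue
import threading
import time
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

_DONE = object()  # marks the end of the upstream


def buffered(source: Iterable[T], size: int) -> Iterator[T]:
    """Hypothetical sketch: a thread pulls at most `size` elements ahead of the consumer."""
    items: "queue.Queue[object]" = queue.Queue()  # capacity is enforced by `slots`
    slots = threading.Semaphore(size)

    def produce() -> None:
        iterator = iter(source)
        while True:
            slots.acquire()  # wait for a free slot before pulling upstream
            try:
                elem = next(iterator)
            except StopIteration:
                items.put(_DONE)
                return
            items.put(elem)

    # Started on the first next() call, since this is a generator function.
    threading.Thread(target=produce, daemon=True).start()
    while True:
        elem = items.get()
        if elem is _DONE:
            return
        slots.release()  # the consumer took an element: free its slot
        yield elem  # type: ignore[misc]


pulled: List[int] = []


def recorded(source: Iterable[int]) -> Iterator[int]:
    # Records each element at the moment it is pulled from upstream.
    for elem in source:
        pulled.append(elem)
        yield elem


buffered_ints = buffered(recorded(range(10)), size=5)
assert next(buffered_ints) == 0
time.sleep(0.05)  # give the producer thread time to fill the buffer
print(pulled)               # [0, 1, 2, 3, 4, 5]
print(list(buffered_ints))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```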

.observe

Observe the iteration progress:

observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

logs:

2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.000019 errors=0 elements=1
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001117 errors=0 elements=2
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001147 errors=0 elements=4
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001162 errors=0 elements=8
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001179 errors=0 elements=10

Logs are produced when the counts reach powers of 2, plus a final log when the iteration ends (elements=10 above). Set every to produce them periodically:

# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)
# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))
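
The default power-of-2 schedule can be sketched as a small helper that only computes which element counts would trigger a log; it does not reproduce streamable's Observation or log format. Skipping the final log when the last count is itself a power of 2 is an assumption of this hypothetical helper.

```python
from typing import List


def is_power_of_2(n: int) -> bool:
    # n & (n - 1) clears the lowest set bit; it is 0 only for powers of 2.
    return n > 0 and n & (n - 1) == 0


def logged_counts(total_elements: int) -> List[int]:
    """Hypothetical helper: element counts at which a log line would be emitted."""
    counts = [n for n in range(1, total_elements + 1) if is_power_of_2(n)]
    # Assumed: one final log at the end, unless that count was already logged.
    if counts and counts[-1] != total_elements:
        counts.append(total_elements)
    return counts


print(logged_counts(10))  # [1, 2, 4, 8, 10]
```

The logarithmic schedule keeps log volume small on long streams (about 20 lines for a million elements) while still reporting early progress quickly.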

By default, observations are logged via logging.getLogger("streamable").info. Set do to handle each streamable.Observation differently:

observed_ints = stream(range(10)).observe("ints", do=custom_logger.info)
observed_ints = stream(range(10)).observe("ints", do=observations.append)
observed_ints = stream(range(10)).observe("ints", do=print)

+

Concatenate a stream with an iterable:

concatenated_ints = stream(range(10)) + range(10)
assert list(concatenated_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.cast

Provide a type hint for elements:

import json
from typing import Any

docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs

.__call__

Iterate as an Iterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

pipeline()

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
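
Exhausting an iterable for its side effects without storing its elements is a common standard-library idiom: feeding it to a collections.deque with maxlen=0. The sketch below applies it to a plain generator standing in for .do(state.append); whether streamable uses this idiom internally is not stated here.

```python
from collections import deque
from typing import Iterable, Iterator, List

state: List[int] = []


def storing(source: Iterable[int]) -> Iterator[int]:
    # Stand-in for .do(state.append): perform the side effect, pass elements through.
    for elem in source:
        state.append(elem)
        yield elem


# Iterate until exhaustion, keeping no element in memory.
deque(storing(range(10)), maxlen=0)

print(state)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```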

await

Iterate as an AsyncIterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

# in an async context
await pipeline

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.pipe

Apply a callable, passing the stream as first argument, followed by the provided *args and **kwargs:

import polars as pl

pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")

••• other notes

function as source

A stream can also be instantiated from a function (sync or async) that will be called sequentially to get the next source element during iteration.

e.g. stream from a Queue:

queued_ints: queue.Queue[int] = ...
# or asyncio.Queue[int]
ints: stream[int] = stream(queued_ints.get)
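
The idea of calling a function repeatedly to get the next element resembles Python's built-in two-argument iter(callable, sentinel), which stops when the function returns the sentinel. The sketch below uses it with a queue.Queue terminated by a None marker; that marker is a hypothetical convention of this sketch, since the text above does not say how a source function signals the end of a stream.

```python
import queue
from typing import List, Optional

queued_ints: "queue.Queue[Optional[int]]" = queue.Queue()
for n in range(5):
    queued_ints.put(n)
queued_ints.put(None)  # hypothetical end-of-stream marker

# Calls queued_ints.get() once per element, stopping when it returns None.
ints: List[Optional[int]] = list(iter(queued_ints.get, None))

print(ints)  # [0, 1, 2, 3, 4]
```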

starmap

The star function decorator transforms a function (sync or async) that takes several positional arguments into a function that takes a tuple.

from streamable import star

pokemons: stream[str] = ...
enumerated_pokes: stream[str] = (
    stream(enumerate(pokemons))
    .map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur', '#4 charmander', '#5 charmeleon', '#6 charizard', '#7 squirtle', '#8 wartortle', '#9 blastoise']
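
For sync functions, a decorator with this behavior can be sketched in a few lines. star_sketch below is a hypothetical stand-in, not streamable's star, and unlike it, does not handle async functions; applied through the builtin map, it gives the same result as itertools.starmap.

```python
from functools import wraps
from typing import Any, Callable, Tuple, TypeVar

R = TypeVar("R")


def star_sketch(func: Callable[..., R]) -> Callable[[Tuple[Any, ...]], R]:
    """Hypothetical sync-only sketch: unpack a tuple into positional arguments."""

    @wraps(func)
    def starred(args: Tuple[Any, ...]) -> R:
        return func(*args)

    return starred


names = ["bulbasaur", "ivysaur", "venusaur"]
labels = list(map(star_sketch(lambda index, poke: f"#{index + 1} {poke}"), enumerate(names)))
print(labels)  # ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur']
```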

distinct

To collect distinct elements, you can call set(a_stream).

To deduplicate in the middle of a stream, .filter out values already seen and .do add new ones to a set (or a fancier cache):

seen: set[str] = set()

unique_ints: stream[int] = (
    stream("001000111")
    .filter(lambda char: char not in seen)
    .do(seen.add)
    .map(int)
)

assert list(unique_ints) == [0, 1]

vs builtins.map/filter

There is zero overhead during iteration compared to builtins.map and builtins.filter:

odd_int_chars = stream(range(N)).filter(lambda n: n % 2).map(str)

iter(odd_int_chars) visits the operations lineage and returns exactly this iterator:

map(str, filter(lambda n: n % 2, range(N)))

e.g. ETL via dlt

A stream is an expressive way to declare a dlt.resource:

# from datetime import timedelta
# from http import HTTPStatus
# from itertools import count
# import dlt
# import httpx
# from httpx import Response, HTTPStatusError
# from dlt.destinations import filesystem
# from streamable import stream

def not_found(e: HTTPStatusError) -> bool:
    return e.response.status_code == HTTPStatus.NOT_FOUND

@dlt.resource
def pokemons(http_client: httpx.Client, concurrency: int, per_second: int) -> stream[dict]:
    """Ingest Pokémons from the PokéAPI, stop on first 404."""
    return (
        stream(count(1))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .throttle(per_second, per=timedelta(seconds=1))
        .map(http_client.get, concurrency=concurrency, as_completed=True)
        .do(Response.raise_for_status)
        .catch(HTTPStatusError, where=not_found, stop=True)
        .map(Response.json)
        .observe("pokemons")
    )

# Write to a partitioned Delta Lake table, chunk by chunk on-the-fly.
with httpx.Client() as http_client:
    dlt.pipeline(
        pipeline_name="ingest_pokeapi",
        destination=filesystem("deltalake"),
        dataset_name="pokeapi",
    ).run(
        pokemons(http_client, concurrency=8, per_second=32),
        table_format="delta",
        write_disposition='merge',
        columns={
            "id": {"primary_key": True},
            "color__name": {"partition": True},
        },
    )

Download files

Source Distribution

streamable-2.0.0.tar.gz (56.3 kB)

Built Distribution

streamable-2.0.0-py3-none-any.whl (44.4 kB)

File details

Details for the file streamable-2.0.0.tar.gz.

File metadata

  • Download URL: streamable-2.0.0.tar.gz
  • Upload date:
  • Size: 56.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for streamable-2.0.0.tar.gz:

  • SHA256: 0a88e956e3d3b72b4f5169b332f38be9f6dd7da4502123ec23bd96ff6e2bece2
  • MD5: bdeddf80a7a595317f12a9a1baa1ac82
  • BLAKE2b-256: 79b94501aea43e27db134ae31f78aa8019baccf23e49318c8a535e1588b25c43

File details

Details for the file streamable-2.0.0-py3-none-any.whl.

File metadata

  • Download URL: streamable-2.0.0-py3-none-any.whl
  • Upload date:
  • Size: 44.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for streamable-2.0.0-py3-none-any.whl:

  • SHA256: 3b68cd6a600480345df87105a34ea5e8b46b84c102130c06919fd6a129e263c1
  • MD5: c60a06eb8ec12a8243346c7ced581d92
  • BLAKE2b-256: fa55f801598be756607b718a01333edaa16f0ec42ec8887e2599ec4ed8d07adc
