(sync/async) iterable streams for Python

streamable

stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress logging, and error handling.

Python 3.8+

1. install

pip install streamable

2. import

from streamable import stream

3. init

Create a stream[T] from an Iterable[T] (or AsyncIterable[T]):

ints: stream[int] = stream(range(10))

4. operate

Chain lazy operations:

import logging
from datetime import timedelta
import httpx
from httpx import Response, HTTPStatusError
from streamable import stream

pokemons: stream[str] = (
    stream(range(10))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .throttle(5, per=timedelta(seconds=1))
    .map(httpx.get, concurrency=2)
    .do(Response.raise_for_status)
    .catch(HTTPStatusError, do=logging.warning)
    .map(lambda poke: poke.json()["name"])
)

Source elements will be processed on-the-fly during iteration.

Operations accept both sync and async functions.

5. iterate

A stream[T] is Iterable[T] (and AsyncIterable[T]):

>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']

📒 Operations (docs)

Operations accept both sync and async functions, which can be mixed within the same stream. The stream can then be consumed as an Iterable or an AsyncIterable.

Operations are implemented so that the iteration can resume after an exception.
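To illustrate what resuming after an exception means, here is a stdlib-only sketch (not the library's implementation). A plain generator is finished as soon as an exception escapes it, whereas an iterator class that applies the function inside __next__ keeps its upstream position and can be advanced again. ResumableMap, generator_map and inverse are hypothetical names used only for this illustration.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class ResumableMap(Iterator[U]):
    """Hypothetical illustration: a map iterator that survives exceptions."""

    def __init__(self, func: Callable[[T], U], upstream: Iterable[T]) -> None:
        self._func = func
        self._upstream = iter(upstream)

    def __next__(self) -> U:
        # If func raises, the upstream iterator has already advanced,
        # so the next call simply moves on to the following element.
        return self._func(next(self._upstream))


def generator_map(func, upstream):
    """Same transformation written as a generator, for comparison."""
    for elem in upstream:
        yield func(elem)


def inverse(n: int) -> float:
    return 1 / n


# A generator is closed by the first exception that escapes it.
gen = generator_map(inverse, [0, 1, 2])
try:
    next(gen)
except ZeroDivisionError:
    pass
generator_leftovers = list(gen)

# The class-based iterator can keep going after the same error.
resumable = ResumableMap(inverse, [0, 1, 2])
try:
    next(resumable)
except ZeroDivisionError:
    pass
resumable_leftovers = list(resumable)
```

In this sketch the generator yields nothing after the error, while the class-based iterator still produces the inverses of 1 and 2.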

A stream exposes operations to manipulate its elements, but I/O is not its responsibility. It is meant to be combined with dedicated libraries like pyarrow, psycopg2, boto3, or dlt (see the ETL example below).

.map

Transform elements:

int_chars: stream[str] = stream(range(10)).map(str)

assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

concurrency

Set the concurrency param to apply the transformation concurrently. Only concurrency upstream elements are pulled for processing at a time; the next upstream element is pulled only when a result is yielded downstream.

The upstream order is preserved by default. Set as_completed=True to yield results as they become available.
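The bounded, order-preserving behaviour described above can be pictured with a small stdlib-only sketch. It is an assumption-laden illustration, not the library's code: ordered_concurrent_map and slow_square are hypothetical names, threads come from concurrent.futures, and the exact moment at which the library pulls the next upstream element may differ.

```python
import time
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def ordered_concurrent_map(
    func: Callable[[T], U], upstream: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Hypothetical helper: at most `concurrency` calls are in flight."""
    source = iter(upstream)
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        in_flight: Deque["Future[U]"] = deque()
        # Fill the window with the first `concurrency` upstream elements.
        for elem in source:
            in_flight.append(executor.submit(func, elem))
            if len(in_flight) == concurrency:
                break
        while in_flight:
            # Wait for the oldest submission to preserve upstream order.
            result = in_flight.popleft().result()
            # Pull at most one more upstream element to refill the window.
            for elem in source:
                in_flight.append(executor.submit(func, elem))
                break
            yield result


def slow_square(n: int) -> int:
    # Varying durations, so later elements may finish before earlier ones.
    time.sleep(0.01 * (3 - n % 3))
    return n * n


squares = list(ordered_concurrent_map(slow_square, range(6), concurrency=3))
```

Yielding futures in completion order instead of submission order would give the as_completed flavour of the same idea.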

via threads

If concurrency > 1, the transformation will be applied via concurrency threads:

pokemons: stream[str] = (
    stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(httpx.get, concurrency=2)
    .map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']

via async coroutines

If concurrency > 1 and the transformation is async, it will be applied via concurrency async tasks:

# async context
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses current running loop
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']
# sync context (needs `import asyncio`; asyncio.Runner is available from Python 3.11)
with asyncio.Runner() as runner:
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    # Runner.run expects a coroutine object, so aclose must be called
    runner.run(http_client.aclose())

via processes

concurrency can also be a concurrent.futures.Executor. Pass a ProcessPoolExecutor to apply the transformation via processes:

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as processes:
        state: list[int] = []
        # ints are mapped
        assert list(
            stream(range(10))
            .map(state.append, concurrency=processes)
        ) == [None] * 10
        # the `state` of the main process is not mutated
        assert state == []

.do

Perform side effects:

state: list[int] = []
store_ints: stream[int] = stream(range(10)).do(state.append)

assert list(store_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

concurrency

Same as .map.

.group

Group elements into batches...

... up_to a given batch size:

int_batches: stream[list[int]] = stream(range(10)).group(5)

assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

... within a given time interval:

from datetime import timedelta

int_1sec_batches: stream[list[int]] = (
    stream(range(10))
    .throttle(2, per=timedelta(seconds=1))
    .group(within=timedelta(seconds=0.99))
)

assert list(int_1sec_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

... by a given key, yielding (key, elements) pairs:

ints_by_parity: stream[tuple[str, list[int]]] = (
    stream(range(10))
    .group(by=lambda n: "odd" if n % 2 else "even")
)

assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]

You can combine these parameters.

.flatten

Explode upstream elements (Iterable or AsyncIterable):

chars: stream[str] = stream(["hel", "lo!"]).flatten()

assert list(chars) == ["h", "e", "l", "l", "o", "!"]

concurrency

Flatten up to concurrency upstream iterables at a time (via threads for Iterable elements and via coroutines for AsyncIterable elements):

chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)

assert list(chars) == ["h", "l", "e", "o", "l", "!"]

.filter

Filter elements satisfying a predicate:

even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)

assert list(even_ints) == [0, 2, 4, 6, 8]

.take

Take a given number of elements:

first_5_ints: stream[int] = stream(range(10)).take(5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]

... or take until a predicate is satisfied:

first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]

.skip

Skip a given number of elements:

ints_after_5: stream[int] = stream(range(10)).skip(5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]

... or skip until a predicate is satisfied:

ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]

.catch

Catch exceptions of a given type:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... where a predicate is satisfied:

domains = ["github.com", "foo.bar", "google.com"]

resolvable_domains: stream[str] = (
    stream(domains)
    .do(lambda domain: httpx.get(f"https://{domain}"), concurrency=2)
    .catch(httpx.HTTPError, where=lambda e: "not known" in str(e))
)

assert list(resolvable_domains) == ["github.com", "google.com"]

... do a side effect on catch:

errors: list[Exception] = []
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1

... replace with a value:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replace=lambda e: float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... stop=True to stop the iteration if an exception is caught:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, stop=True)
)

assert list(inverses) == []

You can combine these parameters.

.throttle

Limit the number of emissions per time interval:

from datetime import timedelta

three_ints_per_second: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))

# collects 10 ints in 3 seconds
assert list(three_ints_per_second) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
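As a rough picture of what such a rate limit involves, here is a stdlib-only sketch of a sliding-window limiter. It is not the library's implementation; throttled, up_to and per_seconds are hypothetical names, and the interval is shortened to 0.1 s so the example runs quickly.

```python
import time
from collections import deque
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttled(upstream: Iterable[T], up_to: int, per_seconds: float) -> Iterator[T]:
    """Hypothetical helper: emit at most `up_to` elements per `per_seconds` window."""
    emissions: Deque[float] = deque()  # timestamps of the last `up_to` emissions
    for elem in upstream:
        if len(emissions) == up_to:
            # Wait until the oldest emission in the window is `per_seconds` old.
            wait = emissions.popleft() + per_seconds - time.monotonic()
            if wait > 0:
                time.sleep(wait)
        emissions.append(time.monotonic())
        yield elem


start = time.monotonic()
ints = list(throttled(range(10), up_to=3, per_seconds=0.1))
elapsed = time.monotonic() - start
```

With 3 emissions per 0.1 s window, the 10 elements need three waits, which mirrors the "10 ints in 3 seconds" timing of the example above at a smaller scale.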

.buffer

Buffer upstream elements into a bounded queue via a background task (decoupling upstream production rate from downstream consumption rate):

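import time

pulled: list[int] = []
buffered_ints = iter(
    stream(range(10))
    .do(pulled.append)
    .buffer(5)
)
assert next(buffered_ints) == 0
# give the background task a moment to refill the buffer
time.sleep(1e-3)
# 1 element consumed + 5 elements buffered
assert pulled == [0, 1, 2, 3, 4, 5]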
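The decoupling idea can be sketched with the standard library alone: a background thread fills a bounded queue.Queue while the consumer drains it. This is only an illustration under simplifying assumptions, not the library's implementation. The name buffered and the _DONE sentinel are hypothetical, upstream exceptions are not forwarded, and a consumer that stops early leaves the daemon thread blocked.

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

_DONE = object()  # hypothetical sentinel marking the end of the upstream


def buffered(upstream: Iterable[T], size: int) -> Iterator[T]:
    """Hypothetical helper: prefetch up to `size` elements in a background thread."""
    buffer: "queue.Queue[object]" = queue.Queue(maxsize=size)

    def produce() -> None:
        for elem in upstream:
            buffer.put(elem)  # blocks while the buffer is full
        buffer.put(_DONE)

    threading.Thread(target=produce, daemon=True).start()
    while True:
        elem = buffer.get()  # blocks while the buffer is empty
        if elem is _DONE:
            return
        yield elem  # type: ignore[misc]


buffered_values = list(buffered(range(10), size=5))
```

The bounded queue is what keeps a fast producer from running arbitrarily far ahead of a slow consumer.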

.observe

Observe the iteration progress:

observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

logs:

2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.000019 errors=0 elements=1
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001117 errors=0 elements=2
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001147 errors=0 elements=4
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001162 errors=0 elements=8
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001179 errors=0 elements=10

Logs are produced when the counts reach powers of 2. Set every to produce them periodically:

# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)
# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))
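To see why the default powers-of-2 schedule keeps log volume low, the following stdlib-only sketch lists the counts that would trigger a log. is_power_of_two is a hypothetical helper, not part of the library. The sample logs above also end with a line at elements=10, which is not a power of 2; it looks like a closing observation at the end of the iteration, but that is an assumption.

```python
def is_power_of_two(count: int) -> bool:
    """True for 1, 2, 4, 8, ... (exactly one bit set)."""
    return count > 0 and (count & (count - 1)) == 0


# Counts that trigger a log while iterating over 10 elements.
logged_counts = [c for c in range(1, 11) if is_power_of_two(c)]

# Number of such logs for a stream of one million elements.
logs_for_a_million = sum(is_power_of_two(c) for c in range(1, 1_000_001))
```

The number of logs grows with the logarithm of the element count, so even a million elements produce only about twenty lines under this schedule.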

Observations are logged via logging.getLogger("streamable").info. Set do to do something else with the streamable.Observation:

observed_ints = stream(range(10)).observe("ints", do=custom_logger.info)
observed_ints = stream(range(10)).observe("ints", do=observations.append)
observed_ints = stream(range(10)).observe("ints", do=print)
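Because default observations go through the standard logging module under the "streamable" logger name, they can be routed with ordinary logging configuration. The sketch below uses only the stdlib. ListHandler is a hypothetical handler, and the message passed to info is a stand-in, since no stream is run here. Whether the library attaches its own handler or formatter is not covered by this text.

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps formatted records in memory."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(self.format(record))


streamable_logger = logging.getLogger("streamable")
streamable_logger.setLevel(logging.INFO)

handler = ListHandler()
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
streamable_logger.addHandler(handler)

# Stand-in for an observation; the message content is illustrative only.
streamable_logger.info("observed=ints errors=0 elements=1")
```

Raising the logger's level to logging.WARNING would silence INFO-level observations without touching the stream definition.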

+

Concatenate a stream with an iterable:

concatenated_ints = stream(range(10)) + range(10)
assert list(concatenated_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.cast

Provide a type hint for elements:

import json
from typing import Any

docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs

.__call__

Iterate as an Iterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

pipeline()

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

await

Iterate as an AsyncIterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

await pipeline

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.pipe

Apply a callable, passing the stream as first argument, followed by the provided *args and **kwargs:

import polars as pl

pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")

••• other notes

function as source

A stream can also be instantiated from a (sync or async) function that will be called sequentially to get the next source element during iteration.

e.g. stream from a Queue:

queued_ints: queue.Queue[int] = ...
# or asyncio.Queue[int]
ints: stream[int] = stream(queued_ints.get)
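The idea of a function as source can be pictured with a stdlib-only generator that calls the function once per requested element. This is a sketch under assumptions: from_function and stop_on are hypothetical names, and how the library itself decides that such a source is exhausted is not covered here, so the sketch simply stops on queue.Empty.

```python
import queue
from typing import Callable, Iterator, Type, TypeVar

T = TypeVar("T")


def from_function(get_next: Callable[[], T], stop_on: Type[Exception]) -> Iterator[T]:
    """Hypothetical helper: call `get_next` once per requested element."""
    while True:
        try:
            elem = get_next()
        except stop_on:
            return  # assumption: this exception means "no more elements"
        yield elem


queued_ints: "queue.Queue[int]" = queue.Queue()
for i in range(3):
    queued_ints.put(i)

ints_from_queue = list(from_function(queued_ints.get_nowait, stop_on=queue.Empty))
```

The builtin two-argument form iter(callable, sentinel) expresses a similar pattern when the end is signalled by a sentinel value rather than an exception.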

starmap

The star function decorator transforms a function (sync or async) that takes several positional arguments into a function that takes a tuple.

from streamable import star

pokemons: stream[str] = ...
enumerated_pokes: stream[str] = (
    stream(enumerate(pokemons))
    .map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur', '#4 charmander', '#5 charmeleon', '#6 charizard', '#7 squirtle', '#8 wartortle', '#9 blastoise']
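The idea behind such a decorator fits in a few lines of plain Python. The sketch below is not the library's star: star_sketch is a hypothetical stand-in that only handles sync functions, shown here with the builtin map.

```python
from typing import Any, Callable, Tuple, TypeVar

R = TypeVar("R")


def star_sketch(func: Callable[..., R]) -> Callable[[Tuple[Any, ...]], R]:
    """Hypothetical stand-in: adapt a multi-argument function to take one tuple."""

    def starred(args: Tuple[Any, ...]) -> R:
        # Unpack the tuple into positional arguments.
        return func(*args)

    return starred


label = star_sketch(lambda index, poke: f"#{index + 1} {poke}")
labels = list(map(label, enumerate(["bulbasaur", "ivysaur"])))
```

This is the same unpacking that itertools.starmap performs, packaged as a decorator so it can be passed to any operation that expects a one-argument function.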

distinct

To collect distinct elements, you can call set(a_stream).

To deduplicate in the middle of the stream, .filter new values and .do add them into a set (or a fancier cache):

seen: set[str] = set()

unique_ints: stream[int] = (
    stream("001000111")
    .filter(lambda _: _ not in seen)
    .do(seen.add)
    .map(int)
)

assert list(unique_ints) == [0, 1]

vs builtins.map/filter

There is zero overhead during iteration compared to builtins.map and builtins.filter:

odd_int_chars = stream(range(N)).filter(lambda n: n % 2).map(str)

iter(odd_int_chars) visits the operations lineage and returns exactly this iterator:

map(str, filter(lambda n: n % 2, range(N)))

e.g. ETL via dlt

A stream is an expressive way to declare a dlt.resource:

# from datetime import timedelta
# from http import HTTPStatus
# from itertools import count
# import dlt
# import httpx
# from httpx import Response, HTTPStatusError
# from dlt.destinations import filesystem
# from streamable import stream

def not_found(e: HTTPStatusError) -> bool:
    return e.response.status_code == HTTPStatus.NOT_FOUND

@dlt.resource
def pokemons(http_client: httpx.Client, concurrency: int, per_second: int) -> stream[dict]:
    """Ingest Pokémons from the PokéAPI, stop on first 404."""
    return (
        stream(count(1))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .throttle(per_second, per=timedelta(seconds=1))
        .map(http_client.get, concurrency=concurrency, as_completed=True)
        .do(Response.raise_for_status)
        .catch(HTTPStatusError, where=not_found, stop=True)
        .map(Response.json)
        .observe("pokemons")
    )

# Write to a partitioned Delta Lake table, chunk by chunk on-the-fly.
with httpx.Client() as http_client:
    dlt.pipeline(
        pipeline_name="ingest_pokeapi",
        destination=filesystem("deltalake"),
        dataset_name="pokeapi",
    ).run(
        pokemons(http_client, concurrency=8, per_second=32),
        table_format="delta",
        columns={"color__name": {"partition": True}},
    )
