(sync/async) iterable streams for Python

streamable

stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress logging, and error handling.

Supports Python 3.8+.

1. install

pip install streamable

2. import

from streamable import stream

3. init

Create a stream[T] from an Iterable[T] (or AsyncIterable[T]):

ints: stream[int] = stream(range(10))

4. operate

Chain lazy operations:

import logging
from datetime import timedelta
import httpx
from httpx import Response, HTTPStatusError
from streamable import stream

pokemons: stream[str] = (
    stream(range(10))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .throttle(5, per=timedelta(seconds=1))
    .map(httpx.get, concurrency=2)
    .do(Response.raise_for_status)
    .catch(HTTPStatusError, do=logging.warning)
    .map(lambda poke: poke.json()["name"])
)

Source elements will be processed on-the-fly during iteration.

Operations accept both sync and async functions.
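The on-the-fly behaviour can be pictured with a plain iterator pipeline. This is a stdlib-only sketch, not streamable's implementation; traced_double and calls are hypothetical names used to show that nothing runs until the pipeline is iterated:

```python
from typing import List

calls: List[int] = []


def traced_double(n: int) -> int:
    """Record each call so we can see when the work actually happens."""
    calls.append(n)
    return 2 * n


# Declaring the pipeline does not touch the source.
lazy_doubles = map(traced_double, range(3))
calls_before_iteration = list(calls)

# Pulling one element processes exactly one source element.
first = next(lazy_doubles)
calls_after_first = list(calls)

# Exhausting the iterator processes the remaining elements.
rest = list(lazy_doubles)
```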

5. iterate

A stream[T] is Iterable[T] (and AsyncIterable[T]):

>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']

📒 Operations (docs)

Operations accept both sync and async functions, which can be mixed within the same stream. The stream can then be consumed as an Iterable or as an AsyncIterable. Async functions run in the current event loop; one is created if needed.

Operations are implemented so that the iteration can resume after an exception.

A stream exposes operations to manipulate its elements, but the I/O is not its responsibility. It's meant to be combined with dedicated libraries like pyarrow, psycopg2, boto3, or dlt (see the dlt ETL example below).

.map

Transform elements:

int_chars: stream[str] = stream(range(10)).map(str)

assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

concurrency

Set the concurrency param to apply the transformation concurrently.

At most concurrency upstream elements (the value of the param) are in flight at any time.

Upstream order is preserved unless you set as_completed=True.
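To make the two guarantees above concrete (a bounded number of in-flight elements, and results in upstream order), here is a stdlib-only sketch built on ThreadPoolExecutor. It is an illustration under assumptions, not how streamable implements .map; ordered_concurrent_map and slow_square are hypothetical names:

```python
import time
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def ordered_concurrent_map(
    fn: Callable[[T], U], items: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Apply fn via threads, keeping at most `concurrency` calls in flight."""
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        in_flight: Deque["Future[U]"] = deque()
        for item in items:
            if len(in_flight) == concurrency:
                # Wait for the OLDEST submission first: this preserves order.
                yield in_flight.popleft().result()
            in_flight.append(pool.submit(fn, item))
        # The source is exhausted: drain what is still running, oldest first.
        while in_flight:
            yield in_flight.popleft().result()


def slow_square(n: int) -> int:
    # Later elements finish sooner, so completion order differs from input order.
    time.sleep(0.02 * (4 - n))
    return n * n


squares = list(ordered_concurrent_map(slow_square, range(5), concurrency=2))
```

Yielding futures as they complete instead (for example with concurrent.futures.as_completed) trades this ordering for lower latency, which is the trade-off as_completed=True refers to.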

via threads

If concurrency > 1, the transformation will be applied via concurrency threads:

pokemons: stream[str] = (
    stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(httpx.get, concurrency=2)
    .map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']

via async coroutines

If concurrency > 1 and the transformation is async, it will be applied via concurrency async tasks:

# async context
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']

# sync context (asyncio.Runner requires Python 3.11+)
with asyncio.Runner() as runner:
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    runner.run(http_client.aclose())

via processes

concurrency can also be a concurrent.futures.Executor. Pass a ProcessPoolExecutor to apply the transformation via processes:

from concurrent.futures import ProcessPoolExecutor

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as processes:
        state: list[int] = []
        # ints are mapped
        assert list(
            stream(range(10))
            .map(state.append, concurrency=processes)
        ) == [None] * 10
        # the `state` of the main process is not mutated
        assert state == []

.do

Perform side effects:

state: list[int] = []
store_ints: stream[int] = stream(range(10)).do(state.append)

assert list(store_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

concurrency

Same as .map.

.group

Group elements into batches...

... up_to a given batch size:

int_batches: stream[list[int]] = stream(range(10)).group(5)

assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

... within a given time interval:

from datetime import timedelta

int_1sec_batches: stream[list[int]] = (
    stream(range(10))
    .throttle(2, per=timedelta(seconds=1))
    .group(within=timedelta(seconds=0.99))
)

assert list(int_1sec_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]

... by a given key, yielding (key, elements) pairs:

ints_by_parity: stream[tuple[str, list[int]]] = (
    stream(range(10))
    .group(by=lambda n: "odd" if n % 2 else "even")
)

assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]

You can combine these parameters.
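For readers who want a mental model of the size-based and key-based grouping, here is a stdlib-only sketch that reproduces the two results shown above for a finite source. It says nothing about how streamable handles time intervals or combined parameters; batches_up_to and batches_by are hypothetical names:

```python
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")
K = TypeVar("K")


def batches_up_to(iterable: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` elements; the last one may be shorter."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def batches_by(
    iterable: Iterable[T], key: Callable[[T], K]
) -> Iterator[Tuple[K, List[T]]]:
    """Yield (key, elements) pairs once the source is exhausted."""
    groups: Dict[K, List[T]] = {}
    for element in iterable:
        groups.setdefault(key(element), []).append(element)
    # dicts keep insertion order, so keys come out in first-seen order.
    yield from groups.items()


int_batches = list(batches_up_to(range(10), 5))
ints_by_parity = list(batches_by(range(10), lambda n: "odd" if n % 2 else "even"))
```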

.flatten

Explode upstream elements (Iterable or AsyncIterable):

chars: stream[str] = stream(["hel", "lo!"]).flatten()

assert list(chars) == ["h", "e", "l", "l", "o", "!"]

concurrency

Flatten up to concurrency iterables at a time, interleaving their elements (via threads for Iterable elements and via coroutines for AsyncIterable elements):

chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)

assert list(chars) == ["h", "l", "e", "o", "l", "!"]
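One way to read the element order in the example above is as a round-robin over at most concurrency active iterables, where an exhausted iterable gives its slot to the next upstream one. The sketch below is a sequential, stdlib-only model of that ordering only; it is an assumption about the semantics rather than streamable's threaded implementation, and round_robin_flatten is a hypothetical name:

```python
from collections import deque
from itertools import islice
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
_NO_MORE = object()  # sentinel: the upstream has no further iterable to offer


def round_robin_flatten(
    iterables: Iterable[Iterable[T]], concurrency: int
) -> Iterator[T]:
    """Interleave elements from at most `concurrency` iterables at a time."""
    sources = iter(iterables)
    active: Deque[Iterator[T]] = deque(
        iter(it) for it in islice(sources, concurrency)
    )
    while active:
        current = active.popleft()
        try:
            element = next(current)
        except StopIteration:
            # The exhausted iterable hands its slot to the next upstream one.
            replacement = next(sources, _NO_MORE)
            if replacement is not _NO_MORE:
                active.appendleft(iter(replacement))
            continue
        yield element
        active.append(current)  # back of the line until its next turn


chars = list(round_robin_flatten(["hel", "lo", "!"], concurrency=2))
```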

.filter

Keep only the elements that satisfy a predicate:

even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)

assert list(even_ints) == [0, 2, 4, 6, 8]

.take

Take a given number of elements:

first_5_ints: stream[int] = stream(range(10)).take(5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]

... or take until a predicate is satisfied:

first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)

assert list(first_5_ints) == [0, 1, 2, 3, 4]

.skip

Skip a given number of elements:

ints_after_5: stream[int] = stream(range(10)).skip(5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]

... or skip until a predicate is satisfied:

ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)

assert list(ints_after_5) == [5, 6, 7, 8, 9]
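If you know itertools, the count-based and predicate-based forms of .take and .skip map onto islice, takewhile, and dropwhile. This stdlib comparison is only meant as a mental model; note that the until predicates are negated because takewhile and dropwhile continue while their predicate holds:

```python
from itertools import dropwhile, islice, takewhile

# take(5): the first five elements
first_5_by_count = list(islice(range(10), 5))

# take(until=...): stop as soon as the predicate is satisfied
first_5_by_predicate = list(takewhile(lambda n: not n == 5, range(10)))

# skip(5): everything after the first five elements
after_5_by_count = list(islice(range(10), 5, None))

# skip(until=...): drop elements until the predicate is satisfied
after_5_by_predicate = list(dropwhile(lambda n: not n >= 5, range(10)))
```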

.catch

Catch exceptions of a given type:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... where a predicate is satisfied:

domains = ["github.com", "foo.bar", "google.com"]

resolvable_domains: stream[str] = (
    stream(domains)
    .do(lambda domain: httpx.get(f"https://{domain}"), concurrency=2)
    .catch(httpx.HTTPError, where=lambda e: "not known" in str(e))
)

assert list(resolvable_domains) == ["github.com", "google.com"]

... do a side effect on catch:

errors: list[Exception] = []
inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1

... replace with a value:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replace=lambda e: float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

... stop=True to stop the iteration if an exception is caught:

inverses: stream[float] = (
    stream(range(10))
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, stop=True)
)

assert list(inverses) == []

You can combine these parameters.
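The behaviours above (skip the failing element, replace it, or stop) can be sketched with a small stdlib generator. This is an illustration, not streamable's code; catching and inverse are hypothetical names. It relies on the fact that a builtin map iterator can keep going after its function raises, whereas a generator that raised is finished:

```python
from typing import Callable, Iterable, Iterator, Optional, Type, TypeVar

T = TypeVar("T")


def catching(
    iterable: Iterable[T],
    error_type: Type[Exception],
    *,
    replace: Optional[Callable[[Exception], T]] = None,
    stop: bool = False,
) -> Iterator[T]:
    """Yield upstream elements, handling `error_type` raised while pulling them."""
    iterator = iter(iterable)
    while True:
        try:
            element = next(iterator)
        except StopIteration:
            return
        except error_type as error:
            if stop:
                return  # end the iteration on the first caught error
            if replace is not None:
                yield replace(error)  # emit a replacement value instead
            continue  # otherwise skip the failing element and resume
        yield element


def inverse(n: int) -> float:
    return round(1 / n, 2)


skipped = list(catching(map(inverse, range(10)), ZeroDivisionError))
replaced = list(
    catching(map(inverse, range(10)), ZeroDivisionError, replace=lambda e: float("inf"))
)
stopped = list(catching(map(inverse, range(10)), ZeroDivisionError, stop=True))
```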

.throttle

Limit the number of emissions per time interval:

from datetime import timedelta

three_ints_per_second: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))

# collects 10 ints in 3 seconds
assert list(three_ints_per_second) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
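A rate limit like this can be modelled with a sliding window of emission timestamps. The following stdlib sketch is one possible approach, assumed for illustration and not taken from streamable; throttled is a hypothetical name:

```python
import time
from collections import deque
from datetime import timedelta
from typing import Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttled(iterable: Iterable[T], limit: int, per: timedelta) -> Iterator[T]:
    """Yield at most `limit` elements within any `per` time window."""
    window: Deque[float] = deque()  # timestamps of the last `limit` emissions
    for element in iterable:
        if len(window) == limit:
            # The next emission must wait until the oldest one leaves the window.
            wait = window.popleft() + per.total_seconds() - time.monotonic()
            if wait > 0:
                time.sleep(wait)
        window.append(time.monotonic())
        yield element


# Three elements go out immediately, the fourth waits for about 0.1 second.
ints = list(throttled(range(4), limit=3, per=timedelta(seconds=0.1)))
```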

.buffer

Buffer upstream elements into a bounded queue via a background task (decoupling upstream production rate from downstream consumption rate):

import time

pulled: list[int] = []
buffered_ints = iter(
    stream(range(10))
    .do(pulled.append)
    .buffer(5)
)
assert next(buffered_ints) == 0
time.sleep(1e-3)
assert pulled == [0, 1, 2, 3, 4, 5]
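The decoupling idea can be sketched with a bounded queue.Queue filled by a background thread. This is a simplified stdlib model, assumed for illustration: it does not propagate upstream exceptions and is not streamable's implementation; buffered is a hypothetical name:

```python
import queue
import threading
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def buffered(iterable: Iterable[T], size: int) -> Iterator[T]:
    """Pull upstream elements ahead of the consumer, at most `size` at a time."""
    buffer: "queue.Queue[object]" = queue.Queue(maxsize=size)
    done = object()  # sentinel marking the end of the upstream

    def produce() -> None:
        for element in iterable:
            buffer.put(element)  # blocks while the buffer is full
        buffer.put(done)

    # Daemon thread: if the consumer stops early, the blocked producer
    # does not prevent the interpreter from exiting.
    threading.Thread(target=produce, daemon=True).start()

    while (element := buffer.get()) is not done:
        yield element  # type: ignore[misc]


buffered_ints = list(buffered(range(10), size=5))
```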

.observe

Observe the iteration progress:

observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

logs:

2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.000019 errors=0 elements=1
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001117 errors=0 elements=2
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001147 errors=0 elements=4
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001162 errors=0 elements=8
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001179 errors=0 elements=10

Logs are produced when the counts reach powers of 2. Set every to produce them periodically:

# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)
# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))

Observations are logged via logging.getLogger("streamable").info. Set do to do something else with the streamable.Observation:

observed_ints = stream(range(10)).observe("ints", do=custom_logger.info)
observed_ints = stream(range(10)).observe("ints", do=observations.append)
observed_ints = stream(range(10)).observe("ints", do=print)
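The default powers-of-2 cadence, with a final report when the iteration ends, matches the log lines shown earlier in this section. A stdlib sketch of that cadence follows; the message format is simplified, error counting is left out, and observed is a hypothetical name rather than the library's code:

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def is_power_of_2(count: int) -> bool:
    return count > 0 and count & (count - 1) == 0


def observed(
    iterable: Iterable[T], label: str, do: Callable[[str], object] = print
) -> Iterator[T]:
    """Yield elements unchanged, reporting progress at 1, 2, 4, 8, ... elements."""
    count = 0
    for element in iterable:
        count += 1
        if is_power_of_2(count):
            do(f"observed={label} elements={count}")
        yield element
    # Final report, unless the last element was already reported.
    if count and not is_power_of_2(count):
        do(f"observed={label} elements={count}")


messages: List[str] = []
ints = list(observed(range(10), "ints", do=messages.append))
```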

+

Concatenate a stream with an iterable:

concatenated_ints = stream(range(10)) + range(10)
assert list(concatenated_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.cast

Provide a type hint for elements:

import json
from typing import Any

docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs
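This mirrors typing.cast from the standard library, which also returns its argument unchanged and exists only for type checkers. A short stdlib illustration on a plain list:

```python
import json
from typing import Any, Dict, List, cast

docs: List[Any] = [json.loads(s) for s in ['{"foo": "bar"}', '{"foo": "baz"}']]

# No conversion or validation happens at runtime: the same object comes back.
dicts = cast(List[Dict[str, str]], docs)
```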

.__call__

Iterate as an Iterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

pipeline()

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
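For comparison, the usual stdlib idiom for exhausting an iterator without collecting its elements is a zero-length deque. This sketch uses builtins.map in place of a stream; remember is a hypothetical helper:

```python
from collections import deque
from typing import List

state: List[int] = []


def remember(n: int) -> int:
    state.append(n)
    return n


pipeline = map(remember, range(10))

# maxlen=0 discards every element as it is pulled, so nothing accumulates.
deque(pipeline, maxlen=0)
```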

await

Iterate as an AsyncIterable until exhaustion, without collecting its elements:

state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)

await pipeline

assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

.pipe

Apply a callable, passing the stream as first argument, followed by the provided *args and **kwargs:

import polars as pl

pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")

••• other notes

function as source

A stream can also be instantiated from a function (sync or async) that will be called sequentially to get the next source element during iteration.

e.g. stream from a Queue:

queued_ints: queue.Queue[int] = ...
# or asyncio.Queue[int]
ints: stream[int] = stream(queued_ints.get)
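The closest stdlib analogue is the two-argument form of iter, which calls a function repeatedly until it returns a sentinel. The sketch below uses None as an end marker; that is an assumption of this example, since the text above does not say how a function-backed stream ends:

```python
import queue
from typing import Iterator, Optional

queued_ints: "queue.Queue[Optional[int]]" = queue.Queue()
for item in (1, 2, 3, None):  # None is this example's end-of-data marker
    queued_ints.put(item)

# Each next() calls queued_ints.get(); iteration ends when it returns None.
ints: Iterator[Optional[int]] = iter(queued_ints.get, None)
collected = list(ints)
```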

starmap

The star function decorator transforms a function (sync or async) that takes several positional arguments into a function that takes a tuple.

from streamable import star

pokemons: stream[str] = ...
enumerated_pokes: stream[str] = (
    stream(enumerate(pokemons))
    .map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur', '#4 charmander', '#5 charmeleon', '#6 charizard', '#7 squirtle', '#8 wartortle', '#9 blastoise']
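Conceptually, such a decorator only unpacks the tuple before calling the wrapped function, much like itertools.starmap does for builtins.map. A minimal sync-only sketch, with star_sketch and label as hypothetical names (the real star also supports async functions, which this does not):

```python
from functools import wraps
from itertools import starmap
from typing import Any, Callable, Tuple, TypeVar

R = TypeVar("R")


def star_sketch(fn: Callable[..., R]) -> Callable[[Tuple[Any, ...]], R]:
    """Turn fn(a, b, ...) into a function taking a single tuple (a, b, ...)."""

    @wraps(fn)
    def unpacking(args: Tuple[Any, ...]) -> R:
        return fn(*args)

    return unpacking


def label(index: int, poke: str) -> str:
    return f"#{index + 1} {poke}"


pokes = ["bulbasaur", "ivysaur", "venusaur"]
via_star = list(map(star_sketch(label), enumerate(pokes)))
via_starmap = list(starmap(label, enumerate(pokes)))
```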

distinct

To collect distinct elements, you can call set(a_stream).

To deduplicate in the middle of a stream, .filter out already-seen values and .do add the new ones to a set (or a fancier cache):

seen: set[str] = set()

unique_ints: stream[int] = (
    stream("001000111")
    .filter(lambda char: char not in seen)
    .do(seen.add)
    .map(int)
)

assert list(unique_ints) == [0, 1]

vs builtins.map/filter

There is zero overhead during iteration compared to builtins.map and builtins.filter:

odd_int_chars = stream(range(N)).filter(lambda n: n % 2).map(str)

iter(odd_int_chars) visits the operations lineage and returns exactly this iterator:

map(str, filter(lambda n: n % 2, range(N)))

e.g. ETL via dlt

A stream is an expressive way to declare a dlt.resource:

from datetime import timedelta
from http import HTTPStatus
from itertools import count
import dlt
import httpx
from httpx import Response, HTTPStatusError
from dlt.destinations import filesystem
from streamable import stream

def not_found(e: HTTPStatusError) -> bool:
    return e.response.status_code == HTTPStatus.NOT_FOUND

@dlt.resource
def pokemons(http_client: httpx.Client, concurrency: int, per_second: int) -> stream[dict]:
    """Ingest Pokémons from the PokéAPI, stop on first 404."""
    return (
        stream(count(1))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .throttle(per_second, per=timedelta(seconds=1))
        .map(http_client.get, concurrency=concurrency, as_completed=True)
        .do(Response.raise_for_status)
        .catch(HTTPStatusError, where=not_found, stop=True)
        .map(Response.json)
        .observe("pokemons")
    )

# Write to a partitioned Delta Lake table, chunk by chunk on-the-fly.
with httpx.Client() as http_client:
    dlt.pipeline(
        pipeline_name="ingest_pokeapi",
        destination=filesystem("deltalake"),
        dataset_name="pokeapi",
    ).run(
        pokemons(http_client, concurrency=8, per_second=32),
        table_format="delta",
        columns={"color__name": {"partition": True}},
    )
