༄ streamable
(sync/async) iterable streams for Python
stream[T] wraps any Iterable[T] or AsyncIterable[T] with a lazy fluent interface covering concurrency, batching, buffering, rate limiting, progress logging, and error handling.
1. install
pip install streamable
2. import
from streamable import stream
3. init
Create a stream[T] from an Iterable[T] (or AsyncIterable[T]):
ints: stream[int] = stream(range(10))
4. operate
Chain lazy operations:
import logging
from datetime import timedelta
import httpx
from httpx import Response, HTTPStatusError
from streamable import stream
pokemons: stream[str] = (
stream(range(10))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.throttle(5, per=timedelta(seconds=1))
.map(httpx.get, concurrency=2)
.do(Response.raise_for_status)
.catch(HTTPStatusError, do=logging.warning)
.map(lambda poke: poke.json()["name"])
)
Source elements will be processed on-the-fly during iteration.
Operations accept both sync and async functions.
5. iterate
A stream[T] is Iterable[T] (and AsyncIterable[T]):
>>> list(pokemons)
['bulbasaur', 'ivysaur', 'venusaur', 'charmander', 'charmeleon', 'charizard', 'squirtle', 'wartortle', 'blastoise']
📒 Operations (docs)
- .map elements
- .do side effects on elements
- .group elements into batches
- .flatten iterable elements
- .filter elements
- .take elements until ...
- .skip elements until ...
- .catch exceptions
- .throttle the rate of iteration
- .buffer elements
- .observe the iteration progress
Operations accept both sync and async functions, and the two kinds can be mixed within the same stream, which can then be consumed as an Iterable or as an AsyncIterable. Async functions run in the current event loop; one is created if needed.
Operations are implemented so that the iteration can resume after an exception.
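In plain Python, whether an iterator can resume depends on how it is built: a generator function is finished for good once an exception escapes it, while the iterator returned by builtins.map simply moves on to the next upstream element when next is called again. This stdlib-only snippet (inverses_gen and next_or_error are illustrative helpers, unrelated to streamable's internals) shows the difference:

```python
from typing import Any, Iterable, Iterator


def inverses_gen(numbers: Iterable[int]) -> Iterator[float]:
    # A generator: once an exception escapes it, it is finished for good.
    for n in numbers:
        yield 1 / n


def next_or_error(iterator: Iterator[Any]) -> Any:
    """Return the next element, or the type of the exception raised while computing it."""
    try:
        return next(iterator)
    except Exception as e:  # StopIteration is an Exception too
        return type(e)


gen = inverses_gen(range(3))
gen_results = [next_or_error(gen) for _ in range(3)]
# [ZeroDivisionError, StopIteration, StopIteration]

mapped = map(lambda n: 1 / n, range(3))
map_results = [next_or_error(mapped) for _ in range(3)]
# [ZeroDivisionError, 1.0, 0.5]
```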
A stream exposes operations to manipulate its elements, but I/O is not its responsibility: it is meant to be combined with dedicated libraries like pyarrow, psycopg2, boto3, or dlt (see the ETL example below).
▼ .map
Transform elements:
int_chars: stream[str] = stream(range(10)).map(str)
assert list(int_chars) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
concurrency
Set the concurrency param to apply the transformation concurrently.
At most concurrency upstream elements are in flight for processing at any given time.
Upstream order is preserved unless you set as_completed=True.
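To make the in-flight bound and the ordering concrete, here is a minimal stdlib sketch of a thread-based map that keeps at most concurrency calls running and yields results in upstream order. ordered_concurrent_map is a hypothetical helper, not streamable's implementation:

```python
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def ordered_concurrent_map(
    fn: Callable[[T], U], items: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Hypothetical sketch: apply fn via `concurrency` threads, in upstream order."""
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        in_flight: "deque[Future[U]]" = deque()
        for item in items:
            if len(in_flight) == concurrency:
                # Bound reached: wait for the OLDEST call before pulling more upstream,
                # which both caps in-flight work and preserves order.
                yield in_flight.popleft().result()
            in_flight.append(executor.submit(fn, item))
        # Upstream exhausted: drain the remaining calls, oldest first.
        while in_flight:
            yield in_flight.popleft().result()


print(list(ordered_concurrent_map(lambda n: n * n, range(6), concurrency=2)))
# [0, 1, 4, 9, 16, 25]
```

A variant honoring as_completed=True could instead wait with concurrent.futures.wait(..., return_when=FIRST_COMPLETED) and yield whichever result finishes first.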
via threads
If concurrency > 1, the transformation will be applied via concurrency threads:
pokemons: stream[str] = (
stream(range(1, 4))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.map(httpx.get, concurrency=2)
.map(lambda poke: poke.json()["name"])
)
assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
via async coroutines
If concurrency > 1 and the transformation is async, it will be applied via concurrency async tasks:
# async context
async with httpx.AsyncClient() as http_client:
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    assert [name async for name in pokemons] == ['bulbasaur', 'ivysaur', 'venusaur']
# sync context (asyncio.Runner requires Python 3.11+)
import asyncio
with asyncio.Runner() as runner:
    http_client = httpx.AsyncClient()
    pokemons: stream[str] = (
        stream(range(1, 4))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .map(http_client.get, concurrency=2)
        .map(lambda poke: poke.json()["name"])
    )
    # uses runner's loop
    assert list(pokemons) == ['bulbasaur', 'ivysaur', 'venusaur']
    runner.run(http_client.aclose())
via processes
concurrency can also be a concurrent.futures.Executor: pass a ProcessPoolExecutor to apply the transformation via processes:
from concurrent.futures import ProcessPoolExecutor
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=10) as processes:
        state: list[int] = []
        # ints are mapped
        assert list(
            stream(range(10))
            .map(state.append, concurrency=processes)
        ) == [None] * 10
        # the `state` of the main process is not mutated
        assert state == []
▼ .do
Perform side effects:
state: list[int] = []
store_ints: stream[int] = stream(range(10)).do(state.append)
assert list(store_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
concurrency
Same as .map.
▼ .group
Group elements into batches...
... up_to a given batch size:
int_batches: stream[list[int]] = stream(range(10)).group(5)
assert list(int_batches) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
... within a given time interval:
from datetime import timedelta
int_1sec_batches: stream[list[int]] = (
stream(range(10))
.throttle(2, per=timedelta(seconds=1))
.group(within=timedelta(seconds=0.99))
)
assert list(int_1sec_batches) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
... by a given key, yielding (key, elements) pairs:
ints_by_parity: stream[tuple[str, list[int]]] = (
stream(range(10))
.group(by=lambda n: "odd" if n % 2 else "even")
)
assert list(ints_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]
You can combine these parameters.
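The semantics of by can be pictured with a small stdlib sketch. group_by_key is a hypothetical helper, and the way it combines by with a size cap (emit a group as soon as it is full, flush the remaining groups in first-seen order once upstream is exhausted) is an assumption, not a statement of streamable's exact behavior:

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Optional, Tuple, TypeVar

T = TypeVar("T")
K = TypeVar("K", bound=Hashable)


def group_by_key(
    items: Iterable[T], by: Callable[[T], K], up_to: Optional[int] = None
) -> Iterator[Tuple[K, List[T]]]:
    """Hypothetical sketch: emit a group once it holds `up_to` elements,
    flush the remaining groups when upstream is exhausted."""
    groups: Dict[K, List[T]] = {}  # dicts keep insertion (first-seen) order
    for item in items:
        key = by(item)
        group = groups.setdefault(key, [])
        group.append(item)
        if up_to is not None and len(group) >= up_to:
            yield key, groups.pop(key)
    yield from groups.items()


def parity(n: int) -> str:
    return "odd" if n % 2 else "even"


print(list(group_by_key(range(10), by=parity)))
# [('even', [0, 2, 4, 6, 8]), ('odd', [1, 3, 5, 7, 9])]
print(list(group_by_key(range(6), by=parity, up_to=2)))
# [('even', [0, 2]), ('odd', [1, 3]), ('even', [4]), ('odd', [5])]
```

Note the contrast with itertools.groupby, which only merges consecutive elements sharing a key: on range(10) with the same parity key it would yield ten single-element groups.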
▼ .flatten
Explode upstream elements (Iterable or AsyncIterable):
chars: stream[str] = stream(["hel", "lo!"]).flatten()
assert list(chars) == ["h", "e", "l", "l", "o", "!"]
concurrency
Flattens up to concurrency iterables concurrently (via threads for Iterable elements and via coroutines for AsyncIterable elements):
chars: stream[str] = stream(["hel", "lo", "!"]).flatten(concurrency=2)
assert list(chars) == ["h", "l", "e", "o", "l", "!"]
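The interleaved output above is what a round-robin over at most concurrency open iterables produces. The sequential sketch below (no threads or coroutines; interleave is a hypothetical helper, not the library's code) reproduces that order deterministically, whereas with real threads the exact order may depend on timing:

```python
from collections import deque
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def interleave(iterables: Iterable[Iterable[T]], concurrency: int) -> Iterator[T]:
    """Hypothetical sketch: round-robin over at most `concurrency` open iterables."""
    upstream = iter(iterables)
    active: "deque[Iterator[T]]" = deque()
    # Open the first `concurrency` iterables.
    for iterable in upstream:
        active.append(iter(iterable))
        if len(active) == concurrency:
            break
    while active:
        current = active.popleft()
        try:
            element = next(current)
        except StopIteration:
            # `current` is exhausted: open the next upstream iterable, if any.
            for iterable in upstream:
                active.append(iter(iterable))
                break
            continue
        active.append(current)  # back of the rotation
        yield element


print(list(interleave(["hel", "lo", "!"], concurrency=2)))
# ['h', 'l', 'e', 'o', 'l', '!']
```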
▼ .filter
Filter elements satisfying a predicate:
even_ints: stream[int] = stream(range(10)).filter(lambda n: n % 2 == 0)
assert list(even_ints) == [0, 2, 4, 6, 8]
▼ .take
Take a given number of elements:
first_5_ints: stream[int] = stream(range(10)).take(5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]
... or take until a predicate is satisfied:
first_5_ints: stream[int] = stream(range(10)).take(until=lambda n: n == 5)
assert list(first_5_ints) == [0, 1, 2, 3, 4]
▼ .skip
Skip a given number of elements:
ints_after_5: stream[int] = stream(range(10)).skip(5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]
... or skip until a predicate is satisfied:
ints_after_5: stream[int] = stream(range(10)).skip(until=lambda n: n >= 5)
assert list(ints_after_5) == [5, 6, 7, 8, 9]
▼ .catch
Catch exceptions of a given type:
inverses: stream[float] = (
stream(range(10))
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
... where a predicate is satisfied:
domains = ["github.com", "foo.bar", "google.com"]
resolvable_domains: stream[str] = (
stream(domains)
.do(lambda domain: httpx.get(f"https://{domain}"), concurrency=2)
.catch(httpx.HTTPError, where=lambda e: "not known" in str(e))
)
assert list(resolvable_domains) == ["github.com", "google.com"]
... do a side effect on catch:
errors: list[Exception] = []
inverses: stream[float] = (
stream(range(10))
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError, do=errors.append)
)
assert list(inverses) == [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
assert len(errors) == 1
... replace with a value:
inverses: stream[float] = (
stream(range(10))
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError, replace=lambda e: float("inf"))
)
assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
... or set stop=True to stop the iteration when an exception is caught:
inverses: stream[float] = (
stream(range(10))
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError, stop=True)
)
assert list(inverses) == []
You can combine these parameters.
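Conceptually, .catch keeps pulling from upstream after a failure. The stdlib sketch below (a hypothetical catch function covering only the replace and stop parameters) reproduces the results above; it assumes the upstream iterator can resume after raising, which holds for builtins.map but not for a generator function:

```python
from typing import Callable, Iterator, Optional, Type, TypeVar

T = TypeVar("T")


def catch(
    upstream: Iterator[T],
    error_type: Type[Exception],
    replace: Optional[Callable[[Exception], T]] = None,
    stop: bool = False,
) -> Iterator[T]:
    """Hypothetical sketch: skip, replace, or stop on errors of `error_type`."""
    while True:
        try:
            element = next(upstream)
        except StopIteration:
            return  # upstream exhausted (checked first: StopIteration is an Exception)
        except error_type as error:
            if stop:
                return
            if replace is not None:
                yield replace(error)
            continue  # skip the failed element and pull the next one
        yield element


def inverses() -> Iterator[float]:
    # builtins.map keeps working after one of its calls raises
    return map(lambda n: round(1 / n, 2), range(10))


print(list(catch(inverses(), ZeroDivisionError)))
# [1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
print(list(catch(inverses(), ZeroDivisionError, replace=lambda e: float("inf")))[:3])
# [inf, 1.0, 0.5]
print(list(catch(inverses(), ZeroDivisionError, stop=True)))
# []
```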
▼ .throttle
Limit the number of emissions per time interval:
from datetime import timedelta
three_ints_per_second: stream[int] = stream(range(10)).throttle(3, per=timedelta(seconds=1))
# collects 10 ints in 3 seconds
assert list(three_ints_per_second) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
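One common way to implement such a limit is a sliding window: remember the timestamps of the last count emissions and, before emitting another one, sleep until the oldest of them is older than per. Whether streamable uses this exact strategy is an assumption; throttle here is a hypothetical stdlib function. With 3 per second and 10 elements it emits at roughly 0, 1, 2 and 3 seconds, matching the comment above:

```python
import time
from collections import deque
from datetime import timedelta
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def throttle(items: Iterable[T], count: int, per: timedelta) -> Iterator[T]:
    """Hypothetical sketch: at most `count` emissions within any window of length `per`."""
    window = per.total_seconds()
    emissions: "deque[float]" = deque()  # timestamps of the last `count` emissions
    for item in items:
        if len(emissions) == count:
            # Sleep until the oldest of the last `count` emissions leaves the window.
            wait = emissions[0] + window - time.monotonic()
            if wait > 0:
                time.sleep(wait)
            emissions.popleft()
        emissions.append(time.monotonic())
        yield item


start = time.monotonic()
print(list(throttle(range(5), 2, per=timedelta(seconds=0.05))))  # [0, 1, 2, 3, 4]
# about 0.1 s elapsed: 2 elements at once, 2 more after ~0.05 s, the last after ~0.1 s
print(f"{time.monotonic() - start:.2f}s")
```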
▼ .buffer
Buffer upstream elements into a bounded queue via a background task (decoupling upstream production rate from downstream consumption rate):
import time
pulled: list[int] = []
buffered_ints = iter(
stream(range(10))
.do(pulled.append)
.buffer(5)
)
assert next(buffered_ints) == 0
time.sleep(1e-3)
assert pulled == [0, 1, 2, 3, 4, 5]
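One way to get this behavior with the standard library: a background thread pulls upstream elements into a queue, and a semaphore with size permits ensures the producer waits for room before pulling, so once the first element is consumed at most size more can have been pulled ahead (hence [0, 1, 2, 3, 4, 5] above). buffer is a hypothetical helper, and unlike the library this sketch stops at the first upstream error, which it re-raises:

```python
import queue
import threading
from typing import Any, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def buffer(items: Iterable[T], size: int) -> Iterator[T]:
    """Hypothetical sketch: a background thread pre-pulls up to `size` elements."""
    free_slots = threading.Semaphore(size)
    # messages are ("item", element), ("error", exception) or ("done", None)
    messages: "queue.SimpleQueue[Tuple[str, Any]]" = queue.SimpleQueue()

    def produce() -> None:
        iterator = iter(items)
        while True:
            free_slots.acquire()  # wait for room BEFORE pulling from upstream
            try:
                element = next(iterator)
            except StopIteration:
                messages.put(("done", None))
                return
            except Exception as error:  # forward the upstream error, then stop
                messages.put(("error", error))
                return
            messages.put(("item", element))

    threading.Thread(target=produce, daemon=True).start()
    while True:
        kind, payload = messages.get()
        if kind == "done":
            return
        if kind == "error":
            raise payload
        free_slots.release()  # the element leaves the buffer: free its slot
        yield payload


print(list(buffer(range(5), size=2)))  # [0, 1, 2, 3, 4]
```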
▼ .observe
Observe the iteration progress:
observed_ints: stream[int] = stream(range(10)).observe("ints")
assert list(observed_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
logs:
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.000019 errors=0 elements=1
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001117 errors=0 elements=2
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001147 errors=0 elements=4
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001162 errors=0 elements=8
2025-12-23T16:43:07Z INFO observed=ints elapsed=0:00:00.001179 errors=0 elements=10
Logs are produced when the counts reach powers of 2, plus a final one when the iteration ends (elements=10 above). Set every to produce them periodically:
# observe every 1k elements (or errors)
observed_ints = stream(range(10)).observe("ints", every=1000)
# observe every 5 seconds
observed_ints = stream(range(10)).observe("ints", every=timedelta(seconds=5))
Observations are logged via logging.getLogger("streamable").info. Set do to do something else with the streamable.Observation:
observed_ints = stream(range(10)).observe("ints", do=custom_logger.info)
observed_ints = stream(range(10)).observe("ints", do=observations.append)
observed_ints = stream(range(10)).observe("ints", do=print)
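The power-of-2 schedule keeps logs sparse: for n elements it emits about log2(n) lines instead of n. A minimal sketch of that schedule, assuming a final report at exhaustion as the elements=10 log line suggests (observe_counts is a hypothetical helper that reports element counts only, not errors or elapsed time):

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def is_power_of_2(n: int) -> bool:
    # a positive power of 2 has exactly one bit set
    return n > 0 and (n & (n - 1)) == 0


def observe_counts(items: Iterable[T], do: Callable[[int], None]) -> Iterator[T]:
    """Hypothetical sketch: call `do(count)` at counts 1, 2, 4, 8, ... and at exhaustion."""
    count = 0
    last_reported = 0
    for item in items:
        count += 1
        if is_power_of_2(count):
            do(count)
            last_reported = count
        yield item
    if count != last_reported:
        do(count)  # final report, unless the last count was already reported


reported: list = []
assert list(observe_counts(range(10), do=reported.append)) == list(range(10))
print(reported)  # [1, 2, 4, 8, 10]
```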
▼ +
Concatenate a stream with an iterable:
concatenated_ints = stream(range(10)) + range(10)
assert list(concatenated_ints) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
▼ .cast
Provide a type hint for elements:
import json
from typing import Any
docs: stream[Any] = stream(['{"foo": "bar"}', '{"foo": "baz"}']).map(json.loads)
dicts: stream[dict[str, str]] = docs.cast(dict[str, str])
# the stream remains the same, it's for type checkers only
assert dicts is docs
▼ .__call__
Iterate as an Iterable until exhaustion, without collecting its elements:
state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)
pipeline()
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
▼ await
Iterate as an AsyncIterable until exhaustion, without collecting its elements:
state: list[int] = []
pipeline: stream[int] = stream(range(10)).do(state.append)
await pipeline
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
▼ .pipe
Apply a callable, passing the stream as first argument, followed by the provided *args and **kwargs:
import polars as pl
pokemons: stream[str] = ...
pokemons.pipe(pl.DataFrame, schema=["name"]).write_csv("pokemons.csv")
••• other notes
function as source
A stream can also be instantiated from a function (sync or async) that will be called sequentially to get the next source element during iteration.
e.g. stream from a Queue:
queued_ints: queue.Queue[int] = ...
# or asyncio.Queue[int]
ints: stream[int] = stream(queued_ints.get)
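A minimal sketch of what "called sequentially to get the next source element" amounts to (from_function is a hypothetical helper, not the library's code). Since such a source never ends on its own, the example bounds it with itertools.islice; the builtin iter(callable, sentinel) is the stdlib variant that stops when the function returns a sentinel:

```python
import itertools
import queue
from typing import Callable, Iterator, Optional, TypeVar

T = TypeVar("T")


def from_function(get_next: Callable[[], T]) -> Iterator[T]:
    """Hypothetical sketch: call `get_next` once per requested element, forever."""
    while True:
        yield get_next()


queued: "queue.Queue[Optional[int]]" = queue.Queue()
for i in range(3):
    queued.put(i)
# Infinite source: take only the 3 queued elements (a 4th get() would block).
print(list(itertools.islice(from_function(queued.get), 3)))  # [0, 1, 2]

# Stdlib variant with a stop condition: iter(callable, sentinel) stops at the sentinel.
for i in (3, 4, None):
    queued.put(i)
print(list(iter(queued.get, None)))  # [3, 4]
```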
starmap
The star function decorator transforms a function (sync or async) that takes several positional arguments into a function that takes a tuple.
from streamable import star
pokemons: stream[str] = ...
enumerated_pokes: stream[str] = (
stream(enumerate(pokemons))
.map(star(lambda index, poke: f"#{index + 1} {poke}"))
)
assert list(enumerated_pokes) == ['#1 bulbasaur', '#2 ivysaur', '#3 venusaur', '#4 charmander', '#5 charmeleon', '#6 charizard', '#7 squirtle', '#8 wartortle', '#9 blastoise']
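A sketch of how such a decorator can be written while preserving sync/async-ness (this star is a hypothetical re-implementation, not streamable's source). For sync functions, the stdlib itertools.starmap covers the same need outside of a stream:

```python
import asyncio
import functools
import inspect
from typing import Any, Callable, Tuple


def star(fn: Callable[..., Any]) -> Callable[[Tuple[Any, ...]], Any]:
    """Hypothetical sketch: turn f(a, b, ...) into g((a, b, ...)), keeping sync/async-ness."""
    if inspect.iscoroutinefunction(fn):

        @functools.wraps(fn)
        async def async_starred(args: Tuple[Any, ...]) -> Any:
            return await fn(*args)

        return async_starred

    @functools.wraps(fn)
    def starred(args: Tuple[Any, ...]) -> Any:
        return fn(*args)

    return starred


label = star(lambda index, poke: f"#{index + 1} {poke}")
print(label((0, "bulbasaur")))  # #1 bulbasaur


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


print(asyncio.run(star(add)((1, 2))))  # 3
```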
distinct
To collect distinct elements, you can simply call set(a_stream).
To deduplicate in the middle of the stream, .filter out already-seen values and .do add the new ones to a set (or a fancier cache):
seen: set[str] = set()
unique_ints: stream[int] = (
stream("001000111")
.filter(lambda _: _ not in seen)
.do(seen.add)
.map(int)
)
assert list(unique_ints) == [0, 1]
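As an example of a "fancier cache", here is a hypothetical BoundedSeen class that remembers only the most recently seen keys, trading exact deduplication for bounded memory on long or infinite streams. Because it supports `in` and `.add`, it could stand in for the set in the filter/do pattern above; evicted keys can come back as duplicates:

```python
from collections import OrderedDict
from typing import Hashable


class BoundedSeen:
    """Hypothetical cache: remembers at most `capacity` keys,
    forgetting the least recently seen one first."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._keys: "OrderedDict[Hashable, None]" = OrderedDict()

    def __contains__(self, key: Hashable) -> bool:
        return key in self._keys

    def add(self, key: Hashable) -> None:
        self._keys[key] = None
        self._keys.move_to_end(key)  # mark as most recently seen
        if len(self._keys) > self.capacity:
            self._keys.popitem(last=False)  # evict the least recently seen key


seen = BoundedSeen(capacity=2)
deduped = []
for c in "abca":
    if c not in seen:
        seen.add(c)
        deduped.append(c)
print(deduped)  # ['a', 'b', 'c', 'a']: "a" was evicted when "c" arrived
```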
vs builtins.map/filter
There is zero overhead during iteration compared to builtins.map and builtins.filter:
odd_int_chars = stream(range(N)).filter(lambda n: n % 2).map(str)
iter(odd_int_chars) visits the operations lineage and returns exactly this iterator:
map(str, filter(lambda n: n % 2, range(N)))
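To illustrate what "visits the operations lineage" can mean, here is a toy model (the Lineage class is hypothetical, not streamable's code): each operation only records its upstream and function, and iter() walks back to the source to build nested builtins.map/filter iterators, so no extra per-element code runs during iteration:

```python
from typing import Any, Callable, Iterable, Iterator, Optional


class Lineage:
    """Hypothetical toy stream: each operation only records (upstream, kind, fn);
    iteration walks the lineage and nests builtins.map/filter."""

    def __init__(
        self,
        source: Iterable[Any],
        upstream: Optional["Lineage"] = None,
        kind: str = "source",
        fn: Optional[Callable[[Any], Any]] = None,
    ) -> None:
        self.source, self.upstream, self.kind, self.fn = source, upstream, kind, fn

    def map(self, fn: Callable[[Any], Any]) -> "Lineage":
        return Lineage(self.source, self, "map", fn)

    def filter(self, fn: Callable[[Any], Any]) -> "Lineage":
        return Lineage(self.source, self, "filter", fn)

    def __iter__(self) -> Iterator[Any]:
        if self.upstream is None:
            return iter(self.source)
        upstream_iterator = iter(self.upstream)  # recursively build upstream first
        if self.kind == "map":
            return map(self.fn, upstream_iterator)
        return filter(self.fn, upstream_iterator)


odd_int_chars = Lineage(range(10)).filter(lambda n: n % 2).map(str)
iterator = iter(odd_int_chars)
print(type(iterator).__name__, list(iterator))  # map ['1', '3', '5', '7', '9']
```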
e.g. ETL via dlt
A stream is an expressive way to declare a dlt.resource:
# from datetime import timedelta
# from http import HTTPStatus
# from itertools import count
# import dlt
# import httpx
# from httpx import Response, HTTPStatusError
# from dlt.destinations import filesystem
# from streamable import stream
def not_found(e: HTTPStatusError) -> bool:
    return e.response.status_code == HTTPStatus.NOT_FOUND
@dlt.resource
def pokemons(http_client: httpx.Client, concurrency: int, per_second: int) -> stream[dict]:
    """Ingest Pokémons from the PokéAPI, stop on first 404."""
    return (
        stream(count(1))
        .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
        .throttle(per_second, per=timedelta(seconds=1))
        .map(http_client.get, concurrency=concurrency, as_completed=True)
        .do(Response.raise_for_status)
        .catch(HTTPStatusError, where=not_found, stop=True)
        .map(Response.json)
        .observe("pokemons")
    )
# Write to a partitioned Delta Lake table, chunk by chunk on-the-fly.
with httpx.Client() as http_client:
    dlt.pipeline(
        pipeline_name="ingest_pokeapi",
        destination=filesystem("deltalake"),
        dataset_name="pokeapi",
    ).run(
        pokemons(http_client, concurrency=8, per_second=32),
        table_format="delta",
        columns={"color__name": {"partition": True}},
    )
File details
Details for the file streamable-2.0.0rc11.tar.gz.
File metadata
- Download URL: streamable-2.0.0rc11.tar.gz
- Upload date:
- Size: 56.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e4de9a7a6e95ac382473f04f731d1d1d1724cea0e86d3d18caf7995dd17711bd |
| MD5 | 112af732f6757a84f24454dad94bf6f8 |
| BLAKE2b-256 | aabd73edc5a98db1b65148d74c1802eb6ca69545a775d6bbeeb3a5d51d656fbb |
File details
Details for the file streamable-2.0.0rc11-py3-none-any.whl.
File metadata
- Download URL: streamable-2.0.0rc11-py3-none-any.whl
- Upload date:
- Size: 44.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | ea8623d8b9a0cf24f641c2254b9a292954908b2c6e8ca2296a500602b72ffc27 |
| MD5 | 61e4eaa6803f6a5ca53eba5c85567c9c |
| BLAKE2b-256 | 1347850fca68fabe0359165a4531cd31872948ca84a015a702ed95c420486083 |