
streamable

concurrent & fluent interface for (async) iterables

Stream[T] is a decorator for Iterable[T] or AsyncIterable[T] that allows chaining lazy operations, with seamless concurrency via threads, processes, or async coroutines.

Battle-tested on Python 3.7 to 3.15 and compatible with PyPy.


1. install

(zero dependencies)

pip install streamable

2. import

from streamable import Stream

3. init

Create a Stream[T] decorating an Iterable[T]/AsyncIterable[T]:

integers: Stream[int] = Stream(range(10))

4. operate

Chain lazy operations (only evaluated during iteration), each returning a new immutable Stream:

inverses: Stream[float] = (
    integers
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replacement=float("inf"))
)

5. iterate

Iterate over a Stream[T] like any Iterable[T]/AsyncIterable[T]:

>>> list(inverses)
[inf, 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

Elements are processed on-the-fly as the iteration advances.
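A Stream[T] is also an AsyncIterable[T], so the same stream can be consumed asynchronously too; a minimal sketch:

import asyncio

async def main() -> None:
    assert [inverse async for inverse in inverses] == list(inverses)

asyncio.run(main())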

↔ showcase: ETL

Let's walk through the Stream's features with an Extract-Transform-Load script:

This toy script concurrently fetches Pokémons from PokéAPI and writes the quadrupeds from the first three generations into a CSV file, in 5-second batches:

import csv
from datetime import timedelta
from itertools import count
import httpx
from streamable import Stream

with open("./quadruped_pokemons.csv", mode="w") as file:
    fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
    writer = csv.DictWriter(file, fields, extrasaction='ignore')
    writer.writeheader()
    with httpx.Client() as http_client:
        pipeline = (
            # Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
            Stream(count(1))
            # Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
            .throttle(16, per=timedelta(seconds=1))
            # GET pokemons concurrently using a pool of 8 threads
            .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
            .map(http_client.get, concurrency=8)
            .foreach(httpx.Response.raise_for_status)
            .map(httpx.Response.json)
            # Stop the iteration when reaching the 1st pokemon of the 4th generation
            .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
            .observe("pokemons")
            # Keep only quadruped Pokemons
            .filter(lambda poke: poke["shape"]["name"] == "quadruped")
            # Write a batch of pokemons every 5 seconds to the CSV file
            .group(interval=timedelta(seconds=5))
            .foreach(writer.writerows)
            .flatten()
            .observe("written pokemons")
            # Catch exceptions and raise the 1st one at the end of the iteration
            .catch(Exception, finally_raise=True)
        )

        # Start a full iteration
        pipeline()

... or the async way!

Let's write an async version of this script:

  • httpx.Client becomes httpx.AsyncClient
  • .map becomes .amap
  • pipeline() becomes await pipeline
import asyncio
import csv
from datetime import timedelta
from itertools import count
import httpx
from streamable import Stream

async def main() -> None:
    with open("./quadruped_pokemons.csv", mode="w") as file:
        fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
        writer = csv.DictWriter(file, fields, extrasaction='ignore')
        writer.writeheader()

        async with httpx.AsyncClient() as http_client:
            pipeline = (
                # Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
                Stream(count(1))
                # Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
                .throttle(16, per=timedelta(seconds=1))
                # GET pokemons via 8 concurrent coroutines
                .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
                .amap(http_client.get, concurrency=8)
                .foreach(httpx.Response.raise_for_status)
                .map(httpx.Response.json)
                # Stop the iteration when reaching the 1st pokemon of the 4th generation
                .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
                .observe("pokemons")
                # Keep only quadruped Pokemons
                .filter(lambda poke: poke["shape"]["name"] == "quadruped")
                # Write a batch of pokemons every 5 seconds to the CSV file
                .group(interval=timedelta(seconds=5))
                .foreach(writer.writerows)
                .flatten()
                .observe("written pokemons")
                # Catch exceptions and raise the 1st one at the end of the iteration
                .catch(Exception, finally_raise=True)
            )

            # Start a full async iteration
            await pipeline

asyncio.run(main())

💡 Notes

Exceptions do not terminate the iteration

[!TIP] If an operation raises an exception while processing an element, you can handle it and continue the iteration:

👀 show snippet
from contextlib import suppress
from typing import Iterator, List

casted_ints: Iterator[int] = iter(
    Stream("0123_56789")
    .map(int)
    .group(3)
    .flatten()
)
collected: List[int] = []

with suppress(ValueError):
    collected.extend(casted_ints)
assert collected == [0, 1, 2, 3]

collected.extend(casted_ints)
assert collected == [0, 1, 2, 3, 5, 6, 7, 8, 9]

Performances

Declaring a Stream is lazy,

odd_int_strings = Stream(range(1_000_000)).filter(lambda n: n % 2).map(str)

and there is zero overhead during iteration compared to the builtins: iter(odd_int_strings) visits the operations' lineage and returns exactly this iterator:

map(str, filter(lambda n: n % 2, range(1_000_000)))
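If you want to check this locally, here is a rough sketch using timeit (absolute numbers will vary with your machine):

import timeit

from streamable import Stream

via_builtins = lambda: list(map(str, filter(lambda n: n % 2, range(100_000))))
via_stream = lambda: list(Stream(range(100_000)).filter(lambda n: n % 2).map(str))

print("builtins:", timeit.timeit(via_builtins, number=10))
print("stream:  ", timeit.timeit(via_stream, number=10))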

Operations have been implemented with speed in mind. If you have any ideas for improvement, whether performance-related or not, an issue, PR, or discussion would be very much appreciated! 🙏 (CONTRIBUTING.md)

📒 Operations

  • .map: transform elements
  • .foreach: call a side effect function on elements
  • .group / .groupby: batch elements by size, by key, and/or over a time interval
  • .flatten: explode iterable elements
  • .filter: remove elements
  • .distinct: remove duplicates
  • .truncate: cut the stream
  • .skip: ignore head elements
  • .catch: handle exceptions
  • .throttle: control the rate of iteration
  • .observe: log elements/errors counters

[!IMPORTANT] A Stream exposes a minimalist yet expressive set of operations to manipulate its elements, but creating its source or consuming it is not its responsibility; it's meant to be combined with specialized libraries (csv, json, pyarrow, psycopg2, boto3, requests, httpx, ...).

[!NOTE] async counterparts: For each operation that takes a function (such as .map), there is an equivalent that accepts an async function (such as .amap). You can freely mix synchronous and asynchronous operations within the same Stream. The result can then be consumed either as an Iterable or as an AsyncIterable. When a stream involving async operations is consumed as an Iterable, a temporary asyncio event loop is attached to it.
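For instance, here is a minimal sketch mixing an async .amap with a sync .filter and consuming the result as a plain Iterable (the double coroutine stands in for real async I/O):

import asyncio
from streamable import Stream

async def double(n: int) -> int:
    await asyncio.sleep(0.01)  # stands in for real async I/O
    return 2 * n

doubles: Stream[int] = Stream(range(5)).amap(double).filter(lambda n: n > 2)

# sync consumption: a temporary event loop runs the async operation
assert list(doubles) == [4, 6, 8]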

🟡 .map / .amap

Applies a transformation on elements:

👀 show snippet
integer_strings: Stream[str] = integers.map(str)

assert list(integer_strings) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

thread-based concurrency

Applies the transformation concurrently, via a pool of concurrency threads.

👀 show snippet
import requests

pokemon_names: Stream[str] = (
    Stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(requests.get, concurrency=3)
    .map(requests.Response.json)
    .map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']

[!NOTE] Memory-efficient: Only concurrency upstream elements are pulled for processing; the next upstream element is pulled only when a result is yielded downstream.

[!NOTE] Ordering: it yields results in the upstream order (FIFO), set ordered=False to yield results as they become available (First Done, First Out).
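A minimal sketch of First Done, First Out, assuming a workload where the earliest elements take the longest:

import time
from streamable import Stream

def slow_identity(n: int) -> int:
    time.sleep(0.1 * (3 - n))  # element 0 is the slowest
    return n

first_done_first_out: Stream[int] = (
    Stream(range(3))
    .map(slow_identity, concurrency=3, ordered=False)
)

# results are yielded as they complete, not in upstream order
assert list(first_done_first_out) == [2, 1, 0]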

process-based concurrency

Same but set via="process":

👀 show snippet
if __name__ == "__main__":
    state: List[int] = []
    # integers are mapped
    assert integers.map(state.append, concurrency=4, via="process").count() == 10
    # but the `state` of the main process is not mutated
    assert state == []

coroutine-based concurrency: .amap

.amap can apply an async function concurrently.

👀 show snippet
import asyncio
import httpx

async def main() -> None:
    async with httpx.AsyncClient() as http:
        pokemon_names: Stream[str] = (
            Stream(range(1, 4))
            .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
            .amap(http.get, concurrency=3)
            .map(httpx.Response.json)
            .map(lambda poke: poke["name"])
        )
        # consume as an AsyncIterable[str]
        assert [name async for name in pokemon_names] == ['bulbasaur', 'ivysaur', 'venusaur']

asyncio.run(main())

"starmap"

The star function decorator transforms a function that takes several positional arguments into a function that takes a tuple:

👀 show snippet
from streamable import star

zeros: Stream[int] = (
    Stream(enumerate(integers))
    .map(star(lambda index, integer: index - integer))
)

assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

🟡 .foreach / .aforeach

Applies a side effect on elements:

👀 show snippet
state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)

assert list(appending_integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

concurrency

Similar to .map:

  • set the concurrency parameter for thread-based concurrency
  • set via="process" for process-based concurrency
  • set ordered=False for First Done First Out
  • The .aforeach operation can apply an async effect concurrently (see the sketch below).
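A minimal sketch of .aforeach, assuming it mirrors .foreach's concurrency parameter (the append coroutine stands in for real async I/O):

import asyncio
from typing import List
from streamable import Stream

state: List[int] = []

async def append(n: int) -> None:
    await asyncio.sleep(0.01)  # stands in for real async I/O
    state.append(n)

async def main() -> None:
    # awaiting the stream iterates over it until exhaustion
    await Stream(range(5)).aforeach(append, concurrency=2)

asyncio.run(main())
assert sorted(state) == [0, 1, 2, 3, 4]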

🟡 .group / .agroup

Groups into Lists

... up to a given group size:

👀 show snippet
integers_by_5: Stream[List[int]] = integers.group(size=5)

assert list(integers_by_5) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

... and/or co-groups by a given key:

👀 show snippet
integers_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2)

assert list(integers_by_parity) == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]

... and/or co-groups the elements yielded by the upstream within a given time interval:

👀 show snippet
from datetime import timedelta

integers_within_1_sec: Stream[List[int]] = (
    integers
    .throttle(2, per=timedelta(seconds=1))
    .group(interval=timedelta(seconds=0.99))
)

assert list(integers_within_1_sec) == [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]

[!TIP] Combine the size/by/interval parameters:

👀 show snippet
integers_by_parity_by_2: Stream[List[int]] = (
    integers
    .group(by=lambda n: n % 2, size=2)
)

assert list(integers_by_parity_by_2) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]

🟡 .groupby / .agroupby

Like .group, but groups into (key, elements) tuples:

👀 show snippet
from typing import List, Tuple

integers_by_parity: Stream[Tuple[str, List[int]]] = (
    integers
    .groupby(lambda n: "odd" if n % 2 else "even")
)

assert list(integers_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]

[!TIP] Then "starmap" over the tuples:

👀 show snippet
from streamable import star

counts_by_parity: Stream[Tuple[str, int]] = (
    integers_by_parity
    .map(star(lambda parity, ints: (parity, len(ints))))
)

assert list(counts_by_parity) == [("even", 5), ("odd", 5)]

🟡 .flatten / .aflatten

Ungroups elements assuming that they are Iterables (or AsyncIterables for .aflatten):

👀 show snippet
even_then_odd_integers: Stream[int] = integers_by_parity.flatten()

assert list(even_then_odd_integers) == [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]

thread-based concurrency

Concurrently flattens concurrency iterables via threads (or via coroutines for .aflatten):

👀 show snippet
mixed_ones_and_zeros: Stream[int] = (
    Stream([[0] * 4, [1] * 4])
    .flatten(concurrency=2)
)
assert list(mixed_ones_and_zeros) == [0, 1, 0, 1, 0, 1, 0, 1]
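A minimal sketch of the .aflatten counterpart over AsyncIterable elements (arange is a hypothetical async version of range):

import asyncio
from typing import AsyncIterator
from streamable import Stream

async def arange(n: int) -> AsyncIterator[int]:
    for i in range(n):
        await asyncio.sleep(0)  # stands in for real async I/O
        yield i

async def main() -> None:
    zeros_and_ones: Stream[int] = Stream([arange(2), arange(2)]).aflatten(concurrency=2)
    # the two async iterables are consumed concurrently
    assert sorted([n async for n in zeros_and_ones]) == [0, 0, 1, 1]

asyncio.run(main())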

🟡 .filter / .afilter

Keeps only the elements that satisfy a condition:

👀 show snippet
even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)

assert list(even_integers) == [0, 2, 4, 6, 8]

🟡 .distinct / .adistinct

Removes duplicates:

👀 show snippet
distinct_chars: Stream[str] = Stream("foobarfooo").distinct()

assert list(distinct_chars) == ["f", "o", "b", "a", "r"]

specifying a deduplication key:

👀 show snippet
strings_of_distinct_lengths: Stream[str] = (
    Stream(["a", "foo", "bar", "z"])
    .distinct(len)
)

assert list(strings_of_distinct_lengths) == ["a", "foo"]

[!WARNING] During iteration, all distinct elements that are yielded are retained in memory to perform deduplication. However, you can remove only consecutive duplicates without a memory footprint by setting consecutive_only=True:

👀 show snippet
consecutively_distinct_chars: Stream[str] = (
    Stream("foobarfooo")
    .distinct(consecutive_only=True)
)

assert list(consecutively_distinct_chars) == ["f", "o", "b", "a", "r", "f", "o"]

🟡 .truncate / .atruncate

Ends iteration once a given number of elements have been yielded:

👀 show snippet
five_first_integers: Stream[int] = integers.truncate(5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]

or when a condition is satisfied:

👀 show snippet
five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]

If both count and when are set, truncation occurs as soon as either condition is met.
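For example (a sketch, assuming the count is the first positional parameter): here the when condition is met before 5 elements have been yielded:

assert list(integers.truncate(5, when=lambda n: n == 3)) == [0, 1, 2]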

🟡 .skip / .askip

Skips the first specified number of elements:

👀 show snippet
integers_after_five: Stream[int] = integers.skip(5)

assert list(integers_after_five) == [5, 6, 7, 8, 9]

or skips elements until a predicate is satisfied:

👀 show snippet
integers_after_five: Stream[int] = integers.skip(until=lambda n: n >= 5)

assert list(integers_after_five) == [5, 6, 7, 8, 9]

If both count and until are set, skipping stops as soon as either condition is met.
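For example (a sketch, assuming the count is the first positional parameter): here the until condition is met before 5 elements have been skipped:

assert list(integers.skip(5, until=lambda n: n >= 3)) == [3, 4, 5, 6, 7, 8, 9]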

🟡 .catch / .acatch

Catches a given type of exception, and optionally yields a replacement value:

👀 show snippet
inverses: Stream[float] = (
    integers
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replacement=float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

You can specify an additional when condition for the catch:

👀 show snippet
import requests
from requests.exceptions import ConnectionError

status_codes_ignoring_resolution_errors: Stream[int] = (
    Stream(["https://github.com", "https://foo.bar", "https://github.com/foo/bar"])
    .map(requests.get, concurrency=2)
    .catch(ConnectionError, when=lambda error: "Max retries exceeded with url" in str(error))
    .map(lambda response: response.status_code)
)

assert list(status_codes_ignoring_resolution_errors) == [200, 404]

It has an optional finally_raise: bool parameter to raise the first exception caught (if any) when the iteration terminates.
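A minimal sketch: every castable element is yielded first, and the ValueError caught on "x" is raised once the iteration terminates:

from typing import List
from streamable import Stream

ints_deferring_errors: Stream[int] = (
    Stream("01x23")
    .map(int)
    .catch(ValueError, finally_raise=True)
)

collected: List[int] = []
try:
    collected.extend(ints_deferring_errors)
except ValueError:
    pass  # the deferred ValueError from int("x")

assert collected == [0, 1, 2, 3]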

[!TIP] Leverage when to apply side effects on catch:

👀 show snippet
errors: List[Exception] = []

def store_error(error: Exception) -> bool:
    errors.append(error)  # applies effect
    return True  # signals to catch the error

integers_in_string: Stream[int] = (
    Stream("012345foo6789")
    .map(int)
    .catch(ValueError, when=store_error)
)

assert list(integers_in_string) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert len(errors) == len("foo")

🟡 .throttle

Limits the number of yields per time interval:

👀 show snippet
from datetime import timedelta

three_integers_per_second: Stream[int] = integers.throttle(3, per=timedelta(seconds=1))

# takes 3s: ceil(10 integers / 3 per_second) - 1
assert list(three_integers_per_second) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

🟡 .observe

Logs the progress of iterations:

👀 show snippet
>>> assert list(integers.throttle(2, per=timedelta(seconds=1)).observe("integers")) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
INFO: [duration=0:00:00.001793 errors=0] 1 integers yielded
INFO: [duration=0:00:00.004388 errors=0] 2 integers yielded
INFO: [duration=0:00:01.003655 errors=0] 4 integers yielded
INFO: [duration=0:00:03.003196 errors=0] 8 integers yielded
INFO: [duration=0:00:04.003852 errors=0] 10 integers yielded

[!NOTE] To avoid flooding, logs are emitted only when the number of yielded elements (or errors) reaches powers of 2.

[!TIP] To mute these logs, set the logging level above INFO:

👀 show snippet
import logging
logging.getLogger("streamable").setLevel(logging.WARNING)

🟡 +

Concatenates streams:

👀 show snippet
assert list(integers + integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

🟡 zip

Use the builtin zip function:

👀 show snippet
from streamable import star

cubes: Stream[int] = (
    Stream(zip(integers, integers, integers))  # Stream[Tuple[int, int, int]]
    .map(star(lambda a, b, c: a * b * c))  # Stream[int]
)

assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

Shorthands for consuming the stream

Although consuming the stream is beyond the scope of this library, it provides a few basic shorthands to trigger an iteration:

🟡 .count / .acount

.count iterates over the stream until exhaustion and returns the number of elements yielded:

👀 show snippet
assert integers.count() == 10

.acount, its async counterpart, iterates over the stream as an AsyncIterable until exhaustion and returns the number of elements yielded:

👀 show snippet
assert asyncio.run(integers.acount()) == 10

🟡 () / await

Calling the stream iterates over it until exhaustion, and returns it:

👀 show snippet
state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)
assert appending_integers() is appending_integers
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Awaiting the stream iterates over it as an AsyncIterable until exhaustion, and returns it:

👀 show snippet
async def test_await() -> None:
    state: List[int] = []
    appending_integers: Stream[int] = integers.foreach(state.append)
    assert appending_integers is await appending_integers
    assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncio.run(test_await())

🟡 .pipe

Calls a function, passing the stream as first argument, followed by *args/**kwargs if any (inspired by the .pipe from pandas or polars):

👀 show snippet
import pandas as pd

(
    integers
    .observe("ints")
    .pipe(pd.DataFrame, columns=["integer"])
    .to_csv("integers.csv", index=False)
)
