Skip to main content

Stream-like manipulation of iterables

Reason this release was yanked:

`.observe`operation is mute

Project description

streamable

Stream-like manipulation of iterables

A Stream[T] decorates an Iterable[T] with a fluent interface enabling the chaining of lazy operations.


codecov unittest typing lint PyPI

🔗 Fluent chain methods!
🇹 Typed type-annotated and mypyable
💤 Lazy operations are lazily evaluated at iteration time
🔄 Concurrent via threads or processes or asyncio
🛡️ Robust unit-tested for Python 3.7 to 3.14 with 100% coverage
🪶 Minimalist pip install streamable with no additional dependencies

1. install

pip install streamable

2. import

from streamable import Stream

3. init

Instantiate a Stream[T] from an Iterable[T].

integers: Stream[int] = Stream(range(10))

4. operate

  • Streams are immutable: applying an operation returns a new stream.

  • Operations are lazy: only evaluated at iteration time.

inverses: Stream[float] = (
    integers
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError)
)

5. iterate

  • Iterate over a Stream[T] as you would over any other Iterable[T].
  • Source elements are processed on-the-fly.

collect it

>>> list(inverses)
[1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]
>>> set(inverses)
{0.5, 1.0, 0.2, 0.33, 0.25, 0.17, 0.14, 0.12, 0.11}

reduce it

>>> sum(inverses)
2.82
>>> max(inverses)
1.0
>>> from functools import reduce
>>> reduce(..., inverses)

loop it

>>> for inverse in inverses:
>>>    ...

next it

>>> inverses_iter = iter(inverses)
>>> next(inverses_iter)
1.0
>>> next(inverses_iter)
0.5

📒 Operations

.map

Applies a transformation on elements:

negative_integer_strings: Stream[str] = integers.map(lambda n: -n).map(str)

assert list(negative_integer_strings) == ['0', '-1', '-2', '-3', '-4', '-5', '-6', '-7', '-8', '-9']

thread-based concurrency

Applies the transformation via concurrency threads:

import requests

pokemon_names: Stream[str] = (
    Stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .map(requests.get, concurrency=3)
    .map(requests.Response.json)
    .map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']

Preserves the upstream order by default (FIFO), but you can set ordered=False for First Done First Out.

concurrency is also the size of the buffer containing not-yet-yielded results. If the buffer is full, the iteration over the upstream is stopped until some results are yielded out of the buffer.

process-based concurrency

Set via="process":

if __name__ == "__main__":
    state: List[int] = []
    n_integers: int = integers.map(state.append, concurrency=4, via="process").count()
    assert n_integers == 10
    assert state == [] # main process's state not mutated

async-based concurrency

The sibling operation .amap applies an async function:

import httpx
import asyncio

http_async_client = httpx.AsyncClient()

pokemon_names: Stream[str] = (
    Stream(range(1, 4))
    .map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
    .amap(http_async_client.get, concurrency=3)
    .map(httpx.Response.json)
    .map(lambda poke: poke["name"])
)

assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']
asyncio.get_event_loop().run_until_complete(http_async_client.aclose())

starmap

The star function decorator transforms a function that takes several positional arguments into a function that takes a tuple:

from streamable import star

zeros: Stream[int] = (
    Stream(enumerate(integers))
    .map(star(lambda index, integer: index - integer))
)

assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Also convenient with .foreach, .filter, ...

.foreach

Applies a side effect on elements:

self_printing_integers: Stream[int] = integers.foreach(print)

assert list(self_printing_integers) == list(integers)  # triggers the printing

thread-based concurrency

Like .map it has an optional concurrency parameter.

process-based concurrency

Like for .map, set the parameter via="process".

async-based concurrency

Like .map it has a sibling .aforeach operation for async.

.filter

Keeps only the elements that satisfy a condition:

pair_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)

assert list(pair_integers) == [0, 2, 4, 6, 8]

.throttle

Limits the number of yields per_second/per_minute/per_hour:

slow_integers: Stream[int] = integers.throttle(per_second=5)

assert list(slow_integers) == list(integers)  # takes 10 / 5 = 2 seconds

and/or ensure a minimum time interval separates successive yields:

from datetime import timedelta

slow_integers = integers.throttle(interval=timedelta(milliseconds=100))

assert list(slow_integers) == list(integers)  # takes 10 * 0.1 = 1 second

.group

Groups elements into Lists:

integers_5_by_5: Stream[List[int]] = integers.group(size=5)

assert list(integers_5_by_5) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
integers_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2)

assert list(integers_by_parity) == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
from datetime import timedelta

integers_within_1s: Stream[List[int]] = (
    integers
    .throttle(per_second=2)
    .group(interval=timedelta(seconds=0.99))
)

assert list(integers_within_1s) == [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]

Mix size/by/interval parameters:

integers_2_by_2_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2, size=2)

assert list(integers_2_by_2_by_parity) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]

.flatten

Ungroups elements assuming that they are Iterables.

pair_then_odd_integers: Stream[int] = integers_by_parity.flatten()

assert list(pair_then_odd_integers) == [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]

thread-based concurrency

Flattens concurrency iterables concurrently:

mix_of_0s_and_1s: Stream[int] = Stream([[0] * 4, [1] * 4]).flatten(concurrency=2)
assert list(mix_of_0s_and_1s) == [0, 1, 0, 1, 0, 1, 0, 1]

.catch

Catches a given type of exceptions, and optionally yields a replacement value:

inverses: Stream[float] = (
    integers
    .map(lambda n: round(1 / n, 2))
    .catch(ZeroDivisionError, replacement=float("inf"))
)

assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

You can specify an additional when condition for the catch:

import requests
from requests.exceptions import SSLError

status_codes_ignoring_resolution_errors: Stream[int] = (
    Stream(["https://github.com", "https://foo.bar", "https://github.com/foo/bar"])
    .map(requests.get, concurrency=2)
    .catch(SSLError, when=lambda exception: "Max retries exceeded with url" in str(exception))
    .map(lambda response: response.status_code)
)

assert list(status_codes_ignoring_resolution_errors) == [200, 404]

It has an optional finally_raise: bool parameter to raise the first catched exception when iteration ends.

.truncate

Ends iteration once a given number of elements have been yielded:

five_first_integers: Stream[int] = integers.truncate(5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]

... or when a condition has become satisfied:

five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)

assert list(five_first_integers) == [0, 1, 2, 3, 4]

.observe

Logs the progress of iterations over this stream, if you iterate on:

observed_slow_integers: Stream[int] = slow_integers.observe("integers")

you will get these logs:

INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded

The amount of logs will never be overwhelming because they are produced logarithmically e.g. the 11th log will be produced when the iteration reaches the 1024th element.

zip

Use the standard zip function:

from streamable import star

cubes: Stream[int] = (
    Stream(zip(integers, integers, integers)) # Stream[Tuple[int, int, int]]
    .map(star(lambda a, b, c: a * b * c))
)

assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

📦 Notes Box

Contribute

Please help us ! Feel very welcome to:

exhaust the stream

.count iterates over the stream until exhaustion and returns the count of elements yielded.

>>> assert integers.count() == 10

calling the stream iterates over it until exhaustion.

>>> integers.foreach(print)()
0
1
2
3
4
5
6
7
8
9

Extract-Transform-Load

ETL scripts (i.e. scripts fetching -> processing -> pushing data) can benefit from the expressivity of this library.

Here is an example that you can copy-paste and try (it only requires requests): it creates a CSV file containing all the 67 quadrupeds from the 1st, 2nd and 3rd generations of Pokémons (kudos to PokéAPI)

import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream

with open("./quadruped_pokemons.csv", mode="w") as file:
    fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
    writer = csv.DictWriter(file, fields, extrasaction='ignore')
    writer.writeheader()
    (
        # Infinite Stream[int] of Pokemon ids starting from Pokémon #1: Bulbasaur
        Stream(itertools.count(1))
        # Limits to 16 requests per second to be friendly to our fellow PokéAPI devs
        .throttle(per_second=16)
        # GETs pokemons concurrently using a pool of 8 threads
        .map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
        .map(requests.get, concurrency=8)
        .foreach(requests.Response.raise_for_status)
        .map(requests.Response.json)
        # Stops the iteration when reaching the 1st pokemon of the 4th generation
        .truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
        .observe("pokemons")
        # Keeps only quadruped Pokemons
        .filter(lambda poke: poke["shape"]["name"] == "quadruped")
        .observe("quadruped pokemons")
        # Catches errors due to None "generation" or "shape"
        .catch(
            TypeError,
            when=lambda error: str(error) == "'NoneType' object is not subscriptable"
        )
        # Writes a batch of pokemons every 5 seconds to the CSV file
        .group(interval=timedelta(seconds=5))
        .foreach(writer.writerows)
        .flatten()
        .observe("written pokemons")
        # Catches exceptions and raises the 1st one at the end of the iteration
        .catch(finally_raise=True)
        # Actually triggers an iteration (the lines above define lazy operations)
        .count()
    )

logging level

logging.getLogger("streamable").setLevel(logging.WARNING)  # default is INFO

visitor pattern

The Stream class exposes an .accept method and you can implement a visitor by extending the streamable.visitors.Visitor abstract class:

from streamable.visitors import Visitor

class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())

assert depth(Stream(range(10)).map(str).filter()) == 3

as functions

The Stream's methods are also exposed as functions:

from streamable.functions import catch

inverse_integers: Iterator[int] = map(lambda n: 1 / n, range(10))
safe_inverse_integers: Iterator[int] = catch(inverse_integers, ZeroDivisionError)

compatible with free-threaded Python 3.13+

Benefits from free-threaded Python 3.13+ builds, run via python -X gil=0.

ty highlighters 🙏

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

streamable-1.3.1.tar.gz (37.7 kB view details)

Uploaded Source

Built Distribution

streamable-1.3.1-py3-none-any.whl (40.4 kB view details)

Uploaded Python 3

File details

Details for the file streamable-1.3.1.tar.gz.

File metadata

  • Download URL: streamable-1.3.1.tar.gz
  • Upload date:
  • Size: 37.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for streamable-1.3.1.tar.gz
Algorithm Hash digest
SHA256 877cac29e2af6be781eea70e88d2315345ae6144e5b1d3e4dc415fa4f3e2aac9
MD5 77f1f2e56d33515fcb83b627c599e26f
BLAKE2b-256 f482b727bcd2d65035addc18376aa2778e9c3ac28679b9c05e599dba87657fce

See more details on using hashes here.

File details

Details for the file streamable-1.3.1-py3-none-any.whl.

File metadata

  • Download URL: streamable-1.3.1-py3-none-any.whl
  • Upload date:
  • Size: 40.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for streamable-1.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 c6370ddd3134d4145bbc46a3e4f011439db23ace158c59e94e447d7a6e2d3a31
MD5 8227c344a533fd8d4d1eee5afc8f8bf4
BLAKE2b-256 5bfcc47632624904a001f9828f4299cd382ba4e75329e52cf6cbcbc25b26d8eb

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page