
streamable: fluent iteration



TL;DR:

🇹 typed

The Stream[T] class extends Iterable[T]

🪶 light

pip install streamable, with no dependencies

🛡️ robust

Unit-tested with 100% coverage

💤 lazy

Operations are only evaluated during iteration

🔄 concurrent

Thread-based or asyncio-based


1. install

pip install streamable

2. import

from streamable import Stream

3. init

Instantiate a Stream[T] from an Iterable[T].

integers: Stream[int] = Stream(range(10))

4. operate

  • Streams are immutable: applying an operation returns a new stream.

  • Operations are lazy: only evaluated at iteration time.

odd_integer_strings: Stream[str] = (
    integers
    .filter(lambda n: n % 2)
    .map(str)
)
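The laziness described above can be observed in plain Python with a generator. This is only a standard-library analogy, not streamable's implementation; the source generator and the pulled list are hypothetical instruments that record the moment each source element is actually consumed:

```python
from typing import Iterator, List

pulled: List[int] = []  # records each source element at the moment it is consumed


def source() -> Iterator[int]:
    for n in range(10):
        pulled.append(n)
        yield n


# Building the pipeline consumes nothing yet
odd_integer_strings = (str(n) for n in source() if n % 2)
assert pulled == []

# Elements are pulled one at a time, only as far as needed
first = next(odd_integer_strings)
print(first, pulled)  # 1 [0, 1]
```

One difference: a generator expression can only be consumed once, whereas the collect examples that follow iterate over the same odd_integer_strings stream twice.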

5. iterate

  • Iterate over a Stream[T] as you would over any other Iterable[T].
  • Source elements are processed on-the-fly.

collect it

>>> list(odd_integer_strings)
['1', '3', '5', '7', '9']
>>> set(odd_integer_strings)
{'9', '1', '5', '3', '7'}

reduce it

>>> sum(integers)
45
>>> from functools import reduce
>>> reduce(str.__add__, odd_integer_strings)
'13579'

loop it

for odd_integer_string in odd_integer_strings:
    ...

📒 Operations

.map

Applies a function on elements.

integer_strings: Stream[str] = integers.map(str)

It has an optional concurrency: int parameter to execute the function concurrently (thread-based) while preserving the order of elements.

It has a sibling operation, .amap, to apply an async function concurrently (see the "support for asyncio" section).
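To illustrate what "concurrently while preserving the order" means, here is a minimal sketch using only concurrent.futures. ordered_concurrent_map is a hypothetical helper, not streamable's implementation: it keeps at most concurrency calls in flight and always yields the oldest pending result first, so a fast late element waits for a slow early one.

```python
import time
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Deque, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def ordered_concurrent_map(
    func: Callable[[T], U], source: Iterable[T], concurrency: int
) -> Iterator[U]:
    """Hypothetical helper: run func in up to `concurrency` threads,
    but yield results in the same order as the source elements."""
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        pending: Deque["Future[U]"] = deque()
        for element in source:
            pending.append(executor.submit(func, element))
            # Once `concurrency` calls are in flight, wait for the OLDEST one
            if len(pending) >= concurrency:
                yield pending.popleft().result()
        # Source exhausted: drain the remaining calls in submission order
        while pending:
            yield pending.popleft().result()


def slow_identity(n: int) -> int:
    # Later elements sleep less, so they tend to finish first
    time.sleep(0.05 * (3 - n % 4))
    return n


print(list(ordered_concurrent_map(slow_identity, range(8), concurrency=4)))
# [0, 1, 2, 3, 4, 5, 6, 7]
```

Bounding the number of in-flight calls keeps memory usage constant even for an infinite source.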

.foreach

Applies a function to elements like .map, but yields the elements themselves instead of the function's results.

printed_integers: Stream[int] = integers.foreach(print)

It has an optional concurrency: int parameter to execute the function concurrently (thread-based) while preserving the order of elements.

It has a sibling operation, .aforeach, to apply an async function concurrently (see the "support for asyncio" section).
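The difference from .map can be sketched with a plain generator. foreach_sketch is a hypothetical helper (sequential only, no concurrency), not streamable's implementation: the function is called for its side effect and its return value is discarded.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def foreach_sketch(effect: Callable[[T], object], source: Iterable[T]) -> Iterator[T]:
    """Hypothetical helper: call effect on each element for its side effect,
    discard its return value, and yield the element itself."""
    for element in source:
        effect(element)
        yield element


seen: List[int] = []
# list.append returns None, yet the elements themselves are yielded
print(list(foreach_sketch(seen.append, range(3))))  # [0, 1, 2]
print(seen)  # [0, 1, 2]
```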

.filter

Keeps only elements satisfying a predicate function.

even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)

.group

Groups elements into lists.

parity_groups: Stream[List[int]] = integers.group(size=100, seconds=4, by=lambda i: i % 2)

A group is a list of size elements (the size parameter) that share the same key, as computed by the by function. A group may contain fewer elements in these cases:

  • seconds have elapsed since the last yield of a group
  • upstream is exhausted
  • upstream raises an exception

All the parameters are optional.
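The size/by behavior can be sketched with a dictionary of open groups, one per key. group_sketch is a hypothetical, simplified helper, not streamable's implementation: it ignores the seconds parameter and upstream exceptions, and it assumes that incomplete groups are flushed in order of first appearance when upstream is exhausted.

```python
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def group_sketch(
    source: Iterable[T], size: int, by: Callable[[T], Hashable]
) -> Iterator[List[T]]:
    """Hypothetical, simplified grouping: ignores `seconds` and upstream errors."""
    pending: Dict[Hashable, List[T]] = {}  # one open group per key
    for element in source:
        key = by(element)
        group = pending.setdefault(key, [])
        group.append(element)
        # A group is yielded as soon as it holds `size` elements
        if len(group) == size:
            yield pending.pop(key)
    # Upstream exhausted: flush the incomplete groups (in order of first appearance)
    yield from pending.values()


print(list(group_sketch(range(10), size=3, by=lambda i: i % 2)))
# [[0, 2, 4], [1, 3, 5], [6, 8], [7, 9]]
```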

.flatten

Ungroups elements assuming that they are Iterables.

integers: Stream[int] = parity_groups.flatten()

It has an optional concurrency parameter to flatten several iterables concurrently (thread-based).
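Without concurrency, flattening behaves like chaining the inner iterables one after the other. The sketch below uses itertools.chain.from_iterable as a stand-in; flatten_sketch is a hypothetical helper and does not model the concurrency parameter:

```python
from itertools import chain
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def flatten_sketch(groups: Iterable[Iterable[T]]) -> Iterator[T]:
    # Exhaust each inner iterable before moving on to the next one
    return chain.from_iterable(groups)


print(list(flatten_sketch([[0, 2, 4], [1, 3, 5], [6, 8], [7, 9]])))
# [0, 2, 4, 1, 3, 5, 6, 8, 7, 9]
```

Note that flattening the parity groups gives back all the integers, but not in their original order.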

.slow

Limits the rate at which elements are yielded to a maximum frequency (elements per second).

slow_integers: Stream[int] = integers.slow(frequency=2)
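A rate limit of this kind can be sketched by enforcing a minimum period of 1 / frequency seconds between consecutive yields. slow_sketch is a hypothetical helper, not streamable's implementation, and it assumes that the first element also waits one period:

```python
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def slow_sketch(source: Iterable[T], frequency: float) -> Iterator[T]:
    """Hypothetical rate limiter: at most `frequency` elements per second."""
    period = 1 / frequency
    last_yield = time.monotonic()
    for element in source:
        # Sleep only for whatever part of the period is still left
        remaining = last_yield + period - time.monotonic()
        if remaining > 0:
            time.sleep(remaining)
        last_yield = time.monotonic()
        yield element


start = time.monotonic()
print(list(slow_sketch(range(5), frequency=10)))  # [0, 1, 2, 3, 4]
print(f"took {time.monotonic() - start:.2f}s")
```

Measuring from the previous yield, rather than always sleeping a full period, means an upstream that is already slower than the limit is not slowed down further.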

.catch

Catches exceptions that satisfy a predicate function.

safe_inverse_floats: Stream[float] = (
    integers
    .map(lambda n: 1 / n)
    .catch(lambda error: isinstance(error, ZeroDivisionError))
)

It has an optional raise_after_exhaustion parameter to raise the first caught exception when the iteration ends.
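The catching logic can be sketched around next() calls on the upstream iterator. catch_sketch is a hypothetical helper, not streamable's implementation. It relies on the upstream iterator being able to continue after raising: this holds for the builtin map, whereas a Python generator terminates once it raises.

```python
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def catch_sketch(
    source: Iterable[T],
    when: Callable[[Exception], bool],
    raise_after_exhaustion: bool = False,
) -> Iterator[T]:
    """Hypothetical helper: skip elements whose computation raised a matching exception."""
    iterator = iter(source)
    first_caught: Optional[Exception] = None
    while True:
        try:
            element = next(iterator)
        except StopIteration:
            break
        except Exception as error:
            if not when(error):
                raise  # non-matching exceptions propagate unchanged
            if first_caught is None:
                first_caught = error
            continue  # move on to the next upstream element
        yield element
    if raise_after_exhaustion and first_caught is not None:
        raise first_caught


# The builtin map keeps going after one of its calls raises
inverses = map(lambda n: 1 / n, range(3))
print(list(catch_sketch(inverses, lambda error: isinstance(error, ZeroDivisionError))))
# [1.0, 0.5]
```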

.observe

Logs the progress of iterations over this stream.

If you iterate over

observed_slow_integers: Stream[int] = slow_integers.observe(what="integers")

you will get these logs:

INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded

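The number of logs never becomes overwhelming because they are produced logarithmically: a log is emitted each time the number of yielded elements doubles, plus a final one when the iteration ends. For example, the 11th log is produced when the iteration reaches the 1024th element.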
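The logarithmic schedule can be checked with a small hypothetical helper that lists the counts at which a log line would be emitted. It assumes one log per power of two plus a final log at exhaustion, which matches the five logs shown for 10 integers; it is not streamable's code:

```python
from typing import List


def log_points(total: int) -> List[int]:
    """Hypothetical helper: yielded-element counts at which a log line is emitted,
    assuming one log per power of two plus a final log when iteration ends."""
    points: List[int] = []
    next_log = 1
    for count in range(1, total + 1):
        if count == next_log:
            points.append(count)
            next_log *= 2  # the gap between logs doubles each time
    if total and points[-1] != total:
        points.append(total)  # final log at exhaustion
    return points


print(log_points(10))  # [1, 2, 4, 8, 10]
print(log_points(2000)[10])  # 1024, i.e. the 11th log
```

Under this schedule, a million elements produce only 21 log lines.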

.truncate

Stops iteration as soon as the when predicate is satisfied or count elements have been yielded.

five_first_integers: Stream[int] = integers.truncate(5)

is, for this stream of integers starting at 0, equivalent to:

five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)
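Both stopping conditions can be sketched in one generator. truncate_sketch is a hypothetical helper, not streamable's implementation; as the equivalence above implies, the element that satisfies when is not yielded:

```python
from typing import Callable, Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def truncate_sketch(
    source: Iterable[T],
    count: Optional[int] = None,
    when: Optional[Callable[[T], bool]] = None,
) -> Iterator[T]:
    """Hypothetical helper: stop after `count` yields, or at the first element satisfying `when`."""
    iterator = iter(source)
    yielded = 0
    # Check the count BEFORE pulling, so no extra upstream element is consumed
    while count is None or yielded < count:
        try:
            element = next(iterator)
        except StopIteration:
            return
        if when is not None and when(element):
            return  # the element satisfying `when` is not yielded
        yield element
        yielded += 1


print(list(truncate_sketch(range(10), 5)))  # [0, 1, 2, 3, 4]
print(list(truncate_sketch(range(10), when=lambda n: n == 5)))  # [0, 1, 2, 3, 4]
```

Checking the count before calling next() matters when upstream is expensive or side-effecting: the sketch never pulls an element it will not yield.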

📦 Notes Box

typing

This is a fully typed library (you can mypy it).

supported Python versions

Compatible with Python 3.7+ (unit-tested on 3.7.17, 3.8.18, 3.9.18, 3.10.13, 3.11.7, and 3.12.1).

support for asyncio

As an alternative to the thread-based concurrency available for the .map and .foreach operations (via the concurrency parameter), one can use the .amap and .aforeach operations to apply async functions concurrently to a stream:

import asyncio
import time

async def slow_async_square(n: int) -> int:
    await asyncio.sleep(3)
    return n ** 2

def slow_str(n: int) -> str:
    time.sleep(3)
    return str(n)

print(
    ", ".join(
        integers
        # coroutines-based concurrency
        .amap(slow_async_square, concurrency=8)
        # threads-based concurrency
        .map(slow_str, concurrency=8)
        .truncate(5)
    )
)

This prints (in about 6 seconds):

0, 1, 4, 9, 16
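The coroutine-based ordering can be sketched with the standard asyncio module alone. ordered_async_map is a hypothetical async generator, not streamable's implementation (which, per the example above, is consumed as a regular iterable): it keeps up to concurrency coroutines scheduled and awaits them in submission order.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Awaitable, Callable, Deque, Iterable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


async def ordered_async_map(
    func: Callable[[T], Awaitable[U]], source: Iterable[T], concurrency: int
) -> AsyncIterator[U]:
    """Hypothetical helper: keep up to `concurrency` coroutines running,
    yielding their results in source order."""
    pending: Deque["asyncio.Future[U]"] = deque()
    for element in source:
        # ensure_future schedules the coroutine right away on the running loop
        pending.append(asyncio.ensure_future(func(element)))
        if len(pending) >= concurrency:
            yield await pending.popleft()
    while pending:
        yield await pending.popleft()


async def slow_async_square(n: int) -> int:
    await asyncio.sleep(0.1)
    return n ** 2


async def main() -> List[int]:
    return [square async for square in ordered_async_map(slow_async_square, range(5), concurrency=8)]


print(asyncio.run(main()))  # [0, 1, 4, 9, 16]
```

The sleep is shortened to 0.1 s here; with concurrency=8, all five coroutines are scheduled together, so the run takes about 0.1 s rather than 0.5 s.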

CPU-bound tasks

For CPU-bound tasks, consider using the PyPy interpreter, whose Just-In-Time (JIT) compilation should drastically improve performance. For example, this snippet runs 50 times faster under PyPy than under the standard CPython interpreter:

# cpu_bound_script.py
from streamable import Stream
print(
    sum(
        Stream(range(1, 1_000_000_000))
        .map(lambda n: 1/n)
    )
)

A few rough runtime orders of magnitude: CPython vs PyPy vs Java vs C vs Rust.

Extract-Transform-Load tasks

One can leverage this library to write elegant ETL scripts; see the README dedicated to ETL.

as functions

The Stream's methods are also exposed as functions:

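from typing import Iterator

from streamable.functions import slow

iterator: Iterator[int] = ...
# same semantics as Stream.slow: yields at most 2 elements per second
slow_iterator: Iterator[int] = slow(iterator, frequency=2)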

visitor pattern

The Stream class exposes an .accept method, so you can implement a visitor by extending the streamable.visitor.Visitor class:

from streamable.visitor import Visitor

class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def stream_depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())

>>> stream_depth(odd_integer_strings)
3

go to line

Style tip: Enclose operations in parentheses to keep lines short without needing trailing backslashes \.

stream: Stream[str] = (
    Stream(range(10))
    .map(str)
    .foreach(print)
    .flatten()
    .truncate(10)
)

explain

print(stream.explanation())
└─•TruncateStream(count=10, when=None)
  └─•FlattenStream(concurrency=1)
    └─•ForeachStream(effect=print, concurrency=1)
      └─•MapStream(transformation=str, concurrency=1)
        └─•Stream(source=range(...))
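The tree reads from the last operation applied down to the source: each stream points to its upstream, which the visitor example also relies on. The sketch below reproduces that layout with a hypothetical Node dataclass standing in for a stream; it is not streamable's explanation() implementation:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Node:
    """Hypothetical stand-in for a stream: a label plus a link to its upstream."""
    label: str
    upstream: Optional["Node"] = None


def explain(node: Node) -> str:
    lines: List[str] = []
    depth = 0
    current: Optional[Node] = node
    # Walk from the outermost operation down to the source, indenting each level
    while current is not None:
        lines.append("  " * depth + "└─•" + current.label)
        current = current.upstream
        depth += 1
    return "\n".join(lines)


source = Node("Stream(source=range(...))")
mapped = Node("MapStream(transformation=str, concurrency=1)", upstream=source)
print(explain(mapped))
```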

change logging level

import logging

logging.getLogger("streamable").setLevel(logging.WARNING)

