༄ streamable: fluent iteration

TL;DR:
- 🇹 typed: the `Stream[T]` class extends `Iterable[T]`
- 🪶 light: `pip install streamable` with no dependency
- 🛡️ robust: unittested with 100% coverage
- 💤 lazy: operations are only evaluated during iteration
- 🔄 concurrent: threads-based or asyncio-based
1. install
pip install streamable
2. import
from streamable import Stream
3. init
Instantiate a `Stream[T]` from an `Iterable[T]`.
integers: Stream[int] = Stream(range(10))
4. operate
- `Stream`s are immutable: applying an operation returns a new stream.
- Operations are lazy: they are only evaluated at iteration time.
odd_integer_strings: Stream[str] = (
    integers
    .filter(lambda n: n % 2)
    .map(str)
)
5. iterate
- Iterate over a `Stream[T]` as you would over any other `Iterable[T]`.
- Source elements are processed on-the-fly.
collect it
>>> list(odd_integer_strings)
['1', '3', '5', '7', '9']
>>> set(odd_integer_strings)
{'9', '1', '5', '3', '7'}
reduce it
>>> sum(integers)
45
>>> from functools import reduce
>>> reduce(str.__add__, odd_integer_strings)
'13579'
loop it
for odd_integer_string in odd_integer_strings:
    ...
📒 Operations
.map
Applies a function on elements.
integer_strings: Stream[str] = integers.map(str)
It has an optional `concurrency: int` parameter to execute the function concurrently (threads-based) while preserving the order.
It has a sibling operation called `.amap` to apply an async function concurrently (see the asyncio support section).
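For intuition, order-preserving threaded mapping can be sketched with the standard library; this illustrates the behavior, not the library's actual implementation:

```python
from concurrent.futures import ThreadPoolExecutor

def threaded_map(func, iterable, concurrency=8):
    # Executor.map runs the calls concurrently but yields results in input order
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        yield from executor.map(func, iterable)

print(list(threaded_map(str, range(5))))  # ['0', '1', '2', '3', '4']
```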
.foreach
Applies a function on elements like `.map`, but yields the elements themselves instead of the results.
printed_integers: Stream[int] = integers.foreach(print)
It has an optional `concurrency: int` parameter to execute the function concurrently (threads-based) while preserving the order.
It has a sibling operation called `.aforeach` to apply an async function concurrently (see the asyncio support section).
.filter
Keeps only elements satisfying a predicate function.
even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)
.group
Groups elements.
parity_groups: Stream[List[int]] = integers.group(size=100, seconds=4, by=lambda i: i % 2)
A group is a list of `size` elements for which `by` returns the same value, but it may contain fewer elements in these cases:
- `seconds` have elapsed since the last yield of a group
- upstream is exhausted
- upstream raises an exception

All the parameters are optional.
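The size-based part of this behavior can be sketched in plain Python (ignoring the `seconds` and `by` parameters; an illustration of the semantics, not the library's code):

```python
def group_by_size(iterable, size):
    batch = []
    for element in iterable:
        batch.append(element)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        # upstream is exhausted: yield the last, possibly smaller, group
        yield batch

print(list(group_by_size(range(7), size=3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```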
.flatten
Ungroups elements, assuming that they are `Iterable`s.
integers: Stream[int] = parity_groups.flatten()
It has an optional `concurrency` parameter to flatten several iterables concurrently (threads-based).
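Sequential flattening matches the stdlib's `itertools.chain.from_iterable`, which may help intuition:

```python
from itertools import chain

groups = [[0, 2, 4], [1, 3]]
print(list(chain.from_iterable(groups)))  # [0, 2, 4, 1, 3]
```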
.slow
Limits the rate at which elements are yielded, up to a maximum `frequency` (elements per second).
slow_integers: Stream[int] = integers.slow(frequency=2)
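A minimal sketch of such rate limiting, assuming a simple sleep between yields (the library may pace elements differently):

```python
import time

def slowed(iterable, frequency):
    period = 1 / frequency  # minimum seconds between two yields
    for element in iterable:
        start = time.perf_counter()
        yield element
        elapsed = time.perf_counter() - start
        if elapsed < period:
            time.sleep(period - elapsed)

start = time.perf_counter()
print(list(slowed(range(4), frequency=10)))  # [0, 1, 2, 3], in roughly 0.4s
```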
.catch
Catches exceptions that satisfy a predicate function.
safe_inverse_floats: Stream[float] = (
    integers
    .map(lambda n: 1 / n)
    .catch(lambda error: isinstance(error, ZeroDivisionError))
)
It has an optional `raise_after_exhaustion` parameter to raise the first caught exception once the iteration ends.
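Exception catching over an iterator can be sketched as follows (an illustration of the semantics, not the library's code):

```python
def catching(iterable, predicate):
    iterator = iter(iterable)
    while True:
        try:
            yield next(iterator)
        except StopIteration:
            return
        except Exception as error:
            if not predicate(error):
                raise  # exceptions not matching the predicate propagate

inverses = catching(
    map(lambda n: 1 / n, range(-2, 3)),
    lambda error: isinstance(error, ZeroDivisionError),
)
print(list(inverses))  # [-0.5, -1.0, 1.0, 0.5]  (the 1/0 error is swallowed)
```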
.observe
Logs the progress of iterations over this stream.
If you iterate on
observed_slow_integers: Stream[int] = slow_integers.observe(what="integers")
you will get these logs:
INFO: [duration=0:00:00.502155 errors=0] 1 integers yielded
INFO: [duration=0:00:01.006336 errors=0] 2 integers yielded
INFO: [duration=0:00:02.011921 errors=0] 4 integers yielded
INFO: [duration=0:00:04.029666 errors=0] 8 integers yielded
INFO: [duration=0:00:05.039571 errors=0] 10 integers yielded
The amount of logs will never be overwhelming because they are produced logarithmically, e.g. the 11th log will only be produced when the iteration reaches the 1024th element.
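The log points of such a logarithmic scheme can be sketched as follows (assuming the threshold doubles after each log, plus a final log at exhaustion, which matches the sample above):

```python
def log_points(total):
    # 1-based element indices at which a progress line would be emitted
    points, next_point = [], 1
    for count in range(1, total + 1):
        if count == next_point:
            points.append(count)
            next_point *= 2
    if points and points[-1] != total:
        points.append(total)  # final log when the upstream is exhausted
    return points

print(log_points(10))    # [1, 2, 4, 8, 10], matching the sample logs above
print(len(log_points(1024)))  # 11: the 11th log lands on the 1024th element
```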
.truncate
Stops the iteration as soon as the `when` predicate is satisfied or `count` elements have been yielded.
five_first_integers: Stream[int] = integers.truncate(5)
is equivalent to:
five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)
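Both behaviors have close stdlib analogues, which may help intuition:

```python
from itertools import islice, takewhile

print(list(islice(range(10), 5)))                    # [0, 1, 2, 3, 4]
print(list(takewhile(lambda n: n != 5, range(10))))  # [0, 1, 2, 3, 4]
```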
📦 Notes Box
typing
This is a fully typed library (you can run `mypy` on it).
supported Python versions
Compatible with Python 3.7+ (unittested for: 3.7.17, 3.8.18, 3.9.18, 3.10.13, 3.11.7, 3.12.1).
support for asyncio
As an alternative to the threads-based concurrency available for the `.map` and `.foreach` operations (via the `concurrency` parameter), one can use the `.amap` and `.aforeach` operations to apply `async` functions concurrently on a stream:
import asyncio
import time

async def slow_async_square(n: int) -> int:
    await asyncio.sleep(3)
    return n ** 2

def slow_str(n: int) -> str:
    time.sleep(3)
    return str(n)

print(
    ", ".join(
        integers
        # coroutines-based concurrency
        .amap(slow_async_square, concurrency=8)
        # threads-based concurrency
        .map(slow_str, concurrency=8)
        .truncate(5)
    )
)
this prints (in 6s):
0, 1, 4, 9, 16
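The speedup of coroutine-based concurrency comes from awaiting the sleeps together rather than one after another; a minimal stdlib sketch of the same idea (with a shorter sleep for illustration):

```python
import asyncio

async def slow_async_square(n: int) -> int:
    await asyncio.sleep(0.1)
    return n ** 2

async def amap_all(func, values):
    # gather awaits all the coroutines concurrently:
    # total time is ~one sleep, not len(values) sleeps
    return await asyncio.gather(*(func(v) for v in values))

print(asyncio.run(amap_all(slow_async_square, range(5))))  # [0, 1, 4, 9, 16]
```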
CPU-bound tasks
For CPU-bound tasks, consider using the PyPy interpreter, whose Just-In-Time (JIT) compilation should drastically improve performance; e.g. this snippet runs 50 times faster with PyPy than with the standard CPython interpreter:
# cpu_bound_script.py
from streamable import Stream

print(
    sum(
        Stream(range(1, 1_000_000_000))
        .map(lambda n: 1 / n)
    )
)
A few rough runtime orders of magnitude: CPython vs PyPy vs Java vs C vs Rust.
Extract-Transform-Load tasks
One can leverage this library to write elegant ETL scripts, check the README dedicated to ETL.
as functions
The `Stream`'s methods are also exposed as functions:
from typing import Iterator

from streamable.functions import slow

iterator: Iterator[int] = ...
slow_iterator: Iterator[int] = slow(iterator)
visitor pattern
The `Stream` class exposes an `.accept` method, and you can implement a visitor by extending the `streamable.visitor.Visitor` class:
from streamable import Stream
from streamable.visitor import Visitor

class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def stream_depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())
>>> stream_depth(odd_integer_strings)
3
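The pattern itself is independent of the library; here is a minimal self-contained sketch on a hypothetical chain of nodes (`Node` and its depth visitor are illustrative names, not part of streamable):

```python
class Node:
    # a hypothetical single-upstream chain, mirroring how a stream links to its upstream
    def __init__(self, upstream=None):
        self.upstream = upstream

    def accept(self, visitor):
        return visitor.visit_node(self)

class NodeDepthVisitor:
    # counts the nodes by recursing up the chain
    def visit_node(self, node) -> int:
        if node.upstream is None:
            return 1
        return 1 + node.upstream.accept(self)

chain = Node(Node(Node()))  # three linked nodes
print(chain.accept(NodeDepthVisitor()))  # 3
```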
Style tip: enclose operations in parentheses to keep lines short without needing trailing backslashes `\`.
stream: Stream[str] = (
    Stream(range(10))
    .map(str)
    .foreach(print)
    .flatten()
    .truncate(10)
)
explain
print(stream.explanation())
└─•TruncateStream(count=10, when=None)
  └─•FlattenStream(concurrency=1)
    └─•ForeachStream(effect=print, concurrency=1)
      └─•MapStream(transformation=str, concurrency=1)
        └─•Stream(source=range(...))
change logging level
import logging
logging.getLogger("streamable").setLevel(logging.WARNING)