streamerate: a fluent and expressive Python library for chainable iterable processing, inspired by Java 8 streams.

streamerate

streamerate is a pure-Python library inspired by the Fluent Interface pattern (as used by Java 8 streams). It provides a chainable, expressive approach to processing iterable data.

With the Fluent Interface pattern you chain operations such as filtering, mapping, and reducing into a single data processing pipeline, so even complex pipelines stay readable.

streamerate lets you write elegant, functional code that operates efficiently on streams of data.

The library is distributed under the permissive MIT license, allowing you to freely use, modify, and distribute it in both open-source and commercial projects.

Note: streamerate originated as part of the pyxtension project but has since been split out into a standalone library.

Installation

pip install streamerate

or from GitHub:

git clone https://github.com/asuiu/streamerate.git
cd streamerate
python setup.py install

or

git submodule add https://github.com/asuiu/streamerate.git

For developers (running unit tests)

First install Poetry and make sure it is in your $PATH. Create a virtual environment using Poetry and then use the script to run tests:

poetry install
poetry run ./run_tests.py

Versioning and Committing (Commitizen)

This project uses commitizen for conventional commits and automatic semantic version bumping.

Making a commit: To launch the interactive commit prompt and automatically format your commit message:

poetry run cz commit

Bypassing pre-commit hooks: If you need to skip the pre-commit linters (like pylint and ruff) when committing, you must use a double -- separator so Poetry passes the argument directly to cz and git:

poetry run cz commit -- -- --no-verify

Bumping the version: To automatically calculate the next Semantic Version based on your recent commits, update pyproject.toml, and generate a git tag:

poetry run cz bump

Bypassing hooks during a bump:

poetry run cz bump --no-verify

Modules overview

streams.py

stream

stream subclasses collections.abc.Iterable. It behaves like any Python iterable, with additional methods suited to multithreaded and multiprocess processing. It is used to build stream processing pipelines similar to those in Scala and in the MapReduce programming model. Anyone who has used Apache Spark RDD functions will find this processing model familiar.

streams

Never again will you have to write code like this:

> from functools import reduce
> lst = range(1, 6)
> reduce(lambda x, y: x * y, map(lambda _: _ * _, filter(lambda _: _ % 2 == 0, lst)))
64

From now on, you may simply write the following lines:

> from streamerate import stream
> the_stream = stream(range(1, 6))
> the_stream.\
    filter(lambda _: _ % 2 == 0).\
    map(lambda _: _ * _).\
    reduce(lambda x, y: x * y)
64

A Word Count Map-Reduce naive example using multiprocessing map

corpus = [
    "MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster.",
    "At Google, MapReduce was used to completely regenerate Google's index of the World Wide Web",
    "Conceptually similar approaches have been very well known since 1995 with the Message Passing Interface standard having reduce and scatter operations."]

def reduceMaps(m1, m2):
    # Merge the counts of m2 into m1 and return the merged mapping.
    for k, v in m2.items():
        m1[k] = m1.get(k, 0) + v
    return m1

word_counts = stream(corpus).\
    mpmap(lambda line: stream(line.lower().split(' ')).countByValue()).\
    reduce(reduceMaps)
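
For checking the result of the parallel pipeline above, the same computation can be sketched sequentially with the standard library only. The short corpus and the helper names count_words and merge_counts are illustrative and not part of streamerate.

```python
from collections import Counter
from functools import reduce

# Short illustrative corpus (not the one used above).
corpus = [
    "MapReduce is a programming model",
    "MapReduce was used at Google",
]


def count_words(line):
    # Map step: one Counter per line, similar in spirit to countByValue() on the split words.
    return Counter(line.lower().split(" "))


def merge_counts(left, right):
    # Reduce step: Counter addition returns a new Counter and leaves both inputs untouched.
    return left + right


word_counts = reduce(merge_counts, map(count_words, corpus), Counter())
print(word_counts["mapreduce"])  # 2
```

Unlike reduceMaps, merge_counts does not mutate its first argument, which makes the reduction easier to reason about when partial results are reused.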

Basic methods

map(f)

Identical to the builtin map, but returns a stream.

mpmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)

Parallel ordered map using multiprocessing.Pool.imap().

It can replace map when you need to split computations across multiple cores and the order of results matters.

It spawns at most poolSize processes and applies the f function.

It won't take more than bufferSize elements from the input unless the output has already requested them, so you can use it with takeWhile on infinite streams without worrying that it will keep working in the background.

The elements in the result stream appear in the same order as in the initial iterable.

:type f: (T) -> V
:rtype: `stream`

mpfastmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)

Parallel unordered map using multiprocessing.Pool.imap_unordered().

It can replace map when the order of results doesn't matter.

It spawns at most poolSize processes and applies the f function.

It won't take more than bufferSize elements from the input unless the output has already requested them, so you can use it with takeWhile on infinite streams without worrying that it will keep working in the background.

The elements in the result stream appear in an unpredictable order.

:type f: (T) -> V
:rtype: `stream`

fastmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)

Parallel unordered map using a multithreaded pool. It can replace map when the order of results doesn't matter.

It spawns at most poolSize threads and applies the f function.

The elements in the result stream appear in an unpredictable order.

It won't take more than bufferSize elements from the input unless the output has already requested them, so you can use it with takeWhile on infinite streams without worrying that it will keep working in the background.

Because of the CPython GIL, it is most useful for I/O-bound work or CPU-intensive native functions, or on the Jython or IronPython interpreters.

:type f: (T) -> V

:rtype: stream
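
The difference between ordered and unordered parallel maps can be seen with the standard library's ThreadPool. This is only an analogue of the behavior described for fastmap and mtmap, not a statement about how streamerate implements them; sleep_then_return is a hypothetical stand-in for an I/O-bound call.

```python
import time
from multiprocessing.pool import ThreadPool


def sleep_then_return(delay):
    # Stand-in for an I/O-bound call; time.sleep releases the GIL.
    time.sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]
with ThreadPool(3) as pool:
    # imap yields results in input order, so it waits for the slow first element.
    ordered = list(pool.imap(sleep_then_return, delays))
    # imap_unordered yields each result as soon as it is ready.
    unordered = list(pool.imap_unordered(sleep_then_return, delays))

print(ordered)            # [0.3, 0.1, 0.2]
print(sorted(unordered))  # [0.1, 0.2, 0.3]
```

The unordered variant hands out results as soon as they are ready, at the cost of losing the input order.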

mtmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)

Parallel ordered map using a multithreaded pool. It can replace map, and the order of the output stream will be the same as that of the input.

It spawns at most poolSize threads and applies the f function.

The elements in the result stream appear in the same order as in the input.

It won't take more than bufferSize elements from the input unless the output has already requested them, so you can use it with takeWhile on infinite streams without worrying that it will keep working in the background.

Because of the CPython GIL, it is most useful for I/O-bound work or CPU-intensive native functions, or on the Jython or IronPython interpreters.

:type f: (T) -> V

:rtype: stream
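
The bufferSize guarantee described for the parallel maps (bounded look-ahead on the input) can be illustrated with a small standard-library sketch. This is not streamerate's implementation; bounded_ordered_map, pool_size, and buffer_size are hypothetical names, and the refill policy is one possible choice.

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from itertools import count, islice, takewhile


def bounded_ordered_map(func, iterable, pool_size=4, buffer_size=8):
    """Yield func(x) in input order, keeping at most buffer_size tasks in flight."""
    it = iter(iterable)
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        pending = deque()
        # Prime the window with at most buffer_size submitted tasks.
        for item in islice(it, buffer_size):
            pending.append(executor.submit(func, item))
        while pending:
            # Block on the oldest task so results keep the input order.
            result = pending.popleft().result()
            # Refill the window by one element before handing the result out.
            for item in islice(it, 1):
                pending.append(executor.submit(func, item))
            yield result


consumed = []


def source():
    # Infinite input that records how many elements were actually pulled.
    for i in count():
        consumed.append(i)
        yield i


squares = list(
    takewhile(
        lambda v: v < 50,
        bounded_ordered_map(lambda x: x * x, source(), pool_size=2, buffer_size=4),
    )
)
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49]
```

Because the input is pulled only as results are consumed, stopping the consumer also stops the producer after a bounded number of extra elements.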

Ordered helper parallelism

The ordered helper methods mapKeys, mapValues, filterKeys, filterValues, filter, filterfalse, starfilter, tap, and for_each accept parallel= with one of:

  • None for sequential execution (default)
  • Threads(poolSize=None, bufferSize=None, *, pool=None) for ordered multithreaded execution
  • Procs(poolSize=None, bufferSize=None, start_method=StartMethod.AUTO, *, pool=None) for ordered multiprocessing execution

When bufferSize is omitted, Parallelism sets it to poolSize * 2.

When pool= is provided, Threads derives poolSize from pool._processes, uses the caller-supplied pool as-is, and leaves pool lifecycle management to the caller. pool is keyword-only, and cannot be combined with poolSize.

When pool= is provided, Procs derives poolSize from pool._processes, uses the caller-supplied pool as-is, and leaves pool lifecycle management to the caller. pool is keyword-only, and cannot be combined with poolSize or a non-default start_method.

StartMethod.AUTO is the default. It resolves to spawn on Windows, and to forkserver on Linux and other POSIX platforms when available, otherwise spawn.

StartMethod.FASTEST resolves to fork on Linux and other POSIX platforms when available, otherwise forkserver, and falls back to spawn on Windows.
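
The resolution rules for AUTO and FASTEST described above can be written down as a small standalone function. This is a sketch of the documented rules, not the library's code; resolve_start_method and the "auto"/"fastest" strings are hypothetical stand-ins for the StartMethod members.

```python
import multiprocessing
import sys


def resolve_start_method(mode, available, is_windows):
    """Pick a multiprocessing start method name following the documented rules.

    mode is "auto" or "fastest" (hypothetical stand-ins for StartMethod members).
    """
    if is_windows:
        return "spawn"
    preferences = {
        "auto": ("forkserver", "spawn"),
        "fastest": ("fork", "forkserver", "spawn"),
    }
    for name in preferences[mode]:
        if name in available:
            return name
    return "spawn"


# Resolve against what the running interpreter actually supports.
available = multiprocessing.get_all_start_methods()
is_windows = sys.platform == "win32"
print(resolve_start_method("auto", available, is_windows))
```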

import multiprocessing
from multiprocessing.pool import ThreadPool

from streamerate import Procs, Threads, StartMethod, stream

stream({"a": 1, "b": 2}.items()).mapKeys(str.upper, parallel=Threads(4)).toMap()
with ThreadPool(4) as thread_pool:
    stream({"a": 1, "b": 2}.items()).mapKeys(str.upper, parallel=Threads(pool=thread_pool)).toMap()
stream(range(10)).filter(lambda x: x % 2 == 0, parallel=Procs(4)).toList()
stream(range(10)).filter(lambda x: x % 2 == 0, parallel=Procs(4, start_method=StartMethod.FASTEST)).toList()
with multiprocessing.get_context("forkserver").Pool(4) as pool:
    stream(range(10)).filter(lambda x: x % 2 == 0, parallel=Procs(pool=pool)).toList()

For tap and for_each, the output order is still preserved, but side effects may happen out of order. With Procs(...), side effects run in worker processes.

gtmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count())

flatMap(predicate=_IDENTITY_FUNC)

:param predicate: a function that receives each element of the stream and returns an iterable

By default, predicate is the identity function.

:type predicate: (V)-> collections.Iterable[T]

:return: a stream of the elements yielded by the iterables that predicate() returns

Example:

stream([[1, 2], [3, 4], [4, 5]]).flatMap().toList() == [1, 2, 3, 4, 4, 5]
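
For readers more familiar with itertools, the flatMap semantics described above correspond to mapping and then chaining the results. The flat_map helper below is an illustrative stdlib equivalent, not part of streamerate.

```python
from itertools import chain


def flat_map(func, iterable):
    # Apply func to each element, then lazily concatenate the returned iterables.
    return chain.from_iterable(map(func, iterable))


nested = [[1, 2], [3, 4], [4, 5]]
flattened = list(flat_map(lambda x: x, nested))  # identity, like the default predicate
words = list(flat_map(str.split, ["a b", "c"]))

print(flattened)  # [1, 2, 3, 4, 4, 5]
print(words)      # ['a', 'b', 'c']
```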

filter(predicate)

Identical to the builtin filter, but returns a stream.

reversed()

returns reversed stream

exists(predicate)

Tests whether a predicate holds for at least one element of this stream.

:rtype: bool

Example:

stream([1, 2, 3]).exists(lambda x: x == 0) -> False
stream([1, 2, 3]).exists(lambda x: x == 1) -> True

keyBy(keyfunc = _IDENTITY_FUNC)

Transforms a stream of values into a stream of (key, value) tuples.

:param keyfunc: function to map values to keys

:type keyfunc: (V) -> T

:return: stream of Key, Value pairs

:rtype: stream[( T, V )]

Example:

stream([1, 2, 3, 4]).keyBy(lambda _:_ % 2) -> [(1, 1), (0, 2), (1, 3), (0, 4)]
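
In plain Python, the keyBy transformation described above amounts to pairing each value with its computed key. The key_by helper is an illustrative equivalent, not the library's code.

```python
def key_by(values, keyfunc=lambda v: v):
    # Pair every value with the key computed from it, preserving input order.
    return ((keyfunc(v), v) for v in values)


pairs = list(key_by([1, 2, 3, 4], lambda v: v % 2))
print(pairs)  # [(1, 1), (0, 2), (1, 3), (0, 4)]

# The (key, value) pairs feed naturally into dict() when keys are unique.
lookup = dict(key_by(["apple", "kiwi"], len))
print(lookup)  # {5: 'apple', 4: 'kiwi'}
```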

groupBy()

groupBy([keyfunc]) -> Make an iterator that returns keys and groups from the iterable.

The iterable need not be sorted on the key function, but the key function must return hashable objects.

:param keyfunc: [Optional] The key is a function computing a key value for each element.

:type keyfunc: (T) -> (V)

:return: (key, sub-iterator) grouped by each value of key(value).

:rtype: stream[ ( V, slist[T] ) ]

Example:

stream([1, 2, 3, 4]).groupBy(lambda _: _ % 2) -> [(0, [2, 4]), (1, [1, 3])]
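
Because the input need not be sorted, this grouping differs from itertools.groupby, which only groups consecutive elements. A minimal stdlib sketch of the contrast follows; group_by is an illustrative helper, and the order in which the library returns the groups is not assumed here.

```python
from collections import defaultdict
from itertools import groupby


def group_by(values, keyfunc=lambda v: v):
    # Collect all values sharing a key, regardless of where they appear in the input.
    groups = defaultdict(list)
    for value in values:
        groups[keyfunc(value)].append(value)
    return dict(groups)


data = [1, 2, 3, 4]
groups = group_by(data, lambda v: v % 2)
print(groups)  # {1: [1, 3], 0: [2, 4]}

# itertools.groupby starts a new group whenever the key changes.
consecutive_keys = [key for key, _ in groupby(data, key=lambda v: v % 2)]
print(consecutive_keys)  # [1, 0, 1, 0]
```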

countByValue()

Returns a collections.Counter of the values.

Example:

stream(['a', 'b', 'a', 'b', 'c', 'd']).countByValue() == {'a': 2, 'b': 2, 'c': 1, 'd': 1}

distinct()

Returns a stream of distinct values. Values must be hashable.

stream(['a', 'b', 'a', 'b', 'c', 'd']).distinct().toSet() == {'a', 'b', 'c', 'd'}

reduce(f, init=None)

Takes the same arguments as the functools.reduce() function.

throttle(max_req: int, interval: float) -> "stream[_K]"

Throttles the stream.

:param max_req: number of requests

:param interval: period in number of seconds

:return: throttled stream

Example:

>>> s = stream(range(100))
>>> throttled_stream = s.throttle(10, 1.5)
>>> for item in throttled_stream:
...     print(item)
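
One way to picture throttling is a sliding window that allows at most max_req items per interval seconds. The sketch below is a generic stdlib illustration under that assumption; the library's actual throttling algorithm may differ, and this throttle function is a hypothetical standalone helper.

```python
import time
from collections import deque


def throttle(iterable, max_req, interval):
    """Yield items so that no more than max_req are emitted within any interval seconds."""
    emitted = deque()  # timestamps of the most recent max_req emissions
    for item in iterable:
        if len(emitted) == max_req:
            # Wait until the oldest emission in the window is interval seconds old.
            wait = emitted[0] + interval - time.monotonic()
            if wait > 0:
                time.sleep(wait)
            emitted.popleft()
        emitted.append(time.monotonic())
        yield item


start = time.monotonic()
items = list(throttle(range(5), max_req=2, interval=0.1))
elapsed = time.monotonic() - start
print(items)  # [0, 1, 2, 3, 4]
```

With max_req=2 and interval=0.1, the five items need about 0.2 seconds in total, since the fifth item must wait for the third one to leave the window.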

toSet()

Returns an sset() instance.

toList()

Returns an slist() instance.

toMap()

Returns an sdict() instance.

sorted(key=None, cmp=None, reverse=False)

Takes the same arguments as the builtin sorted().

size()

returns length of stream. Use carefully on infinite streams.

join(f)

Returns a string joined by f. Provides the same functionality as the str.join() method.

If f is a string, it is used to join the stream; otherwise f should be a callable that returns the string to be used for the join.

mkString(f)

Identical to join(f).

take(n)

Returns the first n elements of the stream.

head()

Returns the first element of the stream.

zip()

Same behavior as the builtin zip() (itertools.izip() in Python 2).

unique(predicate=_IDENTITY_FUNC)

Returns a stream of unique (according to predicate) elements appearing in the same order as in the original stream.

The items returned by predicate should be hashable and comparable.
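
Order-preserving de-duplication by key, as described for unique, can be sketched with a set of seen keys. This is an illustrative stdlib version; the unique function and its key parameter here are standalone names, not the library's method.

```python
def unique(values, key=lambda v: v):
    # Remember the keys already seen and emit only the first value for each key.
    seen = set()
    for value in values:
        marker = key(value)
        if marker not in seen:
            seen.add(marker)
            yield value


letters = list(unique(["a", "b", "a", "b", "c", "d"]))
by_initial = list(unique(["apple", "avocado", "banana", "blueberry"], key=lambda w: w[0]))

print(letters)     # ['a', 'b', 'c', 'd']
print(by_initial)  # ['apple', 'banana']
```

This sketch only needs the keys to be hashable, since membership is tested through a set.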

Statistics related methods

entropy()

calculates the Shannon entropy of the values from stream
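
As a reference point, here is the Shannon entropy of the empirical distribution of a sequence of observations, using a base-2 logarithm. Both choices are assumptions: the library may interpret the stream's values differently (for example as counts or probabilities) or use another logarithm base. shannon_entropy is a hypothetical helper.

```python
import math
from collections import Counter


def shannon_entropy(values):
    # Assumption: values are observations; entropy is taken over their relative frequencies.
    counts = Counter(values)
    total = sum(counts.values())
    return -sum((c / total) * math.log2(c / total) for c in counts.values())


# Two equally likely symbols carry exactly one bit of entropy.
print(shannon_entropy(["a", "b", "a", "b"]))  # 1.0
```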

pstddev()

Calculates the population standard deviation.
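
The population standard deviation divides the sum of squared deviations by n, not by n - 1 as the sample standard deviation does. The standard-library sketch below shows the formula next to statistics.pstdev; population_stddev is an illustrative helper, not the library's code.

```python
import math
import statistics


def population_stddev(values):
    # sqrt of the mean squared deviation from the mean (divides by n, not n - 1).
    data = list(values)
    mean = sum(data) / len(data)
    return math.sqrt(sum((x - mean) ** 2 for x in data) / len(data))


data = [2, 4, 4, 4, 5, 5, 7, 9]
print(population_stddev(data))  # 2.0
print(math.isclose(population_stddev(data), statistics.pstdev(data)))  # True
```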

mean()

returns the arithmetic mean of the values

sum()

returns the sum of elements from stream

min(key=_IDENTITY_FUNC)

Same functionality as the builtin min() function.

min_default(default, key=_IDENTITY_FUNC)

Same functionality as min(), but returns default when called on an empty stream.

max()

Same functionality as the builtin max().

maxes(key=_IDENTITY_FUNC)

returns a stream of max values from stream

mins(key=_IDENTITY_FUNC)

returns a stream of min values from stream
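
A plain-Python reading of these helpers: min_default resembles the builtin min() with its default argument, while maxes and mins are assumed here to return every element that ties for the extreme value of key. That tie-returning behavior is an interpretation of the descriptions above, and the functions below are illustrative stand-ins.

```python
def maxes(values, key=lambda v: v):
    # Assumption: return all elements whose key equals the maximum key.
    items = list(values)
    if not items:
        return []
    best = max(map(key, items))
    return [v for v in items if key(v) == best]


def mins(values, key=lambda v: v):
    # Assumption: return all elements whose key equals the minimum key.
    items = list(values)
    if not items:
        return []
    best = min(map(key, items))
    return [v for v in items if key(v) == best]


words = ["pear", "fig", "plum", "kiwi"]
longest = maxes(words, key=len)
shortest = mins(words, key=len)
fallback = min([], default=0)  # builtin counterpart of min_default on empty input

print(longest)   # ['pear', 'plum', 'kiwi']
print(shortest)  # ['fig']
print(fallback)  # 0
```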

Other classes

slist

Inherits from streams.stream and the built-in list class, and keeps a list in memory, allowing faster index access.

sset

Inherits from streams.stream and the built-in set class, and keeps the whole set of values in memory.

sdict

Inherits from streams.stream and the built-in dict class, and keeps the dict object in memory.

defaultstreamdict

Inherits from streams.sdict and adds the functionality of collections.defaultdict from the standard library.

License

streamerate is released under MIT license.

Alternatives

There are other libraries that support Fluent Interface streams as alternatives to streamerate, though they offer fewer streaming features:

There is also a different approach that uses piping instead of the Fluent pattern: https://github.com/sspipe/sspipe and https://github.com/JulienPalard/Pipe
