streamerate: a fluent and expressive Python library for chainable iterable processing, inspired by Java 8 streams.
streamerate
streamerate is a powerful pure-Python library inspired by the Fluent Interface pattern (as used by Java 8 streams), providing a chainable and expressive approach to processing iterable data.
By leveraging the Fluent Interface pattern, streamerate lets you chain operations such as filtering, mapping, and reducing into complex data-processing pipelines with ease, so you can write elegant, readable code that operates efficiently on streams of data.
The library is distributed under the permissive MIT license, allowing you to freely use, modify, and distribute it in both open-source and commercial projects.
Note: streamerate originated as part of the pyxtension project but has since been extracted into a standalone library.
Installation
pip install streamerate
or from GitHub:
git clone https://github.com/asuiu/streamerate.git
cd streamerate
python setup.py install
or
git submodule add https://github.com/asuiu/streamerate.git
Modules overview
streams.py
stream
stream subclasses collections.abc.Iterable. It is a regular Python iterable enriched with additional methods, suitable for multithreaded and multiprocess processing.
It is used to create stream-processing pipelines, similar to those used in Scala and in the MapReduce programming model.
Those who have used Apache Spark RDD functions will find this processing model very easy to use.
streams
Never again will you have to write code like this:
> from functools import reduce
> lst = range(1, 6)
> reduce(lambda x, y: x * y, map(lambda _: _ * _, filter(lambda _: _ % 2 == 0, lst)))
64
From now on, you may simply write the following lines:
> the_stream = stream(range(1, 6))
> the_stream.\
    filter(lambda _: _ % 2 == 0).\
    map(lambda _: _ * _).\
    reduce(lambda x, y: x * y)
64
A naive word-count Map-Reduce example using the multiprocessing-based mpmap:
corpus = [
"MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster.",
"At Google, MapReduce was used to completely regenerate Google's index of the World Wide Web",
"Conceptually similar approaches have been very well known since 1995 with the Message Passing Interface standard having reduce and scatter operations."]
def reduceMaps(m1, m2):
    for k, v in m2.items():
        m1[k] = m1.get(k, 0) + v
    return m1

word_counts = stream(corpus).\
    mpmap(lambda line: stream(line.lower().split(' ')).countByValue()).\
    reduce(reduceMaps)
Basic methods
map(f)
Identical to the builtin map, but returns a stream.
mpmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)
Parallel ordered map using multiprocessing.Pool.imap().
It can replace map when you need to split computations across multiple cores and the order of the results matters.
It spawns at most poolSize processes and applies the f function.
It won't take more than bufferSize elements from the input unless they are already required by the output, so you can use it with takeWhile on infinite streams without fearing that it will keep working in the background.
The elements in the result stream appear in the same order as in the initial iterable.
:type f: (T) -> V
:rtype: stream
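A minimal, hedged sketch of mpmap on an infinite stream (it assumes stream is importable directly from the streamerate package; slow_square is just an illustrative picklable function):

import itertools
from streamerate import stream

def slow_square(x):
    # stands in for CPU-bound work worth distributing across processes
    return x * x

if __name__ == "__main__":  # guard needed on platforms that spawn worker processes
    squares = stream(itertools.count(1)).\
        mpmap(slow_square, poolSize=4, bufferSize=16).\
        takeWhile(lambda v: v < 100).\
        toList()
    print(squares)  # [1, 4, 9, 16, 25, 36, 49, 64, 81], in input order

Because bufferSize caps how far ahead the pool reads, the pipeline stops shortly after the takeWhile condition fails instead of consuming the infinite source in the background.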
mpfastmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)
Parallel unordered map using multiprocessing.Pool.imap_unordered().
It can replace map when the order of the results doesn't matter.
It spawns at most poolSize processes and applies the f function.
It won't take more than bufferSize elements from the input unless they are already required by the output, so you can use it with takeWhile on infinite streams without fearing that it will keep working in the background.
The elements in the result stream appear in an unpredictable order.
:type f: (T) -> V
:rtype: stream
fastmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)
Parallel unordered map using a multithreaded pool.
It can replace map when the order of the results doesn't matter.
It spawns at most poolSize threads and applies the f function.
The elements in the result stream appear in an unpredictable order.
It won't take more than bufferSize elements from the input unless they are already required by the output, so you can use it with takeWhile on infinite streams without fearing that it will keep working in the background.
Because of the CPython GIL, it is most useful for I/O-bound functions or for CPU-intensive native functions that release the GIL, as well as on the Jython or IronPython interpreters.
:type f: (T) -> V
:rtype: stream
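A hedged sketch of fastmap for I/O-bound work; the sleep below merely simulates a blocking call such as a network request, and the results may arrive in any order:

import time
from streamerate import stream

def fetch(url):
    # simulated blocking I/O; a real implementation would issue a request here
    time.sleep(0.1)
    return url, len(url)

urls = ["https://example.com/a", "https://example.com/bb", "https://example.com/ccc"]
results = stream(urls).fastmap(fetch, poolSize=8).toList()
# results holds one (url, length) tuple per input, in an unpredictable order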
mtmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count(), bufferSize: Optional[int] = None)
Parallel ordered map using a multithreaded pool.
It can replace map, and the order of the output stream will be the same as that of the input.
It spawns at most poolSize threads and applies the f function.
The elements in the result stream appear in the same order as in the input.
It won't take more than bufferSize elements from the input unless they are already required by the output, so you can use it with takeWhile on infinite streams without fearing that it will keep working in the background.
Because of the CPython GIL, it is most useful for I/O-bound functions or for CPU-intensive native functions that release the GIL, as well as on the Jython or IronPython interpreters.
:type f: (T) -> V
:rtype: stream
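The same kind of threaded mapping, but with mtmap, which preserves the input order; a small hedged sketch using only the documented signature:

from streamerate import stream

ordered = stream(["a", "bb", "ccc"]).mtmap(len, poolSize=4).toList()
assert ordered == [1, 2, 3]  # order of results matches the input, unlike fastmap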
gtmap(self, f: Callable[[_K], _V], poolSize: int = cpu_count())
flatMap(predicate=_IDENTITY_FUNC)
:param predicate: a function that receives elements of this collection and returns an iterable. By default, predicate is the identity function.
:type predicate: (V) -> collections.abc.Iterable[T]
:return: a stream over the elements of the iterables returned by predicate()
Example:
stream([[1, 2], [3, 4], [4, 5]]).flatMap().toList() == [1, 2, 3, 4, 4, 5]
filter(predicate)
Identical to the builtin filter, but returns a stream.
reversed()
Returns the reversed stream.
exists(predicate)
Tests whether a predicate holds for some of the elements of this sequence.
:rtype: bool
Example:
stream([1, 2, 3]).exists(0) -> False
stream([1, 2, 3]).exists(1) -> True
keyBy(keyfunc = _IDENTITY_FUNC)
Transforms a stream of values into a stream of (key, value) tuples.
:param keyfunc: function to map values to keys
:type keyfunc: (V) -> T
:return: stream of Key, Value pairs
:rtype: stream[( T, V )]
Example:
stream([1, 2, 3, 4]).keyBy(lambda _:_ % 2) -> [(1, 1), (0, 2), (1, 3), (0, 4)]
groupBy()
groupBy([keyfunc]) -> Makes an iterator that returns keys and grouped elements from the iterable.
The iterable does not need to be sorted by the key function, but the key function must return hashable objects.
:param keyfunc: [Optional] The key is a function computing a key value for each element.
:type keyfunc: (T) -> (V)
:return: (key, sub-iterator) grouped by each value of key(value).
:rtype: stream[ ( V, slist[T] ) ]
Example:
stream([1, 2, 3, 4]).groupBy(lambda _: _ % 2) -> [(0, [2, 4]), (1, [1, 3])]
countByValue()
Returns a collections.Counter of values
Example
stream(['a', 'b', 'a', 'b', 'c', 'd']).countByValue() == {'a': 2, 'b': 2, 'c': 1, 'd': 1}
distinct()
Returns a stream of distinct values. Values must be hashable.
stream(['a', 'b', 'a', 'b', 'c', 'd']).distinct() == {'a', 'b', 'c', 'd'}
reduce(f, init=None)
Same arguments as the builtin functools.reduce() function.
toSet()
Returns an sset instance.
toList()
Returns an slist instance.
toMap()
Returns an sdict instance; the stream is expected to yield (key, value) pairs.
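A small illustrative sketch of these terminal conversions (it assumes the returned s-collections compare equal to their builtin counterparts, and that keyBy supplies the (key, value) pairs that toMap needs):

from streamerate import stream

evens = stream(range(10)).filter(lambda x: x % 2 == 0).toList()    # slist: [0, 2, 4, 6, 8]
letters = stream("abracadabra").toSet()                            # sset: {'a', 'b', 'c', 'd', 'r'}
by_length = stream(["apple", "banana"]).keyBy(len).toMap()         # sdict: {5: 'apple', 6: 'banana'}
assert evens == [0, 2, 4, 6, 8] and by_length[6] == "banana"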
sorted(key=None, cmp=None, reverse=False)
Same arguments as the builtin sorted() function.
size()
Returns the number of elements in the stream. Use with care on infinite streams.
join(f)
Returns a string joined by f. Provides the same functionality as the str.join() builtin method.
If f is a string, it is used as the separator to join the stream; otherwise f should be a callable that returns the separator string to be used for the join.
mkString(f)
Identical to join(f).
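A one-line sketch of join with a string separator, assuming the stream elements are already strings:

from streamerate import stream

assert stream(["a", "b", "c"]).join(",") == "a,b,c"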
take(n)
Returns the first n elements of the stream.
head()
Returns the first element of the stream.
zip()
The same behavior as itertools.izip() (the builtin zip() in Python 3).
unique(predicate=_IDENTITY_FUNC)
Returns a stream of unique elements (uniqueness determined by predicate), appearing in the same order as in the original stream.
The values returned by predicate must be hashable and comparable.
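A short hedged sketch of unique with a key predicate (case-insensitive de-duplication; keeping the first occurrence is assumed from the ordering guarantee above):

from streamerate import stream

assert stream(["Apple", "apple", "Banana", "APPLE"]).unique(lambda s: s.lower()).toList() == ["Apple", "Banana"]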
Statistics related methods
entropy()
Calculates the Shannon entropy of the values in the stream.
pstddev()
Calculates the population standard deviation.
mean()
Returns the arithmetic mean of the values.
sum()
Returns the sum of the elements in the stream.
min(key=_IDENTITY_FUNC)
Same functionality as the builtin min() function.
min_default(default, key=_IDENTITY_FUNC)
Same functionality as min(), but returns default when called on an empty stream.
max()
Same functionality as the builtin max().
maxes(key=_IDENTITY_FUNC)
Returns a stream of the max values from the stream.
mins(key=_IDENTITY_FUNC)
Returns a stream of the min values from the stream.
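A brief hedged sketch of the statistics helpers on a small sample (the numeric expectations are computed by hand from the standard definitions; a fresh stream is built for each call to avoid re-iteration concerns):

from streamerate import stream

data = [1, 2, 3, 4]
assert stream(data).sum() == 10
assert stream(data).mean() == 2.5
# population std-dev of [1, 2, 3, 4] is sqrt(((1.5)**2 + (0.5)**2 + (0.5)**2 + (1.5)**2) / 4) ~= 1.118
assert abs(stream(data).pstddev() - 1.118) < 1e-3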
Other classes
slist
Inherits streams.stream and the built-in list, keeping the values in memory as a list and allowing fast index access.
sset
Inherits streams.stream and the built-in set, keeping the whole set of values in memory.
sdict
Inherits streams.stream and the built-in dict, keeping the dict object in memory.
defaultstreamdict
Inherits streams.sdict and adds the functionality of collections.defaultdict from the stdlib.
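A tiny sketch showing that the returned s-collections behave both like streams and like their builtin counterparts (chaining after toList() is assumed to work because slist inherits stream):

from streamerate import stream

squares = stream(range(5)).map(lambda x: x * x).toList()        # slist
assert squares[3] == 9                                          # list-style index access
assert squares.filter(lambda x: x > 1).toSet() == {4, 9, 16}    # still chainable as a stream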
License
streamerate is released under the MIT license.
Alternatives
There are other libraries that support Fluent Interface streams as alternatives to streamerate, although they are considerably more limited in streaming features:
- https://pypi.org/project/lazy-streams/
- https://pypi.org/project/pystreams/
- https://pypi.org/project/fluentpy/
- https://github.com/matthagy/scalaps
- https://pypi.org/project/infixpy/
- https://github.com/sspipe/sspipe
There are also tools that take a rather different, piping-style approach instead of the Fluent pattern: https://github.com/sspipe/sspipe and https://github.com/JulienPalard/Pipe