A carefully crafted library for operating on continuous streams of data in a reactive style, with publish/subscribe and broker functionality.
Synopsis
Pure Python implementation without dependencies (requires Python 3.5+)
Operators known from ReactiveX and other streaming frameworks (like distinct, combine_latest, …)
Supports asyncio for time-dependent operations and for coroutines (e.g. map_async, debounce, …)
Publishers are awaitable (e.g. await adc_raw)
Supports broker functionality (via Hub)
Compact library (<1000 lines of code), but well documented (>1000 lines of comments)
Fully unit tested (coverage towards 100%), coding style checked with flake8, static typing checked with mypy
Under the MIT license (2018 Günther Jena)
Install
pip install broqer
Example
In the first example, adc_raw is a Publisher emitting values from an analog-to-digital converter. Each value is converted (scaled by a factor of 0.3), the stream is sampled, and a moving average is applied. Averages greater than 1 are printed with the prefix 'Voltage too high:'.
from broqer import op
import statistics
( adc_raw
| op.map(lambda v:v*0.3) # apply a one-argument function returning the value multiplied by 0.3
| op.sample(0.1) # periodically emit the current value every 0.1 seconds
| op.sliding_window(4) # append the value to a buffer of 4 elements (dropping the oldest value)
| op.map(statistics.mean) # use ``statistics.mean`` to calculate the average over the emitted sequence
| op.filter(lambda v:v>1) # emit only values greater than 1
| op.sink(print, 'Voltage too high:') # call ``print`` with 'Voltage too high:' and the value
)
Output of | op.sink(print, 'Voltage too high:'):
Voltage too high: 1.25
Voltage too high: 1.5
Voltage too high: 1.75
Voltage too high: 2
Voltage too high: 2
Voltage too high: 2
Voltage too high: 2
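To make the data flow of this pipeline concrete without hardware or an event loop, the sketch below reproduces the scale, sliding-window, mean and filter steps in plain Python. It does not use broqer: the function voltage_alarms, the sample input and the choice to emit only once the window is full are assumptions made for illustration, and the time-based op.sample step is left out.

```python
from collections import deque
import statistics


def voltage_alarms(raw_values, factor=0.3, window_size=4, limit=1):
    """Hypothetical plain-Python stand-in for the pipeline above (no sampling)."""
    window = deque(maxlen=window_size)  # the oldest value is dropped automatically
    alarms = []
    for raw in raw_values:
        window.append(raw * factor)        # like op.map(lambda v: v * 0.3)
        if len(window) < window_size:      # assumption: wait until the window is full
            continue
        average = statistics.mean(window)  # like op.map(statistics.mean)
        if average > limit:                # like op.filter(lambda v: v > 1)
            alarms.append(average)
    return alarms


if __name__ == '__main__':
    for value in voltage_alarms([0, 2, 4, 6, 8, 10]):
        print('Voltage too high:', value)  # like op.sink(print, 'Voltage too high:')
```

With this input the first full window averages below the limit, so only the later windows produce a message.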
API
Publishers
Publishers are the sources of messages.
Subject () | Source with .emit(*args) method to publish a new message
Value (*init) | Source with a state (initialized via init)
FromIterable (iterable) | Use an iterable and emit each value
Using asyncio event loop:
FromPolling (interval, func, …) | Call func(*args, **kwargs) periodically and emit the returned values
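The publish/subscribe idea behind these sources can be illustrated in a few lines of plain Python. The classes MiniSubject and MiniValue below are hypothetical stand-ins written for this sketch, not broqer's implementation. In particular, replaying the stored state to a new subscriber is an assumption about what "source with a state" means.

```python
class MiniSubject:
    """Hypothetical stateless source: forwards each message to all subscribers."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def emit(self, *args):
        for callback in self._subscribers:
            callback(*args)


class MiniValue(MiniSubject):
    """Hypothetical stateful source: remembers the last message."""

    def __init__(self, *init):
        super().__init__()
        self._state = init

    def subscribe(self, callback):
        super().subscribe(callback)
        callback(*self._state)  # assumption: new subscribers get the current state

    def emit(self, *args):
        self._state = args
        super().emit(*args)


received = []
temperature = MiniValue(20)
temperature.subscribe(received.append)  # called once with the initial state
temperature.emit(21)
print(received)
```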
Operators
accumulate (func, init) | Apply func(value, state), which returns the new state and the value to emit
cache (*init) | Cache the emitted values for access via the .cache property
catch_exception (*exceptions) | Catch exceptions raised by the following operators in the pipeline
combine_latest (*publishers) | Combine the latest emit of multiple publishers and emit the combination
distinct (*init) | Emit only values that differ from the cached state
filter (predicate, …) | Filter values based on a predicate function
map (map_func, *args, **kwargs) | Apply map_func(*args, value, **kwargs) to each emitted value
merge (*publishers) | Merge emits of multiple publishers into one stream
pack (*args) | Emit a multi-argument emit as a tuple of arguments
partition (size) | Group size emits into one emit as a tuple
pluck (*picks) | Apply a sequence of picks via getitem to emitted values
reduce (func, init) | Apply func to the current emitted value and the last result of func
sliding_window (size, …) | Group size emitted values into overlapping windows
switch (mapping) | Emit the selected source mapped by mapping
unpack (args) | Unpack a sequence of values and emit them as arguments
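The semantics of a few of these operators are easy to show on plain iterables. The generator functions below are hypothetical illustrations, not broqer's implementation: they take an iterable instead of a publisher, and details such as emitting a window only once it is full, or the (new_state, value) order returned by the accumulate function, are assumptions.

```python
from collections import deque


def distinct(values):
    """Yield a value only when it differs from the previously seen one."""
    missing = object()
    last = missing
    for value in values:
        if last is missing or value != last:
            yield value
        last = value


def partition(values, size):
    """Group every `size` values into one non-overlapping tuple."""
    chunk = []
    for value in values:
        chunk.append(value)
        if len(chunk) == size:
            yield tuple(chunk)
            chunk = []


def sliding_window(values, size):
    """Yield overlapping tuples of the last `size` values (assumed: only when full)."""
    window = deque(maxlen=size)
    for value in values:
        window.append(value)
        if len(window) == size:
            yield tuple(window)


def accumulate(values, func, init):
    """Call func(value, state); assumed to return (new_state, value_to_emit)."""
    state = init
    for value in values:
        state, result = func(value, state)
        yield result


print(list(distinct([1, 1, 2, 2, 1])))
print(list(partition(range(6), 3)))
print(list(sliding_window(range(5), 3)))
print(list(accumulate([1, 2, 3], lambda v, s: (s + v, s + v), 0)))
```

The contrast between partition and sliding_window is the point of the sketch: the first consumes its input in disjoint groups, the second advances by one value per emit.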
Using asyncio event loop:
debounce (duetime) | Emit a value only after a given idle time (emits in the meantime are skipped)
delay (delay) | Emit every value delayed by the given time
map_async (map_coro, mode, …) | Apply map_coro to each emitted value, allowing async processing
sample (interval) | Emit the last received value periodically
throttle (duration) | Rate-limit emits by the given time
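As an illustration of how a time-based operator such as debounce can sit on top of the asyncio event loop, here is a minimal sketch. The class Debouncer is hypothetical and not broqer's implementation; it only assumes the behaviour described above, namely that a value is forwarded once no newer value has arrived for duetime seconds.

```python
import asyncio


class Debouncer:
    """Hypothetical debounce: forward a value only after `duetime` seconds of silence."""

    def __init__(self, duetime, callback):
        self._duetime = duetime
        self._callback = callback
        self._handle = None

    def emit(self, value):
        if self._handle is not None:
            self._handle.cancel()  # a newer value replaces the pending one
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._duetime, self._callback, value)


async def demo():
    received = []
    debouncer = Debouncer(0.05, received.append)
    for value in (1, 2, 3):   # a burst: only the last value survives
        debouncer.emit(value)
    await asyncio.sleep(0.2)  # idle for longer than duetime
    debouncer.emit(4)
    await asyncio.sleep(0.2)
    return received


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The sketch relies on loop.call_later and handle cancellation, so it needs a running event loop, which matches the grouping of these operators under the asyncio heading.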
Subscribers
Subscribers are the sinks for messages.
sink (func, *args, **kwargs) | Apply func(*args, value, **kwargs) to each emitted value
to_future (timeout=None) | Build a future that can be awaited
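The idea of turning the next emitted value into something awaitable can be sketched with a plain asyncio future. MiniPublisher and next_value below are hypothetical names introduced for this illustration and are not part of broqer; the sketch only assumes that the subscriber resolves with the first value it receives and gives up after an optional timeout.

```python
import asyncio


class MiniPublisher:
    """Hypothetical publisher with subscribe/unsubscribe, for illustration only."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)

    def unsubscribe(self, callback):
        self._callbacks.remove(callback)

    def emit(self, value):
        for callback in list(self._callbacks):  # copy: callbacks may unsubscribe
            callback(value)


def next_value(publisher, timeout=None):
    """Return an awaitable that resolves with the next emitted value."""
    future = asyncio.get_running_loop().create_future()

    def on_emit(value):
        publisher.unsubscribe(on_emit)  # one-shot subscription
        if not future.done():           # the future may have timed out already
            future.set_result(value)

    publisher.subscribe(on_emit)
    return asyncio.wait_for(future, timeout)


async def demo():
    publisher = MiniPublisher()
    asyncio.get_running_loop().call_later(0.05, publisher.emit, 42)
    return await next_value(publisher, timeout=1)


if __name__ == '__main__':
    print(asyncio.run(demo()))
```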
Credits
Broqer was inspired by:
RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)
aioreactive: Async/Await reactive tools for Python (by Dag Brattli)
streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)
MQTT: M2M connectivity protocol
Florian Feurstein: for spending hours in discussion, coming up with great ideas and helping me understand the concepts!