Carefully crafted library to operate with continuous streams of data in a reactive style with publish/subscribe and broker functionality.

Project description

Synopsis

  • Pure Python implementation without dependencies (except Python 3.5+)
  • Operators known from ReactiveX and other streaming frameworks (like distinct, combine_latest, …)
  • Supporting asyncio for time-dependent operations and coroutines (e.g. map_async, debounce, …)
  • Publishers are awaitable (e.g. await adc_raw)
  • Supporting broker functionality (via Hub)
  • Compact library (<1000 lines of code), but well documented (>1000 lines of comments)
  • Fully unit tested (coverage towards 100%), coding style checked with flake8, static typing checked with mypy
  • Under MIT license (2018 Günther Jena)

Install

pip install broqer

Example

In the first example adc_raw is a Publisher emitting values from an analog-to-digital converter. Each value is converted (scaled by a factor of 0.3) and sampled, and a moving average is applied. Averages greater than 1 are printed (with the prefix ‘Voltage too high:’).

from broqer import op
import statistics

( adc_raw
  | op.map(lambda v: v * 0.3) # apply a function with one argument returning the value multiplied by 0.3
  | op.sample(0.1) # periodically emit the current value every 0.1 seconds
  | op.sliding_window(4) # append the value to a buffer with 4 elements (and drop the oldest value)
  | op.map(statistics.mean) # use ``statistics.mean`` to calculate the average over the emitted sequence
  | op.filter(lambda v: v > 1) # emit only values greater than 1
  | op.sink(print, 'Voltage too high:') # call ``print`` with 'Voltage too high:' and the value
)
Diagram of this example: https://github.com/semiversus/python-broqer/blob/master/docs/example1.svg

Output to stdout:

Voltage too high: 1.25
Voltage too high: 1.5
Voltage too high: 1.75
Voltage too high: 2
Voltage too high: 2
Voltage too high: 2
Voltage too high: 2
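
The arithmetic of the chain above can be followed in plain Python without broqer. The sketch below is not the library's implementation: the function name voltage_alerts and its parameters are made up for illustration, the time-based sample step is left out (the input is treated as already sampled), and it assumes a window is only averaged once it holds four values.

```python
from collections import deque
import statistics


def voltage_alerts(raw_values, factor=0.3, window=4, threshold=1):
    """Plain-Python stand-in for the map / sliding_window / map / filter chain.

    Assumption: nothing is averaged until the window holds `window` values.
    """
    buffer = deque(maxlen=window)  # a full deque drops its oldest value on append
    alerts = []
    for raw in raw_values:
        buffer.append(raw * factor)              # scale the raw reading
        if len(buffer) < window:                 # assumed: wait for a full window
            continue
        average = statistics.mean(tuple(buffer))  # moving average over the window
        if average > threshold:                  # keep only averages above the threshold
            alerts.append(average)
    return alerts


# factor=0.5 is used here only to keep the float arithmetic exact
for value in voltage_alerts([2, 2, 2, 2, 4, 4, 4, 4], factor=0.5):
    print('Voltage too high:', value)
```

The first window averages exactly 1 and is therefore filtered out; the following windows rise step by step as larger readings replace smaller ones.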

API

Publishers

Publishers are the sources of messages.

Subject () Source with .emit(*args) method to publish a new message
Value (*init) Source with a state (initialized via init)
FromIterable (iterable) Use an iterable and emit each value
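
To make the difference between a stateless and a stateful source concrete, here is a minimal sketch in plain Python. MiniSubject and MiniValue are hypothetical stand-ins, not broqer classes, and the behaviour of replaying the stored state to a new subscriber is an assumption made for this illustration.

```python
class MiniSubject:
    """Hypothetical stand-in for a source with an emit() method."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)

    def emit(self, *args):
        # hand the message to every registered callback
        for callback in list(self._callbacks):
            callback(*args)


class MiniValue(MiniSubject):
    """Hypothetical stateful source that remembers the last message."""

    def __init__(self, *init):
        super().__init__()
        self._state = init

    def subscribe(self, callback):
        super().subscribe(callback)
        callback(*self._state)  # assumed: a new subscriber gets the current state

    def emit(self, *args):
        self._state = args  # remember the message before passing it on
        super().emit(*args)


received = []
voltage = MiniValue(0)
voltage.subscribe(received.append)  # receives the initial state 0
voltage.emit(5)                     # receives the new value 5
print(received)
```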

Using asyncio event loop:

FromPolling (interval, func, …) Call func(*args, **kwargs) periodically and emit the returned values

Operators

accumulate (func, init) Apply func(value, state), which returns the new state and the value to emit
cache (*init) Cache the emitted values for access via the .cache property
catch_exception (*exceptions) Catch exceptions raised by the following operators in the pipeline
combine_latest (*publishers) Combine the latest emits of multiple publishers and emit the combination
distinct (*init) Only emit values that differ from the cached state
filter (predicate, …) Filter values based on a predicate function
map (map_func, *args, **kwargs) Apply map_func(*args, value, **kwargs) to each emitted value
merge (*publishers) Merge emits of multiple publishers into one stream
pack (*args) Emit a multi-argument emit as a tuple of arguments
partition (size) Group size emits into one emit as a tuple
pluck (*picks) Apply a sequence of picks via getitem to emitted values
reduce (func, init) Apply func to the current emitted value and the last result of func
sliding_window (size, …) Group size emitted values into overlapping windows
switch (mapping) Emit selected source mapped by mapping
unpack (args) Unpack a sequence of values and emit them as separate arguments
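
The one-line descriptions of distinct, partition and pluck can be read as the following generator functions over an ordinary iterable. This is only a sketch of the described behaviour, not broqer code: the functions take an iterable instead of a publisher, and dropping an incomplete trailing group in partition is an assumption.

```python
def distinct(values):
    """Yield a value only when it differs from the previously yielded one."""
    missing = object()  # sentinel so that None is treated as a normal value
    last = missing
    for value in values:
        if last is missing or value != last:
            last = value
            yield value


def partition(values, size):
    """Group every `size` consecutive values into one tuple."""
    group = []
    for value in values:
        group.append(value)
        if len(group) == size:
            yield tuple(group)
            group = []
    # assumed: an incomplete trailing group is not emitted


def pluck(values, *picks):
    """Apply each pick in turn via item access (getitem)."""
    for value in values:
        for pick in picks:
            value = value[pick]
        yield value


print(list(distinct([1, 1, 2, 2, 1])))
print(list(partition(range(5), 2)))
print(list(pluck([{'adc': [10, 20]}], 'adc', 1)))
```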

Using asyncio event loop:

debounce (duetime) Emit a value only after a given idle time (emits in the meantime are skipped)
delay (delay) Emit every value delayed by the given time
map_async (map_coro, mode, …) Apply map_coro to each emitted value allowing async processing
sample (interval) Emit the last received value periodically
throttle (duration) Rate limit emits by the given time
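
As an illustration of the debounce description, the sketch below implements the idea directly on the asyncio event loop: every new value cancels the pending timer, so only a value followed by a quiet period is passed on. The Debouncer class is hypothetical and not part of broqer, and asyncio.run requires Python 3.7 or newer.

```python
import asyncio


class Debouncer:
    """Hypothetical helper: forward a value only after `duetime` seconds of silence."""

    def __init__(self, duetime, callback):
        self._duetime = duetime
        self._callback = callback
        self._handle = None

    def emit(self, value):
        if self._handle is not None:
            self._handle.cancel()  # a newer value replaces the pending one
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._duetime, self._callback, value)


async def demo():
    received = []
    debounced = Debouncer(0.05, received.append)
    for value in (1, 2, 3):      # burst: only the last value survives
        debounced.emit(value)
    await asyncio.sleep(0.2)     # idle time elapses, 3 is forwarded
    debounced.emit(4)
    await asyncio.sleep(0.2)     # idle time elapses, 4 is forwarded
    return received


result = asyncio.run(demo())
print(result)
```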

Subscribers

Subscribers are the sinks for messages.

sink (func, *args, **kwargs) Apply func(*args, value, **kwargs) to each emitted value
to_future (timeout=None) Build a future that can be awaited
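
The calling convention func(*args, value, **kwargs), which the table gives for both sink and map, places the emitted value after the fixed positional arguments. A minimal sketch of that convention in plain Python follows; make_sink and format_reading are hypothetical names used only for this illustration.

```python
def make_sink(func, *args, **kwargs):
    """Return a callable that passes each value to func after the fixed args."""
    def on_value(value):
        return func(*args, value, **kwargs)
    return on_value


def format_reading(prefix, value, unit=''):
    # hypothetical consumer: the emitted value arrives as the second argument
    return '{} {}{}'.format(prefix, value, unit)


sink = make_sink(format_reading, 'Voltage too high:', unit='V')
print(sink(1.25))
```

This is why op.sink(print, 'Voltage too high:') in the example prints the prefix first and the value second.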

Credits

Broqer was inspired by:

  • RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)
  • aioreactive: Async/Await reactive tools for Python (by Dag Brattli)
  • streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)
  • MQTT: M2M connectivity protocol
  • Florian Feurstein: spending hours in discussion, coming up with great ideas and helping me understand the concepts!


Files for broqer, version 0.1.3

  • broqer-0.1.3-py2.py3-none-any.whl (5.7 kB): Wheel, Python version py2.py3
  • broqer-0.1.3.tar.gz (7.6 kB): Source
