
Carefully crafted library to operate with continuous streams of data in a reactive style with publish/subscribe and broker functionality.

Project description


Carefully crafted library to operate with continuous streams of data in a reactive style with publish/subscribe and broker functionality. Although the initial focus is on embedded systems, Broqer can be used wherever continuous streams of data have to be processed, and they are everywhere. Watch out!

Synopsis

  • Pure python implementation without dependencies

  • Under MIT license (2018 Günther Jena)

  • Compact library (<1000 lines of code) and well documented (>1000 lines of comments)

  • Fully unit tested (coverage towards 100%), coding style checked with flake8, static type checked with mypy

  • Operators known from ReactiveX and other streaming frameworks (like distinct, combine_latest, …)

  • Supports asyncio for time-dependent operations and for using coroutines (e.g. map_async, debounce, …)

  • Publishers are awaitable (e.g. await adc_raw)

  • Broker functionality via Hub

    • Centralised object to keep track of publishers and subscribers

    • Starting point to build applications with a microservice architecture

Install

Tested on Python 3.5 and later.

pip install broqer

Example

In the first example adc_raw is a Publisher emitting values from an analog-to-digital converter. Each value is converted (scaled by the factor 0.3) and sampled, and a moving average is applied. Only averaged values greater than 1 are printed (with the prefix ‘Voltage too high:’).

from broqer import op
import statistics

# adc_raw is assumed to be an existing Publisher emitting raw ADC readings
( adc_raw
  | op.map(lambda v: v * 0.3) # apply a function with one argument returning the value multiplied by 0.3
  | op.sample(0.1) # periodically emit the current value every 0.1 seconds
  | op.sliding_window(4) # append the value to a buffer with 4 elements (and drop the oldest value)
  | op.map(statistics.mean) # use ``statistics.mean`` to calculate the average over the emitted sequence
  | op.filter(lambda v: v > 1) # emit only values greater than 1
  | op.sink(print, 'Voltage too high:') # call ``print`` with 'Voltage too high:' and the value
)

Figure (example1.svg): https://cdn.rawgit.com/semiversus/python-broqer/ec5ddbbd/docs/example1.svg

Output to stdout:

Voltage too high: 1.25
Voltage too high: 1.5
Voltage too high: 1.75
Voltage too high: 2
Voltage too high: 2
Voltage too high: 2
Voltage too high: 2
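
The numerical stages of this pipeline (scaling, sliding window, mean, filter) can be illustrated without broqer. The following is a minimal standard-library sketch, not broqer code: the function name `voltage_alarms` and the input samples are hypothetical, the periodic sampling step is left out, and it assumes that a window is only evaluated once it holds 4 values.

```python
from collections import deque
import statistics


def voltage_alarms(raw_samples, factor=0.3, window_size=4, limit=1):
    """Return the moving averages that exceed ``limit`` (hypothetical helper)."""
    window = deque(maxlen=window_size)  # oldest value is dropped automatically
    alarms = []
    for raw in raw_samples:
        window.append(raw * factor)       # corresponds to the scaling step
        if len(window) < window_size:     # assumption: wait until the window is full
            continue
        average = statistics.mean(window)  # corresponds to the averaging step
        if average > limit:                # corresponds to the filter step
            alarms.append(average)
    return alarms


if __name__ == "__main__":
    # Made-up raw readings, purely for illustration
    for value in voltage_alarms([0, 0, 0, 0, 10, 10, 10, 10]):
        print("Voltage too high:", value)
```

In broqer the same stages are chained with `|` and react to each emitted value instead of looping over a finished list.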

API

Publishers

A Publisher is the source for messages.

Subject ()

Source with .emit(*args) method to publish a new message

Value (*init)

Source with a state (initialized via init)

FromIterable (iterable)

Use an iterable and emit each value

Just (*value)

Emit *value on each subscription

Using asyncio event loop:

FromPolling (interval, func, …)

Call func(*args, **kwargs) periodically and emit the returned values
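
To make the difference between a stateless source (such as Subject) and a stateful one (such as Value) concrete, here is a toy sketch in plain Python. `ToySubject` and `ToyValue` are hypothetical classes written for this illustration and are not broqer's implementation; in particular, the behaviour that a new subscriber immediately receives the current state is an assumption.

```python
class ToySubject:
    """Hypothetical stateless source: forwards each emit to all subscribers."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, callback):
        self._subscribers.append(callback)

    def emit(self, *args):
        # Iterate over a copy so callbacks may subscribe during an emit
        for callback in list(self._subscribers):
            callback(*args)


class ToyValue(ToySubject):
    """Hypothetical stateful source: remembers the last emitted value."""

    def __init__(self, *init):
        super().__init__()
        self._state = init

    def subscribe(self, callback):
        super().subscribe(callback)
        callback(*self._state)  # assumption: current state is delivered at once

    def emit(self, *args):
        self._state = args
        super().emit(*args)


if __name__ == "__main__":
    value = ToyValue(0)
    value.subscribe(lambda v: print("first subscriber:", v))
    value.emit(5)
    value.subscribe(lambda v: print("late subscriber:", v))
```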

Operators

accumulate (func, init)

Apply func(value, state), which returns the new state and the value to emit

cache (*init)

Cache the emitted value and make it accessible via the .cache property

catch_exception (*exceptions)

Catch exceptions raised by the following operators in the pipeline

combine_latest (*publishers)

Combine the latest emit of multiple publishers and emit the combination

distinct (*init)

Emit only values that differ from the cached state

filter (predicate, …)

Filters values based on a predicate function

map (map_func, *args, **kwargs)

Apply map_func(*args, value, **kwargs) to each emitted value

merge (*publishers)

Merge emits of multiple publishers into one stream

pack (*args)

Emit a multi-argument emit as tuple of arguments

partition (size)

Group size emits into one emit as tuple

pluck (*picks)

Apply sequence of picks via getitem to emitted values

reduce (func, init)

Apply func to the current emitted value and the last result of func

sliding_window (size, …)

Group the last size emitted values into overlapping windows

switch (mapping)

Emit selected source mapped by mapping

unpack (args)

Unpack a sequence of values and emit them as separate arguments
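
The semantics of some of the operators above can be sketched with plain Python generators working on an ordinary iterable. These functions are hypothetical illustrations, not broqer's implementation. Assumptions: `func` in the accumulate sketch returns a `(new_state, value_to_emit)` tuple in that order, and the windowing sketches emit nothing until a full group is available.

```python
from collections import deque


def accumulate_sketch(func, init, values):
    """Call func(value, state); keep the new state and yield the result."""
    state = init
    for value in values:
        state, result = func(value, state)  # assumed order: (new state, result)
        yield result


def distinct_sketch(values):
    """Yield a value only if it differs from the previously seen one."""
    unset = object()  # marker for "nothing seen yet"
    last = unset
    for value in values:
        if last is unset or value != last:
            last = value
            yield value


def partition_sketch(size, values):
    """Group values into non-overlapping tuples of length size."""
    chunk = []
    for value in values:
        chunk.append(value)
        if len(chunk) == size:
            yield tuple(chunk)
            chunk = []


def sliding_window_sketch(size, values):
    """Group values into overlapping tuples of length size."""
    window = deque(maxlen=size)
    for value in values:
        window.append(value)
        if len(window) == size:
            yield tuple(window)


if __name__ == "__main__":
    def running_sum(value, state):
        total = state + value
        return total, total

    print(list(accumulate_sketch(running_sum, 0, [1, 2, 3])))
    print(list(distinct_sketch([1, 1, 2, 2, 1])))
    print(list(partition_sketch(2, [1, 2, 3, 4, 5])))
    print(list(sliding_window_sketch(3, [1, 2, 3, 4])))
```

The contrast between the last two functions is the point: a partition consumes each value once, while a sliding window reuses values across consecutive groups.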

Using asyncio event loop:

debounce (duetime, *reset_value)

Emit a value only after a given idle time (values emitted in the meantime are skipped)

delay (delay)

Emit every value delayed by the given time

map_async (map_coro, mode, …)

Apply map_coro to each emitted value allowing async processing

map_threaded (map_func, mode, …)

Apply map_func to each emitted value allowing threaded processing

sample (interval)

Emit the last received value periodically

throttle (duration)

Rate limit emits by the given time
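
As an illustration of how a time-dependent operator can be built on the asyncio event loop, the following sketch implements the debounce idea with `loop.call_later`. `DebounceSketch` is a hypothetical class and not broqer's implementation; it ignores the optional reset value and uses `asyncio.run` and `asyncio.get_running_loop`, which require Python 3.7 or later.

```python
import asyncio


class DebounceSketch:
    """Forward a value only after ``duetime`` seconds without a newer value."""

    def __init__(self, duetime, callback):
        self._duetime = duetime
        self._callback = callback
        self._handle = None  # pending timer, if any

    def emit(self, value):
        if self._handle is not None:
            self._handle.cancel()  # a newer value replaces the pending one
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._duetime, self._callback, value)


async def demo():
    received = []
    debounced = DebounceSketch(0.05, received.append)
    for value in (1, 2, 3):      # a burst: only the last value should survive
        debounced.emit(value)
    await asyncio.sleep(0.2)     # idle time longer than duetime
    debounced.emit(4)
    await asyncio.sleep(0.2)
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Cancelling and rescheduling the timer on every emit is what separates debouncing from delaying: a delay would forward all four values, each one shifted in time.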

Subscribers

A Subscriber is the sink for messages.

sink (func, *args, **kwargs)

Apply func(*args, value, **kwargs) to each emitted value

to_future (timeout=None)

Build a future that can be awaited
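
The calling convention func(*args, value, **kwargs) and the idea of awaiting the next value can both be sketched with the standard library. The helpers `sink_sketch`, `first_value` and `register` below are hypothetical names for this illustration and are not part of broqer; the future example assumes Python 3.7 or later.

```python
import asyncio
from functools import partial


def sink_sketch(func, *args, **kwargs):
    """Return a callable that invokes func(*args, value, **kwargs)."""
    return partial(func, *args, **kwargs)


async def first_value(register, timeout=None):
    """Await the first value handed to the callback given to ``register``."""
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def on_value(value):
        if not future.done():  # later values are ignored
            future.set_result(value)

    register(on_value)
    return await asyncio.wait_for(future, timeout)


async def demo():
    loop = asyncio.get_running_loop()

    def register(callback):
        # Stand-in for a source that emits 42 a little later
        loop.call_later(0.01, callback, 42)

    return await first_value(register, timeout=1.0)


if __name__ == "__main__":
    sink_sketch(print, "Voltage too high:")(1.25)
    print(asyncio.run(demo()))
```

Here `asyncio.wait_for` raises `asyncio.TimeoutError` if no value arrives within the timeout.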

Credits

Broqer was inspired by:

  • RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)

  • aioreactive: Async/Await reactive tools for Python (by Dag Brattli)

  • streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)

  • MQTT: M2M connectivity protocol

  • Florian Feurstein: for spending hours in discussion, coming up with great ideas and helping me understand the concepts!

