
Carefully crafted library to operate with continuous streams of data in a reactive style with publish/subscribe and broker functionality.

Project description


Initially focused on embedded systems, Broqer can be used wherever continuous streams of data have to be processed - and they are everywhere. Watch out!


Synopsis

This is version 0.9, which prepares the way for version 1.0. Broqer 1.0.0 will be released soon (by the end of October 2018) - stay tuned!
  • Pure python implementation without dependencies
  • Under MIT license (2018 Günther Jena)
  • Source is hosted on GitHub.com
  • Documentation is hosted on ReadTheDocs.com
  • Tested on Python 3.5, 3.6, 3.7 and 3.8-dev
  • Unit tested with pytest, coding style checked with Flake8, static type checked with mypy, static code checked with Pylint, documented with Sphinx
  • Operators known from ReactiveX and other streaming frameworks (like Map, CombineLatest, …)
  • Broker functionality via Hub
    • Centralised object to keep track of publishers and subscribers
    • Starting point to build applications with a microservice architecture

Showcase

In other frameworks a Publisher is sometimes called an Observable. A Subscriber is able to observe the changes the publisher is emitting. With these basics you’re able to use the observer pattern - let’s see!

Observer pattern

Subscribing to a publisher is done via the | operator - here used as a pipe. A simple subscriber is op.Sink, which calls a function with optional positional and keyword arguments.

>>> from broqer import Value, op
>>> a = Value(5)  # create a value (publisher and subscriber with state)
>>> disposable = a | op.Sink(print, 'Change:')  # subscribe a callback
Change: 5

>>> a.emit(3)  # change the value
Change: 3

>>> disposable.dispose()  # unsubscribe
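
To make the mechanics of the pipe operator concrete, here is a minimal plain-Python sketch of the observer pattern. It is not Broqer's implementation: MiniValue and MiniDisposable are hypothetical names, and the sketch assumes that a stateful publisher hands its current state to a new subscriber immediately, as the output above suggests.

```python
class MiniValue:
    """Hypothetical stand-in for a stateful publisher (not Broqer's Value)."""

    def __init__(self, init):
        self._state = init
        self._subscribers = []

    def __or__(self, subscriber):
        # `publisher | subscriber` registers the subscriber ...
        self._subscribers.append(subscriber)
        # ... and the current state is handed over right away.
        subscriber(self._state)
        return MiniDisposable(self, subscriber)

    def emit(self, value):
        # Store the new state and notify every registered subscriber.
        self._state = value
        for subscriber in list(self._subscribers):
            subscriber(value)


class MiniDisposable:
    """Hypothetical handle that removes the subscription again."""

    def __init__(self, publisher, subscriber):
        self._publisher = publisher
        self._subscriber = subscriber

    def dispose(self):
        self._publisher._subscribers.remove(self._subscriber)


received = []
a = MiniValue(5)
disposable = a | received.append  # subscribing delivers the current state 5
a.emit(3)                         # delivered to the subscriber
disposable.dispose()
a.emit(7)                         # no longer delivered
print(received)                   # [5, 3]
```

The disposable keeps a reference to both sides of the subscription, so unsubscribing does not require the caller to remember which callback was registered.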

Combine publishers with arithmetic operators

You’re able to create publishers on the fly by combining two publishers with the common operators (like +, >, <<, …).

>>> from broqer import Value, op
>>> a = Value(1)
>>> b = Value(3)

>>> c = a * 3 > b  # create a new publisher via operator overloading
>>> c | op.Sink(print, 'c:')
c: False

>>> a.emit(1)  # will not change the state of c
>>> a.emit(2)
c: True
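
How operator overloading can produce new publishers may be easier to see in a stripped-down model. The sketch below is an assumption about the general technique, not Broqer's code: Pub is a hypothetical class, only * and > are overloaded, and a publisher stays silent when its state does not change.

```python
import operator


class Pub:
    """Hypothetical minimal publisher with state (not Broqer's class)."""

    def __init__(self, init=None):
        self.state = init
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)
        callback(self.state)

    def emit(self, value):
        if value == self.state:
            return  # unchanged state: subscribers are not notified
        self.state = value
        for callback in self._callbacks:
            callback(value)

    def _derive(self, other, func):
        def current():
            # The right-hand side may be a publisher or a plain constant.
            rhs = other.state if isinstance(other, Pub) else other
            return func(self.state, rhs)

        derived = Pub(current())
        # Recompute the derived state whenever one of the sources changes.
        self._callbacks.append(lambda _: derived.emit(current()))
        if isinstance(other, Pub):
            other._callbacks.append(lambda _: derived.emit(current()))
        return derived

    def __mul__(self, other):
        return self._derive(other, operator.mul)

    def __gt__(self, other):
        return self._derive(other, operator.gt)


a = Pub(1)
b = Pub(3)
c = a * 3 > b            # two derived publishers: (a * 3) and (... > b)

seen = []
c.subscribe(seen.append)  # current state: 3 > 3 is False
a.emit(0)                 # a changes, but 0 > 3 is still False
a.emit(2)                 # 6 > 3 is True, so c emits
print(seen)               # [False, True]
```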

Fancier things, such as getting an item by index or key, are also possible:

>>> i = Value('a')
>>> d = Value({'a':100, 'b':200, 'c':300})

>>> d[i] | op.Sink(print, 'r:')
r: 100

>>> i.emit('c')
r: 300
>>> d.emit({'c':123})
r: 123

Some Python built-in functions can’t return publishers (e.g. len() needs to return an integer). For these cases special functions are defined in broqer: Str, Int, Float, Len and In (for x in y).

Attribute access on a publisher builds a new publisher; the actual attribute access is performed on each emitted value:

>>> i = Value('Attribute access made REACTIVE')
>>> i.lower().split(sep=' ') | op.Sink(print)
['attribute', 'access', 'made', 'reactive']

>>> i.emit('Reactive and pythonic')
['reactive', 'and', 'pythonic']
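
One way such deferred attribute access can be built in Python is to record the accesses and calls first and replay them on every value later. The following is a sketch of that general technique only; Lazy and its apply method are hypothetical names, and Broqer's actual mechanism may differ.

```python
class Lazy:
    """Hypothetical recorder of attribute accesses and calls."""

    def __init__(self, steps=()):
        self._steps = tuple(steps)

    def __getattr__(self, name):
        # Only invoked for unknown attributes, so _steps and apply still work.
        return Lazy(self._steps + (('attr', name),))

    def __call__(self, *args, **kwargs):
        return Lazy(self._steps + (('call', args, kwargs),))

    def apply(self, value):
        # Replay the recorded steps on a concrete value.
        for step in self._steps:
            if step[0] == 'attr':
                value = getattr(value, step[1])
            else:
                value = value(*step[1], **step[2])
        return value


pipeline = Lazy().lower().split(sep=' ')  # nothing is executed yet
first = pipeline.apply('Attribute access made REACTIVE')
second = pipeline.apply('Reactive and pythonic')
print(first)   # ['attribute', 'access', 'made', 'reactive']
print(second)  # ['reactive', 'and', 'pythonic']
```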

Asyncio Support

Many operators are made for asynchronous operation. You can debounce and throttle emits (via op.Debounce and op.Throttle), sample and delay them (via op.Sample and op.Delay), or start coroutines whose results are emitted once they finish.

>>> import asyncio
>>> async def long_running_coro(value):
...     await asyncio.sleep(3)
...     return value + 1
...
>>> a = Value(0)
>>> a | op.MapAsync(long_running_coro) | op.Sink(print, 'Result:')

After 3 seconds the result will be:

Result: 1

MapAsync supports several modes for handling a new emit while a coroutine is still running. The default is to run the coroutines concurrently, but various queue and interrupt modes are also available.
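
What running the coroutines concurrently means in practice can be shown with plain asyncio, without Broqer. In this sketch (slow_increment and handle are hypothetical helpers, and asyncio.run requires Python 3.7 or newer) every value gets its own task as soon as it arrives, so results appear in completion order rather than in emit order.

```python
import asyncio


async def slow_increment(value, delay):
    # Stand-in for a long-running coroutine.
    await asyncio.sleep(delay)
    return value + 1


async def handle(value, delay, sink):
    # Pass the result on as soon as this particular coroutine finishes.
    sink(await slow_increment(value, delay))


async def main():
    results = []
    # (value, processing time): the first value takes longer than the second.
    emits = [(0, 0.1), (10, 0.01)]
    # Concurrent handling: each value starts its own task immediately.
    tasks = [asyncio.ensure_future(handle(value, delay, results.append))
             for value, delay in emits]
    await asyncio.gather(*tasks)
    return results


results = asyncio.run(main())
print(results)  # [11, 1]
```

A queue mode would instead wait for the first coroutine to finish before starting the second, which keeps the emit order at the cost of latency.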

Every publisher can be awaited in coroutines:

await signal_publisher
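
Python makes an object awaitable through the __await__ method. The sketch below shows the general idea under the assumption that awaiting resolves with the next emitted value; AwaitablePub is a hypothetical class, not Broqer's publisher, and the exact semantics in Broqer may differ.

```python
import asyncio


class AwaitablePub:
    """Hypothetical publisher that can be awaited for its next emit."""

    def __init__(self):
        self._waiters = []

    def emit(self, value):
        # Resolve every pending waiter with the emitted value.
        waiters, self._waiters = self._waiters, []
        for future in waiters:
            if not future.done():
                future.set_result(value)

    def __await__(self):
        # Each await creates a future that the next emit will resolve.
        future = asyncio.get_running_loop().create_future()
        self._waiters.append(future)
        return future.__await__()


async def main():
    signal_publisher = AwaitablePub()
    # Simulate an emit that happens a little later.
    asyncio.get_running_loop().call_later(0.01, signal_publisher.emit, 42)
    return await signal_publisher


result = asyncio.run(main())
print(result)  # 42
```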

Function decorators

Make your own operators on the fly with function decorators. Decorators are available for Accumulate, CombineLatest, Filter, Map, MapAsync, MapThreaded, Reduce and Sink.

>>> @build_map
... def count_vowels(s):
...     return sum([s.count(v) for v in 'aeiou'])

>>> msg = Value('Hello World!')
>>> msg | count_vowels() | Sink(print, 'Number of vowels:')
Number of vowels: 3
>>> msg.emit('Wahuuu')
Number of vowels: 4

You can even make configurable Maps and Filters:

>>> import re

>>> @build_filter
... def filter_pattern(pattern, s):
...     return re.search(pattern, s) is not None

>>> msg = Value('Cars passed: 135!')
>>> msg | filter_pattern('[0-9]+') | Sink(print)
Cars passed: 135!
>>> msg.emit('No cars have passed')
>>> msg.emit('Only 1 car has passed')
Only 1 car has passed
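
The configurable variant works because the decorator turns the function into a factory: the arguments given at pipeline construction are bound first, and the emitted value is supplied last. The sketch below illustrates this with functools.partial on a plain list instead of a stream; build_filter_sketch is a hypothetical name and not Broqer's build_filter.

```python
import re
from functools import partial, wraps


def build_filter_sketch(predicate):
    """Hypothetical decorator: turn predicate(*config, value) into a factory."""
    @wraps(predicate)
    def factory(*args, **kwargs):
        # Bind the configuration now; the value is passed in later.
        bound = partial(predicate, *args, **kwargs)

        def operator(values):
            return [value for value in values if bound(value)]
        return operator
    return factory


@build_filter_sketch
def filter_pattern(pattern, s):
    return re.search(pattern, s) is not None


only_with_digits = filter_pattern('[0-9]+')
messages = ['Cars passed: 135!', 'No cars have passed', 'Only 1 car has passed']
passed = only_with_digits(messages)
print(passed)  # ['Cars passed: 135!', 'Only 1 car has passed']
```

Note that the pattern has to require at least one digit: a pattern that also matches the empty string would let every message through.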

Install

pip install broqer

Credits

Broqer was inspired by:

  • RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)
  • aioreactive: Async/Await reactive tools for Python (by Dag Brattli)
  • streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)
  • MQTT: M2M connectivity protocol
  • Florian Feurstein: spending hours in discussion, coming up with great ideas and helping me understand the concepts!

API

Publishers

A Publisher is the source for messages.

Using asyncio event loop:

Publisher ()                     Basic publisher
StatefulPublisher (init)         Publisher keeping an internal state
FromPolling (interval, func, …)  Call func(*args, **kwargs) periodically and emit the returned values

Operators

Accumulate (func, init)          Apply func(value, state), which returns the new state and the value to emit
Cache (*init)                    Cache the emitted value to access it via the .cache property
CatchException (*exceptions)     Catch exceptions raised by the following operators in the pipeline
CombineLatest (*publishers)      Combine the latest emit of multiple publishers and emit the combination
Filter (predicate, …)            Filter values based on a predicate function
Map (map_func, *args, **kwargs)  Apply map_func(*args, value, **kwargs) to each emitted value
Merge (*publishers)              Merge emits of multiple publishers into one stream
Partition (size)                 Group size emits into one emit as a tuple
Reduce (func, init)              Apply func to the current emitted value and the last result of func
Replace (value)                  Replace each received value by the given value
SlidingWindow (size, …)          Group size emitted values into overlapping windows
Switch (mapping)                 Emit selected source mapped by mapping
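
The difference between Partition and SlidingWindow is easiest to see side by side. The generators below are a plain-Python sketch of the grouping described in the table, not Broqer's operators; they assume that an incomplete group at the end is not emitted and that a window is only emitted once it is full.

```python
from collections import deque


def partition(values, size):
    """Group `size` values into one tuple, without overlap."""
    chunk = []
    for value in values:
        chunk.append(value)
        if len(chunk) == size:
            yield tuple(chunk)
            chunk = []


def sliding_window(values, size):
    """Yield the last `size` values on every new value, with overlap."""
    window = deque(maxlen=size)  # the oldest value drops out automatically
    for value in values:
        window.append(value)
        if len(window) == size:
            yield tuple(window)


partitions = list(partition(range(6), 3))
windows = list(sliding_window(range(5), 3))
print(partitions)  # [(0, 1, 2), (3, 4, 5)]
print(windows)     # [(0, 1, 2), (1, 2, 3), (2, 3, 4)]
```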

Using asyncio event loop:

Debounce (duetime, *reset_value)  Emit a value only after a given idle time (emits in the meantime are skipped)
Delay (delay)                     Emit every value delayed by the given time
MapAsync (map_coro, mode, …)      Apply map_coro to each emitted value, allowing async processing
MapThreaded (map_func, mode, …)   Apply map_func to each emitted value, allowing threaded processing
Sample (interval)                 Emit the last received value periodically
Throttle (duration)               Rate limit emits by the given time
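
Debouncing as described in the table can be sketched with a single timer on the asyncio event loop. DebounceSketch is a hypothetical class that only illustrates the idea (a value is forwarded once no newer value arrived for duetime seconds); it ignores the reset value and is not Broqer's op.Debounce. It requires Python 3.7 or newer.

```python
import asyncio


class DebounceSketch:
    """Hypothetical debouncer: forward a value after `duetime` of silence."""

    def __init__(self, duetime, callback):
        self._duetime = duetime
        self._callback = callback
        self._handle = None

    def emit(self, value):
        if self._handle is not None:
            self._handle.cancel()  # a newer value supersedes the pending one
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._duetime, self._callback, value)


async def main():
    received = []
    debounce = DebounceSketch(0.05, received.append)
    for value in (1, 2, 3):
        debounce.emit(value)    # three emits in quick succession
    await asyncio.sleep(0.2)    # stay idle longer than the due time
    return received


received = asyncio.run(main())
print(received)  # [3]
```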

Subscribers

A Subscriber is the sink for messages.

Sink (func, *args, **kwargs)  Apply func(*args, value, **kwargs) to each emitted value
OnEmitFuture (timeout=None)   Build a future that can be awaited
hub.utils.TopicMapper (d)     Update a dictionary with changes from topics
Trace (d)                     Debug output for publishers

Subjects

Subject ()     Source with .emit(*args) method to publish a new message
Value (*init)  Source with a state (initialized via init)

Download files

Filename                           Size     File type  Python version  Upload date
broqer-0.9.0-py2.py3-none-any.whl  78.4 kB  Wheel      py2.py3         Sep 30, 2018
broqer-0.9.0.tar.gz                53.5 kB  Source     None            Sep 30, 2018
