
Carefully crafted library to operate with continuous streams of data in a reactive style with publish/subscribe and broker functionality.


Initially focused on embedded systems, Broqer can be used wherever continuous streams of data have to be processed - and they are everywhere. Watch out!


Synopsis

  • Pure python implementation without dependencies

  • Under MIT license (2018 Günther Jena)

  • Source is hosted on GitHub.com

  • Documentation is hosted on ReadTheDocs.com

  • Tested on Python 3.5, 3.6, 3.7, 3.8 and 3.9

  • Unit tested with pytest, coding style checked with Flake8, static type checked with mypy, static code checked with Pylint, documented with Sphinx

  • Operators known from ReactiveX and other streaming frameworks (like Map, CombineLatest, …)

    • Centralised object to keep track of publishers and subscribers

    • Starting point to build applications with a microservice architecture

Showcase

In other frameworks a Publisher is sometimes called an Observable. A Subscriber is able to observe the changes a publisher is emitting. With these basics you’re able to use the observer pattern - let’s see!

Observer pattern

Subscribing to a publisher is done via the .subscribe() method. A simple subscriber is Sink, which calls a function with optional positional and keyword arguments.

>>> from broqer import Publisher, Sink
>>> a = Publisher(5)  # create a publisher with state `5`
>>> s = Sink(print, 'Change:')  # create a subscriber
>>> disposable = a.subscribe(s)  # subscribe subscriber to publisher
Change: 5

>>> a.notify(3)  # change the state
Change: 3

>>> disposable.dispose()  # unsubscribe
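
To make the mechanics behind the example above more concrete, here is a minimal pure-Python sketch of the observer pattern. MiniPublisher is a hypothetical stand-in written for illustration only; it is not broqer's implementation, and a plain callable takes the place of Sink.

```python
# Illustrative sketch only: MiniPublisher is a hypothetical stand-in,
# not broqer's actual Publisher class.
class MiniPublisher:
    def __init__(self, init):
        self._state = init
        self._subscribers = []

    def subscribe(self, callback):
        """Register callback, emit the current state to it and
        return a function that removes the subscription again."""
        self._subscribers.append(callback)
        callback(self._state)
        return lambda: self._subscribers.remove(callback)

    def notify(self, value):
        """Store the new state and pass it to every subscriber."""
        self._state = value
        for callback in list(self._subscribers):
            callback(value)


received = []
a = MiniPublisher(5)
dispose = a.subscribe(received.append)  # the current state is emitted at once
a.notify(3)                             # the subscriber sees the change
dispose()                               # unsubscribe
a.notify(7)                             # no subscriber left, nothing is recorded
```

The list `received` ends up holding the state at subscription time followed by the single change made while the subscription was active.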

Combine publishers with arithmetic operators

You’re able to create publishers on the fly by combining two publishers with the common operators (like +, >, <<, …).

>>> a = Publisher(1)
>>> b = Publisher(3)

>>> c = a * 3 > b  # create a new publisher via operator overloading
>>> disposable = c.subscribe(Sink(print, 'c:'))
c: False

>>> a.notify(2)
c: True

>>> b.notify(10)
c: False
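
How an expression like `a * 3 > b` can yield a new reactive object may be easier to see in a small sketch. The Cell class below is hypothetical and only assumes that operators are overloaded to build derived objects which recompute whenever an operand changes; broqer's internals may work differently.

```python
import operator


class Cell:
    """Hypothetical stand-in for a publisher holding a state."""

    def __init__(self, value=None):
        self.value = value
        self._listeners = []

    def notify(self, value):
        self.value = value
        for listener in list(self._listeners):
            listener()

    def _derive(self, func, other):
        """Build a new Cell that tracks func(self, other)."""
        result = Cell()

        def recompute():
            rhs = other.value if isinstance(other, Cell) else other
            result.notify(func(self.value, rhs))

        self._listeners.append(recompute)
        if isinstance(other, Cell):
            other._listeners.append(recompute)
        recompute()  # compute the initial state
        return result

    def __mul__(self, other):
        return self._derive(operator.mul, other)

    def __gt__(self, other):
        return self._derive(operator.gt, other)


a = Cell(1)
b = Cell(3)
c = a * 3 > b          # parsed as (a * 3) > b, each step builds a Cell

states = [c.value]     # 1 * 3 > 3
a.notify(2)
states.append(c.value) # 2 * 3 > 3
b.notify(10)
states.append(c.value) # 2 * 3 > 10
```

The recorded states follow the same False, True, False sequence as the broqer example above.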

Fancier operations, such as getting an item by index or key, are also possible:

>>> i = Publisher('a')
>>> d = Publisher({'a':100, 'b':200, 'c':300})

>>> disposable = d[i].subscribe(Sink(print, 'r:'))
r: 100

>>> i.notify('c')
r: 300
>>> d.notify({'c':123})
r: 123

Some Python built-in functions can’t return Publishers (e.g. len() needs to return an integer). For these cases special functions are defined in broqer: Str, Int, Float, Len and In (for x in y). Other convenience functions are also available: All, Any, BitwiseAnd and BitwiseOr.
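
The restriction comes from Python itself rather than from broqer. The following sketch uses two hypothetical classes, Lazy and Container, to show what happens when `__len__` or `__contains__` try to return an arbitrary object:

```python
class Lazy:
    """Hypothetical placeholder for a value that is only known later."""


class Container:
    def __len__(self):
        return Lazy()  # not an integer

    def __contains__(self, item):
        return Lazy()  # allowed, but the result gets coerced to bool


try:
    len(Container())
    len_error = None
except TypeError as exc:
    len_error = exc  # len() insists on an integer result

# The `in` operator converts whatever __contains__ returns to a bool,
# so the Lazy object itself never reaches the caller.
contains_result = 1 in Container()
```

Because the interpreter either rejects or coerces the return value, a reactive result has to be produced by a dedicated function instead of the built-in.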

Attribute access on a publisher builds a new publisher; the actual attribute access is performed on each emitted value. A publisher has to know which type it should mimic - this is set via .inherit_type(type).

>>> i = Publisher('Attribute access made REACTIVE')
>>> i.inherit_type(str)
>>> disposable = i.lower().split(sep=' ').subscribe(Sink(print))
['attribute', 'access', 'made', 'reactive']

>>> i.notify('Reactive and pythonic')
['reactive', 'and', 'pythonic']
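
The idea of deferring attribute access until a value is emitted can be sketched with `__getattr__`. The Deferred class below is a hypothetical illustration of that idea; it records method calls and replays them on each value, and it does not reflect how broqer or .inherit_type() are implemented.

```python
class Deferred:
    """Hypothetical: records method calls and replays them on values."""

    def __init__(self, steps=()):
        self._steps = tuple(steps)

    def __getattr__(self, name):
        # Only called for attributes that are not found normally,
        # so _steps and apply() are unaffected.
        def method(*args, **kwargs):
            return Deferred(self._steps + ((name, args, kwargs),))
        return method

    def apply(self, value):
        """Run the recorded method calls on an actual value."""
        for name, args, kwargs in self._steps:
            value = getattr(value, name)(*args, **kwargs)
        return value


pipeline = Deferred().lower().split(sep=' ')
first = pipeline.apply('Attribute access made REACTIVE')
second = pipeline.apply('Reactive and pythonic')
```

Each call to `apply` plays the role of one emitted value passing through the recorded `lower()` and `split()` calls.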

Function decorators

Make your own operators on the fly with function decorators. Decorators are available for Accumulate, CombineLatest, Filter, Map, MapAsync, MapThreaded, Reduce and Sink.

>>> from broqer import op
>>> @op.build_map
... def count_vowels(s):
...     return sum([s.count(v) for v in 'aeiou'])

>>> msg = Publisher('Hello World!')
>>> disposable = (msg | count_vowels).subscribe(Sink(print, 'Number of vowels:'))
Number of vowels: 3
>>> msg.notify('Wahuuu')
Number of vowels: 4

You can even make configurable Maps and Filters:

>>> import re

>>> @op.build_filter_factory
... def filter_pattern(pattern, s):
...     return re.search(pattern, s) is not None

>>> msg = Publisher('Cars passed: 135!')
>>> disposable = (msg | filter_pattern('[0-9]+')).subscribe(Sink(print))
Cars passed: 135!
>>> msg.notify('No cars have passed')
>>> msg.notify('Only 1 car has passed')
Only 1 car has passed
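
The factory idea itself, fixing the leading arguments now and supplying the value later, can be sketched with functools.partial. The `filter_factory` decorator below is a hypothetical helper for illustration and is not op.build_filter_factory; it returns a plain predicate rather than a broqer operator.

```python
import functools
import re


def filter_factory(predicate):
    """Hypothetical helper: calling the decorated function with its
    configuration returns a one-argument predicate."""
    @functools.wraps(predicate)
    def factory(*args, **kwargs):
        return functools.partial(predicate, *args, **kwargs)
    return factory


@filter_factory
def filter_pattern(pattern, s):
    return re.search(pattern, s) is not None


has_digits = filter_pattern('[0-9]+')  # configuration is bound here
messages = [
    'Cars passed: 135!',
    'No cars have passed',
    'Only 1 car has passed',
]
passed = [m for m in messages if has_digits(m)]  # value is supplied here
```

Only the two messages containing digits remain in `passed`, matching the output of the broqer example above.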

Install

pip install broqer

Credits

Broqer was inspired by:

  • RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)

  • aioreactive: Async/Await reactive tools for Python (by Dag Brattli)

  • streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)

  • MQTT: M2M connectivity protocol

  • Florian Feurstein: spending hours in discussion, coming up with great ideas and helping me understand the concepts!

API

Publishers

A Publisher is the source for messages.

Publisher ()

Basic publisher

Operators

CombineLatest (*publishers)

Combine the latest emit of multiple publishers and emit the combination

Filter (predicate, …)

Filters values based on a predicate function

Map (map_func, *args, **kwargs)

Apply map_func(*args, value, **kwargs) to each emitted value

MapAsync (coro, mode, …)

Apply coro(*args, value, **kwargs) to each emitted value

Throttle (duration)

Limit the number of emits per duration
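
As a rough illustration of the "combine latest" idea, the sketch below keeps the newest value of each input and emits the combination once every input has produced a value. LatestCombiner is a hypothetical helper, and emitting a tuple is an assumption made for this sketch; see the CombineLatest documentation for the operator's actual behaviour.

```python
_MISSING = object()  # marks inputs that have not emitted yet


class LatestCombiner:
    """Hypothetical helper, not broqer's CombineLatest."""

    def __init__(self, count, on_emit):
        self._latest = [_MISSING] * count
        self._on_emit = on_emit

    def update(self, index, value):
        """Store the newest value of one input and emit the combination
        as soon as every input has a value."""
        self._latest[index] = value
        if all(item is not _MISSING for item in self._latest):
            self._on_emit(tuple(self._latest))


emitted = []
combiner = LatestCombiner(2, emitted.append)
combiner.update(0, 'a')  # second input still unknown, nothing emitted
combiner.update(1, 1)    # both inputs known, combination is emitted
combiner.update(0, 'b')  # emitted again with the newest values
```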

Subscribers

A Subscriber is the sink for messages.

Sink (func, *args, **kwargs)

Apply func(*args, value, **kwargs) to each emitted value

SinkAsync (coro, …)

Apply coro(*args, value, **kwargs) to each emitted value

OnEmitFuture (timeout=None)

Build a future that can be awaited

Trace (d)

Debug output for publishers

Values

Value (*init)

Publisher and Subscriber
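
Map, MapAsync, Sink and SinkAsync are all described above with the call form `func(*args, value, **kwargs)`: the stored positional arguments come first, the emitted value follows, and keyword arguments are passed through. The helper `apply_to_value` below is hypothetical and only demonstrates this argument order in plain Python.

```python
def apply_to_value(func, value, *args, **kwargs):
    """Call func with the stored arguments first and the emitted value last."""
    return func(*args, value, **kwargs)


# Stored argument 2, emitted value 3: evaluates pow(2, 3)
power = apply_to_value(pow, 3, 2)

# No stored positional arguments: evaluates round(3.14159, ndigits=2)
rounded = apply_to_value(round, 3.14159, ndigits=2)

# Same shape as Sink(print, 'Change:'): the label precedes the value
label = apply_to_value('{} {}'.format, 5, 'Change:')
```

This is why `Sink(print, 'Change:')` in the showcase prints the label before the value.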
