Rx or the observable pattern with curio async.

Project description



Versions following Semantic Versioning


A free implementation of "rx", alias "react", alias "the power of the observable pattern and its children", for server-side applications.

Implementation is based on:

  • async functions with the curio framework
  • user Protocol declarations for type checking
  • our friends: poetry, flake8, black, isort, pytest, mypy, sphinx, tox, travis ...
  • a taste of namedtuple
  • a lot of closed-over variables, closures, and inner functions
  • a taste of curiosity
  • shake it, again a little bit, and tadaa!


Install this library directly into an activated virtual environment:

$ pip install async-rx

or add it to your Poetry project:

$ poetry add async-rx

Note about Python version support:

  • Python code and tests use Python 3.8+
  • typing extensions give us optional support for Python < 3.8

API and Usage

After installation, the package can be imported:

$ python
>>> import async_rx
>>> async_rx.__version__

Take a look at the documentation and the API.

Function Name Description
rx_observer(on_next, on_error, on_completed) Return an observer.
rx_observer_from(observer, on_next, …) Build an observer from another one.
rx_collector(initial_value) Create an observer collector.
rx_create(subscribe, ensure_contract, …) Create an observable with specific delayed execution ‘subscribe’.
rx_defer(observable_factory) Create an observable when a subscription occurs.
rx_distinct(observable, frame_size) Create an observable which sends distinct events inside a window of size #frame_size.
rx_empty() Create an empty Observable.
rx_filter(observable, predicate, …) Create an observable whose events are filtered by a predicate function.
rx_first(observable) Create an observable which only takes the first event and completes.
rx_forward(observable, except_complet, …) Create an observable which forwards events.
rx_from(observable_input) Convert almost anything to an Observable.
rx_last(observable, count) Create an observable which only takes the last #count (or fewer) events and completes.
rx_of(*args) Convert arguments into an observable sequence.
rx_range(start, stop, step) Create an observable sequence of range.
rx_skip(observable, count) Create an observable which skips #count events on the source.
rx_take(observable, count) Create an observable which takes at most the first #count events (could be fewer).
rx_throw(error) Create an observable which always calls error.
rx_reduce(observable, accumulator, …) Create an observable which reduces the source with an accumulator and a seed value.
rx_count(observable) Create an observable which counts the emissions on the source and emits the result.
rx_max(observable) Create an observable which returns the maximal item in the source when it completes.
rx_min(observable) Create an observable which returns the minimal item in the source when it completes.
rx_sum(observable) Create an observable which returns the sum of the items in the source when it completes.
rx_avg(observable) Create an observable which returns the average of the items in the source when it completes.
rx_buffer(observable, buffer_size) Buffer operator.
rx_window(observable, buffer_size) Window operator.
rx_merge(*observables) Flattens multiple Observables together by blending their values into one Observable.
rx_concat(*observables) Concat operator.
rx_zip(*observables) Combine multiple Observables to create an Observable.
rx_amb(*observables) Amb operator.
rx_map(observable, transform, …) Map operator.
rx_merge_map(*observables, transform) Merge map operator.
rx_group_by(observable, key_selector) Group by operator.
rx_sample(observable, duration) Sample operator used to rate-limit the sequence.
rx_throttle(observable, duration) Throttle operator.
rx_delay(observable, duration, buffer_size, …) Delay operator.
rx_debounce(an_observable, duration) Debounce operator.
rx_dict(initial_value) Create an observable on a dictionary.
rx_list(initial_value) Create an observable on a list.
rx_repeat(duration, producer) Repeat data.
rx_repeat_series(source, ratio) Repeat a series (delay, value) as an observable for each subscription.
rx_subject(subject_handler) Create a subject.
rx_subject_from(a_subject, subscribe, …) Build a subject from another one by overriding some functions.
rx_subject_replay(buffer_size, subject_handler) Create a replay subject.
rx_subject_behavior(subject_handler) Create a behavior subject.
rx_publish(an_observable, subject_handler, …) Create a Connectable Observable.
rx_publish_replay(an_observable, …) Create a publish_replay.
rx_publish_behavior(an_observable, …) Create a publish_behavior.

A short sample

With this amazing observer:

from typing import Any


class ObserverCounterCollector:
    def __init__(self):
        self.on_next_count = 0
        self.on_completed_count = 0
        self.on_error_count = 0
        self.items: Any = []  # a bad idea, isn't it?

    async def on_next(self, item: Any) -> None:
        """Process item: count it and keep it for later inspection."""
        self.on_next_count += 1
        self.items.append(item)

    async def on_completed(self) -> None:
        """Signal completion of this observable."""
        self.on_completed_count += 1

    async def on_error(self, err: Any) -> None:
        """Count errors signalled by the observable."""
        self.on_error_count += 1

We are going to select the even numbers:

from async_rx import rx_filter, rx_range

async def _predicate(item: int) -> bool:
    return item % 2 == 0  # keep even numbers only

seeker = ObserverCounterCollector()

# the await below must run inside a coroutine (for example one started with curio.run)
observable = rx_range(start=0, stop=100)  # create an observable of [0, 1, ..., 99]
sub = await rx_filter(observable=observable, predicate=_predicate).subscribe(an_observer=seeker)  # filter and subscribe
sub()  # release resource

# we have :
assert seeker.on_next_count == 50
assert seeker.on_completed_count == 1
assert seeker.on_error_count == 0
assert seeker.items[0:6] == [0, 2, 4, 6, 8, 10]
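
To get a feeling for what an operator like rx_filter does behind the scenes, here is a minimal sketch built only from closures and inner functions. It is an illustration under assumptions, not the async_rx implementation: it uses the standard-library asyncio instead of curio, and Collector, from_range and filter_operator are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

# All names below are hypothetical; this is not the async_rx implementation.


class Collector:
    """Observer that stores every item it receives."""

    def __init__(self) -> None:
        self.items: List[Any] = []
        self.completed = False
        self.error: Any = None

    async def on_next(self, item: Any) -> None:
        self.items.append(item)

    async def on_completed(self) -> None:
        self.completed = True

    async def on_error(self, err: Any) -> None:
        self.error = err


def from_range(start: int, stop: int):
    """Return a subscribe function that emits start..stop-1, then completes."""

    async def subscribe(observer):
        for value in range(start, stop):
            await observer.on_next(value)
        await observer.on_completed()

        async def unsubscribe() -> None:
            return None  # nothing to release in this sketch

        return unsubscribe

    return subscribe


def filter_operator(source_subscribe, predicate: Callable[[Any], Awaitable[bool]]):
    """Wrap `source_subscribe` so that only items accepted by `predicate` pass."""

    async def subscribe(observer):
        class _Filtering:
            # Inner observer closing over `observer` and `predicate`.
            async def on_next(self, item: Any) -> None:
                if await predicate(item):
                    await observer.on_next(item)

            async def on_completed(self) -> None:
                await observer.on_completed()

            async def on_error(self, err: Any) -> None:
                await observer.on_error(err)

        return await source_subscribe(_Filtering())

    return subscribe


async def is_even(item: int) -> bool:
    return item % 2 == 0


async def demo() -> Collector:
    collector = Collector()
    subscribe = filter_operator(from_range(0, 100), is_even)
    unsubscribe = await subscribe(collector)
    await unsubscribe()  # release resource
    return collector


filtered = asyncio.run(demo())
print(len(filtered.items), filtered.items[:6])
```

The operator never touches the source directly: it only slips an inner observer between the source and the real one, which is why operators built this way can be chained.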

New to react/rx and want a taste of it?

First question: where to begin?

If you are reading this page, you have probably already searched a lot on Google & co, and you are probably as lost as I was among React components in html/js/whatever. I can't offer you the best possible explanation, but... in my humble point of view, if you like to know how things are done behind the scenes, you should remember:

  • what the Observable pattern is (or listener, alias callback)
  • what an event emitter is (something which sends events?)

OK, with this in mind:

  • an observer receives events from an observable once it subscribes to it
  • events are "on_next(item)", "on_completed()", and ... "on_error(err)"

Take a look in "" and come back :)

Did you see Observer/Observable/XXXHandler/Subscribe? And Subscription (yes, it is what you call to unsubscribe)?

So rx_from([1, 2, ...]) creates an observable which will send the items of the list in sequence when an observer subscribes to it. Take time to look at the unit tests :) You can go into the observable module and see all of rx_from, rx_defer, rx_last, ...
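
As a rough picture of that idea, the sketch below builds an observable from a list, subscribes an observer to it, and calls the returned subscription to unsubscribe. It is only an illustration: it runs on the standard-library asyncio rather than curio, and from_list and ListObserver are hypothetical names, not the async_rx API.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

# Hypothetical names for illustration only; this is not the async_rx API.
Subscription = Callable[[], Awaitable[None]]


class ListObserver:
    """Observer that records the three kinds of events it can receive."""

    def __init__(self) -> None:
        self.items: List[Any] = []
        self.completed = False
        self.error: Any = None

    async def on_next(self, item: Any) -> None:
        self.items.append(item)

    async def on_completed(self) -> None:
        self.completed = True

    async def on_error(self, err: Any) -> None:
        self.error = err


def from_list(values: List[Any]):
    """Return a subscribe function that sends `values` in sequence."""

    async def subscribe(observer) -> Subscription:
        # Nothing happens until an observer subscribes: execution is delayed.
        for value in values:
            await observer.on_next(value)
        await observer.on_completed()

        async def unsubscribe() -> None:
            # The sequence is already exhausted, so there is nothing to release.
            return None

        return unsubscribe

    return subscribe


async def demo() -> ListObserver:
    observer = ListObserver()
    subscribe = from_list([1, 2, 3])
    subscription = await subscribe(observer)
    await subscription()  # unsubscribe
    return observer


result = asyncio.run(demo())
print(result.items, result.completed)
```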

But but but, what is a freaking "subject"?

It's like an observer AND an observable at once, which can multicast items from one observable to several observers. As it is an observer, it can receive data from somewhere. As it is an observable, observers can subscribe to it.
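
The two faces of a subject can be sketched in a few lines. This is a simplified illustration with hypothetical names (SimpleSubject, Collector) running on the standard-library asyncio, not the subject shipped by async_rx:

```python
import asyncio
from typing import Any, List


class Collector:
    """Observer that stores what it receives."""

    def __init__(self) -> None:
        self.items: List[Any] = []
        self.completed = False
        self.error: Any = None

    async def on_next(self, item: Any) -> None:
        self.items.append(item)

    async def on_completed(self) -> None:
        self.completed = True

    async def on_error(self, err: Any) -> None:
        self.error = err


class SimpleSubject:
    """Hypothetical subject: an observer that forwards events to its subscribers."""

    def __init__(self) -> None:
        self._observers: List[Any] = []

    # Observable side: observers can subscribe to it.
    async def subscribe(self, observer):
        self._observers.append(observer)

        async def unsubscribe() -> None:
            if observer in self._observers:
                self._observers.remove(observer)

        return unsubscribe

    # Observer side: it receives events and multicasts them.
    async def on_next(self, item: Any) -> None:
        for observer in list(self._observers):
            await observer.on_next(item)

    async def on_completed(self) -> None:
        for observer in list(self._observers):
            await observer.on_completed()

    async def on_error(self, err: Any) -> None:
        for observer in list(self._observers):
            await observer.on_error(err)


async def demo():
    subject = SimpleSubject()
    first, second = Collector(), Collector()
    unsubscribe_first = await subject.subscribe(first)
    await subject.on_next(1)  # only `first` is listening
    await subject.subscribe(second)
    await subject.on_next(2)  # both receive it
    await unsubscribe_first()
    await subject.on_next(3)  # only `second` is left
    await subject.on_completed()
    return first, second


first, second = asyncio.run(demo())
print(first.items, second.items)
```

An observer only sees what is sent while it is subscribed, which is exactly the gap a replay subject fills by buffering past items.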

Take a look at the subject module and its unit tests to see what a replay_subject is. Funny, no? See the function subject.

Last but not least, the big gun: the "ConnectableObservable", alias multicast. Oh my god, they killed Kenny!

It's like a subject which you can connect and disconnect whenever you want, or automatically (with a call to ref_count). "Connect" means that the subject starts to receive from the observable, so items will be sent to the observers.
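
Here is a small sketch of the manual connect/disconnect part (the automatic ref_count variant is left out). As before, it is an assumption-laden illustration on the standard-library asyncio: ConnectableSketch, from_list and Collector are hypothetical names and not what rx_publish returns.

```python
import asyncio
from typing import Any, List


class Collector:
    """Observer that stores what it receives."""

    def __init__(self) -> None:
        self.items: List[Any] = []
        self.completed = False
        self.error: Any = None

    async def on_next(self, item: Any) -> None:
        self.items.append(item)

    async def on_completed(self) -> None:
        self.completed = True

    async def on_error(self, err: Any) -> None:
        self.error = err


def from_list(values: List[Any]):
    """Return a subscribe function that sends `values`, then completes."""

    async def subscribe(observer):
        for value in values:
            await observer.on_next(value)
        await observer.on_completed()

        async def unsubscribe() -> None:
            return None

        return unsubscribe

    return subscribe


class ConnectableSketch:
    """Hypothetical connectable: observers wait until connect() is called."""

    def __init__(self, source_subscribe) -> None:
        self._source_subscribe = source_subscribe
        self._observers: List[Any] = []
        self._connection = None

    async def subscribe(self, observer):
        self._observers.append(observer)

        async def unsubscribe() -> None:
            if observer in self._observers:
                self._observers.remove(observer)

        return unsubscribe

    async def connect(self):
        # Subscribe to the source only now; items start to flow to observers.
        if self._connection is None:
            self._connection = await self._source_subscribe(self)
        return self.disconnect

    async def disconnect(self) -> None:
        if self._connection is not None:
            await self._connection()
            self._connection = None

    # Internal subject side: fan each event out to the current observers.
    async def on_next(self, item: Any) -> None:
        for observer in list(self._observers):
            await observer.on_next(item)

    async def on_completed(self) -> None:
        for observer in list(self._observers):
            await observer.on_completed()

    async def on_error(self, err: Any) -> None:
        for observer in list(self._observers):
            await observer.on_error(err)


async def demo():
    connectable = ConnectableSketch(from_list([1, 2, 3]))
    a, b = Collector(), Collector()
    await connectable.subscribe(a)
    await connectable.subscribe(b)
    before = (list(a.items), list(b.items))  # nothing received yet
    disconnect = await connectable.connect()
    await disconnect()
    return before, a, b


before, a, b = asyncio.run(demo())
print(before, a.items, b.items)
```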

I hope this helps you a little bit :)


