Skip to main content

Rx or the observable pattern with curio async.

Project description

async-rx

Unix Build Status Codacy Badge Coverage StatusScrutinizer Code QualityPyPI Version PyPI License

Versions following Semantic Versioning

Overview

A free implementation of "rx" alias "react" alias "the power of observable pattern and his children" for application server side.

Implementation is based on:

  • async function with curio framework
  • user Protocol declaration for type checking
  • our friends: poetry, flake8, black, isort, pytest, mypy, sphinx, tox, travis ...
  • a taste of namedtuple
  • a lot of closed variable, clojure function, inner function
  • a taste of curiosity
  • shake it, again a little bit and tadaa !

Installation

Install this library directly into an activated virtual environment:

$ pip install async-rx

or add it to your Poetry project:

$ poetry add async-rx

Note about Python version support:

  • python code and test use python 3.8+
  • typing extensions give us an optional python < 3.8 support

API and Usage

After installation, the package can imported:

$ python
>>> import async_rx
>>> async_rx.__version__

Take a look on documentation and API.

Function Name Description
rx_observer(on_next, on_error, on_completed) Return an observer.
rx_observer_from(observer, on_next, …) Build an observer from another one.
rx_collector(initial_value) Create an observer collector.
rx_create(subscribe, ensure_contract, …) Create an observable with specific delayed execution ‘subscribe’.
rx_defer(observable_factory) Create an observable when a subscription occurs.
rx_distinct(observable, frame_size) Create an observable which send distinct event inside a windows of size #frame_size.
rx_empty() Create an empty Observable.
rx_filter(observable, predicate, …) Create an observable which event are filtered by a predicate function.
rx_first(observable) Create an observale which only take the first event and complete.
rx_forward(observable, except_complet, …) Create an observable wich forward event.
rx_from(observable_input) Convert almost anything to an Observable.
rx_last(observable, count) Create an observale which only take #count (or less) last events and complete.
rx_of(*args) Convert arguments into an observable sequence.
rx_range(start, stop, step) Create an observable sequence of range.
rx_skip(observable, count) Create an obervable wich skip #count event on source.
rx_take(observable, count) Create an observable which take only first #count event maximum (could be less).
rx_throw(error) Create an observable wich always call error.
rx_reduce(observable, accumulator, …) Create an observable which reduce source with accumulator and seed value.
rx_count(observable) Create an observable wich counts the emissions on the source and emits result.
rx_max(observable) Create an observable wich returns the maximal item in the source when completes.
rx_min(observable) Create an observable wich returns minimal item in the source when completes.
rx_sum(observable) Create an observable wich return the sum items in the source when completes.
rx_avg(observable) Create an observable wich return the average items in the source when completes.
rx_buffer(observable, buffer_size) Buffer operator.
rx_window(observable, buffer_size) Window operator.
rx_merge(*observables) Flattens multiple Observables together by blending their values into one Observable.
rx_concat(*observables) Concat operator.
rx_zip(*observables) Combine multiple Observables to create an Observable.
rx_amb(*observables) Amb operator.
rx_map(observable, transform, …) Map operator.
rx_merge_map(*observables, transform) Merge map operator.
rx_group_by(observable, key_selector) Group by operator.
rx_sample(observable, duration) Sample operator used to rate-limit the sequence.
rx_throttle(observable, duration) Throttle operator.
rx_delay(observable, duration, buffer_size, …) Delay operator.
rx_debounce(an_observable, duration) Debounce operator.
rx_dict(initial_value) Create an observable on dictionnary.
rx_list(initial_value) Create an observable on list.
rx_repeat(duration, producer) Repeat data.
rx_repeat_series(source, ratio) Repeat a series (delay, value) as an observable for each subscription.
rx_subject(subject_handler) Create a subject.
rx_subject_from(a_subject, subscribe, …) Build a subject from another one by override some function.
rx_subject_replay(buffer_size, subject_handler) Create a replay subject.
rx_subject_behavior(subject_handler) Create a behavior subject.
rx_publish(an_observable, subject_handler, …) Create a Connectable Observable.
rx_publish_replay(an_observable, …) Create a publish_replay.
rx_publish_behavior(an_observable, …) Create a publish_behavior.

A short sample

With this amazing observer:

class ObserverCounterCollector:
    def __init__(self):
        self.on_next_count = 0
        self.on_completed_count = 0
        self.on_error_count = 0
        self.items: Any = list([]) # a bad idea isn't it

    async def on_next(self, item: Any) -> None:
        """Process item."""
        self.items.append(item)
        self.on_next_count += 1

    async def on_completed(self) -> None:
        """Signal completion of this observable."""
        self.on_completed_count += 1

    async def on_error(self, err: Any) -> None:
        self.on_error_count += 1

We will going to select odd number:

async def _predicate(item: int) -> bool:
    return item % 2 == 0

seeker = ObserverCounterCollector()

observable = rx_range(start=0, stop=100) # create an observable of [0, 1, ..., 99]
sub = await rx_filter(observable=observable, predicate=_predicate).subscribe(an_observer=seeker) # filter and subscribe
sub() # release resource

# we have :
assert seeker.on_next_count == 50
assert seeker.on_completed_count == 1
assert seeker.on_error_count == 0
assert seeker.items[0:6] == [0, 2, 4, 6, 8, 10]

Your new on react/rx and wanna taste it ?

First question: Where to begin ?

If you read this page, you probably ever doing lot of search on google & co, and probably loose as me about react component in html/js/whatever. I can't purpose your the best state of the explanation, but... AMHPOV, if you like to known how slug are done behind the scene, you should remember:

  • what is Observable pattern (or listener, alias callback)
  • what is an event emiter (something which send event ?)

ok, with this in mind:

  • an observer receive event from an observable when he subscribe on it
  • events are "on_next(item)", "on_completed()", and ... "on_error(err)"

Take a look in "protocole.py" and come back :)

Did you see Observer/Observable/XXXHandler/Subscribe ? And Subcription (yes it will called for unsubscribe) ?

So rx_from([1, 2, ...]) create an observable which will send items of list in sequence when an observer subscribe on it. Take time to look at test unit :) You can go into observable module and see all rx_from, rx_defer, rx_last, ...

But but but, what is a freaking "subject" ?

It's like an observer AND an observable which can multicast items from an observable to several observers. As it is an observer, it can receive data from somewhere. As it is an observable, observer can subscribe on it.

Take a look under subject module and test unit, see what is a replay_subject, funny no ? See the function subject.

At the last for the goods: the big gun, the "ConnectableObservable" alias multicast. oh my god, they kill kenny!

Its like a subject which you can connect/disconnect as you want or automatically (with a call on ref_count). "connect" mean that subject start to receive from observable, so items will be send on observers.

I hope this could help you a little bit :)

Reference

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_rx-1.0.4.tar.gz (32.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

async_rx-1.0.4-py3-none-any.whl (49.3 kB view details)

Uploaded Python 3

File details

Details for the file async_rx-1.0.4.tar.gz.

File metadata

  • Download URL: async_rx-1.0.4.tar.gz
  • Upload date:
  • Size: 32.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.7 CPython/3.8.5 Darwin/20.5.0

File hashes

Hashes for async_rx-1.0.4.tar.gz
Algorithm Hash digest
SHA256 57e835f2ec51e662ba6314f91596d2313c38e7e4ec9821dd3c2650e7d5b35b1f
MD5 57f13065754e90e1501264f00f88b9de
BLAKE2b-256 cf219a901d53476a431da3d626e77658a698fd0d2418a162c1b66cee1183d7f7

See more details on using hashes here.

File details

Details for the file async_rx-1.0.4-py3-none-any.whl.

File metadata

  • Download URL: async_rx-1.0.4-py3-none-any.whl
  • Upload date:
  • Size: 49.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.7 CPython/3.8.5 Darwin/20.5.0

File hashes

Hashes for async_rx-1.0.4-py3-none-any.whl
Algorithm Hash digest
SHA256 fc322658a6bf5bc8a911faaada81066c8fabdaa752b03556aeb30c52609593a6
MD5 bf0a82e28bc219885f1c34cdbae2702d
BLAKE2b-256 f489f39cd1ee3c26894d423e8be3835e67c1ddbdcc18b334ec67a1279793a964

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page