A library that integrates *backpressure* into the *Observable* via *Flowables*.
ReactiveX Backpressure Library
rxbp is a Python library that integrates backpressure into the Observable via Flowables.
Features
- Observable Pattern: built on the reactive programming model.
- Backpressure: enables memory-safe handling of fast data producers and slow consumers.
- Continuation certificate: ensures the execution of a Flowable completes, avoiding any continuation deadlock.
- RxPY compatibility: interoperates with RxPY, bridging classic observables and backpressure-aware Flowables.
- Favor usability: prefers an implementation that is simple, safe, and user-friendly, while accepting some computational overhead.
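To make the backpressure bullet concrete, here is a minimal standard-library sketch of the general idea: a bounded buffer suspends a fast producer until the consumer catches up, so memory use stays capped. This is a generic illustration, not how rxbp implements backpressure; `run_bounded_pipeline` and its parameters are hypothetical names chosen for this example.

```python
import queue
import threading


def run_bounded_pipeline(items, capacity=2):
    """Push items through a bounded buffer; return (received, max_buffered)."""
    buffer = queue.Queue(maxsize=capacity)  # put() blocks while the buffer is full
    done = object()  # sentinel marking the end of the stream
    received = []
    max_buffered = 0

    def producer():
        for item in items:
            buffer.put(item)  # the producer is suspended here when the consumer lags
        buffer.put(done)

    thread = threading.Thread(target=producer)
    thread.start()
    while True:
        # Record how many items are waiting; this can never exceed `capacity`.
        max_buffered = max(max_buffered, buffer.qsize())
        item = buffer.get()
        if item is done:
            break
        received.append(item)
    thread.join()
    return received, max_buffered


received, max_buffered = run_bounded_pipeline(range(1000), capacity=2)
print(len(received), max_buffered <= 2)  # 1000 True
```

No matter how quickly the producer could generate values, at most `capacity` of them are held in memory at once.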
Example
import rxbp

source = rxbp.from_iterable(("Alpha", "Beta", "Gamma", "Delta", "Epsilon"))

flowable = (
    source
    .map(lambda s: len(s))
    .filter(lambda i: i >= 5)
    .tap(on_next=lambda v: print(f'Received {v}'))
)

# execute the flowable
rxbp.run(flowable)
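To check which values pass through the pipeline, the same map and filter logic can be written with Python built-ins. This sketch does not use rxbp; it assumes only that the map and filter operators follow the usual semantics given in the operator list.

```python
words = ("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# Map each word to its length, then keep lengths of at least 5.
lengths = map(len, words)
long_enough = filter(lambda i: i >= 5, lengths)

result = list(long_enough)
print(result)  # [5, 5, 5, 7]
```

"Beta" has length 4 and is dropped; the other four words pass the filter.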
RxPY integration
A Flowable can be created from an RxPY Observable using the rxbp.from_rx function.
Likewise, a Flowable can be converted to an RxPY Observable using the rxbp.to_rx function.
The example below demonstrates both conversions:
import reactivex as rx
import rxbp

rx_source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# convert Observable to Flowable
source = rxbp.from_rx(rx_source)

flowable = (
    source
    .map(lambda s: len(s))
    .filter(lambda i: i >= 5)
)

# convert Flowable to Observable
rxbp.to_rx(flowable).subscribe(lambda v: print(f"Received {v}"))
Run a Flowable
A Flowable is executed by calling the rxbp.run() function.
This call blocks program execution until the Flowable terminates (either by completing or erroring) and returns a list of the emitted elements.
Typically, rxbp.run() is intended to be the final step in a program: first, the dataflow is defined by constructing a Flowable; then, the program is executed by invoking run().
In contrast, functional programming languages eliminate the need for an explicit run() call—execution is handled automatically by the compiler or runtime.
In such cases, the programmer simply defines the computation, and the system takes care of running it.
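The split between defining a dataflow and running it can be sketched in plain Python. The `Pipeline` class below is a hypothetical stand-in written for this illustration, not an rxbp class: chaining operators only records steps, and nothing executes until `run()` is called.

```python
class Pipeline:
    """Hypothetical description of a dataflow; building it runs nothing."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def map(self, func):
        # Return a new description with one more step appended.
        return Pipeline(self._source, self._steps + (lambda it: map(func, it),))

    def filter(self, pred):
        return Pipeline(self._source, self._steps + (lambda it: filter(pred, it),))

    def run(self):
        # Only here is the source iterated and every step applied.
        stream = iter(self._source)
        for step in self._steps:
            stream = step(stream)
        return list(stream)


calls = []


def record_length(s):
    calls.append(s)  # side effect used to observe when execution happens
    return len(s)


pipeline = Pipeline(["Alpha", "Beta", "Gamma"]).map(record_length).filter(lambda n: n >= 5)
calls_before_run = list(calls)
print(calls_before_run)  # [] because defining the pipeline executes nothing

result = pipeline.run()
print(result)  # [5, 5]
print(calls)   # ['Alpha', 'Beta', 'Gamma']
```

Like the rxbp.run() call described above, `run()` here blocks until the stream is exhausted and returns the collected elements as a list.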
In RxPY, the subscribe method is used to execute an Observable.
It accepts on_next, on_error, and on_completed handlers as arguments, but does not return the received items.
RxBP implements a similar method called unsafe_subscribe.
However, using it requires specific conditions: it should be called within an active Trampoline scheduler and an active main-thread scheduler to ensure that the program terminates when the Flowable completes.
To replicate the behavior of subscribe in RxPY — executing the stream and handling items via side effects — you can use the tap operator together with rxbp.run():
# Emulates the RxPY subscribe behavior
rxbp.run(
    flowable.tap(on_next=print)
)
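The reason this pattern provides both behaviors is that a tap-style operator passes every item through unchanged while invoking a callback on the side. The generator below is a plain-Python stand-in for that idea, not rxbp's tap implementation:

```python
def tap(items, on_next):
    """Yield each item unchanged after calling on_next with it."""
    for item in items:
        on_next(item)  # side effect, e.g. printing or logging
        yield item


seen = []
result = list(tap([5, 5, 7], on_next=seen.append))
print(seen)    # [5, 5, 7]
print(result)  # [5, 5, 7]
```

The callback observes each item (the subscribe-like side effect), and the caller still receives the full list of emitted elements.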
Operations
Create a Flowable
- count - create a Flowable emitting 0, 1, 2, ...
- connectable - create a Flowable whose source must be specified by the connections argument when calling the run function
- empty - create a Flowable emitting no items
- error - create a Flowable emitting an exception
- from_iterable - create a Flowable that emits each element of an iterable
- from_value - create a Flowable that emits a single element
- from_rx - wrap an rx.Observable and expose it as a Flowable, relaying signals in a backpressure-aware manner
- interval - create a Flowable emitting an item after every time interval
- repeat_value - create a Flowable that repeats the given element
- schedule_on - schedule a task on a dedicated scheduler
- schedule_relative - schedule a task on a dedicated scheduler after a relative time
- schedule_absolute - schedule a task on a dedicated scheduler at an absolute time
Transforming operators
- accumulate - apply an accumulator function over a Flowable sequence and emit each intermediate result
- default_if_empty - emit a given value if the source completes without emitting anything
- filter - emit only those items for which the given predicate holds
- first - emit the first element only
- flat_map - apply a function to each item emitted by the source and flatten the result
- last - emit the last item only
- map - map each element emitted by the source by applying the given function
- reduce - apply an accumulator function over a Flowable sequence and emit a single element
- repeat - return a Flowable that resubscribes to the source when the source completes
- repeat_first - return a Flowable that repeats the first element it receives from the source forever (until disposed)
- skip - skip the first n items
- skip_while - skip the first items while the given predicate holds
- take - take the first n items
- take_while - take the first items while the given predicate holds
- tap - perform side effects for notifications from the source Flowable
- to_list - create a new Flowable that collects the elements from the source sequence and emits them as a single item
- zip_with_index - zip each item emitted by the source with its enumerated index
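Several of these operators have close analogues in the Python standard library, which can help when reasoning about what a pipeline emits. The sketch below uses only itertools and functools, not rxbp, and assumes the operators behave as their one-line descriptions suggest; exact rxbp signatures (for example, whether accumulate takes an initial value) are not shown in this document.

```python
import operator
from functools import reduce
from itertools import accumulate, dropwhile, islice, takewhile

values = [1, 2, 3, 4, 5]

# accumulate: every intermediate result; reduce: only the final one.
running = list(accumulate(values, operator.add))
total = reduce(operator.add, values)

# take / skip: the first n items, or everything after them.
first_three = list(islice(values, 3))
after_two = list(islice(values, 2, None))

# take_while / skip_while: split at the first item failing the predicate.
small = list(takewhile(lambda v: v < 3, values))
rest = list(dropwhile(lambda v: v < 3, values))

print(running)      # [1, 3, 6, 10, 15]
print(total)        # 15
print(first_three)  # [1, 2, 3]
print(after_two)    # [3, 4, 5]
print(small)        # [1, 2]
print(rest)         # [3, 4, 5]
```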
Combining operators
- merge - merge the items of the Flowable sequences into a single Flowable
- zip - create a new Flowable from two Flowables by combining their items in pairs in a strict sequence
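Strict pairwise combination can be illustrated with Python's built-in zip, which pulls one element from each input only when a pair is requested. This is an analogy under the assumption that rxbp's zip pairs items in order; it is not rxbp code.

```python
from itertools import count

letters = ["a", "b", "c"]

# count() is infinite, but zip requests only as many numbers as there are letters.
pairs = list(zip(count(), letters))
print(pairs)  # [(0, 'a'), (1, 'b'), (2, 'c')]
```

Because elements are requested on demand, pairing a fast or unbounded source with a short one needs no unbounded buffering.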
Other operators
- share - share a Flowable among possibly multiple subscribers
Output functions
- to_rx - create an rx Observable from a Flowable
Reference
Below are some references related to this project:
- continuationmonad is a Python library that implements stack-safe continuations based on schedulers.
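For readers unfamiliar with the term, the sketch below shows the general trampoline technique behind stack-safe execution: instead of calling the next step directly, each step schedules it on a queue that a single loop drains, so the call stack never grows. The `Trampoline` class and `count_down` function are hypothetical names for this illustration and do not reflect the API of continuationmonad or rxbp.

```python
from collections import deque


class Trampoline:
    """Hypothetical minimal trampoline: runs queued tasks in a flat loop."""

    def __init__(self):
        self._tasks = deque()

    def schedule(self, task):
        self._tasks.append(task)

    def run(self):
        # Each task returns before the next one starts, keeping the stack shallow.
        while self._tasks:
            task = self._tasks.popleft()
            task()


def count_down(trampoline, n, result):
    if n == 0:
        result.append("done")
        return
    # Schedule the next step instead of recursing into it.
    trampoline.schedule(lambda: count_down(trampoline, n - 1, result))


trampoline = Trampoline()
result = []
trampoline.schedule(lambda: count_down(trampoline, 100_000, result))
trampoline.run()
print(result)  # ['done']
```

A directly recursive version of `count_down` with 100,000 steps would exceed Python's default recursion limit; the trampolined version completes with constant stack depth.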