Skip to main content

A library that integrates *backpressure* into the *Observable* via *Flowables*.

Project description

ReactiveX Backpressure Library

rxbp is a Python library that integrates backpressure into the Observable via Flowables.

Features

  • Observable Pattern: built on the reactive programming model.
  • Backpressure: enables memory-safe handling of fast data producers and slow consumers.
  • Continuation certificate: ensures the execution of a Flowable completes, avoiding any continuation deadlock.
  • RxPY compatibility: interoperates with RxPY, bridging classic observables and backpressure-aware Flowables.
  • Favor usability - Favor an implementation that is simple, safe, and user-friendly, while accepting some computational overhead.

Example

import rxbp

source = rxbp.from_iterable(("Alpha", "Beta", "Gamma", "Delta", "Epsilon"))

flowable = (
    source
    .map(lambda s: len(s))
    .filter(lambda i: i >= 5)
    .tap(on_next=lambda v: print(f'Received {v}'))
)

# execute the flowable
rxbp.run(flowable)

RxPY integration

A Flowable can be created from an RxPY Observable using the rxbp.from_rx function. Likewise, a Flowable can be converted to an RxPY Observable using the rxbp.to_rx function. The example below demonstrates the two conversion:

import reactivex as rx
import rxbp

rx_source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

# convert Observable to Flowable
source = rxbp.from_rx(rx_source)

flowable = (
    source
    .map(lambda s: len(s))
    .filter(lambda i: i >= 5)
)

# convert Flowable to Observable
rxbp.to_rx(flowable).subscribe(lambda v: print(f"Received {v}"))

Run a Flowable

A Flowable is executed by calling the rxbp.run() function. This call blocks program execution until the Flowable terminates (either by completing or erroring) and returns a list of the emitted elements. Typically, rxbp.run() is intended to be the final step in a program: first, the dataflow is defined by constructing a Flowable; then, the program is executed by invoking run(). In contrast, functional programming languages eliminate the need for an explicit run() call—execution is handled automatically by the compiler or runtime. In such cases, the programmer simply defines the computation, and the system takes care of running it.

In RxPY, the subscribe method is used to execute an Observable. It accepts on_next, on_error, and on_completed handlers as arguments, but does not return the received items. RxBP implements a similar method called unsafe_subscribe. However, using it requires specific conditions: it should be called within an active Trampoline scheduler and an active main-thread scheduler to ensure that the program terminates when the Flowable completes. To replicate the behavior of subscribe in RxPY — executing the stream and handling items via side effects — you can use the tap operator together with rxbp.run():

# Emulates the RxPY subscribe behavior
rxbp.run(
    flowable.tap(on_next=print)
)

Operations

Create a Flowable

  • count - create a Flowable emitting 0, 1, 2, ...
  • connectable - create a Flowable whose source must be specified by the connections argument when calling the run function
  • empty - create a Flowable emitting no items
  • error - create a Flowable emitting an exception
  • from_iterable - create a Flowable that emits each element of an iterable
  • from_value - create a Flowable that emits a single element
  • from_rx - wrap a rx.Observable and exposes it as a Flowable, relaying signals in a backpressure-aware manner.
  • interval - create a Flowable emitting an item after every time interval
  • repeat_value - create a Flowable that repeats the given element
  • schedule_on - schedule task on a dedicated scheduler
  • schedule_relative - schedule task on a dedicated scheduler after a relative time
  • schedule_absolute - schedule task on a dedicated scheduler after an absolute time

Transforming operators

  • accumulate - apply an accumulator function over a Flowable sequence and returns each intermediate result.
  • default_if_empty - emits a given value if the source completes without emitting anything
  • filter - emit only those items for which the given predicate holds
  • first - emit the first element only
  • flat_map - apply a function to each item emitted by the source and flattens the result
  • last - emit last item
  • map - map each element emitted by the source by applying the given function
  • reduce - apply an accumulator function over a Flowable sequence and emits a single element
  • repeat - returns a Flowable that will resubscribe to the source when the source completes
  • repeat_first - return a Flowable that repeats the first element it receives from the source forever (until disposed).
  • skip - skip the first n items
  • skip_while - skip the first items while the given predicate holds
  • take - take the first n items
  • take_while - take the first item while the given predicate holds
  • tap - used to perform side-effects for notifications from the source Flowable
  • to_list - create a new Flowable that collects the elements from the source sequence, and emits a single item
  • zip_with_index - zip each item emitted by the source with the enumerated index

Combining operators

  • merge - merge the items of the Flowable sequences into a single Flowable
  • zip - Create a new Flowable from two Flowables by combining their item in pairs in a strict sequence

Other operators

  • share - share a Flowable to possibly multiple subscribers

Output functions

  • to_rx - create a rx Observable from a Observable

Reference

Below are some references related to this project:

  • continuationmonad is a Python library that implements stack-safe continuations based on schedulers.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rxbp-0.1.0.tar.gz (31.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rxbp-0.1.0-py3-none-any.whl (65.0 kB view details)

Uploaded Python 3

File details

Details for the file rxbp-0.1.0.tar.gz.

File metadata

  • Download URL: rxbp-0.1.0.tar.gz
  • Upload date:
  • Size: 31.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for rxbp-0.1.0.tar.gz
Algorithm Hash digest
SHA256 e434b68535fadf2de04e01d98dbefb7b5c722fb4b781ccb0a4f7f4b454bc25b5
MD5 00baf6b45181cfa1e7ce2c9c7a8ec102
BLAKE2b-256 014cae91f8edd60f112cfd38db5cb5435f03e3f34b74bb996fc0cecade039898

See more details on using hashes here.

File details

Details for the file rxbp-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: rxbp-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 65.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.2

File hashes

Hashes for rxbp-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a51014226a6176ac24c6f567a968e9dfc9d20281f62be6bfcf38461024cee658
MD5 5d58b50c84460a799c70fc5556c9b2f8
BLAKE2b-256 4fa252dc7a4fd64191090d1f1abf526ba26ea88a5fa93ad1d8cb18efa4bbc66b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page