Rembus for Python


Rembus is a Pub/Sub and RPC middleware.

Features

  • Binary message encoding using CBOR.

  • Native support for exchanging DataFrames.

  • Persistent storage via DuckDB DuckLake.

  • Pub/Sub QOS0, QOS1 and QOS2.

  • Hierarchical topic routing with wildcards (*/*/temperature).

  • MQTT integration.

  • WebSocket transport.

See the Rembus.jl broker for a full-fledged broker that supports WebSocket, ZMQ and plain TCP protocols, plus additional features such as private topics and multi-tenancy.

Concepts

  • Component: an addressable node in a distributed system. A component connects to a broker and communicates using Pub/Sub and/or RPC semantics.

  • Broker: a specialized component responsible for routing Pub/Sub messages and dispatching RPC calls between components. A broker may also persist messages and expose services.

  • Topic: a "logical channel" string identifier used for Pub/Sub message routing (e.g. alarm_topic).

  • Topic space: a set of topics defined by a wildcard pattern (e.g. */telemetry), used for bulk subscription.

  • Subscription: a callback bound to a topic or topic space; invoked automatically with the message payload when a Pub/Sub message is published. Supports wildcard topics and optional delivery of messages sent while the subscriber was offline (msgfrom=rb.LastReceived).

  • RPC Service: a named function exposed by a component and registered at the broker for remote invocation.

  • RPC Call: a synchronous or asynchronous request issued by a component to invoke a remote RPC service.

  • Schema: an optional declarative mapping between topic patterns and persistent storage tables, used to structure and persist messages at rest.

  • Data at Rest: broker capability to persist published messages into DuckDB tables defined by a schema, enabling historical queries and analytics.
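As a rough illustration of how a topic space selects topics, the sketch below matches a topic against a wildcard pattern. It assumes that `*` matches exactly one path segment and that `**` matches zero or more segments; the actual Rembus matching rules may differ. `topic_matches` is a hypothetical helper, not part of the rembus API.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if topic falls inside the topic space described by pattern."""
    return _match(pattern.split("/"), topic.split("/"))


def _match(pat: list, segs: list) -> bool:
    if not pat:
        # Pattern exhausted: match only if the topic is exhausted too.
        return not segs
    head, rest = pat[0], pat[1:]
    if head == "**":
        # Assumed multi-level wildcard: try consuming 0, 1, 2, ... segments.
        return any(_match(rest, segs[i:]) for i in range(len(segs) + 1))
    if not segs:
        return False
    if head == "*" or head == segs[0]:
        # Single-level wildcard or literal match: advance both sides.
        return _match(rest, segs[1:])
    return False
```

Under these assumptions, `*/telemetry` selects `s1/telemetry` but not `a/b/telemetry`, while `**/telemetry` selects both.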

Getting Started

Install the package:

pip install rembus

Broker

Start a broker (sync or async):

import rembus as rb

# sync version
bro = rb.node()  # equivalent to rb.node(port=8000)
bro.wait()  # run the broker loop; unnecessary in an interactive interpreter


# async version (inside a coroutine)
bro = await rb.component()
await bro.wait()

Broker with persistent storage

A Rembus broker can be configured with a schema to persist published messages into a DuckDB database via the DuckLake extension.

A schema declaratively maps Pub/Sub topic patterns to relational tables. Topic variables are extracted from the topic path and mapped to table columns; message payload fields are mapped to the remaining columns.

Example schema.json:

{
    "tables": [
        {
            "table": "sensor",
            "topic": ":site/:type/:dn/sensor",
            "columns": [
                {"col": "site", "type": "TEXT", "nullable": false},
                {"col": "type", "type": "TEXT", "nullable": false},
                {"col": "dn", "type": "TEXT"}
            ],
            "keys": ["dn"]
        },
        {
            "table": "telemetry",
            "topic": ":dn/telemetry",
            "columns": [
                {"col": "dn", "type": "TEXT"},
                {"col": "temperature", "type": "DOUBLE"},
                {"col": "pressure", "type": "DOUBLE"}
            ],
            "extras": {
                "recv_ts": "ts",
                "slot": "time_bucket"
            }
        }
    ]
}
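Before handing a schema file to the broker, it can help to sanity-check it. The following is a minimal sketch using only the standard library. `check_schema` is a hypothetical helper, and the set of required keys is inferred from the example above rather than taken from a formal Rembus specification.

```python
import json

# Keys every table entry carries in the example schema (an inference, not a spec).
REQUIRED_KEYS = {"table", "topic", "columns"}


def check_schema(text: str) -> list:
    """Return the table names declared in a schema document, or raise ValueError."""
    schema = json.loads(text)
    names = []
    for entry in schema.get("tables", []):
        missing = REQUIRED_KEYS - entry.keys()
        if missing:
            raise ValueError(f"table entry missing keys: {sorted(missing)}")
        declared = {col["col"] for col in entry["columns"]}
        # Every name listed under "keys" should refer to a declared column.
        unknown = set(entry.get("keys", [])) - declared
        if unknown:
            raise ValueError(f"{entry['table']}: unknown key columns {sorted(unknown)}")
        names.append(entry["table"])
    return names


example = """
{"tables": [{"table": "telemetry", "topic": ":dn/telemetry",
             "columns": [{"col": "dn", "type": "TEXT"},
                         {"col": "temperature", "type": "DOUBLE"}]}]}
"""
print(check_schema(example))  # ['telemetry']
```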

Schema semantics

  • Each entry in tables defines a topic-to-table binding.

  • Topic segments prefixed with : are variables extracted from the topic path and written to the corresponding columns.

  • Message payload fields are mapped positionally or by name to remaining columns.

  • keys define logical primary keys.

  • extras specify broker-generated metadata (e.g. receive timestamp, time bucketing).
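The variable extraction described above can be pictured with a short sketch. It assumes a pattern and a topic match only when they have the same number of segments and all literal segments are equal; `extract_vars` is a hypothetical helper and not the broker's actual implementation.

```python
from typing import Optional


def extract_vars(pattern: str, topic: str) -> Optional[dict]:
    """Match topic against a schema pattern; return the bound variables or None."""
    pat_segs = pattern.split("/")
    top_segs = topic.split("/")
    if len(pat_segs) != len(top_segs):
        return None
    bound = {}
    for pat, seg in zip(pat_segs, top_segs):
        if pat.startswith(":"):
            bound[pat[1:]] = seg  # variable segment: capture the value
        elif pat != seg:
            return None  # literal segment must match exactly
    return bound


print(extract_vars(":site/:type/:dn/sensor", "rome/HVAC/dev42/sensor"))
# {'site': 'rome', 'type': 'HVAC', 'dn': 'dev42'}
```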

Example mappings

  • Messages published to topics matching :site/:type/:dn/sensor are persisted in the sensor table, with site, type, and dn derived from the topic path.

  • Messages published to topics matching :dn/telemetry are persisted in the telemetry table, with dn derived from the topic path and metric values extracted from the message payload.
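To make the mapping concrete, the sketch below assembles a table row from topic variables and a payload, by name when the payload is a dictionary and positionally otherwise. `build_row` is a hypothetical helper, and filling absent named fields with `None` is an assumption made for illustration; it does not describe how the broker handles missing values.

```python
def build_row(columns, topic_vars, payload):
    """Merge topic variables and payload values into one row keyed by column name."""
    row = dict(topic_vars)
    remaining = [col for col in columns if col not in topic_vars]
    if isinstance(payload, dict):
        # By name: pick each remaining column from the payload (None if absent).
        for col in remaining:
            row[col] = payload.get(col)
    else:
        # Positionally: assign payload values to the remaining columns in order.
        for col, value in zip(remaining, payload):
            row[col] = value
    return row


columns = ["dn", "temperature", "pressure"]
print(build_row(columns, {"dn": "s1"}, {"temperature": 21.6, "pressure": 980}))
# {'dn': 's1', 'temperature': 21.6, 'pressure': 980}
```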

If a schema is provided, DuckLake tables are created automatically if they do not already exist, and all matching publications are persisted.

For each table, two special topics are automatically created:

  • query_<table>: retrieves records.
  • delete_<table>: removes records.

RPC calls to query_* topics return a Polars DataFrame.

For example:

cli.rpc("query_sensor", {"where": "type='HVAC'"})

Starting a broker with storage enabled

import rembus as rb

bro = await rb.component(schema="schema.json")
await bro.wait() 

Components

Connect to a Broker:

# named component
cli = await rb.component("ws://host:8000/myname")

# default host/port
cli = await rb.component("myname")

# anonymous
cli = await rb.component(rb.anonym(host="host", port=8000))

Why named components

  • Enables authentication (RSA/ECDSA/shared secret).

  • Allows persistent twin mapping; offline messages are buffered.

A Component connects to a Broker using a URL-like string that identifies the broker address and declares the name of the Component.
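To show what such a URL-like string carries, the sketch below splits it into its parts with the standard library. `parse_component_url` is a hypothetical helper. The defaults applied to a bare name are assumptions for illustration: port 8000 is the default mentioned earlier, while the `ws` scheme and the `127.0.0.1` host are guesses.

```python
from urllib.parse import urlparse


def parse_component_url(url: str) -> dict:
    """Split a component URL into scheme, host, port and component name."""
    if "://" not in url:
        # Bare name: fall back to the assumed defaults.
        return {"scheme": "ws", "host": "127.0.0.1", "port": 8000, "name": url}
    parts = urlparse(url)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port or 8000,  # assumed default when no port is given
        "name": parts.path.lstrip("/"),
    }


print(parse_component_url("ws://host:8000/myname"))
# {'scheme': 'ws', 'host': 'host', 'port': 8000, 'name': 'myname'}
```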

Pub/Sub

Publish

# Single message
await cli.publish("site/type/dn/sensor")
await cli.publish("dn/telemetry", {'temperature': 21.6, 'pressure': 980})

# DataFrame
import polars as pl
df = pl.DataFrame({"dn":["s1","s2"], "temperature":[15,18.8]})
await cli.publish("telemetry", df)

Subscribe

# Single topic
def alarm(slogan, severity):
    print(f"{slogan}: {severity}")

sub = rb.node("monitor")
sub.subscribe(alarm, topic="alarm") # equivalent to sub.subscribe(alarm)

# Topic space
def telemetry(topic, data):
    print(f"{topic}: {data}")

sub.subscribe(telemetry, topic="**/telemetry", msgfrom=rb.LastReceived)

The subscribed alarm function is called with the message payload: slogan as the first argument and severity as the second:

await cli.publish("alarm", "mydevice: battery very low", "CRITICAL")

The msgfrom option lets a component receive messages that were published while it was not subscribed to the topic, for example because it was not connected when the messages were published.
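The way a payload reaches a callback can be pictured as plain argument unpacking. The sketch below is a conceptual illustration only: `deliver` is a hypothetical function, and the `with_topic` flag reflects an inference from the telemetry example (a topic-space callback receiving the topic as its first argument), not a documented Rembus parameter.

```python
def deliver(callback, topic, payload, with_topic=False):
    """Invoke a subscriber callback, spreading the payload over its parameters."""
    # A single non-sequence value is treated as a one-element payload.
    args = list(payload) if isinstance(payload, (list, tuple)) else [payload]
    if with_topic:
        args.insert(0, topic)  # assumed: topic-space callbacks also get the topic
    return callback(*args)


def alarm(slogan, severity):
    return f"{slogan}: {severity}"


print(deliver(alarm, "alarm", ["mydevice: battery very low", "CRITICAL"]))
# mydevice: battery very low: CRITICAL
```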

RPC

Expose

import rembus as rb

def add(x, y):
    return x + y

handle = rb.node("calculator")
handle.expose(add)  # register add as an RPC service
handle.wait()  # keep the component running to serve requests

Call

import rembus as rb

handle = rb.node("myclient")
result = handle.rpc('add', 1, 2)
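Conceptually, exposing a function registers it under its name, and an RPC call looks the name up and invokes the function with the supplied arguments. The toy registry below illustrates only that idea in a single process. `ToyRegistry` is a hypothetical class with no network and no broker involved, and it says nothing about how Rembus implements dispatch.

```python
class ToyRegistry:
    """In-process stand-in for the expose/rpc pairing (illustration only)."""

    def __init__(self):
        self._services = {}

    def expose(self, func):
        # Register the function under its own name, as expose(add) does above.
        self._services[func.__name__] = func

    def rpc(self, name, *args):
        if name not in self._services:
            raise LookupError(f"unknown service: {name}")
        return self._services[name](*args)


def add(x, y):
    return x + y


registry = ToyRegistry()
registry.expose(add)
print(registry.rpc("add", 1, 2))  # 3
```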
