Skip to main content

Rembus for python

Project description

Rembus for Python

Docs Stable Docs Dev Build Status Coverage

Rembus is a Pub/Sub and RPC middleware.

Features

  • Binary message encoding using CBOR.

  • Native support for exchanging DataFrames.

  • Persistent storage via DuckDB DuckLake.

  • Pub/Sub QOS0, QOS1 and QOS2.

  • Hierarchical topic routing with wildcards (*/*/temperature).

  • MQTT integration.

  • WebSocket transport.

See Rembus.jl broker for a full fledged broker that supports WebSocket, ZMQ and plain tcp protocols and more features like private topics, multi-tenancy and more.

Concepts

  • Component: an addressable node in a distributed system. A component connects to a broker and communicates using Pub/Sub and/or RPC semantics.

  • Broker: a specialized component responsible for routing Pub/Sub messages and dispatching RPC calls between components. A broker may also persist messages and expose services.

  • Topic: a "logical channel" string identifier used for Pub/Sub message routing (e.g. alarm_topic).

  • Topic space: a set of topics defined by wildcard patterns (e.g. */telemetry) used for bulk subscription..

  • Subscription: a callback bound to a topic or topic space; invoked automatically with the message payload when a Pub/Sub message is published. Supports wildcard topics and optional delivery of messages sent while the subscriber was offline (msgfrom=rb.LastReceived).

  • RPC Service: a named function exposed by a component and registered at the broker for remote invocation.

  • RPC Call: a synchronous or asynchronous request issued by a component to invoke a remote RPC service.

  • Schema: an optional declarative mapping between topic patterns and persistent storage tables, used to structure and persist messages at rest.

  • Data at Rest: broker capability to persist published messages into DuckDB tables defined by a schema, enabling historical queries and analytics.

Getting Started

Install the package:

pip install rembus

Broker

Start a broker (sync or async):

import rembus as rb

# sync version
bro = rb.node() # equivalent to rb.node(port = 8338)
bro.wait() # event loop, not needed in interactive interpreter


# async version
bro = await rb.component()
await bro.wait() 

Broker with persistent storage

A Rembus broker can be configured with a schema to persist published messages into a DuckDB database via the DataLake extension.

A schema declaratively maps Pub/Sub topic patterns to relational tables. Topics variables are extracted from the topic path and mapped to table columns; message payload fields are mapped to the remaining columns.

Example schema.json:

{
    "tables": [
        {
            "table": "sensor",
            "topic": ":site/:type/:dn/sensor",
            "columns": [
                {"col": "site", "type": "TEXT", "nullable": false},
                {"col": "type", "type": "TEXT", "nullable": false},
                {"col": "dn", "type": "TEXT"}
            ],
            "keys": ["dn"]
        },
        {
            "table": "telemetry",
            "topic": ":dn/telemetry",
            "columns": [
                {"col": "dn", "type": "TEXT"},
                {"col": "temperature", "type": "DOUBLE"},
                {"col": "pressure", "type": "DOUBLE"}
            ],
            "extras": {
                "recv_ts": "ts",
                "slot": "time_bucket"
            }
        }
    ]
}

Schema semantics

  • Each entry in tables defines a topic-to-table binding.

  • Topic segments prefixed with : are variables extracted from the topic path and written to the corresponding columns.

  • Message payload fields are mapped positionally or by name to remaining columns.

  • keys define logical primary keys.

  • extras specify broker-generated metadata (e.g. receive timestamp, time bucketing).

Example mappings

  • Messages published to :site/:type/:dn/sensor are persisted in the sensor table, with site, type, and dn derived from the topic path.

  • Messages published to :dn/telemetry are persisted in the telemetry table, with metric values extracted from the message payload.

If a schema is provided, DuckLake tables are created automatically if they do not already exist, and all matching publications are persisted.

For each table three special topics for RPC services are automatically created:

  • upsert_<table> insert/update records;
  • query_<table> retrieve records;
  • delete_<table> remove records;

RPC calls to query_* topics return a Polars DataFrame.

For example:

cli.rpc("query_sensor", {"where": "type='HVAC'"})

Starting a broker with storage enabled

import rembus as rb

bro = await rb.component(schema="schema.json")
await bro.wait() 

Components

Connect to a Broker:

# named component
cli = await rb.component("ws://host:8338/myname")

# default host/port
cli = await rb.component("myname")

# anonymous
cli = await rb.component(rb.anonym(host="host", port=8338))

Why named components

  • Enables authentication (RSA/ECDSA/shared secret).

  • Allows persistent twin mapping; offline messages are buffered. A Component connects to a Broker using a URL-like formatted string that identifies the broker address and declare the name of the Component.

Pub/Sub

Publish

# Single message
await cli.publish("site/type/dn/sensor")
await cli.publish("dn/telemetry", {'temperature': 21.6, 'pressure': 980})

# DataFrame
import polars as pl
df = pl.DataFrame({"dn":["s1","s2"], "temperature":[15,18.8]})
await cli.publish("telemetry", df)

Subscribe

# Single topic
def alarm(slogan, severity):
    print(f"{slogan}: {severity}")

sub = rb.node("monitor")
sub.subscribe(alarm, topic="alarm") # equivalent to sub.subscribe(alarm)

# Topic space
def telemetry(topic, data):
    print(f"{topic}: {data}")

sub.subscribe(telemetry, topic="**/telemetry", msgfrom=rb.LastReceived)

The subscribed alarm function gets called with the using the message payload: slogan as first argument and severity as second argument:

cli.publish("alarm", "mydevice: battery very low", "CRITICAL")

The option msgfrom is for receiving published messages when the component was not subscribed to the topic, for example because it was not connected when the messages was published.

RPC

Expose

import rembus as rb

def add(x,y):
    return x+y

handle = rb.node('calculator')
handle.expose(add)
handle.wait()

Call

import rembus as rb

handle = rb.node("myclient")
result = handle.rpc('add', 1, 2)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rembus-0.8.7.tar.gz (62.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rembus-0.8.7-py3-none-any.whl (64.1 kB view details)

Uploaded Python 3

File details

Details for the file rembus-0.8.7.tar.gz.

File metadata

  • Download URL: rembus-0.8.7.tar.gz
  • Upload date:
  • Size: 62.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for rembus-0.8.7.tar.gz
Algorithm Hash digest
SHA256 edd8889572350a7294678d8cf965450e65f0442b0aecc0ff31c039cbc44ff375
MD5 72432127df4e120ef466901163160e12
BLAKE2b-256 b82b8c1df62e59b597e2f27bc6aa50c39c8e9ecb07b24c9ee38d537ce9519d60

See more details on using hashes here.

Provenance

The following attestation bundles were made for rembus-0.8.7.tar.gz:

Publisher: python-app.yml on cardo-org/rembus.python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file rembus-0.8.7-py3-none-any.whl.

File metadata

  • Download URL: rembus-0.8.7-py3-none-any.whl
  • Upload date:
  • Size: 64.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for rembus-0.8.7-py3-none-any.whl
Algorithm Hash digest
SHA256 15f321e452661965fa3e7ce4d8e0f69c4739879fff036a63296cb1220dd4e8b0
MD5 18c1c00d1bb38969baf635d909856191
BLAKE2b-256 30e12a301fb6b5b77094eed8d5cb13fd0bd8da043d256c43045fe703ff1d361d

See more details on using hashes here.

Provenance

The following attestation bundles were made for rembus-0.8.7-py3-none-any.whl:

Publisher: python-app.yml on cardo-org/rembus.python

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page