Rembus for python
Project description
Rembus for Python
Rembus is a Pub/Sub and RPC middleware.
Features
-
Binary message encoding using CBOR.
-
Native support for exchanging DataFrames.
-
Persistent storage via DuckDB DuckLake.
-
Pub/Sub QOS0, QOS1 and QOS2.
-
Hierarchical topic routing with wildcards (
*/*/temperature). -
MQTT integration.
-
WebSocket transport.
See Rembus.jl broker for a full fledged broker that supports WebSocket, ZMQ and plain tcp protocols and more features like private topics, multi-tenancy and more.
Concepts
-
Component: an addressable node in a distributed system. A component connects to a broker and communicates using Pub/Sub and/or RPC semantics.
-
Broker: a specialized component responsible for routing Pub/Sub messages and dispatching RPC calls between components. A broker may also persist messages and expose services.
-
Topic: a "logical channel" string identifier used for Pub/Sub message routing (e.g.
alarm_topic). -
Topic space: a set of topics defined by wildcard patterns (e.g.
*/telemetry) used for bulk subscription.. -
Subscription: a callback bound to a topic or topic space; invoked automatically with the message payload when a Pub/Sub message is published. Supports wildcard topics and optional delivery of messages sent while the subscriber was offline (
msgfrom=rb.LastReceived). -
RPC Service: a named function exposed by a component and registered at the broker for remote invocation.
-
RPC Call: a synchronous or asynchronous request issued by a component to invoke a remote RPC service.
-
Schema: an optional declarative mapping between topic patterns and persistent storage tables, used to structure and persist messages at rest.
-
Data at Rest: broker capability to persist published messages into DuckDB tables defined by a schema, enabling historical queries and analytics.
Getting Started
Install the package:
pip install rembus
Broker
Start a broker (sync or async):
import rembus as rb
# sync version
bro = rb.node() # equivalent to rb.node(port = 8000)
bro.wait() # run broker loop, unnecessary if running interpreter
# async version
bro = await rb.component()
await bro.wait()
Broker with persistent storage
A Rembus broker can be configured with a schema to persist published messages into a DuckDB database via the DataLake extension.
A schema declaratively maps Pub/Sub topic patterns to relational tables. Topics variables are extracted from the topic path and mapped to table columns; message payload fields are mapped to the remaining columns.
Example schema.json:
{
"tables": [
{
"table": "sensor",
"topic": ":site/:type/:dn/sensor",
"columns": [
{"col": "site", "type": "TEXT", "nullable": false},
{"col": "type", "type": "TEXT", "nullable": false},
{"col": "dn", "type": "TEXT"}
],
"keys": ["dn"]
},
{
"table": "telemetry",
"topic": ":dn/telemetry",
"columns": [
{"col": "dn", "type": "TEXT"},
{"col": "temperature", "type": "DOUBLE"},
{"col": "pressure", "type": "DOUBLE"}
],
"extras": {
"recv_ts": "ts",
"slot": "time_bucket"
}
}
]
}
Schema semantics
-
Each entry in
tablesdefines a topic-to-table binding. -
Topic segments prefixed with
:are variables extracted from the topic path and written to the corresponding columns. -
Message payload fields are mapped positionally or by name to remaining columns.
-
keysdefine logical primary keys. -
extrasspecify broker-generated metadata (e.g. receive timestamp, time bucketing).
Example mappings
-
Messages published to
:site/:type/:dn/sensorare persisted in thesensortable, withsite,type, anddnderived from the topic path. -
Messages published to
:dn/telemetryare persisted in thetelemetrytable, with metric values extracted from the message payload.
If a schema is provided, DuckLake tables are created automatically if they do not already exist, and all matching publications are persisted.
For each table two special topics are automatically created:
query_<table>retrieve records;delete_<table>remove records;
RPC calls to query_* topics return a Polars DataFrame.
For example:
cli.rpc("query_sensor", {"where": "type='HVAC'"})
Starting a broker with storage enabled
import rembus as rb
bro = await rb.component(schema="schema.json")
await bro.wait()
Components
Connect to a Broker:
# named component
cli = await rb.component("ws://host:8000/myname")
# default host/port
cli = await rb.component("myname")
# anonymous
cli = await rb.component(rb.anonym(host="host", port=8000))
Why named components
-
Enables authentication (RSA/ECDSA/shared secret).
-
Allows persistent twin mapping; offline messages are buffered. A Component connects to a Broker using a URL-like formatted string that identifies the broker address and declare the name of the Component.
Pub/Sub
Publish
# Single message
await cli.publish("site/type/dn/sensor")
await cli.publish("dn/telemetry", {'temperature': 21.6, 'pressure': 980})
# DataFrame
import polars as pl
df = pl.DataFrame({"dn":["s1","s2"], "temperature":[15,18.8]})
await cli.publish("telemetry", df)
Subscribe
# Single topic
def alarm(slogan, severity):
print(f"{slogan}: {severity}")
sub = rb.node("monitor")
sub.subscribe(alarm, topic="alarm") # equivalent to sub.subscribe(alarm)
# Topic space
def telemetry(topic, data):
print(f"{topic}: {data}")
sub.subscribe(telemetry, topic="**/telemetry", msgfrom=rb.LastReceived)
The subscribed alarm function gets called with the using the message payload:
slogan as first argument and severity as second argument:
cli.publish("alarm", "mydevice: battery very low", "CRITICAL")
The option msgfrom is for receiving published messages when the component
was not subscribed to the topic, for example because it was not connected when
the messages was published.
RPC
Expose
import rembus as rb
def add(x,y):
return x+y
handle = rb.node('calculator')
handle.expose(add)
handle.wait()
Call
import rembus as rb
handle = rb.node("myclient")
result = handle.rpc('add', 1, 2)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rembus-0.8.1.tar.gz.
File metadata
- Download URL: rembus-0.8.1.tar.gz
- Upload date:
- Size: 58.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
ec1eaaf18985338ce9104403f7f59bc6e32e7e9809db0d341cabbc23862a8513
|
|
| MD5 |
f086bed5b687b55d004e2ddb29a08cf7
|
|
| BLAKE2b-256 |
f565147b65c6b6fe9695e0641a0648995d90dfabe171ecef934667bf70c6ed2b
|
Provenance
The following attestation bundles were made for rembus-0.8.1.tar.gz:
Publisher:
python-app.yml on cardo-org/rembus.python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rembus-0.8.1.tar.gz -
Subject digest:
ec1eaaf18985338ce9104403f7f59bc6e32e7e9809db0d341cabbc23862a8513 - Sigstore transparency entry: 947248079
- Sigstore integration time:
-
Permalink:
cardo-org/rembus.python@b1df9cded29400cca06c618892463daa4df66569 -
Branch / Tag:
refs/tags/0.8.1 - Owner: https://github.com/cardo-org
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-app.yml@b1df9cded29400cca06c618892463daa4df66569 -
Trigger Event:
push
-
Statement type:
File details
Details for the file rembus-0.8.1-py3-none-any.whl.
File metadata
- Download URL: rembus-0.8.1-py3-none-any.whl
- Upload date:
- Size: 60.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
44a1d42aeec78bf100ac195348b69e36a04dbdfeda413b454d369aeede617b31
|
|
| MD5 |
4fff37b1c2748e267f7ec5120cee5a3d
|
|
| BLAKE2b-256 |
b6f313bf07d9a0035ea7f4d9e40818859a886b0aefa08de4cfb4ce2800aab29a
|
Provenance
The following attestation bundles were made for rembus-0.8.1-py3-none-any.whl:
Publisher:
python-app.yml on cardo-org/rembus.python
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rembus-0.8.1-py3-none-any.whl -
Subject digest:
44a1d42aeec78bf100ac195348b69e36a04dbdfeda413b454d369aeede617b31 - Sigstore transparency entry: 947248081
- Sigstore integration time:
-
Permalink:
cardo-org/rembus.python@b1df9cded29400cca06c618892463daa4df66569 -
Branch / Tag:
refs/tags/0.8.1 - Owner: https://github.com/cardo-org
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-app.yml@b1df9cded29400cca06c618892463daa4df66569 -
Trigger Event:
push
-
Statement type: