Rembus for Python

Rembus API for Python

Rembus is a Pub/Sub and RPC middleware.

There are a few key concepts to become familiar with in Rembus:

  • A Component is a distributed application that communicates using the Pub/Sub or RPC style;
  • A Component connects to a Broker;
  • A Broker dispatches messages between Components;
  • A Component exposes RPC services and/or subscribes to Pub/Sub topics;
  • A Component makes RPC requests and/or publishes messages to Pub/Sub topics.

This API version supports only the WebSocket protocol.

Getting Started

Start a Rembus broker, for example caronte.

Install the package:

pip install rembus

then import rembus.sync to use the synchronous Python API:

import rembus.sync as rembus

rb = rembus.component()
rb.publish('mytopic', {'name': 'sensor_1', 'metric': 'T', 'value': 21.6})
rb.close()

or import rembus to use the asynchronous Python API:

import asyncio
import rembus

async def main():
    # 'client_name' is a placeholder for the name of this component
    rb = await rembus.component('client_name')

    await rb.publish('mytopic', {
        'name': 'sensor_1',
        'metric': 'T',
        'value': 21.6
    })

    await rb.close()


asyncio.run(main())

Initialize a Component

Currently the Python API provides the WebSocket protocol for connecting to the Rembus broker.

The url argument of the component function defines the component identity and the broker endpoint to connect to:

import rembus.sync as rembus

# Broker endpoint and named component
rb = rembus.component('ws://hostname:port/component_name')

# Broker endpoint and anonymous component 
rb = rembus.component('ws://hostname:port')

# Default broker and named component 
rb = rembus.component('component_name')

# Default broker and anonymous component 
rb = rembus.component()

The component builder function returns a Rembus handler that is used to interact with other components via Pub/Sub and RPC messages.

component_name is the unique name that asserts the component identity across online sessions (connect/disconnect windows).

component_name is optional: if it is missing, a random identifier that changes at each connection is used as the component identifier. In this case the broker cannot bind the component to a persistent twin, and messages published while the component is offline are not delivered when it comes back online.

The default broker endpoint is set by the REMBUS_BASE_URL environment variable and defaults to ws://127.0.0.1:8000.
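
The four url forms listed above can be summarized with a small sketch. The helper resolve_endpoint below is hypothetical and is not part of the rembus package. It only illustrates, under the rules stated in this section, how a url argument could be split into a broker endpoint and an optional component name.

```python
import os
from urllib.parse import urlparse

DEFAULT_BASE_URL = "ws://127.0.0.1:8000"  # default stated in this section


def resolve_endpoint(url=None, environ=None):
    """Return (broker_url, component_name); the name is None if anonymous.

    Hypothetical helper for illustration only, not the rembus implementation.
    """
    environ = os.environ if environ is None else environ
    base_url = environ.get("REMBUS_BASE_URL", DEFAULT_BASE_URL)
    if not url:
        return base_url, None              # default broker, anonymous component
    if "://" not in url:
        return base_url, url               # default broker, named component
    parsed = urlparse(url)
    name = parsed.path.strip("/") or None  # the path part carries the name
    return f"{parsed.scheme}://{parsed.netloc}", name


print(resolve_endpoint("ws://hostname:9000/component_name"))
print(resolve_endpoint("component_name"))
```

Passing environ explicitly is a choice made here to keep the sketch easy to test. The real package may resolve the endpoint differently.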

Pub/Sub example

A message is published with the publish function.

rb.publish('mytopic', arg_1, arg_2, ..., arg_N)

The arguments arg_i make up the message payload received by the subscribed components.

A component interested in the topic mytopic has to define a function named after the topic, with the same number of arguments as the published message:

# do something each time a message is published to the topic mytopic
def mytopic(arg_1, arg_2, ..., arg_N):
    ...

rb.subscribe(mytopic)

rb.forever()

The first argument to subscribe is the function, named as the topic of interest, that will be called each time a message is published.

The optional second argument of subscribe defines the "retroactive" feature of the subscribed topic.

If the second argument is True, messages published while the component is offline are delivered as soon as the component comes back online; otherwise messages published before connecting are lost.
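
As a concrete sketch of a retroactive subscriber: the handler below assumes the publisher sends a single dictionary, as in the Getting Started example. The run_subscriber helper and the component name 'logger' are hypothetical names chosen for illustration. A named component is used because an anonymous one cannot receive messages published while it was offline.

```python
def mytopic(reading):
    """Called for every message published to the topic 'mytopic'."""
    print(f"{reading['name']} {reading['metric']}={reading['value']}")


def run_subscriber(rb):
    """Subscribe retroactively and block; rb is a handler from rembus.component()."""
    rb.subscribe(mytopic, True)  # True: also deliver messages cached while offline
    rb.forever()


# With a broker running, this could be started as:
#   import rembus.sync as rembus
#   run_subscriber(rembus.component('logger'))
```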

NOTE: To cache messages for an offline component, the broker needs to know that the component has subscribed to a specific topic. This implies that messages published before the first subscribe will be lost. If you want all messages to be delivered, subscribe first and publish afterwards.
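
The caching rule in the note can be pictured with a toy in-memory model. ToyBroker is a hypothetical class written only for this illustration. It is not how the Rembus broker is implemented and it models nothing beyond the rule described above.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory model of the caching rule; hypothetical, not Rembus code."""

    def __init__(self):
        self.subscribers = defaultdict(dict)  # topic -> {component: retroactive}
        self.online = set()                   # names of connected components
        self.cached = defaultdict(list)       # component -> [(topic, payload)]
        self.delivered = defaultdict(list)    # component -> [(topic, payload)]

    def connect(self, component):
        self.online.add(component)
        # Flush whatever was cached while the component was offline.
        self.delivered[component].extend(self.cached.pop(component, []))

    def disconnect(self, component):
        self.online.discard(component)

    def subscribe(self, component, topic, retroactive=False):
        self.subscribers[topic][component] = retroactive

    def publish(self, topic, payload):
        for component, retroactive in self.subscribers[topic].items():
            if component in self.online:
                self.delivered[component].append((topic, payload))
            elif retroactive:
                self.cached[component].append((topic, payload))
            # Offline and not retroactive: the message is dropped.


broker = ToyBroker()
broker.publish("mytopic", "too early")      # nobody has subscribed yet: lost
broker.connect("logger")
broker.subscribe("logger", "mytopic", retroactive=True)
broker.disconnect("logger")
broker.publish("mytopic", "while offline")  # cached for the offline subscriber
broker.connect("logger")
received = broker.delivered["logger"]
print(received)
```

Only the message published after the first subscribe reaches the component, which is why subscribing first and publishing afterwards matters.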

RPC example

An RPC service is implemented with a function named after the exposed service.

import rembus.sync as rembus

def add(x,y):
    return x+y

rb = rembus.component('calculator')

rb.expose(add)

rb.forever()

The calculator component exposes the add service, which an RPC client invokes as follows:

import rembus.sync as rembus

rb = rembus.component()
result = rb.rpc('add', 1, 2)
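
The link between the function name and the service name can be sketched without a broker. ToyServices below is a hypothetical in-process registry written for illustration. It assumes the service name is taken from the function name, as this section describes, and it does not reflect how Rembus transports requests.

```python
class ToyServices:
    """In-process stand-in for expose/rpc; hypothetical, not Rembus code."""

    def __init__(self):
        self._services = {}

    def expose(self, fn):
        # The service name is the name of the function.
        self._services[fn.__name__] = fn

    def rpc(self, service, *args):
        if service not in self._services:
            raise LookupError(f"{service}: service not exposed")
        return self._services[service](*args)


def add(x, y):
    return x + y


services = ToyServices()
services.expose(add)
result = services.rpc("add", 1, 2)
print(f"result={result}")
```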

The asynchronous client and server implementations look something like this:

# server.py
import asyncio
import rembus

async def add(x, y):
    return x+y

async def main():
    rb = await rembus.component()
    
    await rb.expose(add)
    await rb.forever()

asyncio.run(main())

# client.py
import asyncio
import rembus

async def main():
    rb = await rembus.component()
    result = await rb.rpc('add', 1, 2)
    print(f'result={result}')
    await rb.close()


asyncio.run(main())
