Communication Library for Python implementing the most common communication patterns for CyberPhysical Systems.
📜 Table of Contents
- 📜 Table of Contents
- 📖 Overview
- 👾 Features
- 📜 Project Index
- 🚀 Getting Started
- 🏗️ Advanced
- 🤖 Examples
- 🧪 Testing
- 🎞️ Roadmap
- 🤝 Contributing
- 📜 License
- 🎩 Acknowledgments
- 🌟 Star History
📖 Overview
Commlib is a Domain-specific Language for communication and messaging in Cyber-Physical Systems. It can be used for rapid development of the communication layer on-device, at the Edge and on the Cloud, or in a mixed multi-level, multi-broker schema.
The goal of this project is to implement a simple Protocol-agnostic API (AMQP, Kafka, Redis, MQTT, etc) for common communication patterns in the context of Cyber-Physical Systems, using message broker technologies. Such patterns include PubSub, RPC and Preemptive Services (aka Actions), among others.
👾 Features
| | Feature | Summary |
|---|---|---|
| ⚙️ | Protocol-Agnostic | |
| 📄 | Documentation | |
| 🧩 | Modularity | |
| ⚡️ | Performance | |
| 📦 | Dependencies | |
📜 Project Index
brokers
⦿ brokers
| Module | File Name | Summary |
|---|---|---|
| brokers.redis | redis.conf | The redis.conf file in the brokers/redis directory configures the Redis server instance used within the larger application. It dictates settings such as memory allocation and potentially includes other configuration files for customized server behavior. Essentially, this file is crucial for setting up and controlling the Redis message broker within the overall system architecture. |
| brokers.redis | launch_redis_docker.sh | The script launches a Redis instance within a Docker container. It utilizes a provided configuration file (redis.conf) and maps port 6379 for external access. This facilitates the use of Redis as a message broker within the larger application architecture, enabling efficient data exchange between application components. The container's ephemeral nature ensures clean resource management. |
| brokers.mqtt | launch_emqx_docker.sh | The script launches an EMQX MQTT broker instance within a Docker container. It exposes several ports for various MQTT protocols, including the management UI, enabling communication and administration. This facilitates message queuing and data exchange within the broader application architecture. The script simplifies deployment and management of the broker. |
| brokers.kafka | start.sh | The start.sh script initiates the Kafka broker within the project's dockerized environment. It leverages docker-compose to manage the lifecycle of the Kafka containers, ensuring a clean startup by first stopping any existing instances and then starting them, removing any orphaned containers. This script is crucial for deploying and managing the Kafka message broker infrastructure. |
| brokers.kafka | docker-compose.yml | TODO |
| brokers.kafka | docker-compose-2.yml | TODO |
| brokers.dragonfly | start.sh | TODO |
| brokers.dragonfly | docker-compose.yml | TODO |
| brokers.amqp | launch_rabbitmq_docker.sh | TODO |
| brokers.amqp | docker-compose.yml | TODO |
examples
⦿ examples
| Module | File Name | Summary |
|---|---|---|
| examples.topic_aggregator | topic_merge.py | TODO |
| examples.topic_aggregator | producers.py | TODO |
| examples.test_rpc_deletion | test.py | TODO |
| examples.minimize_conns | wsubscriber.py | TODO |
| examples.minimize_conns | wpublisher.py | TODO |
| examples.simple_rpc | simple_rpc_service.py | TODO |
| examples.simple_rpc | simple_rpc_client.py | TODO |
| examples.simple_pubsub | subscriber.py | TODO |
| examples.simple_pubsub | publisher.py | TODO |
| examples.simple_action | action_service.py | TODO |
| examples.simple_action | action_client.py | TODO |
| examples.rpc_server | rpc_server.py | TODO |
| examples.rpc_server | client.py | TODO |
| examples.ptopic_bridge | redis_pub.py | TODO |
| examples.ptopic_bridge | mqtt_sub.py | TODO |
| examples.ptopic_bridge | bridge.py | TODO |
| examples.perf_test | perf_test.py | TODO |
| examples.node_inherit | node_inherit_example.py | TODO |
| examples.node_decorators | decor_node.py | TODO |
| examples.node_decorators | client.py | TODO |
| examples.node | node_with_features.py | TODO |
| examples.multitopic_publisher | psubscriber.py | TODO |
| examples.multitopic_publisher | mpublisher.py | TODO |
| examples.endpoint_factory | example.py | TODO |
| examples.bridges | typed_bridge_example.py | TODO |
| examples.bridges | bridge_example.py | TODO |
commlib
⦿ commlib
| File Name | Summary |
|---|---|
| utils.py | TODO |
| timer.py | TODO |
| tcp_proxy.py | TODO |
| serializer.py | TODO |
| rpc.py | TODO |
| pubsub.py | TODO |
| node.py | TODO |
| msg.py | TODO |
| exceptions.py | TODO |
| endpoints.py | TODO |
| connection.py | TODO |
| bridges.py | TODO |
| async_utils.py | TODO |
| aggregation.py | TODO |
| action.py | TODO |
| compression.py | TODO |
🚀 Getting Started
🔖 Prerequisites
This project requires the following dependencies:
- Programming Language: Python 3.7+
- Packages: Pydantic, orjson|ujson (Optional for fast json serialization), paho-mqtt (Optional for MQTT support), redis-py (Optional for Redis Support), pika (Optional for AMQP support)
- Package Manager: Pip, Poetry, Conda, Tox
🛠️ Installation
Install commlib-py from PyPI, or build it from source and install its dependencies:
PyPI Releases
- Using pip
❯ pip install commlib-py
Or select version explicitly
❯ pip install commlib-py==0.11.5
- Using pipx
❯ pipx install commlib-py
From Source
- Clone the repository:
❯ git clone git@github.com:robotics-4-all/commlib-py.git
- Navigate to the project directory:
❯ cd commlib-py
- Install the dependencies:
Using pip:
❯ pip install .
Using poetry:
❯ poetry install
Using conda:
❯ conda env create -f environment.yml
JSON Serialization
It is recommended to use a fast JSON library, such as orjson or ujson.
The framework loads and uses the best-performing JSON library among those that are installed.
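The selection described above can be sketched as a chain of guarded imports. This is an illustration only, not commlib's actual loader: the priority order (orjson, then ujson, then the standard-library json) and the names `JSON_BACKEND`, `dumps` and `loads` are assumptions made for this example.

```python
import json

# Assumed priority: orjson, then ujson, then the standard library.
try:
    import orjson as _backend
    JSON_BACKEND = "orjson"
except ImportError:
    try:
        import ujson as _backend
        JSON_BACKEND = "ujson"
    except ImportError:
        _backend = json
        JSON_BACKEND = "json"


def dumps(obj) -> str:
    """Serialize to str regardless of backend (orjson returns bytes)."""
    out = _backend.dumps(obj)
    return out.decode("utf-8") if isinstance(out, bytes) else out


def loads(data):
    """Deserialize a JSON document given as str or bytes."""
    return _backend.loads(data)
```

With this pattern the rest of the code calls `dumps` and `loads` only, so installing a faster library later requires no code changes.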
⚙️ Concepts
Node
A Node is a software component that follows the Component-Port-Connector model. It has input and output ports for communicating with the world. Each port defines an endpoint and can be of the following types.
| Port Type | Description |
|---|---|
| Input Port | Subscriber: Listens for messages on a specific topic. |
| | RPC Service: Provides a Remote Procedure Call (RPC) service for handling requests. |
| | Action Service: Executes long-running tasks and provides feedback during execution. |
| | RPC Server: Handles RPC requests and sends responses. |
| Output Port | Publisher: Sends messages to a specific topic. |
| | RPC Client: Sends RPC requests and waits for responses. |
| | Action Client: Initiates actions and receives feedback during execution. |
| InOut Port | RPCBridge: Bridges RPC communication between two brokers. Directional. |
| | TopicBridge: Bridges PubSub communication between two brokers. Directional. |
| | PTopicBridge: Bridges PubSub communication between two brokers based on a topic pattern. Directional. |
Furthermore, a Node implements several features:
- Publishes Heartbeat messages in the background for as long as the node is active
- Provides control interfaces to start and stop the execution of the Node
- Provides methods to create endpoints and bind them to Node ports
from commlib.node import Node, TransportType
from commlib.msg import RPCMessage
## Import the Redis transports
## Imports are lazy handled internally
from commlib.transports.redis import ConnectionParameters


class AddTwoIntMessage(RPCMessage):
    class Request(RPCMessage.Request):
        a: int = 0
        b: int = 0

    class Response(RPCMessage.Response):
        c: int = 0


def add_two_int_handler(msg):
    print(f'On-Request: {msg}')
    resp = AddTwoIntMessage.Response(c=msg.a + msg.b)
    return resp


if __name__ == '__main__':
    conn_params = ConnectionParameters()
    node = Node(
        node_name='add_two_ints_node',
        connection_params=conn_params,
        heartbeats=True,
        heartbeat_uri='nodes.add_two_ints.heartbeat',
        heartbeat_interval=10,
        ctrl_services=True,
        debug=False
    )
    rpc = node.create_rpc(
        msg_type=AddTwoIntMessage,
        rpc_name='add_two_ints_node.add_two_ints',
        on_request=add_two_int_handler
    )
    node.run_forever(sleep_rate=1)
A Node always binds to a specific broker, through which it exposes its input and output ports. Several Nodes can be created and executed in a single-process application.
Below is the list of currently supported interface/endpoint types and protocol transports.
| Interface Type | Description | Required Parameters | Supported transports |
|---|---|---|---|
| RPCClient | Sends RPC requests and waits for responses. | rpc_name, connection_params | MQTT, Redis, AMQP, Kafka |
| RPCServer | Handles RPC requests and sends responses. | rpc_name, on_request, connection_params | MQTT, Redis, AMQP, Kafka |
| Publisher | Sends messages to a specific topic. | topic, connection_params | MQTT, Redis, AMQP, Kafka |
| Subscriber | Listens for messages on a specific topic. | topic, on_message, connection_params | MQTT, Redis, AMQP, Kafka |
| MPublisher | Publishes messages to multiple topics. | connection_params | MQTT, Redis, AMQP, Kafka |
| WPublisher | A wrapped publisher with additional features. | topic, connection_params | MQTT, Redis |
| PSubscriber | Subscribes to topics using patterns. | topic_pattern, on_message, connection_params | MQTT, Redis, AMQP, Kafka |
| WSubscriber | A wrapped subscriber with additional features. | topic, on_message, connection_params | MQTT, Redis |
| ActionService | Provides preemptive services with feedback. | msg_type, action_name, on_goal, connection_params | MQTT, Redis, AMQP |
| ActionClient | Sends goals to an action service and receives feedback. | msg_type, action_name, on_feedback, on_result, connection_params | MQTT, Redis, AMQP |
Node class:
class Node:
    def __init__(self,
                 node_name: Optional[str] = "",
                 connection_params: Optional[Any] = None,
                 debug: Optional[bool] = False,
                 heartbeats: Optional[bool] = True,
                 heartbeat_interval: Optional[float] = 10.0,
                 heartbeat_uri: Optional[str] = None,
                 compression: CompressionType = CompressionType.NO_COMPRESSION,
                 ctrl_services: Optional[bool] = False,
                 workers_rpc: Optional[int] = 4):
- node_name (Optional[str]): The name of the node. Defaults to an empty string.
- connection_params (Optional[Any]): Connection parameters for the broker. Defaults to None.
- debug (Optional[bool]): Enables debug mode if set to True. Defaults to False.
- heartbeats (Optional[bool]): Enables heartbeat messages if set to True. Defaults to True.
- heartbeat_interval (Optional[float]): Interval in seconds for sending heartbeat messages. Defaults to 10.0.
- heartbeat_uri (Optional[str]): URI for publishing heartbeat messages. Defaults to None.
- compression (CompressionType): Compression type for messages. Defaults to CompressionType.NO_COMPRESSION.
- ctrl_services (Optional[bool]): Enables control services (start/stop) if set to True. Defaults to False.
- workers_rpc (Optional[int]): Number of worker threads for handling RPC requests. Defaults to 4.
Node methods to create and run Endpoints:
Node:
    # Properties
    endpoints: List
    health: bool
    state: NodeState
    # Functions
    create_subscriber(self, *args, **kwargs)
    create_publisher(self, *args, **kwargs)
    create_rpc(self, *args, **kwargs)
    create_rpc_client(self, *args, **kwargs)
    create_rpc_server(self, *args, **kwargs)
    create_action(self, *args, **kwargs)
    create_action_client(self, *args, **kwargs)
    create_mpublisher(self, *args, **kwargs)
    create_psubscriber(self, *args, **kwargs)
    create_wpublisher(self, *args, **kwargs)
    create_wsubscriber(self, *args, **kwargs)
    run_forever(self, sleep_rate: float = 0.001)
    run(self, wait: bool = True) -> None
    stop(self)
Req/Resp - RPCs
This section provides an introduction to using Remote Procedure Calls (RPCs) with commlib-py.
RPCs allow you to execute functions or methods on a remote system as if they were local, enabling seamless communication between distributed systems. Commlib simplifies the implementation of RPCs by providing a high-level API for defining, invoking, and managing remote procedures.
With commlib, you can:
- Define RPC endpoints for exposing specific functionalities.
- Call remote procedures from clients with minimal setup.
- Handle asynchronous responses using callbacks.
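To make the request/response flow concrete, here is a minimal broker-free sketch of the pattern using only the standard library. It is not how commlib-py implements RPCs: the `InMemoryRPC` class, the per-call reply queue and the correlation id are assumptions chosen to illustrate how a client can block until the matching reply arrives.

```python
import queue
import threading
import uuid
from dataclasses import dataclass


@dataclass
class Request:
    a: int = 0
    b: int = 0


@dataclass
class Response:
    c: int = 0


class InMemoryRPC:
    """Hypothetical stand-in for a broker: one request queue, one reply queue per call."""

    def __init__(self, on_request):
        self._on_request = on_request
        self._requests = queue.Queue()
        self._replies = {}  # correlation id -> reply queue
        self._lock = threading.Lock()
        threading.Thread(target=self._serve, daemon=True).start()

    def _serve(self):
        # Server side: take a request, run the handler, route the reply back.
        while True:
            corr_id, msg = self._requests.get()
            resp = self._on_request(msg)
            with self._lock:
                reply_q = self._replies.pop(corr_id, None)
            if reply_q is not None:
                reply_q.put(resp)

    def call(self, msg, timeout=1.0):
        # Client side: tag the request, then block until the matching reply.
        corr_id = uuid.uuid4().hex
        reply_q = queue.Queue(maxsize=1)
        with self._lock:
            self._replies[corr_id] = reply_q
        self._requests.put((corr_id, msg))
        return reply_q.get(timeout=timeout)


def add_two_int_handler(msg: Request) -> Response:
    return Response(c=msg.a + msg.b)


rpc = InMemoryRPC(on_request=add_two_int_handler)
print(rpc.call(Request(a=2, b=3)))  # Response(c=5)
```

The caller sees an ordinary function call, while the handler runs on a separate worker thread, which mirrors the split between the client and service sides.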
The following simple example will walk you through the basic setup and usage of RPCs using commlib-py, helping you build scalable and efficient distributed applications.
Server Side Example
from commlib.msg import RPCMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters


class AddTwoIntMessage(RPCMessage):
    class Request(RPCMessage.Request):
        a: int = 0
        b: int = 0

    class Response(RPCMessage.Response):
        c: int = 0


# Callback function of the add_two_ints RPC
def add_two_int_handler(msg) -> AddTwoIntMessage.Response:
    print(f'Request Message: {msg.__dict__}')
    # Create the Response msg
    resp = AddTwoIntMessage.Response(c=msg.a + msg.b)
    # Return the Response msg
    return resp


if __name__ == '__main__':
    conn_params = ConnectionParameters()
    # Create a node instance
    node = Node(node_name='add_two_ints_node',
                connection_params=conn_params)
    # Create the add_two_ints RPC
    rpc = node.create_rpc(
        msg_type=AddTwoIntMessage,
        rpc_name='add_two_ints_node.add_two_ints',
        on_request=add_two_int_handler
    )
    # Run the node.
    node.run_forever(sleep_rate=1)
Client Side Example
import time

from commlib.msg import RPCMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters


class AddTwoIntMessage(RPCMessage):
    class Request(RPCMessage.Request):
        a: int = 0
        b: int = 0

    class Response(RPCMessage.Response):
        c: int = 0


if __name__ == '__main__':
    conn_params = ConnectionParameters()
    # Create a node instance
    node = Node(node_name='myclient', connection_params=conn_params)
    # Create an RPC client
    rpc = node.create_rpc_client(
        msg_type=AddTwoIntMessage,
        rpc_name='add_two_ints_node.add_two_ints'
    )
    # Run the node to instantiate communication handling resources
    node.run()
    # Create an instance of the request object
    msg = AddTwoIntMessage.Request()
    while True:
        # returns AddTwoIntMessage.Response instance
        resp = rpc.call(msg)
        print(resp)
        msg.a += 1
        msg.b += 1
        time.sleep(1)
Pub/Sub
Publish/Subscribe (PubSub) is a messaging pattern where senders (publishers) send messages to a topic without knowing the recipients (subscribers). Subscribers express interest in specific topics and receive messages published to those topics. This decouples the producers and consumers, enabling scalable and flexible communication.
With commlib-py, implementing PubSub is straightforward. The library provides high-level abstractions for creating publishers and subscribers, allowing you to focus on your application's logic rather than the underlying transport mechanisms. You can define custom message types, publish data to topics, and handle incoming messages with ease.
Key features of PubSub in commlib-py:
- Topic-based Communication: Messages are categorized by topics, enabling selective subscription.
- Broker Agnostic: Supports multiple brokers like MQTT, Redis, and AMQP.
- Typed Message-driven Communication: Define structured messages using Python classes.
- Ease of Use: Simplified APIs for creating publishers and subscribers.
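The decoupling between publishers and subscribers can be illustrated without any broker. The sketch below is a toy, in-process model of topic-based delivery; `InMemoryBroker` is a hypothetical class written for this example and is not part of commlib-py.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

Callback = Callable[[Any], None]


class InMemoryBroker:
    """Hypothetical stand-in for a message broker (not part of commlib-py)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, topic: str, on_message: Callback) -> None:
        self._subscribers[topic].append(on_message)

    def publish(self, topic: str, msg: Any) -> int:
        """Deliver msg to every subscriber of topic; return the delivery count."""
        callbacks = list(self._subscribers.get(topic, []))
        for callback in callbacks:
            callback(msg)
        return len(callbacks)


broker = InMemoryBroker()
received = []
broker.subscribe("sensors.sonar.front", received.append)

# The publisher only names a topic; it holds no reference to the subscriber.
delivered = broker.publish("sensors.sonar.front", {"range": 1.5})
ignored = broker.publish("sensors.sonar.rear", {"range": 0.7})
```

Only the message on `sensors.sonar.front` reaches the subscriber; the second publish succeeds even though nobody is listening, which is the property that lets producers and consumers evolve independently.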
This section will guide you through creating a simple publisher and subscriber using commlib-py. You'll learn how to define message types, publish data, and process received messages effectively.
Write a Simple Publisher
import time

from commlib.msg import MessageHeader, PubSubMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters


class SonarMessage(PubSubMessage):
    header: MessageHeader = MessageHeader()
    range: float = -1
    hfov: float = 30.6
    vfov: float = 14.2


if __name__ == "__main__":
    conn_params = ConnectionParameters(host='localhost', port=1883)
    node = Node(node_name='sensors.sonar.front', connection_params=conn_params)
    pub = node.create_publisher(msg_type=SonarMessage, topic='sensors.sonar.front')
    node.run()
    msg = SonarMessage()
    while True:
        # Publish the message, then update the range for the next iteration
        pub.publish(msg)
        msg.range += 1
        time.sleep(1)
Write a Simple Subscriber
#!/usr/bin/env python
import time

from commlib.msg import MessageHeader, PubSubMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters


class SonarMessage(PubSubMessage):
    header: MessageHeader = MessageHeader()
    range: float = -1
    hfov: float = 30.6
    vfov: float = 14.2


def on_message(msg):
    print(f'Received front sonar data: {msg}')


if __name__ == '__main__':
    conn_params = ConnectionParameters()
    node = Node(node_name='node.obstacle_avoidance', connection_params=conn_params)
    node.create_subscriber(msg_type=SonarMessage,
                           topic='sensors.sonar.front',
                           on_message=on_message)  # Define a callback function
    node.run_forever(sleep_rate=1)  # Define a process-level sleep rate in hz
Wildcard Subscriptions
For pattern-based topic subscriptions using wildcards one can also use the PSubscriber class directly.
For multi-topic publisher one can also use the MPublisher class directly.
import time

from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters


def on_msg_callback(msg, topic):
    print(f'Message at topic <{topic}>: {msg}')


if __name__ == '__main__':
    conn_params = ConnectionParameters()
    node = Node(node_name='wildcard_subscription_example',
                connection_params=conn_params)
    # Create a pattern subscriber
    node.create_psubscriber(topic='topic.*', on_message=on_msg_callback)
    # Create a multi-topic publisher instance.
    pub = node.create_mpublisher()
    node.run(wait=True)
    topicA = 'topic.a'
    topicB = 'topic.b'
    while True:
        pub.publish({'a': 1}, topicA)
        pub.publish({'b': 1}, topicB)
        time.sleep(1)
Topic Notation Conversion
Commlib uses a unified topic notation standard across all supported protocols. This section explains how to convert topics between different protocol formats and between the unified notation and protocol-specific formats.
Unified Topic Notation
The commlib unified topic notation uses:
- Separator: . (dot)
- Wildcard: * (asterisk) - matches any segment or multiple segments
- Format: a.b.c.d
Examples:
- sensors.temperature - specific topic
- sensors.*.temperature - wildcard for single segment
- sensors.* - wildcard for multiple segments (catch-all)
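The wildcard semantics listed above can be sketched as a small matcher. In practice the broker performs the matching; the helper `topic_matches` below is hypothetical and assumes that each `*` stands for one or more characters, which covers both the single-segment and the catch-all examples.

```python
import re


def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if topic matches a unified-notation pattern (sketch only)."""
    # Escape the pattern so '.' is literal, then let each '*' match 1+ characters.
    regex = re.escape(pattern).replace(r"\*", ".+")
    return re.fullmatch(regex, topic) is not None


print(topic_matches("sensors.*.temperature", "sensors.kitchen.temperature"))
print(topic_matches("sensors.*", "actuators.door"))
```

The first call reports a match and the second does not, because the literal prefix `sensors.` must be present in the topic.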
Protocol-Specific Formats
Different message brokers use different topic notations:
| Protocol | Format | Separator | Wildcard |
|---|---|---|---|
| Commlib (unified) | a.b.c | . | * |
| MQTT | a/b/c | / | + (single), # (multi) |
| Redis | a.b.c | . | * |
| AMQP | a.b.c | . | *, # |
| Kafka | a-b-c | - | * |
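Read as plain string rewriting, the table suggests conversions like the ones sketched below. These helpers are hypothetical and only mirror the separator and wildcard columns; they are not the functions shipped in commlib.utils, and the library may treat edge cases (for example a trailing catch-all wildcard) differently.

```python
def sketch_to_mqtt(topic: str) -> str:
    """Unified notation -> MQTT: '.' becomes '/', '*' becomes '+'."""
    return topic.replace(".", "/").replace("*", "+")


def sketch_from_mqtt(topic: str) -> str:
    """MQTT -> unified notation: '/' becomes '.', both '+' and '#' become '*'."""
    return topic.replace("/", ".").replace("+", "*").replace("#", "*")


def sketch_to_kafka(topic: str) -> str:
    """Unified notation -> Kafka: '.' becomes '-'."""
    return topic.replace(".", "-")


print(sketch_to_mqtt("sensors.*.temperature"))        # sensors/+/temperature
print(sketch_from_mqtt("home/+/sensors/+/temperature"))  # home.*.sensors.*.temperature
print(sketch_to_kafka("building.floor.room.sensor"))  # building-floor-room-sensor
```

Because both MQTT wildcards collapse to `*`, a round trip through the unified notation does not preserve the difference between `+` and `#` in this sketch.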
Topic Conversion Functions
The commlib.utils module provides functions for converting topics between different protocols:
from commlib.utils import (
convert_topic_notation,
topic_to_mqtt, topic_from_mqtt,
topic_to_redis, topic_from_redis,
topic_to_kafka, topic_from_kafka,
topic_to_amqp, topic_from_amqp,
)
# Convert from MQTT to commlib unified notation
mqtt_topic = "sensors/+/temperature"
commlib_topic = topic_from_mqtt(mqtt_topic)
# Result: "sensors.*.temperature"
# Convert from commlib to MQTT
commlib_topic = "sensors.*.temperature"
mqtt_topic = topic_to_mqtt(commlib_topic)
# Result: "sensors/+/temperature"
# Convert from Kafka to MQTT
kafka_topic = "sensors-temperature"
mqtt_topic = convert_topic_notation(kafka_topic, "kafka", "mqtt")
# Result: "sensors/temperature"
# Convert from commlib to Kafka
commlib_topic = "building.floor.room.sensor"
kafka_topic = convert_topic_notation(commlib_topic, "commlib", "kafka")
# Result: "building-floor-room-sensor"
Available Conversion Functions
Direct Protocol Conversions:
- topic_to_mqtt(topic) - Convert from commlib to MQTT format
- topic_from_mqtt(topic) - Convert from MQTT format to commlib
- topic_to_redis(topic) - Convert from commlib to Redis format
- topic_from_redis(topic) - Convert from Redis format to commlib
- topic_to_kafka(topic) - Convert from commlib to Kafka format
- topic_from_kafka(topic) - Convert from Kafka format to commlib
- topic_to_amqp(topic) - Convert from commlib to AMQP format
- topic_from_amqp(topic) - Convert from AMQP format to commlib
Unified Conversion:
- convert_topic_notation(topic, from_protocol, to_protocol) - Convert between any two protocols
Supported protocol names: "commlib", "mqtt", "redis", "amqp", "kafka"
Real-World Examples
from commlib.utils import convert_topic_notation
# IoT sensor hierarchy
mqtt_hierarchy = "home/+/sensors/+/temperature"
commlib_hierarchy = convert_topic_notation(mqtt_hierarchy, "mqtt", "commlib")
# Result: "home.*.sensors.*.temperature"
# Event stream conversion
mqtt_events = "events/system/+/logs"
redis_events = convert_topic_notation(mqtt_events, "mqtt", "redis")
# Result: "events.system.*.logs"
# RPC namespace conversion
kafka_rpc = "rpc-service-request"
mqtt_rpc = convert_topic_notation(kafka_rpc, "kafka", "mqtt")
# Result: "rpc/service/request"
# Catch-all topic
mqtt_all = "#"
commlib_all = convert_topic_notation(mqtt_all, "mqtt", "commlib")
# Result: "*"
Preemptive Services with Feedback (Actions)
Actions are pre-emptive services with support for asynchronous feedback publishing. This communication pattern is used to implement services which can be stopped and can provide feedback data, such as the move command service of a robot.
The example below shows how to use Actions to implement preemptive robot motion services with integrated feedback. In this example we implement the MoveByDistance command service for our custom Lab robot. The message structure (MoveByDistanceMsg) is defined by the Goal, Result and Feedback classes. The client sets a distance goal (in cm) and monitors progress through the feedback channel, which commlib-py captures automatically.
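The preemption mechanism itself can be sketched with the standard library alone: a worker loop checks a cancel event between steps and reports feedback through a callback. The names `GoalHandle`, `move_by_distance` and `run_goal` are hypothetical and only mimic the shape of the goal handler used in the commlib-py example that follows.

```python
import threading
import time


class GoalHandle:
    """Hypothetical goal handler: carries the goal, a cancel flag and a feedback sink."""

    def __init__(self, target_cm, on_feedback):
        self.target_cm = target_cm
        self.cancel_event = threading.Event()
        self._on_feedback = on_feedback

    def send_feedback(self, current_cm):
        self._on_feedback(current_cm)


def move_by_distance(goal_h, step_s=0.01):
    """Advance 1 cm per step until the target is reached or the goal is cancelled."""
    c = 0
    while c < goal_h.target_cm:
        if goal_h.cancel_event.is_set():
            break
        goal_h.send_feedback(c)
        c += 1
        time.sleep(step_s)
    return c


def run_goal(target_cm, cancel_after=None):
    """Run the goal in a worker thread; optionally cancel it after some seconds."""
    feedback = []
    result = {}
    goal_h = GoalHandle(target_cm, feedback.append)
    worker = threading.Thread(
        target=lambda: result.update(dest_cm=move_by_distance(goal_h)))
    worker.start()
    if cancel_after is not None:
        time.sleep(cancel_after)
        goal_h.cancel_event.set()
    worker.join()
    return result["dest_cm"], feedback


print(run_goal(3))  # (3, [0, 1, 2])
```

Calling `run_goal(1000, cancel_after=0.05)` stops well short of the target, and the returned distance equals the number of feedback messages that were sent before the cancel was observed.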
Write an Action Service
import time

from commlib.action import GoalStatus
from commlib.msg import ActionMessage
from commlib.node import Node
from commlib.transports.redis import ConnectionParameters


class MoveByDistanceMsg(ActionMessage):
    class Goal(ActionMessage.Goal):
        target_cm: int = 0

    class Result(ActionMessage.Result):
        dest_cm: int = 0

    class Feedback(ActionMessage.Feedback):
        current_cm: int = 0


def on_goal_request(goal_h):
    c = 0
    goal_req_data = goal_h.data  # Retrieve goal request data from goal handler
    res = MoveByDistanceMsg.Result()  # Goal Result Message
    while c < goal_h.data.target_cm:
        if goal_h.cancel_event.is_set():  # Use the cancel_event property of goal handler instance
            break
        goal_h.send_feedback(MoveByDistanceMsg.Feedback(current_cm=c))
        c += 1
        time.sleep(1)
    res.dest_cm = c
    return res


if __name__ == '__main__':
    action_name = 'myrobot.move.distance'
    conn_params = ConnectionParameters()
    node = Node(node_name='myrobot.node.motion',  # A node can provide several motion interfaces
                connection_params=conn_params)
    node.create_action(msg_type=MoveByDistanceMsg,
                       action_name=action_name,
                       on_goal=on_goal_request)
    node.run_forever()
Write an Action Client
import time

from commlib.action import GoalStatus
from commlib.msg import ActionMessage
from commlib.node import Node
from commlib.transports.redis import ActionClient, ConnectionParameters


class MoveByDistanceMsg(ActionMessage):
    class Goal(ActionMessage.Goal):
        target_cm: int = 0

    class Result(ActionMessage.Result):
        dest_cm: int = 0

    class Feedback(ActionMessage.Feedback):
        current_cm: int = 0


def on_feedback(feedback):
    print(f'ActionClient <on-feedback> callback: {feedback}')


def on_result(result):
    print(f'ActionClient <on-result> callback: {result}')


def on_goal_reached(result):
    print(f'ActionClient <on-goal-reached> callback: {result}')


if __name__ == '__main__':
    # Must match the action_name used by the Action Service above
    action_name = 'myrobot.move.distance'
    conn_params = ConnectionParameters()
    node = Node(node_name='action_client_example_node',
                connection_params=conn_params)
    action_client = node.create_action_client(
        msg_type=MoveByDistanceMsg,
        action_name=action_name,
        on_goal_reached=on_goal_reached,
        on_feedback=on_feedback,
        on_result=on_result
    )
    node.run()
    goal_msg = MoveByDistanceMsg.Goal(target_cm=5)
    action_client.send_goal(goal_msg)
    resp = action_client.get_result(wait=True)
    print(f'Action Result: {resp}')
    node.stop()
🏗️ Advanced
Endpoints (Low-level API)
It is possible to construct endpoints without binding them to a specific Node. This supports a wider range of applications, where the Node concept might not apply.
One can create endpoint instances by using the following classes of each supported transport:
| Endpoint Type | Description | Required Parameters | Supported Protocols |
|---|---|---|---|
| RPCClient | Sends RPC requests and waits for responses. | msg_type, rpc_name, connection_params | MQTT, Redis, AMQP, Kafka |
| RPCServer | Handles RPC requests and sends responses. | msg_type, rpc_name, on_request, connection_params | MQTT, Redis, AMQP, Kafka |
| Publisher | Sends messages to a specific topic. | msg_type, topic, connection_params | MQTT, Redis, AMQP, Kafka |
| Subscriber | Listens for messages on a specific topic. | msg_type, topic, on_message, connection_params | MQTT, Redis, AMQP, Kafka |
| MPublisher | Publishes messages to multiple topics. | connection_params | MQTT, Redis, AMQP, Kafka |
| WPublisher | A wrapped publisher with additional features. | msg_type, topic, connection_params | MQTT, Redis |
| PSubscriber | Subscribes to topics using patterns. | topic_pattern, on_message, connection_params | MQTT, Redis, AMQP, Kafka |
| WSubscriber | A wrapped subscriber with additional features. | msg_type, topic, on_message, connection_params | MQTT, Redis |
| ActionService | Provides preemptive services with feedback. | msg_type, action_name, on_goal, connection_params | MQTT, Redis, AMQP |
| ActionClient | Sends goals to an action service and receives feedback. | msg_type, action_name, on_feedback, on_result, connection_params | MQTT, Redis, AMQP |
from commlib.transports.redis import RPCService
from commlib.transports.amqp import Subscriber
from commlib.transports.mqtt import Publisher, RPCClient
...
Or use the endpoint_factory to construct endpoints.
import time

from commlib.endpoints import endpoint_factory, EndpointType, TransportType


def callback(data):
    print(data)


if __name__ == '__main__':
    topic = 'endpoints_factory_example'
    mqtt_sub = endpoint_factory(
        EndpointType.Subscriber,
        TransportType.MQTT)(topic=topic, on_message=callback)
    mqtt_sub.run()
    mqtt_pub = endpoint_factory(
        EndpointType.Publisher,
        TransportType.MQTT)(topic=topic, debug=True)
    mqtt_pub.run()
    data = {'a': 1, 'b': 2}
    while True:
        mqtt_pub.publish(data)
        time.sleep(1)
B2B bridges
In the context of IoT and CPS, it is a common requirement to bridge messages between message brokers, based on application-specific rules. An example is to bridge analytics (preprocessed) data from the Edge to the Cloud. And what happens if the brokers use different communication protocols?
Commlib builds a thin layer on top of the internal PubSub and RPC API to provide a protocol-agnostic implementation of Broker-to-Broker bridges.
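Conceptually, a topic bridge is a subscriber on one broker wired to a publisher on another. The sketch below models that with two in-process brokers; `InMemoryBroker` and `SketchTopicBridge` are hypothetical classes for illustration and say nothing about how commlib's bridge classes are implemented internally.

```python
from collections import defaultdict


class InMemoryBroker:
    """Hypothetical stand-in for a message broker."""

    def __init__(self, name):
        self.name = name
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, on_message):
        self._subscribers[topic].append(on_message)

    def publish(self, topic, msg):
        for callback in list(self._subscribers.get(topic, [])):
            callback(msg)


class SketchTopicBridge:
    """Forward messages from from_uri on one broker to to_uri on another."""

    def __init__(self, from_uri, to_uri, from_broker, to_broker):
        self.from_uri = from_uri
        self.to_uri = to_uri
        self.from_broker = from_broker
        self.to_broker = to_broker

    def run(self):
        # Directional: only the source broker is subscribed to.
        self.from_broker.subscribe(self.from_uri, self._forward)

    def _forward(self, msg):
        self.to_broker.publish(self.to_uri, msg)


edge = InMemoryBroker("edge")
cloud = InMemoryBroker("cloud")
cloud_inbox, edge_inbox = [], []
cloud.subscribe("thing.robotA.sensors.sonar.front", cloud_inbox.append)
edge.subscribe("sonar.front", edge_inbox.append)

SketchTopicBridge("sonar.front", "thing.robotA.sensors.sonar.front", edge, cloud).run()
edge.publish("sonar.front", {"range": 0.8})
```

After the publish, the message appears once in `cloud_inbox` under the renamed topic. Publishing on the cloud side does not travel back to the edge, which is what "directional" means for a bridge.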
Below are examples of:
- A Redis-to-MQTT RPC Bridge
- A Redis-to-MQTT Topic Bridge.
#!/usr/bin/env python
import time

import commlib.transports.amqp as acomm
import commlib.transports.redis as rcomm
import commlib.transports.mqtt as mcomm
from commlib.bridges import (
    RPCBridge, RPCBridgeType, TopicBridge, TopicBridgeType
)


def redis_to_mqtt_rpc_bridge():
    """
    [RPC Client] ----> [Broker A] ------> [Broker B] ---> [RPC Service]
    """
    bA_params = rcomm.ConnectionParameters()
    bB_params = mcomm.ConnectionParameters()
    bA_uri = 'ops.start_navigation'
    bB_uri = 'thing.robotA.ops.start_navigation'
    br = RPCBridge(from_uri=bA_uri, to_uri=bB_uri,
                   from_broker_params=bA_params,
                   to_broker_params=bB_params,
                   debug=False)
    br.run()


def redis_to_mqtt_topic_bridge():
    """
    [Producer Endpoint] ---> [Broker A] ---> [Broker B] ---> [Consumer Endpoint]
    """
    bA_params = rcomm.ConnectionParameters()
    bB_params = mcomm.ConnectionParameters()
    bA_uri = 'sonar.front'
    bB_uri = 'thing.robotA.sensors.sonar.front'
    br = TopicBridge(from_uri=bA_uri, to_uri=bB_uri,
                     from_broker_params=bA_params,
                     to_broker_params=bB_params,
                     debug=False)
    br.run()


if __name__ == '__main__':
    redis_to_mqtt_rpc_bridge()
    redis_to_mqtt_topic_bridge()
The Base Bridge Class is inherited by the RPCBridge, TopicBridge and PTopicBridge classes.
class Bridge:
    """Bridge.
    Base Bridge Class.
    """

    def __init__(self,
                 from_uri: str,
                 to_uri: str,
                 from_broker_params: BaseConnectionParameters,
                 to_broker_params: BaseConnectionParameters,
                 auto_transform_uris: bool = True,
                 debug: bool = False):
The TopicBridge and RPCBridge classes have an extra argument to define the message type of the communication channel.
class RPCBridge(Bridge):
    def __init__(self, msg_type: RPCMessage = None, *args, **kwargs):
        ...


class TopicBridge(Bridge):
    def __init__(self, msg_type: PubSubMessage = None, *args, **kwargs):
        ...
Finally, the PTopicBridge is used for bridging topics using wildcards.
class PTopicBridge(Bridge):
    """PTopicBridge.

    Args:
        msg_type (PubSubMessage): The message type to be used for the subscriber and publisher.
        uri_transform (List): A list of tuples containing the from and to strings for transforming the topic URIs.

    The constructor determines the type of the topic bridge based on the types of
    the from and to broker parameters. It then creates the subscriber and publisher
    endpoints using the appropriate endpoint factory functions.
    """

    def __init__(self,
                 msg_type: PubSubMessage = None,
                 uri_transform: List = [],
                 *args,
                 **kwargs):
A Pattern-based Topic Bridge (PTopicBridge) example is shown below. In this example, we use a static message definition (SonarMessage), also referred to as typed communication.
#!/usr/bin/env python
import time

from commlib.msg import PubSubMessage
from commlib.bridges import PTopicBridge
import commlib.transports.mqtt as mcomm
import commlib.transports.redis as rcomm


class SonarMessage(PubSubMessage):
    distance: float = 0.001
    horizontal_fov: float = 30.0
    vertical_fov: float = 14.0


if __name__ == '__main__':
    """
    [Broker A] ------------> [Broker B] ---> [Consumer Endpoint]
    """
    bA_uri = 'sensors.*'
    bB_namespace = 'myrobot'
    bA_params = rcomm.ConnectionParameters()  # Broker A: Redis
    bB_params = mcomm.ConnectionParameters()  # Broker B: MQTT
    br = PTopicBridge(bA_uri,
                      bB_namespace,
                      bA_params,
                      bB_params,
                      msg_type=SonarMessage,
                      debug=False)
    br.run()
TCP Bridge
The TCP bridge forwards TCP packets between two endpoints:
[Client] -------> [TCPBridge, port=xxxx] ---------> [TCP endpoint, port=xxxx]
A one-to-one connection is established between the bridge and the endpoint.
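The forwarding idea can be sketched with asyncio from the standard library. This is not the code in commlib's tcp_proxy.py; the function names, the choice of asyncio and the one-upstream-connection-per-client behaviour are assumptions made for illustration.

```python
import asyncio


async def _pipe(reader, writer):
    """Copy bytes from reader to writer until EOF, then close the writer."""
    try:
        while True:
            data = await reader.read(4096)
            if not data:
                break
            writer.write(data)
            await writer.drain()
    finally:
        writer.close()


async def start_tcp_forwarder(listen_port, target_host, target_port):
    """Listen locally and relay each client to its own upstream connection."""

    async def handle(client_reader, client_writer):
        up_reader, up_writer = await asyncio.open_connection(target_host, target_port)
        await asyncio.gather(
            _pipe(client_reader, up_writer),   # client -> endpoint
            _pipe(up_reader, client_writer),   # endpoint -> client
        )

    return await asyncio.start_server(handle, "127.0.0.1", listen_port)


async def demo():
    """Start an echo endpoint, put the forwarder in front of it, send one message."""

    async def echo(reader, writer):
        writer.write(await reader.read(4096))
        await writer.drain()
        writer.close()

    upstream = await asyncio.start_server(echo, "127.0.0.1", 0)
    up_port = upstream.sockets[0].getsockname()[1]
    bridge = await start_tcp_forwarder(0, "127.0.0.1", up_port)
    bridge_port = bridge.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection("127.0.0.1", bridge_port)
    writer.write(b"ping")
    await writer.drain()
    reply = await reader.readexactly(4)
    writer.close()
    bridge.close()
    upstream.close()
    return reply


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The client talks only to the forwarder's port, yet receives the echo produced by the endpoint behind it. Port 0 is used so the operating system picks free ports for the demo.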
REST Proxy
The REST proxy enables invocation of REST services via message brokers. The proxy uses an RPCService to run the broker endpoint and an HTTP client for calling REST services. An RPC call is transformed into a proper, REST-compliant HTTP request, based on the input parameters.
Requests to the proxy and responses from the REST services follow the RESTProxyMessage schema:
class RESTProxyMessage(RPCMessage):
    class Request(RPCMessage.Request):
        base_url: str
        path: str = '/'
        verb: str = 'GET'
        query_params: Dict[str, Any] = {}
        path_params: Dict[str, Any] = {}
        body_params: Dict[str, Any] = {}
        headers: Dict[str, Any] = {}

    class Response(RPCMessage.Response):
        data: Union[str, Dict, int]
        headers: Dict[str, Any]
        status_code: int = 200
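How the request fields could map onto an HTTP request is sketched below with urllib from the standard library. The helper `build_http_request`, the `{name}` placeholder syntax for path parameters and the JSON encoding of the body are assumptions for illustration; the actual proxy may build requests differently.

```python
import json
import urllib.request
from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode


def build_http_request(base_url: str,
                       path: str = "/",
                       verb: str = "GET",
                       query_params: Optional[Dict[str, Any]] = None,
                       path_params: Optional[Dict[str, Any]] = None,
                       body_params: Optional[Dict[str, Any]] = None,
                       headers: Optional[Dict[str, Any]] = None) -> urllib.request.Request:
    """Turn RESTProxyMessage.Request-like fields into a urllib Request (sketch)."""
    # Fill '{name}' placeholders in the path (assumed placeholder syntax).
    for key, value in (path_params or {}).items():
        path = path.replace("{" + key + "}", quote(str(value), safe=""))
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    if query_params:
        url += "?" + urlencode(query_params)
    data = None
    all_headers = dict(headers or {})
    if body_params:
        # Assumed encoding: send the body as JSON.
        data = json.dumps(body_params).encode("utf-8")
        all_headers.setdefault("Content-Type", "application/json")
    return urllib.request.Request(url, data=data, headers=all_headers,
                                  method=verb.upper())


req = build_http_request("https://api.example.com",
                         path="/users/{id}",
                         path_params={"id": 42},
                         query_params={"page": 2})
print(req.get_method(), req.full_url)
```

The request object is only constructed here, not sent; passing it to `urllib.request.urlopen` would perform the call and yield the status code, headers and data that populate the response message.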
Head to the commlib-rest-proxy repo for a dockerized application that implements a REST Proxy using commlib-py.
Web Gateway
Head to the commlib-web-gw repo for a dockerized application that implements a Web Gateway for message brokers, using commlib-py.
🤖 Examples
The examples/ directory of this repository includes various usage and application examples of using commlib-py.
🧪 Testing
Commlib-py uses the pytest framework. Run the test suite with:
Using make (Makefile included in this repo):
make test
Using pip:
pytest
Using conda:
conda activate {venv}
pytest
Furthermore, the Makefile provides coverage commands:
Coverage
make cov
Coverage Diff
make diff
🎞️ Roadmap
- Task 1: Protocol-agnostic architecture
- Task 2: Support the AMQP and MQTT protocols
- Task 3: Support Redis protocol
- Task 4: Support Kafka protocol (Under development / Partial Support)
- Task 5: RPCServer implementation for AMQP and Kafka transports
- Task 6: Comprehensive testing
🤝 Contributing
- 💬 Join the Discussions: Share your insights, provide feedback, or ask questions.
- 🐛 Report Issues: Submit bugs found or log feature requests for the commlib-py-2 project.
- 💡 Submit Pull Requests: Review open PRs, and submit your own PRs.
Contributing Guidelines
- Fork the Repository: Start by forking the project repository to your own account.
- Clone Locally: Clone the forked repository to your local machine using a git client.
git clone https://github.com/{FORKED_ACCOUNT}/commlib-py.git
- Create a New Branch: Always work on a new branch, giving it a descriptive name.
git checkout -b new-feature-x
- Make Your Changes: Develop and test your changes locally.
- Commit Your Changes: Commit with a clear message describing your updates.
git commit -m 'Implemented new feature x.'
- Push to Your Fork: Push the changes to your forked repository.
git push origin new-feature-x
- Submit a Pull Request: Create a PR against the original project repository. Clearly describe the changes and their motivations.
- Review: Once your PR is reviewed and approved, it will be merged into the main branch. Congratulations on your contribution!
📜 License
Commlib-py is released under the MIT License. For more details, refer to the MIT LICENSE.
🎩 Acknowledgments
- Credit contributors, inspiration, references, etc.
🌟 Star History