Skip to main content

Communication Library for Python implementing the most common communication patterns for CyberPhysical Systems.

Project description

image

PyPI version Downloads Downloads Downloads

Built with the tools and technologies:

Redis MQTT RabbitMQ Kafka
Anaconda Pytest Docker Python Pydantic


📜 Table of Contents


📖 Overview

Commlib is a Domain-specific Language for communication and messaging in Cyber-Physical Systems. Can be used for rapid development of the communication layer on-device, at the Edge and on the Cloud, or using a mixed multi-level multi-broker schema.

The goal of this project is to implement a simple Protocol-agnostic API (AMQP, Kafka, Redis, MQTT, etc) for common communication patterns in the context of Cyber-Physical Systems, using message broker technologies. Such patterns include PubSub, RPC and Preemptive Services (aka Actions), among others.


👾 Features

Feature Summary
⚙️ Protocol-Agnostic
  • Protocol/Transport-level abstraction
  • Currently supports Redis, AMQP, MQTT and Kafka
📄 Documentation
  • Rich documentation in various formats (YAML, TOML, Markdown)
  • Includes detailed installation commands for different package managers
  • Utilizes MkDocs for generating documentation
🧩 Modularity
  • Well-structured codebase with clear separation of concerns
  • Encourages code reusability and maintainability
⚡️ Performance
  • Optimized code for efficiency
  • Scalable architecture for handling high loads
📦 Dependencies
  • Includes a variety of libraries for different functionalities
  • Dependency management with conda for environment setup
  • Dynamic imports of underlying transport libraries

📜 Project Index

brokers
⦿ brokers
redis
⦿ brokers.redis
File Name Summary
redis.conf - The redis.conf file in the brokers/redis directory configures the Redis server instance used within the larger application
- It dictates settings such as memory allocation and potentially includes other configuration files for customized server behavior
- Essentially, this file is crucial for setting up and controlling the Redis message broker within the overall system architecture.
launch_redis_docker.sh - The script launches a Redis instance within a Docker container
- It utilizes a provided configuration file (redis.conf) and maps port 6379 for external access
- This facilitates the use of Redis as a message broker within the larger application architecture, enabling efficient data exchange between application components
- The containers ephemeral nature ensures clean resource management.
mqtt
⦿ brokers.mqtt
File Name Summary
launch_emqx_docker.sh - The script launches an EMQX MQTT broker instance within a Docker container
- It exposes several ports for various MQTT protocols, including the management UI, enabling communication and administration
- This facilitates message queuing and data exchange within the broader application architecture
- The script simplifies deployment and management of the broker.
kafka
⦿ brokers.kafka
File Name Summary
start.sh - The start.sh script initiates the Kafka broker within the projects dockerized environment
- It leverages docker-compose to manage the lifecycle of the Kafka containers, ensuring a clean startup by first stopping any existing instances and then starting them, removing any orphaned containers
- This script is crucial for deploying and managing the Kafka message broker infrastructure.
docker-compose.yml TODO
docker-compose-2.yml TODO
dragonfly
⦿ brokers.dragonfly
File Name Summary
start.sh TODO
docker-compose.yml TODO
amqp
⦿ brokers.amqp
File Name Summary
launch_rabbitmq_docker.sh TODO
docker-compose.yml TODO
examples
⦿ examples
topic_aggregator
⦿ examples.topic_aggregator
File Name Summary
topic_merge.py TODO
producers.py TODO
test_rpc_deletion
⦿ examples.test_rpc_deletion
File Name Summary
test.py TODO
minimize_conns
⦿ examples.minimize_conns
File Name Summary
wsubscriber.py TODO
wpublisher.py TODO
simple_rpc
⦿ examples.simple_rpc
File Name Summary
simple_rpc_service.py TODO
simple_rpc_client.py TODO
simple_pubsub
⦿ examples.simple_pubsub
File Name Summary
subscriber.py TODO
publisher.py TODO
simple_action
⦿ examples.simple_action
File Name Summary
action_service.py TODO
action_client.py TODO
rpc_server
⦿ examples.rpc_server
File Name Summary
rpc_server.py TODO
client.py TODO
ptopic_bridge
⦿ examples.ptopic_bridge
File Name Summary
redis_pub.py TODO
mqtt_sub.py TODO
bridge.py TODO
perf_test
⦿ examples.perf_test
File Name Summary
perf_test.py TODO
node_inherit
⦿ examples.node_inherit
File Name Summary
node_inherit_example.py TODO
node_decorators
⦿ examples.node_decorators
File Name Summary
decor_node.py TODO
client.py TODO
node
⦿ examples.node
File Name Summary
node_with_features.py TODO
multitopic_publisher
⦿ examples.multitopic_publisher
File Name Summary
psubscriber.py TODO
mpublisher.py TODO
endpoint_factory
⦿ examples.endpoint_factory
File Name Summary
example.py TODO
bridges
⦿ examples.bridges
File Name Summary
typed_bridge_example.py TODO
bridge_example.py TODO
commlib
⦿ commlib
File Name Summary
utils.py TODO
timer.py TODO
tcp_proxy.py TODO
serializer.py TODO
rpc.py TODO
pubsub.py TODO
node.py TODO
msg.py TODO
exceptions.py TODO
endpoints.py TODO
connection.py TODO
bridges.py TODO
async_utils.py TODO
aggregation.py TODO
action.py TODO
compression.py TODO
transports
⦿ commlib.transports
File Name Summary
redis.py TODO
mqtt.py TODO
mock.py TODO
kafka.py TODO
base_transport.py TODO
amqp.py TODO

🚀 Getting Started

🔖 Prerequisites

This project requires the following dependencies:

  • Programming Language: Python 3.7+
  • Packages: Pydantic, orjson|ujson (Optional for fast json serialization), paho-mqtt (Optional for MQTT support), redis-py (Optional for Redis Support), pika (Optional for AMQP support)
  • Package Manager: Pip, Poetry, Conda, Tox

🛠️ Installation

Build commlib-py from the source and install dependencies:

PyPi Releases

  1. Using pip
 pip install commlib-py

Or select version explicitly

 pip install commlib-py==0.11.5
  1. Using pipx
 pipx install commlib-py

From Source

  1. Clone the repository:

     git clone git@github.com:robotics-4-all/commlib-py.git
    
  2. Navigate to the project directory:

     cd commlib-py
    
  3. Install the dependencies:

Using pip:

 pip install .

Using poetry:

 poetry install

Using conda:

 conda env create -f environment.yml

JSON Serialization

It is recommended to use a fast json library, such as orjson or ujson.

The framework will load and use the most performance optimal library based on installations, using the below priority:

⚙️ Concepts

Node

A Node is a software component that follows the Component-Port-Connector model. It has input and output ports for communicating with the world. Each port defines an endpoint and can be of the following types.

Port Type Description
Input Port Subscriber: Listens for messages on a specific topic.
RPC Service: Provides a Remote Procedure Call (RPC) service for handling requests.
Action Service: Executes long-running tasks and provides feedback during execution.
RPC Server: Handles RPC requests and sends responses.
Output Port Publisher: Sends messages to a specific topic.
RPC Client: Sends RPC requests and waits for responses.
Action Client: Initiates actions and receives feedback during execution.
InOut Port RPCBridge: Bridges RPC communication between two brokers. Directional.
TopicBridge: Bridges PubSub communication between two brokers. Directional.
PTopicBridge: Bridges PubSub communication between two brokers based on a topic pattern. Directional.

Furthermore, it implements several features:

  • Publish Heartbeat messages in the background for as long as the node is active
  • Provide control interfaces, to start and stop the execution of the Node
  • Provides methods to create endpoints and bind to Node ports.
from commlib.node import Node, TransportType
from commlib.msg import RPCMessage
## Import the Redis transports
## Imports are lazy handled internally
from commlib.transports.redis import ConnectionParameters

class AddTwoIntMessage(RPCMessage):
    class Request(RPCMessage.Request):
        a: int = 0
        b: int = 0

    class Response(RPCMessage.Response):
        c: int = 0

def add_two_int_handler(msg):
    print(f'On-Request: {msg}')
    resp = AddTwoIntMessage.Response(c = msg.a + msg.b)
    return resp

if __name__ == '__main__':
	conn_params = ConnectionParameters()
	node = Node(
		node_name='add_two_ints_node',
		connection_params=conn_params,
		heartbeats=True,
		heartbeat_uri='nodes.add_two_ints.heartbeat',
		heartbeat_interval=10,
		ctrl_services=True,
		debug=False
	)
	rpc = node.create_rpc(
		msg_type=AddTwoIntMessage,
		rpc_name='add_two_ints_node.add_two_ints',
		on_request=add_two_int_handler
	)
	node.run_forever(sleep_rate=1)

A Node always binds to a specific broker via where it provides input and output ports to the functionality. Of course, several Nodes can be created and executed in a single-process application.

Below is the list of currently supported interface/endpoint types and protocol transports.

Interface Type Description Required Parameters Supported transports
RPCClient Sends RPC requests and waits for responses. rpc_name, connection_params MQTT, Redis, AMQP, Kafka
RPCServer Handles RPC requests and sends responses. rpc_name, on_request, connection_params MQTT, Redis, AMQP, Kafka
Publisher Sends messages to a specific topic. topic, connection_params MQTT, Redis, AMQP, Kafka
Subscriber Listens for messages on a specific topic. topic, on_message, connection_params MQTT, Redis, AMQP, Kafka
MPublisher Publishes messages to multiple topics. connection_params MQTT, Redis, AMQP, Kafka
WPublisher A wrapped publisher with additional features. topic, connection_params MQTT, Redis
PSubscriber Subscribes to topics using patterns. topic_pattern, on_message, connection_params MQTT, Redis, AMQP, Kafka
WSubscriber A wrapped subscriber with additional features. topic, on_message, connection_params MQTT, Redis
ActionService Provides preemptive services with feedback. msg_type, action_name, on_goal, connection_params MQTT, Redis, AMQP
ActionClient Sends goals to an action service and receives feedback. msg_type, action_name, on_feedback, on_result, connection_params MQTT, Redis, AMQP

Node class:

class Node:

    def __init__(self,
                 node_name: Optional[str] = "",
                 connection_params: Optional[Any] = None,
                 debug: Optional[bool] = False,
                 heartbeats: Optional[bool] = True,
                 heartbeat_interval: Optional[float] = 10.0,
                 heartbeat_uri: Optional[str] = None,
                 compression: CompressionType = CompressionType.NO_COMPRESSION,
                 ctrl_services: Optional[bool] = False,
                 workers_rpc: Optional[int] = 4):
  • node_name (Optional[str]): The name of the node. Defaults to an empty string.
  • connection_params (Optional[Any]): Connection parameters for the broker. Defaults to None.
  • debug (Optional[bool]): Enables debug mode if set to True. Defaults to False.
  • heartbeats (Optional[bool]): Enables heartbeat messages if set to True. Defaults to True.
  • heartbeat_interval (Optional[float]): Interval in seconds for sending heartbeat messages. Defaults to 10.0.
  • heartbeat_uri (Optional[str]): URI for publishing heartbeat messages. Defaults to None.
  • compression (CompressionType): Compression type for messages. Defaults to CompressionType.NO_COMPRESSION.
  • ctrl_services (Optional[bool]): Enables control services (start/stop) if set to True. Defaults to False.
  • workers_rpc (Optional[int]): Number of worker threads for handling RPC requests. Defaults to 4.

Node methods to create and run Endpoints::

Node:
	# Properties
	endpoints: List
	health: bool
	state: NodeState

	# Functions
	create_subscriber(self, *args, **kwargs)
	create_publisher(self, *args, **kwargs)
	create_rpc(self, *args, **kwargs)
   	create_rpc_client(self, *args, **kwargs)
	create_rpc_server(self, *args, **kwargs)
   	create_action(self, *args, **kwargs)
   	create_action_client(self, *args, **kwargs)
   	create_mpublisher(self, *args, **kwargs)
   	create_psubscriber(self, *args, **kwargs)
	create_wpublisher(self, *args, **kwargs)
   	create_wsubscriber(self, *args, **kwargs)
   	run_forever(self, sleep_rate: float = 0.001)
   	run(self, wait: bool = True) -> None
   	stop(self)

Req/Resp - RPCs

This README provides an introduction to using Remote Procedure Calls (RPCs) with commlib-py. RPCs allow you to execute functions or methods on a remote system as if they were local, enabling seamless communication between distributed systems. Commlib simplifies the implementation of RPCs by providing a high-level API for defining, invoking, and managing remote procedures.

With commlib, you can:

  • Define RPC endpoints for exposing specific functionalities.
  • Call remote procedures from clients with minimal setup.
  • Handle asynchronous responses using callbacks.

The following simple example will walk you through the basic setup and usage of RPCs using commlib-py, helping you build scalable and efficient distributed applications.

Server Side Example
from commlib.msg import RPCMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters

class AddTwoIntMessage(RPCMessage):
    class Request(RPCMessage.Request):
        a: int = 0
        b: int = 0

    class Response(RPCMessage.Response):
        c: int = 0

# Callback function of the add_two_ints RPC
def add_two_int_handler(msg) -> AddTwoIntMessage.Response:
    print(f'Request Message: {msg.__dict__}')
	# Create the Response msg
    resp = AddTwoIntMessage.Response(c = msg.a + msg.b)
	# Return the Response msg
    return resp

if __name__ == '__main__':
    conn_params = ConnectionParameters()
	# Create a node instance
    node = Node(node_name='add_two_ints_node',
        		connection_params=conn_params)
	# Create the add_two_ints RPC
    rpc = node.create_rpc(
        msg_type=AddTwoIntMessage,
        rpc_name='add_two_ints_node.add_two_ints',
        on_request=add_two_int_handler
    )
	# Run the node.
    node.run_forever(sleep_rate=1)
Client Side Example
import time

from commlib.msg import RPCMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters

class AddTwoIntMessage(RPCMessage):
    class Request(RPCMessage.Request):
        a: int = 0
        b: int = 0

    class Response(RPCMessage.Response):
        c: int = 0

if __name__ == '__main__':
    conn_params = ConnectionParameters()
	# Create a node instance
    node = Node(node_name='myclient', connection_params=conn_params)
	# Create an RPC client
    rpc = node.create_rpc_client(
        msg_type=AddTwoIntMessage,
        rpc_name='add_two_ints_node.add_two_ints'
    )
	# Run the node to instantiate communication handling resources
    node.run()

    # Create an instance of the request object
    msg = AddTwoIntMessage.Request()
    while True:
        # returns AddTwoIntMessage.Response instance
        resp = rpc.call(msg)
        print(resp)
        msg.a += 1
        msg.b += 1
        time.sleep(1)

Pub/Sub

Publish/Subscribe (PubSub) is a messaging pattern where senders (publishers) send messages to a topic without knowing the recipients (subscribers). Subscribers express interest in specific topics and receive messages published to those topics. This decouples the producers and consumers, enabling scalable and flexible communication.

With commlib-py, implementing PubSub is straightforward. The library provides high-level abstractions for creating publishers and subscribers, allowing you to focus on your application's logic rather than the underlying transport mechanisms. You can define custom message types, publish data to topics, and handle incoming messages with ease.

Key features of PubSub in commlib-py:

  • Topic-based Communication: Messages are categorized by topics, enabling selective subscription.
  • Broker Agnostic: Supports multiple brokers like MQTT, Redis, and AMQP.
  • Typed Message-driven Communication: Define structured messages using Python classes.
  • Ease of Use: Simplified APIs for creating publishers and subscribers.

This section will guide you through creating a simple publisher and subscriber using commlib-py. You'll learn how to define message types, publish data, and process received messages effectively.

Write a Simple Publisher
from commlib.msg import MessageHeader, PubSubMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters

class SonarMessage(PubSubMessage):
    header: MessageHeader = MessageHeader()
    range: float = -1
    hfov: float = 30.6
    vfov: float = 14.2

class SonarMessage(PubSubMessage):
    distance: float = 0.001
    horizontal_fov: float = 30.0
    vertical_fov: float = 14.0

if __name__ == "__main__":
    conn_params = ConnectionParameters(host='localhost', port=1883)
    node = Node(node_name='sensors.sonar.front', connection_params=conn_params)
    pub = node.create_publisher(msg_type=SonarMessage, topic='sensors.sonar.front')
    node.run()
    msg = SonarMessage()
    while True:
        pub.publish(msg)
        msg.range += 1
        time.sleep(1)
Write a Simple Subscriber
#!/usr/bin/env python

import time
from commlib.msg import MessageHeader, PubSubMessage
from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters

class SonarMessage(PubSubMessage):
    header: MessageHeader = MessageHeader()
    range: float = -1
    hfov: float = 30.6
    vfov: float = 14.2

def on_message(msg):
    print(f'Received front sonar data: {msg}')

if __name__ == '__main__':
    conn_params = ConnectionParameters()

    node = Node(node_name='node.obstacle_avoidance', connection_params=conn_params)

    node.create_subscriber(msg_type=SonarMessage,
                           topic='sensors.sonar.front',
                           on_message=on_message)  # Define a callback function

    node.run_forever(sleep_rate=1)  # Define a process-level sleep rate in hz

Wildcard Subscriptions

For pattern-based topic subscriptions using wildcards one can also use the PSubscriber class directly.

For multi-topic publisher one can also use the MPublisher class directly.

from commlib.node import Node
from commlib.transports.mqtt import ConnectionParameters

def on_msg_callback(msg, topic):
    print(f'Message at topic <{topic}>: {msg}')

if __name__ == '__main__':
    conn_params = ConnectionParameters()
    node = Node(node_name='wildcard_subscription_example',
                connection_params=conn_params)

    # Create a pattern subscriber
    node.create_psubscriber(topic='topic.*', on_message=on_msg_callback)
    # Create a multi-topic publisher instance.
    pub = node.create_mpublisher()
    node.run(wait=True)

    topicA = 'topic.a'
    topicB = 'topic.b'

    while True:
        pub.publish({'a': 1}, topicA)
        pub.publish({'b': 1}, topicB)
        time.sleep(1)

Topic Notation Conversion

Commlib uses a unified topic notation standard across all supported protocols. This section explains how to convert topics between different protocol formats and between the unified notation and protocol-specific formats.

Unified Topic Notation

The commlib unified topic notation uses:

  • Separator: . (dot)
  • Wildcard: * (asterisk) - matches any segment or multiple segments
  • Format: a.b.c.d

Examples:

  • sensors.temperature - specific topic
  • sensors.*.temperature - wildcard for single segment
  • sensors.* - wildcard for multiple segments (catch-all)
Protocol-Specific Formats

Different message brokers use different topic notations:

Protocol Format Separator Wildcard
Commlib (unified) a.b.c . *
MQTT a/b/c / + (single), # (multi)
Redis a.b.c . *
AMQP a.b.c . *, #
Kafka a-b-c - *
Topic Conversion Functions

The commlib.utils module provides functions for converting topics between different protocols:

from commlib.utils import (
    convert_topic_notation,
    topic_to_mqtt, topic_from_mqtt,
    topic_to_redis, topic_from_redis,
    topic_to_kafka, topic_from_kafka,
    topic_to_amqp, topic_from_amqp,
)

# Convert from MQTT to commlib unified notation
mqtt_topic = "sensors/+/temperature"
commlib_topic = topic_from_mqtt(mqtt_topic)
# Result: "sensors.*.temperature"

# Convert from commlib to MQTT
commlib_topic = "sensors.*.temperature"
mqtt_topic = topic_to_mqtt(commlib_topic)
# Result: "sensors/+/temperature"

# Convert from Kafka to MQTT
kafka_topic = "sensors-temperature"
mqtt_topic = convert_topic_notation(kafka_topic, "kafka", "mqtt")
# Result: "sensors/temperature"

# Convert from commlib to Kafka
commlib_topic = "building.floor.room.sensor"
kafka_topic = convert_topic_notation(commlib_topic, "commlib", "kafka")
# Result: "building-floor-room-sensor"
Available Conversion Functions

Direct Protocol Conversions:

  • topic_to_mqtt(topic) - Convert from commlib to MQTT format
  • topic_from_mqtt(topic) - Convert from MQTT format to commlib
  • topic_to_redis(topic) - Convert from commlib to Redis format
  • topic_from_redis(topic) - Convert from Redis format to commlib
  • topic_to_kafka(topic) - Convert from commlib to Kafka format
  • topic_from_kafka(topic) - Convert from Kafka format to commlib
  • topic_to_amqp(topic) - Convert from commlib to AMQP format
  • topic_from_amqp(topic) - Convert from AMQP format to commlib

Unified Conversion:

  • convert_topic_notation(topic, from_protocol, to_protocol) - Convert between any two protocols

Supported protocol names: "commlib", "mqtt", "redis", "amqp", "kafka"

Real-World Examples
from commlib.utils import convert_topic_notation

# IoT sensor hierarchy
mqtt_hierarchy = "home/+/sensors/+/temperature"
commlib_hierarchy = convert_topic_notation(mqtt_hierarchy, "mqtt", "commlib")
# Result: "home.*.sensors.*.temperature"

# Event stream conversion
mqtt_events = "events/system/+/logs"
redis_events = convert_topic_notation(mqtt_events, "mqtt", "redis")
# Result: "events.system.*.logs"

# RPC namespace conversion
kafka_rpc = "rpc-service-request"
mqtt_rpc = convert_topic_notation(kafka_rpc, "kafka", "mqtt")
# Result: "rpc/service/request"

# Catch-all topic
mqtt_all = "#"
commlib_all = convert_topic_notation(mqtt_all, "mqtt", "commlib")
# Result: "*"

Preemptive Services with Feedback (Actions)

Actions are pre-emptive services with support for asynchronous feedback publishing. This communication pattern is used to implement services which can be stopped and can provide feedback data, such as the move command service of a robot.

The example below shows how to use Actions to implement preemptive robot motion services, with integrated feedback functionality. In this example we implement the MoveByDistance command service for our custom Lab robot. The message structure (MoveByDistanceMsg) is defined by the Goal, Result and Feedback classes. The logic is that we set a distance goal (in cm) and we monitor the progress through the feedback channel, which for commlib-py it is automatically captured.

Write an Action Service
import time

from commlib.action import GoalStatus
from commlib.msg import ActionMessage
from commlib.transports.redis import ConnectionParameters
)

class MoveByDistanceMsg(ActionMessage):
    class Goal(ActionMessage.Goal):
        target_cm: int = 0

    class Result(ActionMessage.Result):
        dest_cm: int = 0

    class Feedback(ActionMessage.Feedback):
        current_cm: int = 0

def on_goal_request(goal_h):
    c = 0
	goal_req_data = goal_h.data  # Retrieve goal request data from goal handler
    res = MoveByDistanceMsg.Result()  # Goal Result Message
    while c < goal_h.data.target_cm:
        if goal_h.cancel_event.is_set():  # Use the cancel_event property of goal handler instance
            break
        goal_h.send_feedback(MoveByDistanceMsg.Feedback(current_cm=c))
        c += 1
        time.sleep(1)
    res.dest_cm = c
    return res

if __name__ == '__main__':
    action_name = 'myrobot.move.distance'
    conn_params = ConnectionParameters()
    node = Node(node_name='myrobot.node.motion',  # A node can provide several motion interfaces
                connection_params=conn_params)
    node.create_action(msg_type=MoveByDistanceMsg,
                       action_name=action_name,
                       on_goal=on_goal_request)
    node.run_forever()
Write an Action Client
import time

from commlib.action import GoalStatus
from commlib.msg import ActionMessage
from commlib.transports.redis import ActionClient, ConnectionParameters

class MoveByDistanceMsg(ActionMessage):
    class Goal(ActionMessage.Goal):
        target_cm: int = 0

    class Result(ActionMessage.Result):
        dest_cm: int = 0

    class Feedback(ActionMessage.Feedback):
        current_cm: int = 0

def on_feedback(feedback):
    print(f'ActionClient <on-feedback> callback: {feedback}')

def on_result(result):
    print(f'ActionClient <on-result> callback: {result}')

def on_goal_reached(result):
    print(f'ActionClient <on-goal-reached> callback: {result}')

if __name__ == '__main__':
    action_name = 'testaction'
    conn_params = ConnectionParameters()
    node = Node(node_name='action_client_example_node',
                connection_params=conn_params)
    action_client = node.create_action_client(
        msg_type=MoveByDistanceMsg,
        action_name=action_name,
        on_goal_reached=on_goal_reached,
        on_feedback=on_feedback,
        on_result=on_result
    )
    node.run()
    goal_msg = MoveByDistanceMsg.Goal(target_cm=5)
    action_client.send_goal(goal_msg)
    resp = action_client.get_result(wait=True)
    print(f'Action Result: {resp}')
    node.stop()

🏗️ Advanced

Endpoints (Low-level API)

It is possible to construct endpoints without binding them to a specific Node. This is a feature to support a wider range of applications, where the concept Node might not be usable.

One can create endpoint instances by using the following classes of each supported transport:

Endpoint Type Description Required Parameters Supported Protocols
RPCClient Sends RPC requests and waits for responses. msg_type, rpc_name, connection_params MQTT, Redis, AMQP, Kafka
RPCServer Handles RPC requests and sends responses. msg_type, rpc_name, on_request, connection_params MQTT, Redis, AMQP, Kafka
Publisher Sends messages to a specific topic. msg_type, topic, connection_params MQTT, Redis, AMQP, Kafka
Subscriber Listens for messages on a specific topic. msg_type, topic, on_message, connection_params MQTT, Redis, AMQP, Kafka
MPublisher Publishes messages to multiple topics. connection_params MQTT, Redis, AMQP, Kafka
WPublisher A wrapped publisher with additional features. msg_type, topic, connection_params MQTT, Redis
PSubscriber Subscribes to topics using patterns. topic_pattern, on_message, connection_params MQTT, Redis, AMQP, Kafka
WSubscriber A wrapped subscriber with additional features. msg_type, topic, on_message, connection_params MQTT, Redis
ActionService Provides preemptive services with feedback. msg_type, action_name, on_goal, connection_params MQTT, Redis, AMQP
ActionClient Sends goals to an action service and receives feedback. msg_type, action_name, on_feedback, on_result, connection_params MQTT, Redis, AMQP
from commlib.transports.redis import RPCService
from commlib.transports.amqp import Subscriber
from commlib.transports.mqtt import Publisher, RPCClient
...

Or use the endpoint_factory to construct endpoints.

import time
from commlib.endpoints import endpoint_factory, EndpointType, TransportType

def callback(data):
    print(data)

if __name__ == '__main__':
    topic = 'endpoints_factory_example'

    mqtt_sub = endpoint_factory(
        EndpointType.Subscriber,
        TransportType.MQTT)(topic=topic, on_message=callback)
    mqtt_sub.run()

    mqtt_pub = endpoint_factory(
        EndpointType.Publisher,
        TransportType.MQTT)(topic=topic, debug=True)
	mqtt_pub.run()

    data = {'a': 1, 'b': 2}
    while True:
        mqtt_pub.publish(data)
        time.sleep(1)

B2B bridges

In the context of IoT and CPS, it is a common requirement to bridge messages between message brokers, based on application-specific rules. An example is to bridge analytics (preprocessed) data from the Edge to the Cloud. And what happens if the brokers use different communication protocols?

Commlib builds a thin layer on top of the internal PubSub and RPC API to provide a protocol-agnostic implementation of Broker-to-Broker bridges.

Below are examples of:

  1. A Redis-to-MQTT RPC Bridge
  2. A Redis-to-MQTT Topic Bridge.
#!/usr/bin/env python

import time

import commlib.transports.amqp as acomm
import commlib.transports.redis as rcomm
import commlib.transports.mqtt as mcomm

from commlib.bridges import (
    RPCBridge, RPCBridgeType, TopicBridge, TopicBridgeType
)

def redis_to_mqtt_rpc_bridge():
    """
    [RPC Client] ----> [Broker A] ------> [Broker B] ---> [RPC Service]
    """
    bA_params = rcomm.ConnectionParameters()
    bB_params = mcomm.ConnectionParameters()
    bA_uri = 'ops.start_navigation'
    bB_uri = 'thing.robotA.ops.start_navigation'
    br = RPCBridge(from_uri=bA_uri, to_uri=bB_uri,
                   from_broker_params=bA_params,
                   to_broker_params=bB_params,
                   debug=False)
    br.run()

def redis_to_mqtt_topic_bridge():
    """
    [Producer Endpoint] ---> [Broker A] ---> [Broker B] ---> [Consumer Endpoint]
    """
    bA_params = rcomm.ConnectionParameters()
    bB_params = mcomm.ConnectionParameters()
    bA_uri = 'sonar.front'
    bB_uri = 'thing.robotA.sensors.sonar.font'
    br = TopicBridge(from_uri=bA_uri, to_uri=bB_uri,
                     from_broker_params=bA_params,
                     to_broker_params=bB_params,
                     debug=False)
    br.run()

if __name__ == '__main__':
    redis_to_mqtt_rpc_bridge()
    redis_to_mqtt_topic_bridge()

The Base Bridge Class is inherited by the RPCBridge, TopicBridge and PTopicBridge classes.

class Bridge:
    """Bridge.
    Base Bridge Class.
    """
    def __init__(self,
                 from_uri: str,
                 to_uri: str,
                 from_broker_params: BaseConnectionParameters,
                 to_broker_params: BaseConnectionParameters,
                 auto_transform_uris: bool = True,
                 debug: bool = False):

The TopicBridge and RPCBridge classes have an extra argument to define the message type of the communication channel.

class RPCBridge(Bridge):
    def __init__(self, msg_type: RPCMessage = None, *args, **kwargs):
        ...


class TopicBridge(Bridge):
    def __init__(self, msg_type: PubSubMessage = None, *args, **kwargs):
        ...

Finally, the PTopicBridge is used for bridging topics using wildcards.

 class PTopicBridge(Bridge):
    """PTopicBridge.

    Args:
        msg_type (PubSubMessage): The message type to be used for the subscriber and publisher.
        uri_transform (List): A list of tuples containing the from and to strings for transforming the topic URIs.

    The constructor determines the type of the topic bridge based on the types of
    the from and to broker parameters. It then creates the subscriber and publisher
    endpoints using the appropriate endpoint factory functions.
    """

    def __init__(self,
                 msg_type: PubSubMessage = None,
                 uri_transform: List = [],
                 *args,
                 **kwargs):

A Pattern-based Topic Bridge (PTopicBridge) example is also shown below. In this example, we use static definition of messages (SonarMessage), also referred as typed communication.

#!/usr/bin/env python

import time

from commlib.msg import PubSubMessage
from commlib.bridges import PTopicBridge
import commlib.transports.amqp as acomm
import commlib.transports.redis as rcomm

class SonarMessage(PubSubMessage):
    distance: float = 0.001
    horizontal_fov: float = 30.0
    vertical_fov: float = 14.0

if __name__ == '__main__':
    """
    [Broker A] ------------> [Broker B] ---> [Consumer Endpoint]
    """
    bA_uri = 'sensors.*'
    bB_namespace = 'myrobot'

    bA_params = rcomm.ConnectionParameters()
    bB_params = mcomm.ConnectionParameters()

    br = PTopicBridge(bA_uri,
                      bB_namespace,
                      bA_params,
                      bB_params,
                      msg_type=SonarMessage,
                      debug=False)
    br.run()

TCP Bridge

TCP bridge forwards tcp packages between two endpoints:


[Client] -------> [TCPBridge, port=xxxx] ---------> [TCP endpoint, port=xxxx]

A one-to-one connection is performed between the bridge and the endpoint.

REST Proxy

Implements a REST proxy, that enables invocation of REST services via message brokers. The proxy uses an RPCService to run the broker endpoint and an http client for calling REST services. An RPC call is transformed into proper, REST-compliant, http request, based on the input parameters.

Responses from the REST services have the following RESTProxyMessage schema:

class RESTProxyMessage(RPCMessage):
    class Request(RPCMessage.Request):
        base_url: str
        path: str = '/'
        verb: str = 'GET'
        query_params: Dict[str, Any] = {}
        path_params: Dict[str, Any] = {}
        body_params: Dict[str, Any] = {}
        headers: Dict[str, Any] = {}

    class Response(RPCMessage.Response):
        data: Union[str, Dict, int]
        headers: Dict[str, Any]
        status_code: int = 200

Head to the commlib-rest-proxy repo for a dockerized application that implements a REST Proxy using commlib-py.

Web Gateway

Head to the commlib-web-gw repo for a dockerized application that implements a Web Gateway for message brokers, using commlib-py.

🤖 Examples

The examples/ directory of this repository includes various usage and application examples of using commlib-py.

🧪 Testing

Commlib-py uses the pytest framework. Run the test suite with:

Using make (Makefile included in this repo):

make test

Using pip:

pytest

Using conda:

conda activate {venv}
pytest

Furthermore, the Makefile provides coverage commands:

Coverage

make cov

Coverage Diff

make diff

🎞️ Roadmap

  • Task 1: Protocol-agnostic architecture
  • Task 2: Support the AMQP and MQTT protocols
  • Task 3: Support Redis protocol
  • Task 4: Support Kafka protocol (Under development / Partial Support)
  • Task 5: RPCServer implementation for AMQP and Kafka transports
  • Task 6: Comprehensive testing

🤝 Contributing

Contributing Guidelines
  1. Fork the Repository: Start by forking the project repository to your LOCAL account.
  2. Clone Locally: Clone the forked repository to your local machine using a git client.
    git clone https://github.com/{FORKED_ACCOUNT}/commlib-py.git
    
  3. Create a New Branch: Always work on a new branch, giving it a descriptive name.
    git checkout -b new-feature-x
    
  4. Make Your Changes: Develop and test your changes locally.
  5. Commit Your Changes: Commit with a clear message describing your updates.
    git commit -m 'Implemented new feature x.'
    
  6. Push to LOCAL: Push the changes to your forked repository.
    git push origin new-feature-x
    
  7. Submit a Pull Request: Create a PR against the original project repository. Clearly describe the changes and their motivations.
  8. Review: Once your PR is reviewed and approved, it will be merged into the main branch. Congratulations on your contribution!
Contributor Graph


📜 License

Commlib-py is protected under the MIT License. For more details, refer to the MIT LICENSE uri.


🎩 Acknowledgments

  • Credit contributors, inspiration, references, etc.


🌟 Star History

Star History Chart

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

commlib_py-0.13.1.tar.gz (114.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

commlib_py-0.13.1-py3-none-any.whl (80.3 kB view details)

Uploaded Python 3

File details

Details for the file commlib_py-0.13.1.tar.gz.

File metadata

  • Download URL: commlib_py-0.13.1.tar.gz
  • Upload date:
  • Size: 114.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for commlib_py-0.13.1.tar.gz
Algorithm Hash digest
SHA256 be4b18aca4bdcd3368c9c8512ff1ff33cd4ef022401147e4c3805a7b91c31393
MD5 67946b00e5469fa50d3c97edf06d5b12
BLAKE2b-256 28cbb71481b4574ba3dc4fb76f211e583ef3421f91765d4e109a4d37384cb894

See more details on using hashes here.

File details

Details for the file commlib_py-0.13.1-py3-none-any.whl.

File metadata

  • Download URL: commlib_py-0.13.1-py3-none-any.whl
  • Upload date:
  • Size: 80.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for commlib_py-0.13.1-py3-none-any.whl
Algorithm Hash digest
SHA256 58874ef0cc264a0baec725e34dd09f1e8607558ad55a05778ed27be169d3ecd9
MD5 2522461b8279d6b21eff0870866332dd
BLAKE2b-256 0eea4d3f4b5d339717596c039fc80e545f007784b946abe2db2f03de466a2e3a

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page