Skip to main content

Python SDK for ROS 2 communication via Zenoh - Use ROS 2 without ROS 2 environment

Project description

zenoh-ros2-sdk

Python SDK for ROS 2 communication via Zenoh - Use ROS 2 without ROS 2 environment

Enable ROS 2 topic publishing and subscribing in pure Python applications. Publishers and subscribers automatically appear in ros2 topic list and work seamlessly with existing ROS 2 nodes using rmw_zenoh.

Documentation

Features

  • No ROS 2 installation required - Works with just Python and Zenoh
  • Appears in ros2 topic list - Uses liveliness tokens for ROS 2 discovery
  • Automatic resource management - GIDs, node IDs, entity IDs handled automatically
  • Session pooling - Multiple publishers/subscribers share the same Zenoh session
  • Automatic message/service loading - Automatically downloads message and service definitions from Git repositories
  • Type hash computation - Computes ROS2-compatible type hashes from message/service definitions
  • Type registration - Automatic message and service type registration
  • Service support - Create service clients and servers with automatic type loading
  • Clean API - Simple, intuitive interface

Quick Start

Install

pip install zenoh-ros2-sdk

After install, the zenoh-ros2 command is available. Topic and service commands:

# Topic discovery (ros2 topic list / info equivalents)
zenoh-ros2 topic list -t              # list topics with types
zenoh-ros2 topic info -v /chatter     # verbose info for a topic
zenoh-ros2 --router 127.0.0.1:7447 topic list

# Service discovery (ros2 service list / type equivalents)
zenoh-ros2 service list -t            # list services with types
zenoh-ros2 service type /add_two_ints # print type(s) for a service

For fast discovery (no ~10 s exit delay), start the background daemon: zenoh-ros2 daemon start. Then zenoh-ros2 topic ... and zenoh-ros2 service ... use it and return quickly. Use --no-daemon to run discovery in-process.

Simple Publisher

from zenoh_ros2_sdk import ROS2Publisher

# Message type loading is automatic (no need to call load_message_type)
# Create publisher - msg_definition is optional, auto-loads from registry
pub = ROS2Publisher(
    topic="/chatter",
    msg_type="std_msgs/msg/String"
)

# Publish messages
pub.publish(data="Hello World!")
pub.publish(data="Another message")

pub.close()

Simple Subscriber

from zenoh_ros2_sdk import ROS2Subscriber

# Message type loading is automatic (no need to call load_message_type)
def on_message(msg):
    print(f"Received: {msg.data}")

# Create subscriber - msg_definition is optional, auto-loads from registry
sub = ROS2Subscriber(
    topic="/chatter",
    msg_type="std_msgs/msg/String",
    callback=on_message
)

# Keep running
import time
time.sleep(10)

sub.close()

Simple Service Server

from zenoh_ros2_sdk import ROS2ServiceServer, get_message_class

# Service type loading is automatic (no need to call load_service_type)
def service_handler(request):
    # Get response message class
    Response = get_message_class("example_interfaces/srv/AddTwoInts_Response")
    # Process request and return response
    return Response(sum=request.a + request.b)

# Create service server
server = ROS2ServiceServer(
    service_name="/add_two_ints",
    srv_type="example_interfaces/srv/AddTwoInts",
    callback=service_handler
)

# Keep running
import time
time.sleep(10)

server.close()

Simple Service Client

from zenoh_ros2_sdk import ROS2ServiceClient

# Service type loading is automatic (no need to call load_service_type)
# Create service client
client = ROS2ServiceClient(
    service_name="/add_two_ints",
    srv_type="example_interfaces/srv/AddTwoInts"
)

# Make synchronous service call
response = client.call(a=5, b=3)
if response:
    print(f"Sum: {response.sum}")

# Make asynchronous service call
def callback(response):
    if response:
        print(f"Sum: {response.sum}")

client.call_async(callback, a=10, b=20)

client.close()

Architecture

Key Components

  1. ZenohSession (Singleton)

    • Manages shared Zenoh session
    • Handles type registration
    • Generates unique GIDs
    • Manages node/entity ID counters
  2. ROS2Publisher

    • Creates publisher with liveliness tokens
    • Handles attachments (sequence, timestamp, GID)
    • Appears in ros2 topic list
  3. ROS2Subscriber

    • Subscribes to topics
    • Deserializes CDR messages
    • Calls user callback
  4. ROS2ServiceClient

    • Creates service client with liveliness tokens
    • Sends requests using Zenoh queries
    • Receives responses asynchronously
    • Supports both synchronous and asynchronous calls
  5. ROS2ServiceServer

    • Creates service server with liveliness tokens
    • Receives requests via Zenoh queryable
    • Calls user callback with request
    • Sends response back to client

Resource Management

  • GID Generation: Uses UUID4 to generate unique 16-byte GIDs
  • Node IDs: Auto-incremented per node
  • Entity IDs: Auto-incremented per publisher/subscriber
  • Session Reuse: All publishers/subscribers share the same Zenoh session

Examples

See examples/README.md for self-contained example scripts.

Advanced Usage

Using Message Registry (Recommended)

The SDK automatically downloads message definitions from Git repositories. Message types are loaded automatically when creating publishers/subscribers:

from zenoh_ros2_sdk import ROS2Publisher, get_message_class

# Message type loading is automatic - no need to call load_message_type
# Get message classes for easy object creation
Vector3 = get_message_class("geometry_msgs/msg/Vector3")
Twist = get_message_class("geometry_msgs/msg/Twist")

# Create publisher - message type is automatically loaded
pub = ROS2Publisher(
    topic="/cmd_vel",
    msg_type="geometry_msgs/msg/Twist"
)

# Create message objects
linear = Vector3(x=0.5, y=0.0, z=0.0)
angular = Vector3(x=0.0, y=0.0, z=0.2)
pub.publish(linear=linear, angular=angular)

pub.close()

Manual Message Definitions

You can still provide message definitions manually if needed:

from zenoh_ros2_sdk import ROS2Publisher

pub = ROS2Publisher(
    topic="/counter",
    msg_type="std_msgs/msg/Int32",
    msg_definition="int32 data\n"
)

pub.publish(data=42)
pub.close()

Configuration

Parameters

  • domain_id: ROS domain ID (defaults to ROS_DOMAIN_ID or 0)
  • router_ip: Zenoh router IP address
  • router_port: Zenoh router port
  • node_name: Custom node name (auto-generated if not provided)
  • namespace: Node namespace (default: "/")

Zenoh configuration override (advanced)

You can override the Zenoh session configuration using the ZENOH_CONFIG_OVERRIDE environment variable (a semicolon-separated list of path=value entries). This is useful for enabling features like shared memory transport or forcing client mode:

export ZENOH_CONFIG_OVERRIDE='transport/shared_memory/enabled=true;mode="client";connect/endpoints=["tcp/192.168.6.2:7447"]'

Notes:

  • Values are parsed as JSON5. If the value is a string, it must be quoted (e.g., mode="client").

ROS domain ID via environment

If you do not pass domain_id to the constructor, the SDK uses ROS_DOMAIN_ID from the environment (falling back to 0 when it is not set). An explicit domain_id argument always overrides the environment value.

export ROS_DOMAIN_ID=30

Requirements

  • Python 3.8+
  • eclipse-zenoh Python package (>=0.10.0)
  • rosbags Python package (>=0.11.0, for message serialization)
  • GitPython Python package (>=3.1.18, for automatic message downloading from git repositories)
  • tqdm Python package (>=4.64.0, for download progress indicators)

Optional Dependencies

For development and testing:

# From PyPI:
pip install "zenoh-ros2-sdk[dev]"

# From source:
pip install -e ".[dev]"

Installation

From PyPI

pip install zenoh-ros2-sdk

From source

git clone https://github.com/robotis-git/zenoh_ros2_sdk.git
cd zenoh_ros2_sdk
pip install -e .

Design Decisions

  1. Singleton Session: All publishers/subscribers share one Zenoh session for efficiency
  2. Auto GID Generation: Uses UUID4 for unique GIDs per publisher
  3. Liveliness Tokens: Automatically declared so publishers appear in ros2 topic list
  4. Type Hash Computation: Automatically computes ROS2-compatible type hashes from message definitions using the same algorithm as ROS2
  5. Message Registry: Automatically downloads message definitions from Git repositories and caches them locally
  6. Clean API: Abstracts away Zenoh/rmw_zenoh complexity

Future Improvements

  • Support for more message types out of the box
  • Action support
  • Better error handling and retry logic
  • Connection pooling and reconnection
  • QoS configuration options

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

zenoh_ros2_sdk-0.1.7.tar.gz (85.9 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

zenoh_ros2_sdk-0.1.7-py3-none-any.whl (88.3 kB view details)

Uploaded Python 3

File details

Details for the file zenoh_ros2_sdk-0.1.7.tar.gz.

File metadata

  • Download URL: zenoh_ros2_sdk-0.1.7.tar.gz
  • Upload date:
  • Size: 85.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for zenoh_ros2_sdk-0.1.7.tar.gz
Algorithm Hash digest
SHA256 9a62c2aa75d628704de5f216d40db72027608fd1a275d18027253809d658bcbb
MD5 092313b3db672c100c1581a75f0f4cd7
BLAKE2b-256 2e1e0e86f7bbd727f6113a2de38194d9eaabf0597f54c84ac308b03bce719079

See more details on using hashes here.

File details

Details for the file zenoh_ros2_sdk-0.1.7-py3-none-any.whl.

File metadata

  • Download URL: zenoh_ros2_sdk-0.1.7-py3-none-any.whl
  • Upload date:
  • Size: 88.3 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for zenoh_ros2_sdk-0.1.7-py3-none-any.whl
Algorithm Hash digest
SHA256 25fd1b0a8b78b3bf6783ed89dca073dd40a0d873577cd240532420b9640609aa
MD5 01f19794f39036b883e7da2d2b8b75c3
BLAKE2b-256 5d5ff96a073d59b079414cda82ff4131cc1a328c5736eadfd417e4003998cc1f

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page