A simple ROS2-like framework based on Zenoh
Project description
Rose 🌹
A lightweight, ROS2-like robot communication framework built on Zenoh.
设计理念
Rose 是一个极简的发布/订阅与 RPC 通信框架,借鉴 ROS2 的节点(Node)、话题(Topic)、服务(Service)概念,但底层通信完全基于 Zenoh,不依赖 DDS 栈。
核心特点:
- 轻量 — 核心依赖仅包含
zenoh、msgspec,无 DDS、无 ROS 生态包袱 (typer/loguru/rich仅调试用途) - 类型安全 — 基于
msgspec.Struct定义消息,自动 msgpack 序列化,编译时(IDE)和运行时双重类型校验 - 声明式风格 — 通过
Node工厂方法创建发布者/订阅者/服务/客户端,API 清晰一致 - 自发现拓扑 — 利用 Zenoh Liveliness Token 实现节点/话题/服务的自动发现,并提供
rose-probeCLI 工具查看网络拓扑
安装
pip install rose-py
或者使用 uv:
uv add rose-py
快速入门
发布/订阅模式
1. 定义消息
# msg.py
from rose import Message
class EnvSensorData(Message):
"""环境传感器数据"""
temperature: float
humidity: float
2. 发布者
# pub.py
import time
from loguru import logger
from rose import Node
from msg import EnvSensorData
node = Node("sensor_hub_1")
pub = node.create_publisher("room_a/sensor/env", EnvSensorData)
logger.success("传感器节点启动,开始广播数据...")
try:
temp = 25.0
while True:
msg = EnvSensorData(
# header={"frame_id": "sensor_link_1"}, # 可选覆盖
temperature=temp,
humidity=60.5,
)
pub.publish(msg)
logger.info(f"已发送 -> 温度: {msg.temperature}°C")
temp += 0.001
time.sleep(0.1)
except KeyboardInterrupt:
node.close()
3. 订阅者(回调模式)
# sub_callback.py
from loguru import logger
from rich import print as rprint
from rose import Node
from msg import EnvSensorData
node = Node("data_logger")
def on_sensor_data(msg: EnvSensorData, source_key: str) -> None:
logger.success(f"收到来自 [{source_key}] 的数据:")
rprint(msg)
node.create_subscriber("room_a/sensor/env", EnvSensorData, on_sensor_data)
logger.success("日志节点已就绪,正在监听传感器网络...")
node.spin()
4. 订阅者(轮询模式)
# sub_polling.py
from loguru import logger
from rich import print as rprint
from rose import Node
from msg import EnvSensorData
node = Node("data_logger")
sub = node.create_subscriber("room_a/sensor/env", EnvSensorData)
try:
while True:
msg, key = sub.recv()
logger.success(f"收到 [{key}]:")
rprint(msg)
except KeyboardInterrupt:
node.close()
RPC 模式(服务/客户端)
1. 定义消息
# msg.py
from rose import Message
class AddIntsReq(Message):
a: int
b: int
class AddIntsRes(Message):
sum: int
class DivFloatsReq(Message):
a: float
b: float
class DivFloatsRes(Message):
quotient: float
2. 服务端
# server.py
from loguru import logger
from rose import Node
from msg import AddIntsReq, AddIntsRes, DivFloatsReq, DivFloatsRes
def handle_add(req: AddIntsReq) -> AddIntsRes:
logger.info(f"服务端收到计算请求: {req.a} + {req.b}")
return AddIntsRes(sum=req.a + req.b)
def handle_div(req: DivFloatsReq) -> DivFloatsRes:
logger.info(f"服务端收到计算请求: {req.a} / {req.b}")
return DivFloatsRes(quotient=req.a / req.b)
node = Node("math_server")
node.create_service("math/add", AddIntsReq, AddIntsRes, handle_add)
node.create_service("math/div", DivFloatsReq, DivFloatsRes, handle_div)
logger.success("加法和除法服务已就绪,等待客户端调用...")
node.spin()
3. 客户端
# client.py
from rich import print as rprint
from rose import Node
from msg import AddIntsReq, AddIntsRes, DivFloatsReq, DivFloatsRes
node = Node("math_client")
# 加法
add_client = node.create_client("math/add", AddIntsReq, AddIntsRes)
if not add_client.wait_for_service(timeout=3):
print("服务端未就绪,请稍后再试")
exit(1)
res = add_client.call(AddIntsReq(a=10, b=20))
rprint(res)
# 除法
div_client = node.create_client("math/div", DivFloatsReq, DivFloatsRes)
res = div_client.call(DivFloatsReq(a=10.0, b=2.0))
rprint(res)
API 概览
Node
节点是通信的核心容器,每个 Node 内部维护一个 Zenoh Session。
| 方法 | 描述 |
|---|---|
Node(name) |
创建节点,自动建立 Zenoh 会话 |
node.spin() |
保持节点运行,直到 Ctrl+C |
node.close() |
优雅关闭节点,释放资源 |
支持 with 语句自动管理生命周期:
with Node("my_node") as node:
# 做点什么
pass # 自动 close()
Message
所有消息的基类,继承 msgspec.Struct,自带 Header 字段:
header.timestamp— 自动填入当前时间戳header.frame_id— 坐标系 ID(如"base_link")
子类只需添加业务字段:
class MyMsg(Message):
value: float
Publisher[MsgType]
pub = node.create_publisher(key_expr, MsgClass)
pub.publish(msg)
- 发布时自动做类型校验,类型不匹配抛出
TypeError - 自动注册 Liveliness Token
Subscriber[MsgType]
# 回调模式
sub = node.create_subscriber(key_expr, MsgClass, callback)
# 轮询模式
sub = node.create_subscriber(key_expr, MsgClass)
msg, key = sub.recv(timeout=2.0)
Service[ReqType, ResType]
service = node.create_service(key_expr, ReqClass, ResClass, handler)
handler接收请求消息,返回响应消息- 异常自动捕获并返回错误信息给客户端
Client[ReqType, ResType]
client = node.create_client(key_expr, ReqClass, ResClass)
if client.wait_for_service(timeout=3):
res = client.call(request, timeout=2.0)
wait_for_service()— 通过 Liveliness Token 探测服务端call()— 同步阻塞调用,超时抛出TimeoutError
内置基础类型
Rose 内置了一些类 ROS 的几何基础类型:
| 类型 | 说明 |
|---|---|
Vector3 |
三维向量 (x, y, z),提供 to_tuple() / from_tuple() |
Quaternion |
四元数 (x, y, z, w),默认单位四元数 w=1 |
Pose |
位姿,组合 position: Vector3 和 orientation: Quaternion |
CLI 诊断工具
Rose 提供了一个 CLI 工具 rrr(Rose 网络诊断),用于查看当前网络的拓扑结构:
# 查看所有节点/话题/服务(完整拓扑)
rrr ls
# 列出所有节点
rrr node list
# 查看节点详情
rrr node info sensor_hub_1
# 列出所有话题
rrr topic list
# 查看话题详情
rrr topic info room_a/sensor/env
# 列出所有服务
rrr service list
# 查看服务详情
rrr service info math/add
项目结构
src/rose/
├── __init__.py # 导出 Node, Message
├── message.py # 消息基类 + 几何基础类型
├── node.py # Node + Publisher/Subscriber/Service/Client
└── probe.py # CLI 诊断工具
依赖
eclipse-zenoh >= 1.9.0msgspec >= 0.21.1typer >= 0.25.1loguru >= 0.7.3rich >= 15.0.0
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rose_py-0.1.2.tar.gz.
File metadata
- Download URL: rose_py-0.1.2.tar.gz
- Upload date:
- Size: 8.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f931c93befbee7de0403fa484d6c7c60b0da758da34e4cf1fe57c96ffa798c80
|
|
| MD5 |
8129a86425867221d63ba2a26e22fd82
|
|
| BLAKE2b-256 |
36a7bab8bba9af65c23a43e7fd52e547cece9073a6214b0d79269888b5928722
|
Provenance
The following attestation bundles were made for rose_py-0.1.2.tar.gz:
Publisher:
test_and_publish.yml on DBinK/rose
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rose_py-0.1.2.tar.gz -
Subject digest:
f931c93befbee7de0403fa484d6c7c60b0da758da34e4cf1fe57c96ffa798c80 - Sigstore transparency entry: 1429085267
- Sigstore integration time:
-
Permalink:
DBinK/rose@34a97b08cec113c33ebed061978dca047c4bfd29 -
Branch / Tag:
refs/tags/v0.1.2 - Owner: https://github.com/DBinK
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
test_and_publish.yml@34a97b08cec113c33ebed061978dca047c4bfd29 -
Trigger Event:
push
-
Statement type:
File details
Details for the file rose_py-0.1.2-py3-none-any.whl.
File metadata
- Download URL: rose_py-0.1.2-py3-none-any.whl
- Upload date:
- Size: 10.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5447521d318ba345d3b1201b05450f01715bde2de6115cfad5ec1aaa6d64b3ff
|
|
| MD5 |
138c7b2455ec9cb1af0db4c8704a25aa
|
|
| BLAKE2b-256 |
f26f90e4f9c3c8b6949ed6e9ceefb4bae97cbe23b16720286239e36ffe1c9c3f
|
Provenance
The following attestation bundles were made for rose_py-0.1.2-py3-none-any.whl:
Publisher:
test_and_publish.yml on DBinK/rose
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rose_py-0.1.2-py3-none-any.whl -
Subject digest:
5447521d318ba345d3b1201b05450f01715bde2de6115cfad5ec1aaa6d64b3ff - Sigstore transparency entry: 1429085270
- Sigstore integration time:
-
Permalink:
DBinK/rose@34a97b08cec113c33ebed061978dca047c4bfd29 -
Branch / Tag:
refs/tags/v0.1.2 - Owner: https://github.com/DBinK
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
test_and_publish.yml@34a97b08cec113c33ebed061978dca047c4bfd29 -
Trigger Event:
push
-
Statement type: