A lightweight ROS2-like framework based on Zenoh — no Ubuntu lock-in, no compilation needed, no .msg files. 一个类似 ROS 的基于 Zenoh 的轻量级 发布/订阅 与 RPC 通信框架, 不挑平台、无需编译、没有 .msg
Project description
Rose🌹: Robotics On Simple Engine
ROS compiles. Rose runs. ROS 还在编译,Rose 已经上线
Rose 是一个基于 Zenoh 的轻量级 发布/订阅 与 RPC 通信框架。 它借鉴了 ROS2 的节点(Node)、话题(Topic)、服务(Service)概念,但完全运行在纯 Python 生态中 —— 不挑平台、无需编译、没有 .msg。
为什么不用 ROS?
| 痛点 | ROS | Rose ✅ |
|---|---|---|
| 安装 | 挑系统和版本,依赖复杂,安装一次半小时 | pip install rose-py,一行秒装 |
| 开发体验 | C++ 改代码等编译,Python 节点 LSP 基本残废 | 纯 Python,改完即跑,IDE 完整类型提示 |
| 消息定义 | 手写 .msg 文件,无类型提示,还要额外生成代码 |
原生 Python 类,自动补全 + 静态类型检查 |
| 通信引擎 | 依赖 DDS 栈,部署臃肿,配置繁琐 | 基于 Zenoh,零额外运行时,开箱即用 |
设计理念
Rose 提供一种开箱即用的分布式通信体验——
- 轻量核心: 核心依赖仅
zenoh+msgspec,无 DDS、无 ROS 生态包袱 - 类型安全: 基于
msgspec.Struct定义消息,自动 msgpack 序列化,IDE 和运行时双重类型校验 - 声明式 API: 通过
Node工厂方法创建发布者/订阅者/服务/客户端,API 清晰一致 - 自发现拓扑: 利用 Zenoh Liveliness Token 实现节点/话题/服务的自动发现,并提供
rrrCLI 工具查看网络拓扑
安装
pip install rose-py
或者使用 uv:
uv add rose-py
使用文档
可查阅 Zread 为本项目生成的 文档
快速上手
所有示例代码位于 examples/ 目录,可直接运行, 此处仅展示核心代码片段。
发布/订阅 (Pub/Sub)
# 终端 A — 传感器发布者
uv run python examples/pub_sub/pub.py
# 终端 B — 日志订阅者(回调模式)
uv run python examples/pub_sub/sub_callback.py
# 或轮询模式
uv run python examples/pub_sub/sub_polling.py
① 定义消息 — examples/pub_sub/msg.py
from rose import Message
class EnvSensorData(Message):
"""环境传感器数据"""
temperature: float
humidity: float
消息继承
msgspec.Struct,自带header.timestamp(自动时间戳)和header.frame_id,子类只需添加业务字段。
② 发布者 — examples/pub_sub/pub.py
from rose import Node
from msg import EnvSensorData
node = Node("sensor_hub_1")
pub = node.create_publisher("room_a/sensor/env", EnvSensorData)
msg = EnvSensorData(temperature=25.0, humidity=60.5)
pub.publish(msg) # 类型不匹配自动抛 TypeError
③ 订阅者(回调模式) — examples/pub_sub/sub_callback.py
def on_sensor_data(msg: EnvSensorData, source_key: str) -> None:
print(f"收到来自 [{source_key}] 的数据:", msg)
node.create_subscriber("room_a/sensor/env", EnvSensorData, on_sensor_data)
node.spin() # 保持节点运行
④ 订阅者(轮询模式) — examples/pub_sub/sub_polling.py
sub = node.create_subscriber("room_a/sensor/env", EnvSensorData)
while True:
msg, key = sub.recv()
print(f"收到 [{key}]:", msg)
RPC(服务/客户端)
# 终端 A — 数学服务端
uv run python examples/rpc/server.py
# 终端 B — 客户端调用
uv run python examples/rpc/client.py
① 定义请求与响应消息 — examples/rpc/msg.py
from rose import Message
class AddIntsReq(Message):
a: int
b: int
class AddIntsRes(Message):
sum: int
② 服务端 — examples/rpc/server.py
def handle_add(req: AddIntsReq) -> AddIntsRes:
return AddIntsRes(sum=req.a + req.b)
node = Node("math_server")
node.create_service("math/add", AddIntsReq, AddIntsRes, handle_add)
node.spin()
③ 客户端 — examples/rpc/client.py
client = node.create_client("math/add", AddIntsReq, AddIntsRes)
if client.wait_for_service(timeout=3):
res = client.call(AddIntsReq(a=10, b=20))
print(res) # AddIntsRes(sum=30)
客户端通过 Liveliness Token 自动探测服务是否就绪,超时抛出
TimeoutError。
API 概览
Node
节点是通信的核心容器,每个 Node 内部维护一个 Zenoh Session。
| 方法 | 描述 |
|---|---|
Node(name) |
创建节点,自动建立 Zenoh 会话 |
node.spin() |
保持节点运行,直到 Ctrl+C |
node.close() |
优雅关闭节点,释放资源 |
支持 with 语句自动管理生命周期:
with Node("my_node") as node:
# 做点什么
pass # 自动 close()
Message
所有消息的基类,继承 msgspec.Struct,自带 Header 字段:
header.timestamp— 自动填入当前时间戳header.frame_id— 坐标系 ID(如"base_link")
子类只需添加业务字段:
class MyMsg(Message):
value: float
Publisher[MsgType]
pub = node.create_publisher(key_expr, MsgClass)
pub.publish(msg)
- 发布时自动做类型校验,类型不匹配抛出
TypeError - 自动注册 Liveliness Token
Subscriber[MsgType]
# 回调模式
sub = node.create_subscriber(key_expr, MsgClass, callback)
# 轮询模式
sub = node.create_subscriber(key_expr, MsgClass)
msg, key = sub.recv(timeout=2.0)
Service[ReqType, ResType]
service = node.create_service(key_expr, ReqClass, ResClass, handler)
handler接收请求消息,返回响应消息- 异常自动捕获并返回错误信息给客户端
Client[ReqType, ResType]
client = node.create_client(key_expr, ReqClass, ResClass)
if client.wait_for_service(timeout=3):
res = client.call(request, timeout=2.0)
wait_for_service()— 通过 Liveliness Token 探测服务端call()— 同步阻塞调用,超时抛出TimeoutError
内置基础类型
Rose 内置了一些类 ROS 的几何基础类型:
| 类型 | 说明 |
|---|---|
Vector3 |
三维向量 (x, y, z),提供 to_tuple() / from_tuple() |
Quaternion |
四元数 (x, y, z, w),默认单位四元数 w=1 |
Pose |
位姿,组合 position: Vector3 和 orientation: Quaternion |
CLI 诊断工具
Rose 提供了一个 CLI 工具 rrr(Rose 网络诊断),用于查看当前网络的拓扑结构:
# 查看所有节点/话题/服务(完整拓扑)
rrr ls
# 列出所有节点
rrr node list
# 查看节点详情
rrr node info sensor_hub_1
# 列出所有话题
rrr topic list
# 查看话题详情
rrr topic info room_a/sensor/env
# 列出所有服务
rrr service list
# 查看服务详情
rrr service info math/add
项目结构
rose/
├── src/rose/ # 核心库
│ ├── __init__.py # 导出 Node, Message
│ ├── message.py # 消息基类 + 几何基础类型
│ ├── node.py # Node + Publisher/Subscriber/Service/Client
│ └── probe.py # CLI 诊断工具
├── examples/ # 完整可运行示例
│ ├── pub_sub/ # 发布/订阅
│ │ ├── msg.py
│ │ ├── pub.py
│ │ ├── sub_callback.py
│ │ └── sub_polling.py
│ └── rpc/ # RPC 服务/客户端
│ ├── msg.py
│ ├── server.py
│ └── client.py
└── tests/ # 测试套件
├── test_message.py
├── test_node.py
├── test_probe.py
└── benchmark_*.py # 性能基准测试
依赖
eclipse-zenoh >= 1.9.0msgspec >= 0.21.1typer >= 0.25.1loguru >= 0.7.3rich >= 15.0.0
性能基准
详见 BENCHMARK.md
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rose_py-0.1.3.tar.gz.
File metadata
- Download URL: rose_py-0.1.3.tar.gz
- Upload date:
- Size: 10.9 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
25d700c89c39c7c5d77dc027b2236869d177cc474b34e61b737f48fb7742327d
|
|
| MD5 |
4bde72f78162ee2ba01f39c7d9685632
|
|
| BLAKE2b-256 |
701718f725211b2e5f4001b313c8863a1bb8ca193903908e22f806b8c4934006
|
Provenance
The following attestation bundles were made for rose_py-0.1.3.tar.gz:
Publisher:
test_and_publish.yml on DBinK/rose
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rose_py-0.1.3.tar.gz -
Subject digest:
25d700c89c39c7c5d77dc027b2236869d177cc474b34e61b737f48fb7742327d - Sigstore transparency entry: 1429593095
- Sigstore integration time:
-
Permalink:
DBinK/rose@d45e5705d33d539699d01dc41f5feb3b5aac1a11 -
Branch / Tag:
refs/tags/v0.1.3 - Owner: https://github.com/DBinK
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
test_and_publish.yml@d45e5705d33d539699d01dc41f5feb3b5aac1a11 -
Trigger Event:
push
-
Statement type:
File details
Details for the file rose_py-0.1.3-py3-none-any.whl.
File metadata
- Download URL: rose_py-0.1.3-py3-none-any.whl
- Upload date:
- Size: 12.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5c5bf0d7e927211a813190735498ddf1663007a98b62b9158fc234e8a4571df9
|
|
| MD5 |
82a8cb91e2990aa48607109deff55710
|
|
| BLAKE2b-256 |
17e6058503131e6bf4c1b526a0b2c503c64d61ab342deaebfbeac8d70ddb11ea
|
Provenance
The following attestation bundles were made for rose_py-0.1.3-py3-none-any.whl:
Publisher:
test_and_publish.yml on DBinK/rose
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
rose_py-0.1.3-py3-none-any.whl -
Subject digest:
5c5bf0d7e927211a813190735498ddf1663007a98b62b9158fc234e8a4571df9 - Sigstore transparency entry: 1429593100
- Sigstore integration time:
-
Permalink:
DBinK/rose@d45e5705d33d539699d01dc41f5feb3b5aac1a11 -
Branch / Tag:
refs/tags/v0.1.3 - Owner: https://github.com/DBinK
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
test_and_publish.yml@d45e5705d33d539699d01dc41f5feb3b5aac1a11 -
Trigger Event:
push
-
Statement type: