Rainbow Zenoh Python
Project description
rb_zenoh
rb_zenoh는 Zenoh 기반의 공통 통신 패키지입니다.
주요 기능은 publish/subscribe, query/queryable, ZenohRouter 데코레이터 라우팅입니다.
설치/의존성
- Python:
>=3.12,<3.13 - 패키지 의존성:
flatbuffers,psutil,rb_utils,rb_modules
핵심 객체
ZenohClient: 저수준 통신 APIZenohRouter: 데코레이터 기반 라우터SubscribeOptions: subscribe 동작 옵션
1) publish
from rb_zenoh.client import ZenohClient
client = ZenohClient()
client.publish("muscat/sample/topic", payload={"ok": True})
FlatBuffer 요청 publish:
from rb_flat_buffers.IPC.Request_MotionPause import Request_MotionPauseT
req = Request_MotionPauseT()
client.publish(
"C500920/call_pause",
flatbuffer_req_obj=req,
flatbuffer_buf_size=64,
)
2) subscribe
from rb_zenoh.client import ZenohClient
from rb_zenoh.schema import SubscribeOptions
client = ZenohClient()
async def on_msg(*, topic, mv, obj_payload, dict_payload, attachment):
print(topic, obj_payload, dict_payload, attachment)
client.subscribe(
"muscat/sample/topic",
on_msg,
options=SubscribeOptions(dispatch="immediate"),
)
콜백 시그니처:
topic: topic 문자열mv: raw payload(memoryview)obj_payload:flatbuffer_obj_t=<FlatBuffer T class>파싱 객체 (T | None)dict_payload:flatbuffer_obj_t유무와 무관하게 dict 변환 결과 (dict | None)attachment:sender,sender_id정보
3) queryable
def on_query(req=None, params=None):
return {"status": "ok"}
client.queryable("muscat/sample/query", on_query)
콜백 인자 의미:
req: query payload 본문flatbuffer_req_t/flatbuffer_req_T_class를 지정한 경우에만 주입됩니다.- 내부에서
InitFromPackedBuf(...)로 파싱한 FlatBuffer...T객체입니다. - 요청 payload가 없는데
flatbuffer_req_t를 지정하면 에러가 발생합니다.
params: query parameter- 내부
q.parameters값을dict[str, str]형태로 주입합니다. - payload(
req)와 별도로 필터/옵션 전달에 사용합니다.
- 내부
주의사항:
- 콜백 파라미터 이름이 정확히
req,params일 때만 자동 주입됩니다. request,query_params같은 다른 이름으로 선언하면 주입되지 않습니다.- 둘 다 선언하지 않으면 인자 없이 호출됩니다.
4) query_one / query_all
query_one
res = client.query_one("muscat/program/state", timeout=0.3)
print(res.get("dict_payload"))
- 첫 응답 1개만 반환
- 응답이 없으면
ZenohNoReply예외
query_all
res_list = client.query_all("*/health", timeout=0.5)
for item in res_list:
print(item.get("key"), item.get("dict_payload"))
- timeout 내 도착한 응답을 모두 list로 반환
- 응답이 없어도 빈 리스트 반환
언제 무엇을 쓸지
- 단일 대상 호출:
query_one - 와일드카드/다중 서비스 수집:
query_all
FlatBuffer 파라미터 상세
flatbuffer_req_obj
- 의미: 요청으로 보낼 FlatBuffer 객체(
Pack()가능한 객체) - 사용 위치:
publish,query_one,query_all - 내부 동작: builder로 pack 후 bytes payload 전송
flatbuffer_buf_size
- 의미: FlatBuffer 직렬화 builder 초기 버퍼 크기
- 사용 위치:
- 요청 직렬화:
publish,query_one,query_all - queryable 응답 직렬화:
queryable(..., flatbuffer_res_buf_size=...)
- 요청 직렬화:
- 너무 작으면 직렬화 실패 가능
- 권장: 64/128/256/512부터 시작해 payload 크기에 맞게 상향
flatbuffer_res_T_class
- 의미: 응답 payload를 어떤 FlatBuffer 타입으로 파싱할지 지정
- 사용 위치:
query_one,query_all - 타입: FlatBuffer Object API 클래스(T 클래스) 를 넘겨야 함
- 예:
Response_FunctionsT,State_CoreT - 보통 이름이
...T로 끝나는 클래스 - 내부적으로
InitFromPackedBuf(...)를 통해 파싱됨
- 예:
- 결과:
- 지정 시:
obj_payload에 파싱 객체 - 미지정 시:
dict_payload/raw payload 중심 사용
- 지정 시:
예시:
from rb_flat_buffers.IPC.Request_MotionPause import Request_MotionPauseT
from rb_flat_buffers.IPC.Response_Functions import Response_FunctionsT
res = client.query_one(
"C500920/call_pause",
flatbuffer_req_obj=Request_MotionPauseT(),
flatbuffer_res_T_class=Response_FunctionsT,
flatbuffer_buf_size=64,
timeout=0.3,
)
obj = res.get("obj_payload")
5) ZenohRouter 사용
from rb_zenoh.router import ZenohRouter
router = ZenohRouter(prefix="muscat/common")
@router.subscribe("health")
async def on_health(*, topic, obj_payload, dict_payload, attachment):
pass
@router.queryable("echo")
async def on_echo(params=None):
return {"ok": True}
라이프사이클:
- 시작 시
await router.startup() - 종료 시
await router.shutdown()
예외/주의사항
query_oneno-reply:ZenohNoReply- transport 재연결 계열:
ZenohTransportError ZenohClient는 프로세스 단위 싱글톤이므로 불필요한 잦은close()호출은 피하는 것을 권장
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rainbow_rb_zenoh-0.0.9.dev10.tar.gz.
File metadata
- Download URL: rainbow_rb_zenoh-0.0.9.dev10.tar.gz
- Upload date:
- Size: 26.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
2dc8743bfef0f7f7b17748531ff88682ca9bacd84fbd877d10deff59f43a26d7
|
|
| MD5 |
43cf5a675062a57afc79724c7e674156
|
|
| BLAKE2b-256 |
173e2b137ff1e217df4a0f278b243aec689705f7a32ef08c5a53783f27cba608
|
File details
Details for the file rainbow_rb_zenoh-0.0.9.dev10-py3-none-any.whl.
File metadata
- Download URL: rainbow_rb_zenoh-0.0.9.dev10-py3-none-any.whl
- Upload date:
- Size: 26.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
387331e50980170a883f7e57ca130e00bddbb81c5d00c7a59ff31fbfd2662de5
|
|
| MD5 |
bdfe6c9d229dcac78c3b979c2e902642
|
|
| BLAKE2b-256 |
0a1c1c85fc776e90c4119dae7faf95e5174365ebe4b08c92769df3daf0eceb52
|