Skip to main content

Rainbow Zenoh Python

Project description

rb_zenoh

rb_zenoh는 Zenoh 기반의 공통 통신 패키지입니다. 주요 기능은 publish/subscribe, query/queryable, ZenohRouter 데코레이터 라우팅입니다.

설치/의존성

  • Python: >=3.12,<3.13
  • 패키지 의존성: flatbuffers, psutil, rb_utils, rb_modules

핵심 객체

  • ZenohClient: 저수준 통신 API
  • ZenohRouter: 데코레이터 기반 라우터
  • SubscribeOptions: subscribe 동작 옵션

1) publish

from rb_zenoh.client import ZenohClient

client = ZenohClient()
client.publish("muscat/sample/topic", payload={"ok": True})

FlatBuffer 요청 publish:

from rb_flat_buffers.IPC.Request_MotionPause import Request_MotionPauseT

req = Request_MotionPauseT()
client.publish(
    "C500920/call_pause",
    flatbuffer_req_obj=req,
    flatbuffer_buf_size=64,
)

2) subscribe

from rb_zenoh.client import ZenohClient
from rb_zenoh.schema import SubscribeOptions

client = ZenohClient()

async def on_msg(*, topic, mv, obj_payload, dict_payload, attachment):
    print(topic, obj_payload, dict_payload, attachment)

client.subscribe(
    "muscat/sample/topic",
    on_msg,
    options=SubscribeOptions(dispatch="immediate"),
)

콜백 시그니처:

  • topic: topic 문자열
  • mv: raw payload(memoryview)
  • obj_payload: flatbuffer_obj_t=<FlatBuffer T class> 파싱 객체 (T | None)
  • dict_payload: flatbuffer_obj_t 유무와 무관하게 dict 변환 결과 (dict | None)
  • attachment: sender, sender_id 정보

3) queryable

def on_query(req=None, params=None):
    return {"status": "ok"}

client.queryable("muscat/sample/query", on_query)

콜백 인자 의미:

  • req: query payload 본문
    • flatbuffer_req_t/flatbuffer_req_T_class를 지정한 경우에만 주입됩니다.
    • 내부에서 InitFromPackedBuf(...)로 파싱한 FlatBuffer ...T 객체입니다.
    • 요청 payload가 없는데 flatbuffer_req_t를 지정하면 에러가 발생합니다.
  • params: query parameter
    • 내부 q.parameters 값을 dict[str, str] 형태로 주입합니다.
    • payload(req)와 별도로 필터/옵션 전달에 사용합니다.

주의사항:

  • 콜백 파라미터 이름이 정확히 req, params일 때만 자동 주입됩니다.
  • request, query_params 같은 다른 이름으로 선언하면 주입되지 않습니다.
  • 둘 다 선언하지 않으면 인자 없이 호출됩니다.

4) query_one / query_all

query_one

res = client.query_one("muscat/program/state", timeout=0.3)
print(res.get("dict_payload"))
  • 첫 응답 1개만 반환
  • 응답이 없으면 ZenohNoReply 예외

query_all

res_list = client.query_all("*/health", timeout=0.5)
for item in res_list:
    print(item.get("key"), item.get("dict_payload"))
  • timeout 내 도착한 응답을 모두 list로 반환
  • 응답이 없어도 빈 리스트 반환

언제 무엇을 쓸지

  • 단일 대상 호출: query_one
  • 와일드카드/다중 서비스 수집: query_all

FlatBuffer 파라미터 상세

flatbuffer_req_obj

  • 의미: 요청으로 보낼 FlatBuffer 객체(Pack() 가능한 객체)
  • 사용 위치: publish, query_one, query_all
  • 내부 동작: builder로 pack 후 bytes payload 전송

flatbuffer_buf_size

  • 의미: FlatBuffer 직렬화 builder 초기 버퍼 크기
  • 사용 위치:
    • 요청 직렬화: publish, query_one, query_all
    • queryable 응답 직렬화: queryable(..., flatbuffer_res_buf_size=...)
  • 너무 작으면 직렬화 실패 가능
  • 권장: 64/128/256/512부터 시작해 payload 크기에 맞게 상향

flatbuffer_res_T_class

  • 의미: 응답 payload를 어떤 FlatBuffer 타입으로 파싱할지 지정
  • 사용 위치: query_one, query_all
  • 타입: FlatBuffer Object API 클래스(T 클래스) 를 넘겨야 함
    • 예: Response_FunctionsT, State_CoreT
    • 보통 이름이 ...T로 끝나는 클래스
    • 내부적으로 InitFromPackedBuf(...)를 통해 파싱됨
  • 결과:
    • 지정 시: obj_payload에 파싱 객체
    • 미지정 시: dict_payload/raw payload 중심 사용

예시:

from rb_flat_buffers.IPC.Request_MotionPause import Request_MotionPauseT
from rb_flat_buffers.IPC.Response_Functions import Response_FunctionsT

res = client.query_one(
    "C500920/call_pause",
    flatbuffer_req_obj=Request_MotionPauseT(),
    flatbuffer_res_T_class=Response_FunctionsT,
    flatbuffer_buf_size=64,
    timeout=0.3,
)
obj = res.get("obj_payload")

5) ZenohRouter 사용

from rb_zenoh.router import ZenohRouter

router = ZenohRouter(prefix="muscat/common")

@router.subscribe("health")
async def on_health(*, topic, obj_payload, dict_payload, attachment):
    pass

@router.queryable("echo")
async def on_echo(params=None):
    return {"ok": True}

라이프사이클:

  • 시작 시 await router.startup()
  • 종료 시 await router.shutdown()

예외/주의사항

  • query_one no-reply: ZenohNoReply
  • transport 재연결 계열: ZenohTransportError
  • ZenohClient는 프로세스 단위 싱글톤이므로 불필요한 잦은 close() 호출은 피하는 것을 권장

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rainbow_rb_zenoh-0.0.9.dev10.tar.gz (26.8 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rainbow_rb_zenoh-0.0.9.dev10-py3-none-any.whl (26.3 kB view details)

Uploaded Python 3

File details

Details for the file rainbow_rb_zenoh-0.0.9.dev10.tar.gz.

File metadata

  • Download URL: rainbow_rb_zenoh-0.0.9.dev10.tar.gz
  • Upload date:
  • Size: 26.8 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.5

File hashes

Hashes for rainbow_rb_zenoh-0.0.9.dev10.tar.gz
Algorithm Hash digest
SHA256 2dc8743bfef0f7f7b17748531ff88682ca9bacd84fbd877d10deff59f43a26d7
MD5 43cf5a675062a57afc79724c7e674156
BLAKE2b-256 173e2b137ff1e217df4a0f278b243aec689705f7a32ef08c5a53783f27cba608

See more details on using hashes here.

File details

Details for the file rainbow_rb_zenoh-0.0.9.dev10-py3-none-any.whl.

File metadata

File hashes

Hashes for rainbow_rb_zenoh-0.0.9.dev10-py3-none-any.whl
Algorithm Hash digest
SHA256 387331e50980170a883f7e57ca130e00bddbb81c5d00c7a59ff31fbfd2662de5
MD5 bdfe6c9d229dcac78c3b979c2e902642
BLAKE2b-256 0a1c1c85fc776e90c4119dae7faf95e5174365ebe4b08c92769df3daf0eceb52

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page