Rainbow Robotics 통합 Python SDK — AMR, 매니퓰레이터, RB-Y1 휴머노이드와 Rainbow Robot Service(common) 연동
Project description
rb_sdk
Rainbow Robotics 통합 Python SDK. AMR · Manipulator · RB-Y1 양팔로봇 제어와 Rainbow Robot Service(common) 연동을 단일 API로 제공합니다.
진입점
| 클래스 | 용도 |
|---|---|
RBAmrSDK |
AMR(자율 주행) 로봇 제어 |
RBManipulateSDK |
협동 로봇 매니퓰레이터 제어 |
RBRby1SDK |
RB-Y1 양팔로봇 제어 |
RBBaseSDK |
공통 베이스 (직접 상속하지 않는 한 사용할 일 없음) |
내부적으로 Zenoh pub/sub/query 메시 위에서 동작하며, 같은 PC(로봇 본체)에서 실행할 때는 별도 설정 없이 즉시 사용 가능합니다.
1. 설치
pip install rb_sdk
# RB-Y1 양팔로봇 제어가 필요하면 함께 설치
pip install rby1-sdk
requires-python = ">=3.12,<3.13" — Python 3.12 전용.
2. 기본 사용 (로봇 본체에서 실행하는 경우)
common 서비스와 같은 PC에서 실행되는 스크립트라면 추가 설정 없이 그대로 사용 가능합니다.
from rb_sdk import RBAmrSDK, RBManipulateSDK
amr = RBAmrSDK()
manipulate = RBManipulateSDK()
# 이 시점에 ZenohClient가 127.0.0.1:7447(common)에 자동 연결
manipulate.move.call_move_j(
robot_model="C500920",
target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
)
3. 외부 PC에서 사용 (Remote Connect)
개발용 노트북 등 로봇 본체와 다른 머신에서 로봇과 통신하고 싶다면 sdk.connect()를 호출합니다.
import asyncio
from rb_sdk import RBManipulateSDK
async def main():
sdk = RBManipulateSDK()
# 로봇(common 서비스)에 외부 peer로 등록 + 자동 heartbeat 시작
peer_id = await sdk.connect(
token="외부 서비스 Token", # Muscat이 생성한 서비스 Token
host="192.168.1.100", # 로봇 IP
# ttl=30, # 서버 측 세션 TTL(초). heartbeat 끊기면 자동 해제
# zenoh_port=None, # None → OS가 빈 포트 자동 배정 (충돌 없음)
# heartbeat_interval=15.0, # 기본: ttl / 2
)
sdk.move.call_move_j(robot_model="C500920", target={...})
await sdk.disconnect()
asyncio.run(main())
주요 특징
- 로컬 IP 자동 감지:
host방향의 라우팅 인터페이스 IP를 자동으로 찾아 로봇에 알림 - 포트 자동 배정: 별도 지정 없이 OS가 빈 포트를 잡아주므로 포트 충돌 없음
- 자동 heartbeat: 주기적으로 TTL 연장. 스크립트 종료 / 크래시 시 TTL 만료로 서버 측 정리
- 같은 PC 자동 감지:
host가 localhost거나 자기 IP인 경우connect()는 아무것도 하지 않고None반환 (이미 메시 안에 있음)
connect() 인자
| 인자 | 타입 | 기본 | 설명 |
|---|---|---|---|
token |
str |
(필수) | common 서비스 JWT |
host |
str |
(필수) | common이 실행 중인 로봇 IP |
zenoh_port |
int | None |
None |
로컬 Zenoh listen 포트. None이면 자동 |
ttl |
int |
30 |
서버 측 세션 TTL(초) |
heartbeat_interval |
float | None |
None |
heartbeat 주기. None이면 ttl/2 |
mesh_timeout |
float | None |
None |
mesh 형성 대기 최대 시간(초). None이면 무제한 대기 |
mesh_probe_timeout |
float |
1.0 |
probe query 1회 timeout(초) |
mesh_probe_interval |
float |
0.5 |
probe 재시도 간격(초) |
4. use_directly 모드
RBBaseSDK 생성자에 use_directly=True를 전달하면, 메서드 호출 시 {name}_directly 메서드가 존재하면 자동으로 그쪽으로 라우팅됩니다. 없으면 원래 메서드가 그대로 호출됩니다.
class MySDK(RBManipulateSDK):
def call_move_j_directly(self, *, robot_model, target, **kwargs):
# Zenoh를 거치지 않고 직접 하드웨어 제어하는 구현
...
# use_directly=True → call_move_j 호출 시 call_move_j_directly로 라우팅
sdk = MySDK(use_directly=True)
sdk.move.call_move_j(robot_model="C500920", target={...}) # → call_move_j_directly 호출
# use_directly=False (기본) 또는 _directly 메서드 없으면 원래 메서드 사용
sdk2 = MySDK(use_directly=False)
sdk2.move.call_move_j(...) # → call_move_j 호출
싱글톤 인스턴스가 이미 존재해도 use_directly는 매 생성자 호출마다 갱신됩니다.
5. AMR SDK (RBAmrSDK)
from rb_sdk import RBAmrSDK
sdk = RBAmrSDK()
하위 모듈
| 속성 | 클래스 | 주요 기능 |
|---|---|---|
sdk.move |
AmrMoveSDK |
목표 지점 이동, jog, 정지/일시정지/재개, 직선·원호·회전 이동 |
sdk.control |
AmrControlSDK |
안전 필드/플래그/IO 설정, 도킹/언도킹, LED/모터/jog 제어, 장애물 박스 |
sdk.localization |
AmrLocalizationSDK |
위치 초기화, 랜덤/자동/반자동 init, 위치 추정 시작/정지 |
sdk.map |
AmrMapSDK |
맵 목록/로드/삭제, 토폴로지 get/set, 매핑 시작/정지/저장 |
sdk.setting |
AmrSettingSDK |
로봇 타입, 설정 카테고리/파라미터, PDU/드라이브/센서 설정 |
sdk.status |
AmrStatusSDK |
상태·이동 상태·LiDAR·경로 조회 |
sdk.file |
AmrFileSDK |
맵·리소스 파일 관리 |
sdk.capability |
AmrCapabilitySDK |
로봇 능력 조회 |
sdk.accessory |
AmrAccessorySDK |
액세서리 제어 |
sdk.program |
AmrProgramSDK |
AMR 프로그램 실행/제어 |
이동 예시
# 목표 노드로 이동
sdk.move.send_move_goal(
robot_model="AMR001",
goal_type=0,
goal_node_name="node_A",
)
# 이동 결과 대기 (async)
result = await sdk.move.get_move_result(robot_model="AMR001")
# jog 이동
sdk.move.send_move_jog(robot_model="AMR001", vx=0.3, vy=0.0, wz=0.0)
# 멀티 경로
sdk.move.send_multi_path(robot_model="AMR001", path=["node_A", "node_B", "node_C"])
6. Manipulate SDK (RBManipulateSDK)
from rb_sdk import RBManipulateSDK
sdk = RBManipulateSDK()
하위 모듈
| 속성 | 클래스 | 주요 기능 |
|---|---|---|
sdk.move |
ManipulateMoveSDK |
관절/직선/원호 이동, jog, 서보/온라인, 어드미턴스, 위빙, 컨베이어 |
sdk.program |
ManipulateProgramSDK |
프로그램 실행·정지·재개, 핀포인트, 툴 출력 |
sdk.config |
ManipulateConfigSDK |
툴·유저프레임·충돌 파라미터, 속도/impedance/freedrive 설정 |
sdk.io |
ManipulateIOSDK |
side/flange/ext DIO·AIO 출력, 펄스, 비트 조합, IO 함수 설정 |
sdk.get_data |
ManipulateGetDataSDK |
변수 조회, 상대/절대 좌표 계산 |
sdk.point |
ManipulatePointSDK |
포인트·좌표 등록 |
sdk.service |
ManipulateServiceSDK |
미니 탭 기능 |
sdk.maintenance |
ManipulateMaintenanceSDK |
관리자 설정 |
sdk.state |
ManipulateStateSDK |
whoami, 전원·서보·레퍼런스 제어, 실시간 상태 구독 |
move 모듈 — 주요 메서드
| 메서드 | 설명 |
|---|---|
call_move_j |
관절 공간(Joint) 이동 |
call_move_l |
직선(Linear) 이동 |
call_move_cir_axis |
축 기반 원호 이동 |
call_move_cir_threepoint |
3점 기반 원호 이동 |
call_move_home |
Home 위치로 이동 |
call_move_jb_clr/add/run |
Joint 블렌딩 경로 등록·실행 |
call_move_lb_clr/add/run |
Linear 블렌딩 경로 등록·실행 |
call_move_xb_clr/add/run |
X 블렌딩 경로 등록·실행 |
call_smoothjog_j/l |
스무스 jog (관절/직선) |
call_tickjog_j/l |
틱 jog (관절/직선) |
call_approach_j/l |
어프로치 이동 |
call_servo_j/l |
서보 모드 관절/직선 이동 |
call_online_j/l |
온라인 궤적 관절/직선 |
call_admittance_force_on/off/change |
어드미턴스 포스 제어 |
call_arc_sensing_on/off |
아크 센싱 제어 |
call_tcp_weaving_on/off |
TCP 위빙 제어 |
call_base_conveyor_on/off/speed |
베이스 컨베이어 제어 |
이동 예시
# 관절 이동
sdk.move.call_move_j(
robot_model="C500920",
target={"tar_values": [0, 0, 90, 0, 90, 0], "tar_frame": 0, "tar_unit": 1},
speed={"spd_mode": 1, "spd_vel_para": 60, "spd_acc_para": 120},
)
# 직선 이동
sdk.move.call_move_l(
robot_model="C500920",
target={"tar_values": [400, 0, 500, 0, 180, 0], "tar_frame": 1, "tar_unit": 0},
)
# 기준 위치로부터 상대 이동
sdk.move.call_move_j(
robot_model="C500920",
target={"tar_values": [10, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 1},
reference_value=[0, 0, 90, 0, 90, 0], # 현재 기준값
)
# 전원·서보 제어
sdk.state.call_powercontrol(robot_model="C500920", control=1)
sdk.state.call_servocontrol(robot_model="C500920", control=1)
7. RB-Y1 SDK (RBRby1SDK)
공식 rby1-sdk 래퍼. 자세한 설명은 rby1_sdk/README.md 참조.
from rb_sdk import RBRby1SDK
sdk = RBRby1SDK(endpoint="192.168.30.1:50051", model=None, auto_connect=True)
print(sdk.connected)
print(sdk.whoami())
await sdk.control.power_on(robot_model="a")
await sdk.control.servo_on(robot_model="a")
state = await sdk.state.get_state(robot_model="a")
모든 public 메서드는 async def + 내부 asyncio.to_thread입니다 (공식 SDK가 blocking C++ pybind11이므로).
sync 컨텍스트(PyFM Step 등)에서는 asyncio.run(...) 또는 RBRby1BaseSDK.run_sync(...) 사용.
8. FlowManagerArgs 통합 (PyFM)
PyFM Step에서 SDK 메서드를 호출할 때 flow_manager_args를 전달하면 done 콜백 처리, 이동 완료 대기, 변수 바인딩 등이 자동으로 처리됩니다.
# PyFM Step 내부
def execute(self, flow_manager_args):
sdk.move.call_move_j(
robot_model="C500920",
target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
flow_manager_args=flow_manager_args, # done() 콜백 자동 호출
)
flow_manager_args를 생략하면 블로킹 호출로 동작합니다.
9. 아키텍처 메모
프로세스 단위 싱글톤
RBBaseSDK(RBAmrSDK, RBManipulateSDK)는 (pid, cls) 기준 싱글톤입니다. 같은 프로세스에서 여러 번 생성자를 호출해도 동일 인스턴스가 반환됩니다. ZenohClient도 PID당 하나를 공유합니다.
a = RBManipulateSDK()
b = RBManipulateSDK()
assert a is b # True
자동 에러 래핑
RBBaseSDK를 상속한 클래스의 모든 public instance method는 __init_subclass__에 의해 자동으로 공통 try/except로 래핑됩니다. ZenohNoReply / ZenohTransportError 발생 시 자동 재연결 후 1회 재시도합니다.
SDK 종료
sdk.close() # 현재 인스턴스 정리
RBBaseSDK.close_all_for_pid() # 현재 프로세스의 모든 SDK 인스턴스 정리
10. 주의사항
RBRby1SDK는 최초endpoint/model기준으로 세션이 공유됩니다.- 연결 대상(zenoh, rby1 endpoint, Muscat HTTP)이 준비되지 않으면 런타임 예외가 발생합니다.
- 외부 PC에서 SDK 사용 시 방화벽이 7447/TCP(zenoh)와 8000/TCP을 양방향으로 허용해야 합니다.
use_directly=True는 싱글톤 재진입 시에도 반영됩니다. 단, 하위 SDK(sdk.move등)에는 전파되지 않으므로_directly메서드는 최상위 SDK에 정의해야 합니다.
11. 라이선스 / 문의
© Rainbow Robotics. 내부 이슈/문의: GitHub Issues
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file rainbow_rb_sdk-0.0.9.dev12.tar.gz.
File metadata
- Download URL: rainbow_rb_sdk-0.0.9.dev12.tar.gz
- Upload date:
- Size: 87.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d4a3bf99082207ac49ec4a66fa5ad45a5e8eeff09f9695696855e749a0cf7f48
|
|
| MD5 |
5a2090f0253f364d2394cdebb11240b7
|
|
| BLAKE2b-256 |
1e6e0b8618300d48f222193283c9a22ec08d0a49a47c9f523ebf8bb1f0481c8f
|
File details
Details for the file rainbow_rb_sdk-0.0.9.dev12-py3-none-any.whl.
File metadata
- Download URL: rainbow_rb_sdk-0.0.9.dev12-py3-none-any.whl
- Upload date:
- Size: 104.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.5
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
564e7fa0ec643945c8cce0413b3685503d534ba4bfe0f6d111fec48c8d8f221d
|
|
| MD5 |
7446a4bbce85b5256af2d1fb77ac98cf
|
|
| BLAKE2b-256 |
193b58acd916d2a53249c0a5e7c563d6f30745bd835b01d9909923648b684571
|