Skip to main content

Rainbow Robotics 통합 Python SDK — AMR, 매니퓰레이터, RB-Y1 휴머노이드와 Rainbow Robot Service(common) 연동

Project description

rb_sdk

Rainbow Robotics 통합 Python SDK. AMR · Manipulator · RB-Y1 양팔로봇 제어와 Rainbow Robot Service(common) 연동을 단일 API로 제공합니다.

진입점

클래스 용도
RBAmrSDK AMR(자율 주행) 로봇 제어
RBManipulateSDK 협동 로봇 매니퓰레이터 제어
RBRby1SDK RB-Y1 양팔로봇 제어
RBBaseSDK 공통 베이스 (직접 상속하지 않는 한 사용할 일 없음)

내부적으로 Zenoh pub/sub/query 메시 위에서 동작하며, 같은 PC(로봇 본체)에서 실행할 때는 별도 설정 없이 즉시 사용 가능합니다.


1. 설치

pip install rb_sdk
# RB-Y1 양팔로봇 제어가 필요하면 함께 설치
pip install rby1-sdk

requires-python = ">=3.12,<3.13" — Python 3.12 전용.


2. 기본 사용 (로봇 본체에서 실행하는 경우)

common 서비스와 같은 PC에서 실행되는 스크립트라면 추가 설정 없이 그대로 사용 가능합니다.

from rb_sdk import RBAmrSDK, RBManipulateSDK

amr = RBAmrSDK()
manipulate = RBManipulateSDK()

# 이 시점에 ZenohClient가 127.0.0.1:7447(common)에 자동 연결
manipulate.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
)

3. 외부 PC에서 사용 (Remote Connect)

개발용 노트북 등 로봇 본체와 다른 머신에서 로봇과 통신하고 싶다면 sdk.connect()를 호출합니다.

import asyncio
from rb_sdk import RBManipulateSDK

async def main():
    sdk = RBManipulateSDK()

    # 로봇(common 서비스)에 외부 peer로 등록 + 자동 heartbeat 시작
    peer_id = await sdk.connect(
        token="외부 서비스 Token",     # Muscat이 생성한 서비스 Token
        host="192.168.1.100",       # 로봇 IP
        # ttl=30,                   # 서버 측 세션 TTL(초). heartbeat 끊기면 자동 해제
        # zenoh_port=None,          # None → OS가 빈 포트 자동 배정 (충돌 없음)
        # heartbeat_interval=15.0,  # 기본: ttl / 2
    )

    sdk.move.call_move_j(robot_model="C500920", target={...})

    await sdk.disconnect()

asyncio.run(main())

주요 특징

  • 로컬 IP 자동 감지: host 방향의 라우팅 인터페이스 IP를 자동으로 찾아 로봇에 알림
  • 포트 자동 배정: 별도 지정 없이 OS가 빈 포트를 잡아주므로 포트 충돌 없음
  • 자동 heartbeat: 주기적으로 TTL 연장. 스크립트 종료 / 크래시 시 TTL 만료로 서버 측 정리
  • 같은 PC 자동 감지: host가 localhost거나 자기 IP인 경우 connect()는 아무것도 하지 않고 None 반환 (이미 메시 안에 있음)

connect() 인자

인자 타입 기본 설명
token str (필수) common 서비스 JWT
host str (필수) common이 실행 중인 로봇 IP
zenoh_port int | None None 로컬 Zenoh listen 포트. None이면 자동
ttl int 30 서버 측 세션 TTL(초)
heartbeat_interval float | None None heartbeat 주기. None이면 ttl/2
mesh_timeout float | None None mesh 형성 대기 최대 시간(초). None이면 무제한 대기
mesh_probe_timeout float 1.0 probe query 1회 timeout(초)
mesh_probe_interval float 0.5 probe 재시도 간격(초)

4. use_directly 모드

RBBaseSDK 생성자에 use_directly=True를 전달하면, 메서드 호출 시 {name}_directly 메서드가 존재하면 자동으로 그쪽으로 라우팅됩니다. 없으면 원래 메서드가 그대로 호출됩니다.

class MySDK(RBManipulateSDK):
    def call_move_j_directly(self, *, robot_model, target, **kwargs):
        # Zenoh를 거치지 않고 직접 하드웨어 제어하는 구현
        ...

# use_directly=True → call_move_j 호출 시 call_move_j_directly로 라우팅
sdk = MySDK(use_directly=True)
sdk.move.call_move_j(robot_model="C500920", target={...})  # → call_move_j_directly 호출

# use_directly=False (기본) 또는 _directly 메서드 없으면 원래 메서드 사용
sdk2 = MySDK(use_directly=False)
sdk2.move.call_move_j(...)  # → call_move_j 호출

싱글톤 인스턴스가 이미 존재해도 use_directly는 매 생성자 호출마다 갱신됩니다.


5. AMR SDK (RBAmrSDK)

from rb_sdk import RBAmrSDK

sdk = RBAmrSDK()

하위 모듈

속성 클래스 주요 기능
sdk.move AmrMoveSDK 목표 지점 이동, jog, 정지/일시정지/재개, 직선·원호·회전 이동
sdk.control AmrControlSDK 안전 필드/플래그/IO 설정, 도킹/언도킹, LED/모터/jog 제어, 장애물 박스
sdk.localization AmrLocalizationSDK 위치 초기화, 랜덤/자동/반자동 init, 위치 추정 시작/정지
sdk.map AmrMapSDK 맵 목록/로드/삭제, 토폴로지 get/set, 매핑 시작/정지/저장
sdk.setting AmrSettingSDK 로봇 타입, 설정 카테고리/파라미터, PDU/드라이브/센서 설정
sdk.status AmrStatusSDK 상태·이동 상태·LiDAR·경로 조회
sdk.file AmrFileSDK 맵·리소스 파일 관리
sdk.capability AmrCapabilitySDK 로봇 능력 조회
sdk.accessory AmrAccessorySDK 액세서리 제어
sdk.program AmrProgramSDK AMR 프로그램 실행/제어

이동 예시

# 목표 노드로 이동
sdk.move.send_move_goal(
    robot_model="AMR001",
    goal_type=0,
    goal_node_name="node_A",
)

# 이동 결과 대기 (async)
result = await sdk.move.get_move_result(robot_model="AMR001")

# jog 이동
sdk.move.send_move_jog(robot_model="AMR001", vx=0.3, vy=0.0, wz=0.0)

# 멀티 경로
sdk.move.send_multi_path(robot_model="AMR001", path=["node_A", "node_B", "node_C"])

6. Manipulate SDK (RBManipulateSDK)

from rb_sdk import RBManipulateSDK

sdk = RBManipulateSDK()

하위 모듈

속성 클래스 주요 기능
sdk.move ManipulateMoveSDK 관절/직선/원호 이동, jog, 서보/온라인, 어드미턴스, 위빙, 컨베이어
sdk.program ManipulateProgramSDK 프로그램 실행·정지·재개, 핀포인트, 툴 출력
sdk.config ManipulateConfigSDK 툴·유저프레임·충돌 파라미터, 속도/impedance/freedrive 설정
sdk.io ManipulateIOSDK side/flange/ext DIO·AIO 출력, 펄스, 비트 조합, IO 함수 설정
sdk.get_data ManipulateGetDataSDK 변수 조회, 상대/절대 좌표 계산
sdk.point ManipulatePointSDK 포인트·좌표 등록
sdk.service ManipulateServiceSDK 미니 탭 기능
sdk.maintenance ManipulateMaintenanceSDK 관리자 설정
sdk.state ManipulateStateSDK whoami, 전원·서보·레퍼런스 제어, 실시간 상태 구독

move 모듈 — 주요 메서드

메서드 설명
call_move_j 관절 공간(Joint) 이동
call_move_l 직선(Linear) 이동
call_move_cir_axis 축 기반 원호 이동
call_move_cir_threepoint 3점 기반 원호 이동
call_move_home Home 위치로 이동
call_move_jb_clr/add/run Joint 블렌딩 경로 등록·실행
call_move_lb_clr/add/run Linear 블렌딩 경로 등록·실행
call_move_xb_clr/add/run X 블렌딩 경로 등록·실행
call_smoothjog_j/l 스무스 jog (관절/직선)
call_tickjog_j/l 틱 jog (관절/직선)
call_approach_j/l 어프로치 이동
call_servo_j/l 서보 모드 관절/직선 이동
call_online_j/l 온라인 궤적 관절/직선
call_admittance_force_on/off/change 어드미턴스 포스 제어
call_arc_sensing_on/off 아크 센싱 제어
call_tcp_weaving_on/off TCP 위빙 제어
call_base_conveyor_on/off/speed 베이스 컨베이어 제어

이동 예시

# 관절 이동
sdk.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [0, 0, 90, 0, 90, 0], "tar_frame": 0, "tar_unit": 1},
    speed={"spd_mode": 1, "spd_vel_para": 60, "spd_acc_para": 120},
)

# 직선 이동
sdk.move.call_move_l(
    robot_model="C500920",
    target={"tar_values": [400, 0, 500, 0, 180, 0], "tar_frame": 1, "tar_unit": 0},
)

# 기준 위치로부터 상대 이동
sdk.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [10, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 1},
    reference_value=[0, 0, 90, 0, 90, 0],  # 현재 기준값
)

# 전원·서보 제어
sdk.state.call_powercontrol(robot_model="C500920", control=1)
sdk.state.call_servocontrol(robot_model="C500920", control=1)

7. RB-Y1 SDK (RBRby1SDK)

공식 rby1-sdk 래퍼. 자세한 설명은 rby1_sdk/README.md 참조.

from rb_sdk import RBRby1SDK

sdk = RBRby1SDK(endpoint="192.168.30.1:50051", model=None, auto_connect=True)

print(sdk.connected)
print(sdk.whoami())

await sdk.control.power_on(robot_model="a")
await sdk.control.servo_on(robot_model="a")

state = await sdk.state.get_state(robot_model="a")

모든 public 메서드는 async def + 내부 asyncio.to_thread입니다 (공식 SDK가 blocking C++ pybind11이므로). sync 컨텍스트(PyFM Step 등)에서는 asyncio.run(...) 또는 RBRby1BaseSDK.run_sync(...) 사용.


8. FlowManagerArgs 통합 (PyFM)

PyFM Step에서 SDK 메서드를 호출할 때 flow_manager_args를 전달하면 done 콜백 처리, 이동 완료 대기, 변수 바인딩 등이 자동으로 처리됩니다.

# PyFM Step 내부
def execute(self, flow_manager_args):
    sdk.move.call_move_j(
        robot_model="C500920",
        target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
        flow_manager_args=flow_manager_args,  # done() 콜백 자동 호출
    )

flow_manager_args를 생략하면 블로킹 호출로 동작합니다.


9. 아키텍처 메모

프로세스 단위 싱글톤

RBBaseSDK(RBAmrSDK, RBManipulateSDK)는 (pid, cls) 기준 싱글톤입니다. 같은 프로세스에서 여러 번 생성자를 호출해도 동일 인스턴스가 반환됩니다. ZenohClient도 PID당 하나를 공유합니다.

a = RBManipulateSDK()
b = RBManipulateSDK()
assert a is b  # True

자동 에러 래핑

RBBaseSDK를 상속한 클래스의 모든 public instance method는 __init_subclass__에 의해 자동으로 공통 try/except로 래핑됩니다. ZenohNoReply / ZenohTransportError 발생 시 자동 재연결 후 1회 재시도합니다.

SDK 종료

sdk.close()                         # 현재 인스턴스 정리
RBBaseSDK.close_all_for_pid()       # 현재 프로세스의 모든 SDK 인스턴스 정리

10. 주의사항

  • RBRby1SDK는 최초 endpoint/model 기준으로 세션이 공유됩니다.
  • 연결 대상(zenoh, rby1 endpoint, Muscat HTTP)이 준비되지 않으면 런타임 예외가 발생합니다.
  • 외부 PC에서 SDK 사용 시 방화벽이 7447/TCP(zenoh)와 8000/TCP을 양방향으로 허용해야 합니다.
  • use_directly=True는 싱글톤 재진입 시에도 반영됩니다. 단, 하위 SDK(sdk.move 등)에는 전파되지 않으므로 _directly 메서드는 최상위 SDK에 정의해야 합니다.

11. 라이선스 / 문의

© Rainbow Robotics. 내부 이슈/문의: GitHub Issues

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rainbow_rb_sdk-0.0.9.dev12.tar.gz (87.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rainbow_rb_sdk-0.0.9.dev12-py3-none-any.whl (104.2 kB view details)

Uploaded Python 3

File details

Details for the file rainbow_rb_sdk-0.0.9.dev12.tar.gz.

File metadata

  • Download URL: rainbow_rb_sdk-0.0.9.dev12.tar.gz
  • Upload date:
  • Size: 87.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.5

File hashes

Hashes for rainbow_rb_sdk-0.0.9.dev12.tar.gz
Algorithm Hash digest
SHA256 d4a3bf99082207ac49ec4a66fa5ad45a5e8eeff09f9695696855e749a0cf7f48
MD5 5a2090f0253f364d2394cdebb11240b7
BLAKE2b-256 1e6e0b8618300d48f222193283c9a22ec08d0a49a47c9f523ebf8bb1f0481c8f

See more details on using hashes here.

File details

Details for the file rainbow_rb_sdk-0.0.9.dev12-py3-none-any.whl.

File metadata

File hashes

Hashes for rainbow_rb_sdk-0.0.9.dev12-py3-none-any.whl
Algorithm Hash digest
SHA256 564e7fa0ec643945c8cce0413b3685503d534ba4bfe0f6d111fec48c8d8f221d
MD5 7446a4bbce85b5256af2d1fb77ac98cf
BLAKE2b-256 193b58acd916d2a53249c0a5e7c563d6f30745bd835b01d9909923648b684571

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page