Skip to main content

Rainbow Robotics 통합 Python SDK — AMR, 매니퓰레이터, RB-Y1 휴머노이드와 Rainbow Robot Service(common) 연동

Project description

rb_sdk

Rainbow Robotics 통합 Python SDK. AMR · Manipulator · RB-Y1 양팔로봇 제어와 Rainbow Robot Service(common) 연동을 단일 API로 제공합니다.

진입점

클래스 용도
RBAmrSDK AMR(자율 주행) 로봇 제어
RBManipulateSDK 협동 로봇 매니퓰레이터 제어
RBRby1SDK RB-Y1 양팔로봇 제어
RBBaseSDK 공통 베이스 (직접 상속하지 않는 한 사용할 일 없음)

내부적으로 Zenoh pub/sub/query 메시 위에서 동작하며, 같은 PC(로봇 본체)에서 실행할 때는 별도 설정 없이 즉시 사용 가능합니다.


1. 설치

pip install rb_sdk
# RB-Y1 양팔로봇 제어가 필요하면 함께 설치
pip install rby1-sdk

requires-python = ">=3.12,<3.13" — Python 3.12 전용.


2. 기본 사용 (로봇 본체에서 실행하는 경우)

common 서비스와 같은 PC에서 실행되는 스크립트라면 추가 설정 없이 그대로 사용 가능합니다.

from rb_sdk import RBAmrSDK, RBManipulateSDK

amr = RBAmrSDK()
manipulate = RBManipulateSDK()

# 이 시점에 ZenohClient가 127.0.0.1:7447(common)에 자동 연결
manipulate.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
)

3. 외부 PC에서 사용 (Remote Connect)

개발용 노트북 등 로봇 본체와 다른 머신에서 로봇과 통신하고 싶다면 sdk.connect()를 호출합니다.

import asyncio
from rb_sdk import RBManipulateSDK

async def main():
    sdk = RBManipulateSDK()

    # 로봇(common 서비스)에 외부 peer로 등록 + 자동 heartbeat 시작
    peer_id = await sdk.connect(
        token="외부 서비스 Token",     # Muscat이 생성한 서비스 Token
        host="192.168.1.100",       # 로봇 IP
        # ttl=30,                   # 서버 측 세션 TTL(초). heartbeat 끊기면 자동 해제
        # zenoh_port=None,          # None → OS가 빈 포트 자동 배정 (충돌 없음)
        # heartbeat_interval=15.0,  # 기본: ttl / 2
    )

    sdk.move.call_move_j(robot_model="C500920", target={...})

    await sdk.disconnect()

asyncio.run(main())

주요 특징

  • 로컬 IP 자동 감지: host 방향의 라우팅 인터페이스 IP를 자동으로 찾아 로봇에 알림
  • 포트 자동 배정: 별도 지정 없이 OS가 빈 포트를 잡아주므로 포트 충돌 없음
  • 자동 heartbeat: 주기적으로 TTL 연장. 스크립트 종료 / 크래시 시 TTL 만료로 서버 측 정리
  • 같은 PC 자동 감지: host가 localhost거나 자기 IP인 경우 connect()는 아무것도 하지 않고 None 반환 (이미 메시 안에 있음)

connect() 인자

인자 타입 기본 설명
token str (필수) common 서비스 JWT
host str (필수) common이 실행 중인 로봇 IP
zenoh_port int | None None 로컬 Zenoh listen 포트. None이면 자동
ttl int 30 서버 측 세션 TTL(초)
heartbeat_interval float | None None heartbeat 주기. None이면 ttl/2
mesh_timeout float | None None mesh 형성 대기 최대 시간(초). None이면 무제한 대기
mesh_probe_timeout float 1.0 probe query 1회 timeout(초)
mesh_probe_interval float 0.5 probe 재시도 간격(초)

4. use_directly 모드

RBBaseSDK 생성자에 use_directly=True를 전달하면, 메서드 호출 시 {name}_directly 메서드가 존재하면 자동으로 그쪽으로 라우팅됩니다. 없으면 원래 메서드가 그대로 호출됩니다.

class MySDK(RBManipulateSDK):
    def call_move_j_directly(self, *, robot_model, target, **kwargs):
        # Zenoh를 거치지 않고 직접 하드웨어 제어하는 구현
        ...

# use_directly=True → call_move_j 호출 시 call_move_j_directly로 라우팅
sdk = MySDK(use_directly=True)
sdk.move.call_move_j(robot_model="C500920", target={...})  # → call_move_j_directly 호출

# use_directly=False (기본) 또는 _directly 메서드 없으면 원래 메서드 사용
sdk2 = MySDK(use_directly=False)
sdk2.move.call_move_j(...)  # → call_move_j 호출

싱글톤 인스턴스가 이미 존재해도 use_directly는 매 생성자 호출마다 갱신됩니다.


5. AMR SDK (RBAmrSDK)

from rb_sdk import RBAmrSDK

sdk = RBAmrSDK()

하위 모듈

속성 클래스 주요 기능
sdk.move AmrMoveSDK 목표 지점 이동, jog, 정지/일시정지/재개, 직선·원호·회전 이동
sdk.control AmrControlSDK 안전 필드/플래그/IO 설정, 도킹/언도킹, LED/모터/jog 제어, 장애물 박스
sdk.localization AmrLocalizationSDK 위치 초기화, 랜덤/자동/반자동 init, 위치 추정 시작/정지
sdk.map AmrMapSDK 맵 목록/로드/삭제, 토폴로지 get/set, 매핑 시작/정지/저장
sdk.setting AmrSettingSDK 로봇 타입, 설정 카테고리/파라미터, PDU/드라이브/센서 설정
sdk.status AmrStatusSDK 상태·이동 상태·LiDAR·경로 조회
sdk.file AmrFileSDK 맵·리소스 파일 관리
sdk.capability AmrCapabilitySDK 로봇 능력 조회
sdk.accessory AmrAccessorySDK 액세서리 제어
sdk.program AmrProgramSDK AMR 프로그램 실행/제어

이동 예시

# 목표 노드로 이동
sdk.move.send_move_goal(
    robot_model="AMR001",
    goal_type=0,
    goal_node_name="node_A",
)

# 이동 결과 대기 (async)
result = await sdk.move.get_move_result(robot_model="AMR001")

# jog 이동
sdk.move.send_move_jog(robot_model="AMR001", vx=0.3, vy=0.0, wz=0.0)

# 멀티 경로
sdk.move.send_multi_path(robot_model="AMR001", path=["node_A", "node_B", "node_C"])

6. Manipulate SDK (RBManipulateSDK)

from rb_sdk import RBManipulateSDK

sdk = RBManipulateSDK()

하위 모듈

속성 클래스 주요 기능
sdk.move ManipulateMoveSDK 관절/직선/원호 이동, jog, 서보/온라인, 어드미턴스, 위빙, 컨베이어
sdk.program ManipulateProgramSDK 프로그램 실행·정지·재개, 핀포인트, 툴 출력
sdk.config ManipulateConfigSDK 툴·유저프레임·충돌 파라미터, 속도/impedance/freedrive 설정
sdk.io ManipulateIOSDK side/flange/ext DIO·AIO 출력, 펄스, 비트 조합, IO 함수 설정
sdk.get_data ManipulateGetDataSDK 변수 조회, 상대/절대 좌표 계산
sdk.point ManipulatePointSDK 포인트·좌표 등록
sdk.service ManipulateServiceSDK 미니 탭 기능
sdk.maintenance ManipulateMaintenanceSDK 관리자 설정
sdk.state ManipulateStateSDK whoami, 전원·서보·레퍼런스 제어, 실시간 상태 구독

move 모듈 — 주요 메서드

메서드 설명
call_move_j 관절 공간(Joint) 이동
call_move_l 직선(Linear) 이동
call_move_cir_axis 축 기반 원호 이동
call_move_cir_threepoint 3점 기반 원호 이동
call_move_home Home 위치로 이동
call_move_jb_clr/add/run Joint 블렌딩 경로 등록·실행
call_move_lb_clr/add/run Linear 블렌딩 경로 등록·실행
call_move_xb_clr/add/run X 블렌딩 경로 등록·실행
call_smoothjog_j/l 스무스 jog (관절/직선)
call_tickjog_j/l 틱 jog (관절/직선)
call_approach_j/l 어프로치 이동
call_servo_j/l 서보 모드 관절/직선 이동
call_online_j/l 온라인 궤적 관절/직선
call_admittance_force_on/off/change 어드미턴스 포스 제어
call_arc_sensing_on/off 아크 센싱 제어
call_tcp_weaving_on/off TCP 위빙 제어
call_base_conveyor_on/off/speed 베이스 컨베이어 제어

이동 예시

# 관절 이동
sdk.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [0, 0, 90, 0, 90, 0], "tar_frame": 0, "tar_unit": 1},
    speed={"spd_mode": 1, "spd_vel_para": 60, "spd_acc_para": 120},
)

# 직선 이동
sdk.move.call_move_l(
    robot_model="C500920",
    target={"tar_values": [400, 0, 500, 0, 180, 0], "tar_frame": 1, "tar_unit": 0},
)

# 기준 위치로부터 상대 이동
sdk.move.call_move_j(
    robot_model="C500920",
    target={"tar_values": [10, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 1},
    reference_value=[0, 0, 90, 0, 90, 0],  # 현재 기준값
)

# 전원·서보 제어
sdk.state.call_powercontrol(robot_model="C500920", control=1)
sdk.state.call_servocontrol(robot_model="C500920", control=1)

7. RB-Y1 SDK (RBRby1SDK)

공식 rby1-sdk 래퍼. 자세한 설명은 rby1_sdk/README.md 참조.

from rb_sdk import RBRby1SDK

sdk = RBRby1SDK(endpoint="192.168.30.1:50051", model=None, auto_connect=True)

print(sdk.connected)
print(sdk.whoami())

await sdk.control.power_on(robot_model="a")
await sdk.control.servo_on(robot_model="a")

state = await sdk.state.get_state(robot_model="a")

모든 public 메서드는 async def + 내부 asyncio.to_thread입니다 (공식 SDK가 blocking C++ pybind11이므로). sync 컨텍스트(PyFM Step 등)에서는 asyncio.run(...) 또는 RBRby1BaseSDK.run_sync(...) 사용.


8. FlowManagerArgs 통합 (PyFM)

PyFM Step에서 SDK 메서드를 호출할 때 flow_manager_args를 전달하면 done 콜백 처리, 이동 완료 대기, 변수 바인딩 등이 자동으로 처리됩니다.

# PyFM Step 내부
def execute(self, flow_manager_args):
    sdk.move.call_move_j(
        robot_model="C500920",
        target={"tar_values": [0, 0, 0, 0, 0, 0], "tar_frame": 0, "tar_unit": 0},
        flow_manager_args=flow_manager_args,  # done() 콜백 자동 호출
    )

flow_manager_args를 생략하면 블로킹 호출로 동작합니다.


9. 아키텍처 메모

프로세스 단위 싱글톤

RBBaseSDK(RBAmrSDK, RBManipulateSDK)는 (pid, cls) 기준 싱글톤입니다. 같은 프로세스에서 여러 번 생성자를 호출해도 동일 인스턴스가 반환됩니다. ZenohClient도 PID당 하나를 공유합니다.

a = RBManipulateSDK()
b = RBManipulateSDK()
assert a is b  # True

자동 에러 래핑

RBBaseSDK를 상속한 클래스의 모든 public instance method는 __init_subclass__에 의해 자동으로 공통 try/except로 래핑됩니다. ZenohNoReply / ZenohTransportError 발생 시 자동 재연결 후 1회 재시도합니다.

SDK 종료

sdk.close()                         # 현재 인스턴스 정리
RBBaseSDK.close_all_for_pid()       # 현재 프로세스의 모든 SDK 인스턴스 정리

10. 주의사항

  • RBRby1SDK는 최초 endpoint/model 기준으로 세션이 공유됩니다.
  • 연결 대상(zenoh, rby1 endpoint, Muscat HTTP)이 준비되지 않으면 런타임 예외가 발생합니다.
  • 외부 PC에서 SDK 사용 시 방화벽이 7447/TCP(zenoh)와 8000/TCP을 양방향으로 허용해야 합니다.
  • use_directly=True는 싱글톤 재진입 시에도 반영됩니다. 단, 하위 SDK(sdk.move 등)에는 전파되지 않으므로 _directly 메서드는 최상위 SDK에 정의해야 합니다.

11. 라이선스 / 문의

© Rainbow Robotics. 내부 이슈/문의: GitHub Issues

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

rainbow_rb_sdk-0.0.9.dev15.tar.gz (87.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

rainbow_rb_sdk-0.0.9.dev15-py3-none-any.whl (104.3 kB view details)

Uploaded Python 3

File details

Details for the file rainbow_rb_sdk-0.0.9.dev15.tar.gz.

File metadata

  • Download URL: rainbow_rb_sdk-0.0.9.dev15.tar.gz
  • Upload date:
  • Size: 87.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.5

File hashes

Hashes for rainbow_rb_sdk-0.0.9.dev15.tar.gz
Algorithm Hash digest
SHA256 f2ebb2cb67481ac46798a0b1ad67ff2d11cf544fb72d84879d30f22e0a3408a2
MD5 eb31ed31b2714c31b082c0716a429c9d
BLAKE2b-256 b427930fc41581a365a28c4fb1858cc2d77fa8c456591958e5aec18fc6078cc3

See more details on using hashes here.

File details

Details for the file rainbow_rb_sdk-0.0.9.dev15-py3-none-any.whl.

File metadata

File hashes

Hashes for rainbow_rb_sdk-0.0.9.dev15-py3-none-any.whl
Algorithm Hash digest
SHA256 a61f33a2a0c45b2b88515bb0c7cb549220ce2a60f36a24ec1f32f44dce408485
MD5 d361d826b9eaff6c04e9c78441e996bd
BLAKE2b-256 a22ad74d0d6dbd3aa30ac8b798394d1410fa7b3f84709b4fb37a0c97f4bf9b98

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page