Universal interface for AI agents to control ROS robots via natural language

Agent ROS Bridge

Universal ROS1/ROS2 bridge for AI agents to control robots and embodied intelligence systems.


What It Does

Agent ROS Bridge sits between your AI agent and your robots. It speaks WebSocket, MQTT, and gRPC on the agent side, and ROS1/ROS2 on the robot side — with JWT auth, agent memory, safety confirmation, and fleet orchestration built in.

┌─────────────────────────────────────────────────────────────┐
│                      AI AGENT LAYER                          │
│   LangChain · AutoGPT · Claude (MCP) · OpenClaw · Custom    │
└──────────────────────────┬──────────────────────────────────┘
                           │  WebSocket / MQTT / gRPC
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                   AGENT ROS BRIDGE                           │
│  ┌──────────────┐  ┌─────────────┐  ┌────────────────────┐  │
│  │  Transports  │  │  Core Bridge│  │  AI Integrations   │  │
│  │  WebSocket   │  │  JWT Auth   │  │  Memory (SQLite)   │  │
│  │  MQTT        │  │  RBAC       │  │  Safety Manager    │  │
│  │  gRPC        │  │  Fleet Mgmt │  │  Tool Discovery    │  │
│  └──────────────┘  └─────────────┘  └────────────────────┘  │
└──────────────────────────┬──────────────────────────────────┘
                           │  rclpy / rospy
              ┌────────────┴────────────┐
              ▼                         ▼
       ROS2 (Jazzy/Humble)         ROS1 (Noetic)
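
To make the layering concrete, here is a minimal sketch of what the middle box does: decode an agent message, route it to a robot-side handler, and send back a reply. SimulatedRobot, handle_message, and the reply shape ({"ok": ..., "result": ...}) are hypothetical names invented for this illustration, not the package's API. Only the {"command": {"action": ..., "parameters": ...}} envelope is taken from this README, and the robot model is deliberately one-dimensional.

```python
import json


class SimulatedRobot:
    """Stand-in for the robot side (hypothetical, not the package's connector)."""

    def __init__(self):
        self.x = 0.0            # position along a single axis, in meters
        self.heading_deg = 0.0  # heading in degrees, kept in [0, 360)

    def move(self, direction="forward", distance=0.0):
        sign = 1.0 if direction == "forward" else -1.0
        self.x += sign * distance
        return {"x": self.x}

    def rotate(self, angle=0.0):
        self.heading_deg = (self.heading_deg + angle) % 360
        return {"heading_deg": self.heading_deg}


def handle_message(raw, robot):
    """Decode one agent message, route it to the robot, return a JSON reply."""
    command = json.loads(raw)["command"]
    # Explicit allow-list: only these actions can reach the robot.
    handlers = {"move": robot.move, "rotate": robot.rotate}
    handler = handlers.get(command["action"])
    if handler is None:
        return json.dumps({"ok": False, "error": f"unknown action: {command['action']}"})
    result = handler(**command.get("parameters", {}))
    return json.dumps({"ok": True, "result": result})


robot = SimulatedRobot()
print(handle_message('{"command": {"action": "rotate", "parameters": {"angle": 90}}}', robot))
```

The explicit handler table is a design choice for the sketch: an agent can only trigger actions that were deliberately exposed, which is the same idea the safety and RBAC layers build on.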

Feature Status

Feature | Status | Notes
WebSocket transport | ✅ Working | Full duplex, TLS support
MQTT transport | ✅ Working | paho v2 compatible
gRPC transport | ✅ Working | Bidirectional streaming, TLS/mTLS, JWT auth
JWT authentication | ✅ Working | Per-transport, RBAC included
Agent memory (SQLite) | ✅ Working | TTL, append, list ops
Agent memory (Redis) | ✅ Working | Same API, Redis backend
Safety manager | ✅ Working | Emergency stop, policy levels
Fleet orchestration | ✅ Working | Task allocation, priority queue
LangChain adapter | ✅ Working | ROSBridgeTool, ROSAgent
MCP server (stdio) | ✅ Working | Claude Desktop integration
AutoGPT adapter | ✅ Working | Command discovery & execution
OpenClaw adapter | ✅ Working | ClawHub skill + extension support
Web dashboard | ✅ Working | HTTP polling, emergency stop button
Simulated robot | ✅ Working | No ROS needed; perfect for testing
ROS2 connector | ✅ Working | Dynamic message types, publish/subscribe, discovery
ROS1 connector | ✅ Working | Full feature parity with ROS2
Tool auto-discovery | ✅ Working | Real-time topic/service/action introspection

Quick Start (no ROS required)

The fastest path uses the built-in simulated robot — a full bridge with a virtual TurtleBot that responds to commands over WebSocket, with zero ROS installation needed.

1. Install

pip install agent-ros-bridge

2. Set JWT secret

export JWT_SECRET=$(openssl rand -base64 32)

The bridge will refuse to start without JWT_SECRET. This is intentional.

3. Start the simulated bridge

python examples/quickstart/simulated_robot.py
# WebSocket available at ws://localhost:8766

Or with Docker:

docker-compose -f examples/quickstart/docker-compose.yml up

4. Generate a token and connect

# Generate a JWT token
python scripts/generate_token.py --user myagent --roles operator

# Connect with wscat (npm install -g wscat)
wscat -c "ws://localhost:8766?token=<TOKEN>"

5. Send commands

{"command": {"action": "list_robots"}}
{"command": {"action": "get_topics"}}
{"command": {"action": "move", "parameters": {"direction": "forward", "distance": 1.0}}}
{"command": {"action": "rotate", "parameters": {"angle": 90}}}
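
If you would rather script these messages than type them into wscat, a small helper can build the same envelope. This is only a sketch: make_command is a hypothetical helper, not part of the package, and it mirrors nothing beyond the message shapes listed above.

```python
import json


def make_command(action, **parameters):
    """Build one JSON command message in the envelope shown above."""
    command = {"action": action}
    if parameters:
        # "parameters" is omitted entirely for actions that take none.
        command["parameters"] = parameters
    return json.dumps({"command": command})


messages = [
    make_command("list_robots"),
    make_command("move", direction="forward", distance=1.0),
    make_command("rotate", angle=90),
]
for message in messages:
    print(message)
```

Each resulting string can be sent as a single text frame by whichever WebSocket client you use.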

Python API

Basic bridge

import asyncio, os
from agent_ros_bridge import Bridge
from agent_ros_bridge.gateway_v2.transports.websocket import WebSocketTransport

os.environ["JWT_SECRET"] = "your-secret"   # or set in env before importing

async def main():
    bridge = Bridge()
    bridge.transport_manager.register(WebSocketTransport({"port": 8765}))
    async with bridge.run():
        print("Bridge running on ws://localhost:8765")
        while bridge.running:
            await asyncio.sleep(1)

asyncio.run(main())

With LangChain

from agent_ros_bridge import Bridge

bridge = Bridge()

# Get a LangChain-compatible tool
tool = bridge.get_langchain_tool(actions=["navigate", "move_arm", "get_status"])

# Plug into any LangChain agent
from langchain.agents import AgentExecutor, create_react_agent

# llm and prompt are placeholders: supply your own chat model and ReAct prompt
agent = create_react_agent(llm, [tool], prompt)
executor = AgentExecutor(agent=agent, tools=[tool])
result = executor.invoke({"input": "Navigate the robot to zone A"})

With Claude Desktop (MCP)

import asyncio
from agent_ros_bridge import Bridge

async def main():
    bridge = Bridge()
    mcp = bridge.get_mcp_server(mode="stdio")
    await mcp.start()   # Claude Desktop can now control robots

asyncio.run(main())

Add to your claude_desktop_config.json:

{
  "mcpServers": {
    "agent-ros-bridge": {
      "command": "python",
      "args": ["-m", "agent_ros_bridge.integrations.mcp_transport"],
      "env": {
        "JWT_SECRET": "your-secret-here"
      }
    }
  }
}

With AutoGPT

import asyncio
from agent_ros_bridge import Bridge

async def main():
    bridge = Bridge()
    adapter = bridge.get_autogpt_adapter()

    # List available commands
    commands = adapter.get_commands()

    # Execute a command (awaited, so it must run inside a coroutine)
    result = await adapter.execute_command("ros_navigate", target="zone_a")

asyncio.run(main())

With OpenClaw

import asyncio
from agent_ros_bridge import Bridge

async def main():
    bridge = Bridge()
    adapter = bridge.get_openclaw_adapter()

    # Get ClawHub skill path
    skill_path = adapter.get_skill_path()
    print(f"Skill location: {skill_path}")
    # Package for ClawHub: cd {skill_path} && python scripts/package_skill.py

    # Or use extension mode
    tools = adapter.get_tools()
    result = await adapter.execute_tool("ros2_publish", {
        "topic": "/cmd_vel",
        "message": {"linear": {"x": 0.5}}
    })

asyncio.run(main())

Install via ClawHub:

# When published to ClawHub
npx clawhub@latest install agent-ros-bridge

# Or manually package and install
cd skills/agent-ros-bridge
python scripts/package_skill.py
# Upload dist/agent-ros-bridge.skill to ClawHub

Natural language commands (via OpenClaw):

  • "Move forward 1 meter" → publishes to /cmd_vel
  • "Navigate to the kitchen" → sends Nav2 goal
  • "What do you see?" → captures camera frame
  • "Check the battery" → reads /battery_state
  • "Emergency stop" → triggers safety e-stop

Agent memory

import asyncio
from agent_ros_bridge import Bridge

async def main():
    bridge = Bridge(config={"memory_backend": "sqlite", "memory_path": "/tmp/bridge.db"})

    # Memory is automatically attached to Bridge.memory
    await bridge.memory.set("last_position", {"x": 3.0, "y": 1.5}, ttl=3600)
    pos = await bridge.memory.get("last_position")

    # Append to a history log
    await bridge.memory.append("agent:user1:actions", {"action": "navigate", "ts": "..."})

asyncio.run(main())
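
The TTL and append behaviour can be illustrated with a tiny in-memory stand-in. This is not the package's AgentMemory: TTLMemory is a hypothetical, synchronous class, and it assumes that an expired key reads as missing and that append creates the list on first use. The injectable clock exists only so the expiry can be demonstrated without waiting.

```python
import time


class TTLMemory:
    """In-memory key/value store with optional per-key expiry (illustrative only)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._items = {}  # key -> (value, expires_at or None)

    def set(self, key, value, ttl=None):
        expires_at = None if ttl is None else self._clock() + ttl
        self._items[key] = (value, expires_at)

    def get(self, key, default=None):
        value, expires_at = self._items.get(key, (default, None))
        if expires_at is not None and self._clock() >= expires_at:
            del self._items[key]  # lazily evict expired entries on read
            return default
        return value

    def append(self, key, entry):
        """Append to the list stored under key, creating it if missing."""
        history = list(self.get(key, []))
        history.append(entry)
        self.set(key, history)  # in this sketch, history lists never expire


fake_now = [0.0]
memory = TTLMemory(clock=lambda: fake_now[0])
memory.set("last_position", {"x": 3.0, "y": 1.5}, ttl=3600)
fake_now[0] = 3599.0
print(memory.get("last_position"))  # still present just before the TTL elapses
fake_now[0] = 3600.0
print(memory.get("last_position"))  # expired, so the default is returned
```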

Safety confirmation

from agent_ros_bridge.integrations.safety import SafetyManager, SafetyLevel

safety = SafetyManager()
safety.register_policy("move_arm", SafetyLevel.DANGEROUS, "Arm movement requires confirmation")

# Register a callback to receive confirmation requests (e.g. send to operator dashboard)
def on_confirm_request(request):
    print(f"Operator approval needed: {request.id}: {request.message}")

safety.on_confirmation_request(on_confirm_request)

# Approve via request ID (the request.id passed to the callback above);
# call this from inside a coroutine
await safety.confirm(request_id)

# Emergency stop
safety.trigger_emergency_stop("Obstacle detected")
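
The confirm-then-run flow can be sketched with plain asyncio. ConfirmationGate and its methods are hypothetical names written for this illustration and do not describe how SafetyManager is implemented. The sketch assumes a dangerous action waits on a pending request until an operator answers, and that an emergency stop rejects everything.

```python
import asyncio
import itertools


class ConfirmationGate:
    """Holds dangerous actions until an operator approves them (hypothetical sketch)."""

    def __init__(self, dangerous_actions):
        self._dangerous = set(dangerous_actions)
        self._pending = {}  # request_id -> asyncio.Future resolving to True/False
        self._ids = itertools.count(1)
        self.stopped = False

    async def authorize(self, action, on_request):
        """Return True if the action may run; waits for approval when required."""
        if self.stopped:
            return False
        if action not in self._dangerous:
            return True
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        on_request(request_id, action)  # e.g. forward to an operator dashboard
        try:
            return await future
        finally:
            self._pending.pop(request_id, None)

    def confirm(self, request_id, approved=True):
        self._pending[request_id].set_result(approved)

    def emergency_stop(self):
        """Reject every pending request and block all further actions."""
        self.stopped = True
        for future in self._pending.values():
            if not future.done():
                future.set_result(False)


async def demo():
    gate = ConfirmationGate({"move_arm"})
    requests = []
    task = asyncio.create_task(
        gate.authorize("move_arm", lambda request_id, action: requests.append(request_id))
    )
    await asyncio.sleep(0)      # let authorize() run until it waits for approval
    gate.confirm(requests[0])   # the operator approves the pending request
    return await task, await gate.authorize("get_status", print)


print(asyncio.run(demo()))
```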

Fleet orchestration

import asyncio
from agent_ros_bridge.fleet import FleetOrchestrator, FleetRobot, RobotCapability, Task

async def main():
    orchestrator = FleetOrchestrator()
    await orchestrator.start()

    # Register robots
    await orchestrator.add_robot(FleetRobot(
        robot_id="tb4_01", name="TurtleBot4 #1",
        capabilities=RobotCapability(can_navigate=True, battery_hours=4.0)
    ))

    # Submit tasks
    task_id = await orchestrator.submit_task(Task(
        type="navigate", target_location="zone_b", priority=3
    ))

    metrics = orchestrator.get_metrics()
    print(f"Fleet: {metrics.idle_robots} idle, {metrics.active_robots} active")

asyncio.run(main())
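
For intuition about priority-queue task allocation, here is a small standalone sketch. TinyFleet and QueuedTask are hypothetical and unrelated to FleetOrchestrator's internals. The sketch also assumes that a lower priority number means more urgent, which this README does not state.

```python
import heapq
import itertools
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedTask:
    priority: int   # assumption: lower number = more urgent
    sequence: int   # tie-breaker so equal priorities stay first-in, first-out
    task_type: str = field(compare=False)
    target: str = field(compare=False)


class TinyFleet:
    """Assigns queued tasks to idle robots, most urgent first (illustrative only)."""

    def __init__(self):
        self._queue = []                  # heap of QueuedTask
        self._sequence = itertools.count()
        self.idle = []                    # robot ids waiting for work
        self.active = {}                  # robot id -> QueuedTask

    def add_robot(self, robot_id):
        self.idle.append(robot_id)

    def submit(self, task_type, target, priority):
        task = QueuedTask(priority, next(self._sequence), task_type, target)
        heapq.heappush(self._queue, task)

    def allocate(self):
        """Hand out tasks while both a queued task and an idle robot exist."""
        while self._queue and self.idle:
            task = heapq.heappop(self._queue)
            self.active[self.idle.pop(0)] = task
        return self.active


fleet = TinyFleet()
fleet.add_robot("tb4_01")
fleet.submit("navigate", "zone_b", priority=3)
fleet.submit("navigate", "zone_a", priority=1)
fleet.allocate()
print(f"Fleet: {len(fleet.idle)} idle, {len(fleet.active)} active")
```

With one robot and two tasks, the zone_a task is assigned first under the stated priority assumption, and the zone_b task stays queued until a robot becomes idle.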

Docker

Development / CI (no ROS)

# Build
docker build -t agent-ros-bridge .

# Run with JWT secret
docker run --rm -e JWT_SECRET=$JWT_SECRET -p 8765:8765 agent-ros-bridge

With ROS2 Jazzy

docker build -f docker/Dockerfile.ros2 -t agent-ros-bridge:ros2 .
docker run --rm \
  -e JWT_SECRET=$JWT_SECRET \
  -e ROS_DOMAIN_ID=0 \
  -p 8765:8765 \
  agent-ros-bridge:ros2

docker-compose (profile-based)

# ROS2 bridge only
JWT_SECRET=$JWT_SECRET docker-compose --profile ros2 up

# ROS1 bridge + roscore
JWT_SECRET=$JWT_SECRET docker-compose --profile ros1 up

# ROS2 with TurtleBot4 simulator
JWT_SECRET=$JWT_SECRET docker-compose --profile ros2 --profile sim up

Connecting to a Real Robot (ROS2)

ROS2 publish/subscribe implementation is in progress. The following shows the intended workflow:

# 1. Install with ROS2 support
pip install "agent-ros-bridge[ros2]"

# 2. Source your ROS2 environment
source /opt/ros/jazzy/setup.bash   # or humble

# 3. Set config pointing at your ROS domain
cat > config/gateway.yaml << EOF
name: my_robot_bridge
transports:
  websocket:
    port: 8765
    auth:
      enabled: true
      jwt_secret: ${JWT_SECRET}
connectors:
  ros2:
    enabled: true
    options:
      domain_id: 0
EOF

# 4. Start
agent-ros-bridge --config config/gateway.yaml

CLI Reference

# Start with default config (WebSocket on 8765, gRPC on 50051)
agent-ros-bridge

# Start with a config file
agent-ros-bridge --config config/gateway.yaml

# Demo mode — WebSocket + greenhouse plugin, no ROS needed
agent-ros-bridge --demo

# Override ports
agent-ros-bridge --websocket-port 9000 --grpc-port 50052

# Print version
agent-ros-bridge --version

Environment Variables

Variable | Default | Description
JWT_SECRET | (none) | Required. JWT signing secret (min 32 chars). Generate: openssl rand -base64 32
BRIDGE_CONFIG | (none) | Path to YAML/JSON config file
BRIDGE_LOG_LEVEL | INFO | Log level: DEBUG, INFO, WARNING, ERROR
BRIDGE_WEBSOCKET_PORT | 8765 | WebSocket listen port
BRIDGE_GRPC_PORT | 50051 | gRPC listen port
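
As a sketch of how these variables and their defaults fit together, the function below reads them from a mapping and enforces the 32-character minimum on JWT_SECRET. load_settings and the returned key names are hypothetical and are not the package's config loader. Only the variable names, defaults, and the length rule come from the table above.

```python
import os


def load_settings(env=None):
    """Collect bridge settings from environment-style variables (illustrative only)."""
    env = os.environ if env is None else env
    secret = env.get("JWT_SECRET", "")
    if len(secret) < 32:
        raise ValueError("JWT_SECRET must be set and at least 32 characters long")
    return {
        "jwt_secret": secret,
        "config_path": env.get("BRIDGE_CONFIG"),  # None when unset
        "log_level": env.get("BRIDGE_LOG_LEVEL", "INFO").upper(),
        "websocket_port": int(env.get("BRIDGE_WEBSOCKET_PORT", "8765")),
        "grpc_port": int(env.get("BRIDGE_GRPC_PORT", "50051")),
    }


example = load_settings({"JWT_SECRET": "x" * 32, "BRIDGE_WEBSOCKET_PORT": "9000"})
print(example["websocket_port"], example["grpc_port"], example["log_level"])
```

Passing a plain dict instead of os.environ keeps the sketch easy to test without touching the real environment.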

Examples

Example | Description | Run
examples/quickstart/ | Simulated robot, zero ROS needed | docker-compose up
examples/fleet/ | 4-robot fleet with task scheduling | ./run.sh
examples/auth/ | JWT auth, API keys, RBAC | ./run.sh
examples/arm/ | Robot arm control (UR, xArm, Franka) | ./run.sh
examples/actions/ | ROS Actions (nav2, manipulation) | ./run.sh
examples/mqtt_iot/ | MQTT IoT devices + bridge | docker-compose up
examples/metrics/ | Prometheus metrics + Grafana | docker-compose up
examples/v0.5.0_integrations/ | LangChain, AutoGPT, MCP, OpenClaw, Dashboard | python <example>.py

Project Structure

agent_ros_bridge/
│
├── gateway_v2/              # Core bridge
│   ├── core.py              # Bridge class — transport/connector/plugin lifecycle
│   ├── auth.py              # JWT authentication + RBAC
│   ├── config.py            # YAML/JSON/env config loader  (BRIDGE_* env vars)
│   ├── __main__.py          # CLI entry point  (agent-ros-bridge)
│   │
│   ├── transports/
│   │   ├── websocket.py     # WebSocket transport  ✅
│   │   ├── mqtt_transport.py # MQTT transport       ✅
│   │   └── grpc_transport.py # gRPC transport       🔧 in progress
│   │
│   ├── connectors/
│   │   ├── ros2_connector.py # ROS2 (rclpy)         🔧 in progress
│   │   └── ros1_connector.py # ROS1 (rospy)         🔧 in progress
│   │
│   └── plugins/
│       └── greenhouse_plugin.py  # Example plugin    ✅
│
├── integrations/            # AI agent integrations
│   ├── memory.py            # AgentMemory — SQLite/Redis with TTL  ✅
│   ├── safety.py            # SafetyManager — confirmation + e-stop ✅
│   ├── discovery.py         # ToolDiscovery — MCP/OpenAI export    ✅
│   ├── langchain_adapter.py # ROSBridgeTool, ROSAgent               ✅
│   ├── autogpt_adapter.py   # AutoGPT command adapter               ✅
│   ├── openclaw_adapter.py  # OpenClaw/ClawHub integration          ✅
│   ├── mcp_transport.py     # MCP stdio server                      ✅
│   └── dashboard_server.py  # aiohttp web dashboard                 ✅
│
├── fleet/
│   └── orchestrator.py      # FleetOrchestrator — multi-robot tasks ✅
│
├── plugins/
│   └── arm_robot.py         # UR / xArm / Franka arm plugin         🔧
│
├── actions/
│   └── __init__.py          # ROS Action client (ROS2 + simulated)  ✅
│
└── skills/
    └── agent-ros-bridge/    # OpenClaw/ClawHub skill                ✅
        ├── SKILL.md         # Skill definition
        ├── references/      # ROS1/ROS2 guides
        └── scripts/         # Packaging script

Security

  • JWT_SECRET is always required — the bridge refuses to start without it.
  • Authentication is opt-in per transport via config (auth.enabled: true). The simulated quickstart runs without auth for ease of development.
  • RBAC roles: admin (all), operator (navigate/publish/subscribe), viewer (read-only).
  • TLS: pass tls_cert / tls_key in transport config.
  • Generate tokens: python scripts/generate_token.py --user robot_agent --roles operator
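
The role list above can be read as a simple permission lookup. The sketch below is an assumption-laden illustration, not the package's auth module: ROLE_PERMISSIONS and is_allowed are hypothetical, "read" is an invented permission name for the viewer role, and "*" is used here as a wildcard meaning "all".

```python
# Hypothetical mapping that encodes only what the role list states.
ROLE_PERMISSIONS = {
    "admin": {"*"},                                    # all permissions
    "operator": {"navigate", "publish", "subscribe"},
    "viewer": {"read"},                                # read-only
}


def is_allowed(roles, permission):
    """Return True if any of the caller's roles grants the permission."""
    granted = set().union(*(ROLE_PERMISSIONS.get(role, set()) for role in roles))
    return "*" in granted or permission in granted


print(is_allowed(["operator"], "publish"))
print(is_allowed(["viewer"], "publish"))
```

Unknown roles grant nothing in this sketch, so a token carrying an unrecognised role is denied by default.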

Development

# Install dev + test dependencies
pip install -e ".[dev,test]"

# Run all tests
pytest

# Run specific suites
pytest tests/unit/ -v
pytest tests/integrations/ -v        # SafetyManager, Memory (no ROS needed)
pytest tests/integration/ -v         # requires running bridge

# Lint + format
ruff check .
black --check .

# Build package
python -m build

Documentation

Document | Description
docs/ARCHITECTURE_V2.md | Three-layer gateway architecture
docs/API_REFERENCE.md | Full Python API reference
docs/NATIVE_ROS.md | Native Ubuntu/ROS installation
docs/MULTI_ROS.md | Multi-robot / multi-ROS-version setup
docs/DOCKER_VS_NATIVE.md | Deployment trade-offs
docs/DDS_ARCHITECTURE.md | ROS2 DDS / domain isolation
docs/troubleshooting.md | Common issues and solutions
CHANGELOG.md | Release history
CONTRIBUTING.md | How to contribute
REVIEW.md | Static + functional code review

Contributing

See CONTRIBUTING.md. Priority areas for contribution:

  • ROS2 Connector (ros2_connector.py): complete _cmd_publish(), subscribe(), and _ros_msg_to_dict() using rosidl_runtime_py
  • gRPC Transport — extract proto/bridge.proto, generate stubs, register service
  • Safety approval endpoint — add a WebSocket command (safety.confirm) so operators can approve dangerous actions from the agent side
  • Tool auto-discovery — implement _discover_topics/services/actions() in integrations/discovery.py

License

MIT License — © Agent ROS Bridge Contributors
