Skip to main content

Python SDK for Cyberwave

Project description

Cyberwave Python SDK

The official Python SDK for Cyberwave. Create, control, and simulate robotics with ease.

Installation

pip install cyberwave

Quick Start

1. Get Your Token

Get your API token from the Cyberwave platform:

  • Log in to your Cyberwave instance
  • Navigate to Settings → API Tokens
  • Copy your token

2. Create Your First Digital Twin

import cyberwave as cw

# Configure with your token
cw.configure(
    token="your_token_here",
)

# Create a digital twin from an asset
robot = cw.twin("the-robot-studio/so101")

# Control position and rotation
robot.move(x=1.0, y=0.0, z=0.5)
robot.rotate(yaw=90)  # degrees

# Move the robot arm to 30 degrees
robot.joints.set("1", 30)

# Get current joint positions
print(robot.joints.get_all())

Core Features

Working with Workspaces and Projects

from cyberwave import Cyberwave

client = Cyberwave(
    token="your_token_here"
)

# List workspaces
workspaces = client.workspaces.list()
print(f"Found {len(workspaces)} workspaces")

# Create a project
project = client.projects.create(
    name="My Robotics Project",
    workspace_id=workspaces[0].uuid
)

# Create an environment
environment = client.environments.create(
    name="Development",
    project_id=project.uuid
)

Managing Assets and Twins

# Search for assets
assets = client.assets.search("robot")

# Create a twin from an asset
twin_data = client.twins.create(
    asset_id=assets[0].uuid,
    environment_id=environment.uuid,
    name="Robot-01"
)

# Use the high-level Twin API
from cyberwave import Twin
robot = Twin(client, twin_data)

# Move to a specific position
robot.move_to([1.0, 0.5, 0.0])

# Update scale
robot.scale(x=1.5, y=1.5, z=1.5)

# Delete when done
robot.delete()

Real-time Updates with MQTT

# Define callback for position updates
def on_position_change(data):
    print(f"Twin moved to: {data}")

# Subscribe to real-time updates
client.mqtt.subscribe_twin_position("twin_uuid", on_position_change)

# Publish position updates
client.mqtt.publish_twin_position(
    twin_id="twin_uuid",
    x=1.0, y=0.0, z=0.5
)

# Subscribe to joint states
def on_joint_update(data):
    print(f"Joint states: {data}")

client.mqtt.subscribe_joint_states("twin_uuid", on_joint_update)

Command Messages with MQTT

Receive and respond to commands for digital twins:

# Define command handler
def command_handler(data):
    command_type = data.get("command")
    if command_type == "greetings":
        print("Hello World!")
        client.mqtt.publish_command_message(twin_uuid, "ok")
    else:
        client.mqtt.publish_command_message(twin_uuid, "error")

# Subscribe to commands
client.mqtt.subscribe_command_message("twin_uuid", command_handler)

# Publish command response
client.mqtt.publish_command_message("twin_uuid", "ok")  # or "error"

Video Streaming (WebRTC)

Stream camera feeds to your digital twins using WebRTC.

To stream you will need to install FFMPEG if you don't have it.

On Mac with brew:

brew install ffmpeg pkg-config

On Ubuntu:

sudo apt-get install ffmpeg

Then install the additional deps for camera streaming:

# Install with camera support
pip install cyberwave[camera]
import asyncio
from cyberwave import Cyberwave

# Initialize client
client = Cyberwave(token="your_token_here")

# Create camera streamer - integrated into the Cyberwave client!
streamer = client.video_stream(
    twin_uuid="your_twin_uuid",
    camera_id=0,  # Default camera
    fps=10        # Frames per second
)

# Start streaming
async def stream_camera():
    await streamer.start()
    # Stream runs until stopped
    await asyncio.sleep(60)  # Stream for 60 seconds
    await streamer.stop()

# Run the async function
asyncio.run(stream_camera())

The video_stream() method:

  • Automatically uses the client's MQTT connection
  • Pre-configures the streamer with the twin UUID
  • Handles WebRTC peer connection setup
  • Manages ICE candidate gathering with STUN/TURN servers
  • Handles video encoding and streaming

Configuration Options

You can also set your token as environment variable:

export CYBERWAVE_TOKEN="your_token_here"
import cyberwave as cw

# SDK will automatically load from environment variables
robot = cw.twin("the-robot-studio/so101")

Programmatic Configuration

import cyberwave as cw

cw.configure(
    token="your_token_here",              # Bearer token
    environment="env_uuid",                # Default environment
    workspace="workspace_uuid",            # Default workspace
)

Advanced Usage

Context Manager for Cleanup

from cyberwave import Cyberwave

with Cyberwave(token="YOURTOKEN") as client:
    twins = client.twins.list()
    for twin in twins:
        print(twin.name)
# Automatically disconnects MQTT and cleans up resources

Joint Control

You can change a specific joint actuation. You can use degrees or radiants:

robot = cw.twin("the-robot-studio/so101")

# Set individual joints (degrees by default)
robot.joints.set("shoulder_joint", 45, degrees=True)

# Or use radians
import math
robot.joints.set("elbow_joint", math.pi/4, degrees=False)

# Get current joint position
angle = robot.joints.get("shoulder_joint")

# List all joints
joint_names = robot.joints.list()

# Get all joint states at once
all_joints = robot.joints.get_all()

API Reference

Cyberwave Client

  • client.workspaces - Workspace management
  • client.projects - Project management
  • client.environments - Environment management
  • client.assets - Asset catalog operations
  • client.twins - Digital twin CRUD operations
  • client.mqtt - Real-time MQTT client

Twin Class

  • twin.move(x, y, z) - Move twin to position
  • twin.move_to([x, y, z]) - Move to position array
  • twin.rotate(yaw, pitch, roll) - Rotate using euler angles
  • twin.rotate(quaternion=[x,y,z,w]) - Rotate using quaternion
  • twin.scale(x, y, z) - Scale the twin
  • twin.joints - Joint controller for robot manipulation
  • twin.delete() - Delete the twin
  • twin.refresh() - Reload twin data from server

Examples

Check the SDK repository for complete examples:

  • Basic twin control
  • Multi-robot coordination
  • Real-time synchronization
  • Joint manipulation for robot arms

Testing

Unit Tests

Run basic import tests:

poetry install
poetry run python tests/test_imports.py

Support

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cyberwave-0.2.4.tar.gz (95.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

cyberwave-0.2.4-py3-none-any.whl (204.7 kB view details)

Uploaded Python 3

File details

Details for the file cyberwave-0.2.4.tar.gz.

File metadata

  • Download URL: cyberwave-0.2.4.tar.gz
  • Upload date:
  • Size: 95.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.9.25 Linux/6.11.0-1018-azure

File hashes

Hashes for cyberwave-0.2.4.tar.gz
Algorithm Hash digest
SHA256 49e0507d5a54933a6c33e35f30b4e9c7727753b0b64cddcad7aa4ba49c8415d9
MD5 59519e7c2c5ac2315c85e9f6700cc8b4
BLAKE2b-256 ed9af2634854235d05f31af0d147466e727b1f527682a16ff4a5890f23be04d6

See more details on using hashes here.

File details

Details for the file cyberwave-0.2.4-py3-none-any.whl.

File metadata

  • Download URL: cyberwave-0.2.4-py3-none-any.whl
  • Upload date:
  • Size: 204.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/2.2.1 CPython/3.9.25 Linux/6.11.0-1018-azure

File hashes

Hashes for cyberwave-0.2.4-py3-none-any.whl
Algorithm Hash digest
SHA256 14532c0d85b103093bc9cdc15755e56fa23a512d1b64a7e305b362f1d731e642
MD5 7070837305cf374cc7658d67f9f17fa8
BLAKE2b-256 56670de5dcc565e69393695e54e7132482073eaefb2da66cb0e21a284309e6c7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page