Communication library for the PUDA platform.
Puda Comms
A Python module for communication between machines and command services via NATS messaging. Provides client-side services for sending commands, machine-side clients for receiving commands, and data models for structured message exchange.
Overview
The puda_comms module enables asynchronous, reliable communication between command services and machines using NATS, with JetStream providing guaranteed delivery for commands. It handles:
- Command execution: Send commands to machines and receive responses
- Message routing: Queue commands (sequential execution) and immediate commands (control operations)
- State management: Thread-safe execution state tracking for cancellation and locking
- Connection management: Automatic NATS connection handling with async context managers
Components
The module consists of four main components:
1. Models (models.py)
Data models for structured message exchange. All models use Pydantic for validation and serialization.
Enums
CommandResponseStatus
Status of a command response:
- SUCCESS: Command executed successfully
- ERROR: Command execution failed
CommandResponseCode
Error codes for command responses:
- COMMAND_CANCELLED: Command was cancelled before completion
- JSON_DECODE_ERROR: Failed to decode JSON payload
- EXECUTION_ERROR: General execution error
- EXECUTION_LOCKED: Execution is locked (another command is running)
- UNKNOWN_COMMAND: Command name not recognized
- PAUSE_ERROR: Error occurred while pausing execution
- RESUME_ERROR: Error occurred while resuming execution
- NO_EXECUTION: No execution found
- RUN_ID_MISMATCH: Run ID doesn't match current execution
- CANCEL_ERROR: Error occurred while cancelling execution
- MACHINE_PAUSED: Machine is currently paused
MessageType
Type of NATS message:
- COMMAND: Command message sent to machine
- RESPONSE: Response message from machine
- LOG: Log message
- ALERT: Alert message
- MEDIA: Media message
ImmediateCommand
Command names for immediate/control commands:
- PAUSE: Pause the current execution
- RESUME: Resume a paused execution
- CANCEL: Cancel the current execution
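The enums above are exchanged as strings on the wire. The sketch below shows how string-valued enums of this kind serialize and parse. The lowercase values are an assumption inferred from the Complete Message Example in this README ("status": "success") and from the lowercase immediate command names; the library's actual definitions may differ.

```python
import json
from enum import Enum


class CommandResponseStatus(str, Enum):
    # Lowercase values are an assumption based on the JSON example in this README.
    SUCCESS = "success"
    ERROR = "error"


class ImmediateCommand(str, Enum):
    # Values assumed to match the lowercase command names (pause, resume, cancel).
    PAUSE = "pause"
    RESUME = "resume"
    CANCEL = "cancel"


# A str-based enum member is written by json.dumps as its plain string value,
# and the enum constructor maps the string back to the member.
payload = json.dumps({"status": CommandResponseStatus.SUCCESS})
parsed_status = CommandResponseStatus(json.loads(payload)["status"])
```

Constructing an enum from an unknown string raises ValueError, which is one way an unrecognized value can be rejected early.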
Data Models
CommandRequest
Represents a command to be sent to a machine.
Fields:
- name (str): The command name to execute
- machine_id (str): Machine ID to send the command to (required)
- params (Dict[str, Any]): Command parameters (default: empty dict)
- step_number (int): Execution step number for tracking progress
- version (str): Command version (default: "1.0")
Example:
command = CommandRequest(
    name="attach_tip",
    machine_id="first",
    params={"deck_slot": "A3", "well_name": "G8"},
    step_number=2,
    version="1.0"
)
CommandResponse
Represents the result of a command execution.
Fields:
- status (CommandResponseStatus): Status of the command response (SUCCESS or ERROR)
- completed_at (str): ISO 8601 UTC timestamp (auto-generated)
- code (Optional[str]): Error code if status is ERROR
- message (Optional[str]): Human-readable error message
Example:
response = CommandResponse(
    status=CommandResponseStatus.SUCCESS,
    completed_at="2026-01-20T02:00:46Z"
)
Error Example:
error_response = CommandResponse(
    status=CommandResponseStatus.ERROR,
    code="EXECUTION_ERROR",
    message="Failed to attach tip: deck_slot A3 not found",
    completed_at="2026-01-20T02:00:46Z"
)
MessageHeader
Header metadata for NATS messages.
Fields:
- message_type (MessageType): Type of message (COMMAND, RESPONSE, LOG, etc.)
- version (str): Message version (default: "1.0")
- timestamp (str): ISO 8601 UTC timestamp (auto-generated)
- user_id (str): User ID who initiated the command
- username (str): Username who initiated the command
- machine_id (str): Identifier for the target machine
- run_id (Optional[str]): Unique identifier (UUID) for the run/workflow
Example:
header = MessageHeader(
    message_type=MessageType.RESPONSE,
    version="1.0",
    timestamp="2026-01-20T02:00:46Z",
    user_id="user123",
    username="John Doe",
    machine_id="first",
    run_id="092073e6-13d0-4756-8d99-eff1612a5a72"
)
NATSMessage
Complete NATS message structure combining header with optional command or response data.
Fields:
- header (MessageHeader): Message header (required)
- command (Optional[CommandRequest]): Command request (for command messages)
- response (Optional[CommandResponse]): Command response (for response messages)
Structure:
- For command messages: include header with message_type=COMMAND and the command field
- For response messages: include header with message_type=RESPONSE and the response field
Complete Message Example:
{
  "header": {
    "message_type": "response",
    "version": "1.0",
    "timestamp": "2026-01-20T02:00:46Z",
    "user_id": "user123",
    "username": "John Doe",
    "machine_id": "first",
    "run_id": "092073e6-13d0-4756-8d99-eff1612a5a72"
  },
  "command": {
    "name": "attach_tip",
    "params": {
      "deck_slot": "A3",
      "well_name": "G8"
    },
    "step_number": 2,
    "version": "1.0"
  },
  "response": {
    "status": "success",
    "completed_at": "2026-01-20T02:00:46Z",
    "code": null,
    "message": null
  }
}
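To make the message structure concrete, here is a minimal sketch that reads a message shaped like the example above using only the standard library. It works on plain dicts rather than the Pydantic models, and the summarize helper is hypothetical, not part of puda_comms.

```python
import json

# A trimmed copy of the example message above.
raw = """
{
  "header": {"message_type": "response", "machine_id": "first",
             "run_id": "092073e6-13d0-4756-8d99-eff1612a5a72"},
  "command": {"name": "attach_tip", "step_number": 2},
  "response": {"status": "success", "completed_at": "2026-01-20T02:00:46Z",
               "code": null, "message": null}
}
"""


def summarize(message: dict) -> str:
    """Return a one-line summary of a response message dict."""
    header = message["header"]
    response = message.get("response")
    if header["message_type"] != "response" or response is None:
        return "not a response message"
    # The command block is optional, so fall back to a placeholder name.
    command = message.get("command") or {}
    name = command.get("name", "<unknown>")
    if response["status"] == "success":
        return f"{name} succeeded on {header['machine_id']}"
    return f"{name} failed: {response['code']} ({response['message']})"


summary = summarize(json.loads(raw))
```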
2. CommandService (command_service.py)
Client-side service for sending commands to machines via NATS. Handles:
- Connecting to NATS servers
- Sending commands to machines (queue or immediate)
- Waiting for and handling responses
- Managing command lifecycle (run_id, step_number, etc.)
- Automatic connection cleanup via async context manager
See the Sending Commands section for usage examples.
3. EdgeNatsClient (machine_client.py)
Basic default NATS client for generic machines. Handles commands, telemetry, and events following the puda.{machine_id}.{category}.{sub_category} pattern. Provides:
- Subscribing to command streams (queue and immediate) via JetStream with exactly-once delivery
- Processing incoming commands and sending command responses
- Publishing telemetry (core NATS, no JetStream)
- Publishing events (core NATS, fire-and-forget)
- Connection management and reconnection handling
Note: This is a generic client. Machine-specific methods should be implemented in the machine-edge client.
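The subject pattern puda.{machine_id}.{category}.{sub_category} can be illustrated with a small helper. This is a sketch only: build_subject is a hypothetical function, and the category names "cmd" and "queue" are placeholders, since this README does not list the actual category values.

```python
def build_subject(machine_id: str, category: str, sub_category: str) -> str:
    """Build a subject of the form puda.{machine_id}.{category}.{sub_category}."""
    parts = (machine_id, category, sub_category)
    for part in parts:
        # NATS subject tokens are separated by '.', and '*' and '>' are wildcards,
        # so none of these (nor whitespace) may appear inside a single token.
        if not part or any(ch in part for ch in ".*> \t"):
            raise ValueError(f"invalid subject token: {part!r}")
    return ".".join(("puda",) + parts)


# "cmd" and "queue" are hypothetical category names used only for illustration.
subject = build_subject("first", "cmd", "queue")
```

Validating each token before joining keeps a malformed machine ID from silently turning into a different subject hierarchy.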
4. ExecutionState (execution_state.py)
Thread-safe state management for command execution. Provides:
- Execution lock to prevent concurrent commands
- Current task tracking for cancellation
- Run ID matching for cancel operations
- Thread-safe access to execution state
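The responsibilities listed above can be sketched with a lock-guarded object. This is a hypothetical stand-in, not the ExecutionState implementation: the class and method names are invented, a cancellation flag replaces the task tracking the real class provides, and the returned strings simply reuse the CommandResponseCode names documented earlier.

```python
import threading
from typing import Optional


class ExecutionStateSketch:
    """Hypothetical stand-in: one run at a time, guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # protects the fields below
        self._run_id: Optional[str] = None
        self._cancelled = False

    def try_start(self, run_id: str) -> Optional[str]:
        """Claim execution; return an error code name if it is already claimed."""
        with self._lock:
            if self._run_id is not None:
                return "EXECUTION_LOCKED"
            self._run_id, self._cancelled = run_id, False
            return None

    def cancel(self, run_id: str) -> Optional[str]:
        """Mark the current run cancelled if run_id matches it."""
        with self._lock:
            if self._run_id is None:
                return "NO_EXECUTION"
            if self._run_id != run_id:
                return "RUN_ID_MISMATCH"
            self._cancelled = True
            return None

    def is_cancelled(self) -> bool:
        with self._lock:
            return self._cancelled

    def finish(self) -> None:
        """Release the execution claim so another run can start."""
        with self._lock:
            self._run_id = None
```

Checking the run ID under the same lock that guards the claim means a cancel request for a stale run cannot interrupt a newer one.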
Sending Commands
The CommandService provides a high-level interface for sending commands to machines via NATS. See tests/commands.py and tests/batch_commands.py for complete examples.
Recommended Usage: Async Context Manager
The recommended way to use CommandService is with an async context manager, which automatically handles connection and disconnection.
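The pattern looks roughly like the sketch below. StubCommandService is a hypothetical stand-in so the example runs without a NATS server; with the real CommandService, the async with block would open and close the NATS connection instead of toggling a flag. The server URL is a placeholder.

```python
import asyncio


class StubCommandService:
    """Stand-in for CommandService so the pattern runs without a NATS server."""

    def __init__(self, servers: list[str]) -> None:
        self.servers = servers
        self.connected = False

    async def __aenter__(self) -> "StubCommandService":
        self.connected = True  # the real service would connect to NATS here
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.connected = False  # runs even if the body raises


async def main() -> tuple[bool, bool]:
    stub = StubCommandService(servers=["nats://localhost:4222"])
    async with stub as service:
        inside = service.connected  # commands would be sent here
    return inside, stub.connected


inside, after = asyncio.run(main())
```

Because cleanup lives in __aexit__, the connection is released even when a command raises inside the block.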
Command Types
Queue Commands
Queue commands are regular commands that are executed in sequence. Use send_queue_command() for machine-specific operations.
Note: Available commands depend on the machine you are controlling. Different machines support different command sets (for example, the first machine supports commands such as load_deck, attach_tip, aspirate_from, dispense_to, and drop_tip).
send_queue_command(), send_queue_commands(), and send_immediate_command() all accept an optional timeout parameter (default: 120 seconds):
# Single command (machine_id must be in CommandRequest)
reply = await service.send_queue_command(
    request=request,  # request.machine_id must be set
    run_id=run_id,
    user_id="user123",
    username="John Doe",
    timeout=60  # Wait up to 60 seconds
)

# Multiple commands (timeout applies to each command)
# Each command in the list must have machine_id set
reply = await service.send_queue_commands(
    requests=commands,  # Each CommandRequest must have machine_id
    run_id=run_id,
    user_id="user123",
    username="John Doe",
    timeout=60  # Wait up to 60 seconds per command
)
Examples:
See tests/commands.py for complete examples.
Immediate Commands
Immediate commands are control commands that interrupt or modify execution. Use send_immediate_command() for:
- pause: Pause the current execution
- resume: Resume a paused execution
- cancel: Cancel the current execution
Examples:
See tests/commands.py for complete examples.
Sending Command Sequences
You can send multiple commands in sequence using send_queue_commands(), which sends commands one by one and waits for each response before sending the next. If any command fails or times out, it stops immediately and returns the error response.
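The stop-on-first-failure behaviour described above can be sketched as follows. This is not the library's implementation: send_in_sequence and fake_send are hypothetical, replies are modelled as plain dicts with None standing for a timeout, and returning the last reply when every command succeeds is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# A reply is modelled as a plain dict here; None stands for a timeout.
Reply = Optional[dict]


async def send_in_sequence(
    commands: list[dict],
    send_one: Callable[[dict], Awaitable[Reply]],
) -> Reply:
    """Send commands one by one; stop at the first timeout or error reply."""
    last: Reply = None
    for command in commands:
        last = await send_one(command)
        if last is None or last.get("status") != "success":
            return last  # stop immediately; later commands are not sent
    return last


sent: list[str] = []


async def fake_send(command: dict) -> Reply:
    # Hypothetical transport: "bad" fails, everything else succeeds.
    sent.append(command["name"])
    status = "error" if command["name"] == "bad" else "success"
    return {"status": status, "name": command["name"]}


result = asyncio.run(
    send_in_sequence(
        [{"name": "load_deck"}, {"name": "bad"}, {"name": "drop_tip"}],
        fake_send,
    )
)
```

In this run the third command is never sent, because the second reply carries an error status.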
Loading Commands from JSON (Recommended for LLM-generated commands):
When generating commands from an LLM or loading from external sources, you can store commands in a JSON file and load them. See tests/batch_commands.py for a complete example.
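As a rough illustration of that workflow, the sketch below loads a JSON array of commands and checks each entry against the documented CommandRequest fields. CommandSpec is a dataclass stand-in rather than the Pydantic model, load_commands is a hypothetical helper, and the file layout is an assumption; the layout used by tests/batch_commands.py may differ.

```python
import json
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class CommandSpec:
    """Dataclass stand-in mirroring the documented CommandRequest fields."""
    name: str
    machine_id: str
    step_number: int  # treated as required here; the real default is not documented
    params: dict[str, Any] = field(default_factory=dict)
    version: str = "1.0"


def load_commands(path: Path) -> list[CommandSpec]:
    """Read a JSON array of command objects; fail on missing or unknown keys."""
    entries = json.loads(path.read_text(encoding="utf-8"))
    if not isinstance(entries, list):
        raise ValueError("expected a JSON array of commands")
    return [CommandSpec(**entry) for entry in entries]  # TypeError on bad keys


# Hypothetical file content written to a temporary directory for the demo.
with tempfile.TemporaryDirectory() as tmp:
    commands_file = Path(tmp) / "commands.json"
    commands_file.write_text(json.dumps([
        {"name": "attach_tip", "machine_id": "first", "step_number": 1,
         "params": {"deck_slot": "A3", "well_name": "G8"}},
        {"name": "drop_tip", "machine_id": "first", "step_number": 2},
    ]), encoding="utf-8")
    commands = load_commands(commands_file)
```

Rejecting unknown or missing keys at load time is useful for LLM-generated input, where a misspelled field name would otherwise go unnoticed until the machine rejects the command.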
Error Handling
Always check the response status and handle errors appropriately:
reply: NATSMessage = await service.send_queue_command(
    request=request,  # request.machine_id must be set
    run_id=run_id,
    user_id="user123",
    username="John Doe"
)

if reply is None:
    # Command timed out or failed to send
    logger.error("Command failed or timed out")
elif reply.response is not None and reply.response.status == CommandResponseStatus.SUCCESS:
    # Command succeeded
    logger.info("Command completed successfully")
else:
    # Command failed with error
    logger.error("Command failed with code: %s, message: %s",
                 reply.response.code if reply.response else None,
                 reply.response.message if reply.response else None)
Configuration
NATS Server Configuration
The CommandService requires NATS server URLs to be specified explicitly. There are no default values. You must provide servers in one of two ways:
Option 1: Via environment variable (comma-separated string)
Set the NATS_SERVERS environment variable with comma-separated server URLs:
export NATS_SERVERS="nats://192.168.50.201:4222,nats://192.168.50.201:4223,nats://192.168.50.201:4224"
Then parse it when creating a CommandService:
import os
nats_servers = [s.strip() for s in os.getenv("NATS_SERVERS", "").split(",") if s.strip()]
service = CommandService(servers=nats_servers)
Option 2: Directly as a list
Specify servers directly when creating a CommandService:
service = CommandService(servers=["nats://192.168.50.201:4222", "nats://192.168.50.201:4223", "nats://192.168.50.201:4224"])
Validation
All models use Pydantic for validation, ensuring:
- Type checking for all fields
- Required fields are present
- Default values are applied correctly
- JSON serialization/deserialization works correctly