Async helper for Atlas Command's asset websocket gateway.
Atlas Asset WebSocket Client
Async helper that speaks the asset WebSocket contract (/ws/assets). The package manages the handshake, heartbeat cadence, telemetry helpers, command acknowledgements, and schema validation so asset OS projects stay in sync with Atlas Command without reimplementing the protocol.
Requirements
- Python 3.9–3.12
- websockets>=10
- pydantic>=1.10
Install the package from PyPI:
pip install atlas-asset-websocket-client
During development inside this repository, install the editable package:
pip install -e Atlas_Command/connection_packages/atlas_asset_ws_client
Quickstart
import asyncio

from atlas_asset_ws_client import AssetWebSocketClient

URL = "ws://localhost:8000/ws/assets"
ASSET_ID = "asset-123"


async def connect(url: str):
    # Imported lazily so the dependency is only needed when a connection is opened.
    import websockets

    return await websockets.connect(url, ping_interval=20)


async def main() -> None:
    client = AssetWebSocketClient(URL, asset_id=ASSET_ID, connect=connect)
    async with client:
        # The handshake must precede every other frame.
        ack = await client.handshake(
            {
                "firmware_version": "1.6.2",
                "capabilities": {"telemetry": True, "commands": True},
            }
        )
        print("Handshake ACK:", ack.payload)
        await client.send_heartbeat()
        await client.send_telemetry_update(
            {"latitude": 40.7128, "longitude": -74.0060, "altitude_m": 120}
        )


asyncio.run(main())
API Overview
- Connection: connect(), close(), async with client: ...
- Protocol helpers: handshake(), send_heartbeat(), send_telemetry_update(), request_command_queue(), complete_command(), send_settings_update(), create_track(), update_track(), create_geofeature(), update_geofeature(), delete_geofeature()
- Listener helpers: atlas_asset_ws_client.listener.listen() and atlas_asset_ws_client.listener.process_command_queue() keep your asset reacting to real-time command feeds without re-implementing the loop.
- Schema helpers: atlas_asset_ws_client.schemas.AssetEnvelope and atlas_asset_ws_client.schemas.AssetMeta expose the same metadata contract used by Atlas Command.
- Low-level helpers: send_json(), receive_json(), receive_envelope()
All helpers populate meta.asset_id, meta.stream, timestamps, and correlation_id defaults when omitted.
Custom Transport or Headers
Provide a custom connect(url: str) coroutine if you need TLS, headers, proxies, or token handling. The coroutine must return an object that implements send, recv, and close coroutines.
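As a rough illustration of that contract, the sketch below defines an in-memory transport with the three required coroutines and a matching connect(url) coroutine. LoopbackTransport and loopback_connect are hypothetical names invented for this example; they are not part of the package. A real implementation would wrap whatever TLS, header, proxy, or token logic you need.

```python
import asyncio


class LoopbackTransport:
    """In-memory stand-in for a WebSocket connection (hypothetical helper).

    It exposes the three coroutines the client expects: send, recv, and close.
    """

    def __init__(self) -> None:
        self._frames: asyncio.Queue = asyncio.Queue()
        self.closed = False

    async def send(self, message: str) -> None:
        if self.closed:
            raise ConnectionError("transport is closed")
        # Echo the frame back so that a later recv() returns it.
        await self._frames.put(message)

    async def recv(self) -> str:
        if self.closed:
            raise ConnectionError("transport is closed")
        return await self._frames.get()

    async def close(self) -> None:
        self.closed = True


async def loopback_connect(url: str) -> LoopbackTransport:
    """Custom connect(url) coroutine; this fake ignores the URL."""
    return LoopbackTransport()
```

A coroutine of this shape would be passed the same way the Quickstart passes its own, via connect=loopback_connect. Because this fake only echoes frames, it will not produce a genuine handshake acknowledgement; it is meant to show the required interface, not to replace the gateway.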
Operation Notes
- Always call handshake() before sending other frames.
- Honor the heartbeat cadence advertised in the handshake acknowledgement.
- After reconnecting, redo the handshake and call request_command_queue() to restore queue state.
- Supply deterministic correlation_id values to trace requests across services.
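One possible way to get deterministic correlation IDs is to derive them from values the asset already knows, for example with a name-based UUID. The helper below is a minimal sketch: make_correlation_id, CORRELATION_NAMESPACE, and the asset_id:stream:sequence naming scheme are assumptions made for illustration, and how the resulting value is handed to each client helper is not shown here.

```python
import uuid

# Hypothetical namespace: any fixed UUID works, as long as every service
# that needs to recompute the IDs agrees on it.
CORRELATION_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "atlas-asset.example")


def make_correlation_id(asset_id: str, stream: str, sequence: int) -> str:
    """Return the same ID every time for the same (asset, stream, sequence)."""
    name = f"{asset_id}:{stream}:{sequence}"
    return str(uuid.uuid5(CORRELATION_NAMESPACE, name))
```

Because uuid5 is a pure function of its inputs, another service can recompute the ID from the same three values and match log lines without any shared state.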
Schema Exports
The package re-exports the schema models used internally so you can import the envelope definitions without pulling in Atlas Command:
- atlas_asset_ws_client.schemas.AssetEnvelope
- atlas_asset_ws_client.schemas.AssetMeta
- atlas_asset_ws_client.schemas.TelemetryPayload
- atlas_asset_ws_client.schemas.CommandCompletionPayload
- atlas_asset_ws_client.schemas.TrackPayload
- atlas_asset_ws_client.schemas.GeofeaturePayload
- atlas_asset_ws_client.schemas.SettingsPayload
Use the typed helpers to pass BaseModel instances into the stream helpers instead of raw dicts, and avoid duplicating the payload definitions that Atlas Command itself consumes.
Advanced helpers
- listen() yields every frame pushed by the asset gateway; you can async for envelope in listen(client) and dispatch only the commands stream frames, while receive_envelope() still enforces the expected type/stream/asset identifiers.
- process_command_queue(client, handler) automatically requests the latest queue, waits for additional command_queue:* envelopes, and reruns handler until you signal an asyncio.Event to stop.
import asyncio

from atlas_asset_ws_client import AssetWebSocketClient
from atlas_asset_ws_client.listener import listen
from atlas_asset_ws_client.schemas import TrackPayload

client = AssetWebSocketClient("ws://localhost:8000/ws/assets", asset_id="asset-123")


async def run() -> None:
    async with client:
        await client.handshake(payload={"model_id": "DRONE-X"})
        # Typed payloads can be passed instead of raw dicts.
        await client.create_track(payload=TrackPayload(name="patrol"))
        # React only to command queue frames; ignore everything else.
        async for envelope in listen(client):
            if envelope.type == "command_queue:data":
                print("queue", envelope.payload)


asyncio.run(run())
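The dispatch-until-stopped pattern described for process_command_queue() can be pictured with a small stand-alone loop. This is only a sketch of the idea, not the package's implementation: FakeEnvelope and dispatch_command_frames are hypothetical names, and the frame source is any async iterator rather than a live client.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Awaitable, Callable, Dict


@dataclass
class FakeEnvelope:
    """Stand-in carrying only the two attributes this sketch reads."""

    type: str
    payload: Dict[str, Any] = field(default_factory=dict)


async def dispatch_command_frames(
    frames: AsyncIterator[FakeEnvelope],
    handler: Callable[[FakeEnvelope], Awaitable[None]],
    stop: asyncio.Event,
) -> int:
    """Run handler on each command_queue:* frame until stop is set.

    The stop event is checked only when a new frame arrives, so a quiet
    stream delays shutdown until the next frame. Returns the handled count.
    """
    handled = 0
    async for envelope in frames:
        if stop.is_set():
            break
        if envelope.type.startswith("command_queue:"):
            await handler(envelope)
            handled += 1
    return handled
```

Frames on other streams are skipped rather than treated as errors, which mirrors the filtering in the example above.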
See Atlas_Command/docs/WEBSOCKET.md and Atlas_Command/docs/CLIENT_INTEGRATION.md for protocol references and integration tips.
Testing
- Run the deterministic suite:
  pip install -r requirements-dev.txt
  pip install -e Atlas_Command/connection_packages/atlas_asset_ws_client[dev]
  pytest Atlas_Command/connection_packages/atlas_asset_ws_client/tests
- The package exposes a live integration test that connects to ws://localhost:8000/ws/assets (override with ATLAS_WS_URL). Supply ATLAS_WS_TOKEN and ATLAS_WS_ASSET to control authentication and asset identity. The test will skip quietly if the endpoint is unreachable:
  pytest Atlas_Command/connection_packages/atlas_asset_ws_client/tests/test_client.py -m live
File details
Details for the file atlas_asset_websocket_client-0.1.4.tar.gz.
File metadata
- Download URL: atlas_asset_websocket_client-0.1.4.tar.gz
- Upload date:
- Size: 13.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | f418aa64e3aa3baa5f6a25c91d62e765df3eb3cc40db52207f0f75789a4619af |
| MD5 | b3d2efb1ebd6bf3bb5de12da6f3feb0b |
| BLAKE2b-256 | 6faeac65bf0265698a99e7134a1de80e156999b4571db1ab3410c598f5ad2ee8 |
File details
Details for the file atlas_asset_websocket_client-0.1.4-py3-none-any.whl.
File metadata
- Download URL: atlas_asset_websocket_client-0.1.4-py3-none-any.whl
- Upload date:
- Size: 15.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.11.14
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | bb0bb07027f4d49b162c2dc14fe92a8547eb28f53b1f9ccc4fc3bc981996014e |
| MD5 | 207ecdd349e098d4d586a59d7e154e18 |
| BLAKE2b-256 | cae4ed671c9f253b03eb7301147d69710901917a046a231c494b5aa179b6b498 |