Thin Python SDK for Plexus — send telemetry in one line
plexus-python
Thin Python SDK for Plexus. Send telemetry to the Plexus gateway in one line. Storage, dashboards, alerts, and fleet management live in the platform — this package just ships your data.
Quick Start
```bash
pip install plexus-python
```

```python
from plexus import Plexus

px = Plexus(api_key="plx_xxx", source_id="device-001")
px.send("temperature", 72.5)
```
Get an API key at app.plexus.company → Devices → Add Device.
Device identity
Every device needs a unique source_id. The recommended way to set one on a real host is the bootstrap script, which requires a device name up front:
```bash
curl -sL https://app.plexus.company/setup | bash -s -- \
  --key plx_xxx --name drone-01
```
The name must match ^[a-z0-9][a-z0-9_-]{1,62}$. setup.sh refuses to run unless you pass --name or a TTY is available to prompt for one. This is deliberate: the previous hostname fallback silently merged telemetry from cloned SD-card images that all booted as raspberrypi.
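To check a candidate name before running the bootstrap script, the pattern can be tested locally. This is a minimal sketch; is_valid_device_name is a hypothetical helper and not part of the SDK.

```python
import re

# Pattern quoted from the naming rule in the text.
DEVICE_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{1,62}$")


def is_valid_device_name(name: str) -> bool:
    """Return True if `name` satisfies the documented pattern (hypothetical helper)."""
    # fullmatch rejects a trailing newline, which `$` alone would tolerate.
    return DEVICE_NAME_RE.fullmatch(name) is not None


for candidate in ["drone-01", "rig_7", "Drone-01", "a", "-rig"]:
    print(f"{candidate!r}: {is_valid_device_name(candidate)}")
```

The pattern implies names are 2 to 63 characters long, lowercase, and cannot start with a hyphen or underscore.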
If two devices end up requesting the same name, the gateway auto-suffixes: the first connection gets drone-01, the second gets drone-01_2, the third drone-01_3, and so on. The SDK logs the rename at INFO and persists the assigned name to ~/.plexus/config.json so the device keeps its identity across reboots. Under the hood, a per-installation UUID (install_id, lazily generated on first run) is what lets the gateway tell "same device reconnecting" from "different device claiming the same name."
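The suffixing rule can be pictured with a small in-memory model. This is only an illustration of the behavior described above, not the gateway's code; NameRegistry and its assign method are hypothetical names.

```python
import uuid


class NameRegistry:
    """Toy model of name assignment keyed by install_id (hypothetical)."""

    def __init__(self):
        self._assigned = {}  # (install_id, requested name) -> assigned name
        self._taken = set()  # every name already handed out

    def assign(self, install_id: str, requested: str) -> str:
        key = (install_id, requested)
        if key in self._assigned:
            # Same installation reconnecting: keep its existing identity.
            return self._assigned[key]
        name, suffix = requested, 2
        while name in self._taken:
            # A different installation wants a taken name: try _2, _3, ...
            name = f"{requested}_{suffix}"
            suffix += 1
        self._taken.add(name)
        self._assigned[key] = name
        return name


registry = NameRegistry()
first, second, third = (str(uuid.uuid4()) for _ in range(3))
print(registry.assign(first, "drone-01"))   # first claimant keeps the name
print(registry.assign(second, "drone-01"))  # second claimant gets a suffix
print(registry.assign(third, "drone-01"))
print(registry.assign(first, "drone-01"))   # reconnect: unchanged
```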
In normal code, you usually just pass source_id=... explicitly to Plexus(...) and never have to think about it.
Core methods
send(metric, value) — stream a reading
The main method. Call it every time you have a new sensor reading.
```python
px = Plexus(source_id="rig-01")  # reads PLEXUS_API_KEY from env
px.send("engine.rpm", 3450)
px.send("coolant.temp", 82.3)
```
metric is a dot-namespaced string ("motor.rpm", "gps.fix_quality"). value accepts any JSON-serializable type:
| Type | Example | When to use |
|---|---|---|
| float / int | 72.5, 3450 | Sensor readings, counters |
| str | "RUNNING", "E_STALL" | State machines, error codes |
| bool | True | Binary flags |
| dict | {"x": 1.5, "y": 2.3} | Vectors, structured readings |
| list | [0.5, 1.2, -0.3] | Waveforms, joint angles |
Optional arguments:
- tags={"motor_id": "A1"}: key-value labels for filtering in the dashboard
- timestamp=t: explicit Unix timestamp in seconds; omit to let the SDK pick (see Timestamps)
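Because timestamp expects Unix seconds, a datetime has to be converted first. The sketch below assumes a hypothetical helper, to_unix_seconds, and assumes that naive datetimes are meant as UTC; adjust that if your timestamps are in local time.

```python
from datetime import datetime, timezone


def to_unix_seconds(dt: datetime) -> float:
    """Convert a datetime to Unix seconds (hypothetical helper)."""
    if dt.tzinfo is None:
        # Assumption: a naive datetime is interpreted as UTC, not local time.
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.timestamp()


t = to_unix_seconds(datetime(2024, 1, 1, tzinfo=timezone.utc))
# Keyword arguments as they would be passed to px.send(...).
send_kwargs = {"tags": {"motor_id": "A1"}, "timestamp": t}
print(send_kwargs)
```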
send_batch(points) — send multiple readings at once
Use this when you sample several sensors together and want them to share a timestamp and land in one network call.
```python
px.send_batch([
    ("temperature", 22.4),
    ("humidity", 58.1),
    ("pressure", 1013.2),
])
```
points is a list of (metric, value) tuples. All points share the same timestamp (now, unless you pass timestamp=t). For independent timestamps per point, call send() in a loop instead.
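If your readings arrive as a nested dict, one way to build the points list is to flatten it into dot-namespaced metrics. This is a sketch; flatten is a hypothetical helper, and since dict values are themselves valid for send(), flattening is a design choice rather than a requirement.

```python
def flatten(reading: dict, prefix: str = "") -> list:
    """Turn a nested dict into (metric, value) tuples with dotted names."""
    points = []
    for key, value in reading.items():
        metric = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            # Recurse so {"motor": {"rpm": 1}} becomes "motor.rpm".
            points.extend(flatten(value, metric))
        else:
            points.append((metric, value))
    return points


sample = {"motor": {"rpm": 3450, "temp": 82.3}, "gps": {"fix_quality": 1}}
points = flatten(sample)
print(points)
# The result has the shape send_batch() expects: px.send_batch(points)
```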
event(name, data) — record a discrete occurrence
Use event() for things that happen rather than things you measure continuously. Faults, state transitions, operator actions, log entries — anything you'd put on a timeline as a marker rather than plot as a graph.
```python
px.event("fault", "E-stop triggered")
px.event("state_change", {"from": "IDLE", "to": "RUNNING"})
px.event("sensor_error", {"sensor": "imu", "code": 42}, tags={"motor": "A"})
```
The platform displays events as markers overlaid on your telemetry charts, not as time-series lines.
run(run_id) — group data into a named recording
```python
with px.run("thermal-cycle-001"):
    while running:
        px.send("temperature", read_temp())
```
All send() calls inside the context are tagged with run_id, making it easy to isolate and replay that slice of data in the dashboard.
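To make the tagging behavior concrete, here is a toy client that records what it would send. It is not the SDK's implementation; RecordingClient is a hypothetical stand-in, and storing run_id as a tag is an assumption made for illustration.

```python
from contextlib import contextmanager


class RecordingClient:
    """Hypothetical stand-in that records sends instead of shipping them."""

    def __init__(self):
        self.sent = []
        self._run_id = None

    def send(self, metric, value, tags=None):
        tags = dict(tags or {})
        if self._run_id is not None:
            tags["run_id"] = self._run_id  # attach the active run, if any
        self.sent.append((metric, value, tags))

    @contextmanager
    def run(self, run_id):
        previous, self._run_id = self._run_id, run_id
        try:
            yield self
        finally:
            self._run_id = previous  # restore on exit, even after an error


client = RecordingClient()
client.send("temperature", 20.0)
with client.run("thermal-cycle-001"):
    client.send("temperature", 21.5)
client.send("temperature", 22.0)
print(client.sent)
```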
Video streaming
Two methods depending on whether you control the capture loop or just have a URL.
send_video_frame(frame, camera_id) — send frames you capture yourself
Use this when your code owns the capture loop — a picamera2 callback, an OpenCV VideoCapture loop, or an FFmpeg pipe you manage. Pass each frame and the SDK ships it to Plexus over WebSocket.
```python
import cv2

cap = cv2.VideoCapture(0)  # open the default camera
while True:
    ok, frame = cap.read()  # ok is False when no frame was read
    if ok:
        px.send_video_frame(frame, camera_id="front")
```
Accepted frame types:
- numpy ndarray (H × W × C): from OpenCV or picamera2; requires opencv-python
- JPEG bytes: passed through as-is, zero re-encode overhead
- Other image bytes (PNG, BMP, WebP): decoded and re-encoded as JPEG via Pillow; requires pip install plexus-python[video]
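If you want to know in advance which path a bytes frame would take, the container format can be read from its leading magic bytes. This is a sketch with a hypothetical helper, sniff_image_format; how the SDK itself detects formats is not documented here.

```python
def sniff_image_format(data: bytes) -> str:
    """Guess an image format from its magic bytes (hypothetical helper)."""
    if data.startswith(b"\xff\xd8\xff"):
        return "jpeg"  # JPEG start-of-image marker
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "png"   # 8-byte PNG signature
    if data.startswith(b"BM"):
        return "bmp"
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "webp"  # RIFF container with a WEBP form type
    return "unknown"


print(sniff_image_format(b"\xff\xd8\xff\xe0" + b"\x00" * 16))
print(sniff_image_format(b"\x89PNG\r\n\x1a\n" + b"\x00" * 16))
```

Frames reported as "jpeg" would be the pass-through case; the other formats would need the video extra.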
camera_id identifies which camera the frame came from. Use distinct IDs when streaming from multiple cameras simultaneously ("front", "rear", "cam:0").
stream_camera(url, camera_id) — stream from an RTSP URL or file
Use this when you have an RTSP stream or video file and don't want to manage the capture loop yourself. The SDK runs FFmpeg internally and handles the rest. Requires FFmpeg on $PATH.
```python
stop = px.stream_camera("rtsp://192.168.1.100/stream", camera_id="front")
# ... do other work ...
stop.set()  # stop streaming
```
Returns a threading.Event — call .set() to stop. Runs in a background thread so it doesn't block your main loop.
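The stop-event pattern works the same way with any background loop. The sketch below uses only the standard library; start_worker is a hypothetical helper, and unlike stream_camera() it also returns the thread so the example can wait for it to finish.

```python
import threading


def start_worker(work, interval_s=0.01):
    """Run `work` repeatedly in a background thread until the event is set."""
    stop = threading.Event()

    def loop():
        while True:
            work()
            # wait() returns True as soon as stop.set() is called.
            if stop.wait(interval_s):
                break

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return stop, thread


ticks = []
stop, thread = start_worker(lambda: ticks.append(1))
# ... do other work ...
stop.set()              # ask the loop to exit
thread.join(timeout=1)  # wait for it to finish
print(thread.is_alive(), len(ticks) >= 1)
```

Using Event.wait() as the sleep means the loop reacts to stop.set() immediately instead of finishing a full sleep interval first.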
Which to use: if you're piping from rpicam-vid, picamera2, or your own capture process, use send_video_frame(). If you have an RTSP URL or file path, use stream_camera().
Bring Your Own Protocol
This package ships no adapters, auto-detection, or daemons — just the client. Use whatever library you'd use anyway and pipe values into px.send().
```python
# MAVLink (pymavlink)
for msg in conn:
    if msg.get_type() == "ATTITUDE":
        px.send("attitude.roll", msg.roll)

# CAN (python-can)
for msg in bus:
    px.send(f"can.0x{msg.arbitration_id:x}", int.from_bytes(msg.data, "big"))

# MQTT (paho-mqtt)
def on_message(_c, _u, msg):
    px.send(msg.topic.replace("/", "."), float(msg.payload))

# I2C sensor (Adafruit CircuitPython)
px.send("temperature", bme.temperature)
```
See examples/ for runnable versions of each.
Reliability
Every send buffers locally before hitting the network, retries with exponential backoff, and keeps your data safe across outages. Enable SQLite persistence to survive restarts and power loss:
```python
px = Plexus(persistent_buffer=True)
```
Check how many points are waiting in the buffer, or flush it on demand:

```python
px.buffer_size()
px.flush_buffer()
```
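The buffer-then-retry idea can be sketched in a few lines. This is not the SDK's implementation: BufferedSender, backoff_delays, and the delay values are hypothetical, and the buffer here is in memory only (no SQLite persistence).

```python
import time
from collections import deque


def backoff_delays(base_s=0.5, factor=2.0, cap_s=30.0, attempts=5):
    """Yield exponentially growing retry delays, capped at cap_s."""
    delay = base_s
    for _ in range(attempts):
        yield min(delay, cap_s)
        delay *= factor


class BufferedSender:
    """Hypothetical stand-in for a client that buffers before sending."""

    def __init__(self, transport, sleep=time.sleep):
        self._transport = transport  # callable that may raise ConnectionError
        self._sleep = sleep          # injectable so the demo need not wait
        self._buffer = deque()

    def send(self, point):
        self._buffer.append(point)   # always buffer first

    def buffer_size(self):
        return len(self._buffer)

    def flush(self):
        """Try to drain the buffer; return True if it ends up empty."""
        for delay in backoff_delays():
            try:
                while self._buffer:
                    self._transport(self._buffer[0])
                    self._buffer.popleft()  # drop only after a successful send
                return True
            except ConnectionError:
                self._sleep(delay)
        return False


# Demo: a transport that fails twice, then recovers.
delivered, waits, failures = [], [], [2]


def flaky_transport(point):
    if failures[0] > 0:
        failures[0] -= 1
        raise ConnectionError("gateway unreachable")
    delivered.append(point)


sender = BufferedSender(flaky_transport, sleep=waits.append)
sender.send(("temperature", 72.5))
sender.send(("humidity", 58.1))
print(sender.flush(), sender.buffer_size(), waits)
```

A point leaves the buffer only after the transport call returns, so a failure mid-flush leaves the unsent points in place for the next attempt.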
Timestamps and clock correction
By default — px.send("temp", 72.5) with no timestamp argument — the SDK picks the time itself. Over WebSocket, it synchronizes with the gateway clock on every connection, so data lands at the right place on the timeline even if the device's system clock is wrong (no NTP on first boot, stale RTC, fresh OS image).
```python
px.send("temperature", 72.5)               # SDK picks time; gateway-synced over WS
px.send("temperature", 72.5, timestamp=t)  # your timestamp, used as-is, no correction
```
Pass an explicit timestamp when you have a reliable external time source (GPS, trusted RTC, host NTP) or are replaying historical data with known timestamps.
Omit timestamp when the device may have booted without NTP — which is the default on Raspberry Pi, Jetson, and most embedded Linux boards without a network connection at first boot.
Known limits:
- Clock sync refreshes on WebSocket (re)connect. A device with a drifting RTC that stays connected for many days accumulates uncorrected drift between reconnects.
- HTTP-only transport (transport="http") does not receive clock sync; timestamps default to the uncorrected device clock.
- send_batch() shares one timestamp across the whole batch. For per-point timestamps, call send() in a loop.
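One common way to correct a wrong device clock is to store the offset to a trusted clock at connect time and add it to every local reading. The sketch below illustrates that idea only; ClockSync is a hypothetical class, the SDK's actual sync protocol is not described here, and network latency is ignored.

```python
import time


class ClockSync:
    """Offset-based clock correction (illustrative, not the SDK's code)."""

    def __init__(self, local_clock=time.time):
        self._local_clock = local_clock
        self._offset_s = 0.0  # no correction until the first sync

    def on_connect(self, gateway_time_s: float) -> None:
        # Recomputed on every (re)connect, so drift between connects
        # stays uncorrected, matching the first limit listed above.
        self._offset_s = gateway_time_s - self._local_clock()

    def now(self) -> float:
        return self._local_clock() + self._offset_s


# Demo with a fake device clock that believes it is shortly after 1970.
fake_local = [1000.0]
sync = ClockSync(local_clock=lambda: fake_local[0])
sync.on_connect(gateway_time_s=1_700_000_000.0)
fake_local[0] += 5.0  # five seconds pass on the device
print(sync.now())
```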
Transport
By default the SDK connects over a WebSocket to /ws/device on the gateway — same wire protocol as the C SDK. This gives you:
- lower-latency streaming of telemetry,
- live command delivery from the UI / API to the device.
If the socket is unavailable, sends transparently fall back to POST /ingest so no data is lost.
```python
# default: ws with http fallback
px = Plexus()

# force http (legacy)
px = Plexus(transport="http")
```
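The fallback behavior amounts to "try the socket, and on a connection failure use HTTP instead". A minimal sketch with stub transports follows; send_with_fallback and both stubs are hypothetical, and the exception type the SDK actually reacts to is an assumption.

```python
def send_with_fallback(point, primary, fallback):
    """Send via `primary`; on ConnectionError, send via `fallback` instead."""
    try:
        primary(point)
        return "ws"
    except ConnectionError:
        fallback(point)
        return "http"


http_log = []


def ws_down(point):
    raise ConnectionError("socket unavailable")  # simulate a dropped socket


def ws_up(point):
    pass  # pretend the frame went out over the socket


print(send_with_fallback(("temperature", 72.5), ws_up, http_log.append))
print(send_with_fallback(("temperature", 72.6), ws_down, http_log.append))
print(http_log)
```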
Handling commands
Register a handler before the first send() so the command is advertised in the auth frame:
```python
def reboot(name, params):
    delay = params.get("delay_s", 0)
    # ... reboot logic ...
    return {"ok": True, "delay": delay}

px = Plexus()
px.on_command("reboot", reboot, description="reboot the device")
px.send("temperature", 72.5)  # opens the socket, waits for auth
```
The SDK sends an ack frame before invoking the handler, then a result frame with whatever the handler returns (or an error frame if it raises).
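The ack, result, and error sequence can be modeled as a small dispatcher. The frame shapes below (the "type", "command", "result", and "error" keys) are assumptions for illustration; the real wire format is not documented here, and dispatch is a hypothetical function.

```python
def dispatch(handlers, name, params, emit):
    """Emit an ack, run the handler, then emit a result or an error frame."""
    emit({"type": "ack", "command": name})
    try:
        result = handlers[name](name, params)
    except Exception as exc:
        emit({"type": "error", "command": name, "error": str(exc)})
    else:
        emit({"type": "result", "command": name, "result": result})


def reboot(name, params):
    return {"ok": True, "delay": params.get("delay_s", 0)}


def broken(name, params):
    raise RuntimeError("disk full")


frames = []
handlers = {"reboot": reboot, "broken": broken}
dispatch(handlers, "reboot", {"delay_s": 5}, frames.append)
dispatch(handlers, "broken", {}, frames.append)
for frame in frames:
    print(frame)
```

Sending the ack before the handler runs lets the caller distinguish "command never arrived" from "command arrived but is slow or failed".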
Environment Variables
| Variable | Description | Default |
|---|---|---|
| PLEXUS_API_KEY | API key (required) | none |
| PLEXUS_GATEWAY_URL | HTTP ingest URL | https://plexus-gateway.fly.dev |
| PLEXUS_GATEWAY_WS_URL | WebSocket URL | wss://plexus-gateway.fly.dev |
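To see how these variables and defaults combine, the lookup can be mirrored in your own code. This is a sketch: load_settings and the returned key names are hypothetical, and the SDK's own resolution logic may differ.

```python
import os

# Defaults copied from the table above.
DEFAULTS = {
    "PLEXUS_GATEWAY_URL": "https://plexus-gateway.fly.dev",
    "PLEXUS_GATEWAY_WS_URL": "wss://plexus-gateway.fly.dev",
}


def load_settings(env=None):
    """Resolve settings from a mapping, defaulting to os.environ."""
    env = os.environ if env is None else env
    api_key = env.get("PLEXUS_API_KEY")
    if not api_key:
        raise RuntimeError("PLEXUS_API_KEY is required")
    return {
        "api_key": api_key,
        "gateway_url": env.get("PLEXUS_GATEWAY_URL", DEFAULTS["PLEXUS_GATEWAY_URL"]),
        "gateway_ws_url": env.get("PLEXUS_GATEWAY_WS_URL", DEFAULTS["PLEXUS_GATEWAY_WS_URL"]),
    }


# Demo with an explicit mapping so the example does not depend on your shell.
settings = load_settings({"PLEXUS_API_KEY": "plx_xxx"})
print(settings)
```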
Architecture
Your code ── px.send() ── WebSocket /ws/device (falls back to HTTP POST /ingest) ──> plexus-gateway ──> ClickHouse + Dashboard
One thin path. No agent, no daemon, no adapters. If you want the full HardwareOps platform — dashboards, alerts, RCA, fleet views — that's the web UI at app.plexus.company. This package gets your data there.
License
Apache 2.0