Skip to main content

RelayX App SDK for Python

Project description

RelayX App SDK for Python

Official Python SDK for building applications on the RelayX platform.

View Full Documentation →

Installation

pip install relayx_app_sdk

Quick Start

import asyncio
from relayx_app_sdk import RelayApp

app = RelayApp({
    "api_key": "<YOUR_API_KEY>",
    "secret": "<YOUR_SECRET>",
    "mode": "production",
})

async def main():
    app.connection.listeners(lambda event: print(f"[connection] {event}"))
    await app.connect()

    await app.telemetry.stream({
        "device_ident": "sensor-1",
        "metric": "temperature",
        "callback": lambda data: print(f"temp: {data}"),
    })

    # ... your application logic ...

    await app.disconnect()

asyncio.run(main())

Configuration

app = RelayApp({
    "api_key": "<YOUR_API_KEY>",   # JWT credential from RelayX console
    "secret": "<YOUR_SECRET>",      # Secret key
    "mode": "production",           # "production" | "test"
    "debug": False,                 # Enable debug logging (default: False)
})

Get your credentials at console.relay-x.io.

Connection

await app.connect()
await app.disconnect()

# Connection lifecycle events
app.connection.listeners(lambda event: print(event))
# Events: "connected" | "disconnected" | "reconnecting" | "reconnected" | "auth_failed"

Presence

Subscribe to device connect/disconnect events.

async def on_presence(data):
    print(f"{data['device_ident']} {data['event']}")
    # data["event"]: "connected" | "disconnected"

await app.connection.presence(on_presence)

# Unsubscribe
await app.connection.presence_off()

Devices

# List all devices
devices = await app.device.list()

# Get a single device
device = await app.device.get({"ident": "sensor-1"})

# Create a device
device = await app.device.create({
    "ident": "sensor-1",
    "schema": {
        "temperature": {"type": "number", "unit": "Celsius", "unit_symbol": "°C"},
        "humidity": {"type": "number", "unit": "Percentage", "unit_symbol": "%"},
    },
    "config": {},
})

# Update a device
device = await app.device.update({
    "id": device["id"],
    "config": {"interval": 5000},
})

# Delete a device
await app.device.delete("sensor-1")

Telemetry

Live Streaming

Stream real-time telemetry from a device. The metric is validated against the device schema.

# Stream a specific metric
await app.telemetry.stream({
    "device_ident": "sensor-1",
    "metric": "temperature",
    "callback": lambda data: print(f"[{data['metric']}] {data['data']}"),
})

# Stream all metrics
await app.telemetry.stream({
    "device_ident": "sensor-1",
    "metric": "*",
    "callback": lambda data: print(data),
})

# Unsubscribe from specific metrics
await app.telemetry.off({"device_ident": "sensor-1", "metric": ["temperature"]})

# Unsubscribe from all metrics for a device
await app.telemetry.off({"device_ident": "sensor-1"})

History

history = await app.telemetry.history({
    "device_ident": "sensor-1",
    "fields": ["temperature", "humidity"],
    "start": "2026-03-01T00:00:00.000Z",
    "end": "2026-03-25T00:00:00.000Z",
})

Latest

Fetches the most recent telemetry values (last 24 hours).

latest = await app.telemetry.latest({
    "device_ident": "sensor-1",
    "fields": ["temperature", "humidity"],
})

Commands

Send one-way commands to devices.

# Send to one or more devices
result = await app.command.send({
    "name": "set_interval",
    "device_ident": ["sensor-1", "sensor-2"],
    "data": {"interval": 5000},
})
# result: {"sensor-1": {"sent": True}, "sensor-2": {"sent": True}}

# Command history
history = await app.command.history({
    "name": "set_interval",
    "device_idents": ["sensor-1"],
    "start": "2026-03-01T00:00:00.000Z",
    "end": "2026-03-25T00:00:00.000Z",
})

RPC

Make request/reply calls to devices.

response = await app.rpc.call({
    "device_ident": "sensor-1",
    "name": "get_status",
    "data": {"verbose": True},
    "timeout": 10,  # seconds (default: 10)
})

Events

Subscribe to device-published events.

await app.events.stream({
    "name": "door_opened",
    "callback": lambda data: print(f"Event: {data}"),
})

await app.events.off({"name": "door_opened"})

Alerts

CRUD

# Create a threshold alert
alert = await app.alert.create({
    "name": "high-temp",
    "type": "THRESHOLD",  # "THRESHOLD" | "RATE_CHANGE"
    "metric": "temperature",
    "config": {"threshold": 85, "duration": 5},
    "notification_channel": ["ops-webhook"],
})

# Get, update, delete
alert = await app.alert.get("high-temp")
alert = await app.alert.update({"id": alert["id"], "config": {"threshold": 90}})
await app.alert.delete(alert["id"])

# List all alerts
alerts = await app.alert.list()

Listening

alert = await app.alert.get("high-temp")

await alert.listen({
    "on_fire": lambda data: print("FIRED:", data),
    "on_resolved": lambda data: print("RESOLVED:", data),
    "on_ack": lambda data: print("ACK:", data),
    "on_ack_all": lambda data: print("ACK ALL:", data),
})

History

history = await app.alert.history({
    "rule_type": "RULE",  # "RULE" | "DEVICE"
    "rule_id": alert["id"],
    "rule_states": ["fire", "resolved"],
    "start": "2026-03-01T00:00:00.000Z",
    "end": "2026-03-25T00:00:00.000Z",
})

Acknowledge

# Acknowledge for a specific device
await app.alert.ack({
    "device_id": "<device_id>",
    "alert_id": alert["id"],
    "acked_by": "operator-1",
    "ack_notes": "Investigating",
})

# Acknowledge all instances
await app.alert.ack_all({
    "alert_id": alert["id"],
    "acked_by": "operator-1",
})

Mute / Unmute

await app.alert.mute({
    "id": alert["id"],
    "mute_config": {"type": "FOREVER"},
    # or {"type": "TIME_BASED", "mute_till": "2026-04-01T00:00:00.000Z"}
})

await app.alert.unmute(alert["id"])

Ephemeral Alerts

Ephemeral alerts let you define custom alert rules that are evaluated client-side with your own logic. See the full guide: Ephemeral Alerting Guide.

# Create an ephemeral alert
alert = await app.alert.create_ephemeral({
    "name": "custom-temp-alert",
    "config": {
        "topic": {
            "source": "TELEMETRY",
            "device_ident": "sensor-1",
            "last_token": "temperature",
        },
        "duration": 5,
        "recovery_duration": 10,
        "recovery_eval_type": "VALUE",
    },
})

# Set your evaluator
alert.set_evaluator(
    lambda state: state.get("sensor-1", {}).get("temperature", {}).get("value", 0) > 85
)

# Start monitoring
await alert.listen({
    "on_fire": lambda data: print("ALERT:", data),
    "on_resolved": lambda data: print("RESOLVED:", data),
})

# Stop
await alert.stop()

Logical Groups

Group devices by tags for batch operations and streaming.

# Create
group = await app.logical_group.create({
    "name": "floor-1-sensors",
    "tags": ["floor_1", "temperature"],
    "device_idents": ["sensor-1", "sensor-2"],
})

# Update membership
group = await app.logical_group.update({
    "id": group["id"],
    "devices": {"add": ["sensor-3"], "remove": ["sensor-1"]},
    "tags": {"add": ["humidity"], "remove": ["floor_1"]},
})

# List, get, delete
groups = await app.logical_group.list()
group = await app.logical_group.get(group["id"])
devices = await app.logical_group.list_devices(group["id"])
await app.logical_group.delete(group["id"])

Group Streaming

Each group instance has stream() and off() methods.

group = await app.logical_group.get("<group_id>")

await group.stream({
    "callback": lambda data: print(data),
})

await group.off()

Hierarchy Groups

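Organize devices in a hierarchy path (e.g., building_1.floor_2.zone_a). Note that the SDK identifiers in the examples below are spelled "heirarchy" (app.heirarchy_group and the "heirarchy" key), so copy them exactly as shown.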

# Create
group = await app.heirarchy_group.create({
    "name": "zone-a-sensors",
    "heirarchy": "building_1.floor_2.zone_a",
    "device_idents": ["sensor-1", "sensor-2"],
})

# Update
group = await app.heirarchy_group.update({
    "id": group["id"],
    "devices": {"add": ["sensor-3"], "remove": []},
    "heirarchy": "building_1.floor_3.zone_a",
})

# List, get, delete
groups = await app.heirarchy_group.list()
group = await app.heirarchy_group.get(group["id"])
devices = await app.heirarchy_group.list_devices(group["id"])
await app.heirarchy_group.delete(group["id"])

Hierarchy Group Streaming

Supports metric and hierarchy path filtering with wildcards.

group = await app.heirarchy_group.get("<group_id>")

# Stream all data
await group.stream({"callback": lambda data: print(data)})

# Filter by metric
await group.stream({
    "metric": "temperature",
    "callback": lambda data: print(data),
})

# Filter by hierarchy path (supports * and > wildcards)
await group.stream({
    "heirarchy": "building_1.*.zone_a",
    "callback": lambda data: print(data),
})

await group.off()

Notifications

Create webhook or email notification channels for alerts.

# Webhook
notif = await app.notification.create({
    "name": "ops-webhook",
    "type": "WEBHOOK",
    "config": {"endpoint": "https://hooks.example.com/alerts"},
})

# Email
notif = await app.notification.create({
    "name": "ops-email",
    "type": "EMAIL",
    "config": {
        "recipients": ["ops@example.com"],
        "subject": "Alert Notification",
        "template": "Alert {{alert_name}} fired on {{device_ident}}",
    },
})

# Update, delete, list, get
notif = await app.notification.update({
    "name": "ops-webhook",
    "type": "WEBHOOK",
    "config": {"endpoint": "https://new-url.com"},
})
await app.notification.delete(notif["id"])
notifs = await app.notification.list()
notif = await app.notification.get("<notif_id>")

Offline Behavior

  • Commands: Buffered in memory while disconnected and flushed automatically on reconnect.
  • Subscriptions: All active telemetry, event, presence, alert, and group stream subscriptions are automatically restored on reconnect.
  • Ephemeral Alerts: Alert state events (fire, resolved, ack) are buffered and published on reconnect.

Error Handling

The SDK raises standard Python exceptions:

  • ValueError — Invalid arguments, missing required fields, schema validation failures
  • RuntimeError — Operations attempted while disconnected

For example, streaming a metric that is not in the device schema raises ValueError:

try:
    await app.telemetry.stream({
        "device_ident": "sensor-1",
        "metric": "nonexistent",
        "callback": lambda d: None,
    })
except ValueError as e:
    print(f"Validation error: {e}")

License

Apache-2.0

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

relayx_app_sdk-0.1.2.tar.gz (48.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

relayx_app_sdk-0.1.2-py3-none-any.whl (40.2 kB view details)

Uploaded Python 3

File details

Details for the file relayx_app_sdk-0.1.2.tar.gz.

File metadata

  • Download URL: relayx_app_sdk-0.1.2.tar.gz
  • Upload date:
  • Size: 48.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for relayx_app_sdk-0.1.2.tar.gz
Algorithm Hash digest
SHA256 7592763d2b17a5b74a75edca54dd209927b2b37dee3162609b087d8f21fd5d7c
MD5 bd9133f8bedc5fe871bc3f44b1768ca1
BLAKE2b-256 a36f924f8c40c623c718116db92506840b7cff50898775f5ed9c35e29c11cb3b

See more details on using hashes here.

File details

Details for the file relayx_app_sdk-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: relayx_app_sdk-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 40.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.3

File hashes

Hashes for relayx_app_sdk-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 3f2e51b4dbedf173cb5033b2df5df406461e26a57f454e0d9c1af1b2bf77b039
MD5 f90e92629dc0d0a05081f02cb6c58353
BLAKE2b-256 e47988321d89e9cc91c41600299dcaaff09f2639416e2c2279867b24b5925fed

See more details on using hashes here.

Supported by

AWS (Cloud computing and Security Sponsor) · Datadog (Monitoring) · Depot (Continuous Integration) · Fastly (CDN) · Google (Download Analytics) · Pingdom (Monitoring) · Sentry (Error logging) · StatusPage (Status page)