RelayX App SDK for Python

Official Python SDK for building applications on the RelayX platform.

View Full Documentation →

Installation

pip install relayx_app_sdk

Quick Start

import asyncio
from relayx_app_sdk import RelayApp

app = RelayApp({
    "api_key": "<YOUR_API_KEY>",
    "secret": "<YOUR_SECRET>",
    "mode": "production",
})

async def main():
    app.connection.listeners(lambda event: print(f"[connection] {event}"))
    await app.connect()

    await app.telemetry.stream({
        "device_ident": "sensor-1",
        "metric": "temperature",
        "callback": lambda data: print(f"temp: {data}"),
    })

    # ... your application logic ...

    await app.disconnect()

asyncio.run(main())

Configuration

app = RelayApp({
    "api_key": "<YOUR_API_KEY>",   # JWT credential from RelayX console
    "secret": "<YOUR_SECRET>",      # Secret key
    "mode": "production",           # "production" | "test"
    "debug": False,                 # Enable debug logging (default: False)
})

Get your credentials at console.relay-x.io.
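
To keep credentials out of source control, the configuration dictionary can be built from environment variables. The following is a minimal sketch and not part of the SDK: the helper `config_from_env` and the variable names `RELAYX_API_KEY`, `RELAYX_SECRET`, `RELAYX_MODE`, and `RELAYX_DEBUG` are hypothetical. Only the dictionary keys and the two mode values come from the configuration shown above.

```python
import os


def config_from_env(env=None):
    """Build a RelayApp config dict from environment variables.

    The RELAYX_* names are hypothetical; only the resulting keys
    (api_key, secret, mode, debug) come from this README.
    """
    env = os.environ if env is None else env
    mode = env.get("RELAYX_MODE", "production")
    if mode not in ("production", "test"):
        raise ValueError(f"mode must be 'production' or 'test', got {mode!r}")
    return {
        "api_key": env["RELAYX_API_KEY"],  # raises KeyError if unset
        "secret": env["RELAYX_SECRET"],    # raises KeyError if unset
        "mode": mode,
        "debug": env.get("RELAYX_DEBUG", "").lower() in ("1", "true", "yes"),
    }
```

With this helper in place, the app could be created as `RelayApp(config_from_env())`.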

Connection

await app.connect()
await app.disconnect()

# Connection lifecycle events
app.connection.listeners(lambda event: print(event))
# Events: "connected" | "disconnected" | "reconnecting" | "reconnected" | "auth_failed"
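
A listener does not have to be a lambda. As a rough sketch, a small callable object can remember the most recent event so the rest of the application can ask whether the link is up. `ConnectionTracker` is a hypothetical helper, and it assumes the listener receives the event name as a plain string, as the event list above suggests.

```python
class ConnectionTracker:
    """Hypothetical helper: records connection events passed to it."""

    def __init__(self):
        self.last_event = None
        self.history = []

    def __call__(self, event):
        # Assumption: `event` is one of the strings listed in the README
        # ("connected", "disconnected", "reconnecting", ...).
        self.last_event = event
        self.history.append(event)

    @property
    def online(self):
        # Treat both the first connect and a successful reconnect as "up".
        return self.last_event in ("connected", "reconnected")
```

It would be registered the same way as any other listener: `app.connection.listeners(tracker)`.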

Presence

Subscribe to device connect/disconnect events.

async def on_presence(data):
    print(f"{data['device_ident']} {data['event']}")
    # data["event"]: "connected" | "disconnected"

await app.connection.presence(on_presence)

# Unsubscribe
await app.connection.presence_off()
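
One common use of presence events is to maintain the set of devices that are currently online. The sketch below is illustrative only: `PresenceTracker` is a hypothetical class, and it relies solely on the `device_ident` and `event` keys shown in the callback above.

```python
class PresenceTracker:
    """Hypothetical helper: keeps the set of currently connected devices."""

    def __init__(self):
        self.online = set()

    async def on_presence(self, data):
        ident = data["device_ident"]
        if data["event"] == "connected":
            self.online.add(ident)
        elif data["event"] == "disconnected":
            # discard() is a no-op if the device was never seen online
            self.online.discard(ident)
```

The bound method can be passed as the callback: `await app.connection.presence(tracker.on_presence)`.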

Devices

# List all devices
devices = await app.device.list()

# Get a single device
device = await app.device.get({"ident": "sensor-1"})

# Create a device
device = await app.device.create({
    "ident": "sensor-1",
    "schema": {
        "temperature": {"type": "number", "unit": "Celsius", "unit_symbol": "°C"},
        "humidity": {"type": "number", "unit": "Percentage", "unit_symbol": "%"},
    },
    "config": {},
})

# Update a device
device = await app.device.update({
    "id": device["id"],
    "config": {"interval": 5000},
})

# Delete a device
await app.device.delete("sensor-1")

Telemetry

Live Streaming

Stream real-time telemetry from a device. The metric is validated against the device schema.

# Stream a specific metric
await app.telemetry.stream({
    "device_ident": "sensor-1",
    "metric": "temperature",
    "callback": lambda data: print(f"[{data['metric']}] {data['data']}"),
})

# Stream all metrics
await app.telemetry.stream({
    "device_ident": "sensor-1",
    "metric": "*",
    "callback": lambda data: print(data),
})

# Unsubscribe from specific metrics
await app.telemetry.off({"device_ident": "sensor-1", "metric": ["temperature"]})

# Unsubscribe from all metrics for a device
await app.telemetry.off({"device_ident": "sensor-1"})
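
Stream callbacks often need to keep state between readings. As a sketch, a callable object can smooth incoming values with a rolling average. `RollingAverage` is a hypothetical helper, and it assumes `data["data"]` carries a numeric reading, which is how the single-metric example above prints it.

```python
from collections import deque


class RollingAverage:
    """Hypothetical callback: averages the last `size` readings."""

    def __init__(self, size=10):
        self.values = deque(maxlen=size)  # oldest reading drops off when full

    def __call__(self, data):
        # Assumption: data["data"] is numeric for the subscribed metric.
        self.values.append(float(data["data"]))

    @property
    def average(self):
        return sum(self.values) / len(self.values) if self.values else None
```

An instance can be passed as the `"callback"` value in the `stream()` call, and `average` read at any time from application code.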

History

history = await app.telemetry.history({
    "device_ident": "sensor-1",
    "fields": ["temperature", "humidity"],
    "start": "2026-03-01T00:00:00.000Z",
    "end": "2026-03-25T00:00:00.000Z",
})
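
The `start` and `end` values above are UTC timestamps in ISO 8601 form with millisecond precision and a trailing `Z`. A small sketch for producing that format from `datetime` objects follows. `to_iso_z` and `last_days` are hypothetical helper names, and whether the API also accepts other timestamp formats is not stated here.

```python
from datetime import datetime, timedelta, timezone


def to_iso_z(dt):
    """Format an aware datetime as e.g. '2026-03-01T00:00:00.000Z'."""
    if dt.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    utc = dt.astimezone(timezone.utc)
    return utc.isoformat(timespec="milliseconds").replace("+00:00", "Z")


def last_days(days, now=None):
    """Return start/end keys covering the `days` days before `now`."""
    now = datetime.now(timezone.utc) if now is None else now
    return {"start": to_iso_z(now - timedelta(days=days)), "end": to_iso_z(now)}
```

The result can be merged into a query, for example `{"device_ident": "sensor-1", "fields": ["temperature"], **last_days(7)}`.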

Latest

Fetch the most recent telemetry values recorded within the last 24 hours.

latest = await app.telemetry.latest({
    "device_ident": "sensor-1",
    "fields": ["temperature", "humidity"],
})

Commands

Send one-way commands to devices.

# Send to one or more devices
result = await app.command.send({
    "name": "set_interval",
    "device_ident": ["sensor-1", "sensor-2"],
    "data": {"interval": 5000},
})
# result: {"sensor-1": {"sent": True}, "sensor-2": {"sent": True}}

# Command history
history = await app.command.history({
    "name": "set_interval",
    "device_idents": ["sensor-1"],
    "start": "2026-03-01T00:00:00.000Z",
    "end": "2026-03-25T00:00:00.000Z",
})
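
The result of `command.send` maps each device ident to a status dictionary, as the comment above shows. A sketch for picking out devices the command did not reach follows. `unsent_devices` is a hypothetical helper, and the shape of a failed entry (for example `{"sent": False}`) is an assumption, since only the success shape is shown.

```python
def unsent_devices(result):
    """Return a sorted list of idents whose entry lacks a truthy "sent" flag."""
    return sorted(
        ident
        for ident, status in result.items()
        # Assumption: anything other than {"sent": True} counts as not sent.
        if not (isinstance(status, dict) and status.get("sent"))
    )
```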

RPC

Make request/reply calls to devices.

response = await app.rpc.call({
    "device_ident": "sensor-1",
    "name": "get_status",
    "data": {"verbose": True},
    "timeout": 10,  # seconds (default: 10)
})
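
How the SDK reports an expired `timeout` is not described here. If an application wants its own upper bound regardless, one option is a client-side guard built on `asyncio.wait_for`. This is a sketch under that assumption: `call_with_deadline` is a hypothetical wrapper that works with any coroutine function, not an SDK feature.

```python
import asyncio


async def call_with_deadline(call, payload, deadline):
    """Await call(payload); return (True, response) or (False, None) on timeout.

    `deadline` is in seconds and is enforced on the client side only.
    """
    try:
        response = await asyncio.wait_for(call(payload), timeout=deadline)
    except asyncio.TimeoutError:
        return (False, None)
    return (True, response)
```

It could wrap the call above as `ok, response = await call_with_deadline(app.rpc.call, payload, 12)`, with the guard set slightly longer than the request's own `timeout`.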

Events

Subscribe to device-published events.

await app.events.stream({
    "name": "door_opened",
    "callback": lambda data: print(f"Event: {data}"),
})

await app.events.off({"name": "door_opened"})

Alerts

CRUD

# Create a threshold alert
alert = await app.alert.create({
    "name": "high-temp",
    "type": "THRESHOLD",  # "THRESHOLD" | "RATE_CHANGE"
    "metric": "temperature",
    "config": {"threshold": 85, "duration": 5},
    "notification_channel": ["ops-webhook"],
})

# Get, update, delete
alert = await app.alert.get("high-temp")
alert = await app.alert.update({"id": alert["id"], "config": {"threshold": 90}})
await app.alert.delete(alert["id"])

# List all alerts
alerts = await app.alert.list()

Listening

alert = await app.alert.get("high-temp")

await alert.listen({
    "on_fire": lambda data: print("FIRED:", data),
    "on_resolved": lambda data: print("RESOLVED:", data),
    "on_ack": lambda data: print("ACK:", data),
    "on_ack_all": lambda data: print("ACK ALL:", data),
})

History

history = await app.alert.history({
    "rule_type": "RULE",  # "RULE" | "DEVICE"
    "rule_id": alert["id"],
    "rule_states": ["fire", "resolved"],
    "start": "2026-03-01T00:00:00.000Z",
    "end": "2026-03-25T00:00:00.000Z",
})

Acknowledge

# Acknowledge for a specific device
await app.alert.ack({
    "device_id": "<device_id>",
    "alert_id": alert["id"],
    "acked_by": "operator-1",
    "ack_notes": "Investigating",
})

# Acknowledge all instances
await app.alert.ack_all({
    "alert_id": alert["id"],
    "acked_by": "operator-1",
})

Mute / Unmute

await app.alert.mute({
    "id": alert["id"],
    "mute_config": {"type": "FOREVER"},
    # or {"type": "TIME_BASED", "mute_till": "2026-04-01T00:00:00.000Z"}
})

await app.alert.unmute(alert["id"])
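
For a `TIME_BASED` mute, the `mute_till` value is a UTC timestamp in the same format as the commented example above. A sketch for muting "for the next N hours" follows. `mute_config_for` is a hypothetical helper that only builds the dictionary and does not call the SDK.

```python
from datetime import datetime, timedelta, timezone


def mute_config_for(hours, now=None):
    """Build a TIME_BASED mute_config expiring `hours` after `now`.

    `now` must be timezone-aware; it defaults to the current UTC time.
    """
    if hours <= 0:
        raise ValueError("hours must be positive")
    now = datetime.now(timezone.utc) if now is None else now
    until = (now + timedelta(hours=hours)).astimezone(timezone.utc)
    stamp = until.isoformat(timespec="milliseconds").replace("+00:00", "Z")
    return {"type": "TIME_BASED", "mute_till": stamp}
```

The result would be passed as the `"mute_config"` value in `app.alert.mute(...)`.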

Ephemeral Alerts

Ephemeral alerts let you define custom alert rules that are evaluated client-side with your own logic. See the full guide: Ephemeral Alerting Guide.

# Create an ephemeral alert
alert = await app.alert.create_ephemeral({
    "name": "custom-temp-alert",
    "config": {
        "topic": {
            "source": "TELEMETRY",
            "device_ident": "sensor-1",
            "last_token": "temperature",
        },
        "duration": 5,
        "recovery_duration": 10,
        "recovery_eval_type": "VALUE",
    },
})

# Set your evaluator
alert.set_evaluator(
    lambda state: state.get("sensor-1", {}).get("temperature", {}).get("value", 0) > 85
)

# Start monitoring
await alert.listen({
    "on_fire": lambda data: print("ALERT:", data),
    "on_resolved": lambda data: print("RESOLVED:", data),
})

# Stop
await alert.stop()
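
The evaluator does not have to be a lambda. A named function is easier to test and reuse. The sketch below builds a threshold evaluator for any device and metric. `make_threshold_evaluator` is a hypothetical helper, and the nested `state[device][metric]["value"]` layout is inferred from the lambda above rather than from a documented schema.

```python
def make_threshold_evaluator(device_ident, metric, threshold):
    """Return an evaluator that is True while the reading exceeds `threshold`."""

    def evaluate(state):
        # Assumed layout: state[device_ident][metric]["value"]
        reading = state.get(device_ident, {}).get(metric, {})
        value = reading.get("value")
        # A missing reading never fires, even for negative thresholds.
        return value is not None and value > threshold

    return evaluate
```

It would be installed with `alert.set_evaluator(make_threshold_evaluator("sensor-1", "temperature", 85))`.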

Logical Groups

Group devices by tags for batch operations and streaming.

# Create
group = await app.logical_group.create({
    "name": "floor-1-sensors",
    "tags": ["floor_1", "temperature"],
    "device_idents": ["sensor-1", "sensor-2"],
})

# Update membership
group = await app.logical_group.update({
    "id": group["id"],
    "devices": {"add": ["sensor-3"], "remove": ["sensor-1"]},
    "tags": {"add": ["humidity"], "remove": ["floor_1"]},
})

# List, get, list member devices, delete
groups = await app.logical_group.list()
group = await app.logical_group.get(group["id"])
devices = await app.logical_group.list_devices(group["id"])
await app.logical_group.delete(group["id"])

Group Streaming

Each group instance has stream() and off() methods.

group = await app.logical_group.get("<group_id>")

await group.stream({
    "callback": lambda data: print(data),
})

await group.off()

Hierarchy Groups

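Organize devices along a dot-separated hierarchy path (e.g., building_1.floor_2.zone_a). The examples spell the attribute and the dictionary key as heirarchy; use that spelling exactly as shown.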

# Create
group = await app.heirarchy_group.create({
    "name": "zone-a-sensors",
    "heirarchy": "building_1.floor_2.zone_a",
    "device_idents": ["sensor-1", "sensor-2"],
})

# Update
group = await app.heirarchy_group.update({
    "id": group["id"],
    "devices": {"add": ["sensor-3"], "remove": []},
    "heirarchy": "building_1.floor_3.zone_a",
})

# List, get, list member devices, delete
groups = await app.heirarchy_group.list()
group = await app.heirarchy_group.get(group["id"])
devices = await app.heirarchy_group.list_devices(group["id"])
await app.heirarchy_group.delete(group["id"])

Hierarchy Group Streaming

Hierarchy group streams can be filtered by metric and by hierarchy path. Path filters accept wildcards.

group = await app.heirarchy_group.get("<group_id>")

# Stream all data
await group.stream({"callback": lambda data: print(data)})

# Filter by metric
await group.stream({
    "metric": "temperature",
    "callback": lambda data: print(data),
})

# Filter by hierarchy path (supports * and > wildcards)
await group.stream({
    "heirarchy": "building_1.*.zone_a",
    "callback": lambda data: print(data),
})

await group.off()
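
The exact wildcard semantics are not spelled out here. Assuming they follow the common convention for dot-separated subjects, where `*` matches exactly one token and `>` matches one or more trailing tokens, the matching can be sketched as below. `path_matches` is a hypothetical function for reasoning about filters locally and is not the SDK's matcher.

```python
def path_matches(pattern, path):
    """Check a dot-separated path against a pattern with * and > wildcards.

    Assumed semantics: "*" matches exactly one token; ">" is only valid
    as the last pattern token and matches one or more remaining tokens.
    """
    p_tokens = pattern.split(".")
    tokens = path.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            return i == len(p_tokens) - 1 and len(tokens) > i
        if i >= len(tokens):
            return False  # pattern is longer than the path
        if p != "*" and p != tokens[i]:
            return False  # literal token mismatch
    return len(p_tokens) == len(tokens)
```

Under these assumptions, `building_1.*.zone_a` selects zone_a on every floor of building_1, while `building_1.>` selects everything beneath building_1.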

Notifications

Create webhook or email notification channels for alerts.

# Webhook
notif = await app.notification.create({
    "name": "ops-webhook",
    "type": "WEBHOOK",
    "config": {"endpoint": "https://hooks.example.com/alerts"},
})

# Email
notif = await app.notification.create({
    "name": "ops-email",
    "type": "EMAIL",
    "config": {
        "recipients": ["ops@example.com"],
        "subject": "Alert Notification",
        "template": "Alert {{alert_name}} fired on {{device_ident}}",
    },
})

# Update, delete, list, get
notif = await app.notification.update({
    "name": "ops-webhook",
    "type": "WEBHOOK",
    "config": {"endpoint": "https://new-url.com"},
})
await app.notification.delete(notif["id"])
notifs = await app.notification.list()
notif = await app.notification.get("<notif_id>")

Offline Behavior

  • Commands: Buffered in memory while disconnected and flushed automatically on reconnect.
  • Subscriptions: All active telemetry, event, presence, alert, and group stream subscriptions are automatically restored on reconnect.
  • Ephemeral Alerts: Alert state events (fire, resolved, ack) are buffered and published on reconnect.
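
To make the buffering behavior concrete, the idea of holding commands while offline and flushing them in order on reconnect can be sketched as follows. This illustrates the concept only and is not the SDK's implementation: `OfflineBuffer`, its `max_items` cap, and the drop-oldest policy are all hypothetical, and whether the SDK limits its buffer is not stated here.

```python
from collections import deque


class OfflineBuffer:
    """Hypothetical illustration of buffer-then-flush on reconnect."""

    def __init__(self, send, max_items=1000):
        self._send = send                        # called once per command
        self._pending = deque(maxlen=max_items)  # oldest dropped when full
        self.connected = False

    def submit(self, command):
        if self.connected:
            self._send(command)
        else:
            self._pending.append(command)

    def on_event(self, event):
        # Event names are the connection events listed in this README.
        if event in ("connected", "reconnected"):
            self.connected = True
            while self._pending:
                self._send(self._pending.popleft())  # flush in FIFO order
        elif event in ("disconnected", "reconnecting"):
            self.connected = False
```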

Error Handling

The SDK raises standard Python exceptions:

  • ValueError: Invalid arguments, missing required fields, or schema validation failures
  • RuntimeError: Operations attempted while disconnected

try:
    await app.telemetry.stream({
        "device_ident": "sensor-1",
        "metric": "nonexistent",
        "callback": lambda d: None,
    })
except ValueError as e:
    print(f"Validation error: {e}")
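
Because the two exception types signal different problems, they usually call for different handling: a `ValueError` points at a bug in the arguments and should surface immediately, while a `RuntimeError` caused by a dropped connection may succeed on a later attempt. The sketch below retries only the latter. `retry_when_disconnected` is a hypothetical helper, and the attempt count and delay are illustrative defaults.

```python
import asyncio


async def retry_when_disconnected(operation, attempts=3, delay=1.0):
    """Await operation(), retrying on RuntimeError up to `attempts` times.

    ValueError (and any other exception) propagates on the first failure.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except RuntimeError:
            if attempt == attempts:
                raise  # out of retries: let the caller see the error
            await asyncio.sleep(delay)
```

A call might look like `devices = await retry_when_disconnected(app.device.list)`.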

License

Apache-2.0
