Python SDK for Restless Stream

restless-stream

Official Python SDK for Restless Stream: https://restlessapi.stream

Restless Stream turns REST APIs into live Server-Sent Events and WebSocket streams. This package provides synchronous REST and SSE support, asynchronous REST, SSE, and WebSocket support, typed Pydantic models, runtime URL builders, and HMAC signature helpers.

Requirements

  • Python >=3.14.
  • A Restless Stream account and API key.

Installation

python -m pip install restless-stream

Getting Started

Async Client

Use AsyncRestlessStreamClient when you need async REST calls, SSE subscriptions, or WebSocket subscriptions.

import asyncio
import os

from restless_stream import AsyncRestlessStreamClient


async def main() -> None:
  async with AsyncRestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
    stream = await client.streams.create(
      name="Orders",
      description="Live order feed",
      status="ACTIVE",
      method="GET",
      url="https://api.example.com/orders",
      payload_mode="FULL_DATA",
      polling_interval=30,
    )

    async for event in client.streams.subscribe_sse(stream.sse_url, reconnect=False):
      print(event.type, event.data)


asyncio.run(main())

Sync Client

Use RestlessStreamClient for synchronous REST calls and SSE subscriptions.

import os

from restless_stream import RestlessStreamClient


with RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
  streams = client.streams.list(limit=20, offset=0)

  for stream in streams.streams:
    print(stream.id, stream.name)

Client Configuration

from restless_stream import AsyncRestlessStreamClient, RestlessStreamClient

client = RestlessStreamClient(
  api_key="rs_...",
  base_url="https://api.restlessapi.stream",
  stream_base_url="https://stream.restlessapi.stream",
  timeout=30.0,
)

async_client = AsyncRestlessStreamClient(
  api_key="rs_...",
  timeout=30.0,
)

| Option | Description |
| --- | --- |
| api_key | Sends x-api-key on REST requests and Authorization: Bearer <key> on stream runtime requests. |
| base_url | REST API base URL. Defaults to https://api.restlessapi.stream. |
| stream_base_url | Runtime stream base URL. Defaults to https://stream.restlessapi.stream. |
| timeout | HTTP timeout in seconds for the owned httpx client. Defaults to 30.0. |
| http_client | Optional httpx.Client or httpx.AsyncClient. When provided, the SDK does not close it. |

Both clients support context managers. Use close() for the sync client and await aclose() for the async client when you do not use a context manager.

When creating or updating streams, the SDK adds apiKey to the request body when the client has an API key and the body does not already include apiKey or api_key.
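The apiKey rule above can be pictured with a small stand-alone function. This is only a sketch of the described behavior, not the SDK's code; the helper name with_api_key is hypothetical.

```python
from __future__ import annotations

from collections.abc import Mapping
from typing import Any


def with_api_key(body: Mapping[str, Any], client_api_key: str | None) -> dict[str, Any]:
    """Return a copy of body, adding apiKey only when the rule above allows it.

    Hypothetical helper: it mirrors the documented rule, not the SDK internals.
    """
    result = dict(body)
    # Leave the body alone if the client has no key or the caller already set one.
    if client_api_key is None or "apiKey" in result or "api_key" in result:
        return result
    result["apiKey"] = client_api_key
    return result


print(with_api_key({"name": "Orders"}, "rs_client_key"))
print(with_api_key({"name": "Orders", "api_key": "rs_other"}, "rs_client_key"))
```

In other words, a key passed explicitly in the request body takes precedence over the key configured on the client.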

Stream Management

All stream management methods are available as top-level client methods and through client.streams.

stream = await client.streams.create(
  name="Orders",
  description="Live order feed",
  status="ACTIVE",
  method="GET",
  url="https://api.example.com/orders",
  headers={"Accept": "application/json"},
  payload_mode="FULL_DATA",
  polling_interval=30,
)

await client.streams.update(stream.id, name="Orders v2", polling_interval=60)
await client.streams.stop(stream.id)
await client.streams.start(stream.id)

usage = await client.streams.credit_usage_stats(stream.id)
snippets = await client.streams.connection_snippets(stream_id=stream.id, language="python")

| Top-level method | Resource method | Description |
| --- | --- | --- |
| list_streams(limit=20, offset=0) | streams.list(...) | List streams. |
| get_stream(stream_id) | streams.get(stream_id) | Get one stream. |
| create_stream(data=None, **kwargs) | streams.create(...) | Create a persisted stream. |
| update_stream(stream_id, data=None, **kwargs) | streams.update(...) | Patch stream configuration. |
| start_stream(stream_id) | streams.start(stream_id) | Mark a stream active. |
| stop_stream(stream_id) | streams.stop(stream_id) | Mark a stream inactive. |
| delete_stream(stream_id) | streams.delete(stream_id) | Delete a stream. |
| validate_stream_api_key(api_key=None) | streams.validate_api_key(...) | Validate an API key. Uses the client key when omitted. |
| credit_usage_stats(stream_id) | streams.credit_usage_stats(...) | Fetch credit usage totals and daily usage. |
| connection_snippets(data=None, **kwargs) | streams.connection_snippets(...) | Generate runtime URLs and snippets. |
| direct_setup(data=None, **kwargs) | streams.direct_setup(...) | Generate direct-stream commands, URLs, and snippets. |
| direct_session(data=None, **kwargs) | streams.direct_session(...) | Create or reuse a direct stream session. |
| subscribe_sse(url, **kwargs) | streams.subscribe_sse(...) | Subscribe to an SSE runtime URL. |
| subscribe_direct_sse(**kwargs) | streams.subscribe_direct_sse(...) | Build and subscribe to a direct SSE URL. |
| subscribe_websocket(url, **kwargs) | streams.subscribe_websocket(...) | Async client only. Subscribe to a WebSocket runtime URL. |
| subscribe_direct_websocket(**kwargs) | streams.subscribe_direct_websocket(...) | Async client only. Build and subscribe to a direct WebSocket URL. |

Request bodies may be mappings, Pydantic models, or keyword arguments. Snake-case keys are converted to the camel-case field names expected by the API.

await client.streams.create(
  name="Orders",
  description="",
  status="ACTIVE",
  method="POST",
  url="https://api.example.com/orders/search",
  body={"status": "open"},
  payload_mode="JSON_PATCH",
  polling_interval=30,
)
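To make the snake-case to camel-case conversion concrete, here is a minimal stand-alone sketch. The helper names are hypothetical, and it assumes only top-level keys are converted, so a nested upstream payload such as body is left untouched; the SDK's actual conversion may differ.

```python
from __future__ import annotations

from typing import Any


def to_camel(name: str) -> str:
    """Convert a snake_case name such as polling_interval to pollingInterval."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def camelize_keys(body: dict[str, Any]) -> dict[str, Any]:
    # Assumption: only the top-level request keys are renamed; values are kept as-is.
    return {to_camel(key): value for key, value in body.items()}


request = camelize_keys(
    {
        "name": "Orders",
        "payload_mode": "JSON_PATCH",
        "polling_interval": 30,
        "body": {"order_status": "open"},
    }
)
print(request)
```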

Direct Streams

Direct streams let you stream a REST endpoint without creating a persisted stream first.

async for event in client.streams.subscribe_direct_sse(
  url="https://api.example.com/orders",
  method="GET",
  polling_interval=30,
  reconnect=False,
):
  print(event)

Create a reusable direct session when you need a stable URL.

session = await client.streams.direct_session(
  dedupe_key="orders-feed-v1",
  method="GET",
  url="https://api.example.com/orders",
  polling_interval=30,
)

async for event in client.streams.subscribe_sse(session.sse_url):
  print(event.data)

Generate setup commands and code snippets without subscribing.

setup = await client.streams.direct_setup(
  method="POST",
  url="https://api.example.com/search",
  body={"query": "restless"},
  language="python",
)

print(setup.commands.header.curl)
print(setup.runtime.direct_sse_url)

Direct stream inputs support url, method, headers, body, jq_filter, payload_mode, polling_interval, polling_strategies, and api_key.

SSE Streaming

SSE subscriptions yield StreamEvent Pydantic models.

for event in sync_client.streams.subscribe_sse(
  "https://stream.restlessapi.stream?streamId=stream_123",
  cursor="1700000000000",
  reconnect=False,
):
  print(event.type, event.data)

async for event in async_client.streams.subscribe_sse(
  "https://stream.restlessapi.stream?streamId=stream_123",
  since="2026-01-01T00:00:00Z",
  max_reconnects=3,
  retry_seconds=2,
):
  print(event.type, event.meta.timestamp)

SSE options:

| Option | Description |
| --- | --- |
| cursor | Initial cursor or SSE event ID. Updated automatically from received events. |
| since | Initial timestamp or cursor used until an event cursor is observed. |
| reconnect | Reconnect after the stream ends or errors. Defaults to True. |
| max_reconnects | Maximum reconnect attempts. None means unbounded. |
| retry_seconds | Delay between reconnect attempts. Defaults to 1.0. |

The SDK sends Authorization: Bearer <api_key> for runtime subscriptions when the client has an API key.
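The way cursor, reconnect, max_reconnects, and retry_seconds interact can be sketched as a plain generator. This is an illustration under stated assumptions, not the SDK's implementation: subscribe_with_reconnect and fake_connect are hypothetical names, events are plain dicts, and a dropped connection is modeled as ConnectionError.

```python
from __future__ import annotations

import time
from collections.abc import Callable, Iterable, Iterator
from typing import Any


def subscribe_with_reconnect(
    connect: Callable[[str | None], Iterable[dict[str, Any]]],
    cursor: str | None = None,
    reconnect: bool = True,
    max_reconnects: int | None = None,
    retry_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[dict[str, Any]]:
    attempts = 0
    while True:
        # Decide up front whether this connection is the last one allowed.
        give_up = not reconnect or (max_reconnects is not None and attempts >= max_reconnects)
        try:
            for event in connect(cursor):
                # Remember the latest event ID so the next connection resumes from it.
                cursor = event.get("event_id") or cursor
                yield event
        except ConnectionError:
            if give_up:
                raise
        else:
            if give_up:
                return
        attempts += 1
        sleep(retry_seconds)


seen_cursors: list[str | None] = []


def fake_connect(cursor: str | None) -> Iterator[dict[str, Any]]:
    """Stand-in for a runtime connection: delivers two events, then ends."""
    seen_cursors.append(cursor)
    start = int(cursor) + 1 if cursor else 1
    for n in (start, start + 1):
        yield {"type": "update", "event_id": str(n), "data": n}


events = list(
    subscribe_with_reconnect(fake_connect, max_reconnects=1, retry_seconds=0, sleep=lambda _: None)
)
print([event["data"] for event in events], seen_cursors)
```

The second connection is opened with the cursor taken from the last event received, which is why no event is delivered twice in this sketch.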

WebSocket Streaming

WebSocket subscriptions are async-only.

async for event in async_client.streams.subscribe_websocket(
  "wss://stream.restlessapi.stream/ws?streamId=stream_123",
  cursor="42",
  reconnect=True,
  max_reconnects=3,
):
  print(event.type, event.data)

Direct WebSocket subscription:

async for event in async_client.streams.subscribe_direct_websocket(
  url="https://api.example.com/orders",
  method="GET",
  polling_interval=30,
  connect_kwargs={"open_timeout": 10},
):
  print(event)

WebSocket options include all SSE runtime options plus connect_kwargs, which are passed to websockets.connect. Do not include additional_headers or extra_headers in connect_kwargs when the SDK client has an api_key; the SDK owns the runtime Authorization header in that case.
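A caller-side guard can catch the header conflict described above before a subscription is opened. The helper below is hypothetical and is not part of the SDK; it only checks the two keyword names mentioned in this section.

```python
from __future__ import annotations

from typing import Any

# Keyword names that would collide with the SDK-owned Authorization header.
RESERVED_HEADER_KWARGS = ("additional_headers", "extra_headers")


def check_connect_kwargs(connect_kwargs: dict[str, Any], has_api_key: bool) -> dict[str, Any]:
    """Return a copy of connect_kwargs, rejecting header overrides when a key is set."""
    if has_api_key:
        clashes = [name for name in RESERVED_HEADER_KWARGS if name in connect_kwargs]
        if clashes:
            raise ValueError(f"remove {', '.join(clashes)} when the client has an api_key")
    return dict(connect_kwargs)


print(check_connect_kwargs({"open_timeout": 10}, has_api_key=True))
```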

Runtime URL Helpers

Build runtime URLs without creating a client.

from restless_stream import (
  build_direct_sse_url,
  build_direct_websocket_url,
  build_sse_url,
  build_websocket_url,
  to_websocket_url,
)

sse_url = build_sse_url(stream_id="stream_123", since="2026-01-01T00:00:00Z")
ws_url = build_websocket_url(session_id="session_123")

direct_sse_url = build_direct_sse_url(
  url="https://api.example.com/orders",
  method="POST",
  body={"status": "open"},
  payload_mode="JSON_PATCH",
  polling_interval=30,
)

direct_ws_url = build_direct_websocket_url(url="https://api.example.com/orders")
converted_ws_url = to_websocket_url(sse_url)

build_sse_url requires exactly one of stream_id or session_id.
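The exactly-one-of rule and the SSE-to-WebSocket conversion can be sketched with the standard library. Everything here is an assumption made for illustration: the function names are hypothetical, the sessionId and since query parameter names are guesses (only streamId appears in this document), and the /ws path is inferred from the example URLs above. Use the SDK's builders for real URLs.

```python
from __future__ import annotations

from urllib.parse import urlencode, urlsplit, urlunsplit

STREAM_BASE_URL = "https://stream.restlessapi.stream"


def sketch_sse_url(
    stream_id: str | None = None,
    session_id: str | None = None,
    since: str | None = None,
) -> str:
    # Both missing or both present are equally invalid.
    if (stream_id is None) == (session_id is None):
        raise ValueError("pass exactly one of stream_id or session_id")
    # Assumed query parameter names; only streamId is shown in this document.
    params = {"streamId": stream_id} if stream_id is not None else {"sessionId": session_id}
    if since is not None:
        params["since"] = since
    return f"{STREAM_BASE_URL}?{urlencode(params)}"


def sketch_to_websocket_url(sse_url: str) -> str:
    parts = urlsplit(sse_url)
    scheme = "wss" if parts.scheme == "https" else "ws"
    # Assumption: the WebSocket endpoint lives under /ws, as in the examples above.
    path = parts.path.rstrip("/") + "/ws"
    return urlunsplit((scheme, parts.netloc, path, parts.query, parts.fragment))


sse = sketch_sse_url(stream_id="stream_123")
print(sse)
print(sketch_to_websocket_url(sse))
```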

Models

Response models are Pydantic v2 models. Python attributes use snake case and accept the API's camel-case aliases.

Common models and enums:

| Export | Description |
| --- | --- |
| RestlessModel | Base Pydantic model with camel-case aliases and extra fields allowed. |
| HttpMethod | GET, POST, PUT, PATCH, DELETE, OPTIONS. |
| StreamStatus | ACTIVE or INACTIVE. |
| PayloadMode | FULL_DATA or JSON_PATCH. |
| PollingStrategy | Time-windowed polling strategy. |
| Stream | Persisted stream response. |
| StreamsResponse | Stream list response with pagination info. |
| BaseActionResponse | Generic action response. |
| StreamCreditUsageStats, DailyCreditUsage, CreditUsageChargeTypeBreakdown | Credit usage responses. |
| ConnectionSnippetsResponse | Managed stream runtime URLs and snippets. |
| DirectSetupResponse | Direct setup commands, runtime URLs, snippets, and stream config. |
| DirectSessionResponse | Direct session runtime URLs and expiry. |
| StreamEvent, StreamEventMeta, StreamErrorDetail | Runtime event models. |

Runtime update events contain type, meta, data, optional signature, and optional event_id. Error events contain type, meta, and error.
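A consumer usually branches on whether an event carries an error. The sketch below uses plain dicts as stand-ins for StreamEvent so that it runs without the SDK; describe_event is a hypothetical helper, and the field names are the ones listed above.

```python
from __future__ import annotations

from typing import Any


def describe_event(event: dict[str, Any]) -> str:
    """Summarize a runtime event, treating a non-null error field as an error event."""
    if event.get("error") is not None:
        return f"error at {event['meta']['timestamp']}: {event['error']}"
    # signature and event_id are optional on update events.
    signed = "signed" if event.get("signature") else "unsigned"
    return f"{event['type']} {event.get('event_id')} ({signed}): {event['data']}"


update = {
    "type": "update",
    "meta": {"timestamp": "2026-01-01T00:00:00Z"},
    "data": {"total": 42},
    "signature": None,
    "event_id": "42",
}
print(describe_event(update))
```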

Error Handling

REST API failures raise RestlessStreamAPIError.

from restless_stream import RestlessStreamAPIError

try:
  stream = client.streams.get("missing")
except RestlessStreamAPIError as error:
  print(error.status_code, error.message)

Invalid runtime event payloads raise RestlessStreamParseError. Both exceptions inherit from RestlessStreamError.
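Because both exceptions share a base class, handlers are best ordered from most specific to least specific. The sketch below uses local stand-in classes (the Sketch* names are hypothetical) so that it runs without the SDK; in real code you would catch RestlessStreamAPIError, RestlessStreamParseError, and RestlessStreamError in the same order.

```python
from __future__ import annotations

from collections.abc import Callable


class SketchStreamError(Exception):
    """Stand-in for the SDK's base exception."""


class SketchAPIError(SketchStreamError):
    """Stand-in for a REST failure carrying status_code and message."""

    def __init__(self, status_code: int, message: str) -> None:
        super().__init__(f"{status_code}: {message}")
        self.status_code = status_code
        self.message = message


class SketchParseError(SketchStreamError):
    """Stand-in for an invalid runtime event payload."""


def handle(action: Callable[[], None]) -> str:
    try:
        action()
    except SketchAPIError as error:
        return f"api error {error.status_code}: {error.message}"
    except SketchParseError as error:
        return f"bad event payload: {error}"
    except SketchStreamError as error:
        # Catch-all for any other SDK failure; must come after the subclasses.
        return f"other sdk error: {error}"
    return "ok"


def missing_stream() -> None:
    raise SketchAPIError(404, "stream not found")


print(handle(missing_stream))
```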

HMAC Helpers

Use HMAC helpers to compute or verify Restless Stream HMAC-SHA256 signatures over JSON-compatible payloads.

from restless_stream import compute_hmac_signature, verify_hmac_signature

payload = {"id": "order_123", "total": 42}
signature = compute_hmac_signature("secret", payload)

if verify_hmac_signature("secret", payload, signature):
  print("valid")

When verifying a runtime event signature, pass the same JSON payload your integration signs and the event's signature value.
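For readers unfamiliar with HMAC over JSON, the following standard-library sketch shows the general technique. It is not the SDK's implementation: the sketch_* names are hypothetical, and the canonical JSON form (sorted keys, compact separators, UTF-8) and hex encoding are assumptions, so its output may not match compute_hmac_signature.

```python
from __future__ import annotations

import hashlib
import hmac
import json
from typing import Any


def sketch_signature(secret: str, payload: Any) -> str:
    # Assumed canonical form: sorted keys, compact separators, UTF-8 bytes.
    message = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hmac.new(secret.encode("utf-8"), message, hashlib.sha256).hexdigest()


def sketch_verify(secret: str, payload: Any, signature: str) -> bool:
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(sketch_signature(secret, payload), signature)


payload = {"id": "order_123", "total": 42}
signature = sketch_signature("secret", payload)
print(sketch_verify("secret", payload, signature))
print(sketch_verify("secret", {"id": "order_123", "total": 43}, signature))
```

Both sides must serialize the payload identically, which is why a fixed canonical form matters more than the particular form chosen.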

Public Exports

| Export | Purpose |
| --- | --- |
| RestlessStreamClient | Synchronous REST and SSE client. |
| AsyncRestlessStreamClient | Asynchronous REST, SSE, and WebSocket client. |
| RestlessStreamError | Base SDK exception. |
| RestlessStreamAPIError | REST API error exception. |
| RestlessStreamParseError | Runtime event parsing exception. |
| compute_hmac_signature | Computes an HMAC-SHA256 signature for a JSON-compatible payload. |
| verify_hmac_signature | Constant-time HMAC signature verification. |
| DEFAULT_STREAM_BASE_URL | Default runtime stream base URL. |
| build_sse_url, build_websocket_url | Managed stream runtime URL builders. |
| build_direct_sse_url, build_direct_websocket_url | Direct stream runtime URL builders. |
| to_websocket_url | Converts an SSE URL to a WebSocket URL. |
| Models and enums | Pydantic response models and enum types listed above. |

Appendix: Creating an API Key

API keys authenticate requests to the Restless Stream management API and runtime streams. Keys start with rs_ followed by a hex string:

rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

Create a key in the dashboard

  1. Sign in at restlessapi.stream and go to API Keys in the dashboard.

     [Screenshot: API Keys dashboard showing existing keys]

  2. Click Create API Key, enter a name and optional description, and set an optional expiry date.

     [Screenshot: Create API Key modal form]

  3. Copy the key immediately; it is shown only once after creation. Store it in an environment variable or secrets manager.

export RESTLESS_API_KEY="rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

Pass the key to the client

import os

from restless_stream import RestlessStreamClient

# Pass the key directly (avoid hard-coding real keys in source control).
client = RestlessStreamClient(api_key="rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

# Or read it from the environment.
client = RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"])
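It can help to fail fast when the environment variable is missing or malformed. The loader below is a hypothetical helper, not part of the SDK; it checks only the documented shape (rs_ followed by hex characters) and deliberately does not assume a key length.

```python
from __future__ import annotations

import re
from collections.abc import Mapping

# Documented shape: "rs_" followed by a hex string. Length is intentionally unchecked.
API_KEY_PATTERN = re.compile(r"rs_[0-9a-fA-F]+")


def load_api_key(env: Mapping[str, str], name: str = "RESTLESS_API_KEY") -> str:
    """Read the key from a mapping such as os.environ and validate its shape."""
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set")
    if API_KEY_PATTERN.fullmatch(key) is None:
        raise RuntimeError(f"{name} does not look like an rs_ key")
    return key


print(load_api_key({"RESTLESS_API_KEY": "rs_0123abcdef"}))
```

In an application you would call load_api_key(os.environ) and pass the result as api_key to the client.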

Sample Responses

streams.list()

from restless_stream import RestlessStreamClient
import os

with RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
    result = client.streams.list(limit=5, offset=0)
    print(result)

Response:

{
  "streams": [],
  "pagination_info": {
    "has_next_page": false
  }
}
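The limit, offset, and has_next_page fields suggest a simple paging loop. The sketch below is hypothetical and runs against a fake page function so that it needs neither the SDK nor network access; it assumes offset counts items rather than pages. With a real client you could pass client.streams.list as list_page.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator
from types import SimpleNamespace
from typing import Any


def iter_all_streams(list_page: Callable[..., Any], limit: int = 20) -> Iterator[Any]:
    """Yield every stream by advancing offset until has_next_page is false."""
    offset = 0
    while True:
        page = list_page(limit=limit, offset=offset)
        yield from page.streams
        # Stop on the last page, or on an empty page to avoid looping forever.
        if not page.pagination_info.has_next_page or not page.streams:
            return
        offset += limit


# Fake data source standing in for the REST API.
ALL_STREAMS = [SimpleNamespace(id=f"stream_{n}", name=f"Stream {n}") for n in range(5)]


def fake_list(limit: int, offset: int) -> SimpleNamespace:
    chunk = ALL_STREAMS[offset : offset + limit]
    has_next = offset + limit < len(ALL_STREAMS)
    return SimpleNamespace(streams=chunk, pagination_info=SimpleNamespace(has_next_page=has_next))


print([stream.id for stream in iter_all_streams(fake_list, limit=2)])
```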

streams.validate_api_key()

result = client.streams.validate_api_key()

Response:

{
  "id": "rs_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
  "affected": true
}

streams.subscribe_direct_sse() — live event

import asyncio
import os

from restless_stream import AsyncRestlessStreamClient

async def main():
    async with AsyncRestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
        async for event in client.streams.subscribe_direct_sse(
            url="https://httpbin.org/get",
            method="GET",
            polling_interval=15,
            reconnect=False,
        ):
            print(event)
            break  # receive one event then stop

asyncio.run(main())

Event received:

{
  "type": "update",
  "meta": {
    "timestamp": "2026-06-07T00:49:36.562+00:00",
    "attempt_id": "a59d80d4-7d36-4019-b2c6-40cc8b8742ce",
    "status": 200,
    "latency_ms": 43,
    "payload_mode_fallback": null
  },
  "data": {
    "args": {},
    "headers": {
      "Accept": "*/*",
      "Host": "httpbin.org",
      "User-Agent": "Mozilla/5.0 (compatible; OurInternalService/1.0)",
      "X-Amzn-Trace-Id": "Root=1-6a24c020-316d1d396470c0900ce68495"
    },
    "origin": "140.82.14.127",
    "url": "https://httpbin.org/get"
  },
  "error": null,
  "signature": null,
  "event_id": "1780793376562"
}

Development

python -m pip install -e "packages/python/core[dev]"
python -m ruff check packages/python/core packages/python/examples
cd packages/python/core
python -m pytest tests
