Skip to main content

Python SDK for Restless Stream

Project description

restless-stream

Official Python SDK for Restless Stream: https://restlessapi.stream

Restless Stream turns REST APIs into live Server-Sent Events and WebSocket streams. This package provides synchronous REST and SSE support, asynchronous REST, SSE, and WebSocket support, typed Pydantic models, runtime URL builders, and HMAC signature helpers.

Requirements

  • Python >=3.9.
  • A Restless Stream account and API key.

Installation

python -m pip install restless-stream

For local development from this repository:

python -m pip install -e "packages/python/core[dev]"

Getting Started

Async Client

Use AsyncRestlessStreamClient when you need async REST calls, SSE subscriptions, or WebSocket subscriptions.

import asyncio
import os

from restless_stream import AsyncRestlessStreamClient


async def main() -> None:
  async with AsyncRestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
    stream = await client.streams.create(
      name="Orders",
      description="Live order feed",
      status="ACTIVE",
      method="GET",
      url="https://api.example.com/orders",
      payload_mode="FULL_DATA",
      polling_interval=30,
    )

    async for event in client.streams.subscribe_sse(stream.sse_url, reconnect=False):
      print(event.type, event.data)


asyncio.run(main())

Sync Client

Use RestlessStreamClient for synchronous REST calls and SSE subscriptions.

import os

from restless_stream import RestlessStreamClient


with RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
  streams = client.streams.list(limit=20, offset=0)

  for stream in streams.streams:
    print(stream.id, stream.name)

Client Configuration

from restless_stream import AsyncRestlessStreamClient, RestlessStreamClient

client = RestlessStreamClient(
  api_key="rs_...",
  base_url="https://api.restlessapi.stream",
  stream_base_url="https://stream.restlessapi.stream",
  timeout=30.0,
)

async_client = AsyncRestlessStreamClient(
  api_key="rs_...",
  timeout=30.0,
)
Option Description
api_key Sends x-api-key on REST requests and Authorization: Bearer <key> on stream runtime requests.
base_url REST API base URL. Defaults to https://api.restlessapi.stream.
stream_base_url Runtime stream base URL. Defaults to https://stream.restlessapi.stream.
timeout HTTP timeout in seconds for the owned httpx client. Defaults to 30.0.
http_client Optional httpx.Client or httpx.AsyncClient. When provided, the SDK does not close it.

Both clients support context managers. Use close() for the sync client and await aclose() for the async client when you do not use a context manager.

When creating or updating streams, the SDK adds apiKey to the request body when the client has an API key and the body does not already include apiKey or api_key.

Stream Management

All stream management methods are available as top-level client methods and through client.streams.

stream = await client.streams.create(
  name="Orders",
  description="Live order feed",
  status="ACTIVE",
  method="GET",
  url="https://api.example.com/orders",
  headers={"Accept": "application/json"},
  payload_mode="FULL_DATA",
  polling_interval=30,
)

await client.streams.update(stream.id, name="Orders v2", polling_interval=60)
await client.streams.stop(stream.id)
await client.streams.start(stream.id)

usage = await client.streams.credit_usage_stats(stream.id)
snippets = await client.streams.connection_snippets(stream_id=stream.id, language="python")
Top-level method Resource method Description
list_streams(limit=20, offset=0) streams.list(...) List streams.
get_stream(stream_id) streams.get(stream_id) Get one stream.
create_stream(data=None, **kwargs) streams.create(...) Create a persisted stream.
update_stream(stream_id, data=None, **kwargs) streams.update(...) Patch stream configuration.
start_stream(stream_id) streams.start(stream_id) Mark a stream active.
stop_stream(stream_id) streams.stop(stream_id) Mark a stream inactive.
delete_stream(stream_id) streams.delete(stream_id) Delete a stream.
validate_stream_api_key(api_key=None) streams.validate_api_key(...) Validate an API key. Uses the client key when omitted.
credit_usage_stats(stream_id) streams.credit_usage_stats(...) Fetch credit usage totals and daily usage.
connection_snippets(data=None, **kwargs) streams.connection_snippets(...) Generate runtime URLs and snippets.
direct_setup(data=None, **kwargs) streams.direct_setup(...) Generate direct-stream commands, URLs, and snippets.
direct_session(data=None, **kwargs) streams.direct_session(...) Create or reuse a direct stream session.
subscribe_sse(url, **kwargs) streams.subscribe_sse(...) Subscribe to an SSE runtime URL.
subscribe_direct_sse(**kwargs) streams.subscribe_direct_sse(...) Build and subscribe to a direct SSE URL.
subscribe_websocket(url, **kwargs) streams.subscribe_websocket(...) Async client only. Subscribe to a WebSocket runtime URL.
subscribe_direct_websocket(**kwargs) streams.subscribe_direct_websocket(...) Async client only. Build and subscribe to a direct WebSocket URL.

Request bodies may be mappings, Pydantic models, or keyword arguments. Snake-case keys are converted to the camel-case field names expected by the API.

await client.streams.create(
  name="Orders",
  description="",
  status="ACTIVE",
  method="POST",
  url="https://api.example.com/orders/search",
  body={"status": "open"},
  payload_mode="JSON_PATCH",
  polling_interval=30,
)

Direct Streams

Direct streams let you stream a REST endpoint without creating a persisted stream first.

async for event in client.streams.subscribe_direct_sse(
  url="https://api.example.com/orders",
  method="GET",
  polling_interval=30,
  reconnect=False,
):
  print(event)

Create a reusable direct session when you need a stable URL.

session = await client.streams.direct_session(
  dedupe_key="orders-feed-v1",
  method="GET",
  url="https://api.example.com/orders",
  polling_interval=30,
)

async for event in client.streams.subscribe_sse(session.sse_url):
  print(event.data)

Generate setup commands and code snippets without subscribing.

setup = await client.streams.direct_setup(
  method="POST",
  url="https://api.example.com/search",
  body={"query": "restless"},
  language="python",
)

print(setup.commands.header.curl)
print(setup.runtime.direct_sse_url)

Direct stream inputs support url, method, headers, body, jq_filter, payload_mode, polling_interval, polling_strategies, and api_key.

SSE Streaming

SSE subscriptions yield StreamEvent Pydantic models.

for event in sync_client.streams.subscribe_sse(
  "https://stream.restlessapi.stream?streamId=stream_123",
  cursor="1700000000000",
  reconnect=False,
):
  print(event.type, event.data)
async for event in async_client.streams.subscribe_sse(
  "https://stream.restlessapi.stream?streamId=stream_123",
  since="2026-01-01T00:00:00Z",
  max_reconnects=3,
  retry_seconds=2,
):
  print(event.type, event.meta.timestamp)

SSE options:

Option Description
cursor Initial cursor or SSE event ID. Updated automatically from received events.
since Initial timestamp or cursor used until an event cursor is observed.
reconnect Reconnect after the stream ends or errors. Defaults to True.
max_reconnects Maximum reconnect attempts. None means unbounded.
retry_seconds Delay between reconnect attempts. Defaults to 1.0.

The SDK sends Authorization: Bearer <api_key> for runtime subscriptions when the client has an API key.

WebSocket Streaming

WebSocket subscriptions are async-only.

async for event in async_client.streams.subscribe_websocket(
  "wss://stream.restlessapi.stream/ws?streamId=stream_123",
  cursor="42",
  reconnect=True,
  max_reconnects=3,
):
  print(event.type, event.data)

Direct WebSocket subscription:

async for event in async_client.streams.subscribe_direct_websocket(
  url="https://api.example.com/orders",
  method="GET",
  polling_interval=30,
  connect_kwargs={"open_timeout": 10},
):
  print(event)

WebSocket options include all SSE runtime options plus connect_kwargs, which are passed to websockets.connect. Do not include additional_headers or extra_headers in connect_kwargs when the SDK client has an api_key; the SDK owns the runtime Authorization header in that case.

Runtime URL Helpers

Build runtime URLs without creating a client.

from restless_stream import (
  build_direct_sse_url,
  build_direct_websocket_url,
  build_sse_url,
  build_websocket_url,
  to_websocket_url,
)

sse_url = build_sse_url(stream_id="stream_123", since="2026-01-01T00:00:00Z")
ws_url = build_websocket_url(session_id="session_123")

direct_sse_url = build_direct_sse_url(
  url="https://api.example.com/orders",
  method="POST",
  body={"status": "open"},
  payload_mode="JSON_PATCH",
  polling_interval=30,
)

direct_ws_url = build_direct_websocket_url(url="https://api.example.com/orders")
converted_ws_url = to_websocket_url(sse_url)

build_sse_url requires exactly one of stream_id or session_id.

Models

Response models are Pydantic v2 models. Python attributes use snake case and accept the API's camel-case aliases.

Common models and enums:

Export Description
RestlessModel Base Pydantic model with camel-case aliases and extra fields allowed.
HttpMethod GET, POST, PUT, PATCH, DELETE, OPTIONS.
StreamStatus ACTIVE or INACTIVE.
PayloadMode FULL_DATA or JSON_PATCH.
PollingStrategy Time-windowed polling strategy.
Stream Persisted stream response.
StreamsResponse Stream list response with pagination info.
BaseActionResponse Generic action response.
StreamCreditUsageStats, DailyCreditUsage, CreditUsageChargeTypeBreakdown Credit usage responses.
ConnectionSnippetsResponse Managed stream runtime URLs and snippets.
DirectSetupResponse Direct setup commands, runtime URLs, snippets, and stream config.
DirectSessionResponse Direct session runtime URLs and expiry.
StreamEvent, StreamEventMeta, StreamErrorDetail Runtime event models.

Runtime update events contain type, meta, data, optional signature, and optional event_id. Error events contain type, meta, and error.

Error Handling

REST API failures raise RestlessStreamAPIError.

from restless_stream import RestlessStreamAPIError

try:
  stream = client.streams.get("missing")
except RestlessStreamAPIError as error:
  print(error.status_code, error.message)

Invalid runtime event payloads raise RestlessStreamParseError. Both exceptions inherit from RestlessStreamError.

HMAC Helpers

Use HMAC helpers to compute or verify Restless Stream HMAC-SHA256 signatures over JSON-compatible payloads.

from restless_stream import compute_hmac_signature, verify_hmac_signature

payload = {"id": "order_123", "total": 42}
signature = compute_hmac_signature("secret", payload)

if verify_hmac_signature("secret", payload, signature):
  print("valid")

When verifying a runtime event signature, pass the same JSON payload your integration signs and the event's signature value.

Public Exports

Export Purpose
RestlessStreamClient Synchronous REST and SSE client.
AsyncRestlessStreamClient Asynchronous REST, SSE, and WebSocket client.
RestlessStreamError Base SDK exception.
RestlessStreamAPIError REST API error exception.
RestlessStreamParseError Runtime event parsing exception.
compute_hmac_signature Computes an HMAC-SHA256 signature for a JSON-compatible payload.
verify_hmac_signature Constant-time HMAC signature verification.
DEFAULT_STREAM_BASE_URL Default runtime stream base URL.
build_sse_url, build_websocket_url Managed stream runtime URL builders.
build_direct_sse_url, build_direct_websocket_url Direct stream runtime URL builders.
to_websocket_url Converts an SSE URL to a WebSocket URL.
Models and enums Pydantic response models and enum types listed above.

Development

python -m pip install -e "packages/python/core[dev]"
python -m ruff check packages/python/core packages/python/examples
cd packages/python/core
python -m pytest tests

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

restless_stream-0.1.1.tar.gz (35.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

restless_stream-0.1.1-py3-none-any.whl (16.6 kB view details)

Uploaded Python 3

File details

Details for the file restless_stream-0.1.1.tar.gz.

File metadata

  • Download URL: restless_stream-0.1.1.tar.gz
  • Upload date:
  • Size: 35.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.5

File hashes

Hashes for restless_stream-0.1.1.tar.gz
Algorithm Hash digest
SHA256 027cf9962b2d9c50a2b888cebab57d40b6614ce81218f49e0134aae2d2bfc4e2
MD5 b4d2242b1d81bbf850167c7f3d7b741f
BLAKE2b-256 b1d22950c66eae93dab0eeda45094789a82aee134ea6986178e22d93566908b5

See more details on using hashes here.

File details

Details for the file restless_stream-0.1.1-py3-none-any.whl.

File metadata

File hashes

Hashes for restless_stream-0.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 b4b3909d827bdb659ed74f483465405bc832f96a52bdb3e2dc754feda6b899af
MD5 9462a9d309182f3cc6db63a38ea02597
BLAKE2b-256 659e3775ec48d69d375088520aa16dd9ba537dc88826734fc64bd2c842d0f995

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page