Python SDK for Restless Stream
restless-stream
Official Python SDK for Restless Stream: https://restlessapi.stream
Restless Stream turns REST APIs into live Server-Sent Events and WebSocket streams. This package provides synchronous REST and SSE support, asynchronous REST, SSE, and WebSocket support, typed Pydantic models, runtime URL builders, and HMAC signature helpers.
Requirements
- Python >=3.9.
- A Restless Stream account and API key.
Installation
python -m pip install restless-stream
For local development from this repository:
python -m pip install -e "packages/python/core[dev]"
Getting Started
Async Client
Use AsyncRestlessStreamClient when you need async REST calls, SSE subscriptions, or WebSocket subscriptions.
import asyncio
import os
from restless_stream import AsyncRestlessStreamClient
async def main() -> None:
    async with AsyncRestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
        stream = await client.streams.create(
            name="Orders",
            description="Live order feed",
            status="ACTIVE",
            method="GET",
            url="https://api.example.com/orders",
            payload_mode="FULL_DATA",
            polling_interval=30,
        )
        async for event in client.streams.subscribe_sse(stream.sse_url, reconnect=False):
            print(event.type, event.data)
asyncio.run(main())
Sync Client
Use RestlessStreamClient for synchronous REST calls and SSE subscriptions.
import os
from restless_stream import RestlessStreamClient
with RestlessStreamClient(api_key=os.environ["RESTLESS_API_KEY"]) as client:
    streams = client.streams.list(limit=20, offset=0)
    for stream in streams.streams:
        print(stream.id, stream.name)
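The limit and offset arguments suggest ordinary offset pagination. The helper below is a minimal sketch of walking every page; `iter_all_streams` is a hypothetical name, not part of the SDK, and it assumes that a page shorter than the requested limit marks the end of the collection. It relies only on the `.streams` attribute shown above and ignores any pagination fields the response may carry.

```python
from typing import Any, Callable, Iterator


def iter_all_streams(list_page: Callable[..., Any], page_size: int = 20) -> Iterator[Any]:
    """Yield every stream by requesting limit/offset pages in order."""
    offset = 0
    while True:
        page = list_page(limit=page_size, offset=offset)
        items = list(page.streams)
        yield from items
        if len(items) < page_size:
            # Assumption: a short or empty page means there is nothing left.
            return
        offset += page_size
```

With the sync client this would be called as `iter_all_streams(client.streams.list)`.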
Client Configuration
from restless_stream import AsyncRestlessStreamClient, RestlessStreamClient
client = RestlessStreamClient(
    api_key="rs_...",
    base_url="https://api.restlessapi.stream",
    stream_base_url="https://stream.restlessapi.stream",
    timeout=30.0,
)
async_client = AsyncRestlessStreamClient(
    api_key="rs_...",
    timeout=30.0,
)
| Option | Description |
|---|---|
| api_key | Sends x-api-key on REST requests and Authorization: Bearer <key> on stream runtime requests. |
| base_url | REST API base URL. Defaults to https://api.restlessapi.stream. |
| stream_base_url | Runtime stream base URL. Defaults to https://stream.restlessapi.stream. |
| timeout | HTTP timeout in seconds for the owned httpx client. Defaults to 30.0. |
| http_client | Optional httpx.Client or httpx.AsyncClient. When provided, the SDK does not close it. |
Both clients support context managers. Use close() for the sync client and await aclose() for the async client when you do not use a context manager.
When creating or updating streams, the SDK adds apiKey to the request body when the client has an API key and the body does not already include apiKey or api_key.
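The apiKey rule above can be read as a small merge step. The following is a sketch of that rule only, not the SDK's own code; `with_api_key` is a hypothetical helper name.

```python
from typing import Any, Mapping, Optional


def with_api_key(body: Mapping[str, Any], api_key: Optional[str]) -> dict:
    """Return a copy of body, adding apiKey only when the rule allows it."""
    result = dict(body)
    # Leave the body alone if the caller already supplied either spelling.
    if api_key and "apiKey" not in result and "api_key" not in result:
        result["apiKey"] = api_key
    return result
```

Returning a copy keeps the caller's mapping unchanged, which is a design choice of this sketch rather than documented SDK behavior.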
Stream Management
All stream management methods are available as top-level client methods and through client.streams.
stream = await client.streams.create(
    name="Orders",
    description="Live order feed",
    status="ACTIVE",
    method="GET",
    url="https://api.example.com/orders",
    headers={"Accept": "application/json"},
    payload_mode="FULL_DATA",
    polling_interval=30,
)
await client.streams.update(stream.id, name="Orders v2", polling_interval=60)
await client.streams.stop(stream.id)
await client.streams.start(stream.id)
usage = await client.streams.credit_usage_stats(stream.id)
snippets = await client.streams.connection_snippets(stream_id=stream.id, language="python")
| Top-level method | Resource method | Description |
|---|---|---|
| list_streams(limit=20, offset=0) | streams.list(...) | List streams. |
| get_stream(stream_id) | streams.get(stream_id) | Get one stream. |
| create_stream(data=None, **kwargs) | streams.create(...) | Create a persisted stream. |
| update_stream(stream_id, data=None, **kwargs) | streams.update(...) | Patch stream configuration. |
| start_stream(stream_id) | streams.start(stream_id) | Mark a stream active. |
| stop_stream(stream_id) | streams.stop(stream_id) | Mark a stream inactive. |
| delete_stream(stream_id) | streams.delete(stream_id) | Delete a stream. |
| validate_stream_api_key(api_key=None) | streams.validate_api_key(...) | Validate an API key. Uses the client key when omitted. |
| credit_usage_stats(stream_id) | streams.credit_usage_stats(...) | Fetch credit usage totals and daily usage. |
| connection_snippets(data=None, **kwargs) | streams.connection_snippets(...) | Generate runtime URLs and snippets. |
| direct_setup(data=None, **kwargs) | streams.direct_setup(...) | Generate direct-stream commands, URLs, and snippets. |
| direct_session(data=None, **kwargs) | streams.direct_session(...) | Create or reuse a direct stream session. |
| subscribe_sse(url, **kwargs) | streams.subscribe_sse(...) | Subscribe to an SSE runtime URL. |
| subscribe_direct_sse(**kwargs) | streams.subscribe_direct_sse(...) | Build and subscribe to a direct SSE URL. |
| subscribe_websocket(url, **kwargs) | streams.subscribe_websocket(...) | Async client only. Subscribe to a WebSocket runtime URL. |
| subscribe_direct_websocket(**kwargs) | streams.subscribe_direct_websocket(...) | Async client only. Build and subscribe to a direct WebSocket URL. |
Request bodies may be mappings, Pydantic models, or keyword arguments. Snake-case keys are converted to the camel-case field names expected by the API.
await client.streams.create(
    name="Orders",
    description="",
    status="ACTIVE",
    method="POST",
    url="https://api.example.com/orders/search",
    body={"status": "open"},
    payload_mode="JSON_PATCH",
    polling_interval=30,
)
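To make the snake-case to camel-case conversion concrete, here is a minimal sketch of the key mapping. The function names are hypothetical and this is not the SDK's implementation. It converts top-level keys only, on the assumption that a nested value such as body is the upstream request payload and should pass through untouched; the source does not say whether the SDK recurses.

```python
from typing import Any


def to_camel(key: str) -> str:
    """Convert one snake_case key, e.g. polling_interval -> pollingInterval."""
    head, *tail = key.split("_")
    return head + "".join(part[:1].upper() + part[1:] for part in tail)


def camelize_keys(body: dict[str, Any]) -> dict[str, Any]:
    """Rename top-level keys; nested values are copied as-is (an assumption)."""
    return {to_camel(key): value for key, value in body.items()}
```

For the call above, `payload_mode` and `polling_interval` would be sent as `payloadMode` and `pollingInterval` under this mapping.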
Direct Streams
Direct streams let you stream a REST endpoint without creating a persisted stream first.
async for event in client.streams.subscribe_direct_sse(
    url="https://api.example.com/orders",
    method="GET",
    polling_interval=30,
    reconnect=False,
):
    print(event)
Create a reusable direct session when you need a stable URL.
session = await client.streams.direct_session(
    dedupe_key="orders-feed-v1",
    method="GET",
    url="https://api.example.com/orders",
    polling_interval=30,
)
async for event in client.streams.subscribe_sse(session.sse_url):
    print(event.data)
Generate setup commands and code snippets without subscribing.
setup = await client.streams.direct_setup(
    method="POST",
    url="https://api.example.com/search",
    body={"query": "restless"},
    language="python",
)
print(setup.commands.header.curl)
print(setup.runtime.direct_sse_url)
Direct stream inputs support url, method, headers, body, jq_filter, payload_mode, polling_interval, polling_strategies, and api_key.
SSE Streaming
SSE subscriptions yield StreamEvent Pydantic models.
for event in sync_client.streams.subscribe_sse(
    "https://stream.restlessapi.stream?streamId=stream_123",
    cursor="1700000000000",
    reconnect=False,
):
    print(event.type, event.data)
async for event in async_client.streams.subscribe_sse(
    "https://stream.restlessapi.stream?streamId=stream_123",
    since="2026-01-01T00:00:00Z",
    max_reconnects=3,
    retry_seconds=2,
):
    print(event.type, event.meta.timestamp)
SSE options:
| Option | Description |
|---|---|
| cursor | Initial cursor or SSE event ID. Updated automatically from received events. |
| since | Initial timestamp or cursor used until an event cursor is observed. |
| reconnect | Reconnect after the stream ends or errors. Defaults to True. |
| max_reconnects | Maximum reconnect attempts. None means unbounded. |
| retry_seconds | Delay between reconnect attempts. Defaults to 1.0. |
The SDK sends Authorization: Bearer <api_key> for runtime subscriptions when the client has an API key.
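One way to picture how the reconnect options above interact is the loop below. It is a sketch under stated assumptions, not the SDK's subscription code: `subscribe_with_reconnect` and the `connect` callable are hypothetical, events are plain dicts, and only `ConnectionError` is treated as a stream error. The cursor is advanced from each event's ID so that a reconnect resumes after the last event seen.

```python
import time
from typing import Callable, Iterable, Iterator, Optional


def subscribe_with_reconnect(
    connect: Callable[[Optional[str]], Iterable[dict]],
    cursor: Optional[str] = None,
    reconnect: bool = True,
    max_reconnects: Optional[int] = None,
    retry_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[dict]:
    """Yield events, reopening the stream after it ends or errors."""
    attempts = 0
    while True:
        failure = None
        try:
            for event in connect(cursor):
                # Track the latest event ID so the next connection resumes there.
                cursor = event.get("event_id") or cursor
                yield event
        except ConnectionError as exc:
            failure = exc
        exhausted = max_reconnects is not None and attempts >= max_reconnects
        if not reconnect or exhausted:
            if failure is not None:
                raise failure  # surface the last error once retries are spent
            return
        attempts += 1
        sleep(retry_seconds)
```

Passing `sleep` as a parameter is only there to keep the sketch testable without real delays.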
WebSocket Streaming
WebSocket subscriptions are async-only.
async for event in async_client.streams.subscribe_websocket(
    "wss://stream.restlessapi.stream/ws?streamId=stream_123",
    cursor="42",
    reconnect=True,
    max_reconnects=3,
):
    print(event.type, event.data)
Direct WebSocket subscription:
async for event in async_client.streams.subscribe_direct_websocket(
    url="https://api.example.com/orders",
    method="GET",
    polling_interval=30,
    connect_kwargs={"open_timeout": 10},
):
    print(event)
WebSocket options include all SSE runtime options plus connect_kwargs, which are passed to websockets.connect. Do not include additional_headers or extra_headers in connect_kwargs when the SDK client has an api_key; the SDK owns the runtime Authorization header in that case.
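A caller-side guard can catch the header conflict described above before a subscription is opened. This is a hypothetical pre-flight check, not SDK behavior: the source does not say whether the SDK raises, overrides, or ignores conflicting headers, so raising `ValueError` here is an assumption.

```python
from typing import Any, Mapping, Optional

# Keyword names taken from the paragraph above.
RESERVED_HEADER_KWARGS = ("additional_headers", "extra_headers")


def check_connect_kwargs(connect_kwargs: Mapping[str, Any], api_key: Optional[str]) -> dict:
    """Reject header overrides when an API key is set; otherwise pass through."""
    conflicts = [name for name in RESERVED_HEADER_KWARGS if name in connect_kwargs]
    if api_key and conflicts:
        raise ValueError(
            "connect_kwargs must not set " + ", ".join(conflicts) + " when api_key is configured"
        )
    return dict(connect_kwargs)
```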
Runtime URL Helpers
Build runtime URLs without creating a client.
from restless_stream import (
    build_direct_sse_url,
    build_direct_websocket_url,
    build_sse_url,
    build_websocket_url,
    to_websocket_url,
)
sse_url = build_sse_url(stream_id="stream_123", since="2026-01-01T00:00:00Z")
ws_url = build_websocket_url(session_id="session_123")
direct_sse_url = build_direct_sse_url(
    url="https://api.example.com/orders",
    method="POST",
    body={"status": "open"},
    payload_mode="JSON_PATCH",
    polling_interval=30,
)
direct_ws_url = build_direct_websocket_url(url="https://api.example.com/orders")
converted_ws_url = to_websocket_url(sse_url)
build_sse_url requires exactly one of stream_id or session_id.
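The exactly-one rule and the SSE-to-WebSocket conversion can be illustrated with the standard library alone. The sketch below is not the SDK's builder. The `streamId` parameter and the `/ws` path are taken from the example URLs in this README; the `sessionId` and `since` query parameter names, and both function names, are assumptions.

```python
from typing import Optional
from urllib.parse import urlencode, urlsplit, urlunsplit

STREAM_BASE_URL = "https://stream.restlessapi.stream"


def sketch_sse_url(
    stream_id: Optional[str] = None,
    session_id: Optional[str] = None,
    since: Optional[str] = None,
    base_url: str = STREAM_BASE_URL,
) -> str:
    """Build an SSE URL, insisting on exactly one identifier."""
    if (stream_id is None) == (session_id is None):
        raise ValueError("pass exactly one of stream_id or session_id")
    # "sessionId" and "since" are assumed query parameter names.
    params = {"streamId": stream_id} if stream_id is not None else {"sessionId": session_id}
    if since is not None:
        params["since"] = since
    return f"{base_url}?{urlencode(params)}"


def sketch_to_websocket_url(sse_url: str) -> str:
    """Swap http(s) for ws(s) and append the /ws path seen in the examples."""
    parts = urlsplit(sse_url)
    scheme = {"https": "wss", "http": "ws"}.get(parts.scheme, parts.scheme)
    path = parts.path.rstrip("/") + "/ws"
    return urlunsplit((scheme, parts.netloc, path, parts.query, parts.fragment))
```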
Models
Response models are Pydantic v2 models. Python attributes use snake case and accept the API's camel-case aliases.
Common models and enums:
| Export | Description |
|---|---|
| RestlessModel | Base Pydantic model with camel-case aliases and extra fields allowed. |
| HttpMethod | GET, POST, PUT, PATCH, DELETE, OPTIONS. |
| StreamStatus | ACTIVE or INACTIVE. |
| PayloadMode | FULL_DATA or JSON_PATCH. |
| PollingStrategy | Time-windowed polling strategy. |
| Stream | Persisted stream response. |
| StreamsResponse | Stream list response with pagination info. |
| BaseActionResponse | Generic action response. |
| StreamCreditUsageStats, DailyCreditUsage, CreditUsageChargeTypeBreakdown | Credit usage responses. |
| ConnectionSnippetsResponse | Managed stream runtime URLs and snippets. |
| DirectSetupResponse | Direct setup commands, runtime URLs, snippets, and stream config. |
| DirectSessionResponse | Direct session runtime URLs and expiry. |
| StreamEvent, StreamEventMeta, StreamErrorDetail | Runtime event models. |
Runtime update events contain type, meta, data, optional signature, and optional event_id. Error events contain type, meta, and error.
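The two event shapes described above can be told apart by whether an error field is present. The dataclass below is a stand-in for illustration, not the SDK's StreamEvent model. The wire spelling `eventId` and the literal type strings used in any sample payload are assumptions.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchEvent:
    type: str
    meta: dict
    data: Any = None
    error: Optional[dict] = None
    signature: Optional[str] = None
    event_id: Optional[str] = None

    @property
    def is_error(self) -> bool:
        # Error events carry an error object instead of data.
        return self.error is not None


def parse_event(raw: str) -> SketchEvent:
    """Parse one JSON event payload, requiring the fields both shapes share."""
    payload = json.loads(raw)
    missing = [key for key in ("type", "meta") if key not in payload]
    if missing:
        raise ValueError(f"event is missing required fields: {missing}")
    return SketchEvent(
        type=payload["type"],
        meta=payload["meta"],
        data=payload.get("data"),
        error=payload.get("error"),
        signature=payload.get("signature"),
        # Accept either spelling; which one the runtime sends is an assumption.
        event_id=payload.get("eventId", payload.get("event_id")),
    )
```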
Error Handling
REST API failures raise RestlessStreamAPIError.
from restless_stream import RestlessStreamAPIError
try:
    stream = client.streams.get("missing")
except RestlessStreamAPIError as error:
    print(error.status_code, error.message)
Invalid runtime event payloads raise RestlessStreamParseError. Both exceptions inherit from RestlessStreamError.
HMAC Helpers
Use HMAC helpers to compute or verify Restless Stream HMAC-SHA256 signatures over JSON-compatible payloads.
from restless_stream import compute_hmac_signature, verify_hmac_signature
payload = {"id": "order_123", "total": 42}
signature = compute_hmac_signature("secret", payload)
if verify_hmac_signature("secret", payload, signature):
    print("valid")
When verifying a runtime event signature, pass the same JSON payload your integration signs and the event's signature value.
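For readers who want to see what an HMAC-SHA256 signature over a JSON payload involves, here is a standard-library sketch. It is not the SDK's implementation and may not produce the same signatures: the canonical form used here (compact JSON with sorted keys, hex digest) is an assumption, and the function names are hypothetical.

```python
import hashlib
import hmac
import json
from typing import Any


def sketch_compute_signature(secret: str, payload: Any) -> str:
    """Sign a JSON-compatible payload with HMAC-SHA256 and return a hex digest."""
    # Assumption: compact JSON with sorted keys is the canonical byte form.
    message = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hmac.new(secret.encode("utf-8"), message, hashlib.sha256).hexdigest()


def sketch_verify_signature(secret: str, payload: Any, signature: str) -> bool:
    """Compare in constant time to avoid leaking how many characters matched."""
    expected = sketch_compute_signature(secret, payload)
    return hmac.compare_digest(expected, signature)
```

Sorting keys makes the signature independent of dictionary ordering, which matters when the signer and verifier build the payload separately.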
Public Exports
| Export | Purpose |
|---|---|
| RestlessStreamClient | Synchronous REST and SSE client. |
| AsyncRestlessStreamClient | Asynchronous REST, SSE, and WebSocket client. |
| RestlessStreamError | Base SDK exception. |
| RestlessStreamAPIError | REST API error exception. |
| RestlessStreamParseError | Runtime event parsing exception. |
| compute_hmac_signature | Computes an HMAC-SHA256 signature for a JSON-compatible payload. |
| verify_hmac_signature | Constant-time HMAC signature verification. |
| DEFAULT_STREAM_BASE_URL | Default runtime stream base URL. |
| build_sse_url, build_websocket_url | Managed stream runtime URL builders. |
| build_direct_sse_url, build_direct_websocket_url | Direct stream runtime URL builders. |
| to_websocket_url | Converts an SSE URL to a WebSocket URL. |
| Models and enums | Pydantic response models and enum types listed above. |
Development
python -m pip install -e "packages/python/core[dev]"
python -m ruff check packages/python/core packages/python/examples
cd packages/python/core
python -m pytest tests