Python client for the Durable Streams protocol
durable-streams
Python client for the Durable Streams protocol.
Overview
The Durable Streams client provides two main APIs:
- stream() / astream() functions - Read-only APIs for consuming streams
- DurableStream / AsyncDurableStream classes - Handles for read/write operations
Installation
pip install durable-streams
Or with uv:
uv add durable-streams
Quick Start
Reading from a Stream
from durable_streams import stream

# Default iteration yields raw bytes chunks
with stream("https://streams.example.com/my-stream") as res:
    for chunk in res:  # bytes
        process(chunk)

# Iterate over JSON items (flattened from arrays)
with stream("https://streams.example.com/my-stream") as res:
    for item in res.iter_json():
        print(item)

# Read all items at once
with stream("https://streams.example.com/my-stream", live=False) as res:
    items = res.read_json()
    print(f"Got {len(items)} items")
Async Reading
from durable_streams import astream

# Direct async context manager - no await needed!
async with astream("https://streams.example.com/my-stream") as res:
    async for chunk in res:  # bytes
        process(chunk)

# Or iterate JSON
async with astream("https://streams.example.com/my-stream") as res:
    async for item in res.iter_json():
        print(item)
Writing to a Stream
from durable_streams import DurableStream

# Create a new stream
handle = DurableStream.create(
    "https://streams.example.com/my-stream",
    content_type="application/json",
    ttl_seconds=3600,
)

# Append data
handle.append({"message": "hello"})
handle.append({"message": "world"})

# Read back
with handle.stream() as res:
    for item in res.iter_json():
        print(item)
API Reference
Top-Level Functions
stream(url, *, offset=None, live="auto", ...)
Create a synchronous streaming session.
from durable_streams import stream

res = stream(
    url="https://example.com/stream",
    offset="12345",  # Resume from offset
    live="auto",  # Live mode (see below)
    headers={"Authorization": "Bearer token"},
    params={"tenant": "my-tenant"},
)
astream(url, *, offset=None, live="auto", ...)
Create an asynchronous streaming session.
from durable_streams import astream

res = await astream(
    url="https://example.com/stream",
    offset="12345",
    live="auto",
)
Live Modes
The live parameter controls streaming behavior:
- False - Catch-up only. Stop after reaching the end of the stream.
- "auto" (default) - Behavior depends on consumption method:
  - read_*() methods: Stop after reaching up-to-date
  - Iteration methods: Continue with long-poll for live updates
- "long-poll" - Explicit long-poll mode for live updates
- "sse" - Explicit Server-Sent Events mode for live updates
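The rules in the list above can be written down as a small decision function. This is only an illustrative sketch: `resolve_live` and its `consumption` argument are hypothetical names, not part of the durable-streams API, and the function models nothing beyond what the list states.

```python
from typing import Literal, Union

LiveMode = Union[Literal["auto", "long-poll", "sse"], bool]


def resolve_live(
    live: LiveMode, consumption: Literal["read", "iterate"]
) -> Union[str, bool]:
    """Hypothetical helper: map a `live` setting to the effective behavior.

    Returns False for catch-up only, otherwise the live setting in effect.
    """
    if live == "auto":
        # read_*() stops once up to date; iteration keeps following via long-poll.
        return False if consumption == "read" else "long-poll"
    # Explicit settings (False, "long-poll", "sse") are passed through unchanged.
    return live
```

For example, `resolve_live("auto", "read")` gives `False`, while `resolve_live("auto", "iterate")` gives `"long-poll"`.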
StreamResponse / AsyncStreamResponse
Response objects returned by stream() and astream(). These are one-shot -
you can only consume them in one mode. Attempting to consume again raises StreamConsumedError.
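To make the one-shot rule concrete, here is a minimal model of a response that can be claimed by only one consumer. `OneShotResponse` is a hypothetical class written for this illustration, and the `StreamConsumedError` defined here is a local stand-in for the library's exception; the real classes may be implemented differently.

```python
from typing import Iterable, Iterator


class StreamConsumedError(Exception):
    """Local stand-in for the library's exception of the same name."""


class OneShotResponse:
    """Hypothetical model of a response that can be consumed only once."""

    def __init__(self, chunks: Iterable[bytes]) -> None:
        self._chunks = iter(chunks)
        self._consumed = False

    def _claim(self) -> None:
        # The first consumer claims the response; any later attempt fails.
        if self._consumed:
            raise StreamConsumedError("response was already consumed")
        self._consumed = True

    def __iter__(self) -> Iterator[bytes]:
        self._claim()
        return self._chunks

    def read_bytes(self) -> bytes:
        self._claim()
        return b"".join(self._chunks)
```

Calling `read_bytes()` and then iterating the same object raises `StreamConsumedError` in this model, which mirrors the behavior described above.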
Context Manager Usage (Recommended)
# Sync
with stream(url) as res:
    for chunk in res:
        process(chunk)

# Async
async with astream(url) as res:
    async for chunk in res:
        process(chunk)
Raw Bytes Iteration
# Default iteration yields bytes
with stream(url) as res:
    for chunk in res:  # bytes
        print(len(chunk))
Note: Raw bytes iteration is not available in SSE mode. Use iter_text() or iter_json() instead.
Text Iteration
with stream(url) as res:
    for text in res.iter_text(encoding="utf-8"):
        print(text)
JSON Iteration
# Iterate over individual items (arrays are flattened)
with stream(url) as res:
    for item in res.iter_json():
        print(item)

# With a custom decoder
with stream(url) as res:
    for item in res.iter_json(decode=MyModel.from_dict):
        print(item)
JSON Batches (Preserves Array Boundaries)
with stream(url) as res:
    for batch in res.iter_json_batches():
        print(f"Got batch of {len(batch)} items")
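The difference between flattened items and batches is easiest to see on plain JSON. The sketch below uses only the standard library and assumes each response body is one JSON array; `iter_batches` and `iter_items` are hypothetical helpers that imitate the two iteration styles, not the library's own code.

```python
import json
from typing import Any, Iterator, List


def iter_batches(bodies: List[bytes]) -> Iterator[List[Any]]:
    """Yield one list per response body, keeping array boundaries."""
    for body in bodies:
        parsed = json.loads(body)
        # Assumption: a non-array body is treated as a batch of one item.
        yield parsed if isinstance(parsed, list) else [parsed]


def iter_items(bodies: List[bytes]) -> Iterator[Any]:
    """Yield individual items, flattening each array one level."""
    for batch in iter_batches(bodies):
        yield from batch


# Two response bodies: a batch of two items followed by a batch of one.
bodies = [b'[{"n": 1}, {"n": 2}]', b'[{"n": 3}]']
```

With these inputs, `iter_batches(bodies)` produces two lists (of two items and one item), while `iter_items(bodies)` produces the same three items one at a time.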
Events with Metadata
from durable_streams import StreamEvent

with stream(url) as res:
    for event in res.iter_events(mode="json"):
        print(f"Data: {event.data}")
        print(f"Offset: {event.next_offset}")
        print(f"Up-to-date: {event.up_to_date}")
        print(f"Cursor: {event.cursor}")
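One common use of the per-event offset is checkpointing, so that a consumer can resume where it left off. The following is a sketch under stated assumptions: `Event` is a local stand-in carrying the same fields as StreamEvent, and `consume_with_checkpoint`, `handle`, and `save_offset` are hypothetical names for application code.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterable, Optional


@dataclass(frozen=True)
class Event:
    """Local stand-in with the same fields as StreamEvent."""

    data: Any
    next_offset: str
    up_to_date: bool
    cursor: Optional[str] = None


def consume_with_checkpoint(
    events: Iterable[Event],
    handle: Callable[[Any], None],
    save_offset: Callable[[str], None],
) -> None:
    """Process each event, then record the offset to resume from."""
    for event in events:
        handle(event.data)
        # Persist only after the item is handled, so a crash replays it
        # rather than skipping it (at-least-once processing).
        save_offset(event.next_offset)
```

On restart, the last saved value would be passed as `offset=` when opening the stream again.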
Read-All Methods
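# Responses are one-shot, so each read_*() call below uses its own response.

# Read all bytes
with stream(url, live=False) as res:
    data = res.read_bytes()

# Read all text
with stream(url, live=False) as res:
    text = res.read_text()

# Read all JSON items (flattened)
with stream(url, live=False) as res:
    items = res.read_json()

# Read JSON batches (preserves boundaries)
with stream(url, live=False) as res:
    batches = res.read_json_batches()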
DurableStream / AsyncDurableStream
Handle classes for read/write operations on streams.
Creating Handles
from durable_streams import DurableStream

# Create a new stream
handle = DurableStream.create(
    url="https://example.com/stream",
    content_type="application/json",
    ttl_seconds=3600,
    headers={"Authorization": "Bearer token"},
)

# Connect to existing stream
handle = DurableStream.connect(
    url="https://example.com/stream",
    headers={"Authorization": "Bearer token"},
)

# Direct instantiation (no network call)
handle = DurableStream(
    url="https://example.com/stream",
    headers={"Authorization": "Bearer token"},
)
Instance Methods
# Get metadata
result = handle.head()
print(f"Offset: {result.offset}")
print(f"Content-Type: {result.content_type}")

# Append data
handle.append({"event": "click"})
handle.append({"event": "scroll"}, seq="seq-001")

# Read stream
with handle.stream(offset="12345") as res:
    for item in res.iter_json():
        print(item)

# Delete stream (last, since the stream cannot be read afterwards)
handle.delete()
Async Version
from durable_streams import AsyncDurableStream

handle = await AsyncDurableStream.create(
    url="https://example.com/stream",
    content_type="application/json",
)

await handle.append({"event": "click"})

async with handle.stream() as res:
    async for item in res.iter_json():
        print(item)
Automatic Batching
By default, multiple append() calls made while a POST is in-flight are batched together:
import asyncio
from durable_streams import AsyncDurableStream

handle = await AsyncDurableStream.create(url, content_type="application/json")

# These may be sent in a single batched request
await asyncio.gather(
    handle.append({"event": "a"}),
    handle.append({"event": "b"}),
    handle.append({"event": "c"}),
)
Disable batching if needed:
handle = DurableStream(url, batching=False)
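The idea of batching appends while a request is in flight can be sketched with plain asyncio. This is not the library's implementation: `BatchingAppender` and its `send` callback are hypothetical, and the sketch only illustrates how items queued during one send can be coalesced into the next.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


class BatchingAppender:
    """Hypothetical sketch: coalesce appends queued while a send is in flight."""

    def __init__(self, send: Callable[[List[Any]], Awaitable[None]]) -> None:
        self._send = send
        self._pending: List[Tuple[Any, asyncio.Future]] = []
        self._sending = False

    async def append(self, item: Any) -> None:
        done: asyncio.Future = asyncio.get_running_loop().create_future()
        self._pending.append((item, done))
        if not self._sending:
            # No request in flight: this caller drains the queue.
            self._sending = True
            try:
                await self._drain()
            finally:
                self._sending = False
        # Resolves (or raises) once the batch containing this item was sent.
        await done

    async def _drain(self) -> None:
        while self._pending:
            # Take everything queued so far as one batch.
            batch, self._pending = self._pending, []
            try:
                await self._send([item for item, _ in batch])
            except Exception as exc:
                for _, done in batch:
                    if not done.done():
                        done.set_exception(exc)
            else:
                for _, done in batch:
                    if not done.done():
                        done.set_result(None)
```

In this model the first append goes out alone, and any appends that arrive while it is being sent go out together in the following request. Disabling batching would correspond to sending every item in its own request.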
Error Handling
from durable_streams import (
    stream,
    DurableStreamError,
    FetchError,
    SeqConflictError,
    RetentionGoneError,
    StreamConsumedError,
)

try:
    with stream(url) as res:
        items = res.read_json()
except StreamConsumedError:
    print("Stream was already consumed")
except SeqConflictError:
    print("Sequence conflict during append")
except RetentionGoneError:
    print("Offset is before earliest retained position")
except DurableStreamError as e:
    print(f"Protocol error: {e.message} (status={e.status}, code={e.code})")
except FetchError as e:
    print(f"Network error: {e.message}")
Error Recovery with on_error
from durable_streams import FetchError, stream

def handle_error(error):
    if isinstance(error, FetchError) and error.status == 401:
        # refresh_token() is an application-defined function, not part of this package
        new_token = refresh_token()
        return {"headers": {"Authorization": f"Bearer {new_token}"}}
    # Return None to propagate the error
    return None

with stream(url, on_error=handle_error) as res:
    for item in res.iter_json():
        print(item)
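The callback contract shown above (return a dict of overrides to retry, or None to propagate) can be illustrated with a small generic retry loop. This is a sketch, not the library's internals: `fetch_with_recovery`, its `fetch` callable, and the `max_attempts` limit are all assumptions made for the example.

```python
from typing import Any, Callable, Dict, Optional

ErrorHandler = Callable[[Exception], Optional[Dict[str, Any]]]


def fetch_with_recovery(
    fetch: Callable[[Dict[str, str]], bytes],
    headers: Dict[str, str],
    on_error: ErrorHandler,
    max_attempts: int = 3,
) -> bytes:
    """Hypothetical retry loop driven by an on_error-style callback."""
    current = dict(headers)
    for attempt in range(max_attempts):
        try:
            return fetch(current)
        except Exception as error:
            update = on_error(error)
            if update is None or attempt == max_attempts - 1:
                # The handler declined to recover, or retries ran out.
                raise
            # Merge the handler's header overrides and try again.
            current.update(update.get("headers", {}))
    raise RuntimeError("max_attempts must be at least 1")
```

A handler that returns refreshed Authorization headers causes one more attempt with the merged headers; a handler that returns None lets the original exception surface.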
Types
StreamEvent
@dataclass(frozen=True, slots=True)
class StreamEvent(Generic[T]):
    data: T
    next_offset: str
    up_to_date: bool
    cursor: str | None = None
LiveMode
LiveMode = Literal["auto", "long-poll", "sse"] | bool
HeadResult
@dataclass(frozen=True, slots=True)
class HeadResult:
    exists: Literal[True]
    content_type: str | None = None
    offset: str | None = None
    etag: str | None = None
    cache_control: str | None = None
AppendResult
@dataclass(frozen=True, slots=True)
class AppendResult:
    next_offset: str
Development
This package uses uv for development.
Setup
cd packages/client-py
uv sync --dev
Run Tests
uv run pytest
uv run pytest --cov=durable_streams
Linting and Formatting
uv run ruff check .
uv run ruff format .
Type Checking
uv run pyright
Build
uv build
Protocol Compliance
This client implements the Durable Streams Protocol, including:
- Read modes: Catch-up, Long-poll, and SSE
- Headers: Stream-Next-Offset, Stream-Cursor, Stream-Up-To-Date, Stream-Seq
- JSON mode: Array flattening on reads, array wrapping on appends
- Batching: Automatic request batching for high-throughput appends
License
MIT
File details
Details for the file durable_streams-0.1.0.tar.gz.
File metadata
- Download URL: durable_streams-0.1.0.tar.gz
- Upload date:
- Size: 49.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 3984b126010ada9d5208d0b2135acaaf1c20b3852908328c4ba8cc8e0ed42fd2 |
| MD5 | 768c2630e3d12a49bb94bef74932d93f |
| BLAKE2b-256 | c9f76a00512f4e7be31e8a34385982f576e5d22f3c72fa3dd27169e3babe0836 |
File details
Details for the file durable_streams-0.1.0-py3-none-any.whl.
File metadata
- Download URL: durable_streams-0.1.0-py3-none-any.whl
- Upload date:
- Size: 36.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.9.18
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 60d483ec68ddf922d8d19c453bc01d223cb5a41fd6da23c76ebedbe424f57660 |
| MD5 | d9823a8b39ba1766459583f4aec8276f |
| BLAKE2b-256 | 0f0e39097514c05acfd8b4934188a0c31edb5f893655618fefb913bc1c4940a9 |