Library for wrapping async streams to allow client resumption after connection loss
Project description
resumable-stream
A Python library for wrapping async streams to allow client resumption after connection loss.
This is a Python port of the vercel/resumable-stream TypeScript library.
Features
- Resumable SSE streams: Clients can resume streams after disconnection
- Redis-based persistence: Uses Redis pub/sub for cross-instance communication
- Serverless-friendly: Designed for environments without sticky load balancing
- Low latency: Minimal Redis operations for the common case (single
INCRandSUBSCRIBEper stream)
Installation
pip install resumable-stream
Usage
Idempotent API (Recommended)
The resumable_stream method automatically creates a new stream or resumes an existing one:
import asyncio
from resumable_stream import create_resumable_stream_context
# Create context (uses REDIS_URL env var by default)
ctx = create_resumable_stream_context(redis_url="redis://localhost:6379")
async def my_stream():
"""Your actual stream producer."""
for i in range(10):
yield f"data: chunk {i}\n\n"
await asyncio.sleep(0.5)
async def handle_request(stream_id: str, resume_at: int = None):
stream = await ctx.resumable_stream(
stream_id,
my_stream,
skip_characters=resume_at
)
if stream is None:
# Stream already finished
return {"status": 422, "body": "Stream is already done"}
async for chunk in stream:
yield chunk
# Use with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/stream/{stream_id}")
async def stream_endpoint(stream_id: str, resume_at: int = None):
return StreamingResponse(
handle_request(stream_id, resume_at),
media_type="text/event-stream"
)
Explicit Creation/Resumption
For more control, use separate methods for creating and resuming streams:
# POST endpoint - create new stream
async def create_stream(stream_id: str):
stream = await ctx.create_new_resumable_stream(stream_id, my_stream)
if stream is None:
return {"status": 422, "body": "Stream already exists"}
async for chunk in stream:
yield chunk
# GET endpoint - resume existing stream
async def resume_stream(stream_id: str, resume_at: int = None):
stream = await ctx.resume_existing_stream(stream_id, resume_at)
if stream == "NOT_FOUND":
return {"status": 404, "body": "Stream not found"}
if stream is None:
return {"status": 422, "body": "Stream is already done"}
async for chunk in stream:
yield chunk
Check Stream State
state = await ctx.has_existing_stream(stream_id)
if state is None:
print("No stream exists")
elif state is True:
print("Stream is active")
elif state == "DONE":
print("Stream is finished")
How It Works
- First request: When
resumable_streamis called for a newstream_id, a producer stream is created - Producer persistence: The producer always completes the stream, even if the original reader disconnects
- Resume requests: Additional consumers publish a message to request stream resumption
- Chunk delivery: The producer sends buffered chunks and continues streaming to all consumers via Redis pub/sub
Redis Key Structure
{prefix}:rs:sentinel:{streamId}- Stream state ("1" = active, "DONE" = finished){prefix}:rs:request:{streamId}- Channel for resume requests{prefix}:rs:chunk:{listenerId}- Channel for delivering chunks to consumers
Configuration
ctx = create_resumable_stream_context(
key_prefix="resumable-stream", # Redis key prefix
redis_url="redis://localhost:6379", # Or use REDIS_URL env var
publisher=my_publisher, # Custom Publisher implementation
subscriber=my_subscriber, # Custom Subscriber implementation
)
Custom Redis Clients
You can provide custom Publisher and Subscriber implementations for non-standard Redis setups:
from resumable_stream import Publisher, Subscriber
class MyPublisher:
async def connect(self) -> None: ...
async def publish(self, channel: str, message: str) -> int: ...
async def set(self, key: str, value: str, *, ex: int = None) -> str: ...
async def get(self, key: str) -> str | None: ...
async def incr(self, key: str) -> int: ...
class MySubscriber:
async def connect(self) -> None: ...
async def subscribe(self, channel: str, callback) -> None: ...
async def unsubscribe(self, channel: str) -> None: ...
Environment Variables
REDIS_URLorKV_URL: Redis connection URL (used if no explicit URL provided)DEBUG: Set to enable debug logging
License
MIT
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file resumable_stream-0.2.0.tar.gz.
File metadata
- Download URL: resumable_stream-0.2.0.tar.gz
- Upload date:
- Size: 10.1 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
02e6fa7cebd45a3f451f19e965941389b5f8cd4f9a6f88638bdf00872327be3e
|
|
| MD5 |
a41305590c9b0523f9093bc2f9aa66f3
|
|
| BLAKE2b-256 |
c9c7b39f9ea4f34f25cc87c363dc6cb9b91d02590cc7bca5e1f90ba25c7caa41
|
Provenance
The following attestation bundles were made for resumable_stream-0.2.0.tar.gz:
Publisher:
python-publish.yml on hieunguyen1053/resumable-stream
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
resumable_stream-0.2.0.tar.gz -
Subject digest:
02e6fa7cebd45a3f451f19e965941389b5f8cd4f9a6f88638bdf00872327be3e - Sigstore transparency entry: 799048971
- Sigstore integration time:
-
Permalink:
hieunguyen1053/resumable-stream@bc420e55b47065558b93a27040641dfdcc4c32c2 -
Branch / Tag:
refs/tags/0.2.0 - Owner: https://github.com/hieunguyen1053
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@bc420e55b47065558b93a27040641dfdcc4c32c2 -
Trigger Event:
release
-
Statement type:
File details
Details for the file resumable_stream-0.2.0-py3-none-any.whl.
File metadata
- Download URL: resumable_stream-0.2.0-py3-none-any.whl
- Upload date:
- Size: 8.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
76cf8b0ebea91aa9c52b24c0e435556a71651a8b8c84b2d89cfbed88dc4b184f
|
|
| MD5 |
57ae9ba0541c9d7c22111f3bf59b4ac6
|
|
| BLAKE2b-256 |
e2efa345e968c288f9cb02c11fc622185371bf2340aa7e010cd19c3f9a73a90a
|
Provenance
The following attestation bundles were made for resumable_stream-0.2.0-py3-none-any.whl:
Publisher:
python-publish.yml on hieunguyen1053/resumable-stream
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
resumable_stream-0.2.0-py3-none-any.whl -
Subject digest:
76cf8b0ebea91aa9c52b24c0e435556a71651a8b8c84b2d89cfbed88dc4b184f - Sigstore transparency entry: 799048972
- Sigstore integration time:
-
Permalink:
hieunguyen1053/resumable-stream@bc420e55b47065558b93a27040641dfdcc4c32c2 -
Branch / Tag:
refs/tags/0.2.0 - Owner: https://github.com/hieunguyen1053
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
python-publish.yml@bc420e55b47065558b93a27040641dfdcc4c32c2 -
Trigger Event:
release
-
Statement type: