resumable-stream

A Python library for wrapping async streams to allow client resumption after connection loss.

This is a Python port of the vercel/resumable-stream TypeScript library.

Features

  • Resumable SSE streams: Clients can resume streams after disconnection
  • Redis-based persistence: Uses Redis pub/sub for cross-instance communication
  • Serverless-friendly: Designed for environments without sticky load balancing
  • Low latency: Minimal Redis operations for the common case (single INCR and SUBSCRIBE per stream)

Installation

pip install resumable-stream

Usage

Idempotent API (Recommended)

The resumable_stream method automatically creates a new stream or resumes an existing one:

import asyncio
from resumable_stream import create_resumable_stream_context

# Create context (falls back to the REDIS_URL env var if redis_url is omitted)
ctx = create_resumable_stream_context(redis_url="redis://localhost:6379")

async def my_stream():
    """Your actual stream producer."""
    for i in range(10):
        yield f"data: chunk {i}\n\n"
        await asyncio.sleep(0.5)

# Use with FastAPI
from fastapi import FastAPI, Response
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream/{stream_id}")
async def stream_endpoint(stream_id: str, resume_at: int | None = None):
    # Resolve the stream before building the response so the status code
    # can still be set. (An async generator cannot `return` a value, so the
    # check cannot live inside the generator that feeds StreamingResponse.)
    stream = await ctx.resumable_stream(
        stream_id,
        my_stream,
        skip_characters=resume_at,
    )

    if stream is None:
        # Stream already finished
        return Response("Stream is already done", status_code=422)

    return StreamingResponse(stream, media_type="text/event-stream")
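On the client side, resuming comes down to remembering how many characters have already arrived and sending that count back as resume_at. The following is a minimal sketch, not part of the library: ResumeTracker and the base URL are hypothetical, and it assumes skip_characters counts characters of the decoded stream text (including the "data:" framing), which is worth verifying against the library before relying on it.

```python
from urllib.parse import urlencode


class ResumeTracker:
    """Hypothetical client-side helper: counts characters received so far."""

    def __init__(self, base_url: str, stream_id: str) -> None:
        self.base_url = base_url.rstrip("/")
        self.stream_id = stream_id
        self.received = 0  # characters seen across all connections

    def feed(self, chunk: str) -> None:
        """Record a chunk that was successfully read from the response."""
        self.received += len(chunk)

    def resume_url(self) -> str:
        """URL for the next request; omits resume_at on the first attempt."""
        url = f"{self.base_url}/stream/{self.stream_id}"
        if self.received:
            url += "?" + urlencode({"resume_at": self.received})
        return url
```

After a dropped connection, the client would request tracker.resume_url() and keep calling feed() on each new chunk.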

Explicit Creation/Resumption

For more control, use separate methods for creating and resuming streams:

from fastapi import Response
from fastapi.responses import StreamingResponse

# POST endpoint - create new stream
async def create_stream(stream_id: str):
    stream = await ctx.create_new_resumable_stream(stream_id, my_stream)
    if stream is None:
        return Response("Stream already exists", status_code=422)

    # Return the stream rather than re-yielding it: an async generator
    # cannot `return` a value, so the error branch would not compile.
    return StreamingResponse(stream, media_type="text/event-stream")

# GET endpoint - resume existing stream
async def resume_stream(stream_id: str, resume_at: int | None = None):
    stream = await ctx.resume_existing_stream(stream_id, resume_at)

    if stream == "NOT_FOUND":
        return Response("Stream not found", status_code=404)
    if stream is None:
        return Response("Stream is already done", status_code=422)

    return StreamingResponse(stream, media_type="text/event-stream")

Check Stream State

state = await ctx.has_existing_stream(stream_id)

if state is None:
    print("No stream exists")
elif state is True:
    print("Stream is active")
elif state == "DONE":
    print("Stream is finished")
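The three states can drive a simple routing decision. The helper below is an illustrative sketch only: next_action and its return labels are hypothetical names, and the three input values are taken from the snippet above.

```python
def next_action(state) -> str:
    """Map the result of has_existing_stream to a hypothetical action label."""
    if state is None:
        return "create"    # no stream yet: start a producer
    if state == "DONE":
        return "finished"  # nothing left to stream
    if state is True:
        return "resume"    # producer is still running: attach to it
    raise ValueError(f"unexpected stream state: {state!r}")
```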

How It Works

  1. First request: When resumable_stream is called for a new stream_id, a producer stream is created
  2. Producer persistence: The producer always completes the stream, even if the original reader disconnects
  3. Resume requests: Additional consumers publish a message to request stream resumption
  4. Chunk delivery: The producer sends buffered chunks and continues streaming to all consumers via Redis pub/sub
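The steps above can be illustrated with a toy, single-process model. This is not the library's implementation: it swaps Redis pub/sub for in-memory asyncio queues, InMemoryResumableStream is a hypothetical name, and it assumes the resume offset never exceeds what the producer has already buffered.

```python
import asyncio


class InMemoryResumableStream:
    """Toy model of the flow above; Redis is replaced by asyncio queues."""

    def __init__(self) -> None:
        self.buffer = ""  # everything produced so far
        self.done = False
        self.listeners: list[asyncio.Queue] = []

    async def produce(self, source) -> None:
        """Drain the source to completion, regardless of who is listening."""
        async for chunk in source:
            self.buffer += chunk
            for queue in self.listeners:
                queue.put_nowait(chunk)
        self.done = True
        for queue in self.listeners:
            queue.put_nowait(None)  # end-of-stream marker

    async def consume(self, skip_characters: int = 0):
        """Replay buffered text past the offset, then follow live chunks."""
        queue: asyncio.Queue = asyncio.Queue()
        backlog = self.buffer[skip_characters:]
        finished = self.done
        if not finished:
            self.listeners.append(queue)  # registered before any await
        if backlog:
            yield backlog
        if finished:
            return
        while (chunk := await queue.get()) is not None:
            yield chunk


async def demo() -> list[str]:
    async def source():
        for i in range(4):
            yield f"chunk {i};"
            await asyncio.sleep(0.01)

    stream = InMemoryResumableStream()
    producer = asyncio.create_task(stream.produce(source()))
    await asyncio.sleep(0.025)  # the client "reconnects" mid-stream
    # The client already holds the first 8 characters ("chunk 0;").
    received = [part async for part in stream.consume(skip_characters=8)]
    await producer
    return received
```

How the text is split into parts depends on timing, but joining the parts returned by asyncio.run(demo()) always gives the stream minus the 8 skipped characters.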

Redis Key Structure

  • {prefix}:rs:sentinel:{streamId} - Stream state ("1" = active, "DONE" = finished)
  • {prefix}:rs:request:{streamId} - Channel for resume requests
  • {prefix}:rs:chunk:{listenerId} - Channel for delivering chunks to consumers
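When inspecting Redis by hand, it can help to build these names programmatically. The helpers below simply follow the patterns listed above; the function names are hypothetical and are not part of the library's API.

```python
def sentinel_key(prefix: str, stream_id: str) -> str:
    """Key holding the stream state ("1" = active, "DONE" = finished)."""
    return f"{prefix}:rs:sentinel:{stream_id}"


def request_channel(prefix: str, stream_id: str) -> str:
    """Channel on which consumers ask the producer to resume."""
    return f"{prefix}:rs:request:{stream_id}"


def chunk_channel(prefix: str, listener_id: str) -> str:
    """Channel on which chunks are delivered to one consumer."""
    return f"{prefix}:rs:chunk:{listener_id}"
```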

Configuration

ctx = create_resumable_stream_context(
    key_prefix="resumable-stream",  # Redis key prefix
    redis_url="redis://localhost:6379",  # Or use REDIS_URL env var
    publisher=my_publisher,  # Custom Publisher implementation
    subscriber=my_subscriber,  # Custom Subscriber implementation
)

Custom Redis Clients

You can provide custom Publisher and Subscriber implementations for non-standard Redis setups:

from resumable_stream import Publisher, Subscriber

class MyPublisher:
    async def connect(self) -> None: ...
    async def publish(self, channel: str, message: str) -> int: ...
    async def set(self, key: str, value: str, *, ex: int | None = None) -> str: ...
    async def get(self, key: str) -> str | None: ...
    async def incr(self, key: str) -> int: ...

class MySubscriber:
    async def connect(self) -> None: ...
    async def subscribe(self, channel: str, callback) -> None: ...
    async def unsubscribe(self, channel: str) -> None: ...

Environment Variables

  • REDIS_URL or KV_URL: Redis connection URL (used if no explicit URL provided)
  • DEBUG: Set to enable debug logging
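The lookup order can be pictured as follows. This is a sketch, not the library's code: resolve_redis_url is a hypothetical helper, and checking REDIS_URL before KV_URL is an assumption based on the order in which the variables are listed above.

```python
import os
from typing import Mapping, Optional


def resolve_redis_url(
    explicit: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Prefer an explicit URL, then REDIS_URL, then KV_URL (assumed order)."""
    return explicit or env.get("REDIS_URL") or env.get("KV_URL")
```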

License

MIT
