Skip to main content

Library for wrapping async streams to allow client resumption after connection loss

Project description

resumable-stream

A Python library for wrapping async streams to allow client resumption after connection loss.

This is a Python port of the vercel/resumable-stream TypeScript library.

Features

  • Resumable SSE streams: Clients can resume streams after disconnection
  • Redis-based persistence: Uses Redis pub/sub for cross-instance communication
  • Serverless-friendly: Designed for environments without sticky load balancing
  • Low latency: Minimal Redis operations for the common case (single INCR and SUBSCRIBE per stream)

Installation

pip install resumable-stream

Usage

Idempotent API (Recommended)

The resumable_stream method automatically creates a new stream or resumes an existing one:

import asyncio
from resumable_stream import create_resumable_stream_context

# Create context (uses REDIS_URL env var by default)
ctx = create_resumable_stream_context(redis_url="redis://localhost:6379")

async def my_stream():
    """Your actual stream producer."""
    for i in range(10):
        yield f"data: chunk {i}\n\n"
        await asyncio.sleep(0.5)

async def handle_request(stream_id: str, resume_at: int = None):
    stream = await ctx.resumable_stream(
        stream_id,
        my_stream,
        skip_characters=resume_at
    )
    
    if stream is None:
        # Stream already finished
        return {"status": 422, "body": "Stream is already done"}
    
    async for chunk in stream:
        yield chunk

# Use with FastAPI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream/{stream_id}")
async def stream_endpoint(stream_id: str, resume_at: int = None):
    return StreamingResponse(
        handle_request(stream_id, resume_at),
        media_type="text/event-stream"
    )

Explicit Creation/Resumption

For more control, use separate methods for creating and resuming streams:

# POST endpoint - create new stream
async def create_stream(stream_id: str):
    stream = await ctx.create_new_resumable_stream(stream_id, my_stream)
    if stream is None:
        return {"status": 422, "body": "Stream already exists"}
    
    async for chunk in stream:
        yield chunk

# GET endpoint - resume existing stream
async def resume_stream(stream_id: str, resume_at: int = None):
    stream = await ctx.resume_existing_stream(stream_id, resume_at)
    
    if stream == "NOT_FOUND":
        return {"status": 404, "body": "Stream not found"}
    if stream is None:
        return {"status": 422, "body": "Stream is already done"}
    
    async for chunk in stream:
        yield chunk

Check Stream State

state = await ctx.has_existing_stream(stream_id)

if state is None:
    print("No stream exists")
elif state is True:
    print("Stream is active")
elif state == "DONE":
    print("Stream is finished")

How It Works

  1. First request: When resumable_stream is called for a new stream_id, a producer stream is created
  2. Producer persistence: The producer always completes the stream, even if the original reader disconnects
  3. Resume requests: Additional consumers publish a message to request stream resumption
  4. Chunk delivery: The producer sends buffered chunks and continues streaming to all consumers via Redis pub/sub

Redis Key Structure

  • {prefix}:rs:sentinel:{streamId} - Stream state ("1" = active, "DONE" = finished)
  • {prefix}:rs:request:{streamId} - Channel for resume requests
  • {prefix}:rs:chunk:{listenerId} - Channel for delivering chunks to consumers

Configuration

ctx = create_resumable_stream_context(
    key_prefix="resumable-stream",  # Redis key prefix
    redis_url="redis://localhost:6379",  # Or use REDIS_URL env var
    publisher=my_publisher,  # Custom Publisher implementation
    subscriber=my_subscriber,  # Custom Subscriber implementation
)

Custom Redis Clients

You can provide custom Publisher and Subscriber implementations for non-standard Redis setups:

from resumable_stream import Publisher, Subscriber

class MyPublisher:
    async def connect(self) -> None: ...
    async def publish(self, channel: str, message: str) -> int: ...
    async def set(self, key: str, value: str, *, ex: int = None) -> str: ...
    async def get(self, key: str) -> str | None: ...
    async def incr(self, key: str) -> int: ...

class MySubscriber:
    async def connect(self) -> None: ...
    async def subscribe(self, channel: str, callback) -> None: ...
    async def unsubscribe(self, channel: str) -> None: ...

Environment Variables

  • REDIS_URL or KV_URL: Redis connection URL (used if no explicit URL provided)
  • DEBUG: Set to enable debug logging

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

resumable_stream-0.1.0.tar.gz (10.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

resumable_stream-0.1.0-py3-none-any.whl (9.2 kB view details)

Uploaded Python 3

File details

Details for the file resumable_stream-0.1.0.tar.gz.

File metadata

  • Download URL: resumable_stream-0.1.0.tar.gz
  • Upload date:
  • Size: 10.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for resumable_stream-0.1.0.tar.gz
Algorithm Hash digest
SHA256 fdc2718a3c1fb43a05d7131ffd393c80ea9bc6ca206453accc4325a238179c4a
MD5 0e42995e88f6cf8717c7cec0a72873ee
BLAKE2b-256 d27d33aa2e9347b17d1bf4f092a8881615e57f86787830be6434d5f71bc8a3cc

See more details on using hashes here.

Provenance

The following attestation bundles were made for resumable_stream-0.1.0.tar.gz:

Publisher: python-publish.yml on hieunguyen1053/resumable-stream

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file resumable_stream-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for resumable_stream-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 feb9fb95ddab90fb31c8eb51bdd6185189914a5e3dfc5e54878dbcf3681a5a27
MD5 345caafdaf191b666def2e55a76e448c
BLAKE2b-256 71c94b926a12bd86dc6d11fb517570ec09dc023aaa4fbaa2951ecc09852088a4

See more details on using hashes here.

Provenance

The following attestation bundles were made for resumable_stream-0.1.0-py3-none-any.whl:

Publisher: python-publish.yml on hieunguyen1053/resumable-stream

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page