
SSE plugin for Starlette


Server-Sent Events for Starlette and FastAPI


Background: https://sysid.github.io/server-sent-events/

Production-ready Server-Sent Events implementation for Starlette and FastAPI, following the W3C SSE specification.

Installation

pip install sse-starlette
uv add sse-starlette

# To run the examples (fastapi, uvicorn, pydantic)
uv add sse-starlette[examples]

# Example 03 also needs the DB extras (sqlalchemy, aiosqlite)
uv add sse-starlette[examples,examples-db]

# ASGI server extras (install one or more)
uv add sse-starlette[uvicorn,granian,daphne]

Quick Start

import asyncio
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse

async def generate_events():
    for i in range(10):
        yield {"data": f"Event {i}"}
        await asyncio.sleep(1)

async def sse_endpoint(request):
    return EventSourceResponse(generate_events())

app = Starlette(routes=[Route("/events", sse_endpoint)])

Core Features

  • Standards Compliant: Full SSE specification implementation
  • Framework Integration: Native Starlette and FastAPI support
  • Async/Await: Built on modern Python async patterns
  • Connection Management: Automatic client disconnect detection
  • Graceful Shutdown: Proper cleanup on server termination with cooperative shutdown support
  • Thread Safety: Context-local event management for multi-threaded applications
  • Multi-Loop Support: Works correctly with multiple asyncio event loops

For a detailed look at the internal task coordination, shutdown detection, and cancellation flows, see ARCHITECTURE.md.

Key Components

EventSourceResponse

The main response class that handles SSE streaming:

from sse_starlette import EventSourceResponse

# Basic usage
async def endpoint(request):
    async def stream_data():
        for item in data:  # `data` is whatever iterable your app provides
            yield {"data": item, "event": "update", "id": str(item.id)}

    return EventSourceResponse(stream_data())

ServerSentEvent

For structured event creation:

from sse_starlette import ServerSentEvent

event = ServerSentEvent(
    data="Custom message",
    event="notification", 
    id="msg-123",
    retry=5000
)

JSONServerSentEvent

For an easy way to send JSON data as SSE events:

from sse_starlette import JSONServerSentEvent

event = JSONServerSentEvent(
    data={"field": "value"},  # anything serializable with json.dumps
)
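A JSON payload must occupy a single `data:` line to survive SSE's line-based framing; `json.dumps` with default settings emits exactly one line. A minimal stdlib sketch of the resulting frame (not the library's internal code):

```python
import json

def format_json_sse(obj, sep="\r\n"):
    # json.dumps emits a single line by default, so the whole payload
    # fits in one "data:" field of the event stream.
    return f"data: {json.dumps(obj)}{sep}{sep}"

frame = format_json_sse({"field": "value"})
print(frame)  # data: {"field": "value"}
```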

Advanced Usage

Custom Ping Configuration

from sse_starlette import ServerSentEvent, EventSourceResponse

def custom_ping():
    return ServerSentEvent(comment="Custom ping message")

async def endpoint(request):
    return EventSourceResponse(
        generate_events(),
        ping=10,  # Ping every 10 seconds
        ping_message_factory=custom_ping,
    )

Multi-Threaded Usage

sse-starlette now supports usage in multi-threaded applications and with multiple asyncio event loops:

import threading
import asyncio
from sse_starlette import EventSourceResponse

def run_sse_in_thread():
    """SSE streaming works correctly in separate threads"""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    
    async def thread_events():
        for i in range(5):
            yield {"data": f"Thread event {i}"}
            await asyncio.sleep(1)
    
    # Constructing the response no longer raises
    # "Event bound to different loop" errors; in a real app the response
    # would be returned from an endpoint served on this thread's loop.
    response = EventSourceResponse(thread_events())
    loop.close()

# Start SSE in multiple threads
for i in range(3):
    thread = threading.Thread(target=run_sse_in_thread)
    thread.start()

Database Streaming (Thread-Safe)

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def database_endpoint(request):
    async def stream_database_results():
        # CORRECT: Create the session inside the generator context,
        # not as a request-scoped dependency
        async with AsyncSession() as session:
            results = await session.execute(select(User))
            for row in results:
                if await request.is_disconnected():
                    break
                yield {"data": row.name, "id": str(row.id)}

    return EventSourceResponse(stream_database_results())

Error Handling and Timeouts

async def endpoint(request):
    async def robust_stream():
        try:
            for i in range(100):
                if await request.is_disconnected():
                    break
                yield {"data": f"Item {i}"}
                await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            # Client disconnected - perform cleanup, then re-raise
            raise

    return EventSourceResponse(
        robust_stream(),
        send_timeout=30,  # Abort sends that hang longer than 30 seconds
        headers={"Cache-Control": "no-cache"},
    )
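The send_timeout behavior can be pictured with plain asyncio: if a client stops reading, the underlying send never completes, and a timeout converts that hang into an error so the stream can be torn down. A sketch under those assumptions (not the library's implementation):

```python
import asyncio

async def send_with_timeout(send, message, timeout):
    # wait_for turns a send that never completes into a TimeoutError.
    try:
        await asyncio.wait_for(send(message), timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def demo():
    async def slow_send(msg):
        await asyncio.sleep(10)  # simulates a dead connection that never drains
    return await send_with_timeout(slow_send, "data: x\n\n", timeout=0.05)

result = asyncio.run(demo())
print(result)  # False: the hanging send timed out
```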

Memory Channels Alternative

For complex data flows, use memory channels instead of generators:

import anyio
from functools import partial

async def data_producer(send_channel):
    async with send_channel:
        for i in range(10):
            await send_channel.send({"data": f"Item {i}"})
            await anyio.sleep(1)

async def channel_endpoint(request):
    send_channel, receive_channel = anyio.create_memory_object_stream(10)
    
    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(data_producer, send_channel)
    )
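The producer/consumer split above can be sketched with a plain asyncio.Queue; anyio memory channels add backpressure handling and clean closing semantics on top of this basic shape:

```python
import asyncio

async def producer(queue):
    for i in range(3):
        await queue.put({"data": f"Item {i}"})
    await queue.put(None)  # sentinel: producer finished

async def consumer(queue):
    events = []
    # Drain the queue until the sentinel arrives.
    while (item := await queue.get()) is not None:
        events.append(item)
    return events

async def main():
    q = asyncio.Queue(maxsize=10)  # bounded, like the channel buffer above
    prod = asyncio.create_task(producer(q))
    events = await consumer(q)
    await prod
    return events

events = asyncio.run(main())
print(events)
```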

Cooperative Shutdown

By default, generators receive CancelledError immediately when the server shuts down. With cooperative shutdown, generators can detect the shutdown signal, send farewell events to clients, and exit gracefully within a configurable grace period.

import anyio
from sse_starlette import EventSourceResponse

async def graceful_stream(request):
    shutdown_event = anyio.Event()

    async def generate():
        try:
            while not shutdown_event.is_set():
                yield {"data": "tick"}
                # Check for shutdown between iterations
                with anyio.move_on_after(1.0):
                    await shutdown_event.wait()
            # Shutdown detected — send farewell event
            yield {"event": "shutdown", "data": "Server is shutting down"}
        except anyio.get_cancelled_exc_class():
            # Grace period expired — generator force-cancelled
            raise

    return EventSourceResponse(
        generate(),
        shutdown_event=shutdown_event,       # Library sets this on shutdown
        shutdown_grace_period=5.0,           # Seconds to wait before force-cancel
    )

How it works:

  1. On server shutdown, the library sets your shutdown_event
  2. Your generator sees the event and can yield final events
  3. If the generator exits within shutdown_grace_period, shutdown is clean (no CancelledError)
  4. If the generator doesn't exit in time, it is force-cancelled as before

Important: shutdown_grace_period should be less than your ASGI server's graceful shutdown timeout (e.g. uvicorn's --timeout-graceful-shutdown), otherwise the process is killed before the grace period expires.

Without shutdown_event (the default), behavior is identical to previous versions: immediate cancellation on server shutdown.
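The grace-period decision in steps 3 and 4 amounts to a bounded wait for the generator to finish. A stdlib sketch of that logic (names hypothetical, not the library's internals):

```python
import asyncio

async def shutdown_outcome(gen_done, grace_period):
    # After the shutdown event is set, wait up to grace_period for the
    # generator to finish; otherwise it would be force-cancelled.
    try:
        await asyncio.wait_for(gen_done.wait(), grace_period)
        return "clean"
    except asyncio.TimeoutError:
        return "force-cancelled"

async def demo():
    done = asyncio.Event()
    # Simulate a generator that exits 10 ms after seeing the shutdown event.
    asyncio.get_running_loop().call_later(0.01, done.set)
    return await shutdown_outcome(done, grace_period=1.0)

outcome = asyncio.run(demo())
print(outcome)  # clean
```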

Configuration Options

EventSourceResponse Parameters

Parameter              Type           Default   Description
content                ContentStream  Required  Async generator or iterable producing events
ping                   int            15        Ping interval in seconds (0 to disable)
sep                    str            "\r\n"    Line separator ("\r\n", "\r", or "\n")
send_timeout           float          None      Send operation timeout in seconds
headers                dict           None      Additional HTTP response headers
ping_message_factory   Callable       None      Custom ping message creator
shutdown_event         anyio.Event    None      Event the library sets on server shutdown
shutdown_grace_period  float          0         Seconds to wait after setting shutdown_event before force-cancel

Client Disconnection

async def monitored_stream(request):
    events_sent = 0
    try:
        while events_sent < 100:
            if await request.is_disconnected():
                print(f"Client disconnected after {events_sent} events")
                break
            
            yield {"data": f"Event {events_sent}"}
            events_sent += 1
            await asyncio.sleep(1)
            
    except asyncio.CancelledError:
        print("Stream cancelled")
        raise

Testing

sse-starlette now provides comprehensive test isolation without manual setup. The library automatically handles event loop contexts, so manual state resets like the following fixture are no longer needed:

# Deprecated: not needed since version 3.0.0
import pytest
from sse_starlette.sse import AppStatus

@pytest.fixture
def reset_sse_app_status():
    AppStatus.should_exit_event = None
    yield
    AppStatus.should_exit_event = None

Production Considerations

Performance Limits

  • Memory: Each connection maintains a buffer. Monitor memory usage.
  • Connections: Limited by system file descriptors and application design.
  • Network: High-frequency events can saturate bandwidth.
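Each open SSE connection pins a socket file descriptor, so the process's RLIMIT_NOFILE soft limit is a hard ceiling on concurrent clients. On Unix you can inspect it with the stdlib (a diagnostic sketch, not part of sse-starlette):

```python
import resource

# The soft limit bounds how many descriptors (and thus SSE connections)
# this process can hold open; the hard limit is the raisable ceiling.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"fd limits: soft={soft}, hard={hard}")
```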

Error Recovery

Implement client-side reconnection logic:

function createEventSource(url) {
    const eventSource = new EventSource(url);
    
    eventSource.onerror = function() {
        setTimeout(() => {
            createEventSource(url);  // Reconnect after delay
        }, 5000);
    };
    
    return eventSource;
}

Learning Resources

Examples Directory

The examples/ directory contains production-ready patterns:

  • 01_basic_sse.py: Fundamental SSE concepts
  • 02_message_broadcasting.py: Multi-client message distribution
  • 03_database_streaming.py: Thread-safe database integration
  • 04_advanced_features.py: Custom protocols and error handling

Demonstrations Directory

The examples/demonstrations/ directory provides educational scenarios:

Basic Patterns (basic_patterns/):

  • Client disconnect detection and cleanup
  • Graceful server shutdown behavior

Production Scenarios (production_scenarios/):

  • Load testing with concurrent clients
  • Network interruption handling

Advanced Patterns (advanced_patterns/):

  • Memory channels vs generators
  • Error recovery and circuit breakers
  • Custom protocol development

Run any demonstration:

python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py

Troubleshooting

Common Issues

Database session errors with async generators

  • Create database sessions inside generators, not as dependencies

Hanging connections after client disconnect

  • Always check await request.is_disconnected() in loops
  • Use send_timeout parameter to detect dead connections

If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826

Performance Optimization

import asyncio

# Connection limits
class ConnectionLimiter:
    def __init__(self, max_connections=100):
        self.semaphore = asyncio.Semaphore(max_connections)

    async def limited_endpoint(self, request):
        # Acquire the semaphore inside the generator so it is held for
        # the lifetime of the stream, not just while creating the response.
        async def limited_events():
            async with self.semaphore:
                async for event in generate_events():
                    yield event

        return EventSourceResponse(limited_events())

Network-Level Gotchas

Network infrastructure components can buffer SSE streams, breaking real-time delivery. Here are the most common issues and solutions:

Reverse Proxy Buffering (Nginx/Apache)

Problem: Nginx buffers responses by default, delaying SSE events until ~16KB accumulates.

Solution: Add the X-Accel-Buffering: no header.
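From application code, the header can be attached via the headers parameter shown earlier. A minimal sketch (the endpoint wiring is assumed, not prescribed):

```python
# Headers that keep intermediaries from buffering or caching the stream.
sse_headers = {
    "X-Accel-Buffering": "no",    # tell Nginx not to buffer this response
    "Cache-Control": "no-cache",  # prevent caching of the event stream
}
# Usage (sketch): EventSourceResponse(generate_events(), headers=sse_headers)
print(sse_headers)
```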

Nginx Configuration (if you can't modify app headers):

location /events {
    proxy_pass http://localhost:8000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;           # Disable for this location
    chunked_transfer_encoding off;
}

CDN Issues

  • Cloudflare: Buffers ~100KB before flushing to clients, breaking real-time delivery.
  • Akamai: Edge servers buffer by default.

Load Balancer Problems

HAProxy: Timeout settings must exceed heartbeat frequency.

# Ensure timeouts > ping interval
timeout client 60s    # If ping every 45s
timeout server 60s

F5 Load Balancers: Buffer responses by default.

Contributing

See examples and demonstrations for implementation patterns. Run tests with:

make test-unit              # Unit tests only (default dev install)
make test                   # Unit + docker integration tests
make test-experimentation   # Optional experimentation tests (multi-consumer load tests)
                            # Requires the `experimentation` dependency group:
                            #   uv sync --group experimentation

The experimentation group is opt-in (kept out of the default dev install) to keep the contributor footprint small. See tests/experimentation/ for the tests it enables.
