# SSE plugin for Starlette

Server-Sent Events for Starlette and FastAPI

Background: https://sysid.github.io/server-sent-events/

A production-ready Server-Sent Events implementation for Starlette and FastAPI, following the SSE specification.
## Installation

```shell
pip install sse-starlette
# or
uv add sse-starlette

# To run the examples and demonstrations
uv add sse-starlette[examples]

# Recommended ASGI servers
uv add sse-starlette[uvicorn,granian,daphne]
```
## Quick Start

```python
import asyncio

from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette import EventSourceResponse

async def generate_events():
    for i in range(10):
        yield {"data": f"Event {i}"}
        await asyncio.sleep(1)

async def sse_endpoint(request):
    return EventSourceResponse(generate_events())

app = Starlette(routes=[Route("/events", sse_endpoint)])
```
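On the wire, each yielded dict is serialized into the SSE text framing defined by the spec: optional `id:` and `event:` field lines, one `data:` line per line of payload, and a blank line terminating the event. The sketch below is a standalone illustration of that framing (it is not the library's internal serializer; the `sep` parameter mirrors the one documented later):

```python
def format_sse(data, event=None, id=None, sep="\r\n"):
    """Sketch of SSE wire framing: field lines, then a blank line."""
    lines = []
    if id is not None:
        lines.append(f"id: {id}")
    if event is not None:
        lines.append(f"event: {event}")
    for chunk in data.split("\n"):  # multi-line data becomes multiple data: lines
        lines.append(f"data: {chunk}")
    # Join the field lines, then terminate the event with a blank line
    return sep.join(lines) + sep + sep

# format_sse("Event 0") == "data: Event 0\r\n\r\n"
```

So the first event of the Quick Start generator reaches the client as the two lines `data: Event 0` followed by a blank line.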
## Core Features
- Standards Compliant: Full SSE specification implementation
- Framework Integration: Native Starlette and FastAPI support
- Async/Await: Built on modern Python async patterns
- Connection Management: Automatic client disconnect detection
- Graceful Shutdown: Proper cleanup on server termination
- Thread Safety: Context-local event management for multi-threaded applications
- Multi-Loop Support: Works correctly with multiple asyncio event loops
## Key Components

### EventSourceResponse

The main response class that handles SSE streaming:

```python
from sse_starlette import EventSourceResponse

# Basic usage
async def stream_data():
    for item in data:  # `data`: your application's iterable of items
        yield {"data": item, "event": "update", "id": str(item.id)}

async def endpoint(request):
    return EventSourceResponse(stream_data())
```
### ServerSentEvent

For structured event creation:

```python
from sse_starlette import ServerSentEvent

event = ServerSentEvent(
    data="Custom message",
    event="notification",
    id="msg-123",
    retry=5000,  # client reconnection delay in milliseconds
)
```
### JSONServerSentEvent

A convenient way to send JSON data as SSE events:

```python
from sse_starlette import JSONServerSentEvent

event = JSONServerSentEvent(
    data={"field": "value"},  # anything serializable with json.dumps
)
```
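The payload ends up as a single `data:` line holding the JSON document. A standalone sketch of that framing (not the library's internal code):

```python
import json

def format_json_sse(payload, sep="\r\n"):
    """Sketch: serialize a payload with json.dumps into one data: line."""
    return f"data: {json.dumps(payload)}{sep}{sep}"

# format_json_sse({"field": "value"}) == 'data: {"field": "value"}\r\n\r\n'
```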
## Advanced Usage

### Custom Ping Configuration

```python
from sse_starlette import EventSourceResponse, ServerSentEvent

def custom_ping():
    return ServerSentEvent(comment="Custom ping message")

async def endpoint(request):
    return EventSourceResponse(
        generate_events(),
        ping=10,  # ping every 10 seconds
        ping_message_factory=custom_ping,
    )
```
### Multi-Threaded Usage

sse-starlette supports multi-threaded applications and multiple asyncio event loops:

```python
import asyncio
import threading

from sse_starlette import EventSourceResponse

def run_sse_in_thread():
    """SSE streaming works correctly in separate threads."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def thread_events():
        for i in range(5):
            yield {"data": f"Thread event {i}"}
            await asyncio.sleep(1)

    # This works without "Event bound to different loop" errors
    response = EventSourceResponse(thread_events())
    loop.close()

# Start SSE in multiple threads
for i in range(3):
    thread = threading.Thread(target=run_sse_in_thread)
    thread.start()
```
### Database Streaming (Thread-Safe)

```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def stream_database_results(request):
    # CORRECT: create the session within the generator context,
    # not as a dependency (assumes a session bound to an async engine
    # and a mapped `User` model)
    async with AsyncSession() as session:
        results = await session.execute(select(User))
        for row in results:
            if await request.is_disconnected():
                break
            yield {"data": row.name, "id": str(row.id)}

async def endpoint(request):
    return EventSourceResponse(stream_database_results(request))
```
### Error Handling and Timeouts

```python
async def robust_stream(request):
    try:
        for i in range(100):
            if await request.is_disconnected():
                break
            yield {"data": f"Item {i}"}
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        # Client disconnected - perform cleanup, then re-raise
        raise

async def endpoint(request):
    return EventSourceResponse(
        robust_stream(request),
        send_timeout=30,  # timeout hanging sends
        headers={"Cache-Control": "no-cache"},
    )
```
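Conceptually, `send_timeout` behaves like wrapping each send in a timeout so a client that has stopped reading cannot hold the stream open forever. A simplified standalone sketch of the idea (not the library's actual code; `hung_client` is a stand-in for a peer that never drains its buffer):

```python
import asyncio

async def send_with_timeout(send, payload, timeout):
    """Abort a send that hangs because the client stopped reading."""
    await asyncio.wait_for(send(payload), timeout)

async def demo():
    async def hung_client(payload):
        # Simulates a peer that never drains its receive buffer
        await asyncio.sleep(10)

    try:
        await send_with_timeout(hung_client, b"data: x\r\n\r\n", timeout=0.05)
        return False
    except asyncio.TimeoutError:
        return True  # the hanging send was aborted
```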
### Memory Channels Alternative

For complex data flows, use memory channels instead of generators:

```python
from functools import partial

import anyio

from sse_starlette import EventSourceResponse

async def data_producer(send_channel):
    async with send_channel:
        for i in range(10):
            await send_channel.send({"data": f"Item {i}"})
            await anyio.sleep(1)

async def channel_endpoint(request):
    send_channel, receive_channel = anyio.create_memory_object_stream(10)
    return EventSourceResponse(
        receive_channel,
        data_sender_callable=partial(data_producer, send_channel),
    )
```
## Configuration Options

### EventSourceResponse Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `content` | `ContentStream` | required | Async generator or iterable |
| `ping` | `int` | `15` | Ping interval in seconds |
| `sep` | `str` | `"\r\n"` | Line separator (`\r\n`, `\r`, or `\n`) |
| `send_timeout` | `float` | `None` | Send operation timeout |
| `headers` | `dict` | `None` | Additional HTTP headers |
| `ping_message_factory` | `Callable` | `None` | Custom ping message creator |
## Client Disconnection

```python
async def monitored_stream(request):
    events_sent = 0
    try:
        while events_sent < 100:
            if await request.is_disconnected():
                print(f"Client disconnected after {events_sent} events")
                break
            yield {"data": f"Event {events_sent}"}
            events_sent += 1
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Stream cancelled")
        raise
```
## Testing

Since version 3.0.0, sse-starlette provides comprehensive test isolation without manual setup. The library automatically handles event loop contexts, so manual state resets like the fixture below are no longer needed:

```python
# Deprecated: not needed since version 3.0.0
import pytest

from sse_starlette.sse import AppStatus

@pytest.fixture
def reset_sse_app_status():
    AppStatus.should_exit_event = None
    yield
    AppStatus.should_exit_event = None
```
## Production Considerations

### Performance Limits

- Memory: each connection maintains a buffer; monitor memory usage.
- Connections: limited by system file descriptors and application design.
- Network: high-frequency events can saturate bandwidth.
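A back-of-envelope check helps decide whether an event rate is sustainable before load testing. The estimator below is a rough standalone sketch, and the numbers in the example are illustrative assumptions, not measurements:

```python
def sse_bandwidth_bytes_per_sec(connections, events_per_sec, avg_event_bytes):
    """Rough estimate of outgoing payload bandwidth.

    Counts event payload bytes only; TCP/TLS framing and ping
    comments add further overhead on top of this figure.
    """
    return connections * events_per_sec * avg_event_bytes

# 1,000 clients x 10 events/s x 200 bytes = 2,000,000 bytes/s (~2 MB/s)
rate = sse_bandwidth_bytes_per_sec(1_000, 10, 200)
```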
### Error Recovery

Implement client-side reconnection logic:

```javascript
function createEventSource(url) {
  const eventSource = new EventSource(url);
  eventSource.onerror = function () {
    eventSource.close(); // avoid leaking the failed connection
    setTimeout(() => {
      createEventSource(url); // reconnect after a delay
    }, 5000);
  };
  return eventSource;
}
```
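Non-browser clients have no built-in EventSource, so they must parse the stream themselves. A minimal, hedged sketch of extracting `data:` payloads from raw SSE text (real clients should use a dedicated library such as httpx-sse; a full parser also tracks the `event`, `id`, and `retry` fields):

```python
def parse_sse(stream):
    """Split raw SSE text into a list of data payloads.

    Events are separated by a blank line; each data: line contributes
    one line to the event's payload.
    """
    events = []
    for block in stream.replace("\r\n", "\n").split("\n\n"):
        data_lines = [line[5:].lstrip(" ") for line in block.split("\n")
                      if line.startswith("data:")]
        if data_lines:
            events.append("\n".join(data_lines))
    return events

# parse_sse("data: a\r\n\r\ndata: b\r\n\r\n") == ["a", "b"]
```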
## Learning Resources

### Examples Directory

The examples/ directory contains production-ready patterns:

- `01_basic_sse.py`: Fundamental SSE concepts
- `02_message_broadcasting.py`: Multi-client message distribution
- `03_database_streaming.py`: Thread-safe database integration
- `04_advanced_features.py`: Custom protocols and error handling
### Demonstrations Directory

The examples/demonstrations/ directory provides educational scenarios:
Basic Patterns (basic_patterns/):
- Client disconnect detection and cleanup
- Graceful server shutdown behavior
Production Scenarios (production_scenarios/):
- Load testing with concurrent clients
- Network interruption handling
Advanced Patterns (advanced_patterns/):
- Memory channels vs generators
- Error recovery and circuit breakers
- Custom protocol development
Run any demonstration:

```shell
python examples/demonstrations/basic_patterns/client_disconnect.py
python examples/demonstrations/production_scenarios/load_simulations.py
python examples/demonstrations/advanced_patterns/error_recovery.py
```
## Troubleshooting

### Common Issues

Database session errors with async generators
- Create database sessions inside generators, not as dependencies.

Hanging connections after client disconnect
- Always check `await request.is_disconnected()` in loops.
- Use the `send_timeout` parameter to detect dead connections.

If you are using Postman, please see: https://github.com/sysid/sse-starlette/issues/47#issuecomment-1445953826
### Performance Optimization

```python
import asyncio

# Limit the number of concurrent SSE connections
class ConnectionLimiter:
    def __init__(self, max_connections=100):
        self.semaphore = asyncio.Semaphore(max_connections)

    async def limited_events(self):
        # Hold the semaphore for the lifetime of the stream,
        # not just while the response object is created
        async with self.semaphore:
            async for event in generate_events():
                yield event

    async def limited_endpoint(self, request):
        return EventSourceResponse(self.limited_events())
```
### Network-Level Gotchas

Network infrastructure can buffer SSE streams, breaking real-time delivery. The most common issues and solutions:

#### Reverse Proxy Buffering (Nginx/Apache)

Problem: Nginx buffers responses by default, delaying SSE events until roughly 16 KB have accumulated.

Solution: Send the `X-Accel-Buffering: no` response header from the application.
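From the application side, the header can be attached through the `headers` parameter listed in the configuration table. The helper below is a hypothetical convenience, not part of sse-starlette; only the header names and values come from this document:

```python
def sse_headers(extra=None):
    """Build response headers that discourage intermediary buffering."""
    headers = {
        "Cache-Control": "no-cache",   # avoid caching of the stream
        "X-Accel-Buffering": "no",     # tell Nginx not to buffer
    }
    if extra:
        headers.update(extra)
    return headers

# Usage (inside an endpoint):
# return EventSourceResponse(generate_events(), headers=sse_headers())
```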
Nginx configuration (if you can't modify app headers):

```nginx
location /events {
    proxy_pass http://localhost:8000;
    proxy_http_version 1.1;
    proxy_set_header Connection '';
    proxy_buffering off;              # disable buffering for this location
    chunked_transfer_encoding off;
}
```
#### CDN Issues

- Cloudflare: buffers ~100 KB before flushing to clients, breaking real-time delivery.
- Akamai: edge servers buffer by default.
#### Load Balancer Problems

HAProxy: timeout settings must exceed the heartbeat (ping) interval.

```haproxy
# Ensure timeouts > ping interval
timeout client 60s   # if pinging every 45s
timeout server 60s
```

F5 load balancers: buffer responses by default.
## Contributing

See the examples and demonstrations for implementation patterns. Run tests with:

```shell
make test-unit   # Unit tests only
make test        # All tests including integration
```