FastAPI SSE Events
Server-Sent Events (SSE) notifications for FastAPI using Redis Pub/Sub
Add real-time "refresh-less" updates to your FastAPI REST API in minutes.
Perfect for collaborative tools (CRMs, project management, dashboards) where multiple users need to see updates instantly without manual page refreshes.
📚 Documentation
- Full Documentation - Complete guide with examples
- PyPI Package - Installation and releases
- GitHub Repository - Source code and issues
- Changelog - Version history
Table of Contents
- Overview
- Why SSE?
- Installation
- Quick Start
- Core Concepts
- Usage Guide
- Configuration
- Client Integration
- Deployment
- Examples
- API Reference
- Troubleshooting
- License
Overview
FastAPI SSE Events enables near real-time notifications for REST-based applications without the complexity of WebSockets. Clients subscribe to Server-Sent Event streams for specific topics, receive lightweight notifications when data changes, then refresh data via existing REST endpoints.
Key Features
- 🚀 Simple Integration - Add SSE with 3 lines of code, minimal changes to existing endpoints
- 📡 Server-Sent Events - Lightweight, one-way communication (server → client)
- 🔄 Redis Pub/Sub - Horizontal scaling across multiple API instances
- 🔒 Authorization Hooks - Secure topic subscriptions with custom auth logic
- 💓 Heartbeat Support - Automatic connection keepalive (configurable interval)
- 🎯 Topic-based Routing - Fine-grained subscription control per resource
- 🔧 Type Safe - Full type hints and mypy compliance
- ✅ Well Tested - Comprehensive test suite with 80%+ coverage
Why SSE?
Problem: Traditional REST APIs require manual refresh to see updates from other users. This creates poor collaboration experiences and inefficient workflows.
Why not WebSockets? WebSockets are powerful but complex:
- Require separate infrastructure
- More difficult to debug
- Overkill for one-way notifications
- Don't work well with existing REST patterns
SSE Solution:
- Built on HTTP (works with existing infrastructure)
- Native browser support (`EventSource`)
- Automatic reconnection
- Perfect for "notify then fetch" pattern
- Keeps REST as source of truth
Installation
pip install fastapi-sse-events
Requirements:
- Python 3.10+
- FastAPI
- Redis server
Quick Start
1. Install and Start Redis
# Using Docker
docker run -d -p 6379:6379 redis:alpine
# Or use local Redis
redis-server
2. Add to Your FastAPI App
from fastapi import Request
from fastapi_sse_events import SSEApp, publish_event, subscribe_to_events
app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379/0",
)

@app.get("/events")
@subscribe_to_events()
async def events(request: Request):
    pass
3. Publish Events from Your Endpoints
@app.post("/comments")
@publish_event(topic="comments", event="comment_created")
async def create_comment(request: Request, comment: Comment):
    # Save the comment to the database
    saved_comment = await db.save(comment)
    return saved_comment  # Auto-published to SSE clients
4. Subscribe from Client
// Connect to SSE stream
const eventSource = new EventSource('http://localhost:8000/events?topic=comments');
// Handle events
eventSource.addEventListener('comment_created', (e) => {
const data = JSON.parse(e.data);
console.log('Comment event:', data);
// Notify-then-fetch pattern (recommended)
fetch(`http://localhost:8000/comments/${data.id}`)
.then(r => r.json())
.then(renderComment);
});
That's it! Your app now has real-time updates.
Core Concepts
Architecture
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Client 1   │     │  Client 2   │     │  Client N   │
│ EventSource │     │ EventSource │     │ EventSource │
└──────┬──────┘     └──────┬──────┘     └──────┬──────┘
       │                   │                   │
       │    SSE Stream (/events?topic=...)     │
       └───────────────────┼───────────────────┘
                           │
                ┌──────────┴──────────┐
                │  FastAPI Instance   │
                │                     │
                │   REST API + SSE    │
                └──────────┬──────────┘
                           │
                           │ Pub/Sub
                ┌──────────┴──────────┐
                │    Redis Server     │
                │                     │
                │   Topic Channels    │
                └─────────────────────┘
                           ▲
                           │ Pub/Sub
                ┌──────────┴──────────┐
                │  FastAPI Instance   │
                │                     │
                │   REST API + SSE    │
                └─────────────────────┘
How It Works
1. Client opens an SSE connection: `GET /events?topic=comment_thread:123`
2. Server subscribes to the Redis channel for that topic
3. Another client posts data via the REST API
4. Server saves the data and publishes an event to Redis
5. Redis broadcasts to all subscribed FastAPI instances
6. All clients receive a lightweight notification
7. Clients fetch fresh data via existing REST endpoints
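The lightweight notification each client receives travels as a plain-text SSE frame. As an illustration of the wire format (not this library's internal code), each event is an `event:` line, a `data:` line carrying the JSON payload, and a blank line:

```python
import json

def sse_frame(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame as sent over the wire:
    an `event:` line, a `data:` line with the JSON payload, and a
    blank line terminating the frame."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

print(sse_frame("comment_created", {"id": 42}))
# event: comment_created
# data: {"id": 42}
```

Browsers parse these frames automatically; `EventSource` dispatches them to the matching `addEventListener` handler.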
Event Flow
User Action → REST Endpoint → Save to DB → Publish Event → Redis Pub/Sub
                                                                ↓
Client ← SSE Stream ← FastAPI ← Redis Subscribe ← Redis Pub/Sub
  ↓
Fetch Updated Data → REST Endpoint
Usage Guide
Basic Integration (Recommended)
from fastapi import Request
from fastapi_sse_events import SSEApp, publish_event, subscribe_to_events
app = SSEApp(redis_url="redis://localhost:6379/0")

@app.post("/tickets/{ticket_id}/status")
@publish_event(topic="tickets", event="ticket_status_changed")
async def update_ticket_status(request: Request, ticket_id: int, status: str):
    await db.update_ticket(ticket_id, status=status)
    return {"id": ticket_id, "status": status}

@app.get("/events")
@subscribe_to_events()  # topics via ?topic=tickets
async def events(request: Request):
    pass
Use SSEApp + decorators for minimal boilerplate. Keep mount_sse() for advanced/manual integration.
Publishing Events
There are several approaches to publishing events, ranging from the simple (recommended) to the advanced (manual control).
Method 1: Decorator-Based Publish (Recommended)
The simplest and most Pythonic approach. Just add @publish_event() to any endpoint, and the decorator automatically publishes the response:
from fastapi import Request
from fastapi_sse_events import publish_event
@app.post("/tickets/{ticket_id}/status")
@publish_event(topic="tickets", event="ticket_status_changed")
async def update_ticket_status(request: Request, ticket_id: int, status: str):
    await db.update_ticket(ticket_id, status=status)
    return {"id": ticket_id, "status": status}  # Auto-published to all subscribers
Features:
- ✅ No boilerplate - just decorate your endpoint
- ✅ Response automatically published as event data
- ✅ Automatic HTTP and SSE notification
- ✅ Works with any endpoint (GET, POST, PUT, DELETE)
Method 2: Direct Publish (Advanced)
For fine-grained control over event data, publish manually using app.state.event_broker:
from datetime import datetime
from fastapi import Request

@app.post("/tickets/{ticket_id}/status")
async def update_ticket_status(request: Request, ticket_id: int, status: str):
    # Update the database
    await db.update_ticket(ticket_id, status=status)

    # Manual publish with custom data
    await app.state.event_broker.publish(
        topic=f"ticket:{ticket_id}",
        event="ticket_status_changed",
        data={
            "ticket_id": ticket_id,
            "status": status,
            "changed_at": datetime.utcnow().isoformat(),
            "changed_by": request.user.id,
        },
    )
    return {"id": ticket_id, "status": status}
Use this when you need:
- Custom/computed event data not in the HTTP response
- Different data for SSE vs HTTP clients
- Multiple events from a single endpoint
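For instance, a single action can notify both the resource-specific topic and a coarser dashboard topic. A sketch of that pattern (the helper, topic names, and `broker` argument are illustrative; only the `publish(topic=..., event=..., data=...)` call shape comes from this library):

```python
async def close_ticket(broker, ticket_id: int) -> dict:
    """Publish two events for one action: a fine-grained update for
    watchers of this ticket, plus a coarse event for dashboards."""
    result = {"id": ticket_id, "status": "closed"}
    await broker.publish(
        topic=f"ticket:{ticket_id}", event="ticket_closed", data=result
    )
    await broker.publish(
        topic="dashboard", event="ticket_count_changed",
        data={"ticket_id": ticket_id},
    )
    return result
```

In an endpoint you would pass `app.state.event_broker` as the `broker` argument.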
Method 3: Service Layer Pattern (Advanced)
Encapsulate publishing logic in service classes for clean separation of concerns:
class TaskService:
    def __init__(self, broker):
        self.broker = broker

    async def create_task(self, data: dict, user_id: int):
        # Business logic
        task = await db.save_task(data, user_id)

        # Publish to project subscribers
        await self.broker.publish(
            topic=f"project:{data['project_id']}",
            event="task:created",
            data={"id": task.id, "title": task.title, "assignee": user_id},
        )
        return task

# In an endpoint
@app.post("/tasks")
async def create_task_endpoint(request: Request, data: TaskCreate):
    service = TaskService(app.state.event_broker)
    return await service.create_task(data.dict(), request.user.id)
Authorization
Protect SSE subscriptions with custom authorization callbacks. Control which users can subscribe to which topics:
from fastapi import Request, HTTPException
from fastapi_sse_events import SSEApp
async def authorize_subscription(request: Request, topic: str) -> bool:
    """
    Authorization callback - return True to allow the subscription, False to deny.
    Called for each topic subscription attempt.

    Args:
        request: FastAPI request (contains user info via session, JWT, etc.)
        topic: Topic being subscribed to

    Returns:
        True if authorized, False otherwise
    """
    # Get the current user (implement based on your auth method)
    user = await get_current_user(request)
    if not user:
        return False

    # Allow users to subscribe to their own topic
    if topic.startswith("user:"):
        user_id = topic.split(":")[1]
        return str(user.id) == user_id

    # Check workspace membership
    if topic.startswith("workspace:"):
        workspace_id = topic.split(":")[1]
        return await user_has_workspace_access(user.id, workspace_id)

    # Check organization access
    if topic.startswith("org:"):
        org_id = topic.split(":")[1]
        return await user_in_organization(user.id, org_id)

    # Broadcast topics - anyone can subscribe
    if topic in ["announcements", "system"]:
        return True

    # Deny by default
    return False

# Create the app with authorization
app = SSEApp(
    redis_url="redis://localhost:6379/0",
    authorize=authorize_subscription,  # Pass the authorization callback
)
JWT-Based Authorization Example:
from fastapi.security import HTTPBearer
from jose import jwt, JWTError

security = HTTPBearer()

async def authorize_subscription(request: Request, topic: str) -> bool:
    # Get the bearer token from the Authorization header
    credentials = await security(request)
    if not credentials:
        return False

    try:
        # Decode the JWT token
        payload = jwt.decode(
            credentials.credentials,
            settings.SECRET_KEY,
            algorithms=["HS256"],
        )
        user_id = payload.get("sub")
        user_roles = payload.get("roles", [])
    except JWTError:
        return False

    # Check topic permissions based on roles
    if topic.startswith("admin:"):
        return "admin" in user_roles
    if topic.startswith("user:"):
        return user_id == topic.split(":")[1]
    return True
Security Best Practices:
✅ Always:
- Validate user authentication (JWT, sessions, etc.)
- Check permissions for resource-specific topics
- Log authorization failures for security monitoring
- Use granular topic patterns with IDs (e.g. `workspace:123`, not just `workspace`)

❌ Never:
- Allow wildcard subscriptions without explicit auth
- Trust client-supplied topic names
- Skip authorization for "internal" topics
- Cache authorization results without invalidation
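The logging recommendation can be applied by wrapping any authorize callback so denials are recorded before being returned. A sketch (the wrapper and logger names are not part of this library):

```python
import logging

logger = logging.getLogger("sse.auth")

def with_auth_logging(authorize_fn):
    """Wrap an authorize callback so every denied subscription is
    logged with the topic and client for security monitoring."""
    async def wrapper(request, topic: str) -> bool:
        allowed = await authorize_fn(request, topic)
        if not allowed:
            client = getattr(request, "client", None)
            logger.warning(
                "SSE subscription denied: topic=%s client=%s", topic, client
            )
        return allowed
    return wrapper

# Usage: SSEApp(..., authorize=with_auth_logging(authorize_subscription))
```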
Topic Patterns
Topics are simple strings that route events to subscribers. Choose a consistent naming convention for your application:
Simple Topic Strings:
# Just use strings directly!
await app.state.event_broker.publish(
    topic="comments",
    event="comment:created",
    data={"id": 1, "text": "Hello"},
)

# Or with resource IDs for more granularity
await app.state.event_broker.publish(
    topic="comment:123",
    event="updated",
    data={"text": "Edited"},
)
Recommended Topic Conventions:
| Pattern | Use Case | Example |
|---|---|---|
| `resource` | Broadcast updates to all | `comments`, `tasks`, `notifications` |
| `resource:id` | Updates to a specific resource | `comment:123`, `task:456`, `invite:789` |
| `resource:id:action` | Specific action on a resource | `comment:123:deleted`, `task:456:assigned` |
| `workspace:id` | All updates in a workspace | `workspace:acme-corp` |
| `user:id` | User-specific notifications | `user:john_doe` |
| `broadcast` or `global` | System-wide announcements | `broadcast` |
Using TopicBuilder (Optional):
For consistency, you can use the optional TopicBuilder helper:
from fastapi_sse_events import TopicBuilder
# Built-in helper methods
TopicBuilder.comment("c123")  # → "comment:c123"
TopicBuilder.task("t456")  # → "task:t456"
TopicBuilder.workspace("ws789")  # → "workspace:ws789"
TopicBuilder.user("john")  # → "user:john"
# Or custom topics
TopicBuilder.custom("invoice", "inv123")  # → "invoice:inv123"
However, TopicBuilder is optional - plain strings work equally well and are often simpler.
Best Practices:
- ✅ Use colons for namespacing: `resource:id`
- ✅ Keep topic names lowercase and short
- ✅ Include IDs for resource-specific topics (enables authorization)
- ✅ Use consistent separators (colon recommended)
- ✅ Document the topic names your app uses
- ❌ Avoid dynamic topic names without validation
- ❌ Avoid spaces or special characters
- ❌ Don't use topics to store data (topics are routing keys only)
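One way to enforce these conventions, and avoid trusting client-supplied topic names, is to validate topics against a strict pattern before subscribing. A sketch (tune the regex and length limit to your own naming scheme):

```python
import re

# Lowercase segments of letters, digits, `_` or `-`, joined by colons,
# e.g. "comments", "workspace:123", "comment:123:deleted"
TOPIC_RE = re.compile(r"^[a-z0-9_-]+(:[a-z0-9_-]+){0,2}$")

def is_valid_topic(topic: str) -> bool:
    """Accept only short, colon-namespaced, lowercase topic names."""
    return len(topic) <= 64 and bool(TOPIC_RE.fullmatch(topic))

print(is_valid_topic("workspace:acme-corp"))  # True
print(is_valid_topic("bad topic!"))           # False
```

A check like this pairs well with the authorization callback: reject malformed topics before any permission lookup runs.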
Configuration
SSEApp Configuration
The SSEApp class provides simple one-line configuration:
from fastapi_sse_events import SSEApp
import os
# Minimal setup
app = SSEApp(redis_url="redis://localhost:6379/0")

# Full configuration
app = SSEApp(
    title="My API",
    redis_url=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    heartbeat_seconds=30,  # SSE keepalive interval (5-60s)
    authorize=authorize_subscription,  # Authorization callback (optional)
)
Configuration Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `title` | str | `"FastAPI"` | App title (optional) |
| `redis_url` | str | ⚠️ required | Redis connection URL |
| `heartbeat_seconds` | int | `30` | SSE keepalive interval (5-60) |
| `authorize` | callable | `None` | Authorization callback function |
Environment Variables
Load configuration from environment for easier deployment:
import os
from fastapi_sse_events import SSEApp
app = SSEApp(
    redis_url=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    heartbeat_seconds=int(os.getenv("SSE_HEARTBEAT_SECONDS", "30")),
)
Standard Environment Variables:
# Development
export REDIS_URL="redis://localhost:6379/0"
export SSE_HEARTBEAT_SECONDS=30
# Production (with authentication)
export REDIS_URL="redis://:password@redis-prod.example.com:6379/0"
export SSE_HEARTBEAT_SECONDS=60
Load from .env file using python-dotenv:
from dotenv import load_dotenv
import os
from fastapi_sse_events import SSEApp
load_dotenv()
app = SSEApp(
    redis_url=os.getenv("REDIS_URL"),
    heartbeat_seconds=int(os.getenv("SSE_HEARTBEAT_SECONDS", "30")),
)
Advanced: Manual Broker Setup
For advanced use cases, you can configure the broker manually (not recommended for most applications):
from fastapi import FastAPI
from fastapi_sse_events import EventBroker, RealtimeConfig, mount_sse
app = FastAPI()
config = RealtimeConfig(
    redis_url="redis://localhost:6379/0",
    heartbeat_seconds=30,
)
broker = mount_sse(app, config, authorize=authorize_subscription)
Prefer SSEApp for new projects - it handles all the setup automatically.
Client Integration
JavaScript / Browser
// Connect to SSE endpoint
const eventSource = new EventSource('/events?topic=comments');
// Handle connection opened
eventSource.addEventListener('open', () => {
console.log('✅ Connected to SSE');
document.getElementById('status').textContent = 'Connected';
});
// Handle connection errors
eventSource.addEventListener('error', (error) => {
console.error('❌ SSE Error:', error);
document.getElementById('status').textContent = 'Disconnected';
// EventSource automatically reconnects
});
// Listen to specific events
eventSource.addEventListener('comment:created', (event) => {
const data = JSON.parse(event.data);
console.log('New comment:', data);
// Fetch fresh data from API
fetch(`/comments/${data.id}`)
.then(r => r.json())
.then(comment => renderComment(comment));
});
eventSource.addEventListener('comment:updated', (event) => {
const data = JSON.parse(event.data);
console.log('Updated comment:', data);
// Refresh specific comment
fetch(`/comments/${data.id}`)
.then(r => r.json())
.then(comment => updateComment(comment));
});
// Listen to heartbeat (optional)
eventSource.addEventListener('heartbeat', (event) => {
console.log('💓 Heartbeat');
});
// Subscribe to multiple topics
const multiTopics = ['comments', 'tasks', 'notifications'];
const eventSource2 = new EventSource(
`/events?topic=${multiTopics.join(',')}`
);
// Cleanup
function closeSSE() {
eventSource.close();
}
window.addEventListener('beforeunload', closeSSE);
React Hook Example
import { useEffect, useState, useCallback } from 'react';
// Custom hook for SSE
function useSSE(topic, onEvent) {
const [connected, setConnected] = useState(false);
useEffect(() => {
const eventSource = new EventSource(`/events?topic=${topic}`);
eventSource.addEventListener('open', () => {
setConnected(true);
});
eventSource.addEventListener('error', () => {
setConnected(false);
});
// Forward all events to callback
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
onEvent?.(event.type, data);
};
return () => eventSource.close();
}, [topic, onEvent]);
return connected;
}
// Component using the hook
function CommentList({ threadId }) {
const [comments, setComments] = useState([]);
// Handle SSE events
const handleSSEEvent = useCallback((eventType, data) => {
if (eventType === 'comment:created' || eventType === 'comment:updated') {
// Refresh data from API
fetchComments();
}
}, []);
const connected = useSSE(`comment_thread:${threadId}`, handleSSEEvent);
useEffect(() => {
// Initial load
fetchComments();
}, [threadId]);
const fetchComments = async () => {
const response = await fetch(`/comments?thread_id=${threadId}`);
const data = await response.json();
setComments(data);
};
return (
<div>
<div style={{ color: connected ? 'green' : 'red' }}>
{connected ? '🟢 Live' : '🔴 Offline'}
</div>
{comments.map(comment => (
<div key={comment.id} className="comment">
{comment.content}
</div>
))}
</div>
);
}
Python Client
import httpx
import json
async def listen_to_events():
    """Listen to SSE events from Python."""
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "GET",
            "http://localhost:8000/events?topic=comments",
            headers={"Accept": "text/event-stream"},
        ) as response:
            event_type = None
            async for line in response.aiter_lines():
                if line.startswith("event:"):
                    event_type = line[len("event:"):].strip()
                elif line.startswith("data:"):
                    data_str = line[len("data:"):].strip()
                    try:
                        data = json.loads(data_str)
                        print(f"Event: {event_type}, Data: {data}")
                    except json.JSONDecodeError:
                        pass
# Using sseclient-py library (simpler)
from sseclient import SSEClient
def listen_simple():
    """Simple SSE listening with sseclient-py."""
    url = "http://localhost:8000/events?topic=comments"
    for event in SSEClient(url):
        if event.event != "heartbeat":
            print(f"Event: {event.event}")
            print(f"Data: {event.data}")
Deployment
Nginx Configuration
Disable buffering for SSE endpoints:
server {
listen 80;
server_name api.example.com;
location /events {
proxy_pass http://fastapi_backend;
proxy_http_version 1.1;
proxy_set_header Connection '';
# Critical: Disable buffering for SSE
proxy_buffering off;
proxy_cache off;
# Keep connection alive
proxy_read_timeout 86400s;
# Forward headers
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location / {
proxy_pass http://fastapi_backend;
# Normal proxy settings
}
}
Traefik Configuration
http:
  routers:
    sse-router:
      rule: "PathPrefix(`/events`)"
      service: sse-service

  services:
    sse-service:
      loadBalancer:
        servers:
          - url: "http://fastapi:8000"
        # Disable buffering
        responseForwarding:
          flushInterval: "1ms"
Docker Compose
version: '3.8'

services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - SSE_REDIS_URL=redis://redis:6379/0
    depends_on:
      - redis
    # Scale horizontally
    deploy:
      replicas: 3
Horizontal Scaling
SSE Events works seamlessly across multiple instances:
- All instances connect to same Redis
- Events published by any instance reach all SSE clients
- No sticky sessions required
- Load balance normally
# Start multiple instances
uvicorn app:app --port 8000 &
uvicorn app:app --port 8001 &
uvicorn app:app --port 8002 &
# All instances share events via Redis
Production Deployment
FastAPI SSE Events is designed to scale from prototype to 100,000+ concurrent connections.
Architecture for 100K Users
                  ┌─────────────────┐
                  │    Nginx/ALB    │
                  │  Load Balancer  │
                  └────────┬────────┘
                           │
         ┌─────────────────┼─────────────────┐
         │                 │                 │
    ┌────┴────┐       ┌────┴────┐       ┌────┴─────┐
    │ FastAPI │       │ FastAPI │  ...  │ FastAPI  │
    │   #1    │       │   #2    │       │   #10    │
    │10K conn │       │10K conn │       │ 10K conn │
    └────┬────┘       └────┬────┘       └────┬─────┘
         │                 │                 │
         └─────────────────┼─────────────────┘
                           │
                  ┌────────┴────────┐
                  │  Redis Cluster  │
                  │   (3-5 nodes)   │
                  └─────────────────┘
Key Metrics
- 10,000 connections per FastAPI instance
- ~100KB memory per connection
- < 10ms message latency
- 100K messages/sec throughput
- 99.9%+ uptime
Quick Start
cd examples/production_scale
./start.sh
This starts:
- 10 FastAPI instances (100K capacity)
- Redis Cluster (3 nodes)
- Nginx load balancer
- Prometheus + Grafana monitoring
Configuration for Scale
# .env.100k_users
SSE_REDIS_URL=redis://redis-1:7001,redis-2:7002,redis-3:7003/0
SSE_MAX_CONNECTIONS=10000 # Per instance
SSE_MAX_QUEUE_SIZE=50 # Memory optimization
SSE_MAX_MESSAGE_SIZE=32768 # 32KB limit
SSE_HEARTBEAT_SECONDS=30 # Efficient keepalive
Monitoring & Health Checks
Built-in endpoints for production monitoring:
GET /health # Basic health check
GET /health/ready # Readiness probe (load balancers)
GET /health/live # Liveness probe (Kubernetes)
GET /metrics # Detailed metrics (JSON)
GET /metrics/prometheus # Prometheus format
Key Metrics to Monitor:
# Concurrent connections
sum(sse_connections_current)
# Connection rejection rate (scale up if > 0)
rate(sse_connections_rejected[5m])
# Message drop rate (should be < 0.1%)
rate(sse_messages_dropped[5m]) / rate(sse_messages_delivered[5m])
# Publish latency
avg(sse_publish_latency_ms)
Production Checklist
Before deploying to production:
- Load Testing - Test with expected peak load × 2
- Monitoring - Set up Prometheus + Grafana dashboards
- Alerting - Configure alerts for connection rejections, high latency
- Auto-scaling - Configure triggers (CPU > 70%, connections > 8K)
- Security - Enable Redis auth, use SSL, restrict CORS
- Persistence - Configure Redis persistence (AOF + RDB)
- Failover - Test Redis cluster failover scenarios
- Backpressure - Verify slow clients don't crash servers
- Rate Limiting - Implement API rate limits
- Logging - Set up centralized logging (ELK/Loki)
Performance Tuning
Redis Optimization:
# Increase max clients
redis-cli CONFIG SET maxclients 20000
# Optimize memory
redis-cli CONFIG SET maxmemory-policy allkeys-lru
redis-cli CONFIG SET maxmemory 4gb
# Disable persistence for pure cache (optional)
redis-cli CONFIG SET save ""
Linux Kernel Tuning:
# Increase file descriptors
ulimit -n 100000
# Increase network buffers
sysctl -w net.core.somaxconn=4096
sysctl -w net.ipv4.tcp_max_syn_backlog=8096
FastAPI Instance Sizing:
- CPU: 1-2 cores per instance
- Memory: 2GB base + 1GB per 10K connections
- Network: 100Mbps per 10K connections
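Those rules of thumb reduce to simple capacity arithmetic. A hypothetical helper (the constants mirror the figures above; this is not a library API):

```python
def size_cluster(target_connections: int, per_instance: int = 10_000) -> dict:
    """Estimate instance count and per-instance memory from the
    rules of thumb above: 2 GB base + 1 GB per 10K connections."""
    instances = -(-target_connections // per_instance)  # ceiling division
    memory_gb = 2 + per_instance // 10_000
    return {"instances": instances, "memory_gb_each": memory_gb}

print(size_cluster(100_000))  # {'instances': 10, 'memory_gb_each': 3}
```

For the 100K-user architecture above, this yields the ten 3 GB instances shown in the diagram.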
Complete Guide
For detailed deployment instructions, architecture decisions, and troubleshooting:
📖 Complete 100K User Deployment Guide
Topics covered:
- Multi-region deployment
- Kubernetes manifests
- Cost optimization strategies
- Disaster recovery
- Performance benchmarking
- Security hardening
Examples
See the examples/ directory for complete working examples:
CRM Comment System
Full-featured collaborative comment system with:
- REST API for CRUD operations
- Real-time SSE updates
- Beautiful web interface
- Multi-client synchronization
cd examples/crm_comments
uvicorn app:app --reload
# Open client.html in multiple browsers
More examples coming soon:
- Ticket tracking system
- Live dashboard with metrics
- Collaborative document editing
- Multi-tenant workspace notifications
API Reference
SSEApp (Recommended)
Simplified FastAPI subclass with built-in SSE support.
from fastapi_sse_events import SSEApp
class SSEApp(FastAPI):
    """FastAPI app with SSE support pre-configured."""

    def __init__(
        self,
        title: str = "API",
        redis_url: str = "redis://localhost:6379/0",
        heartbeat_seconds: int = 30,
        authorize: Optional[AuthorizeFn] = None,
        **kwargs,
    ):
        ...
Usage:
app = SSEApp(
    title="My API",
    redis_url="redis://localhost:6379/0",
    heartbeat_seconds=30,
    authorize=authorize_callback,  # Optional
)
Automatically Provides:
- ✅ `EventBroker` at `app.state.event_broker`
- ✅ SSE endpoint at `/events` (with `?topic=` query param)
- ✅ Health checks at `/health*`
- ✅ Metrics at `/metrics`
Decorators
@publish_event()
Automatically publishes endpoint response to subscribers.
from fastapi_sse_events import publish_event
@app.post("/comments")
@publish_event(topic="comments", event="comment:created")
async def create_comment(request: Request, text: str):
    # Save to database
    comment = await db.save_comment(text)
    # Response automatically published to all subscribers!
    return comment
Parameters:
- `topic` (str): Topic to publish to
- `event` (str): Event type identifier
@subscribe_to_events()
Converts endpoint to SSE stream. Query param: ?topic=...
from fastapi_sse_events import subscribe_to_events
@app.get("/events")
@subscribe_to_events()
async def events(request: Request):
    # Endpoint body can be empty
    pass
Query Parameters:
- `topic` (str, required): Topic to subscribe to
- `topic=topic1,topic2`: Multiple topics (comma-separated)
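When building the subscription URL programmatically, the comma-separated list is just a single `topic` query parameter. A small sketch:

```python
from urllib.parse import urlencode

topics = ["comments", "tasks", "notifications"]
url = "/events?" + urlencode({"topic": ",".join(topics)})
print(url)  # /events?topic=comments%2Ctasks%2Cnotifications
```

The server sees the decoded value `comments,tasks,notifications` and splits it into individual topics.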
EventBroker
Manual event publishing (use decorators when possible).
class EventBroker:
    async def publish(
        self,
        topic: str,
        event: str,
        data: dict[str, Any] | EventData,
    ) -> None:
        """Publish an event to all subscribers on a topic."""
Example:
await app.state.event_broker.publish(
    topic="comments",
    event="comment:created",
    data={"id": 1, "text": "Hello"},
)
Configuration (SSEApp Parameters)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `title` | str | `"API"` | Application title |
| `redis_url` | str | `"redis://localhost:6379/0"` | Redis connection URL |
| `heartbeat_seconds` | int | `30` | SSE keepalive interval (5-60) |
| `authorize` | AuthorizeFn | `None` | Authorization callback (optional) |
Advanced: Manual Setup
For complex use cases, manually configure the broker:
from fastapi import FastAPI
from fastapi_sse_events import mount_sse, RealtimeConfig
app = FastAPI()
config = RealtimeConfig(
    redis_url="redis://localhost:6379/0",
    heartbeat_seconds=30,
)
broker = mount_sse(app, config, authorize=authorize_fn)
TopicBuilder (Optional)
Helper for consistent topic naming (optional).
from fastapi_sse_events import TopicBuilder
# All optional - plain strings work too
TopicBuilder.comment("c123")  # → "comment:c123"
TopicBuilder.task("t456")  # → "task:t456"
TopicBuilder.workspace("ws789")  # → "workspace:ws789"
TopicBuilder.user("john")  # → "user:john"
TopicBuilder.custom("invoice", "i001")  # → "invoice:i001"
Type Definitions
# Authorization callback type
AuthorizeFn = Callable[[Request, str], Awaitable[bool]]

# Event data
EventData = dict[str, Any]  # Any JSON-serializable dict

# Example authorization callback
async def authorize(request: Request, topic: str) -> bool:
    """Return True to allow the subscription, False to deny."""
    user = await get_current_user(request)
    return user is not None and topic.startswith(f"user:{user.id}")
Troubleshooting
SSE Connection Fails
Symptom: Browser can't connect to /events
Solutions:
- Check Redis is running: `redis-cli ping`
- Verify the endpoint is registered: check the FastAPI docs at `/docs`
- Check CORS settings if cross-origin
- Look for errors in the server logs
Updates Not Appearing
Symptom: Events published but clients don't receive them
Solutions:
- Verify topic names match exactly
- Check Redis pub/sub: `redis-cli SUBSCRIBE topic_name`
- Ensure all instances connect to the same Redis
- Check the authorization function isn't denying access
- Look for JavaScript errors in the browser console
High Memory Usage
Symptom: Redis or app memory grows over time
Solutions:
- Monitor Redis: `redis-cli INFO memory`
- Implement idle connection timeouts
- Reduce heartbeat frequency if there are many connections
- Set connection limits in the load balancer
- Monitor disconnected clients
Proxy Buffering Issues
Symptom: Events delayed or not arriving
Solution: Disable buffering in proxy:
# Nginx
proxy_buffering off;
proxy_cache off;
# Apache
SetEnv proxy-nokeepalive 1
"Redis not connected" Error
Symptom: `RuntimeError: Redis client not connected`
Solutions:
- Check Redis is running and accessible
- Verify `redis_url` in the config
- Check network/firewall rules
- Ensure `mount_sse()` is called before the app starts
Performance Tips
- Use topic scoping - Narrow topics reduce fan-out
- Keep payloads small - Send IDs, not full objects
- Implement idle timeouts - Clean up dead connections
- Monitor Redis - Watch memory and connection count
- Connection pooling - Reuse Redis connections
- Event batching - For high-frequency updates, debounce on client
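As a server-side complement to the batching tip, bursts of updates can be coalesced so only the latest payload per topic is published. A minimal asyncio sketch (not part of this library; only the `publish(topic=..., event=..., data=...)` call shape is assumed):

```python
import asyncio

class DebouncedPublisher:
    """Coalesce rapid-fire notifications: publish at most once per
    `delay` seconds per topic, sending only the latest payload."""

    def __init__(self, broker, delay: float = 0.5):
        self.broker = broker
        self.delay = delay
        self._pending: dict[str, asyncio.Task] = {}
        self._latest: dict[str, tuple[str, dict]] = {}

    def notify(self, topic: str, event: str, data: dict) -> None:
        # Remember only the most recent payload for this topic
        self._latest[topic] = (event, data)
        if topic not in self._pending:
            self._pending[topic] = asyncio.create_task(self._flush(topic))

    async def _flush(self, topic: str) -> None:
        await asyncio.sleep(self.delay)
        event, data = self._latest.pop(topic)
        del self._pending[topic]
        await self.broker.publish(topic=topic, event=event, data=data)
```

This keeps Redis and client fan-out traffic proportional to the debounce window rather than to the raw update rate.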
Comparison
| Feature | SSE (this library) | WebSockets | Polling |
|---|---|---|---|
| Real-time | ✅ Near real-time | ✅ Real-time | ❌ Delayed |
| Infrastructure | ✅ HTTP (simple) | ❌ Complex | ✅ HTTP |
| Browser support | ✅ Native | ✅ Native | ✅ Native |
| Bidirectional | ❌ Server→Client only | ✅ Both | ❌ Client→Server only |
| Auto-reconnect | ✅ Built-in | ❌ Manual | N/A |
| Horizontal scaling | ✅ Via Redis | ⚠️ Sticky sessions | ✅ Stateless |
| Debugging | ✅ Easy | ❌ Harder | ✅ Easy |
| Use case | Notifications | Chat, games | Legacy |
🤝 Contributing
Contributions are welcome! Here's how you can help:
Development Setup
# Clone repository
git clone https://github.com/bhadri01/fastapi_sse_events.git
cd fastapi_sse_events
# Install with dev dependencies
pip install -e ".[dev]"
# Run tests
pytest tests/ -v
# Run linting
ruff check fastapi_sse_events/ tests/ examples/
ruff format fastapi_sse_events/ tests/ examples/
# Run type checking
mypy fastapi_sse_events/
Contribution Guidelines
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Add tests for new features
4. Ensure all tests pass (`pytest`)
5. Run linting and formatting (`ruff`)
6. Commit your changes (`git commit -m 'Add amazing feature'`)
7. Push to the branch (`git push origin feature/amazing-feature`)
8. Open a Pull Request
Reporting Issues
- Use GitHub Issues
- Include Python version, FastAPI version, and Redis version
- Provide minimal reproducible example
- Check existing issues first
📄 License
This project is licensed under the MIT License - see the LICENSE file for details.
Copyright © 2025 bhadri01
🙏 Acknowledgments
- Built with FastAPI
- Powered by Redis
- SSE support via sse-starlette
🔗 Related Projects
- fastapi-querybuilder - Advanced query building for FastAPI + SQLAlchemy
⭐ Star History
If you find this project helpful, please consider giving it a star on GitHub!
Made with ❤️ by bhadri01
Support
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Email: support@example.com
Built with ❤️ for the FastAPI community
File details
Details for the file fastapi_sse_events-0.2.0.tar.gz.

File metadata
- Download URL: fastapi_sse_events-0.2.0.tar.gz
- Size: 59.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.10.19 Linux/6.14.0-1017-azure

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `e1aff539ce1567da5dda7600c3ff353e08da19efd62e836d379e0ce6b70ccc30` |
| MD5 | `b1d5fc69709a97c52f9b073b11decb6f` |
| BLAKE2b-256 | `1ca0e44abbdfc1f8725e31f400a99fec8fce73e0865cc955fa798626d7e74c5d` |
File details
Details for the file fastapi_sse_events-0.2.0-py3-none-any.whl.

File metadata
- Download URL: fastapi_sse_events-0.2.0-py3-none-any.whl
- Size: 37.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/2.3.2 CPython/3.10.19 Linux/6.14.0-1017-azure

File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | `8ac7418e55fd513456c0e1710dc785342924135d6b971828e860fc0f837d0db7` |
| MD5 | `f4f82680e62182793a7fc9234c028546` |
| BLAKE2b-256 | `921cab4295ddceb684f6bf5a44b1c6e4d5ce96aeac987c7c89f62d820652080f` |