Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.
Project description
cjm-fasthtml-sse
Install
pip install cjm_fasthtml_sse
Project Structure
nbs/
└── core/ (6)
├── broadcast.ipynb # Broadcasting infrastructure for SSE cross-tab synchronization
├── connections.ipynb # Connection management for SSE clients
├── multi_stream.ipynb # Manage multiple related SSE streams for a single entity
├── response.ipynb # SSE response builders for complex UI updates
├── routes.ipynb # SSE route helpers and decorators for FastHTML
└── streaming.ipynb # SSE streaming utilities and helpers
Total: 6 notebooks across 4 directories
Module Dependencies
graph LR
core_broadcast[core.broadcast<br/>Broadcast]
core_connections[core.connections<br/>Connections]
core_multi_stream[core.multi_stream<br/>Multi Stream]
core_response[core.response<br/>Response]
core_routes[core.routes<br/>Routes]
core_streaming[core.streaming<br/>Streaming]
core_multi_stream --> core_connections
core_multi_stream --> core_streaming
core_response --> core_streaming
core_routes --> core_connections
core_routes --> core_streaming
5 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Broadcast (broadcast.ipynb)
Broadcasting infrastructure for SSE cross-tab synchronization
Import
from cjm_fasthtml_sse.core.broadcast import (
BroadcastMessage,
BroadcastManager,
create_broadcast_handler,
setup_broadcast_routes
)
Functions
def create_broadcast_handler(manager: BroadcastManager,
element_builder: Optional[Callable] = None)
"Create a broadcast handler function that can be used with FastHTML routes."
def setup_broadcast_routes(app,
manager: BroadcastManager, # The broadcast manager instance
prefix: str = "/sse", # URL prefix for SSE endpoints
element_builder: Optional[Callable] = None)
"Setup broadcast routes on a FastHTML app."
Classes
@dataclass
class BroadcastMessage:
"Standard broadcast message format for SSE communication"
type: str
data: Dict[str, Any]
timestamp: str = field(...)
metadata: Optional[Dict[str, Any]]
def to_dict(
self
) -> Dict[str, Any]: # Dictionary representation of the message
"Convert message to dictionary format"
def to_json(
self
) -> str: # JSON string representation of the message
"Convert message to JSON string"
def to_sse(
self,
event_type: Optional[str] = 'message' # SSE event type for the message
) -> str: # SSE formatted message string
"Convert to SSE message format using FastHTML's sse_message"
class BroadcastManager:
def __init__(self,
"Manages SSE broadcast connections across multiple tabs/clients"
def __init__(self,
"Initialize the broadcast manager."
async def register(self,
connection_id: Optional[str] = None, # Optional ID for the connection (auto-generated if not provided)
metadata: Optional[Dict[str, Any]] = None # Optional metadata for the connection
) -> tuple[str, asyncio.Queue]: # Tuple of (connection_id, queue)
"Register a new connection and return its queue."
async def unregister(
self,
connection_id: str # ID of the connection to unregister
)
"Unregister a connection."
async def broadcast(self,
message_type: str, # Type of the message
data: Dict[str, Any], # Message data
metadata: Optional[Dict[str, Any]] = None, # Optional metadata
exclude: Optional[Set[str]] = None # Set of connection IDs to exclude from broadcast
) -> int: # Number of successful broadcasts
"Broadcast a message to all connected clients."
async def send_to(self,
connection_id: str, # Target connection ID
message_type: str, # Type of the message
data: Dict[str, Any], # Message data
metadata: Optional[Dict[str, Any]] = None # Optional metadata
) -> bool: # True if successful, False otherwise
"Send a message to a specific connection."
def get_connection_count(
self
) -> int: # Number of active connections
"Get the number of active connections."
def get_history(
self,
limit: Optional[int] = None # Maximum number of messages to return
) -> list[BroadcastMessage]: # List of broadcast messages from history
"Get broadcast history."
Connections (connections.ipynb)
Connection management for SSE clients
Import
from cjm_fasthtml_sse.core.connections import (
ConnectionState,
SSEConnection,
ConnectionRegistry,
create_sse_element,
cleanup_sse_on_unload,
create_reconnection_script,
create_connection_manager_script
)
Functions
def create_sse_element(endpoint: str,
element_id: Optional[str] = None, # Optional element ID
swap_strategy: str = "message", # HTMX swap strategy (message, innerHTML, outerHTML, etc.)
hidden: bool = False, # Whether to hide the element
**attrs # Additional attributes for the element
) -> Div: # SSE-enabled Div element configured for HTMX
"Create an SSE-enabled HTML element."
def cleanup_sse_on_unload(
) -> Script: # Script element for cleanup
"Create a script to clean up SSE connections on page unload."
def create_reconnection_script(check_interval: int = 5000,
max_retries: int = 5, # Maximum number of reconnection attempts
debug: bool = False # Enable debug logging
) -> Script: # Script element with reconnection logic
"Create a script for automatic SSE reconnection."
def create_connection_manager_script(registry_endpoint: str = "/sse/connections",
update_interval: int = 10000 # Interval for updating connection stats (in milliseconds)
) -> Script: # Script element with connection management logic
"Create a script to manage and monitor connections."
Classes
class ConnectionState(Enum):
"States for SSE connections"
@dataclass
class SSEConnection:
"Represents a single SSE connection"
connection_id: str
queue: asyncio.Queue
connection_type: str = 'global'
state: ConnectionState = ConnectionState.CONNECTING
metadata: Dict[str, Any] = field(...)
created_at: datetime = field(...)
last_activity: datetime = field(...)
message_count: int = 0
async def send(
self,
data: Any, # Data to send
timeout: float = 1.0 # Timeout for the send operation
) -> bool: # True if successful, False otherwise
"Send data through the connection queue."
async def heartbeat(
self
) -> str: # SSE formatted heartbeat message
"Generate a heartbeat message."
def close(self):
"""Mark the connection as closed."""
self.state = ConnectionState.DISCONNECTED
def is_active(
self
) -> bool: # True if connection is active, False otherwise
"Mark the connection as closed."
def is_active(
self
) -> bool: # True if connection is active, False otherwise
"Check if connection is active."
class ConnectionRegistry:
def __init__(
self,
debug: bool = False # Enable debug logging
)
"Registry to track and manage SSE connections"
def __init__(
self,
debug: bool = False # Enable debug logging
)
"Initialize the connection registry."
async def add_connection(self,
conn_id: Optional[str] = None, # Optional connection ID (auto-generated if not provided)
conn_type: str = "global", # Type of connection (e.g., 'global', 'job', 'user')
queue_size: int = 100, # Size of the message queue
metadata: Optional[Dict[str, Any]] = None # Optional metadata for the connection
) -> SSEConnection: # The created SSEConnection
"Add a new connection to the registry."
async def remove_connection(
self,
conn_id: str # Connection ID to remove
)
"Remove a connection from the registry."
def get_connection(
self,
conn_id: str # Connection ID
) -> Optional[SSEConnection]: # The connection if found, None otherwise
"Get a specific connection."
def get_connections(
self,
conn_type: Optional[str] = None # Optional connection type to filter by
) -> list[SSEConnection]: # List of connections
"Get connections, optionally filtered by type."
def get_active_connections(
self,
conn_type: Optional[str] = None # Optional connection type to filter by
) -> list[SSEConnection]: # List of active connections
"Get active connections."
def get_stats(
self
) -> Dict[str, Any]: # Dictionary with connection statistics
"Get registry statistics."
Multi Stream (multi_stream.ipynb)
Manage multiple related SSE streams for a single entity
Import
from cjm_fasthtml_sse.core.multi_stream import (
StreamEndpoint,
MultiStreamManager,
create_multi_stream_row
)
Functions
def create_multi_stream_row(
manager: MultiStreamManager, # Multi-stream manager
entity_id: str, # Entity ID
streams: List[Dict[str, Any]], # List of stream configurations
row_builder: Callable, # Function to build the row
active: bool = True # Whether streams should be active
) -> FT: # Table row or similar element
"Create a row element with multiple SSE streams."
Classes
@dataclass
class StreamEndpoint:
"Configuration for a single SSE stream endpoint"
name: str # Stream name (e.g., 'progress', 'status')
path_suffix: str # URL path suffix (e.g., '_progress', '_status')
data_source: Callable # Async generator or callable that produces data
transform_fn: Optional[Callable] # Optional transform function
config: Optional[StreamConfig] # Stream configuration
metadata: Dict[str, Any] = field(...) # Additional metadata
class MultiStreamManager:
def __init__(
self,
base_path: str = "/stream", # Base path for stream endpoints
connection_registry: Optional[ConnectionRegistry] = None, # Optional shared registry
default_config: Optional[StreamConfig] = None, # Default config for all streams
debug: bool = False # Enable debug logging
)
"Manages multiple related SSE streams for entities"
def __init__(
self,
base_path: str = "/stream", # Base path for stream endpoints
connection_registry: Optional[ConnectionRegistry] = None, # Optional shared registry
default_config: Optional[StreamConfig] = None, # Default config for all streams
debug: bool = False # Enable debug logging
)
"Initialize the multi-stream manager."
def register_endpoint(
self,
endpoint: StreamEndpoint # Stream endpoint configuration
) -> 'MultiStreamManager': # Self for chaining
"Register a stream endpoint."
def create_element(
self,
entity_id: str, # Entity ID (e.g., job_id, user_id)
stream_name: str, # Stream name to connect to
element_id: Optional[str] = None, # Optional element ID
wrapper: Optional[Callable] = None, # Optional wrapper function for the element
**attrs # Additional attributes
) -> FT: # HTMX-enabled SSE element
"Create an SSE-enabled element for a specific stream."
def create_handler(
self,
stream_name: str, # Stream name
get_entity_fn: Callable, # Function to get entity state
is_active_fn: Callable # Function to check if entity is active
) -> Callable: # Route handler function
"Create a route handler for a specific stream."
def setup_routes(
self,
app, # FastHTML app
get_entity_fn: Callable, # Function to get entity state
is_active_fn: Callable # Function to check if entity is active
)
"Setup all stream routes on the app."
Response (response.ipynb)
SSE response builders for complex UI updates
Import
from cjm_fasthtml_sse.core.response import (
UpdateRule,
SSEResponseBuilder,
create_conditional_response,
create_state_response_builder
)
Functions
def create_conditional_response(
conditions: List[Tuple[Callable, Callable]], # List of (condition, builder) tuples
always_include: Optional[List[Callable]] = None, # Builders that always run
context: Optional[Dict[str, Any]] = None # Initial context
) -> SSEResponseBuilder: # Configured response builder
"Create a response builder with predefined conditions."
def create_state_response_builder(
state_builders: Dict[str, Callable], # Mapping of state names to builders
get_state_fn: Callable, # Function to determine current state
default_builder: Optional[Callable] = None # Default builder if no state matches
) -> SSEResponseBuilder: # Configured response builder
"Create a response builder for state-based updates."
Classes
@dataclass
class UpdateRule:
"Rule for conditional element updates"
condition: Callable # Function that returns True if rule should apply
builder: Callable # Function that builds the element(s)
target_id: Optional[str] # Target element ID for OOB swap
swap_mode: str = 'innerHTML' # Swap mode
priority: int = 0 # Higher priority rules are evaluated first
class SSEResponseBuilder:
def __init__(
self,
debug: bool = False # Enable debug logging
)
"Builder for complex SSE responses with conditional updates"
def __init__(
self,
debug: bool = False # Enable debug logging
)
"Initialize the response builder."
def add_rule(
self,
condition: Callable, # Condition function
builder: Callable, # Element builder function
target_id: Optional[str] = None, # Target ID for OOB
swap_mode: str = "innerHTML", # Swap mode
priority: int = 0 # Rule priority
) -> 'SSEResponseBuilder': # Self for chaining
"Add a conditional update rule."
def add_always(
self,
builder: Callable # Element builder that always runs
) -> 'SSEResponseBuilder': # Self for chaining
"Add a builder that always runs."
def set_context(
self,
**kwargs # Context variables
) -> 'SSEResponseBuilder': # Self for chaining
"Set context variables for builders."
def build(
self,
**kwargs # Additional context for this build
) -> FT: # Built response
"Build the response based on rules and context."
def clear_rules(
self
) -> 'SSEResponseBuilder': # Self for chaining
"Clear all rules."
def clear_always(
self
) -> 'SSEResponseBuilder': # Self for chaining
"Clear always-include builders."
Routes (routes.ipynb)
SSE route helpers and decorators for FastHTML
Import
from cjm_fasthtml_sse.core.routes import (
SSERouteConfig,
create_sse_route,
async_error_generator,
sse_route,
setup_sse_routes,
create_conditional_sse_route
)
Functions
def create_sse_route(
data_generator: Callable, # Async generator function
transform_fn: Optional[Callable] = None, # Optional transform function
config: Optional[SSERouteConfig] = None # Route configuration
) -> Callable: # Route handler
"Create an SSE route handler with automatic connection management."
async def async_error_generator(message: str)
"Generate an error message for SSE."
def sse_route(
path: Optional[str] = None, # Route path
transform_fn: Optional[Callable] = None, # Transform function
config: Optional[SSERouteConfig] = None # Route configuration
) -> Callable: # Decorator
"Decorator to create SSE routes."
def setup_sse_routes(
app, # FastHTML app
*handlers, # SSE route handlers created with @sse_route
prefix: str = "", # URL prefix
registry: Optional[ConnectionRegistry] = None # Shared registry
)
"Setup multiple SSE routes on an app."
def create_conditional_sse_route(
active_generator: Callable, # Generator for active entities
inactive_content: Callable, # Function to generate content for inactive entities
is_active_fn: Callable, # Function to check if entity is active
transform_fn: Optional[Callable] = None, # Transform function
config: Optional[SSERouteConfig] = None # Route configuration
) -> Callable: # Route handler
"Create an SSE route that handles both active and inactive states."
Classes
@dataclass
class SSERouteConfig:
"Configuration for SSE route"
connection_type: str = 'sse' # Connection type for registry
stream_config: Optional[StreamConfig] # Stream configuration
registry: Optional[ConnectionRegistry] # Connection registry
validate_fn: Optional[Callable] # Validation function
metadata_fn: Optional[Callable] # Function to generate metadata
error_handler: Optional[Callable] # Error handler function
debug: bool = False # Enable debug logging
Streaming (streaming.ipynb)
SSE streaming utilities and helpers
Import
from cjm_fasthtml_sse.core.streaming import (
StreamConfig,
SSEStream,
OOBStreamBuilder
)
Classes
@dataclass
class StreamConfig:
"Configuration for SSE streaming"
heartbeat_interval: float = 30.0
timeout: Optional[float]
send_initial_message: bool = True
initial_message: str = 'Connected'
send_close_message: bool = True
close_message: str = 'Connection closed'
debug: bool = False
class SSEStream:
def __init__(
self,
config: Optional[StreamConfig] = None # Stream configuration
)
"Generic SSE stream handler"
def __init__(
self,
config: Optional[StreamConfig] = None # Stream configuration
)
"Initialize the SSE stream."
async def stream(self,
data_source: Union[AsyncGenerator, Callable], # Async generator or callable that produces data
transform_fn: Optional[Callable] = None # Optional function to transform data before sending
) -> AsyncGenerator[str, None]: # SSE formatted strings
"Stream data from a source through SSE."
def stop(self)
"Stop the stream."
class OOBStreamBuilder:
def __init__(self):
"""Initialize the OOB stream builder."""
self.elements: List[Any] = []
"Build SSE messages with OOB (Out-of-Band) swaps"
def __init__(self):
"""Initialize the OOB stream builder."""
self.elements: List[Any] = []
"Initialize the OOB stream builder."
def add_element(self,
element: Any, # The element to add
target_id: Optional[str] = None, # Target element ID for OOB swap
swap_mode: str = "innerHTML", # Swap mode (innerHTML, outerHTML, beforeend, afterbegin, etc.)
wrap: bool = True # If True and target_id is provided, wrap content in a Div with OOB attributes. If False, add OOB attributes directly to the element
) -> 'OOBStreamBuilder': # Self for chaining
"Add an element with OOB swap configuration."
def add_elements(
self,
elements: List[tuple] # List of tuples: (element, target_id, swap_mode, wrap) or (element, target_id, swap_mode) or (element, target_id) or (element,)
) -> 'OOBStreamBuilder': # Self for chaining
"Add multiple elements with OOB configurations."
def build(
self
) -> FT: # Div with all elements
"Build the Div element with all elements."
def clear(
self
) -> 'OOBStreamBuilder': # Self for chaining
"Clear all elements."
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cjm_fasthtml_sse-0.0.15.tar.gz.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.15.tar.gz
- Upload date:
- Size: 30.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
9c65c820202306da04e9144fee191dfa30c0deed06be1bac50fba8a35540939e
|
|
| MD5 |
ceeb41606a73d8c648461db9f7dc9c2e
|
|
| BLAKE2b-256 |
76c5f294bb59b40fa58959fafb72515a97be368c9b3fb7bd37c28c0935fa3fd7
|
File details
Details for the file cjm_fasthtml_sse-0.0.15-py3-none-any.whl.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.15-py3-none-any.whl
- Upload date:
- Size: 28.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
41a2e6c3b9537864f38a57553d0a1d6bcb732f677769e5a33799f29b10a546fe
|
|
| MD5 |
a555c65e7ac290106bc4c9928fc0d9fe
|
|
| BLAKE2b-256 |
012b12391946c1584d1617ecd9bae737cee96b2524b0e17524d7f7ae58e1189a
|