Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.
Project description
cjm-fasthtml-sse
Install
pip install cjm_fasthtml_sse
Project Structure
nbs/
├── components/ (2)
│ ├── builders.ipynb # Generic builders for creating SSE-enabled elements with HTMX
│ └── monitors.ipynb # Client-side SSE connection monitoring and management utilities
├── core/ (3)
│ ├── broadcast.ipynb # SSE broadcast infrastructure for managing connections and broadcasting updates to multiple clients
│ ├── decorators.ipynb # Route decorators and utilities for FastHTML SSE endpoints
│ └── streaming.ipynb # SSE streaming utilities for creating and managing Server-Sent Events streams
└── integrations/ (1)
└── fasthtml.ipynb # High-level FastHTML integration for SSE with HTMX
Total: 6 notebooks across 3 directories
Module Dependencies
graph LR
components_builders[components.builders<br/>builders]
components_monitors[components.monitors<br/>monitors]
core_broadcast[core.broadcast<br/>broadcast]
core_decorators[core.decorators<br/>decorators]
core_streaming[core.streaming<br/>streaming]
integrations_fasthtml[integrations.fasthtml<br/>fasthtml]
core_decorators --> core_streaming
core_decorators --> core_broadcast
core_streaming --> core_broadcast
integrations_fasthtml --> components_builders
integrations_fasthtml --> core_streaming
integrations_fasthtml --> core_decorators
integrations_fasthtml --> core_broadcast
integrations_fasthtml --> components_monitors
8 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
broadcast (broadcast.ipynb)
SSE broadcast infrastructure for managing connections and broadcasting updates to multiple clients
Import
from cjm_fasthtml_sse.core.broadcast import (
BroadcastEventType,
BroadcastMessage,
SSEBroadcastManager,
SSEConnection
)
Classes
class BroadcastEventType(Enum):
"Standard broadcast event types"
@dataclass
class BroadcastMessage:
"Structured broadcast message"
type: str
data: Dict[str, Any]
timestamp: str = field(...)
target_ids: Optional[List[str]] # Specific element IDs to update
metadata: Optional[Dict[str, Any]]
def to_dict(
self
) -> Dict[str, Any]: # Dictionary representation of the message
"Convert to dictionary for JSON serialization"
def to_json(
self
) -> str: # JSON string representation of the message
"Convert to JSON string"
class SSEBroadcastManager:
def __init__(self,
"Manages SSE connections and broadcasting across multiple clients. This class provides a centralized way to manage Server-Sent Events connections and broadcast messages to all connected clients, enabling real-time updates and cross-tab synchronization."
def __init__(self,
"Initialize the SSE Broadcast Manager."
async def register_connection(self,
queue: Optional[asyncio.Queue] = None, # Optional pre-existing queue (creates new if None)
metadata: Optional[Dict[str, Any]] = None # Optional metadata for this connection
) -> asyncio.Queue: # The queue for this connection
"Register a new client connection."
async def unregister_connection(
self,
queue: asyncio.Queue # The queue to unregister
)
"Remove a client connection."
async def broadcast(self,
event_type: str, # Type of event to broadcast
data: Dict[str, Any], # Data to broadcast
target_ids: Optional[List[str]] = None, # Optional list of element IDs to target for updates
metadata: Optional[Dict[str, Any]] = None # Optional metadata for the message
) -> int: # Number of clients successfully notified
"Broadcast a message to all connected clients."
async def run_message_handlers(
self,
message: BroadcastMessage # Message to process
) -> BroadcastMessage: # Potentially modified message
"Run message through registered handlers."
async def broadcast_message(
self,
message: BroadcastMessage # Message to broadcast
) -> int: # Number of clients successfully notified
"Broadcast a pre-constructed message to all connected clients."
def add_message_handler(self,
handler: Callable[[BroadcastMessage], Awaitable[Optional[BroadcastMessage]]] # Async function that takes and optionally returns a BroadcastMessage
)
"Add a message handler that can transform messages before broadcasting. Handlers are called in the order they are added and can modify messages or return None to pass through unchanged."
def get_connection_count(
self
) -> int: # Current number of active connections
"Get the current number of active connections"
def get_history(
self,
limit: Optional[int] = None # Maximum number of messages to return (None for all)
) -> List[BroadcastMessage]: # List of historical messages
"Get message history."
async def clear_history(self):
"""Clear the message history"""
self.history.clear()
self._log("Message history cleared")
async def close_all_connections(self)
"Clear the message history"
async def close_all_connections(self):
"""
Close all active connections. Useful for cleanup during shutdown.
"""
async with self.lock
"Close all active connections. Useful for cleanup during shutdown."
class SSEConnection:
def __init__(self,
manager: SSEBroadcastManager, # The SSEBroadcastManager to register with
metadata: Optional[Dict[str, Any]] = None # Optional metadata for this connection
)
"Context manager for SSE connections. Automatically registers/unregisters connections with the broadcast manager."
def __init__(self,
manager: SSEBroadcastManager, # The SSEBroadcastManager to register with
metadata: Optional[Dict[str, Any]] = None # Optional metadata for this connection
)
"Initialize SSE connection context."
builders (builders.ipynb)
Generic builders for creating SSE-enabled elements with HTMX
Import
from cjm_fasthtml_sse.components.builders import (
add_sse_attrs,
add_oob_swap,
SSEConfig,
OOBUpdate,
MultiUpdateBuilder,
generate_sse_cleanup_script,
get_htmx_sse_extension_url,
create_sse_attrs,
create_oob_attrs
)
Functions
def add_sse_attrs(
element: Any, # Any element with an attrs dictionary
endpoint: str, # SSE endpoint URL
event_name: str = "message", # SSE event to listen for
swap_mode: Optional[str] = None, # Optional HTMX swap mode **extra_attrs: Additional attributes to add
**extra_attrs
) -> Any: # The modified element
"Add SSE attributes to any element. This function modifies an element in-place by adding the necessary HTMX SSE attributes. It's framework-agnostic and works with any element that has an attrs dictionary."
def add_oob_swap(
"Add out-of-band swap attributes to any element."
def generate_sse_cleanup_script(
) -> str: # JavaScript code as a string
"Generate JavaScript for proper SSE connection cleanup."
def get_htmx_sse_extension_url(
version: str = "2.2.3" # Version of the extension
) -> str: # CDN URL as a string
"Get the CDN URL for the HTMX SSE extension."
def create_sse_attrs(
endpoint: str, # SSE endpoint
event: str = "message", # Event name to listen for
swap: Optional[str] = None, # Optional swap mode **kwargs: Additional attributes
**kwargs
) -> Dict[str, str]: # Dictionary of attributes
"Create a dictionary of SSE attributes. This is a convenience function for creating SSE attributes that can be spread into element constructors."
def create_oob_attrs(
element_id: str, # ID of target element
swap_type: str = "innerHTML" # Type of swap
) -> Dict[str, str]: # Dictionary of attributes
"Create a dictionary of OOB swap attributes."
Classes
@dataclass
class SSEConfig:
"""
Configuration for SSE-enabled elements.
This dataclass provides a clean way to configure SSE attributes
that can be applied to any element.
"""
endpoint: str
event_name: str = 'message'
swap_mode: Optional[str]
reconnect_time: Optional[int]
extra_attrs: Dict[str, Any] = field(...)
def apply_to(
self,
element: Any # Element to configure
) -> Any: # The configured element
"Apply this SSE configuration to an element."
def to_attrs(
self
) -> Dict[str, str]: # Dictionary of HTMX SSE attributes
"Convert configuration to attribute dictionary."
@dataclass
class OOBUpdate:
"""
Represents an out-of-band update.
This is a generic container for OOB updates that doesn't
depend on specific element types.
"""
element: Any
swap_type: str = 'innerHTML'
target_id: Optional[str]
def prepare(
self
) -> Any: # Element with OOB swap attributes
"Prepare element with OOB attributes."
class MultiUpdateBuilder:
def __init__(
self,
container_factory: Optional[Callable] = None # Optional factory function for creating containers. Should accept *children as arguments.
)
"""
Builder for creating multiple OOB updates.
This builder is framework-agnostic and works with any elements.
"""
def __init__(
self,
container_factory: Optional[Callable] = None # Optional factory function for creating containers. Should accept *children as arguments.
)
"Initialize the builder."
def add(
self,
element: Any, # Element to add
swap_type: str = "innerHTML", # Type of swap
target_id: Optional[str] = None # Optional target ID
) -> 'MultiUpdateBuilder': # Self for chaining
"Add an update to the builder."
def add_many(
self,
updates: List[Union[OOBUpdate, tuple, Any]] # List of updates (OOBUpdate objects, tuples, or elements)
) -> 'MultiUpdateBuilder': # Self for chaining
"Add multiple updates at once."
def build(
self
) -> Any: # Container with all prepared OOB updates
"Build the final multi-update container."
def clear(
self
) -> 'MultiUpdateBuilder': # Self for chaining
"Clear all pending updates."
decorators (decorators.ipynb)
Route decorators and utilities for FastHTML SSE endpoints
Import
from cjm_fasthtml_sse.core.decorators import (
sse_endpoint,
broadcast_action,
sse_generator_endpoint
)
Functions
def sse_endpoint(
broadcast_manager: Optional[SSEBroadcastManager] = None, # Optional SSEBroadcastManager for automatic broadcasting
config: Optional[SSEStreamConfig] = None, # Optional SSEStreamConfig for stream configuration
message_filter: Optional[Callable[[BroadcastMessage], bool]] = None,
client_metadata_fn: Optional[Callable[..., Dict[str, Any]]] = None
)
"Decorator for creating SSE endpoints with optional broadcast integration. This decorator can work in two modes: 1. With broadcast_manager: Automatically connects to broadcast stream 2. Without broadcast_manager: Wraps custom async generator functions"
def broadcast_action(
manager: SSEBroadcastManager, # SSEBroadcastManager to broadcast through
event_type: Optional[str] = None, # Event type to broadcast (defaults to function name)
extract_data: Optional[Callable[..., Dict[str, Any]]] = None,
broadcast_before: bool = False, # Broadcast before executing function
broadcast_after: bool = True, # Broadcast after executing function
include_result: bool = True # Include function result in broadcast data
)
"Decorator that broadcasts events when actions occur. This decorator automatically broadcasts SSE messages when the decorated function is called, useful for notifying clients of state changes."
def sse_generator_endpoint(
interval: float = 1.0, # Seconds between data checks
event_type: Optional[str] = None, # Optional event type for messages
heartbeat: Optional[float] = 30.0 # Heartbeat interval in seconds
)
"Decorator for creating simple SSE endpoints from data functions. This decorator converts a function that returns data into an SSE streaming endpoint."
fasthtml (fasthtml.ipynb)
High-level FastHTML integration for SSE with HTMX
Import
from cjm_fasthtml_sse.integrations.fasthtml import (
FastHTMLSSEConfig,
FastHTMLSSE,
setup_sse,
create_sse_page_template
)
Functions
def setup_sse(
app: Any, # FastHTML application
prefix: str = "/sse", # URL prefix for SSE routes
**kwargs # Additional configuration options
) -> FastHTMLSSE: # Configured FastHTMLSSE instance
"Quick setup function for SSE in FastHTML apps."
def create_sse_page_template(
sse: FastHTMLSSE, # FastHTMLSSE instance
title: str = "SSE Demo", # Page title
content: Any = None # Page content
)
"Create a basic page template with SSE setup."
Classes
@dataclass
class FastHTMLSSEConfig:
"""
Configuration for FastHTML SSE integration.
Attributes:
app: FastHTML application instance
prefix: URL prefix for SSE endpoints
enable_monitor: Add connection monitoring scripts
enable_cleanup: Add cleanup script for proper SSE closure
enable_htmx_extension: Add HTMX SSE extension script
monitor_debug: Enable debug logging in monitor
broadcast_history: Number of messages to keep in history
default_stream_config: Default configuration for SSE streams
"""
app: Any # FastHTML app
prefix: str = '/sse'
enable_monitor: bool = True
enable_cleanup: bool = True
enable_htmx_extension: bool = True
monitor_debug: bool = False
broadcast_history: int = 50
default_stream_config: Optional[SSEStreamConfig]
class FastHTMLSSE:
def __init__(
self,
config: FastHTMLSSEConfig # Configuration for the SSE integration
)
"""
Main integration class for SSE with FastHTML.
This class provides a high-level API for adding SSE capabilities
to FastHTML applications with minimal configuration.
"""
def __init__(
self,
config: FastHTMLSSEConfig # Configuration for the SSE integration
)
"Initialize FastHTML SSE integration.
Args:
config: Configuration for the integration"
async def broadcast(
self,
event_type: str, # Type of event to broadcast
data: Dict[str, Any], # Data payload to broadcast
target_ids: Optional[List[str]] = None # Optional list of element IDs to update
) -> int: # Number of clients successfully notified
"Broadcast a message to all connected clients.
Args:
event_type: Type of event
data: Data to broadcast
target_ids: Optional list of element IDs to update
Returns:
Number of clients notified"
def create_sse_element(
self,
element_id: str, # ID for the HTML element
endpoint: Optional[str] = None, # SSE endpoint URL
content: Any = None, # Initial content for the element
hidden: bool = False, # Whether to hide the element
**kwargs
)
"Create an SSE-enabled element.
Args:
element_id: ID for the element
endpoint: SSE endpoint (defaults to global)
content: Initial content
hidden: Whether to hide the element
**kwargs: Additional attributes
Returns:
Configured element"
def create_monitor(
self,
sse_element_id: str = "global-sse", # ID for the SSE connection element
status_element_id: str = "connection-status" # ID for the status display element
)
"Create connection monitor elements.
Args:
sse_element_id: ID for SSE connection element
status_element_id: ID for status display element
Returns:
Tuple of (sse_element, status_element, monitor_script)"
def create_oob_update(
self,
updates: List[tuple] # List of (element_id, content, swap_type) tuples
) -> str: # Formatted SSE message with OOB updates
"Create an OOB update message.
Args:
updates: List of (element_id, content, swap_type) tuples
Returns:
Formatted SSE message with OOB updates"
def sse_route(
self,
path: str, # Route path for the SSE endpoint
message_filter: Optional[Callable] = None # Optional filter function for messages
)
"Decorator for creating SSE routes.
Args:
path: Route path
message_filter: Optional filter function
Returns:
Route decorator"
def action_route(
self,
path: str, # Route path for the action endpoint
event_type: Optional[str] = None, # Event type to broadcast after action
method: str = "post" # HTTP method for the route
)
"Decorator for creating action routes that broadcast.
Args:
path: Route path
event_type: Event type to broadcast
method: HTTP method
Returns:
Route decorator"
monitors (monitors.ipynb)
Client-side SSE connection monitoring and management utilities
Import
from cjm_fasthtml_sse.components.monitors import (
ConnectionState,
MonitorConfig,
generate_monitor_script,
generate_simple_monitor,
generate_connection_counter
)
Functions
def generate_monitor_script(
config: MonitorConfig, # Monitor configuration
status_indicators: Optional[Dict[ConnectionState, str]] = None
) -> str: # JavaScript code as a string
"Generate JavaScript for monitoring SSE connections."
def generate_simple_monitor(
sse_element_id: str, # ID of SSE element to monitor
status_element_id: Optional[str] = None, # Optional status display element
debug: bool = False # Enable console logging
) -> str: # JavaScript code for basic monitoring
"Generate a simple SSE connection monitor."
def generate_connection_counter(
) -> str: # JavaScript code that tracks connection count
"Generate JavaScript for counting active SSE connections."
Classes
class ConnectionState(Enum):
"SSE connection states"
@dataclass
class MonitorConfig:
"""
Configuration for SSE connection monitoring.
Attributes:
sse_element_id: ID of the SSE connection element to monitor
status_element_id: ID of element to update with connection status
auto_reconnect: Whether to automatically reconnect on failures
reconnect_delay: Delay in ms before reconnection attempts
max_reconnect_attempts: Maximum number of reconnection attempts (None = infinite)
visibility_reconnect: Reconnect when tab becomes visible
heartbeat_timeout: Timeout in ms to consider connection dead (None = no timeout)
debug: Enable console logging
callbacks: JavaScript callbacks for state changes
"""
sse_element_id: str
status_element_id: Optional[str]
auto_reconnect: bool = True
reconnect_delay: int = 3000
max_reconnect_attempts: Optional[int]
visibility_reconnect: bool = True
heartbeat_timeout: Optional[int]
debug: bool = False
callbacks: Dict[str, str] = field(...)
def to_js_config(
self
) -> str: # JSON string of the JavaScript configuration object
"Convert to JavaScript configuration object"
streaming (streaming.ipynb)
SSE streaming utilities for creating and managing Server-Sent Events streams
Import
from cjm_fasthtml_sse.core.streaming import (
format_sse_message,
format_sse_comment,
SSEStreamConfig,
sse_broadcast_stream,
sse_generator,
OOBUpdate,
format_oob_updates,
create_oob_message
)
Functions
def format_sse_message(
data: Union[str, Dict[str, Any], Any],
event: Optional[str] = None, # Optional event type
id: Optional[str] = None, # Optional event ID
retry: Optional[int] = None # Optional retry interval in milliseconds
) -> str: # Formatted SSE message string
"Format data as a Server-Sent Event message."
def format_sse_comment(
comment: str # Comment text
) -> str: # Formatted SSE comment
"Format a comment for SSE (typically used for heartbeats)."
async def sse_broadcast_stream(
manager: SSEBroadcastManager, # The SSEBroadcastManager to connect to
client_metadata: Optional[Dict[str, Any]] = None,
config: Optional[SSEStreamConfig] = None, # Optional stream configuration
message_filter: Optional[Callable[[BroadcastMessage], bool]] = None
) -> AsyncIterator[str]: # Async iterator yielding formatted SSE messages
"Create an SSE stream that receives messages from a broadcast manager. This is the main function for creating SSE endpoints that receive broadcast messages and format them for SSE delivery."
async def sse_generator(
data_source: Union[AsyncIterator, Callable],
interval: float = 1.0, # Seconds between data checks (if using callable)
event_type: Optional[str] = None, # Optional event type for all messages
heartbeat: Optional[float] = 30.0 # Heartbeat interval in seconds (None to disable)
) -> AsyncIterator[str]: # Async iterator yielding formatted SSE messages
"Create an SSE stream from a data source. This is useful for creating SSE streams from custom data sources without using the broadcast manager."
def format_oob_updates(
updates: List[OOBUpdate] # List of OOB updates to format
) -> str: # Combined HTML string with all OOB updates
"Format multiple OOB updates into a single HTML string."
def create_oob_message(
updates: List[OOBUpdate], # List of OOB updates
event: str = "message" # SSE event type
) -> str: # Formatted SSE message with OOB HTML
"Create an SSE message containing OOB updates for HTMX."
Classes
@dataclass
class SSEStreamConfig:
"Configuration for SSE streams."
heartbeat_interval: Optional[float] = 30.0 # Seconds between heartbeat messages (None to disable)
reconnect_time: Optional[int] = 3000 # Suggested reconnect time for clients in milliseconds
timeout: Optional[float] # Maximum time to wait for messages before closing stream
initial_event: Optional[Dict[str, Any]] # Event to send when stream starts
closing_event: Optional[Dict[str, Any]] # Event to send when stream closes
debug: bool = False # Enable debug logging
@dataclass
class OOBUpdate:
"Represents an out-of-band update for HTMX."
target_id: str # The ID of the element to update
content: str # The HTML content to swap in
swap_type: str = 'innerHTML' # The type of swap (innerHTML, outerHTML, etc.)
def to_html(
self
) -> str: # HTML string with hx-swap-oob attribute for HTMX
"Convert to HTML with hx-swap-oob attribute"
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cjm_fasthtml_sse-0.0.3.tar.gz.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.3.tar.gz
- Upload date:
- Size: 34.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c879523a475d068d4842a99578dd93578881e5bec31cd2e959605c5cf40865ea
|
|
| MD5 |
1f9585829ef560c69958055290d25ace
|
|
| BLAKE2b-256 |
e9b0a2673f00e267ba205af279b89580236b4491e380e0abc1e1218b4442ff72
|
File details
Details for the file cjm_fasthtml_sse-0.0.3-py3-none-any.whl.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.3-py3-none-any.whl
- Upload date:
- Size: 32.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
47a98c99ec0cc76cd3478bef66b8f19db7f3e71e96f6bebbe048da47cb0bfe26
|
|
| MD5 |
280a1b359d871a7243c6d616887069b1
|
|
| BLAKE2b-256 |
f5a664026cca37534b21a4ee5a6ed0631a1a1ea0c15bb40953bb95cdb86f32e3
|