Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.
Project description
cjm-fasthtml-sse
Install
pip install cjm_fasthtml_sse
Project Structure
nbs/
├── core.ipynb # Core SSE broadcast management system for FastHTML applications. Provides connection pooling, message distribution, and lifecycle hooks without UI dependencies.
├── dispatcher.ipynb # Event routing system with namespace support, pattern matching, and middleware pipeline. Enables decoupled event handling with priority-based execution and wildcard routing.
├── helpers.ipynb # Utility functions and decorators for common SSE patterns in FastHTML. Includes the @sse_element
├── htmx.ipynb # HTMX-specific SSE integration helpers for FastHTML. Simplifies adding SSE attributes, creating SSE-enabled elements, and managing HTMX SSE connections.
├── monitoring.ipynb # Connection monitoring and debugging tools for SSE applications. Provides configurable status indicators, automatic reconnection, and visibility change handling.
└── updater.ipynb # Flexible element update system for building out-of-band (OOB) swap elements. Register handlers by event type and compose updates without coupling to specific UI components.
Total: 6 notebooks
Module Dependencies
graph LR
core[core<br/>Core SSEBroadcastManager]
dispatcher[dispatcher<br/>SSEEventDispatcher]
helpers[helpers<br/>UI helpers & utilities]
htmx[htmx<br/>HTMXSSEConnector]
monitoring[monitoring<br/>Connection monitoring & config]
updater[updater<br/>SSEElementUpdater]
monitoring --> htmx
1 cross-module dependencies detected
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Core SSEBroadcastManager (core.ipynb)
Core SSE broadcast management system for FastHTML applications. Provides connection pooling, message distribution, and lifecycle hooks without UI dependencies.
Import
from cjm_fasthtml_sse.core import (
SSEBroadcastManager
)
Classes
class SSEBroadcastManager:
def __init__(self,
max_queue_size: int = 100, # Maximum number of messages per connection queue
history_size: int = 50, # Number of broadcast messages to keep in history
default_timeout: float = 0.1 # Default timeout in seconds for queue operations
)
"""
Manages SSE connections and broadcasting without UI dependencies.
This class provides a reusable abstraction for managing Server-Sent Events
connections and broadcasting messages to multiple clients.
"""
def __init__(self,
max_queue_size: int = 100, # Maximum number of messages per connection queue
history_size: int = 50, # Number of broadcast messages to keep in history
default_timeout: float = 0.1 # Default timeout in seconds for queue operations
)
"Initialize the SSE Broadcast Manager."
async def register_connection(
self,
queue: Optional[asyncio.Queue] = None # Optional pre-existing queue, creates new one if not provided
) -> asyncio.Queue: # The queue associated with this connection
"Register a new SSE connection."
async def unregister_connection(
self,
queue: asyncio.Queue # The queue to unregister
)
"Unregister an SSE connection."
async def broadcast(self,
event_type: str, # Type of event being broadcast
data: Dict[str, Any], # Data to broadcast
timeout: Optional[float] = None # Optional timeout override for this broadcast
) -> int: # Number of successfully notified connections
"Broadcast a message to all connected clients."
def on_connect(
self,
callback: Callable # Function to call when a new connection is registered
)
"Register a callback for new connections."
def on_disconnect(
self,
callback: Callable # Function to call when a connection is unregistered
)
"Register a callback for disconnections."
def on_broadcast(
self,
callback: Callable # Function to call before broadcasting (can modify messages)
)
"Register a callback for broadcasts (can modify messages)."
def connection_count(
self
) -> int: # Number of active connections
"Get the current number of active connections."
def get_history(
self,
limit: Optional[int] = None # Optional limit on number of messages to return
) -> list[Dict[str, Any]]: # List of historical broadcast messages
"Get broadcast history."
SSEEventDispatcher (dispatcher.ipynb)
Event routing system with namespace support, pattern matching, and middleware pipeline. Enables decoupled event handling with priority-based execution and wildcard routing.
Import
from cjm_fasthtml_sse.dispatcher import (
SSEEvent,
SSEEventDispatcher
)
Classes
@dataclass
class SSEEvent:
"Represents an SSE event with metadata."
type: str
data: Dict[str, Any]
namespace: Optional[str]
priority: int = 0
timestamp: Optional[str]
def full_type(self):
"""Get the full event type including namespace."""
if self.namespace
"Get the full event type including namespace."
class SSEEventDispatcher:
def __init__(self):
"""Initialize the SSE Event Dispatcher."""
self._handlers: Dict[str, List[tuple[int, Callable]]] = {}
"""
Decoupled event routing system with namespace support,
middleware, filtering, and priority-based handling.
"""
def __init__(self):
"""Initialize the SSE Event Dispatcher."""
self._handlers: Dict[str, List[tuple[int, Callable]]] = {}
"Initialize the SSE Event Dispatcher."
def register_namespace(
self,
namespace: str # Namespace name to register for event organization
)
"Register a namespace for event organization."
def on(
self,
event_pattern: str, # Event pattern (supports wildcards: *, **)
priority: int = 0 # Handler priority (higher runs first)
)
"Decorator to register an event handler with pattern matching."
def add_handler(
self,
event_pattern: str, # Event pattern (e.g., "job:*", "**:completed")
handler: Callable, # Handler function
priority: int = 0 # Handler priority
)
"Add an event handler with pattern matching support."
def add_middleware(
self,
middleware: Callable # Function that takes (event, next) and calls next(event)
)
"Add middleware that processes events before handlers."
def add_filter(
self,
filter_func: Callable[[SSEEvent], bool] # Function that returns True to process event
)
"Add a filter to control which events are processed."
def add_transformer(
self,
transformer: Callable[[SSEEvent], SSEEvent] # Function that transforms an event
)
"Add a transformer to modify events before processing."
async def dispatch(
self,
event: Union[SSEEvent, Dict[str, Any]] # Event to dispatch (SSEEvent or dict)
) -> List[Any]: # List of handler results
"Dispatch an event through the processing pipeline."
def clear_handlers(
self,
pattern: Optional[str] = None # Specific pattern to clear, or None for all
)
"Clear handlers for a specific pattern or all handlers."
UI helpers & utilities (helpers.ipynb)
Utility functions and decorators for common SSE patterns in FastHTML. Includes the @sse_element
Import
from cjm_fasthtml_sse.helpers import (
oob_swap,
oob_element,
sse_element,
oob_update,
cleanup_sse_on_unload,
get_htmx_idx,
insert_htmx_sse_ext
)
Functions
def oob_swap(
"Add OOB swap attributes to an element."
def oob_element(
element_id: str, # ID of the target element
content: Any, # Content to swap
swap_type: str = "innerHTML" # Type of swap
)
"Create a wrapper element for OOB swap."
def sse_element(endpoint: str,
events: Optional[Union[str, List[str]]] = None, # Event name(s) to listen for from SSE stream
auto_close: bool = True, # Whether to auto-close on completion
swap_type: str = "message" # How to swap content
)
"Decorator to add SSE capabilities to any element."
def oob_update(
element_id: str, # Target element ID
content: Any, # Content to swap
swap_type: str = "innerHTML" # Type of swap (innerHTML, outerHTML, etc.)
)
"Create an out-of-band update element."
def cleanup_sse_on_unload(
) -> FT: # FastHTML element (Script) for cleanup
"Add script to cleanup SSE connections on page unload."
def get_htmx_idx(
hdrs: List # List of header elements to search
) -> int: # Index of HTMX script or -1 if not found
"Find the index of HTMX script in headers list."
def insert_htmx_sse_ext(
hdrs: List # List of header elements to modify
)
"Add HTMX SSE extension after HTMX script"
HTMXSSEConnector (htmx.ipynb)
HTMX-specific SSE integration helpers for FastHTML. Simplifies adding SSE attributes, creating SSE-enabled elements, and managing HTMX SSE connections.
Import
from cjm_fasthtml_sse.htmx import (
HTMXSSEConnector
)
Classes
class HTMXSSEConnector:
"""
Provides helper functions for setting up HTMX SSE connections
without hardcoding specific implementations.
"""
def add_sse_attrs(element,
endpoint: str, # SSE endpoint URL to connect to
events: Optional[Union[str, List[str]]] = None,
swap_type: str = "message", # How to swap content (message, innerHTML, outerHTML, etc.)
auto_reconnect: bool = True)
"Add SSE connection attributes to an element.
Args:
element: The element to add SSE attributes to
endpoint: SSE endpoint URL
events: Optional event name(s) to listen for
swap_type: How to swap content (message, innerHTML, outerHTML, etc.)
auto_reconnect: Whether to auto-reconnect on disconnect
Returns:
The element with SSE attributes added"
def create_sse_element(element_type=Div,
endpoint: str = None, # SSE endpoint URL to connect to
element_id: str = None, # Optional ID for the element
events: Optional[Union[str, List[str]]] = None,
swap_type: str = "message", # How to swap content when messages are received
hidden: bool = False, # Whether to hide the element initially
**kwargs)
"Create an element with SSE connection configured.
Args:
element_type: Type of element to create (Div, Span, etc.)
endpoint: SSE endpoint URL
element_id: Optional element ID
events: Optional event name(s) to listen for
swap_type: How to swap content
hidden: Whether to hide the element
**kwargs: Additional attributes for the element
Returns:
Element configured for SSE connection"
def sse_progress_element(job_id: str,
endpoint_template: str = "/stream_job_progress?job_id={job_id}", # URL template for the SSE endpoint
element_id_template: str = "progress-span-{job_id}", # Template for generating element ID
initial_content=None)
"Create an SSE-enabled progress element.
Args:
job_id: Job identifier
endpoint_template: Template for SSE endpoint URL
element_id_template: Template for element ID
initial_content: Initial content to display
Returns:
SSE-configured element for progress updates"
def sse_status_element(job_id: str,
endpoint_template: str = "/stream_job_status?job_id={job_id}", # URL template for the SSE endpoint
element_id_template: str = "status-span-{job_id}", # Template for generating element ID
initial_content=None)
"Create an SSE-enabled status element.
Args:
job_id: Job identifier
endpoint_template: Template for SSE endpoint URL
element_id_template: Template for element ID
initial_content: Initial content to display
Returns:
SSE-configured element for status updates"
def create_sse_monitor_script(
config: Dict[str, Any] # Configuration dictionary for monitoring setup
)
"Create a monitoring script for SSE connections.
Args:
config: Configuration dictionary with keys:
- sse_element_id: ID of SSE element to monitor
- status_element_id: ID of status display element
- auto_reconnect: Whether to auto-reconnect
- debug: Whether to enable debug logging
- status_indicators: Dict of status HTML strings
Returns:
Script element with monitoring code"
Connection monitoring & config (monitoring.ipynb)
Connection monitoring and debugging tools for SSE applications. Provides configurable status indicators, automatic reconnection, and visibility change handling.
Import
from cjm_fasthtml_sse.monitoring import (
SSEMonitorConfig,
create_sse_monitor
)
Functions
def create_sse_monitor(
htmx_sse: HTMXSSEConnector,
config: SSEMonitorConfig # SSEMonitorConfig instance
)
"Create a connection monitor with the specified configuration."
Classes
@dataclass
class SSEMonitorConfig:
"Configuration for SSE connection monitoring."
sse_element_id: str = 'sse-connection'
status_element_id: str = 'connection-status'
auto_reconnect: bool = True
reconnect_delay: int = 3000
debug: bool = False
heartbeat_timeout: int = 30000
status_indicators: Optional[Dict[str, str]]
SSEElementUpdater (updater.ipynb)
Flexible element update system for building out-of-band (OOB) swap elements. Register handlers by event type and compose updates without coupling to specific UI components.
Import
from cjm_fasthtml_sse.updater import (
SSEElementUpdater
)
Classes
class SSEElementUpdater:
def __init__(self):
"""Initialize the SSE Element Updater."""
self._handlers: Dict[str, List[Callable]] = {}
"""
Builds OOB swap elements without hardcoding UI components.
This class provides a flexible system for registering and executing
element update handlers based on event types.
"""
def __init__(self):
"""Initialize the SSE Element Updater."""
self._handlers: Dict[str, List[Callable]] = {}
"Initialize the SSE Element Updater."
def register(
self,
event_type: str, # The event type to handle
priority: int = 0 # Handler priority (higher numbers run first)
): # Decorator function
"Decorator to register an update handler for a specific event type."
def register_handler(
self,
event_type: str, # The event type to handle
handler: Callable, # The handler function
priority: int = 0 # Handler priority (higher numbers run first)
)
"Register an update handler programmatically."
def set_default_handler(
self,
handler: Callable # The default handler function
)
"Set a default handler for unregistered event types."
def add_preprocessor(
self,
processor: Callable # Function that processes (event_type, data) and returns modified data
)
"Add a preprocessor that runs before handlers."
def add_postprocessor(
self,
processor: Callable # Function that processes elements list and returns modified elements
)
"Add a postprocessor that runs after handlers."
def create_elements(
self,
event_type: str, # The type of event
data: Dict[str, Any] # Event data
) -> List[Any]: # List of elements to be sent via SSE
"Create elements for a given event type and data."
def clear_handlers(
self,
event_type: Optional[str] = None # Optional specific event type to clear
)
"Clear handlers for a specific event type or all handlers."
def get_registered_events(
self
) -> List[str]: # List of event types with registered handlers
"Get list of registered event types."
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cjm_fasthtml_sse-0.0.19.tar.gz.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.19.tar.gz
- Upload date:
- Size: 24.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f5e5f1bef756b2a3a03e0cd49bffddbbf544ba7b3cd91a0cc5b27ba45d7deed5
|
|
| MD5 |
3bd76f0d3300cb9e5f18c3e7e992675d
|
|
| BLAKE2b-256 |
3d653ecfaa296de8ba8873ea67792ec1dad5937352bb8befd68d9e68b241702e
|
File details
Details for the file cjm_fasthtml_sse-0.0.19-py3-none-any.whl.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.19-py3-none-any.whl
- Upload date:
- Size: 22.6 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
fa34462203e355c3892701c89bda2fa41ba3777f8dcebad2e69b94f26060f10e
|
|
| MD5 |
a864858a729c40171d462ad9ef3b0f11
|
|
| BLAKE2b-256 |
49c84abe2c90ec58e3c94195b08684b3817ec157bdd1f2f879b31f0b77df7e25
|