Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.

cjm-fasthtml-sse

Install

pip install cjm_fasthtml_sse

Project Structure

nbs/
├── core.ipynb       # Core SSE broadcast management system for FastHTML applications. Provides connection pooling, message distribution, and lifecycle hooks without UI dependencies.
├── dispatcher.ipynb # Event routing system with namespace support, pattern matching, and middleware pipeline. Enables decoupled event handling with priority-based execution and wildcard routing.
├── helpers.ipynb    # Utility functions and decorators for common SSE patterns in FastHTML. Includes the @sse_element decorator.
├── htmx.ipynb       # HTMX-specific SSE integration helpers for FastHTML. Simplifies adding SSE attributes, creating SSE-enabled elements, and managing HTMX SSE connections.
├── monitoring.ipynb # Connection monitoring and debugging tools for SSE applications. Provides configurable status indicators, automatic reconnection, and visibility change handling.
├── shutdown.ipynb   # Handles graceful shutdown of SSE connections when the server exits.
└── updater.ipynb    # Flexible element update system for building out-of-band (OOB) swap elements. Register handlers by event type and compose updates without coupling to specific UI components.

Total: 7 notebooks

Module Dependencies

graph LR
    core[core<br/>Core SSEBroadcastManager]
    dispatcher[dispatcher<br/>SSEEventDispatcher]
    helpers[helpers<br/>UI helpers & utilities]
    htmx[htmx<br/>HTMXSSEConnector]
    monitoring[monitoring<br/>Connection monitoring & config]
    shutdown[shutdown<br/>SSEShutdownHandler]
    updater[updater<br/>SSEElementUpdater]

    helpers --> htmx
    monitoring --> htmx
    shutdown --> core

3 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Core SSEBroadcastManager (core.ipynb)

Core SSE broadcast management system for FastHTML applications. Provides connection pooling, message distribution, and lifecycle hooks without UI dependencies.

Import

from cjm_fasthtml_sse.core import (
    SSEBroadcastManager
)

Classes

class SSEBroadcastManager:
    """
    Manages SSE connections and broadcasting without UI dependencies.
    
    This class provides a reusable abstraction for managing Server-Sent Events
    connections and broadcasting messages to multiple clients.
    """
    
    def __init__(self,
                 max_queue_size: int = 100,    # Maximum number of messages per connection queue
                 history_size: int = 50,    # Number of broadcast messages to keep in history
                 default_timeout: float = 0.1   # Default timeout in seconds for queue operations
                )
        "Initialize the SSE Broadcast Manager."
    
    async def register_connection(
            self,
            queue: Optional[asyncio.Queue] = None  # Optional pre-existing queue, creates new one if not provided
        ) -> asyncio.Queue:  # The queue associated with this connection
        "Register a new SSE connection."
    
    async def unregister_connection(
            self,
            queue: asyncio.Queue  # The queue to unregister
        )
        "Unregister an SSE connection."
    
    async def broadcast(self, 
                           event_type: str,   # Type of event being broadcast
                           data: Dict[str, Any], # Data to broadcast
                           timeout: Optional[float] = None # Optional timeout override for this broadcast
                           ) -> int: # Number of successfully notified connections
        "Broadcast a message to all connected clients."
    
    def on_connect(
            self,
            callback: Callable  # Function to call when a new connection is registered
        )
        "Register a callback for new connections."
    
    def on_disconnect(
            self,
            callback: Callable  # Function to call when a connection is unregistered
        )
        "Register a callback for disconnections."
    
    def on_broadcast(
            self,
            callback: Callable  # Function to call before broadcasting (can modify messages)
        )
        "Register a callback for broadcasts (can modify messages)."
    
    def connection_count(
            self
        ) -> int:  # Number of active connections
        "Get the current number of active connections."
    
    def get_history(
            self,
            limit: Optional[int] = None  # Optional limit on number of messages to return
        ) -> list[Dict[str, Any]]:  # List of historical broadcast messages
        "Get broadcast history."

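The queue-per-connection model described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `MiniBroadcaster` and its method bodies are hypothetical, and only the parameter names (`max_queue_size`, `history_size`, `timeout`) and the "number of successfully notified connections" return value are taken from the signatures listed above.

```python
import asyncio
from collections import deque
from typing import Any, Dict, Set


class MiniBroadcaster:
    """Hypothetical stand-in; not the library's SSEBroadcastManager."""

    def __init__(self, max_queue_size: int = 100, history_size: int = 50) -> None:
        self.max_queue_size = max_queue_size
        self.connections: Set[asyncio.Queue] = set()
        # A bounded deque drops the oldest message once history_size is reached.
        self.history: deque = deque(maxlen=history_size)

    def register(self) -> asyncio.Queue:
        # Each connected client gets its own bounded queue.
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue_size)
        self.connections.add(queue)
        return queue

    def unregister(self, queue: asyncio.Queue) -> None:
        self.connections.discard(queue)

    async def broadcast(self, event_type: str, data: Dict[str, Any],
                        timeout: float = 0.1) -> int:
        message = {"type": event_type, "data": data}
        self.history.append(message)
        delivered = 0
        for queue in list(self.connections):
            try:
                # A full queue blocks put(); the timeout keeps one slow
                # client from stalling the whole broadcast.
                await asyncio.wait_for(queue.put(message), timeout=timeout)
                delivered += 1
            except asyncio.TimeoutError:
                pass  # Assumption: slow consumers are simply skipped.
        return delivered


async def demo() -> Dict[str, int]:
    hub = MiniBroadcaster(max_queue_size=1)
    fast, slow = hub.register(), hub.register()
    first = await hub.broadcast("job:progress", {"pct": 10}, timeout=0.01)
    await fast.get()  # Only the fast client drains its queue.
    second = await hub.broadcast("job:progress", {"pct": 20}, timeout=0.01)
    hub.unregister(slow)
    return {"first": first, "second": second,
            "remaining": len(hub.connections), "history": len(hub.history)}


result = asyncio.run(demo())
```

In this sketch the second broadcast reaches only one client because the slow client's queue is still full. Whether the real manager skips, drops, or disconnects slow clients is not stated in this listing.
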
SSEEventDispatcher (dispatcher.ipynb)

Event routing system with namespace support, pattern matching, and middleware pipeline. Enables decoupled event handling with priority-based execution and wildcard routing.

Import

from cjm_fasthtml_sse.dispatcher import (
    SSEEvent,
    SSEEventDispatcher
)

Classes

@dataclass
class SSEEvent:
    "Represents an SSE event with metadata."
    
    type: str
    data: Dict[str, Any]
    namespace: Optional[str]
    priority: int = 0
    timestamp: Optional[str]
    
    def full_type(self)
        "Get the full event type including namespace."

class SSEEventDispatcher:
    """
    Decoupled event routing system with namespace support,
    middleware, filtering, and priority-based handling.
    """
    
    def __init__(self):
        "Initialize the SSE Event Dispatcher."
        self._handlers: Dict[str, List[tuple[int, Callable]]] = {}
    
    def register_namespace(
            self,
            namespace: str  # Namespace name to register for event organization
        )
        "Register a namespace for event organization."
    
    def on(
            self,
            event_pattern: str,  # Event pattern (supports wildcards: *, **)
            priority: int = 0  # Handler priority (higher runs first)
        )
        "Decorator to register an event handler with pattern matching."
    
    def add_handler(
            self,
            event_pattern: str,  # Event pattern (e.g., "job:*", "**:completed")
            handler: Callable,  # Handler function
            priority: int = 0  # Handler priority
        )
        "Add an event handler with pattern matching support."
    
    def add_middleware(
            self,
            middleware: Callable  # Function that takes (event, next) and calls next(event)
        )
        "Add middleware that processes events before handlers."
    
    def add_filter(
            self,
            filter_func: Callable[[SSEEvent], bool]  # Function that returns True to process event
        )
        "Add a filter to control which events are processed."
    
    def add_transformer(
            self,
            transformer: Callable[[SSEEvent], SSEEvent]  # Function that transforms an event
        )
        "Add a transformer to modify events before processing."
    
    async def dispatch(
            self,
            event: Union[SSEEvent, Dict[str, Any]]  # Event to dispatch (SSEEvent or dict)
        ) -> List[Any]:  # List of handler results
        "Dispatch an event through the processing pipeline."
    
    def clear_handlers(
            self,
            pattern: Optional[str] = None  # Specific pattern to clear, or None for all
        )
        "Clear handlers for a specific pattern or all handlers."

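Pattern-based routing with priorities can be sketched as follows. This is a hypothetical stand-in (`MiniDispatcher`), not the library's dispatcher: it assumes shell-style matching via `fnmatch`, uses only the single `*` wildcard, and assumes handlers take `(event_type, data)`. The semantics of `**`, namespaces, middleware, and filters in the real `SSEEventDispatcher` are not covered here. The "higher priority runs first" rule and the `pattern -> [(priority, handler)]` storage shape come from the listing above.

```python
import asyncio
from fnmatch import fnmatchcase
from typing import Any, Callable, Dict, List, Tuple


class MiniDispatcher:
    """Hypothetical stand-in; not the library's SSEEventDispatcher."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Tuple[int, Callable]]] = {}

    def add_handler(self, event_pattern: str, handler: Callable,
                    priority: int = 0) -> None:
        self._handlers.setdefault(event_pattern, []).append((priority, handler))

    async def dispatch(self, event_type: str, data: Dict[str, Any]) -> List[Any]:
        # Collect every handler whose pattern matches the event type.
        matched = [
            (priority, handler)
            for pattern, entries in self._handlers.items()
            if fnmatchcase(event_type, pattern)
            for priority, handler in entries
        ]
        # Higher priority first; the sort is stable for equal priorities.
        matched.sort(key=lambda entry: entry[0], reverse=True)
        results: List[Any] = []
        for _, handler in matched:
            outcome = handler(event_type, data)
            if asyncio.iscoroutine(outcome):
                outcome = await outcome  # Support sync and async handlers.
            results.append(outcome)
        return results


async def audit(event_type: str, data: Dict[str, Any]) -> str:
    return f"high saw {event_type}"


dispatcher = MiniDispatcher()
dispatcher.add_handler("job:*", lambda t, d: f"low saw {t}", priority=1)
dispatcher.add_handler("*:completed", audit, priority=10)
dispatcher.add_handler("user:*", lambda t, d: "never called")

results = asyncio.run(dispatcher.dispatch("job:completed", {"id": 7}))
```

Here `results` lists the `*:completed` handler's output before the `job:*` handler's, and the `user:*` handler does not run.
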
UI helpers & utilities (helpers.ipynb)

Utility functions and decorators for common SSE patterns in FastHTML. Includes the @sse_element decorator.

Import

from cjm_fasthtml_sse.helpers import (
    oob_swap,
    oob_element,
    sse_element,
    oob_update,
    cleanup_sse_on_unload,
    get_htmx_idx,
    insert_htmx_sse_ext
)

Functions

def oob_swap(
    "Add OOB swap attributes to an element."

def oob_element(
    element_id: str,  # ID of the target element
    content: Any,    # Content to swap
    swap_type: str = "innerHTML"  # Type of swap
)
    "Create a wrapper element for OOB swap."

def sse_element(
    htmx_sse: HTMXSSEConnector,
    endpoint: str, 
    events: Optional[Union[str, List[str]]] = None, # Event name(s) to listen for from SSE stream
    auto_close: bool = True,  # Whether to auto-close on completion
    swap_type: str = "message" # How to swap content
)
    "Decorator to add SSE capabilities to any element."

def oob_update(
    element_id: str,  # Target element ID
    content: Any,  # Content to swap
    swap_type: str = "innerHTML"  # Type of swap (innerHTML, outerHTML, etc.)
)
    "Create an out-of-band update element."

def cleanup_sse_on_unload(
) -> FT:  # FastHTML element (Script) for cleanup
    "Add script to cleanup SSE connections on page unload."

def get_htmx_idx(
    hdrs: List  # List of header elements to search
) -> int:  # Index of HTMX script or -1 if not found
    "Find the index of HTMX script in headers list."

def insert_htmx_sse_ext(
    hdrs: List  # List of header elements to modify
)
    "Add HTMX SSE extension after HTMX script"

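To make the OOB helpers more concrete, the sketch below shows the kind of markup an out-of-band update amounts to, and how such a fragment is framed on an SSE stream. It is an illustration using plain strings rather than FastHTML elements: `oob_fragment` and `sse_frame` are hypothetical names, and the exact elements the library's `oob_element` and `oob_update` produce are not shown in this listing. The `hx-swap-oob` attribute and the `event:`/`data:` framing are standard HTMX and SSE conventions.

```python
from html import escape


def oob_fragment(element_id: str, content: str,
                 swap_type: str = "innerHTML") -> str:
    """Build a wrapper <div> that HTMX swaps into the element with this ID."""
    return (
        f'<div id="{escape(element_id)}" hx-swap-oob="{escape(swap_type)}">'
        f"{escape(content)}</div>"
    )


def sse_frame(event: str, payload: str) -> str:
    """Frame a payload as one SSE message.

    Each payload line becomes its own 'data:' line, and a blank line
    terminates the message.
    """
    data_lines = "".join(f"data: {line}\n" for line in payload.split("\n"))
    return f"event: {event}\n{data_lines}\n"


fragment = oob_fragment("job-42-status", "Done & saved")
frame = sse_frame("message", fragment)
```

With these inputs, `fragment` is `<div id="job-42-status" hx-swap-oob="innerHTML">Done &amp; saved</div>`, and `frame` wraps it in a single `event: message` block.
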
HTMXSSEConnector (htmx.ipynb)

HTMX-specific SSE integration helpers for FastHTML. Simplifies adding SSE attributes, creating SSE-enabled elements, and managing HTMX SSE connections.

Import

from cjm_fasthtml_sse.htmx import (
    HTMXSSEConnector
)

Classes

class HTMXSSEConnector:
    """
    Provides helper functions for setting up HTMX SSE connections
    without hardcoding specific implementations.
    """
    
    def add_sse_attrs(element,
                      endpoint: str,  # SSE endpoint URL to connect to
                      events: Optional[Union[str, List[str]]] = None,
                      swap_type: str = "message",  # How to swap content (message, innerHTML, outerHTML, etc.)
                      auto_reconnect: bool = True)
        """
        Add SSE connection attributes to an element.

        Args:
            element: The element to add SSE attributes to
            endpoint: SSE endpoint URL
            events: Optional event name(s) to listen for
            swap_type: How to swap content (message, innerHTML, outerHTML, etc.)
            auto_reconnect: Whether to auto-reconnect on disconnect

        Returns:
            The element with SSE attributes added
        """
    
    def create_sse_element(element_type=Div,
                           endpoint: str = None,  # SSE endpoint URL to connect to
                           element_id: str = None,  # Optional ID for the element
                           events: Optional[Union[str, List[str]]] = None,
                           swap_type: str = "message",  # How to swap content when messages are received
                           hidden: bool = False,  # Whether to hide the element initially
                           **kwargs)
        """
        Create an element with SSE connection configured.

        Args:
            element_type: Type of element to create (Div, Span, etc.)
            endpoint: SSE endpoint URL
            element_id: Optional element ID
            events: Optional event name(s) to listen for
            swap_type: How to swap content
            hidden: Whether to hide the element
            **kwargs: Additional attributes for the element

        Returns:
            Element configured for SSE connection
        """
    
    def sse_progress_element(job_id: str,
                             endpoint_template: str = "/stream_job_progress?job_id={job_id}",  # URL template for the SSE endpoint
                             element_id_template: str = "progress-span-{job_id}",  # Template for generating element ID
                             initial_content=None)
        """
        Create an SSE-enabled progress element.

        Args:
            job_id: Job identifier
            endpoint_template: Template for SSE endpoint URL
            element_id_template: Template for element ID
            initial_content: Initial content to display

        Returns:
            SSE-configured element for progress updates
        """
    
    def sse_status_element(job_id: str,
                           endpoint_template: str = "/stream_job_status?job_id={job_id}",  # URL template for the SSE endpoint
                           element_id_template: str = "status-span-{job_id}",  # Template for generating element ID
                           initial_content=None)
        """
        Create an SSE-enabled status element.

        Args:
            job_id: Job identifier
            endpoint_template: Template for SSE endpoint URL
            element_id_template: Template for element ID
            initial_content: Initial content to display

        Returns:
            SSE-configured element for status updates
        """
    
    def create_sse_monitor_script(
            config: Dict[str, Any]  # Configuration dictionary for monitoring setup
        )
        """
        Create a monitoring script for SSE connections.

        Args:
            config: Configuration dictionary with keys:
                - sse_element_id: ID of SSE element to monitor
                - status_element_id: ID of status display element
                - auto_reconnect: Whether to auto-reconnect
                - debug: Whether to enable debug logging
                - status_indicators: Dict of status HTML strings

        Returns:
            Script element with monitoring code
        """

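As a rough illustration of what "adding SSE attributes" involves, the sketch below builds the attribute set that the HTMX SSE extension reads (`hx-ext`, `sse-connect`, `sse-swap`) and fills in the default templates from `sse_progress_element`. The helper name `sse_attrs` and the way `events` and `swap_type` map onto `sse-swap` are assumptions made for this example; the attributes the library actually emits are not shown in this listing.

```python
from typing import Dict, List, Optional, Union


def sse_attrs(endpoint: str,
              events: Optional[Union[str, List[str]]] = None,
              swap_type: str = "message") -> Dict[str, str]:
    """Return HTMX SSE extension attributes as a plain dict (hypothetical helper)."""
    if events is None:
        swap = swap_type          # Assumption: fall back to the swap type.
    elif isinstance(events, str):
        swap = events
    else:
        swap = ",".join(events)   # The extension accepts a comma-separated list.
    return {"hx-ext": "sse", "sse-connect": endpoint, "sse-swap": swap}


# Fill the default templates listed for sse_progress_element.
job_id = "abc123"
endpoint = "/stream_job_progress?job_id={job_id}".format(job_id=job_id)
element_id = "progress-span-{job_id}".format(job_id=job_id)

attrs = {"id": element_id, **sse_attrs(endpoint)}
multi = sse_attrs("/events", events=["progress", "status"])
```

Passing a custom `endpoint_template` or `element_id_template` with a `{job_id}` placeholder would change the resulting URL and ID in the same way.
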
Connection monitoring & config (monitoring.ipynb)

Connection monitoring and debugging tools for SSE applications. Provides configurable status indicators, automatic reconnection, and visibility change handling.

Import

from cjm_fasthtml_sse.monitoring import (
    SSEMonitorConfig,
    create_sse_monitor
)

Functions

def create_sse_monitor(
    htmx_sse: HTMXSSEConnector,
    config: SSEMonitorConfig  # SSEMonitorConfig instance
)
    "Create a connection monitor with the specified configuration."

Classes

@dataclass
class SSEMonitorConfig:
    "Configuration for SSE connection monitoring."
    
    sse_element_id: str = 'sse-connection'
    status_element_id: str = 'connection-status'
    auto_reconnect: bool = True
    reconnect_delay: int = 3000
    debug: bool = False
    heartbeat_timeout: int = 30000
    status_indicators: Optional[Dict[str, str]]

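The monitor configuration is a plain dataclass, and `create_sse_monitor_script` accepts a dictionary with matching keys. One plausible way to bridge the two is `dataclasses.asdict`, sketched below with a stand-in class. `MonitorConfigSketch` is hypothetical, and the `None` default for `status_indicators` is an assumption, since the listing above does not show that field's default.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Optional


@dataclass
class MonitorConfigSketch:
    """Hypothetical mirror of the fields listed for SSEMonitorConfig."""

    sse_element_id: str = "sse-connection"
    status_element_id: str = "connection-status"
    auto_reconnect: bool = True
    reconnect_delay: int = 3000
    debug: bool = False
    heartbeat_timeout: int = 30000
    status_indicators: Optional[Dict[str, str]] = None  # Assumed default.


config = MonitorConfigSketch(
    debug=True,
    status_indicators={"connected": "<span>Live</span>"},
)

# Convert to a plain dict, then to JSON that a client-side script could read.
config_dict = asdict(config)
config_json = json.dumps(config_dict)
```

Only the overridden fields differ from the defaults; everything else carries through unchanged.
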
SSEShutdownHandler (shutdown.ipynb)

Handles graceful shutdown of SSE connections when the server exits.

Import

from cjm_fasthtml_sse.shutdown import (
    SSEShutdownHandler
)

Classes

class SSEShutdownHandler:
    """
    Handles graceful shutdown of SSE connections when the server exits
    by ensuring all SSE clients receive a shutdown notification before
    the server terminates, allowing them to cleanly close connections
    without attempting reconnection.
    """
    
    def __init__(self, sse_manager, shutdown_delay: float = 1.0):
        self.sse_manager = sse_manager
        self.shutdown_delay = shutdown_delay
        self.should_exit = False
        self.active_connections: Set = set()
    
    def install(self):
        "Install the shutdown handler into Uvicorn's Server class."
        self._original_handler = Server.handle_exit
        Server.handle_exit = self._handle_exit
    
    def uninstall(self)
        "Restore the original Uvicorn handler."
    
    def track_connection(self, task):
        "Track an active connection task."
        self.active_connections.add(task)
    
    def untrack_connection(self, task)
        "Remove a connection task from tracking."

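The install/uninstall pair follows a save-patch-restore pattern on a class attribute. The sketch below demonstrates that pattern on a stand-in class so it runs without Uvicorn. `FakeServer` and `PatchSketch` are hypothetical, and the wrapper here delegates to the original method, which may differ from what the library's `_handle_exit` does.

```python
from typing import Callable, List, Optional


class FakeServer:
    """Stand-in for a server class with an exit hook (not uvicorn.Server)."""

    def handle_exit(self, sig: int, frame: object) -> str:
        return "original exit"


class PatchSketch:
    """Hypothetical illustration of the save, patch, restore pattern."""

    def __init__(self) -> None:
        self.calls: List[int] = []
        self._original_handler: Optional[Callable] = None

    def install(self) -> None:
        # Keep a reference to the original so it can be restored later.
        self._original_handler = FakeServer.handle_exit
        original = self._original_handler

        def wrapped(server: FakeServer, sig: int, frame: object) -> str:
            self.calls.append(sig)  # Run custom shutdown logic first.
            return original(server, sig, frame)

        FakeServer.handle_exit = wrapped

    def uninstall(self) -> None:
        if self._original_handler is not None:
            FakeServer.handle_exit = self._original_handler
            self._original_handler = None


pristine = FakeServer.handle_exit
patch = PatchSketch()
patch.install()
outcome = FakeServer().handle_exit(2, None)
patch.uninstall()
restored = FakeServer.handle_exit is pristine
```

Because the patch targets the class rather than an instance, it affects every server object created while it is installed, which is why restoring the original in `uninstall` matters.
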
SSEElementUpdater (updater.ipynb)

Flexible element update system for building out-of-band (OOB) swap elements. Register handlers by event type and compose updates without coupling to specific UI components.

Import

from cjm_fasthtml_sse.updater import (
    SSEElementUpdater
)

Classes

class SSEElementUpdater:
    """
    Builds OOB swap elements without hardcoding UI components.
    This class provides a flexible system for registering and executing
    element update handlers based on event types.
    """
    
    def __init__(self):
        "Initialize the SSE Element Updater."
        self._handlers: Dict[str, List[Callable]] = {}
    
    def register(
            self,
            event_type: str,  # The event type to handle
            priority: int = 0  # Handler priority (higher numbers run first)
        ): # Decorator function
        "Decorator to register an update handler for a specific event type."
    
    def register_handler(
            self,
            event_type: str,  # The event type to handle
            handler: Callable,  # The handler function
            priority: int = 0  # Handler priority (higher numbers run first)
        )
        "Register an update handler programmatically."
    
    def set_default_handler(
            self,
            handler: Callable  # The default handler function
        )
        "Set a default handler for unregistered event types."
    
    def add_preprocessor(
            self,
            processor: Callable  # Function that processes (event_type, data) and returns modified data
        )
        "Add a preprocessor that runs before handlers."
    
    def add_postprocessor(
            self,
            processor: Callable  # Function that processes elements list and returns modified elements
        )
        "Add a postprocessor that runs after handlers."
    
    def create_elements(
            self,
            event_type: str,  # The type of event
            data: Dict[str, Any]  # Event data
        ) -> List[Any]:  # List of elements to be sent via SSE
        "Create elements for a given event type and data."
    
    def clear_handlers(
            self,
            event_type: Optional[str] = None  # Optional specific event type to clear
        )
        "Clear handlers for a specific event type or all handlers."
    
    def get_registered_events(
            self
        ) -> List[str]:  # List of event types with registered handlers
        "Get list of registered event types."

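The register, preprocess, handle, postprocess flow described by these methods can be sketched as below. `MiniUpdater` is a hypothetical stand-in, not the library's class: it assumes handlers receive only `data`, that a handler may return one element or a list, and that the default handler runs only when no handler is registered. Plain strings stand in for FastHTML elements.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class MiniUpdater:
    """Hypothetical stand-in; not the library's SSEElementUpdater."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Tuple[int, Callable]]] = {}
        self._default: Optional[Callable] = None
        self._pre: List[Callable] = []
        self._post: List[Callable] = []

    def register(self, event_type: str, priority: int = 0) -> Callable:
        def decorator(handler: Callable) -> Callable:
            self._handlers.setdefault(event_type, []).append((priority, handler))
            return handler
        return decorator

    def set_default_handler(self, handler: Callable) -> None:
        self._default = handler

    def add_preprocessor(self, processor: Callable) -> None:
        self._pre.append(processor)

    def add_postprocessor(self, processor: Callable) -> None:
        self._post.append(processor)

    def create_elements(self, event_type: str, data: Dict[str, Any]) -> List[Any]:
        for pre in self._pre:
            data = pre(event_type, data)  # Preprocessors return modified data.
        entries = sorted(self._handlers.get(event_type, []),
                         key=lambda entry: entry[0], reverse=True)
        handlers = [handler for _, handler in entries]
        if not handlers and self._default is not None:
            handlers = [self._default]    # Fallback for unregistered events.
        elements: List[Any] = []
        for handler in handlers:
            produced = handler(data)
            elements.extend(produced if isinstance(produced, list) else [produced])
        for post in self._post:
            elements = post(elements)     # Postprocessors return a new list.
        return elements


updater = MiniUpdater()
updater.add_preprocessor(lambda event_type, data: {**data, "event": event_type})
updater.add_postprocessor(lambda elements: [e for e in elements if e])
updater.set_default_handler(lambda data: f"unhandled {data['event']}")


@updater.register("job:progress", priority=1)
def progress_text(data: Dict[str, Any]) -> str:
    return f"text {data['pct']}%"


@updater.register("job:progress", priority=5)
def progress_bar(data: Dict[str, Any]) -> List[str]:
    return [f"bar {data['pct']}", ""]  # The empty entry is filtered later.


elements = updater.create_elements("job:progress", {"pct": 40})
fallback = updater.create_elements("job:unknown", {})
```

The higher-priority `progress_bar` output comes first in `elements`, the postprocessor removes the empty entry, and the unregistered event type falls through to the default handler.
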