Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.

cjm-fasthtml-sse

Install

pip install cjm_fasthtml_sse

Project Structure

nbs/
├── core.ipynb       # Core SSE broadcast management system for FastHTML applications. Provides connection pooling, message distribution, and lifecycle hooks without UI dependencies.
├── dispatcher.ipynb # Event routing system with namespace support, pattern matching, and middleware pipeline. Enables decoupled event handling with priority-based execution and wildcard routing.
├── helpers.ipynb    # Utility functions and decorators for common SSE patterns in FastHTML. Includes the @sse_element decorator.
├── htmx.ipynb       # HTMX-specific SSE integration helpers for FastHTML. Simplifies adding SSE attributes, creating SSE-enabled elements, and managing HTMX SSE connections.
├── monitoring.ipynb # Connection monitoring and debugging tools for SSE applications. Provides configurable status indicators, automatic reconnection, and visibility change handling.
└── updater.ipynb    # Flexible element update system for building out-of-band (OOB) swap elements. Register handlers by event type and compose updates without coupling to specific UI components.

Total: 6 notebooks

Module Dependencies

graph LR
    core[core<br/>Core SSEBroadcastManager]
    dispatcher[dispatcher<br/>SSEEventDispatcher]
    helpers[helpers<br/>UI helpers & utilities]
    htmx[htmx<br/>HTMXSSEConnector]
    monitoring[monitoring<br/>Connection monitoring & config]
    updater[updater<br/>SSEElementUpdater]

    helpers --> htmx
    monitoring --> htmx

2 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Core SSEBroadcastManager (core.ipynb)

Core SSE broadcast management system for FastHTML applications. Provides connection pooling, message distribution, and lifecycle hooks without UI dependencies.

Import

from cjm_fasthtml_sse.core import (
    SSEBroadcastManager
)

Classes

class SSEBroadcastManager:
    """
    Manages SSE connections and broadcasting without UI dependencies.
    
    This class provides a reusable abstraction for managing Server-Sent Events
    connections and broadcasting messages to multiple clients.
    """
    
    def __init__(self,
                 max_queue_size: int = 100,    # Maximum number of messages per connection queue
                 history_size: int = 50,    # Number of broadcast messages to keep in history
                 default_timeout: float = 0.1   # Default timeout in seconds for queue operations
                )
        "Initialize the SSE Broadcast Manager."
    
    async def register_connection(
            self,
            queue: Optional[asyncio.Queue] = None  # Optional pre-existing queue, creates new one if not provided
        ) -> asyncio.Queue:  # The queue associated with this connection
        "Register a new SSE connection."
    
    async def unregister_connection(
            self,
            queue: asyncio.Queue  # The queue to unregister
        )
        "Unregister an SSE connection."
    
    async def broadcast(self, 
                           event_type: str,   # Type of event being broadcast
                           data: Dict[str, Any], # Data to broadcast
                           timeout: Optional[float] = None # Optional timeout override for this broadcast
                           ) -> int: # Number of successfully notified connections
        "Broadcast a message to all connected clients."
    
    def on_connect(
            self,
            callback: Callable  # Function to call when a new connection is registered
        )
        "Register a callback for new connections."
    
    def on_disconnect(
            self,
            callback: Callable  # Function to call when a connection is unregistered
        )
        "Register a callback for disconnections."
    
    def on_broadcast(
            self,
            callback: Callable  # Function to call before broadcasting (can modify messages)
        )
        "Register a callback for broadcasts (can modify messages)."
    
    def connection_count(
            self
        ) -> int:  # Number of active connections
        "Get the current number of active connections."
    
    def get_history(
            self,
            limit: Optional[int] = None  # Optional limit on number of messages to return
        ) -> list[Dict[str, Any]]:  # List of historical broadcast messages
        "Get broadcast history."

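The queue-per-connection pattern that the signatures above describe can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `MiniBroadcaster` and `demo` are hypothetical names, and the message shape (`{"type": ..., "data": ...}`) and the choice to skip slow consumers on timeout are assumptions.

```python
import asyncio
from collections import deque
from typing import Any, Dict, List, Optional


class MiniBroadcaster:
    """Hypothetical stand-in; NOT the library's SSEBroadcastManager."""

    def __init__(self, max_queue_size: int = 100, history_size: int = 50,
                 default_timeout: float = 0.1) -> None:
        self.max_queue_size = max_queue_size
        self.default_timeout = default_timeout
        self._queues: set = set()
        # A bounded deque drops the oldest message once history_size is reached.
        self._history: deque = deque(maxlen=history_size)

    async def register_connection(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue(maxsize=self.max_queue_size)
        self._queues.add(queue)
        return queue

    async def unregister_connection(self, queue: asyncio.Queue) -> None:
        self._queues.discard(queue)

    async def broadcast(self, event_type: str, data: Dict[str, Any],
                        timeout: Optional[float] = None) -> int:
        message = {"type": event_type, "data": data}  # assumed message shape
        self._history.append(message)
        wait = self.default_timeout if timeout is None else timeout
        delivered = 0
        for queue in list(self._queues):
            try:
                await asyncio.wait_for(queue.put(message), wait)
                delivered += 1
            except asyncio.TimeoutError:
                pass  # Assumption: a full queue (slow client) is skipped.
        return delivered

    def get_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        items = list(self._history)
        if limit is None:
            return items
        return items[max(len(items) - limit, 0):]


async def demo() -> int:
    hub = MiniBroadcaster()
    first = await hub.register_connection()
    second = await hub.register_connection()
    delivered = await hub.broadcast("job:progress", {"percent": 40})
    assert (await first.get())["data"]["percent"] == 40
    await hub.unregister_connection(second)
    return delivered


delivered = asyncio.run(demo())
```

Each connection owns its own bounded queue, so one stalled client can only fill its own buffer; the timeout keeps a broadcast from blocking on it.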
SSEEventDispatcher (dispatcher.ipynb)

Event routing system with namespace support, pattern matching, and middleware pipeline. Enables decoupled event handling with priority-based execution and wildcard routing.

Import

from cjm_fasthtml_sse.dispatcher import (
    SSEEvent,
    SSEEventDispatcher
)

Classes

@dataclass
class SSEEvent:
    "Represents an SSE event with metadata."
    
    type: str
    data: Dict[str, Any]
    namespace: Optional[str]
    priority: int = 0
    timestamp: Optional[str]
    
    def full_type(self):
        "Get the full event type including namespace."

class SSEEventDispatcher:
    """
    Decoupled event routing system with namespace support,
    middleware, filtering, and priority-based handling.
    """
    
    def __init__(self):
        "Initialize the SSE Event Dispatcher."
        self._handlers: Dict[str, List[tuple[int, Callable]]] = {}
    
    def register_namespace(
            self,
            namespace: str  # Namespace name to register for event organization
        )
        "Register a namespace for event organization."
    
    def on(
            self,
            event_pattern: str,  # Event pattern (supports wildcards: *, **)
            priority: int = 0  # Handler priority (higher runs first)
        )
        "Decorator to register an event handler with pattern matching."
    
    def add_handler(
            self,
            event_pattern: str,  # Event pattern (e.g., "job:*", "**:completed")
            handler: Callable,  # Handler function
            priority: int = 0  # Handler priority
        )
        "Add an event handler with pattern matching support."
    
    def add_middleware(
            self,
            middleware: Callable  # Function that takes (event, next) and calls next(event)
        )
        "Add middleware that processes events before handlers."
    
    def add_filter(
            self,
            filter_func: Callable[[SSEEvent], bool]  # Function that returns True to process event
        )
        "Add a filter to control which events are processed."
    
    def add_transformer(
            self,
            transformer: Callable[[SSEEvent], SSEEvent]  # Function that transforms an event
        )
        "Add a transformer to modify events before processing."
    
    async def dispatch(
            self,
            event: Union[SSEEvent, Dict[str, Any]]  # Event to dispatch (SSEEvent or dict)
        ) -> List[Any]:  # List of handler results
        "Dispatch an event through the processing pipeline."
    
    def clear_handlers(
            self,
            pattern: Optional[str] = None  # Specific pattern to clear, or None for all
        )
        "Clear handlers for a specific pattern or all handlers."

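Pattern-based routing with priorities, as described for `on` and `dispatch` above, can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `MiniDispatcher` is a hypothetical name, events are plain dicts with a `type` key, and matching uses the standard library's `fnmatchcase`, which does not distinguish `*` from `**` the way the library's own matcher may.

```python
import asyncio
import inspect
from fnmatch import fnmatchcase
from typing import Any, Callable, Dict, List, Tuple


class MiniDispatcher:
    """Hypothetical stand-in; NOT the library's SSEEventDispatcher."""

    def __init__(self) -> None:
        # Each entry is (priority, pattern, handler).
        self._handlers: List[Tuple[int, str, Callable]] = []

    def on(self, event_pattern: str, priority: int = 0) -> Callable:
        def decorator(handler: Callable) -> Callable:
            self._handlers.append((priority, event_pattern, handler))
            return handler
        return decorator

    async def dispatch(self, event: Dict[str, Any]) -> List[Any]:
        # Assumption: glob-style matching via fnmatchcase.
        matching = [h for h in self._handlers if fnmatchcase(event["type"], h[1])]
        # Higher priority runs first; the sort is stable for equal priorities.
        matching.sort(key=lambda h: h[0], reverse=True)
        results = []
        for _, _, handler in matching:
            result = handler(event)
            if inspect.isawaitable(result):  # support sync and async handlers
                result = await result
            results.append(result)
        return results


dispatcher = MiniDispatcher()


@dispatcher.on("job:*")
def log_job(event: Dict[str, Any]) -> str:
    return f"log {event['type']}"


@dispatcher.on("job:completed", priority=10)
async def notify(event: Dict[str, Any]) -> str:
    return "notify"


results = asyncio.run(dispatcher.dispatch({"type": "job:completed", "data": {}}))
```

Here `notify` runs before `log_job` because of its higher priority, even though it was registered second.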
UI helpers & utilities (helpers.ipynb)

Utility functions and decorators for common SSE patterns in FastHTML. Includes the @sse_element decorator.

Import

from cjm_fasthtml_sse.helpers import (
    oob_swap,
    oob_element,
    sse_element,
    oob_update,
    cleanup_sse_on_unload,
    get_htmx_idx,
    insert_htmx_sse_ext
)

Functions

def oob_swap(
    "Add OOB swap attributes to an element."

def oob_element(
    element_id: str,  # ID of the target element
    content: Any,    # Content to swap
    swap_type: str = "innerHTML"  # Type of swap
)
    "Create a wrapper element for OOB swap."

def sse_element(
    htmx_sse: HTMXSSEConnector,
    endpoint: str, 
    events: Optional[Union[str, List[str]]] = None, # Event name(s) to listen for from SSE stream
    auto_close: bool = True,  # Whether to auto-close on completion
    swap_type: str = "message" # How to swap content
)
    "Decorator to add SSE capabilities to any element."

def oob_update(
    element_id: str,  # Target element ID
    content: Any,  # Content to swap
    swap_type: str = "innerHTML"  # Type of swap (innerHTML, outerHTML, etc.)
)
    "Create an out-of-band update element."

def cleanup_sse_on_unload(
) -> FT:  # FastHTML element (Script) for cleanup
    "Add script to cleanup SSE connections on page unload."

def get_htmx_idx(
    hdrs: List  # List of header elements to search
) -> int:  # Index of HTMX script or -1 if not found
    "Find the index of HTMX script in headers list."

def insert_htmx_sse_ext(
    hdrs: List  # List of header elements to modify
)
    "Add HTMX SSE extension after HTMX script"

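For readers unfamiliar with out-of-band swaps, the sketch below shows the kind of markup an OOB update amounts to and how it could be framed as a Server-Sent Events message. It uses only the standard library; `oob_fragment` and `sse_message` are hypothetical helpers, not functions from this package, and the real helpers return FastHTML elements rather than strings.

```python
from html import escape


def oob_fragment(element_id: str, content: str, swap_type: str = "innerHTML") -> str:
    """Hypothetical helper: render a div carrying HTMX's hx-swap-oob attribute."""
    return (
        f'<div id="{escape(element_id, quote=True)}" '
        f'hx-swap-oob="{escape(swap_type, quote=True)}">'
        f"{escape(content)}</div>"
    )


def sse_message(event: str, html: str) -> str:
    """Frame a payload as one SSE message: an event line, data lines, a blank line."""
    # Every line of a multi-line payload needs its own "data:" prefix.
    data_lines = "\n".join(f"data: {line}" for line in html.splitlines() or [""])
    return f"event: {event}\n{data_lines}\n\n"


fragment = oob_fragment("job-42-status", "Done")
message = sse_message("status", fragment)
```

When HTMX receives a fragment like this, the `hx-swap-oob` attribute tells it to update the element with the matching `id` wherever it is on the page, instead of the element that owns the SSE connection.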
HTMXSSEConnector (htmx.ipynb)

HTMX-specific SSE integration helpers for FastHTML. Simplifies adding SSE attributes, creating SSE-enabled elements, and managing HTMX SSE connections.

Import

from cjm_fasthtml_sse.htmx import (
    HTMXSSEConnector
)

Classes

class HTMXSSEConnector:
    """
    Provides helper functions for setting up HTMX SSE connections
    without hardcoding specific implementations.
    """
    
    def add_sse_attrs(element,
                          endpoint: str,  # SSE endpoint URL to connect to
                          events: Optional[Union[str, List[str]]] = None,
                          swap_type: str = "message",  # How to swap content (message, innerHTML, outerHTML, etc.)
                          auto_reconnect: bool = True)
        """Add SSE connection attributes to an element.

        Args:
            element: The element to add SSE attributes to
            endpoint: SSE endpoint URL
            events: Optional event name(s) to listen for
            swap_type: How to swap content (message, innerHTML, outerHTML, etc.)
            auto_reconnect: Whether to auto-reconnect on disconnect

        Returns:
            The element with SSE attributes added
        """
    
    def create_sse_element(element_type=Div,
                              endpoint: str = None,  # SSE endpoint URL to connect to
                              element_id: str = None,  # Optional ID for the element
                              events: Optional[Union[str, List[str]]] = None,
                              swap_type: str = "message",  # How to swap content when messages are received
                              hidden: bool = False,  # Whether to hide the element initially
                              **kwargs)
        """Create an element with SSE connection configured.

        Args:
            element_type: Type of element to create (Div, Span, etc.)
            endpoint: SSE endpoint URL
            element_id: Optional element ID
            events: Optional event name(s) to listen for
            swap_type: How to swap content
            hidden: Whether to hide the element
            **kwargs: Additional attributes for the element

        Returns:
            Element configured for SSE connection
        """
    
    def sse_progress_element(job_id: str,
                                endpoint_template: str = "/stream_job_progress?job_id={job_id}",  # URL template for the SSE endpoint
                                element_id_template: str = "progress-span-{job_id}",  # Template for generating element ID
                                initial_content=None)
        """Create an SSE-enabled progress element.

        Args:
            job_id: Job identifier
            endpoint_template: Template for SSE endpoint URL
            element_id_template: Template for element ID
            initial_content: Initial content to display

        Returns:
            SSE-configured element for progress updates
        """
    
    def sse_status_element(job_id: str,
                              endpoint_template: str = "/stream_job_status?job_id={job_id}",  # URL template for the SSE endpoint
                              element_id_template: str = "status-span-{job_id}",  # Template for generating element ID
                              initial_content=None)
        """Create an SSE-enabled status element.

        Args:
            job_id: Job identifier
            endpoint_template: Template for SSE endpoint URL
            element_id_template: Template for element ID
            initial_content: Initial content to display

        Returns:
            SSE-configured element for status updates
        """
    
    def create_sse_monitor_script(
            config: Dict[str, Any]  # Configuration dictionary for monitoring setup
        )
        """Create a monitoring script for SSE connections.

        Args:
            config: Configuration dictionary with keys:
                - sse_element_id: ID of SSE element to monitor
                - status_element_id: ID of status display element
                - auto_reconnect: Whether to auto-reconnect
                - debug: Whether to enable debug logging
                - status_indicators: Dict of status HTML strings

        Returns:
            Script element with monitoring code
        """
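As a rough picture of what "adding SSE attributes" means at the HTML level, the sketch below builds the attributes used by the HTMX SSE extension (`hx-ext`, `sse-connect`, `sse-swap`) and renders them on a div. The helpers `sse_attrs` and `render_div` are hypothetical and stdlib-only; how `HTMXSSEConnector` actually maps `swap_type` and `events` onto attributes is not shown in this README, so treat the mapping here as an assumption.

```python
from html import escape
from typing import Dict, List, Optional, Union


def sse_attrs(endpoint: str,
              events: Optional[Union[str, List[str]]] = None) -> Dict[str, str]:
    """Hypothetical helper: attributes for an HTMX SSE-connected element."""
    if events is None:
        events = "message"  # assumption: default to the unnamed "message" event
    if isinstance(events, list):
        events = ",".join(events)  # the extension accepts comma-separated names
    return {"hx-ext": "sse", "sse-connect": endpoint, "sse-swap": events}


def render_div(element_id: str, attrs: Dict[str, str]) -> str:
    """Render an empty div with the given id and attributes."""
    rendered = " ".join(f'{k}="{escape(v, quote=True)}"' for k, v in attrs.items())
    return f'<div id="{escape(element_id, quote=True)}" {rendered}></div>'


job_id = "42"
# Templates copied from the sse_progress_element defaults listed above.
endpoint = "/stream_job_progress?job_id={job_id}".format(job_id=job_id)
element_id = "progress-span-{job_id}".format(job_id=job_id)
markup = render_div(element_id, sse_attrs(endpoint))
```

The templates mirror the defaults of `sse_progress_element`, so the resulting element ID and endpoint are what that method's signature suggests for job `42`.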

Connection monitoring & config (monitoring.ipynb)

Connection monitoring and debugging tools for SSE applications. Provides configurable status indicators, automatic reconnection, and visibility change handling.

Import

from cjm_fasthtml_sse.monitoring import (
    SSEMonitorConfig,
    create_sse_monitor
)

Functions

def create_sse_monitor(
    htmx_sse: HTMXSSEConnector,
    config: SSEMonitorConfig  # SSEMonitorConfig instance
)
    "Create a connection monitor with the specified configuration."

Classes

@dataclass
class SSEMonitorConfig:
    "Configuration for SSE connection monitoring."
    
    sse_element_id: str = 'sse-connection'
    status_element_id: str = 'connection-status'
    auto_reconnect: bool = True
    reconnect_delay: int = 3000
    debug: bool = False
    heartbeat_timeout: int = 30000
    status_indicators: Optional[Dict[str, str]]
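Since `create_sse_monitor_script` takes a plain dictionary, a dataclass configuration like the one above has to be flattened at some point. The sketch below shows one way that could look using only the standard library. `MonitorConfigSketch` and `config_to_json` are hypothetical; the `None` default for `status_indicators` and the millisecond units in the comments are assumptions, since the listing above does not state them.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Optional


@dataclass
class MonitorConfigSketch:
    """Hypothetical mirror of the fields listed above; NOT the library class."""

    sse_element_id: str = "sse-connection"
    status_element_id: str = "connection-status"
    auto_reconnect: bool = True
    reconnect_delay: int = 3000       # assumed to be milliseconds
    debug: bool = False
    heartbeat_timeout: int = 30000    # assumed to be milliseconds
    status_indicators: Optional[Dict[str, str]] = None  # default is an assumption


def config_to_json(config: MonitorConfigSketch) -> str:
    """Serialize the config so it could be embedded in a client-side script."""
    return json.dumps(asdict(config), sort_keys=True)


custom = MonitorConfigSketch(reconnect_delay=5000, debug=True)
payload = json.loads(config_to_json(custom))
```

Only the overridden fields differ from the defaults, which keeps call sites short when a single setting needs tuning.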

SSEElementUpdater (updater.ipynb)

Flexible element update system for building out-of-band (OOB) swap elements. Register handlers by event type and compose updates without coupling to specific UI components.

Import

from cjm_fasthtml_sse.updater import (
    SSEElementUpdater
)

Classes

class SSEElementUpdater:
    """
    Builds OOB swap elements without hardcoding UI components.
    This class provides a flexible system for registering and executing
    element update handlers based on event types.
    """
    
    def __init__(self):
        "Initialize the SSE Element Updater."
        self._handlers: Dict[str, List[Callable]] = {}
    
    def register(
            self,
            event_type: str,  # The event type to handle
            priority: int = 0  # Handler priority (higher numbers run first)
        ): # Decorator function
        "Decorator to register an update handler for a specific event type."
    
    def register_handler(
            self,
            event_type: str,  # The event type to handle
            handler: Callable,  # The handler function
            priority: int = 0  # Handler priority (higher numbers run first)
        )
        "Register an update handler programmatically."
    
    def set_default_handler(
            self,
            handler: Callable  # The default handler function
        )
        "Set a default handler for unregistered event types."
    
    def add_preprocessor(
            self,
            processor: Callable  # Function that processes (event_type, data) and returns modified data
        )
        "Add a preprocessor that runs before handlers."
    
    def add_postprocessor(
            self,
            processor: Callable  # Function that processes elements list and returns modified elements
        )
        "Add a postprocessor that runs after handlers."
    
    def create_elements(
            self,
            event_type: str,  # The type of event
            data: Dict[str, Any]  # Event data
        ) -> List[Any]:  # List of elements to be sent via SSE
        "Create elements for a given event type and data."
    
    def clear_handlers(
            self,
            event_type: Optional[str] = None  # Optional specific event type to clear
        )
        "Clear handlers for a specific event type or all handlers."
    
    def get_registered_events(
            self
        ) -> List[str]:  # List of event types with registered handlers
        "Get list of registered event types."

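The register-then-compose flow described above can be sketched as a small handler registry. This is an illustration, not the library's implementation: `MiniUpdater` is a hypothetical name, handlers are assumed to take the event data and return one element or a list of elements, and elements are plain strings here rather than FastHTML components.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class MiniUpdater:
    """Hypothetical stand-in; NOT the library's SSEElementUpdater."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Tuple[int, Callable]]] = {}
        self._default: Optional[Callable] = None
        self._postprocessors: List[Callable] = []

    def register(self, event_type: str, priority: int = 0) -> Callable:
        def decorator(handler: Callable) -> Callable:
            entries = self._handlers.setdefault(event_type, [])
            entries.append((priority, handler))
            entries.sort(key=lambda e: e[0], reverse=True)  # higher runs first
            return handler
        return decorator

    def set_default_handler(self, handler: Callable) -> None:
        self._default = handler

    def add_postprocessor(self, processor: Callable) -> None:
        self._postprocessors.append(processor)

    def create_elements(self, event_type: str, data: Dict[str, Any]) -> List[Any]:
        entries = self._handlers.get(event_type)
        if entries:
            handlers = [handler for _, handler in entries]
        elif self._default is not None:
            handlers = [self._default]  # fall back for unregistered event types
        else:
            handlers = []
        elements: List[Any] = []
        for handler in handlers:
            result = handler(data)  # assumption: handlers receive only the data
            if result is None:
                continue
            elements.extend(result if isinstance(result, list) else [result])
        for processor in self._postprocessors:
            elements = processor(elements)
        return elements


updater = MiniUpdater()


@updater.register("job:progress")
def progress_span(data: Dict[str, Any]) -> str:
    return f'<span id="progress-{data["job_id"]}">{data["percent"]}%</span>'


@updater.register("job:progress", priority=5)
def page_title(data: Dict[str, Any]) -> str:
    return f'<title>{data["percent"]}% done</title>'


updater.set_default_handler(lambda data: "<!-- unhandled -->")
elements = updater.create_elements("job:progress", {"job_id": "42", "percent": 40})
```

Because handlers are looked up by event type, the code that emits events never needs to know which UI fragments exist.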