Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.

cjm-fasthtml-sse

Install

pip install cjm_fasthtml_sse

Project Structure

nbs/
├── components/ (2)
│   ├── builders.ipynb  # Generic builders for creating SSE-enabled elements with HTMX
│   └── monitors.ipynb  # Client-side SSE connection monitoring and management utilities
├── core/ (3)
│   ├── broadcast.ipynb   # SSE broadcast infrastructure for managing connections and broadcasting updates to multiple clients
│   ├── decorators.ipynb  # Route decorators and utilities for FastHTML SSE endpoints
│   └── streaming.ipynb   # SSE streaming utilities for creating and managing Server-Sent Events streams
└── integrations/ (1)
    └── fasthtml.ipynb  # High-level FastHTML integration for SSE with HTMX

Total: 6 notebooks across 3 directories

Module Dependencies

graph LR
    components_builders[components.builders<br/>builders]
    components_monitors[components.monitors<br/>monitors]
    core_broadcast[core.broadcast<br/>broadcast]
    core_decorators[core.decorators<br/>decorators]
    core_streaming[core.streaming<br/>streaming]
    integrations_fasthtml[integrations.fasthtml<br/>fasthtml]

    core_decorators --> core_streaming
    core_decorators --> core_broadcast
    core_streaming --> core_broadcast
    integrations_fasthtml --> components_builders
    integrations_fasthtml --> core_streaming
    integrations_fasthtml --> core_decorators
    integrations_fasthtml --> core_broadcast
    integrations_fasthtml --> components_monitors

8 cross-module dependencies detected

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

broadcast (broadcast.ipynb)

SSE broadcast infrastructure for managing connections and broadcasting updates to multiple clients

Import

from cjm_fasthtml_sse.core.broadcast import (
    BroadcastEventType,
    BroadcastMessage,
    SSEBroadcastManager,
    SSEConnection
)

Classes

class BroadcastEventType(Enum):
    "Standard broadcast event types"
@dataclass
class BroadcastMessage:
    "Structured broadcast message"
    
    type: str
    data: Dict[str, Any]
    timestamp: str = field(...)
    target_ids: Optional[List[str]]  # Specific element IDs to update
    metadata: Optional[Dict[str, Any]]
    
    def to_dict(
            self
        ) -> Dict[str, Any]:  # Dictionary representation of the message
        "Convert to dictionary for JSON serialization"
    
    def to_json(
            self
        ) -> str:  # JSON string representation of the message
        "Convert to JSON string"
class SSEBroadcastManager:
    def __init__(self,
    "Manages SSE connections and broadcasting across multiple clients. This class provides a centralized way to manage Server-Sent Events connections and broadcast messages to all connected clients, enabling real-time updates and cross-tab synchronization."
    
    def __init__(self,
        "Initialize the SSE Broadcast Manager."
    
    async def register_connection(self, 
                                      queue: Optional[asyncio.Queue] = None,  # Optional pre-existing queue (creates new if None)
                                      metadata: Optional[Dict[str, Any]] = None # Optional metadata for this connection
                                     ) -> asyncio.Queue: # The queue for this connection
        "Register a new client connection."
    
    async def unregister_connection(
            self,
            queue: asyncio.Queue  # The queue to unregister
        )
        "Remove a client connection."
    
    async def broadcast(self, 
                            event_type: str,   # Type of event to broadcast
                            data: Dict[str, Any], # Data to broadcast
                            target_ids: Optional[List[str]] = None,  # Optional list of element IDs to target for updates
                            metadata: Optional[Dict[str, Any]] = None # Optional metadata for the message
                           ) -> int: # Number of clients successfully notified
        "Broadcast a message to all connected clients."
    
    async def run_message_handlers(
            self,
            message: BroadcastMessage  # Message to process
        ) -> BroadcastMessage:  # Potentially modified message
        "Run message through registered handlers."
    
    async def broadcast_message(
            self,
            message: BroadcastMessage  # Message to broadcast
        ) -> int:  # Number of clients successfully notified
        "Broadcast a pre-constructed message to all connected clients."
    
    def add_message_handler(self,
                                handler: Callable[[BroadcastMessage], Awaitable[Optional[BroadcastMessage]]] # Async function that takes and optionally returns a BroadcastMessage
                               )
        "Add a message handler that can transform messages before broadcasting. Handlers are called in the order they are added and can modify messages or return None to pass through unchanged."
    
    def get_connection_count(
            self
        ) -> int:  # Current number of active connections
        "Get the current number of active connections"
    
    def get_history(
            self,
            limit: Optional[int] = None  # Maximum number of messages to return (None for all)
        ) -> List[BroadcastMessage]:  # List of historical messages
        "Get message history."
    
    async def clear_history(self)
        "Clear the message history"
    
    async def close_all_connections(self)
        "Close all active connections. Useful for cleanup during shutdown."
class SSEConnection:
    def __init__(self, 
                 manager: SSEBroadcastManager,  # The SSEBroadcastManager to register with
                 metadata: Optional[Dict[str, Any]] = None # Optional metadata for this connection
                )
    "Context manager for SSE connections. Automatically registers/unregisters connections with the broadcast manager."
    
    def __init__(self,
                     manager: SSEBroadcastManager,  # The SSEBroadcastManager to register with
                     metadata: Optional[Dict[str, Any]] = None # Optional metadata for this connection
                    )
        "Initialize SSE connection context."

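The broadcast manager described above keeps one queue per connected client and pushes each message to all of them. The sketch below illustrates that fan-out pattern with the standard library only. `TinyBroadcaster` and its methods are hypothetical stand-ins, not the library's implementation, and they omit history, handlers, and metadata.

```python
import asyncio
from typing import Any, Dict, Set, Tuple


class TinyBroadcaster:
    """Hypothetical stand-in: one asyncio.Queue per connected client."""

    def __init__(self) -> None:
        self._queues: Set[asyncio.Queue] = set()
        self._lock = asyncio.Lock()

    async def register(self) -> asyncio.Queue:
        # Each client (for example, each browser tab) gets its own queue.
        queue: asyncio.Queue = asyncio.Queue()
        async with self._lock:
            self._queues.add(queue)
        return queue

    async def unregister(self, queue: asyncio.Queue) -> None:
        async with self._lock:
            self._queues.discard(queue)

    async def broadcast(self, event_type: str, data: Dict[str, Any]) -> int:
        # Copy the set under the lock, then deliver outside it.
        message = {"type": event_type, "data": data}
        async with self._lock:
            targets = list(self._queues)
        for queue in targets:
            queue.put_nowait(message)
        return len(targets)


async def demo() -> Tuple[int, tuple, int]:
    hub = TinyBroadcaster()
    tab_a = await hub.register()
    tab_b = await hub.register()
    notified = await hub.broadcast("job_updated", {"id": 1})
    received = (tab_a.get_nowait(), tab_b.get_nowait())
    await hub.unregister(tab_b)
    remaining = await hub.broadcast("job_updated", {"id": 2})
    return notified, received, remaining
```

Running `asyncio.run(demo())` shows both queues receiving the same message, which is the mechanism behind cross-tab synchronization: every open tab holds its own connection and therefore its own queue.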
builders (builders.ipynb)

Generic builders for creating SSE-enabled elements with HTMX

Import

from cjm_fasthtml_sse.components.builders import (
    add_sse_attrs,
    add_oob_swap,
    SSEConfig,
    OOBUpdate,
    MultiUpdateBuilder,
    generate_sse_cleanup_script,
    get_htmx_sse_extension_url,
    create_sse_attrs,
    create_oob_attrs
)

Functions

def add_sse_attrs(
    element: Any,  # Any element with an attrs dictionary
    endpoint: str,  # SSE endpoint URL
    event_name: str = "message",  # SSE event to listen for
    swap_mode: Optional[str] = None,  # Optional HTMX swap mode
    **extra_attrs  # Additional attributes to add
) -> Any:  # The modified element
    "Add SSE attributes to any element. This function modifies an element in-place by adding the necessary HTMX SSE attributes. It's framework-agnostic and works with any element that has an attrs dictionary."
def add_oob_swap(
    "Add out-of-band swap attributes to any element."
def generate_sse_cleanup_script(
) -> str:  # JavaScript code as a string
    "Generate JavaScript for proper SSE connection cleanup."
def get_htmx_sse_extension_url(
    version: str = "2.2.3"  # Version of the extension
) -> str:  # CDN URL as a string
    "Get the CDN URL for the HTMX SSE extension."
def create_sse_attrs(
    endpoint: str,  # SSE endpoint
    event: str = "message",  # Event name to listen for
    swap: Optional[str] = None,  # Optional swap mode
    **kwargs  # Additional attributes
) -> Dict[str, str]:  # Dictionary of attributes
    "Create a dictionary of SSE attributes. This is a convenience function for creating SSE attributes that can be spread into element constructors."
def create_oob_attrs(
    element_id: str,  # ID of target element
    swap_type: str = "innerHTML"  # Type of swap
) -> Dict[str, str]:  # Dictionary of attributes
    "Create a dictionary of OOB swap attributes."

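To make the attribute helpers above more concrete, here is a rough sketch of the kind of dictionaries they could produce. The attribute names `hx-ext`, `sse-connect`, `sse-swap`, `hx-swap`, and `hx-swap-oob` come from HTMX and its SSE extension. The functions `sketch_sse_attrs` and `sketch_oob_attrs` are hypothetical, and the exact keys that `create_sse_attrs` and `create_oob_attrs` return may differ.

```python
from typing import Dict, Optional


def sketch_sse_attrs(endpoint: str, event: str = "message",
                     swap: Optional[str] = None, **extra: str) -> Dict[str, str]:
    """Hypothetical helper: attributes that connect an element to an SSE endpoint."""
    attrs = {
        "hx-ext": "sse",          # enable the HTMX SSE extension on the element
        "sse-connect": endpoint,  # URL of the event stream
        "sse-swap": event,        # SSE event name whose data is swapped in
    }
    if swap is not None:
        attrs["hx-swap"] = swap   # optional HTMX swap mode
    attrs.update(extra)
    return attrs


def sketch_oob_attrs(element_id: str, swap_type: str = "innerHTML") -> Dict[str, str]:
    """Hypothetical helper: attributes that mark an element as an out-of-band swap."""
    return {"id": element_id, "hx-swap-oob": swap_type}
```

A dictionary like this can be spread into an element constructor, which is the use case the `create_sse_attrs` docstring describes.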
Classes

@dataclass
class SSEConfig:
    """
    Configuration for SSE-enabled elements.
    
    This dataclass provides a clean way to configure SSE attributes
    that can be applied to any element.
    """
    
    endpoint: str
    event_name: str = 'message'
    swap_mode: Optional[str]
    reconnect_time: Optional[int]
    extra_attrs: Dict[str, Any] = field(...)
    
    def apply_to(
            self,
            element: Any  # Element to configure
        ) -> Any:  # The configured element
        "Apply this SSE configuration to an element."
    
    def to_attrs(
            self
        ) -> Dict[str, str]:  # Dictionary of HTMX SSE attributes
        "Convert configuration to attribute dictionary."
@dataclass
class OOBUpdate:
    """
    Represents an out-of-band update.
    
    This is a generic container for OOB updates that doesn't
    depend on specific element types.
    """
    
    element: Any
    swap_type: str = 'innerHTML'
    target_id: Optional[str]
    
    def prepare(
            self
        ) -> Any:  # Element with OOB swap attributes
        "Prepare element with OOB attributes."
class MultiUpdateBuilder:
    def __init__(
        self,
        container_factory: Optional[Callable] = None  # Optional factory function for creating containers.  Should accept *children as arguments.
    )
    """
    Builder for creating multiple OOB updates.
    
    This builder is framework-agnostic and works with any elements.
    """
    
    def __init__(
            self,
            container_factory: Optional[Callable] = None  # Optional factory function for creating containers.  Should accept *children as arguments.
        )
        "Initialize the builder."
    
    def add(
            self,
            element: Any,  # Element to add
            swap_type: str = "innerHTML",  # Type of swap
            target_id: Optional[str] = None  # Optional target ID
        ) -> 'MultiUpdateBuilder':  # Self for chaining
        "Add an update to the builder."
    
    def add_many(
            self,
            updates: List[Union[OOBUpdate, tuple, Any]] # List of updates (OOBUpdate objects, tuples, or elements)
        ) -> 'MultiUpdateBuilder':  # Self for chaining
        "Add multiple updates at once."
    
    def build(
            self
        ) -> Any:  # Container with all prepared OOB updates
        "Build the final multi-update container."
    
    def clear(
            self
        ) -> 'MultiUpdateBuilder':  # Self for chaining
        "Clear all pending updates."

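The chaining behavior of `MultiUpdateBuilder` (each method returns the builder itself) can be illustrated with a small stand-in. This is only a sketch: `El` and `SketchUpdateBuilder` are hypothetical names, elements are assumed to expose an `attrs` dictionary, and the default container is assumed to be a plain list.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class El:
    """Hypothetical minimal element with an attrs dictionary."""
    tag: str
    attrs: Dict[str, str] = field(default_factory=dict)


class SketchUpdateBuilder:
    """Hypothetical fluent builder that collects out-of-band updates."""

    def __init__(self, container_factory: Optional[Callable[..., Any]] = None) -> None:
        # Assumed default: collect the children into a list.
        self._factory = container_factory or (lambda *children: list(children))
        self._pending: List[El] = []

    def add(self, element: El, swap_type: str = "innerHTML",
            target_id: Optional[str] = None) -> "SketchUpdateBuilder":
        if target_id is not None:
            element.attrs["id"] = target_id
        element.attrs["hx-swap-oob"] = swap_type
        self._pending.append(element)
        return self  # returning self is what enables chaining

    def clear(self) -> "SketchUpdateBuilder":
        self._pending.clear()
        return self

    def build(self) -> Any:
        return self._factory(*self._pending)
```

With this sketch, `SketchUpdateBuilder().add(El("span"), target_id="count").add(El("div"), "outerHTML", "status").build()` yields a list of two elements, each tagged for an out-of-band swap.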
decorators (decorators.ipynb)

Route decorators and utilities for FastHTML SSE endpoints

Import

from cjm_fasthtml_sse.core.decorators import (
    sse_endpoint,
    broadcast_action,
    sse_generator_endpoint
)

Functions

def sse_endpoint(
    broadcast_manager: Optional[SSEBroadcastManager] = None,  # Optional SSEBroadcastManager for automatic broadcasting
    config: Optional[SSEStreamConfig] = None,  # Optional SSEStreamConfig for stream configuration
    message_filter: Optional[Callable[[BroadcastMessage], bool]] = None,
    client_metadata_fn: Optional[Callable[..., Dict[str, Any]]] = None
)
    """
    Decorator for creating SSE endpoints with optional broadcast integration.
    
    This decorator can work in two modes:
    1. With broadcast_manager: Automatically connects to broadcast stream
    2. Without broadcast_manager: Wraps custom async generator functions
    """
def broadcast_action(
    manager: SSEBroadcastManager,  # SSEBroadcastManager to broadcast through
    event_type: Optional[str] = None,  # Event type to broadcast (defaults to function name)
    extract_data: Optional[Callable[..., Dict[str, Any]]] = None,
    broadcast_before: bool = False,  # Broadcast before executing function
    broadcast_after: bool = True,  # Broadcast after executing function
    include_result: bool = True  # Include function result in broadcast data
)
    "Decorator that broadcasts events when actions occur. This decorator automatically broadcasts SSE messages when the decorated function is called, useful for notifying clients of state changes."
def sse_generator_endpoint(
    interval: float = 1.0,  # Seconds between data checks
    event_type: Optional[str] = None,  # Optional event type for messages
    heartbeat: Optional[float] = 30.0  # Heartbeat interval in seconds
)
    "Decorator for creating simple SSE endpoints from data functions. This decorator converts a function that returns data into an SSE streaming endpoint."

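The idea behind `broadcast_action` (run a function, then notify clients about it) can be sketched as a plain decorator. The names `sketch_broadcast_action` and `publish` are hypothetical. The real decorator takes an `SSEBroadcastManager` and supports more options, such as `broadcast_before` and `extract_data`, which this sketch leaves out.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

# Hypothetical publisher signature: (event_type, data) -> awaitable
Publisher = Callable[[str, Dict[str, Any]], Awaitable[None]]


def sketch_broadcast_action(publish: Publisher,
                            event_type: Optional[str] = None,
                            include_result: bool = True):
    """Hypothetical decorator: publish an event after the wrapped coroutine runs."""
    def decorator(func):
        # Fall back to the function name, as the event_type comment above describes.
        name = event_type or func.__name__

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)
            payload = {"result": result} if include_result else {}
            await publish(name, payload)
            return result

        return wrapper
    return decorator


async def demo() -> List[Tuple[str, Dict[str, Any]]]:
    sent: List[Tuple[str, Dict[str, Any]]] = []

    async def publish(event: str, data: Dict[str, Any]) -> None:
        sent.append((event, data))  # stand-in for a real broadcast

    @sketch_broadcast_action(publish)
    async def rename_item(item_id: int, name: str) -> Dict[str, Any]:
        return {"id": item_id, "name": name}

    await rename_item(7, "draft")
    return sent
```

Keeping the broadcast in a decorator leaves the route handler focused on the state change itself, and the notification happens as a side effect of calling it.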
fasthtml (fasthtml.ipynb)

High-level FastHTML integration for SSE with HTMX

Import

from cjm_fasthtml_sse.integrations.fasthtml import (
    FastHTMLSSEConfig,
    FastHTMLSSE,
    setup_sse,
    create_sse_page_template
)

Functions

def setup_sse(
    app: Any,  # FastHTML application
    prefix: str = "/sse",  # URL prefix for SSE routes
    **kwargs  # Additional configuration options
) -> FastHTMLSSE:  # Configured FastHTMLSSE instance
    "Quick setup function for SSE in FastHTML apps."
def create_sse_page_template(
    sse: FastHTMLSSE,  # FastHTMLSSE instance
    title: str = "SSE Demo",  # Page title
    content: Any = None  # Page content
)
    "Create a basic page template with SSE setup."

Classes

@dataclass
class FastHTMLSSEConfig:
    """
    Configuration for FastHTML SSE integration.
    
    Attributes:
        app: FastHTML application instance
        prefix: URL prefix for SSE endpoints
        enable_monitor: Add connection monitoring scripts
        enable_cleanup: Add cleanup script for proper SSE closure
        enable_htmx_extension: Add HTMX SSE extension script
        monitor_debug: Enable debug logging in monitor
        broadcast_history: Number of messages to keep in history
        default_stream_config: Default configuration for SSE streams
    """
    
    app: Any  # FastHTML app
    prefix: str = '/sse'
    enable_monitor: bool = True
    enable_cleanup: bool = True
    enable_htmx_extension: bool = True
    monitor_debug: bool = False
    broadcast_history: int = 50
    default_stream_config: Optional[SSEStreamConfig]
class FastHTMLSSE:
    def __init__(
        self,
        config: FastHTMLSSEConfig  # Configuration for the SSE integration
    )
    """
    Main integration class for SSE with FastHTML.
    
    This class provides a high-level API for adding SSE capabilities
    to FastHTML applications with minimal configuration.
    """
    
    def __init__(
            self,
            config: FastHTMLSSEConfig  # Configuration for the SSE integration
        )
        """
        Initialize FastHTML SSE integration.
        
        Args:
            config: Configuration for the integration
        """
    
    async def broadcast(
            self,
            event_type: str,  # Type of event to broadcast
            data: Dict[str, Any],  # Data payload to broadcast
            target_ids: Optional[List[str]] = None  # Optional list of element IDs to update
        ) -> int:  # Number of clients successfully notified
        """
        Broadcast a message to all connected clients.
        
        Args:
            event_type: Type of event
            data: Data to broadcast
            target_ids: Optional list of element IDs to update
        
        Returns:
            Number of clients notified
        """
    
    def create_sse_element(
            self,
            element_id: str,  # ID for the HTML element
            endpoint: Optional[str] = None,  # SSE endpoint URL
            content: Any = None,  # Initial content for the element
            hidden: bool = False,  # Whether to hide the element
            **kwargs
        )
        """
        Create an SSE-enabled element.
        
        Args:
            element_id: ID for the element
            endpoint: SSE endpoint (defaults to global)
            content: Initial content
            hidden: Whether to hide the element
            **kwargs: Additional attributes
        
        Returns:
            Configured element
        """
    
    def create_monitor(
            self,
            sse_element_id: str = "global-sse",  # ID for the SSE connection element
            status_element_id: str = "connection-status"  # ID for the status display element
        )
        """
        Create connection monitor elements.
        
        Args:
            sse_element_id: ID for SSE connection element
            status_element_id: ID for status display element
        
        Returns:
            Tuple of (sse_element, status_element, monitor_script)
        """
    
    def create_oob_update(
            self,
            updates: List[tuple]  # List of (element_id, content, swap_type) tuples
        ) -> str:  # Formatted SSE message with OOB updates
        """
        Create an OOB update message.
        
        Args:
            updates: List of (element_id, content, swap_type) tuples
        
        Returns:
            Formatted SSE message with OOB updates
        """
    
    def sse_route(
            self,
            path: str,  # Route path for the SSE endpoint
            message_filter: Optional[Callable] = None  # Optional filter function for messages
        )
        """
        Decorator for creating SSE routes.
        
        Args:
            path: Route path
            message_filter: Optional filter function
        
        Returns:
            Route decorator
        """
    
    def action_route(
            self,
            path: str,  # Route path for the action endpoint
            event_type: Optional[str] = None,  # Event type to broadcast after action
            method: str = "post"  # HTTP method for the route
        )
        """
        Decorator for creating action routes that broadcast.
        
        Args:
            path: Route path
            event_type: Event type to broadcast
            method: HTTP method
        
        Returns:
            Route decorator
        """

monitors (monitors.ipynb)

Client-side SSE connection monitoring and management utilities

Import

from cjm_fasthtml_sse.components.monitors import (
    ConnectionState,
    MonitorConfig,
    generate_monitor_script,
    generate_simple_monitor,
    generate_connection_counter
)

Functions

def generate_monitor_script(
    config: MonitorConfig,  # Monitor configuration
    status_indicators: Optional[Dict[ConnectionState, str]] = None
) -> str:  # JavaScript code as a string
    "Generate JavaScript for monitoring SSE connections."
def generate_simple_monitor(
    sse_element_id: str,  # ID of SSE element to monitor
    status_element_id: Optional[str] = None,  # Optional status display element
    debug: bool = False  # Enable console logging
) -> str:  # JavaScript code for basic monitoring
    "Generate a simple SSE connection monitor."
def generate_connection_counter(
) -> str:  # JavaScript code that tracks connection count
    "Generate JavaScript for counting active SSE connections."

Classes

class ConnectionState(Enum):
    "SSE connection states"
@dataclass
class MonitorConfig:
    """
    Configuration for SSE connection monitoring.
    
    Attributes:
        sse_element_id: ID of the SSE connection element to monitor
        status_element_id: ID of element to update with connection status
        auto_reconnect: Whether to automatically reconnect on failures
        reconnect_delay: Delay in ms before reconnection attempts
        max_reconnect_attempts: Maximum number of reconnection attempts (None = infinite)
        visibility_reconnect: Reconnect when tab becomes visible
        heartbeat_timeout: Timeout in ms to consider connection dead (None = no timeout)
        debug: Enable console logging
        callbacks: JavaScript callbacks for state changes
    """
    
    sse_element_id: str
    status_element_id: Optional[str]
    auto_reconnect: bool = True
    reconnect_delay: int = 3000
    max_reconnect_attempts: Optional[int]
    visibility_reconnect: bool = True
    heartbeat_timeout: Optional[int]
    debug: bool = False
    callbacks: Dict[str, str] = field(...)
    
    def to_js_config(
            self
        ) -> str:  # JSON string of the JavaScript configuration object
        "Convert to JavaScript configuration object"

streaming (streaming.ipynb)

SSE streaming utilities for creating and managing Server-Sent Events streams

Import

from cjm_fasthtml_sse.core.streaming import (
    format_sse_message,
    format_sse_comment,
    SSEStreamConfig,
    sse_broadcast_stream,
    sse_generator,
    OOBUpdate,
    format_oob_updates,
    create_oob_message
)

Functions

def format_sse_message(
    data: Union[str, Dict[str, Any], Any],
    event: Optional[str] = None,  # Optional event type
    id: Optional[str] = None,  # Optional event ID
    retry: Optional[int] = None  # Optional retry interval in milliseconds
) -> str:  # Formatted SSE message string
    "Format data as a Server-Sent Event message."
def format_sse_comment(
    comment: str  # Comment text
) -> str:  # Formatted SSE comment
    "Format a comment for SSE (typically used for heartbeats)."
async def sse_broadcast_stream(
    manager: SSEBroadcastManager,  # The SSEBroadcastManager to connect to
    client_metadata: Optional[Dict[str, Any]] = None,
    config: Optional[SSEStreamConfig] = None,  # Optional stream configuration
    message_filter: Optional[Callable[[BroadcastMessage], bool]] = None
) -> AsyncIterator[str]:  # Async iterator yielding formatted SSE messages
    "Create an SSE stream that receives messages from a broadcast manager. This is the main function for creating SSE endpoints that receive broadcast messages and format them for SSE delivery."
async def sse_generator(
    data_source: Union[AsyncIterator, Callable],
    interval: float = 1.0,  # Seconds between data checks (if using callable)
    event_type: Optional[str] = None,  # Optional event type for all messages
    heartbeat: Optional[float] = 30.0  # Heartbeat interval in seconds (None to disable)
) -> AsyncIterator[str]:  # Async iterator yielding formatted SSE messages
    "Create an SSE stream from a data source. This is useful for creating SSE streams from custom data sources without using the broadcast manager."
def format_oob_updates(
    updates: List[OOBUpdate]  # List of OOB updates to format
) -> str:  # Combined HTML string with all OOB updates
    "Format multiple OOB updates into a single HTML string."
def create_oob_message(
    updates: List[OOBUpdate],  # List of OOB updates
    event: str = "message"  # SSE event type
) -> str:  # Formatted SSE message with OOB HTML
    "Create an SSE message containing OOB updates for HTMX."

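For readers unfamiliar with the SSE wire format, the sketch below shows what a formatted message and a heartbeat comment look like according to the Server-Sent Events specification. The functions `sketch_format_sse_message` and `sketch_format_sse_comment` are hypothetical and are not the library's code. Serializing non-string data with `json.dumps` is an assumption about how `format_sse_message` treats dictionaries.

```python
import json
from typing import Any, List, Optional


def sketch_format_sse_message(data: Any, event: Optional[str] = None,
                              id: Optional[str] = None,
                              retry: Optional[int] = None) -> str:
    """Hypothetical formatter following the SSE field syntax."""
    payload = data if isinstance(data, str) else json.dumps(data)  # assumed JSON encoding
    lines: List[str] = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    if retry is not None:
        lines.append(f"retry: {retry}")  # reconnection delay in milliseconds
    # Each payload line needs its own "data:" field.
    for line in payload.splitlines() or [""]:
        lines.append(f"data: {line}")
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"


def sketch_format_sse_comment(comment: str) -> str:
    """Lines starting with a colon are comments; clients ignore them."""
    return f": {comment}\n\n"
```

Because clients ignore comment lines, a periodic comment works as a heartbeat: it keeps the connection active without triggering any event handler.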
Classes

@dataclass
class SSEStreamConfig:
    "Configuration for SSE streams."
    
    heartbeat_interval: Optional[float] = 30.0  # Seconds between heartbeat messages (None to disable)
    reconnect_time: Optional[int] = 3000  # Suggested reconnect time for clients in milliseconds
    timeout: Optional[float]  # Maximum time to wait for messages before closing stream
    initial_event: Optional[Dict[str, Any]]  # Event to send when stream starts
    closing_event: Optional[Dict[str, Any]]  # Event to send when stream closes
    debug: bool = False  # Enable debug logging
@dataclass
class OOBUpdate:
    "Represents an out-of-band update for HTMX."
    
    target_id: str  # The ID of the element to update
    content: str  # The HTML content to swap in
    swap_type: str = 'innerHTML'  # The type of swap (innerHTML, outerHTML, etc.)
    
    def to_html(
            self
        ) -> str:  # HTML string with hx-swap-oob attribute for HTMX
        "Convert to HTML with hx-swap-oob attribute"

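As an illustration of how an out-of-band update might be rendered and wrapped in an SSE message, consider the sketch below. `SketchOOBUpdate` and `sketch_oob_message` are hypothetical. The wrapping `<div>` tag is an assumption, and the content is assumed to be single-line HTML, so the real `to_html` and `create_oob_message` output may differ.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SketchOOBUpdate:
    """Hypothetical counterpart of the OOBUpdate dataclass above."""
    target_id: str
    content: str
    swap_type: str = "innerHTML"

    def to_html(self) -> str:
        # HTMX matches the fragment to the page element with the same id.
        return (f'<div id="{self.target_id}" '
                f'hx-swap-oob="{self.swap_type}">{self.content}</div>')


def sketch_oob_message(updates: List[SketchOOBUpdate], event: str = "message") -> str:
    """Join the fragments and wrap them in one SSE event (single-line content assumed)."""
    html = "".join(update.to_html() for update in updates)
    return f"event: {event}\ndata: {html}\n\n"
```

Bundling several fragments into one event lets a single server push update multiple, unrelated parts of the page at once.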