Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.

cjm-fasthtml-sse

Install

pip install cjm_fasthtml_sse

Project Structure

nbs/
└── core/ (3)
    ├── broadcast.ipynb    # Broadcasting infrastructure for SSE cross-tab synchronization
    ├── connections.ipynb  # Connection management for SSE clients
    └── streaming.ipynb    # SSE streaming utilities and helpers

Total: 3 notebooks across 4 directories

Module Dependencies

graph LR
    core_broadcast[core.broadcast<br/>Broadcast]
    core_connections[core.connections<br/>Connections]
    core_streaming[core.streaming<br/>Streaming]

No cross-module dependencies detected.

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Broadcast (broadcast.ipynb)

Broadcasting infrastructure for SSE cross-tab synchronization

Import

from cjm_fasthtml_sse.core.broadcast import (
    BroadcastMessage,
    BroadcastManager,
    create_broadcast_endpoint,
    create_broadcast_handler,
    setup_broadcast_routes
)

Functions

async def create_broadcast_endpoint(manager: BroadcastManager,
                                   connection_id: Optional[str] = None,  # Optional connection ID
                                   heartbeat_interval: float = 30.0,  # Interval for sending heartbeat messages
                                   send_history: bool = False,  # Whether to send recent history on connection
                                   history_limit: int = 10) -> EventStream
    "Create an SSE endpoint for broadcasting."
def create_broadcast_handler(manager: BroadcastManager,
                            element_builder: Optional[Callable] = None)
    "Create a broadcast handler function that can be used with FastHTML routes."
def setup_broadcast_routes(app, 
                          manager: BroadcastManager,  # The broadcast manager instance
                          prefix: str = "/sse",  # URL prefix for SSE endpoints
                          element_builder: Optional[Callable] = None)
    "Setup broadcast routes on a FastHTML app."

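The `heartbeat_interval` parameter above suggests an endpoint that reads from a per-connection queue and emits a keep-alive when nothing arrives in time. The following is a minimal standard-library sketch of that pattern, not the library's implementation. The function name `queue_to_sse`, the `None` close sentinel, and the `: heartbeat` comment text are all assumptions made for illustration.

```python
import asyncio
from typing import AsyncGenerator


async def queue_to_sse(queue: asyncio.Queue,
                       heartbeat_interval: float = 30.0) -> AsyncGenerator[str, None]:
    """Yield SSE frames from a queue, emitting a comment frame when idle."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
        except asyncio.TimeoutError:
            # Lines starting with ':' are SSE comments. Clients ignore them,
            # but they keep proxies from closing an idle connection.
            yield ": heartbeat\n\n"
            continue
        if item is None:  # hypothetical sentinel meaning "close the stream"
            break
        # Assumes item is a single-line string; multi-line payloads need
        # one "data:" line per line of content.
        yield f"event: message\ndata: {item}\n\n"
```

Each yielded string is one complete SSE frame, terminated by a blank line, so it can be written to the response as-is.
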
Classes

@dataclass
class BroadcastMessage:
    "Standard broadcast message format for SSE communication"
    
    type: str
    data: Dict[str, Any]
    timestamp: str = field(...)
    metadata: Optional[Dict[str, Any]]
    
    def to_dict(
            self
        ) -> Dict[str, Any]:  # TODO: Add return description
        "Convert message to dictionary format"
    
    def to_json(
            self
        ) -> str:  # TODO: Add return description
        "Convert message to JSON string"
    
    def to_sse(
            self,
            event_type: Optional[str] = 'message'  # TODO: Add description
        ) -> str:  # TODO: Add return description
        "Convert to SSE message format using FastHTML's sse_message"
class BroadcastManager:
    "Manages SSE broadcast connections across multiple tabs/clients"
    
    def __init__(self,
                     max_queue_size: int = 100,  # Maximum size for each connection's message queue
                     history_limit: int = 50,  # Number of recent messages to keep in history
                     queue_timeout: float = 0.1,  # Timeout for queue operations in seconds
                     debug: bool = False)  # Enable debug logging
        "Initialize the broadcast manager.

Args:
    max_queue_size: Maximum size for each connection's message queue
    history_limit: Number of recent messages to keep in history
    queue_timeout: Timeout for queue operations in seconds
    debug: Enable debug logging"
    
    async def register(self, 
                          connection_id: Optional[str] = None,  # Optional ID for the connection (auto-generated if not provided)
                          metadata: Optional[Dict[str, Any]] = None) -> tuple[str, asyncio.Queue]
        "Register a new connection and return its queue.

Args:
    connection_id: Optional ID for the connection (auto-generated if not provided)
    metadata: Optional metadata for the connection
    
Returns:
    Tuple of (connection_id, queue)"
    
    async def unregister(
            self,
            connection_id: str  # TODO: Add description
        )
        "Unregister a connection."
    
    async def broadcast(self, 
                           message_type: str,  # Type of the message
                           data: Dict[str, Any],
                           metadata: Optional[Dict[str, Any]] = None,
                           exclude: Optional[Set[str]] = None) -> int
        "Broadcast a message to all connected clients.

Args:
    message_type: Type of the message
    data: Message data
    metadata: Optional metadata
    exclude: Set of connection IDs to exclude from broadcast
    
Returns:
    Number of successful broadcasts"
    
    async def send_to(self,
                         connection_id: str,  # Target connection ID
                         message_type: str,  # Type of the message
                         data: Dict[str, Any],
                         metadata: Optional[Dict[str, Any]] = None) -> bool
        "Send a message to a specific connection.

Args:
    connection_id: Target connection ID
    message_type: Type of the message
    data: Message data
    metadata: Optional metadata
    
Returns:
    True if successful, False otherwise"
    
    def get_connection_count(
            self
        ) -> int:  # TODO: Add return description
        "Get the number of active connections."
    
    def get_history(
            self,
            limit: Optional[int] = None  # TODO: Add description
        ) -> list[BroadcastMessage]:  # TODO: Add return description
        "Get broadcast history."

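The `register`, `broadcast`, and `exclude` signatures above describe a fan-out: each connection owns a bounded queue, and a broadcast copies one message into every queue except the excluded ones. Below is a simplified sketch of that idea using only the standard library. `MiniBroadcaster` is a hypothetical stand-in, not `BroadcastManager`; its methods are synchronous for brevity, and dropping messages for full queues is an assumption about how a slow consumer might be handled.

```python
import asyncio
import uuid
from collections import deque
from typing import Any, Dict, Optional, Set, Tuple


class MiniBroadcaster:
    """Illustrative stand-in for a broadcast manager (not the library class)."""

    def __init__(self, max_queue_size: int = 100, history_limit: int = 50):
        self.max_queue_size = max_queue_size
        self.queues: Dict[str, asyncio.Queue] = {}
        # deque(maxlen=...) discards the oldest entry once the limit is hit.
        self.history: deque = deque(maxlen=history_limit)

    def register(self, connection_id: Optional[str] = None) -> Tuple[str, asyncio.Queue]:
        cid = connection_id or uuid.uuid4().hex  # auto-generate when omitted
        self.queues[cid] = asyncio.Queue(maxsize=self.max_queue_size)
        return cid, self.queues[cid]

    def unregister(self, connection_id: str) -> None:
        self.queues.pop(connection_id, None)

    def broadcast(self, message_type: str, data: Dict[str, Any],
                  exclude: Optional[Set[str]] = None) -> int:
        message = {"type": message_type, "data": data}
        self.history.append(message)
        delivered = 0
        for cid, queue in self.queues.items():
            if exclude and cid in exclude:
                continue
            try:
                queue.put_nowait(message)
                delivered += 1
            except asyncio.QueueFull:
                pass  # slow consumer: drop the message rather than block
        return delivered


async def demo() -> Tuple[int, int]:
    hub = MiniBroadcaster(max_queue_size=1)
    hub.register("tab-a")
    hub.register("tab-b")
    # The originating tab is excluded so it does not receive its own update.
    first = hub.broadcast("update", {"n": 1}, exclude={"tab-a"})
    second = hub.broadcast("update", {"n": 2})  # tab-b's queue is already full
    return first, second
```

Passing the sender's own connection ID in `exclude` is the usual way to keep the originating tab from re-applying a change it already made locally.
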
Connections (connections.ipynb)

Connection management for SSE clients

Import

from cjm_fasthtml_sse.core.connections import (
    ConnectionState,
    SSEConnection,
    ConnectionRegistry,
    create_sse_element,
    cleanup_sse_on_unload,
    create_reconnection_script,
    create_connection_manager_script
)

Functions

def create_sse_element(endpoint: str,
                      element_id: Optional[str] = None,  # Optional element ID
                      swap_strategy: str = "message",  # HTMX swap strategy (message, innerHTML, outerHTML, etc.)
                      hidden: bool = False,  # Whether to hide the element
                      **attrs) -> Div  # Additional attributes for the element
    "Create an SSE-enabled HTML element."
def cleanup_sse_on_unload(
) -> Script:  # Script element for cleanup
    "Create a script to clean up SSE connections on page unload."
def create_reconnection_script(check_interval: int = 5000,
                              max_retries: int = 5,  # Maximum number of reconnection attempts
                              debug: bool = False) -> Script
    "Create a script for automatic SSE reconnection."
def create_connection_manager_script(registry_endpoint: str = "/sse/connections",
                                    update_interval: int = 10000) -> Script
    "Create a script to manage and monitor connections."

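For orientation, the HTMX SSE extension is driven by three attributes: `hx-ext="sse"` enables it, `sse-connect` names the endpoint, and `sse-swap` names the event whose payload is swapped into the element. The sketch below renders such an element as a plain HTML string. It is an illustration only: `sse_div` is a hypothetical helper, and how `create_sse_element` maps its `swap_strategy` argument onto these attributes is not shown in this README.

```python
from html import escape
from typing import Optional


def sse_div(endpoint: str,
            element_id: Optional[str] = None,
            swap_event: str = "message",
            hidden: bool = False) -> str:
    """Render a <div> that subscribes to an SSE endpoint via the HTMX SSE extension."""
    attrs = {"hx-ext": "sse", "sse-connect": endpoint, "sse-swap": swap_event}
    if element_id:
        attrs["id"] = element_id
    if hidden:
        attrs["style"] = "display: none"
    # Escape attribute values so quotes in the endpoint cannot break the markup.
    rendered = " ".join(f'{name}="{escape(value, quote=True)}"'
                        for name, value in attrs.items())
    return f"<div {rendered}></div>"


print(sse_div("/sse/stream", element_id="feed"))
```
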
Classes

class ConnectionState(Enum):
    "States for SSE connections"
@dataclass
class SSEConnection:
    "Represents a single SSE connection"
    
    connection_id: str
    queue: asyncio.Queue
    connection_type: str = 'global'
    state: ConnectionState = ConnectionState.CONNECTING
    metadata: Dict[str, Any] = field(...)
    created_at: datetime = field(...)
    last_activity: datetime = field(...)
    message_count: int = 0
    
    async def send(
            self,
            data: Any,  # Data to send
            timeout: float = 1.0  # Timeout for the send operation
        ) -> bool:  # True if successful, False otherwise
        "Send data through the connection queue.

Args:
    data: Data to send
    timeout: Timeout for the send operation
    
Returns:
    True if successful, False otherwise"
    
    async def heartbeat(
            self
        ) -> str:  # TODO: Add return description
        "Generate a heartbeat message."
    
    def close(self)  # Sets self.state to ConnectionState.DISCONNECTED
        "Mark the connection as closed."
    
    def is_active(
            self
        ) -> bool:  # True if the connection is active
        "Check if connection is active."
class ConnectionRegistry:
    "Registry to track and manage SSE connections"
    
    def __init__(
            self,
            debug: bool = False  # Enable debug logging
        )
        "Initialize the connection registry.

Args:
    debug: Enable debug logging"
    
    async def add_connection(self,
                                conn_id: Optional[str] = None,  # Optional connection ID (auto-generated if not provided)
                                conn_type: str = "global",  # Type of connection (e.g., 'global', 'job', 'user')
                                queue_size: int = 100,  # Size of the message queue
                                metadata: Optional[Dict[str, Any]] = None) -> SSEConnection
        "Add a new connection to the registry.

Args:
    conn_id: Optional connection ID (auto-generated if not provided)
    conn_type: Type of connection (e.g., 'global', 'job', 'user')
    queue_size: Size of the message queue
    metadata: Optional metadata for the connection
    
Returns:
    The created SSEConnection"
    
    async def remove_connection(
            self,
            conn_id: str  # TODO: Add description
        )
        "Remove a connection from the registry.

Args:
    conn_id: Connection ID to remove"
    
    def get_connection(
            self,
            conn_id: str  # TODO: Add description
        ) -> Optional[SSEConnection]:  # TODO: Add return description
        "Get a specific connection.

Args:
    conn_id: Connection ID
    
Returns:
    The connection if found, None otherwise"
    
    def get_connections(
            self,
            conn_type: Optional[str] = None  # TODO: Add description
        ) -> list[SSEConnection]:  # TODO: Add return description
        "Get connections, optionally filtered by type.

Args:
    conn_type: Optional connection type to filter by
    
Returns:
    List of connections"
    
    def get_active_connections(
            self,
            conn_type: Optional[str] = None  # TODO: Add description
        ) -> list[SSEConnection]:  # TODO: Add return description
        "Get active connections.

Args:
    conn_type: Optional connection type to filter by
    
Returns:
    List of active connections"
    
    def get_stats(
            self
        ) -> Dict[str, Any]:  # TODO: Add return description
        "Get registry statistics.

Returns:
    Dictionary with connection statistics"

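The registry methods above filter connections by type and by activity and summarize them in a statistics dictionary. A rough sketch of that bookkeeping follows, assuming a dictionary keyed by connection ID. `MiniRegistry`, `Conn`, and `State` are hypothetical names, and the keys returned by `get_stats` here (`total`, `active`, `by_type`) are illustrative guesses rather than the library's actual output.

```python
from collections import Counter
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional


class State(Enum):  # hypothetical; the library's ConnectionState members may differ
    CONNECTED = "connected"
    DISCONNECTED = "disconnected"


@dataclass
class Conn:
    connection_id: str
    connection_type: str = "global"
    state: State = State.CONNECTED


class MiniRegistry:
    """Illustrative registry keyed by connection ID (not the library class)."""

    def __init__(self) -> None:
        self._conns: Dict[str, Conn] = {}

    def add(self, conn: Conn) -> None:
        self._conns[conn.connection_id] = conn

    def get_connections(self, conn_type: Optional[str] = None) -> List[Conn]:
        # conn_type=None means "no filter".
        return [c for c in self._conns.values()
                if conn_type is None or c.connection_type == conn_type]

    def get_active_connections(self, conn_type: Optional[str] = None) -> List[Conn]:
        return [c for c in self.get_connections(conn_type)
                if c.state is State.CONNECTED]

    def get_stats(self) -> Dict[str, Any]:
        return {
            "total": len(self._conns),
            "active": len(self.get_active_connections()),
            "by_type": dict(Counter(c.connection_type for c in self._conns.values())),
        }


registry = MiniRegistry()
registry.add(Conn("a"))
registry.add(Conn("b", connection_type="job"))
registry.add(Conn("c", connection_type="job", state=State.DISCONNECTED))
print(registry.get_stats())
```
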
Streaming (streaming.ipynb)

SSE streaming utilities and helpers

Import

from cjm_fasthtml_sse.core.streaming import (
    StreamConfig,
    SSEStream,
    OOBStreamBuilder,
    sse_generator,
    create_sse_endpoint,
    stream_updates,
    create_throttled_stream
)

Functions

async def sse_generator(data_source: Union[AsyncGenerator, List, Callable],
                       interval: float = 0.5,  # Interval between items for list sources
                       heartbeat: float = 30.0,  # Heartbeat interval
                       transform: Optional[Callable] = None) -> AsyncGenerator[str, None]
    "Create an SSE generator from various data sources."
def create_sse_endpoint(stream_fn: Callable,
                       content_type: str = "text/event-stream") -> Callable
    "Create an SSE endpoint from a streaming function."
async def stream_updates(source_queue: asyncio.Queue,
                        transform_fn: Optional[Callable] = None,  # Optional transformation function
                        config: Optional[StreamConfig] = None) -> AsyncGenerator[str, None]
    "Stream updates from an async queue."
def create_throttled_stream(source: AsyncGenerator,
                           min_interval: float = 0.1,  # Minimum interval between messages
                           max_buffer: int = 10) -> AsyncGenerator
    "Create a throttled stream to prevent overwhelming clients."

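Throttling, as described for `create_throttled_stream`, means enforcing a minimum gap between consecutive messages. The sketch below shows one simple way to do that with the standard library: it delays items rather than dropping them. It does not model `max_buffer`, and `throttle` is a hypothetical name, so treat it as an illustration of the idea rather than the library's behavior.

```python
import asyncio
import time
from typing import AsyncGenerator, AsyncIterator


async def throttle(source: AsyncIterator,
                   min_interval: float = 0.1) -> AsyncGenerator:
    """Re-yield items from source, spaced at least min_interval seconds apart."""
    last_sent = None
    async for item in source:
        if last_sent is not None:
            # Sleep only for the part of the interval that has not yet elapsed.
            wait = min_interval - (time.monotonic() - last_sent)
            if wait > 0:
                await asyncio.sleep(wait)
        last_sent = time.monotonic()
        yield item


async def numbers(count: int) -> AsyncGenerator[int, None]:
    for i in range(count):
        yield i


async def main() -> list:
    return [n async for n in throttle(numbers(3), min_interval=0.02)]


print(asyncio.run(main()))
```

The first item passes through immediately; only subsequent items are held back.
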
Classes

@dataclass
class StreamConfig:
    "Configuration for SSE streaming"
    
    heartbeat_interval: float = 30.0
    timeout: Optional[float]
    send_initial_message: bool = True
    initial_message: str = 'Connected'
    send_close_message: bool = True
    close_message: str = 'Connection closed'
    debug: bool = False
class SSEStream:
    "Generic SSE stream handler"
    
    def __init__(
            self,
            config: Optional[StreamConfig] = None  # Stream configuration
        )
        "Initialize the SSE stream.

Args:
    config: Stream configuration"
    
    async def stream(self,
                        data_source: Union[AsyncGenerator, Callable],
                        transform_fn: Optional[Callable] = None) -> AsyncGenerator[str, None]
        "Stream data from a source through SSE.

Args:
    data_source: Async generator or callable that produces data
    transform_fn: Optional function to transform data before sending
    
Yields:
    SSE formatted strings"
    
    def stop(self)
        "Stop the stream."
class OOBStreamBuilder:
    "Build SSE messages with OOB (Out-of-Band) swaps"
    
    def __init__(self)  # Sets self.elements: List[Any] = []
        "Initialize the OOB stream builder."
    
    def add_element(self,
                       element: Any,
                       target_id: Optional[str] = None,
                       swap_mode: str = "innerHTML",
                       wrap: bool = True) -> 'OOBStreamBuilder'
        "Add an element with OOB swap configuration.

Args:
    element: The element to add
    target_id: Target element ID for OOB swap
    swap_mode: Swap mode (innerHTML, outerHTML, beforeend, afterbegin, etc.)
    wrap: If True and target_id is provided, wrap content in a Div with OOB attributes.
          If False, add OOB attributes directly to the element.
    
Returns:
    Self for chaining"
    
    def add_elements(self, elements: List[tuple]) -> 'OOBStreamBuilder'
        "Add multiple elements with OOB configurations.

Args:
    elements: List of tuples: (element, target_id, swap_mode, wrap) or
             (element, target_id, swap_mode) or (element, target_id) or (element,)
    
Returns:
    Self for chaining"
    
    def build(self) -> str
        "Build the SSE message with all elements.

Returns:
    SSE formatted message"
    
    def clear(self) -> 'OOBStreamBuilder'
        "Clear all elements.

Returns:
    Self for chaining"

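In HTMX, an out-of-band swap is requested with the `hx-swap-oob` attribute, whose value can be a swap mode followed by a colon and a CSS selector, for example `innerHTML:#count`. The sketch below shows how several such fragments could be packed into one SSE frame with a chainable builder. `MiniOOBBuilder` is a hypothetical stand-in that works on HTML strings; the real `OOBStreamBuilder` accepts FastHTML elements, and returning an empty string from `build()` when nothing was added is an assumption.

```python
from typing import List, Optional


class MiniOOBBuilder:
    """Illustrative chainable builder for OOB fragments (not the library class)."""

    def __init__(self) -> None:
        self.fragments: List[str] = []

    def add(self, html: str, target_id: Optional[str] = None,
            swap_mode: str = "innerHTML") -> "MiniOOBBuilder":
        if target_id:
            # Wrap the content so HTMX routes it to #target_id out of band.
            html = f'<div hx-swap-oob="{swap_mode}:#{target_id}">{html}</div>'
        self.fragments.append(html)
        return self

    def build(self, event: str = "message") -> str:
        if not self.fragments:
            return ""
        payload = "\n".join(self.fragments)
        # SSE requires every line of the payload to carry its own "data:" prefix.
        data_lines = "\n".join(f"data: {line}" for line in payload.split("\n"))
        return f"event: {event}\n{data_lines}\n\n"

    def clear(self) -> "MiniOOBBuilder":
        self.fragments.clear()
        return self


frame = (MiniOOBBuilder()
         .add("<b>3</b>", target_id="count")
         .add("<li>new</li>", target_id="log", swap_mode="beforeend")
         .build())
print(frame)
```

Returning `self` from each mutating method is what allows the calls to be chained, matching the "Self for chaining" return values documented above.
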