Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.

cjm-fasthtml-sse

Install

pip install cjm_fasthtml_sse

Project Structure

nbs/
└── core/ (3)
    ├── broadcast.ipynb    # Broadcasting infrastructure for SSE cross-tab synchronization
    ├── connections.ipynb  # Connection management for SSE clients
    └── streaming.ipynb    # SSE streaming utilities and helpers

Total: 3 notebooks across 4 directories

Module Dependencies

graph LR
    core_broadcast[core.broadcast<br/>Broadcast]
    core_connections[core.connections<br/>Connections]
    core_streaming[core.streaming<br/>Streaming]

No cross-module dependencies detected.

CLI Reference

No CLI commands found in this project.

Module Overview

Detailed documentation for each module in the project:

Broadcast (broadcast.ipynb)

Broadcasting infrastructure for SSE cross-tab synchronization

Import

from cjm_fasthtml_sse.core.broadcast import (
    BroadcastMessage,
    BroadcastManager,
    create_broadcast_endpoint,
    create_broadcast_handler,
    setup_broadcast_routes
)

Functions

async def create_broadcast_endpoint(manager: BroadcastManager,
                                   connection_id: Optional[str] = None,  # Optional connection ID
                                   heartbeat_interval: float = 30.0,  # Interval for sending heartbeat messages
                                   send_history: bool = False,  # Whether to send recent history on connection
                                   history_limit: int = 10) -> EventStream
    "Create an SSE endpoint for broadcasting."
def create_broadcast_handler(manager: BroadcastManager,
                            element_builder: Optional[Callable] = None)
    "Create a broadcast handler function that can be used with FastHTML routes."
def setup_broadcast_routes(app, 
                          manager: BroadcastManager,  # The broadcast manager instance
                          prefix: str = "/sse",  # URL prefix for SSE endpoints
                          element_builder: Optional[Callable] = None)
    "Setup broadcast routes on a FastHTML app."

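The `heartbeat_interval` parameter above suggests that the endpoint keeps idle connections alive with periodic heartbeats. The following is a minimal, stdlib-only sketch of that idea, not the library's implementation: `queue_to_sse` and the `None` stop sentinel are hypothetical names introduced here, and the heartbeat is assumed to be an SSE comment line.

```python
import asyncio
from typing import AsyncGenerator


async def queue_to_sse(queue: asyncio.Queue,
                       heartbeat_interval: float = 30.0) -> AsyncGenerator[str, None]:
    """Yield SSE frames from a queue; emit a comment heartbeat when the queue is idle."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=heartbeat_interval)
        except asyncio.TimeoutError:
            # Lines starting with ':' are SSE comments and are ignored by clients.
            yield ": heartbeat\n\n"
            continue
        if item is None:  # hypothetical sentinel that ends the stream
            break
        yield f"event: message\ndata: {item}\n\n"


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put("hello")
    gen = queue_to_sse(queue, heartbeat_interval=0.05)
    frames = [await gen.__anext__()]      # the queued item
    frames.append(await gen.__anext__())  # queue is idle, so a heartbeat is sent
    await queue.put(None)                 # ask the generator to finish
    async for frame in gen:
        frames.append(frame)
    return frames


frames = asyncio.run(demo())
```

In a real app the queue would presumably come from `BroadcastManager.register`, and the generator would be wrapped in an event-stream response.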
Classes

@dataclass
class BroadcastMessage:
    "Standard broadcast message format for SSE communication"
    
    type: str
    data: Dict[str, Any]
    timestamp: str = field(...)
    metadata: Optional[Dict[str, Any]]
    
    def to_dict(
            self
        ) -> Dict[str, Any]:  # Dictionary representation of the message
        "Convert message to dictionary format"
    
    def to_json(
            self
        ) -> str:  # JSON string representation of the message
        "Convert message to JSON string"
    
    def to_sse(
            self,
            event_type: Optional[str] = 'message'  # SSE event type for the message
        ) -> str:  # SSE formatted message string
        "Convert to SSE message format using FastHTML's sse_message"
class BroadcastManager:
    "Manages SSE broadcast connections across multiple tabs/clients"
    
    def __init__(self, ...)
        "Initialize the broadcast manager."
    
    async def register(self, 
                          connection_id: Optional[str] = None,  # Optional ID for the connection (auto-generated if not provided)
                          metadata: Optional[Dict[str, Any]] = None # Optional metadata for the connection
                          ) -> tuple[str, asyncio.Queue]: # Tuple of (connection_id, queue)
        "Register a new connection and return its queue."
    
    async def unregister(
            self,
            connection_id: str  # ID of the connection to unregister
        )
        "Unregister a connection."
    
    async def broadcast(self, 
                           message_type: str,  # Type of the message
                           data: Dict[str, Any], # Message data
                           metadata: Optional[Dict[str, Any]] = None, # Optional metadata
                           exclude: Optional[Set[str]] = None # Set of connection IDs to exclude from broadcast
                           ) -> int: # Number of successful broadcasts
        "Broadcast a message to all connected clients."
    
    async def send_to(self,
                         connection_id: str,  # Target connection ID
                         message_type: str,  # Type of the message
                         data: Dict[str, Any], # Message data
                         metadata: Optional[Dict[str, Any]] = None # Optional metadata
                         ) -> bool: # True if successful, False otherwise
        "Send a message to a specific connection."
    
    def get_connection_count(
            self
        ) -> int:  # Number of active connections
        "Get the number of active connections."
    
    def get_history(
            self,
            limit: Optional[int] = None  # Maximum number of messages to return
        ) -> list[BroadcastMessage]:  # List of broadcast messages from history
        "Get broadcast history."

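The `register` / `broadcast` / `unregister` signatures above describe a per-connection queue fan-out. A rough sketch of that pattern using only `asyncio` might look like the following. `MiniBroadcaster` is a hypothetical stand-in written for illustration; it is not `BroadcastManager`, and details such as dropping messages for full queues are assumptions.

```python
import asyncio
import uuid
from typing import Any, Dict, Optional, Set, Tuple


class MiniBroadcaster:
    """Hypothetical stand-in for illustration; not the library's BroadcastManager."""

    def __init__(self, queue_size: int = 100):
        self.queues: Dict[str, asyncio.Queue] = {}
        self.queue_size = queue_size

    async def register(self, connection_id: Optional[str] = None) -> Tuple[str, asyncio.Queue]:
        # Auto-generate an ID when none is supplied.
        cid = connection_id or uuid.uuid4().hex
        self.queues[cid] = asyncio.Queue(maxsize=self.queue_size)
        return cid, self.queues[cid]

    async def unregister(self, connection_id: str) -> None:
        self.queues.pop(connection_id, None)

    async def broadcast(self, message_type: str, data: Dict[str, Any],
                        exclude: Optional[Set[str]] = None) -> int:
        sent = 0
        for cid, queue in list(self.queues.items()):
            if exclude and cid in exclude:
                continue
            try:
                queue.put_nowait({"type": message_type, "data": data})
                sent += 1
            except asyncio.QueueFull:
                pass  # assumption: slow consumers simply miss the message
        return sent


async def demo():
    hub = MiniBroadcaster()
    _, queue_a = await hub.register("tab-a")
    _, queue_b = await hub.register("tab-b")
    # tab-a originated the change, so it is excluded from the broadcast.
    sent = await hub.broadcast("job_update", {"progress": 50}, exclude={"tab-a"})
    received = queue_b.get_nowait()
    await hub.unregister("tab-b")
    return sent, queue_a.empty(), received, len(hub.queues)


result = asyncio.run(demo())
```

Excluding the originating tab is the usual reason for an `exclude` set in cross-tab synchronization: the tab that made the change has already updated its own UI.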
Connections (connections.ipynb)

Connection management for SSE clients

Import

from cjm_fasthtml_sse.core.connections import (
    ConnectionState,
    SSEConnection,
    ConnectionRegistry,
    create_sse_element,
    cleanup_sse_on_unload,
    create_reconnection_script,
    create_connection_manager_script
)

Functions

def create_sse_element(endpoint: str,
                      element_id: Optional[str] = None,  # Optional element ID
                      swap_strategy: str = "message",  # HTMX swap strategy (message, innerHTML, outerHTML, etc.)
                      hidden: bool = False,  # Whether to hide the element
                      **attrs # Additional attributes for the element
                      ) -> Div:  # SSE-enabled Div element configured for HTMX
    "Create an SSE-enabled HTML element."
def cleanup_sse_on_unload(
) -> Script:  # Script element for cleanup
    "Create a script to clean up SSE connections on page unload."
def create_reconnection_script(check_interval: int = 5000,
                              max_retries: int = 5,  # Maximum number of reconnection attempts
                              debug: bool = False  # Enable debug logging
                              ) -> Script:  # Script element with reconnection logic
    "Create a script for automatic SSE reconnection."
def create_connection_manager_script(registry_endpoint: str = "/sse/connections",
                                    update_interval: int = 10000  # Interval for updating connection stats (in milliseconds)
                                    ) -> Script:  # Script element with connection management logic
    "Create a script to manage and monitor connections."

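For orientation, an SSE-enabled element in HTMX is a container carrying the SSE extension attributes (`hx-ext="sse"`, `sse-connect`, `sse-swap`). The sketch below renders such markup as a plain string. `sse_div_html` is a hypothetical helper, and the exact attributes that `create_sse_element` emits are an assumption not confirmed by this page.

```python
from html import escape


def sse_div_html(endpoint: str, element_id: str = "sse-root",
                 swap_event: str = "message", hidden: bool = False) -> str:
    """Render a div wired to the htmx SSE extension (illustrative only)."""
    attrs = {
        "id": element_id,
        "hx-ext": "sse",          # enable the htmx SSE extension on this element
        "sse-connect": endpoint,  # URL of the event stream
        "sse-swap": swap_event,   # name of the SSE event whose data is swapped in
    }
    rendered = " ".join(f'{key}="{escape(value, quote=True)}"' for key, value in attrs.items())
    if hidden:
        rendered += ' style="display: none"'
    return f"<div {rendered}></div>"


markup = sse_div_html("/sse/stream", element_id="updates")
```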
Classes

class ConnectionState(Enum):
    "States for SSE connections"
@dataclass
class SSEConnection:
    "Represents a single SSE connection"
    
    connection_id: str
    queue: asyncio.Queue
    connection_type: str = 'global'
    state: ConnectionState = ConnectionState.CONNECTING
    metadata: Dict[str, Any] = field(...)
    created_at: datetime = field(...)
    last_activity: datetime = field(...)
    message_count: int = 0
    
    async def send(
            self,
            data: Any,  # Data to send
            timeout: float = 1.0  # Timeout for the send operation
        ) -> bool:  # True if successful, False otherwise
        "Send data through the connection queue."
    
    async def heartbeat(
            self
        ) -> str:  # SSE formatted heartbeat message
        "Generate a heartbeat message."
    
    def close(self)
        "Mark the connection as closed."
    
    def is_active(
            self
        ) -> bool:  # True if connection is active, False otherwise
        "Check if connection is active."
class ConnectionRegistry:
    "Registry to track and manage SSE connections"
    
    def __init__(
            self,
            debug: bool = False  # Enable debug logging
        )
        "Initialize the connection registry."
    
    async def add_connection(self,
                                conn_id: Optional[str] = None,  # Optional connection ID (auto-generated if not provided)
                                conn_type: str = "global",  # Type of connection (e.g., 'global', 'job', 'user')
                                queue_size: int = 100,  # Size of the message queue
                                metadata: Optional[Dict[str, Any]] = None # Optional metadata for the connection
                                ) -> SSEConnection: # The created SSEConnection
        "Add a new connection to the registry."
    
    async def remove_connection(
            self,
            conn_id: str  # Connection ID to remove
        )
        "Remove a connection from the registry."
    
    def get_connection(
            self,
            conn_id: str  # Connection ID
        ) -> Optional[SSEConnection]:  # The connection if found, None otherwise
        "Get a specific connection."
    
    def get_connections(
            self,
            conn_type: Optional[str] = None  # Optional connection type to filter by
        ) -> list[SSEConnection]:  # List of connections
        "Get connections, optionally filtered by type."
    
    def get_active_connections(
            self,
            conn_type: Optional[str] = None  # Optional connection type to filter by
        ) -> list[SSEConnection]:  # List of active connections
        "Get active connections."
    
    def get_stats(
            self
        ) -> Dict[str, Any]:  # Dictionary with connection statistics
        "Get registry statistics."

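`SSEConnection.send` takes a `timeout` and returns a bool, which fits a bounded queue that refuses to block forever on a slow client. One way to get that behavior, shown here as a sketch under that assumption, is to wrap `queue.put` in `asyncio.wait_for`. The helper name `send_with_timeout` is hypothetical.

```python
import asyncio
from typing import Any


async def send_with_timeout(queue: asyncio.Queue, data: Any, timeout: float = 1.0) -> bool:
    """Try to enqueue data; give up and report failure if the queue stays full."""
    try:
        await asyncio.wait_for(queue.put(data), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo():
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    first = await send_with_timeout(queue, "a", timeout=0.05)
    second = await send_with_timeout(queue, "b", timeout=0.05)  # queue is already full
    return first, second, queue.qsize()


outcome = asyncio.run(demo())
```

The `queue_size` argument of `add_connection` would play the role of `maxsize` here.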
Streaming (streaming.ipynb)

SSE streaming utilities and helpers

Import

from cjm_fasthtml_sse.core.streaming import (
    StreamConfig,
    SSEStream,
    OOBStreamBuilder,
    sse_generator,
    create_sse_endpoint,
    stream_updates,
    create_throttled_stream
)

Functions

async def sse_generator(data_source: Union[AsyncGenerator, List, Callable],
                       interval: float = 0.5,  # Interval between items for list sources
                       heartbeat: float = 30.0,  # Heartbeat interval
                       transform: Optional[Callable] = None  # Optional function to transform data before sending
                       ) -> AsyncGenerator[str, None]
    "Create an SSE generator from various data sources."
def create_sse_endpoint(stream_fn: Callable,
                       content_type: str = "text/event-stream") -> Callable
    "Create an SSE endpoint from a streaming function."
async def stream_updates(source_queue: asyncio.Queue,
                        transform_fn: Optional[Callable] = None,  # Optional transformation function
                        config: Optional[StreamConfig] = None  # Stream configuration settings
                        ) -> AsyncGenerator[str, None]
    "Stream updates from an async queue."
def create_throttled_stream(source: AsyncGenerator,
                           min_interval: float = 0.1,  # Minimum interval between messages
                           max_buffer: int = 10  # Maximum number of messages to buffer
                           ) -> AsyncGenerator:  # Throttled async generator
    "Create a throttled stream to prevent overwhelming clients."

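To illustrate what a `min_interval` throttle does, here is a simplified sketch that delays items so consecutive ones are at least `min_interval` seconds apart. It is not the library's `create_throttled_stream`: the function name `throttle` is hypothetical, and the `max_buffer` behavior is deliberately left out.

```python
import asyncio
import time
from typing import AsyncGenerator, AsyncIterator


async def throttle(source: AsyncIterator, min_interval: float = 0.1) -> AsyncGenerator:
    """Re-yield items from source, spacing them at least min_interval apart."""
    last_sent = None
    async for item in source:
        if last_sent is not None:
            remaining = min_interval - (time.monotonic() - last_sent)
            if remaining > 0:
                await asyncio.sleep(remaining)
        last_sent = time.monotonic()
        yield item


async def numbers(count: int) -> AsyncGenerator[int, None]:
    # A source that produces items as fast as it can.
    for i in range(count):
        yield i


async def demo():
    start = time.monotonic()
    items = [item async for item in throttle(numbers(3), min_interval=0.05)]
    return items, time.monotonic() - start


items, elapsed = asyncio.run(demo())
```

Three items with a 0.05 s minimum spacing take roughly 0.1 s in total, since only the gaps between items are delayed.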
Classes

@dataclass
class StreamConfig:
    "Configuration for SSE streaming"
    
    heartbeat_interval: float = 30.0
    timeout: Optional[float]
    send_initial_message: bool = True
    initial_message: str = 'Connected'
    send_close_message: bool = True
    close_message: str = 'Connection closed'
    debug: bool = False
class SSEStream:
    "Generic SSE stream handler"
    
    def __init__(
            self,
            config: Optional[StreamConfig] = None  # Stream configuration
        )
        "Initialize the SSE stream."
    
    async def stream(self,
                        data_source: Union[AsyncGenerator, Callable], # Async generator or callable that produces data
                        transform_fn: Optional[Callable] = None # Optional function to transform data before sending
                        ) -> AsyncGenerator[str, None]: # SSE formatted strings
        "Stream data from a source through SSE."
    
    def stop(self)
        "Stop the stream."
class OOBStreamBuilder:
    "Build SSE messages with OOB (Out-of-Band) swaps"
    
    def __init__(self)
        "Initialize the OOB stream builder."
    
    def add_element(self,
                       element: Any,  # The element to add
                       target_id: Optional[str] = None,  # Target element ID for OOB swap
                       swap_mode: str = "innerHTML",  # Swap mode (innerHTML, outerHTML, beforeend, afterbegin, etc.)
                       wrap: bool = True  # If True and target_id is provided, wrap content in a Div with OOB attributes. If False, add OOB attributes directly to the element
                       ) -> 'OOBStreamBuilder':  # Self for chaining
        "Add an element with OOB swap configuration."
    
    def add_elements(
            self,
            elements: List[tuple]  # List of tuples: (element, target_id, swap_mode, wrap) or (element, target_id, swap_mode) or (element, target_id) or (element,)
        ) -> 'OOBStreamBuilder':  # Self for chaining
        "Add multiple elements with OOB configurations."
    
    def build(
            self
        ) -> Div:  # Div with all elements
        "Build the Div element with all elements."
    
    def clear(
            self
        ) -> 'OOBStreamBuilder':  # Self for chaining
        "Clear all elements."

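As background for `OOBStreamBuilder`, an HTMX out-of-band swap is driven by the `hx-swap-oob` attribute, whose value can take the form `<swap mode>:<CSS selector>`. The sketch below builds such fragments as plain strings, roughly what wrapping with `wrap=True` might produce. `oob_fragment` and `build_payload` are hypothetical helpers, and the library's actual output is not shown on this page.

```python
from typing import List, Optional


def oob_fragment(inner_html: str, target_id: Optional[str] = None,
                 swap_mode: str = "innerHTML") -> str:
    """Wrap content in a div that htmx swaps out-of-band into #target_id."""
    if target_id is None:
        return inner_html  # no target: leave the content as a normal fragment
    return f'<div hx-swap-oob="{swap_mode}:#{target_id}">{inner_html}</div>'


def build_payload(fragments: List[str]) -> str:
    """Concatenate fragments into one response body or SSE data payload."""
    return "".join(fragments)


payload = build_payload([
    oob_fragment("<span>75%</span>", "progress"),
    oob_fragment("<li>Step 3 done</li>", "log", swap_mode="beforeend"),
])
```

A single message built this way can update several unrelated parts of the page, which is the main reason to combine OOB swaps with SSE.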