Real-time Server-Sent Events (SSE) and HTMX integration library for FastHTML with cross-tab synchronization support.
Project description
cjm-fasthtml-sse
Install
pip install cjm_fasthtml_sse
Project Structure
nbs/
└── core/ (3)
├── broadcast.ipynb # Broadcasting infrastructure for SSE cross-tab synchronization
├── connections.ipynb # Connection management for SSE clients
└── streaming.ipynb # SSE streaming utilities and helpers
Total: 3 notebooks across 4 directories
Module Dependencies
graph LR
core_broadcast[core.broadcast<br/>Broadcast]
core_connections[core.connections<br/>Connections]
core_streaming[core.streaming<br/>Streaming]
No cross-module dependencies detected.
CLI Reference
No CLI commands found in this project.
Module Overview
Detailed documentation for each module in the project:
Broadcast (broadcast.ipynb)
Broadcasting infrastructure for SSE cross-tab synchronization
Import
from cjm_fasthtml_sse.core.broadcast import (
BroadcastMessage,
BroadcastManager,
create_broadcast_endpoint,
create_broadcast_handler,
setup_broadcast_routes
)
Functions
async def create_broadcast_endpoint(manager: BroadcastManager,
connection_id: Optional[str] = None, # Optional connection ID
heartbeat_interval: float = 30.0, # Interval for sending heartbeat messages
send_history: bool = False, # Whether to send recent history on connection
history_limit: int = 10) -> EventStream
"Create an SSE endpoint for broadcasting."
def create_broadcast_handler(manager: BroadcastManager,
element_builder: Optional[Callable] = None)
"Create a broadcast handler function that can be used with FastHTML routes."
def setup_broadcast_routes(app,
manager: BroadcastManager, # The broadcast manager instance
prefix: str = "/sse", # URL prefix for SSE endpoints
element_builder: Optional[Callable] = None)
"Setup broadcast routes on a FastHTML app."
Classes
@dataclass
class BroadcastMessage:
"Standard broadcast message format for SSE communication"
type: str
data: Dict[str, Any]
timestamp: str = field(...)
metadata: Optional[Dict[str, Any]]
def to_dict(
self
) -> Dict[str, Any]: # TODO: Add return description
"Convert message to dictionary format"
def to_json(
self
) -> str: # TODO: Add return description
"Convert message to JSON string"
def to_sse(
self,
event_type: Optional[str] = None # TODO: Add description
) -> str: # TODO: Add return description
"Convert to SSE message format using FastHTML's sse_message"
class BroadcastManager:
def __init__(self,
max_queue_size: int = 100, # TODO: Add description
history_limit: int = 50, # TODO: Add description
queue_timeout: float = 0.1, # TODO: Add description
debug: bool = False)
"Manages SSE broadcast connections across multiple tabs/clients"
def __init__(self,
max_queue_size: int = 100, # TODO: Add description
history_limit: int = 50, # TODO: Add description
queue_timeout: float = 0.1, # TODO: Add description
debug: bool = False)
"Initialize the broadcast manager.
Args:
max_queue_size: Maximum size for each connection's message queue
history_limit: Number of recent messages to keep in history
queue_timeout: Timeout for queue operations in seconds
debug: Enable debug logging"
async def register(self,
connection_id: Optional[str] = None, # TODO: Add description
metadata: Optional[Dict[str, Any]] = None) -> tuple[str, asyncio.Queue]
"Register a new connection and return its queue.
Args:
connection_id: Optional ID for the connection (auto-generated if not provided)
metadata: Optional metadata for the connection
Returns:
Tuple of (connection_id, queue)"
async def unregister(
self,
connection_id: str # TODO: Add description
)
"Unregister a connection."
async def broadcast(self,
message_type: str, # TODO: Add description
data: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None,
exclude: Optional[Set[str]] = None) -> int
"Broadcast a message to all connected clients.
Args:
message_type: Type of the message
data: Message data
metadata: Optional metadata
exclude: Set of connection IDs to exclude from broadcast
Returns:
Number of successful broadcasts"
async def send_to(self,
connection_id: str, # TODO: Add description
message_type: str, # TODO: Add description
data: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None) -> bool
"Send a message to a specific connection.
Args:
connection_id: Target connection ID
message_type: Type of the message
data: Message data
metadata: Optional metadata
Returns:
True if successful, False otherwise"
def get_connection_count(
self
) -> int: # TODO: Add return description
"Get the number of active connections."
def get_history(
self,
limit: Optional[int] = None # TODO: Add description
) -> list[BroadcastMessage]: # TODO: Add return description
"Get broadcast history."
Connections (connections.ipynb)
Connection management for SSE clients
Import
from cjm_fasthtml_sse.core.connections import (
ConnectionState,
SSEConnection,
ConnectionRegistry,
create_sse_element,
cleanup_sse_on_unload,
create_reconnection_script,
create_connection_manager_script
)
Functions
def create_sse_element(endpoint: str,
element_id: Optional[str] = None, # Optional element ID
swap_strategy: str = "message", # HTMX swap strategy (message, innerHTML, outerHTML, etc.)
hidden: bool = False, # Whether to hide the element **attrs: Additional attributes for the element
**attrs) -> Div
"Create an SSE-enabled HTML element."
def cleanup_sse_on_unload(
) -> Script: # Script element for cleanup
"Create a script to clean up SSE connections on page unload."
def create_reconnection_script(check_interval: int = 5000,
max_retries: int = 5, # Maximum number of reconnection attempts
debug: bool = False) -> Script
"Create a script for automatic SSE reconnection."
def create_connection_manager_script(registry_endpoint: str = "/sse/connections",
update_interval: int = 10000) -> Script
"Create a script to manage and monitor connections."
Classes
class ConnectionState(Enum):
"States for SSE connections"
@dataclass
class SSEConnection:
"Represents a single SSE connection"
connection_id: str
queue: asyncio.Queue
connection_type: str = 'global'
state: ConnectionState = ConnectionState.CONNECTING
metadata: Dict[str, Any] = field(...)
created_at: datetime = field(...)
last_activity: datetime = field(...)
message_count: int = 0
async def send(
self,
data: Any, # TODO: Add description
timeout: float = 1.0 # TODO: Add description
) -> bool: # TODO: Add return description
"Send data through the connection queue.
Args:
data: Data to send
timeout: Timeout for the send operation
Returns:
True if successful, False otherwise"
async def heartbeat(
self
) -> str: # TODO: Add return description
"Generate a heartbeat message."
def close(self):
"""Mark the connection as closed."""
self.state = ConnectionState.DISCONNECTED
def is_active(
self
) -> bool: # TODO: Add return description
"Mark the connection as closed."
def is_active(
self
) -> bool: # TODO: Add return description
"Check if connection is active."
class ConnectionRegistry:
def __init__(
self,
debug: bool = False # TODO: Add description
)
"Registry to track and manage SSE connections"
def __init__(
self,
debug: bool = False # TODO: Add description
)
"Initialize the connection registry.
Args:
debug: Enable debug logging"
async def add_connection(self,
conn_id: Optional[str] = None, # TODO: Add description
conn_type: str = "global", # TODO: Add description
queue_size: int = 100, # TODO: Add description
metadata: Optional[Dict[str, Any]] = None) -> SSEConnection
"Add a new connection to the registry.
Args:
conn_id: Optional connection ID (auto-generated if not provided)
conn_type: Type of connection (e.g., 'global', 'job', 'user')
queue_size: Size of the message queue
metadata: Optional metadata for the connection
Returns:
The created SSEConnection"
async def remove_connection(
self,
conn_id: str # TODO: Add description
)
"Remove a connection from the registry.
Args:
conn_id: Connection ID to remove"
def get_connection(
self,
conn_id: str # TODO: Add description
) -> Optional[SSEConnection]: # TODO: Add return description
"Get a specific connection.
Args:
conn_id: Connection ID
Returns:
The connection if found, None otherwise"
def get_connections(
self,
conn_type: Optional[str] = None # TODO: Add description
) -> list[SSEConnection]: # TODO: Add return description
"Get connections, optionally filtered by type.
Args:
conn_type: Optional connection type to filter by
Returns:
List of connections"
def get_active_connections(
self,
conn_type: Optional[str] = None # TODO: Add description
) -> list[SSEConnection]: # TODO: Add return description
"Get active connections.
Args:
conn_type: Optional connection type to filter by
Returns:
List of active connections"
def get_stats(
self
) -> Dict[str, Any]: # TODO: Add return description
"Get registry statistics.
Returns:
Dictionary with connection statistics"
Streaming (streaming.ipynb)
SSE streaming utilities and helpers
Import
from cjm_fasthtml_sse.core.streaming import (
StreamConfig,
SSEStream,
OOBStreamBuilder,
sse_generator,
create_sse_endpoint,
stream_updates,
create_progress_stream,
create_throttled_stream
)
Functions
async def sse_generator(data_source: Union[AsyncGenerator, List, Callable],
interval: float = 0.5, # Interval between items for list sources
heartbeat: float = 30.0, # Heartbeat interval
transform: Optional[Callable] = None) -> AsyncGenerator[str, None]
"Create an SSE generator from various data sources."
def create_sse_endpoint(stream_fn: Callable,
content_type: str = "text/event-stream") -> Callable
"Create an SSE endpoint from a streaming function."
async def stream_updates(source_queue: asyncio.Queue,
transform_fn: Optional[Callable] = None, # Optional transformation function
config: Optional[StreamConfig] = None) -> AsyncGenerator[str, None]
"Stream updates from an async queue."
def create_progress_stream(job_id: str,
progress_source: Callable, # Callable that returns progress data
interval: float = 0.5) -> Callable
"Create a progress streaming endpoint."
def create_throttled_stream(source: AsyncGenerator,
min_interval: float = 0.1, # Minimum interval between messages
max_buffer: int = 10) -> AsyncGenerator
"Create a throttled stream to prevent overwhelming clients."
Classes
@dataclass
class StreamConfig:
"Configuration for SSE streaming"
heartbeat_interval: float = 30.0
timeout: Optional[float]
send_initial_message: bool = True
initial_message: str = 'Connected'
send_close_message: bool = True
close_message: str = 'Connection closed'
debug: bool = False
class SSEStream:
def __init__(
self,
config: Optional[StreamConfig] = None # TODO: Add description
)
"Generic SSE stream handler"
def __init__(
self,
config: Optional[StreamConfig] = None # TODO: Add description
)
"Initialize the SSE stream.
Args:
config: Stream configuration"
async def stream(self,
data_source: Union[AsyncGenerator, Callable],
transform_fn: Optional[Callable] = None) -> AsyncGenerator[str, None]
"Stream data from a source through SSE.
Args:
data_source: Async generator or callable that produces data
transform_fn: Optional function to transform data before sending
Yields:
SSE formatted strings"
def stop(self)
"Stop the stream."
class OOBStreamBuilder:
def __init__(self):
"""Initialize the OOB stream builder."""
self.elements: List[Any] = []
"Build SSE messages with OOB (Out-of-Band) swaps"
def __init__(self):
"""Initialize the OOB stream builder."""
self.elements: List[Any] = []
"Initialize the OOB stream builder."
def add_element(self,
element: Any,
target_id: Optional[str] = None,
swap_mode: str = "innerHTML",
wrap: bool = True) -> 'OOBStreamBuilder'
"Add an element with OOB swap configuration.
Args:
element: The element to add
target_id: Target element ID for OOB swap
swap_mode: Swap mode (innerHTML, outerHTML, beforeend, afterbegin, etc.)
wrap: If True and target_id is provided, wrap content in a Div with OOB attributes.
If False, add OOB attributes directly to the element.
Returns:
Self for chaining"
def add_elements(self, elements: List[tuple]) -> 'OOBStreamBuilder':
"""Add multiple elements with OOB configurations.
Args:
elements: List of tuples: (element, target_id, swap_mode, wrap) or
(element, target_id, swap_mode) or (element, target_id) or (element,)
Returns:
Self for chaining
"""
for item in elements
"Add multiple elements with OOB configurations.
Args:
elements: List of tuples: (element, target_id, swap_mode, wrap) or
(element, target_id, swap_mode) or (element, target_id) or (element,)
Returns:
Self for chaining"
def build(self) -> str:
"""Build the SSE message with all elements.
Returns:
SSE formatted message
"""
if not self.elements
"Build the SSE message with all elements.
Returns:
SSE formatted message"
def clear(self) -> 'OOBStreamBuilder'
"Clear all elements.
Returns:
Self for chaining"
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file cjm_fasthtml_sse-0.0.7.tar.gz.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.7.tar.gz
- Upload date:
- Size: 23.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
990a3fbfed5c86fb6c9e273c525b18dd62a572ec5623bcce20a588bac9ae918c
|
|
| MD5 |
b08e70f4aa8551cb337632947937f842
|
|
| BLAKE2b-256 |
88ee702b475b191709184298302b1fa72b0738e52f75c851bc25e6153f0b1436
|
File details
Details for the file cjm_fasthtml_sse-0.0.7-py3-none-any.whl.
File metadata
- Download URL: cjm_fasthtml_sse-0.0.7-py3-none-any.whl
- Upload date:
- Size: 21.7 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.11.13
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
f90fa71413741ca67f380360309e7f84484febf50f6d7ae09c2417a31ba2d76b
|
|
| MD5 |
e0b0a6b27945ea9d4b07f007fd1cb9ae
|
|
| BLAKE2b-256 |
3c5e140ea1683ebaee433ba63254f3e2ca748a4705f01f4fb8a62f4276daa6a4
|