A lightweight message dispatcher based on the data transfer object format of the SSE protocol
Project description
The lightweight library for async messaging nobody expects.
Installation
pip install eric-sse
Features
- Send to one listener and broadcast
- The SSE format was adopted by design, in order to make the library suitable for that kind of model
- Sockets
- Callbacks
- [Experimental] Threading support for large data processing
Possible applications
- Message delivery mechanisms based on SSE
- Message queue processing (logging, etc)
- See https://github.com/laxertu/eric-api
Trivia
The library name is intended as a tribute to the following movie: https://en.wikipedia.org/wiki/Looking_for_Eric
Entities
class eric_sse.entities.AbstractChannel(stream_delay_seconds: int = 0)
Base class for channels.
Provides listener management and message delivery. SSEChannel is the default implementation.
add_listener() → MessageQueueListener
Add the default listener
broadcast(msg: Message)
Enqueue a message to all listeners
deliver_next(listener_id: str) → Message
Returns next message for given listener id. Raises a NoMessagesException if queue is empty
dispatch(listener_id: str, msg: Message)
Adds a message to listener’s queue
async message_stream(listener: MessageQueueListener) → AsyncIterable[dict]
Entry point for message streaming
In case of failure at channel resolution time, a special message with type=MESSAGE_TYPE_CLOSED is sent, and the corresponding listener is stopped
notify_end()
Broadcasts a MESSAGE_TYPE_CLOSED Message
register_listener(l: MessageQueueListener)
Adds a listener to channel
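The delivery semantics described above (per-listener queues, dispatch to one listener, broadcast to all, and an exception on an empty queue) can be sketched with a small in-memory model. This is not the library's implementation: `ToyChannel`, `drain`, and the stand-in `Message` and `NoMessagesException` classes are hypothetical and exist only so the snippet runs without eric-sse installed. It also returns a plain id from `add_listener`, whereas the real method returns a MessageQueueListener.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any, Deque, Dict, List
from uuid import uuid4


@dataclass
class Message:
    """Stand-in for eric_sse.entities.Message: a type plus an optional payload."""
    type: str
    payload: Any = None


class NoMessagesException(Exception):
    """Stand-in for eric_sse.exception.NoMessagesException."""


class ToyChannel:
    """Hypothetical channel that mirrors the method names documented above."""

    def __init__(self) -> None:
        # One FIFO queue per listener id.
        self._queues: Dict[str, Deque[Message]] = {}

    def add_listener(self) -> str:
        # Assumption: this sketch hands back only an id, not a listener object.
        listener_id = str(uuid4())
        self._queues[listener_id] = deque()
        return listener_id

    def dispatch(self, listener_id: str, msg: Message) -> None:
        # Enqueue for a single listener.
        self._queues[listener_id].append(msg)

    def broadcast(self, msg: Message) -> None:
        # Enqueue for every registered listener.
        for listener_id in self._queues:
            self.dispatch(listener_id, msg)

    def deliver_next(self, listener_id: str) -> Message:
        try:
            return self._queues[listener_id].popleft()
        except IndexError:
            raise NoMessagesException from None


def drain(channel: ToyChannel, listener_id: str) -> List[str]:
    """Collect message types until the listener's queue is empty."""
    types = []
    while True:
        try:
            types.append(channel.deliver_next(listener_id).type)
        except NoMessagesException:
            return types


channel = ToyChannel()
alice = channel.add_listener()
bob = channel.add_listener()

channel.dispatch(alice, Message("greeting", "only for alice"))
channel.broadcast(Message("news", {"n": 1}))
```

Draining both queues shows that the dispatched message reaches only one listener, while the broadcast message reaches both.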
class eric_sse.entities.Message(type: str, payload: dict | list | str | int | float | None = None)
Models a message
It’s just a container of information identified by a type. For validation purposes you can override MessageQueueListener.on_message
class eric_sse.entities.MessageQueueListener
Base class for listeners.
Optionally you can override on_message method if you need to inject code at message delivery time.
on_message(msg: Message) → None
Event handler. It executes when a message is delivered to client
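As an illustration of using on_message for validation, the following sketch subclasses a listener and rejects payloads that lack an expected key. The base class and `Message` below are stand-ins so the snippet runs without the package; with eric-sse installed you would presumably import them from eric_sse.entities instead. `OrderListener` and the `order_id` key are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Message:
    """Stand-in for eric_sse.entities.Message."""
    type: str
    payload: Any = None


class MessageQueueListener:
    """Stand-in base class with the documented on_message hook."""

    def on_message(self, msg: Message) -> None:
        pass


class OrderListener(MessageQueueListener):
    """Hypothetical listener that only accepts dict payloads with an 'order_id'."""

    def __init__(self) -> None:
        super().__init__()
        self.accepted: List[int] = []

    def on_message(self, msg: Message) -> None:
        # Validate at delivery time, before the payload is used.
        if not isinstance(msg.payload, dict) or "order_id" not in msg.payload:
            raise ValueError(f"invalid payload for message type {msg.type!r}")
        self.accepted.append(msg.payload["order_id"])


listener = OrderListener()
listener.on_message(Message("order", {"order_id": 42}))

try:
    listener.on_message(Message("order", "not a dict"))
    rejected = False
except ValueError:
    rejected = True
```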
Prefab channels and listeners
class eric_sse.prefabs.DataProcessingChannel(max_workers: int, stream_delay_seconds: int = 0)
[Still experimental: it has not been tested on a real use case] Channel intended for concurrent processing of data.
- Parameters:
- max_workers – Number of workers to use
- stream_delay_seconds – Can be used to limit the response rate of streams. Only applies to message_stream calls.
Relies on concurrent.futures.ThreadPoolExecutor.
Just override adapt method to control output returned to clients
MESSAGE_TYPE_CLOSED type is intended as end of stream. It should be considered as a reserved Message type.
Note that callback execution order is not guaranteed
adapt(msg: Message) → Any
Models output returned to clients
async process_queue(l: MessageQueueListener) → AsyncIterable[dict]
Launches the processing of the given listener’s queue
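The note about execution order follows from how concurrent.futures.ThreadPoolExecutor behaves: tasks submitted together may finish in any order. The sketch below uses only the standard library and is not the channel's own code; `process` and `process_all` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Iterable, List


def process(item: int) -> int:
    # Earlier items sleep longer, so they tend to finish later.
    time.sleep(0.01 * (3 - item))
    return item * item


def process_all(items: Iterable[int], max_workers: int = 3) -> List[int]:
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process, item) for item in items]
        # as_completed yields futures as they finish, not in submission order.
        for future in as_completed(futures):
            results.append(future.result())
    return results


results = process_all([0, 1, 2])
```

The returned list always contains the same values, but their order depends on thread scheduling, so consumers should not rely on it.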
class eric_sse.prefabs.SSEChannel(stream_delay_seconds: int = 0, retry_timeout_milliseconds: int = 5)
SSE streaming channel.
- Parameters: retry_timeout_milliseconds – Tells clients how long to wait before retrying
Currently, ‘id’ field is not supported.
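For readers unfamiliar with the SSE wire format, the sketch below serializes an event the way the Server-Sent Events specification describes: `event`, `retry`, and `data` fields, one per line, terminated by a blank line. It is only an illustration; the exact keys of the dicts yielded by message_stream are not documented in this section, so `to_sse_frame` (a hypothetical helper) takes them as plain arguments. The `id` field is left out, in line with the note above.

```python
from typing import Optional


def to_sse_frame(event: str, data: str, retry_ms: Optional[int] = None) -> str:
    """Format one event as SSE text (hypothetical helper, not part of eric-sse)."""
    lines = [f"event: {event}"]
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Every line of a multi-line payload needs its own "data:" field.
    lines.extend(f"data: {chunk}" for chunk in data.split("\n"))
    # A blank line marks the end of the event.
    return "\n".join(lines) + "\n\n"


frame = to_sse_frame("greeting", "hello\nworld", retry_ms=5)
```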
Prefab servers
class eric_sse.servers.ChannelContainer
Helper class for use cases that involve managing multiple SSE channels.
class eric_sse.servers.SocketServer(file_descriptor_path: str)
An implementation of a socket server that acts as a controller to interact with the library
A static shortcut for starting a basic server is provided.
Accepted format: a plain (non-nested) JSON object with the following keys:
- "c": channel id
- "v": verb
- "t": message type
- "p": message payload
The verb identifies the action to perform:
- "d": dispatch
- "b": broadcast
- "c": add listener
- "w": watch (opens a stream)
See examples.
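A request in the format described above can be assembled with the standard json module. This is a minimal sketch: `build_request` and the channel id `"channel-1"` are hypothetical, and it sets all four keys for every verb, since this section does not say which keys each verb requires or how requests are framed on the socket.

```python
import json

# Verbs listed in the documentation above.
VERBS = {"d": "dispatch", "b": "broadcast", "c": "add listener", "w": "watch"}


def build_request(channel_id: str, verb: str, message_type: str, payload: str) -> bytes:
    """Encode a flat JSON request (hypothetical helper, not part of eric-sse)."""
    if verb not in VERBS:
        raise ValueError(f"unsupported verb: {verb!r}")
    request = {"c": channel_id, "v": verb, "t": message_type, "p": payload}
    return json.dumps(request).encode("utf-8")


raw = build_request("channel-1", "b", "greeting", "hello")
decoded = json.loads(raw)
```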
async static connect_callback(reader: StreamReader, writer: StreamWriter)
Integration with SocketServer.
Handles low-level communication and raw message parsing. See https://docs.python.org/3/library/asyncio-stream.html#asyncio.start_unix_server
async shutdown(server: Server)
Graceful Shutdown
static start(file_descriptor_path: str)
Shortcut to start a server
Exceptions
exception eric_sse.exception.InvalidChannelException
exception eric_sse.exception.InvalidListenerException
exception eric_sse.exception.InvalidMessageFormat
exception eric_sse.exception.NoMessagesException
Raised when trying to fetch from an empty queue
Changelog
0.5.0.2
Fix: SSEChannel must accept stream_delay_seconds as constructor parameter
0.5.0
- Removed Threaded listener class
- Added DataProcessingChannel.process_queue
0.4.1.0
- Breaking: Changed DataProcessingChannel adapter to suit SSE
0.4.0
Breaking changes:
- Rework of DataProcessingChannel, now extends AbstractChannel and its methods’ signatures have been updated
- AbstractChannel.retry_timeout_milliseconds has been moved to SSEChannel
0.3.2
- Breaking change: now ThreadPoolListener callback only accepts Message as parameter
- Fixed a concurrency bug in ThreadPoolListener
Developers section
Script to update README.md: update_docs.sh