Skip to main content

A lightweight message dispatcher based on SSE protocol data transfer objects format

Project description

The lightweight library for async messaging nobody expects.

Installation

pip install eric-sse

Changelog

0.3.2

  • Breaking change: now ThreadPoolListener callback only accepts Message as parameter
  • Fixed a consurrency bug in ThreadPoolListener

Features

  • Send to one listener and broadcast
  • SSE format was adopted by design, is order to make library suitable for such kind of model
  • Sockets
  • Callbacks
  • [Experimental] Threading support for large data processing

Possible applications

Trivia

Library name pretends to be a tribute to the following movie https://en.wikipedia.org/wiki/Looking_for_Eric

Entities

class eric_sse.entities.AbstractChannel(stream_delay_seconds: int = 0, retry_timeout_milliseconds: int = 5)

Base class for channels.

Provides functionalities for listeners and message delivery management. SSEChannel is the default implementation

add_listener() → MessageQueueListener

Add the default listener

broadcast(msg: Message)

Enqueue a message to all listeners

deliver_next(listener_id: str) → Message

Returns next message for given listener id. Raises a NoMessagesException if queue is empty

dispatch(listener_id: str, msg: Message)

Adds a message to listener’s queue

async message_stream(listener: MessageQueueListener) → AsyncIterable[dict]

Entry point for message streaming

In case of failure at channel resolution time, a special message with type=MESSAGE_TYPE_CLOSED is sent, and correspondant listener is stopped

register_listener(l: MessageQueueListener)

Adds a listener to channel

class eric_sse.entities.Message(type: str, payload: dict | list | str | int | float | None = None)

Models a message

It’s just a container of information identified by a type. For validation purposes you can override MessageQueueListener.on_message

class eric_sse.entities.MessageQueueListener

Base class for listeners.

Optionally you can override on_message method if you need to inject code at message delivery time.

on_message(msg: Message) → None

Event handler. It executes when a message is delivered to client

Prefab channels and listeners

class eric_sse.prefabs.DataProcessingChannel(stream_delay_seconds: int = 0, retry_timeout_milliseconds: int = 5)

Channel that invokes a callable in a Pool of threads

add_threaded_listener(callback: Callable[[Message], None], max_workers: int) → ThreadPoolListener

Adds a threaded listener

class eric_sse.prefabs.SSEChannel(stream_delay_seconds: int = 0, retry_timeout_milliseconds: int = 5)

SSE streaming channel.

See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format Currently, ‘id’ field is not supported.

class eric_sse.prefabs.ThreadPoolListener(callback: Callable[[Message], None], max_workers: int)

CURRENTLY NOT SUITABLE FOR PRODUCTION ENVIRONMENTS.

Listener intended for concurrent processing of data.

Relies on concurrent.futures.ThreadPoolExecutor.

MESSAGE_TYPE_CLOSED type is intended as end of stream. It should be considered as a reserved Message type.

Note that:

  • same callback is invoked, no matter of message type
  • callback execution order is not guaranteed (to be the same as the one while dispatching to channel)

on_message(msg: Message) → None

Event handler. It executes when a message is delivered to client

Prefab servers

class eric_sse.servers.ChannelContainer

Helper class for management of multiple SSE channels cases of use.

class eric_sse.servers.SocketServer(file_descriptor_path: str)

An implementation of a socket server that receives and broadcasts automatically all messages that receives

A static shortcut for starting a basic server is provided. See examples.

static start(file_descriptor_path: str)

Shortcut to start a server

Exceptions

exception eric_sse.exception.InvalidChannelException

exception eric_sse.exception.InvalidListenerException

exception eric_sse.exception.InvalidMessageFormat

exception eric_sse.exception.NoMessagesException

Raised when trying to fetch from an empty queue

Developers section

Update README.md scipt: update_docs.sh

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eric_sse-0.3.3.1.tar.gz (6.6 kB view details)

Uploaded Source

Built Distribution

eric_sse-0.3.3.1-py3-none-any.whl (8.9 kB view details)

Uploaded Python 3

File details

Details for the file eric_sse-0.3.3.1.tar.gz.

File metadata

  • Download URL: eric_sse-0.3.3.1.tar.gz
  • Upload date:
  • Size: 6.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.0-122-generic

File hashes

Hashes for eric_sse-0.3.3.1.tar.gz
Algorithm Hash digest
SHA256 74e738a0021a2284e83465e63dd7c0e6c8bf6a3e3c055993b51e3898ff7a8670
MD5 cd70a44d0113a84a6581acea2625231b
BLAKE2b-256 d2508482431b6c685e6066b2d0959f1ce6653b2c670389b4bb52c78c3305a676

See more details on using hashes here.

File details

Details for the file eric_sse-0.3.3.1-py3-none-any.whl.

File metadata

  • Download URL: eric_sse-0.3.3.1-py3-none-any.whl
  • Upload date:
  • Size: 8.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.0-122-generic

File hashes

Hashes for eric_sse-0.3.3.1-py3-none-any.whl
Algorithm Hash digest
SHA256 6f4f4e8da1e6ccb6f90e2000cd2f114118023b28115519aa3814819d7efc26e4
MD5 e00ed49b77f7cde9cb70b96a759c3d7e
BLAKE2b-256 1680cbcbb668077285ea0f16a34d1eb1dc4bb7a39fc18b44253a2163bf918b18

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page