Skip to main content

A lightweight message dispatcher based on SSE protocol data transfer objects format

Project description

The lightweight library for async messaging nobody expects.

Installation

pip install eric-sse

Features

  • Send to one listener and broadcast
  • SSE format was adopted by design, is order to make library suitable for such kind of model
  • Sockets
  • Callbacks
  • [Experimental] Threading support for large data processing

Possible applications

Trivia

Library name pretends to be a tribute to the following movie https://en.wikipedia.org/wiki/Looking_for_Eric

Entities

class eric_sse.entities.AbstractChannel(stream_delay_seconds: int = 0)

Base class for channels.

Provides functionalities for listeners and message delivery management. SSEChannel is the default implementation

add_listener() → MessageQueueListener

Add the default listener

broadcast(msg: Message)

Enqueue a message to all listeners

deliver_next(listener_id: str) → Message

Returns next message for given listener id. Raises a NoMessagesException if queue is empty

dispatch(listener_id: str, msg: Message)

Adds a message to listener’s queue

async message_stream(listener: MessageQueueListener) → AsyncIterable[dict]

Entry point for message streaming

In case of failure at channel resolution time, a special message with type=MESSAGE_TYPE_CLOSED is sent, and correspondant listener is stopped

notify_end()

Broadcasts a MESSAGE_TYPE_CLOSED Message

register_listener(l: MessageQueueListener)

Adds a listener to channel

class eric_sse.entities.Message(type: str, payload: dict | list | str | int | float | None = None)

Models a message

It’s just a container of information identified by a type. For validation purposes you can override MessageQueueListener.on_message

class eric_sse.entities.MessageQueueListener

Base class for listeners.

Optionally you can override on_message method if you need to inject code at message delivery time.

on_message(msg: Message) → None

Event handler. It executes when a message is delivered to client

Prefab channels and listeners

class eric_sse.prefabs.DataProcessingChannel(max_workers: int, stream_delay_seconds: int = 0)

[Still experimental, it was never tested on some real use case] Channel intended for concurrent processing of data.

  • Parameters:
    • max_workers – Num og workers to use
    • stream_delay_seconds – Can be used to limit response rate of streamings. Only applies to message_stream calls.

Relies on concurrent.futures.ThreadPoolExecutor.

Just override adapt method to control output returned to clients

MESSAGE_TYPE_CLOSED type is intended as end of stream. It should be considered as a reserved Message type.

Note that callback execution order is not guaranteed

adapt(msg: Message) → Any

Models output returned to clients

async process_queue(l: MessageQueueListener) → AsyncIterable[dict]

Launches the processing of the given listener’s queue

class eric_sse.prefabs.SSEChannel(stream_delay_seconds: int = 0, retry_timeout_milliseconds: int = 5)

SSE streaming channel.

  • Parameters: retry_timeout_milliseconds – Used to indicate waiting time to clients

See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format

Currently, ‘id’ field is not supported.

Prefab servers

class eric_sse.servers.ChannelContainer

Helper class for management of multiple SSE channels cases of use.

class eric_sse.servers.SocketServer(file_descriptor_path: str)

An implementation of a socket server that acts as a controller to interact with library

A static shortcut for starting a basic server is provided. See examples. Accepted format: a plain (no nested) JSON with the following keys:

"c": "channel id" "v": "verb" "t": "message type" "p": "message payload"

Possible values of verb identifies a supported action:

"d" dispatch "b" broadcast "c" add listener "w" watch (opens a stream)

See examples

async static connect_callback(reader: StreamReader, writer: StreamWriter)

Integration with SocketServer.

See https://docs.python.org/3/library/asyncio-stream.html#asyncio.start_unix_server Handles low-lwvel communication and raw messages parsing

async shutdown(server: Server)

Graceful Shutdown

static start(file_descriptor_path: str)

Shortcut to start a server

Exceptions

exception eric_sse.exception.InvalidChannelException

exception eric_sse.exception.InvalidListenerException

exception eric_sse.exception.InvalidMessageFormat

exception eric_sse.exception.NoMessagesException

Raised when trying to fetch from an empty queue

Changelog

0.5.3

  • Restored behaviour of AbstractChannel.message_stream. Multiple streaming calls with same listener are allowed
  • Added locking to queue pop

0.5.2

Fixed close stream too early in AbstractChannel.message_stream

0.5.1

AbstractChannel.message_stream raises and InvalidListenerException if invoked more than one time with same listener

0.5.0.2

Fix: SSEChannel must accept stream_delay_seconds as constructor parameter

0.5.0

  • Removed Threaded listener class
  • Added DataProcessingChannel.process_queue

0.4.1.0

  • Breaking: Changed DataProcessingChannel adapter to suit with SSE

0.4.0

Breaking changes:

  • Rework of DataProcessingChannel, now extends AbstractChannel and its methods’ signatures have been updated
  • AbstractChannel.retry_timeout_milliseconds have been moved to SSEChannel

0.3.2

  • Breaking change: now ThreadPoolListener callback only accepts Message as parameter
  • Fixed a concurrency bug in ThreadPoolListener

Developers section

Update README.md scipt: update_docs.sh

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

eric_sse-0.5.3.tar.gz (9.0 kB view details)

Uploaded Source

Built Distribution

eric_sse-0.5.3-py3-none-any.whl (10.1 kB view details)

Uploaded Python 3

File details

Details for the file eric_sse-0.5.3.tar.gz.

File metadata

  • Download URL: eric_sse-0.5.3.tar.gz
  • Upload date:
  • Size: 9.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.0-122-generic

File hashes

Hashes for eric_sse-0.5.3.tar.gz
Algorithm Hash digest
SHA256 4643acee9fadaf7164f1f58d406aed15b98a46e3877d7987f1c8ed95b7cb2493
MD5 e1ae5d5495d1e8751f690132877e00a9
BLAKE2b-256 b4aebae53b9b632fda185bcf75803e4a4e96d04385a576dbcda87c4718f14288

See more details on using hashes here.

File details

Details for the file eric_sse-0.5.3-py3-none-any.whl.

File metadata

  • Download URL: eric_sse-0.5.3-py3-none-any.whl
  • Upload date:
  • Size: 10.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.10.12 Linux/5.15.0-122-generic

File hashes

Hashes for eric_sse-0.5.3-py3-none-any.whl
Algorithm Hash digest
SHA256 834ed50a91ca8163992d87cd096cd01b375a137d79feb87c58b92df18b3240c0
MD5 c01c199721ad410a59857083ef491bdb
BLAKE2b-256 0cd45d1a0927649a3bebaa72698a1d1cded056e375693cad2c0e2b3a04495da2

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page