Skip to main content

Aspyx event framework

Project description

aspyx

Pylint Build Status Python Versions License coverage PyPI Docs

image

Eventing

Introduction

As we already covered the foundation for microservices, only one aspect is missing for enterprise software: Asynchronous communication involving queues.

It's pretty simple to have a working example ready in Python using some of the low-level libraries related to stomp or amqp, but the missing thing always is an abstraction layer on top to hide the technical details.

From a programmer perspective i essentially need a payload object, a listener and some means to define the routing logic. I really shouldn't have to bother with connections, messages, serialization and deserialization logic, threads, exception handling, etc.

Let's look at a simple example, what the result looks like:

Example:

@dataclass
@event()
class HelloEvent:
    world: str

@event_listener(HelloEvent)
class HelloEventListener(EventListener[HelloEvent]):
    # constructor

    def __init__(self):
        ...

    # implement

    def on(self, event: HelloEvent):
       ...

environment = ...
event_manager = environment.get(EventManager)
event_manager.send_event(HelloEvent("world"))

Not bad, huh?

Features

  • Support for any pydantic model or dataclass as events
  • Pluggable transport protocol, currently supporting AMQP and Stomp.
  • Possibility to pass headers to events
  • Event interceptors on the sending and receiving side ( e.g. session capturing )

Installation

Just install from PyPI with

pip install aspyx-event

The library is tested with all Python version >= 3.9

API

Let's look at the details:

Event

An event is a dataclass or pydantic model, annotated with @event()

Parameters are:

  • name="" name of the event, if not specified the class name is used.
  • durable=False if True, the provider will try to create a persistent queue.

The name attribute will be used as a queue name!

EventListener

An event listener derives from the base-class EventListener with a generic argument specifying the handled event type and are decorated with @event_listener(...).

Parameters are:

  • name="" name of the listener, if not specified the class name is used.
  • per_process=False if True, the event will be dispatched all identical listeners, that run inside a cluster.

Listeners are injectable objects. The on method can be sync or async!

EnvelopePipeline

An envelope pipeline is something like an interceptor that both covers the sending and receiving side.

It is declared as:

class EnvelopePipeline(ABC):
    @abstractmethod
    def send(self, envelope: EventManager.Envelope, event_descriptor: EventManager.EventDescriptor):
        pass

    @abstractmethod
    def handle(self, envelope: EventManager.Envelope, event_listener_descriptor: EventManager.EventListenerDescriptor):
        pass

with envelope being a wrapper around the event, with additional possibilities to set and retrieve header information.

class Envelope(ABC):
    @abstractmethod
    def get_body(self) -> str:
        pass

    @abstractmethod
    def set(self, key: str, value: str):
        pass

    @abstractmethod
    def get(self, key: str) -> str:
        pass

The purpose is to give the programmer the chance to set and retrieve meta data to an event. Think of passing and retrieving a session id, that will be used to establish a session context.

Concrete pipelines are decorate with @envelope_pipeline() and are regular injectable objects.

EventManager

The central class is the EventManager that offers the single API method

def send_event(self, event: Any)-> None

The constructor expects an argument provider: EnvironmentManager.Provider that contains the technical queuing implementation.

The second argument exception_manager is an ExceptionManager thet wil be used to handle all internal exceptions.

Example:

@module(imports=[EventModule])
class Module:
    # handlers

    @catch()
    def handle(self, exception: Exception):
        ... # log at least...

    # internal

    def create_exception_manager(self):
        exception_manager = ExceptionManager()

        exception_manager.collect_handlers(self)

        return exception_manager

    @create()
    def create_event_manager(self) -> EventManager:
        return EventManager(LocalProvider(), exception_manager=self.create_exception_manager())

Providers

A provider encapsulates the technical queuing details based on existing 3rd party libs. As different technologies offer different possibilities, some of the logical parameters -durable, per_process - may not be considered!

AMQP

The class AMQProvider is based on the proton library utilizing the AMQ protocol.

The constructor accepts:

  • server_name: str a unique name identifying a specific server. Typically a name including the host and some port.
  • host="localhost" the url of teh amq server ( e.g. Artemis )
  • port=61616 the port of the server
  • user = "" the user name
  • password = "" the password

If we think of an Artemis address model, this is how it is applied:

  • addresses are event names
  • a queue name is defined as <event-name>::<listener-name>[-<server-name>]

The server name is appended, if per_process is True, guaranteeing the even in clusters every server receives the corresponding events!

The implementation has been tested with Artemis. Other server such as RabbitMQ, Azure Service Bus, Red Hat AMQ and Amazon MQ could work as well. Always make sure, that the serves are configured to auto-create queues!

Version History

  • 0.9.0: Initial version

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aspyx_event-0.9.4.tar.gz (19.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

aspyx_event-0.9.4-py3-none-any.whl (19.4 kB view details)

Uploaded Python 3

File details

Details for the file aspyx_event-0.9.4.tar.gz.

File metadata

  • Download URL: aspyx_event-0.9.4.tar.gz
  • Upload date:
  • Size: 19.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for aspyx_event-0.9.4.tar.gz
Algorithm Hash digest
SHA256 4cf74b799ce23cbab0052a45b025932486ba4325d02ed3452b64ae652025f372
MD5 f367d05137449f050a38fec051b41ba1
BLAKE2b-256 c9965dae3c8a9ab57a1245638421387d2505d35568538563105ef458824e93eb

See more details on using hashes here.

File details

Details for the file aspyx_event-0.9.4-py3-none-any.whl.

File metadata

  • Download URL: aspyx_event-0.9.4-py3-none-any.whl
  • Upload date:
  • Size: 19.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.11.14

File hashes

Hashes for aspyx_event-0.9.4-py3-none-any.whl
Algorithm Hash digest
SHA256 da5fbb66dc4e879552e6bf431d809e0b4507e9195b898b5caa9083ead4cfceb2
MD5 35950f116ecb5062eac092fe54798298
BLAKE2b-256 d949766cead4c8893fb1cc2b66dfd4e5207018873666083ecde5b6d31a26a274

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page