Aspyx event framework
aspyx
Eventing
Introduction
With the foundation for microservices already covered, only one aspect is still missing for enterprise software: asynchronous communication involving queues.
It's pretty simple to get a working example running in Python using one of the low-level libraries for STOMP or AMQP, but what is always missing is an abstraction layer on top that hides the technical details.
From a programmer's perspective, I essentially need a payload object, a listener and some means to define the routing logic. I really shouldn't have to bother with connections, messages, serialization and deserialization logic, threads, exception handling, etc.
Let's look at a simple example of what the result looks like:
Example:
@dataclass
@event()
class HelloEvent:
    world: str

@event_listener(HelloEvent)
class HelloEventListener(EventListener[HelloEvent]):
    # constructor

    def __init__(self):
        ...

    # implement

    def on(self, event: HelloEvent):
        ...

environment = ...
event_manager = environment.get(EventManager)

event_manager.send_event(HelloEvent("world"))
Not bad, huh?
Features
- Support for any pydantic model or dataclass as events
- Pluggable transport protocol, currently supporting AMQP and STOMP
- Possibility to pass headers to events
- Event interceptors on the sending and receiving side (e.g. session capturing)
Installation
Just install from PyPI with
pip install aspyx-event
The library is tested with all Python versions >= 3.9.
API
Let's look at the details:
Event
An event is a dataclass or pydantic model, annotated with @event()
Parameters are:
- name="": name of the event; if not specified, the class name is used.
- durable=False: if True, the provider will try to create a persistent queue.
The name attribute will be used as a queue name!
EventListener
An event listener derives from the base class EventListener, with a generic argument specifying the handled event type, and is decorated with @event_listener(...).
Parameters are:
- name="": name of the listener; if not specified, the class name is used.
- per_process=False: if True, the event will be dispatched to all identical listeners that run inside a cluster.
Listeners are injectable objects. The on method can be sync or async!
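To make the "sync or async" point a bit more tangible, here is a minimal, self-contained sketch of how a dispatcher could call either kind of on method. It does not use aspyx-event at all: the dispatch helper and both listener classes are hypothetical stand-ins, and the library's real dispatching may well look different.

```python
import asyncio
import inspect
from dataclasses import dataclass


@dataclass
class HelloEvent:
    world: str


class SyncListener:
    """Stand-in listener with a plain `on` method."""

    def __init__(self):
        self.received = []

    def on(self, event: HelloEvent):
        self.received.append(event.world)


class AsyncListener:
    """Stand-in listener with a coroutine `on` method."""

    def __init__(self):
        self.received = []

    async def on(self, event: HelloEvent):
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        self.received.append(event.world)


async def dispatch(listener, event):
    """Hypothetical helper: call `on` and await the result only if needed."""
    result = listener.on(event)
    if inspect.isawaitable(result):
        await result


sync_listener = SyncListener()
async_listener = AsyncListener()


async def main():
    for listener in (sync_listener, async_listener):
        await dispatch(listener, HelloEvent("world"))


asyncio.run(main())
```

Checking the return value with inspect.isawaitable keeps the calling code identical for both listener styles.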
EnvelopePipeline
An envelope pipeline is something like an interceptor that covers both the sending and the receiving side.
It is declared as:
class EnvelopePipeline(ABC):
    @abstractmethod
    def send(self, envelope: EventManager.Envelope, event_descriptor: EventManager.EventDescriptor):
        pass

    @abstractmethod
    def handle(self, envelope: EventManager.Envelope, event_listener_descriptor: EventManager.EventListenerDescriptor):
        pass
with envelope being a wrapper around the event that additionally allows setting and retrieving header information:
class Envelope(ABC):
    @abstractmethod
    def get_body(self) -> str:
        pass

    @abstractmethod
    def set(self, key: str, value: str):
        pass

    @abstractmethod
    def get(self, key: str) -> str:
        pass
The purpose is to give the programmer the chance to attach metadata to an event and retrieve it again. Think of passing and retrieving a session id that will be used to establish a session context.
Concrete pipelines are decorated with @envelope_pipeline() and are regular injectable objects.
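The session id idea can be sketched roughly as follows. To keep the snippet runnable without the library, Envelope and EnvelopePipeline are re-declared here as simplified local stand-ins for the interfaces shown above, and DictEnvelope, SessionPipeline and the "session-id" header key are purely hypothetical names. In a real application the pipeline would additionally carry the @envelope_pipeline() decorator and receive real descriptors instead of None.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class Envelope(ABC):
    """Local stand-in mirroring the Envelope interface shown above."""

    @abstractmethod
    def get_body(self) -> str: ...

    @abstractmethod
    def set(self, key: str, value: str): ...

    @abstractmethod
    def get(self, key: str) -> str: ...


class EnvelopePipeline(ABC):
    """Local stand-in mirroring the EnvelopePipeline interface shown above."""

    @abstractmethod
    def send(self, envelope: Envelope, event_descriptor): ...

    @abstractmethod
    def handle(self, envelope: Envelope, event_listener_descriptor): ...


class DictEnvelope(Envelope):
    """Hypothetical in-memory envelope that keeps headers in a dict."""

    def __init__(self, body: str):
        self.body = body
        self.headers: Dict[str, str] = {}

    def get_body(self) -> str:
        return self.body

    def set(self, key: str, value: str):
        self.headers[key] = value

    def get(self, key: str) -> str:
        return self.headers.get(key, "")


class SessionPipeline(EnvelopePipeline):
    """Hypothetical pipeline: writes a session id on send, reads it on handle."""

    def __init__(self, current_session_id: str = ""):
        self.current_session_id = current_session_id
        self.restored_session_id: Optional[str] = None

    def send(self, envelope: Envelope, event_descriptor):
        envelope.set("session-id", self.current_session_id)

    def handle(self, envelope: Envelope, event_listener_descriptor):
        # a real pipeline would establish a session context here
        self.restored_session_id = envelope.get("session-id")


# simulate one round trip: the sender stamps the envelope, the receiver reads it
sender = SessionPipeline("session-42")
receiver = SessionPipeline()
envelope = DictEnvelope('{"world": "world"}')

sender.send(envelope, None)       # descriptors are not needed for this sketch
receiver.handle(envelope, None)
```

The event body stays untouched; only the header travels alongside it.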
EventManager
The central class is the EventManager, which offers a single API method:
def send_event(self, event: Any) -> None
The constructor expects an argument provider: EnvironmentManager.Provider that contains the technical queuing implementation.
The second argument, exception_manager, is an ExceptionManager that will be used to handle all internal exceptions.
Example:
@module(imports=[EventModule])
class Module:
    # handlers

    @catch()
    def handle(self, exception: Exception):
        ...  # log at least...

    # internal

    def create_exception_manager(self):
        exception_manager = ExceptionManager()

        exception_manager.collect_handlers(self)

        return exception_manager

    @create()
    def create_event_manager(self) -> EventManager:
        return EventManager(LocalProvider(), exception_manager=self.create_exception_manager())
Providers
A provider encapsulates the technical queuing details based on existing third-party libraries.
As different technologies offer different possibilities, some of the logical parameters (durable, per_process) may not be honored!
AMQP
The class AMQProvider is based on the proton library and uses the AMQP protocol.
The constructor accepts:
- server_name: str: a unique name identifying a specific server, typically a name including the host and some port.
- host="localhost": the URL of the AMQP server (e.g. Artemis)
- port=61616: the port of the server
- user="": the user name
- password="": the password
If we think of an Artemis address model, this is how it is applied:
- addresses are event names
- a queue name is defined as
<event-name>::<listener-name>[-<server-name>]
The server name is appended if per_process is True, guaranteeing that even in clusters every server receives the corresponding events!
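The naming rule is easy to spell out as a small function. This is only an illustration of the pattern above, not the provider's actual code: queue_name is a hypothetical helper, and the event, listener and server names are made up.

```python
from typing import Optional


def queue_name(event_name: str,
               listener_name: str,
               server_name: Optional[str] = None,
               per_process: bool = False) -> str:
    """Build <event-name>::<listener-name>[-<server-name>] (hypothetical helper)."""
    name = f"{event_name}::{listener_name}"

    if per_process:
        # every server gets its own queue, so the server name must be known
        if not server_name:
            raise ValueError("per_process requires a server name")

        name += f"-{server_name}"

    return name


# all identical listeners in the cluster share one queue
shared = queue_name("HelloEvent", "HelloEventListener")

# one queue per server, so every server receives the event
per_server = queue_name("HelloEvent", "HelloEventListener",
                        server_name="host-1:8000", per_process=True)
```

With a shared queue the servers compete for each event; with the server name appended, each server consumes from its own queue under the same address.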
The implementation has been tested with Artemis. Other servers such as RabbitMQ, Azure Service Bus, Red Hat AMQ and Amazon MQ could work as well. Always make sure that the servers are configured to auto-create queues!
Version History
- 0.9.0: Initial version