A minimalistic async event bus implementation

aioemit

aioemit lets you manage events asynchronously and notify subscribers when those events occur. It provides a simple way to implement the event bus pattern in an event-driven application.

Installation

pip install aioemit

Usage

Creating Events

The Event class represents an event with an event type and optional data. Create one by passing the event type and, if needed, the data to the constructor.

event = Event("example_event", "example_data")
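As a rough mental model, an event of this shape can be sketched as a small dataclass. This is not aioemit's own definition: the `SketchEvent` name and its `event_type` and `data` fields are assumptions made for illustration only.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an event: a type name plus an optional payload."""

    event_type: str              # assumed field name, used to route the event
    data: Optional[Any] = None   # assumed field name, payload is optional


# An event carrying data, mirroring the call shown above.
with_data = SketchEvent("example_event", "example_data")

# An event with no payload; data falls back to None in this sketch.
without_data = SketchEvent("example_event")
```

Making the sketch frozen keeps an event immutable once created, which is a common choice when the same object is handed to several observers.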

Creating an Emitter

To use the event emitter, create an instance of the Emitter class.

emitter = Emitter()

Subscribing to Events

To subscribe to events, use the subscribe method of the Emitter class. Pass the event type and an observer function that will be called when the event is emitted.

def event_observer(event):
    # Handle the event
    print("Received event:", event)

emitter.subscribe("example_event", event_observer)

Unsubscribing from Events

If you no longer want to receive notifications for a specific event, you can unsubscribe from it using the unsubscribe method. Provide the event type and the observer function that you want to remove.

emitter.unsubscribe("example_event", event_observer)
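The subscribe and unsubscribe calls above suggest a registry that maps each event type to a list of observers. The following is a minimal sketch of that idea, not aioemit's implementation: `SketchRegistry` and `observers_for` are hypothetical names, and silently ignoring an observer that was never subscribed is a choice made for this sketch (the library may behave differently).

```python
from typing import Any, Callable, Dict, List

Observer = Callable[[Any], Any]


class SketchRegistry:
    """Hypothetical observer registry keyed by event type."""

    def __init__(self) -> None:
        self._observers: Dict[str, List[Observer]] = {}

    def subscribe(self, event_type: str, observer: Observer) -> None:
        # Create the list for this event type on first use, then append.
        self._observers.setdefault(event_type, []).append(observer)

    def unsubscribe(self, event_type: str, observer: Observer) -> None:
        # Sketch assumption: removing an unknown observer is a no-op.
        observers = self._observers.get(event_type, [])
        if observer in observers:
            observers.remove(observer)

    def observers_for(self, event_type: str) -> List[Observer]:
        # Return a copy so callers cannot mutate the registry by accident.
        return list(self._observers.get(event_type, []))


def event_observer(event: Any) -> None:
    print("Received event:", event)


registry = SketchRegistry()
registry.subscribe("example_event", event_observer)
subscribed = registry.observers_for("example_event")

registry.unsubscribe("example_event", event_observer)
remaining = registry.observers_for("example_event")
```

Note that unsubscribing relies on passing the same function object that was subscribed, which is why the observer is defined as a named function rather than an inline lambda.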

Emitting Events

To emit an event and notify all subscribers, use the emit method of the Emitter class. Pass the event you want to emit.

event = Event("example_event", "example_data")
await emitter.emit(event)

The emit method asynchronously calls all the observer functions subscribed to the event's type. Because emit is awaited, it must be called from inside a coroutine, for example one started with asyncio.run.
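To show the emit step running end to end, here is a self-contained sketch built only on asyncio. It does not use aioemit and does not claim to mirror its internals: `SketchEmitter` is a hypothetical stand-in, its `emit` takes an event type and data directly rather than an Event object, and accepting both plain functions and coroutine functions as observers is an assumption of the sketch.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List


class SketchEmitter:
    """Hypothetical stand-in for an async emitter (not aioemit's Emitter)."""

    def __init__(self) -> None:
        self._observers: Dict[str, List[Callable[[Any], Any]]] = {}

    def subscribe(self, event_type: str, observer: Callable[[Any], Any]) -> None:
        self._observers.setdefault(event_type, []).append(observer)

    async def emit(self, event_type: str, data: Any = None) -> None:
        pending = []
        # Iterate over a copy so observers may subscribe during dispatch.
        for observer in list(self._observers.get(event_type, [])):
            result = observer(data)
            # Plain functions have already run; coroutines still need awaiting.
            if inspect.isawaitable(result):
                pending.append(result)
        if pending:
            # Run all async observers concurrently and wait for them to finish.
            await asyncio.gather(*pending)


received: List[str] = []


def sync_observer(data: Any) -> None:
    received.append(f"sync:{data}")


async def async_observer(data: Any) -> None:
    await asyncio.sleep(0)  # yield to the event loop once
    received.append(f"async:{data}")


async def main() -> None:
    emitter = SketchEmitter()
    emitter.subscribe("example_event", sync_observer)
    emitter.subscribe("example_event", async_observer)
    await emitter.emit("example_event", "example_data")


asyncio.run(main())
```

Wrapping the calls in `main` and starting it with `asyncio.run` is what makes the `await` legal; a bare `await` at module level only works in an async-aware REPL.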

License

This project is licensed under the MIT License. Feel free to use, modify, and distribute the code as per the terms of the license.

