Skip to main content

Event management system for Python with support for Threading and Multiprocessing for task running.

Project description

PyEventManager

Read the Docs!

PyEventManager is a simple event-based routing package allowing code to be structured in a pseudo-decentralized manner.

Instead of calling functions directly, you can emit an event and have one or more functions (listeners) configured to listen on that event execute. These listeners can currently be run either in a new thread or a new process, allowing the creation of background tasks for long-running jobs off an API event, for example.

There are multiple execution options when registering a listener:

  • Simple: Execute the function (listener) when the specified event is emitted.
  • Batch: Batch up the data from mulitple event emissions until no new events occur for interval seconds, then execute the function with all of the received data.
  • Scheduled: Run a function on an interval with no inputs.

For each listener type, there are multiple execution options determining how the function will be executed; determined by fork_type

  • Process(Default): The lsitener is run within its own newly spawned Process
  • Thread: The listener is run within its own newly spawned Thread

Todo

  • Fix up docstrings across the board
  • Add tests
  • Add support for async execution within an existing event loop
  • Consider adding support for returning data from the listeners
  • Add support for external data stores (redis, rabbitmq?, etc.) for persistence of event data / batching

Installation

Install via pip

pip install pyeventmanager


Usage

Simple Listener

.. code-block:: python from event_manager import EventManager

em = EventManager()

# Use a decorator to register a simple listener

@em.on(event="somecategory.event")
def handle_some_event(data: MyDataType):
    ...

# Register a function to handle all events in the system
@em.on_all()
def handle_all_events(data: Any):
    ...

# Register a function to handle all events for a category using wildcard
@em.on(event="somecategory.*")
def handle_all_somecategory_events(data: Any):
    ...

# Register a simple listener using callback syntax

def also_handle_some_event(data: MyDataType):
    ...

em.on(event="somecategory.event", func=also_handle_some_event)

# Emit an event
em.emit(event="somecategory.event", data=MyDataType(...))

Simple Listener With Threading

.. code-block:: python from threading import Thread

from event_manager import EventManager

em = EventManager()

# Use Threading instead of Processing
@em.on(event="something.*", fork_type=Thread)

Batch Listener

.. code-block:: python from event_manager import EventManager, ThreadQueue

em = EventManager()

# Batch all data for `category.some_event` until no new events occur for 60 seconds
@em.on(event="category.some_event", batch=True, batch_window=60)
def handle_some_event_batch(data: list[MyDataType]):
    ...

# Same batch, using Threading.
# The queue type will be auto-detected if not specified, but better to be explicit.
@em.on(
    event="category.some_event",
    batch=True,
    batch_window=60,
    fork_type=Thread,
    queue_type=ThreadQueue,
)
def handle_some_event_batch(data: list[MyDataType]):
    ...

Scheduled Listener

Interval is defined using a datetime.timedelta object.

.. code-block:: python from datetime import timedelta

from event_manager import EventManager

em = EventManager()

# Schedule a function to be run daily
@em.schedule(interval=timedelta(days=1))
def run_daily():
    ...

# Schedule a function to be run hourly
@em.schedule(interval=timedelta(hours=1))
def run_hourly():
    ...

Using A Custom Queue for Batch Listener

.. code-block:: python from datetime import datetime from threading import Thread from typing import Any

from event_manager import EventManager, QueueInterface

class MyCustomQueue(QueueInterface):
    last_updated: datetime | None

    def __init__(self, ...):
        self.last_updated = None

    def __len__(self):
        length = 0
        ...
        return length

    def empty(self) -> bool:
        # Check if the queue is empty
        num_items = 0
        ...
        if num_items > 0:
            return False

        return True

    def get(self) -> Any:
        # Get an item from the queue
        item = {}
        ...
        return item

    def get_all(self) -> list[Any]:
        # Get all items from the queue
        items = []
        ...
        return items

    def put(self, item: Any):
        # Put item to queue
        ...

em = EventManager()

# Add a batched listener and pass in our custom Queue implementation
@em.on(
    event="category.some_event",
    batch=True,
    batch_window=60,
    queue_type=MyCustomQueue,
)
def handle_batch_process(data: list[Any]):
    ...

# Add a batched listener configured to use Threading with our custom Queue implementation
@em.on(
    event="category.some_event",
    batch=True,
    batch_window=60,
    fork_type=Thread,
    queue_type=MyCustomQueue,
)
def handle_batch_process(data: list[Any]):
    ...

API Documentation

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyeventmanager-0.2.3.tar.gz (11.4 kB view hashes)

Uploaded Source

Built Distribution

pyeventmanager-0.2.3-py3-none-any.whl (11.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page