Event management system for Python with support for Threading and Multiprocessing for task running.

PyEventManager

Read the Docs!

PyEventManager is a simple event-based routing package allowing code to be structured in a pseudo-decentralized manner.

Instead of calling functions directly, you emit an event, and every function (listener) registered for that event is executed. Listeners can currently run in either a new thread or a new process, which makes it possible to start background tasks for long-running jobs from an API event, for example.

Wrapped listeners return a Future that can be waited on to receive the response(s) back from the listener.
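
To make the emit-then-wait flow concrete, here is a minimal standard-library sketch. `TinyDispatcher`, `double`, and `describe` are hypothetical names used only for illustration; this is not how PyEventManager is implemented internally, just one way the "emit returns Futures" idea can look.

```python
from collections import defaultdict
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable


class TinyDispatcher:
    """Hypothetical stand-in for illustration; not PyEventManager's internals."""

    def __init__(self) -> None:
        self._listeners: dict[str, list[Callable[..., Any]]] = defaultdict(list)
        self._pool = ThreadPoolExecutor(max_workers=4)

    def on(self, event: str, func: Callable[..., Any]) -> None:
        # Register a listener for an exact event name.
        self._listeners[event].append(func)

    def emit(self, event: str, *args: Any, **kwargs: Any) -> list[Future]:
        # Submit every registered listener and return one Future per listener.
        return [
            self._pool.submit(func, *args, **kwargs)
            for func in self._listeners[event]
        ]

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


def double(data: int) -> int:
    return data * 2


def describe(data: int) -> str:
    return f"got {data}"


dispatcher = TinyDispatcher()
dispatcher.on("numbers.new", double)
dispatcher.on("numbers.new", describe)

futures = dispatcher.emit("numbers.new", data=21)
wait(futures)  # block until every listener has finished
results = [f.result() for f in futures]
dispatcher.shutdown()
```

The results come back in registration order because the list of Futures is built in that order, regardless of which listener finishes first.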

There are multiple execution options when registering a listener:

  • Simple: Execute the function (listener) when the specified event is emitted.
  • Batch: Batch up the data from multiple event emissions until no new events occur for interval seconds, then execute the function with all of the received data.
  • Scheduled: Run a function on an interval with no inputs.

Each listener type can be executed in one of two ways, selected with fork_type:

  • Process (default): The listener is run in a new Process
  • Thread: The listener is run in a new Thread
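
As a rough illustration of what a process-or-thread switch means, the sketch below maps an enum to a standard-library executor class. `Fork` and `executor_class_for` are hypothetical stand-ins (the library's own enum is `ForkType`), and the assumption that the choice boils down to picking an executor is mine, not something the README states.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from enum import Enum


class Fork(Enum):
    """Hypothetical stand-in for the library's ForkType."""

    PROCESS = "process"
    THREAD = "thread"


def executor_class_for(fork: Fork = Fork.PROCESS) -> type[Executor]:
    # Process is the default here, mirroring the default described above.
    if fork is Fork.THREAD:
        return ThreadPoolExecutor
    return ProcessPoolExecutor


def checksum(values: list[int]) -> int:
    return sum(values) % 255


# Threads share memory with the caller, so any callable and arguments work.
# A process-based run would additionally need picklable functions and data.
with executor_class_for(Fork.THREAD)(max_workers=1) as pool:
    threaded_result = pool.submit(checksum, [1, 2, 3]).result()
```

In general, processes suit CPU-bound listeners, while threads are lighter and suit I/O-bound work.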

Todo

  • Clean up/improve docstrings
  • Add tests
  • Add support for async execution within an existing event loop
  • Add support for external data stores (redis, rabbitmq?, etc.) for persistence of event data / batching

Installation

Install via pip

    pip install pyeventmanager


Usage

Simple Listener

    from typing import Any

    from event_manager import EventManager

    # MyDataType below is a placeholder for your own payload type

    em = EventManager()

    # Use a decorator to register a simple listener

    @em.on(event="somecategory.event")
    def handle_some_event(data: MyDataType):
        ...

    # Register a function to handle all events in the system
    @em.on(event="*")
    def handle_all_events(data: Any):
        ...

    # Register a function to handle all events for a category using wildcard
    @em.on(event="somecategory.*")
    def handle_all_somecategory_events(data: Any):
        ...

    # Register a simple listener using callback syntax
    def also_handle_some_event(data: MyDataType):
        ...

    em.on(event="somecategory.event", func=also_handle_some_event)

    # Emit an event, passing data.
    # *args and **kwargs are passed through to the listener, so any arguments
    # can be used as long as the matching listeners accept them.
    em.emit(event="somecategory.event", data=MyDataType(...))

    # Emit an event, wait for jobs to finish, and get the results
    from concurrent.futures import wait

    futures = em.emit(event="somecategory.event", data=MyDataType(...))
    wait(futures)

    results = [f.result() for f in futures]
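
The wildcard patterns used above (`*` and `somecategory.*`) behave like shell-style globs. The sketch below uses the standard library's `fnmatchcase` to show which patterns would match an emitted event. Whether PyEventManager uses `fnmatch` internally is an assumption, and `matches` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def matches(pattern: str, event: str) -> bool:
    # Case-sensitive glob match; "*" matches any run of characters, dots included.
    return fnmatchcase(event, pattern)


patterns = ["somecategory.event", "somecategory.*", "*", "other.*"]
matched = [p for p in patterns if matches(p, "somecategory.event")]
```

With glob semantics, `somecategory.*` would also match a deeper name such as `somecategory.sub.event`; check the library's documentation if that distinction matters for your event names.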

Simple Listener With Threading

    from event_manager import EventManager, ForkType

    em = EventManager()

    # Use Threading instead of Processing
    @em.on(event="something.*", fork_type=ForkType.THREAD)
    def handle_something():
        ...

Batch Listener

    from event_manager import EventManager, ForkType, ThreadQueue

    em = EventManager()

    # Batch all data for `category.some_event` until no new events occur for 60 seconds
    @em.on_batch(event="category.some_event", batch_idle_window=60)
    def handle_some_event_batch(data: list[MyDataType]):
        ...

    # Same batch, using Threading.
    # The queue type will be auto-detected if not specified, but better to be explicit.
    @em.on_batch(
        event="category.some_event",
        fork_type=ForkType.THREAD,
        batch_idle_window=60,
        queue_type=ThreadQueue,
    )
    def handle_some_event_batch(data: list[MyDataType]):
        ...

    # Batch data until 30 seconds pass or 20 events come through, whichever happens first
    @em.on_batch(event="category.some_event", batch_count=20, batch_window=30)
    def handle_some_event_batch(data: list[MyDataType]):
        ...
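
The three batching triggers can be reasoned about with a small, clock-injected buffer. This is a sketch of my reading of `batch_idle_window`, `batch_count`, and `batch_window`, not the library's implementation; `BatchBuffer` and its methods are hypothetical, and timestamps are passed in explicitly so the behaviour is deterministic.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class BatchBuffer:
    """Hypothetical buffer; field names mirror the keyword arguments above."""

    batch_idle_window: Optional[float] = None  # flush after this many quiet seconds
    batch_count: Optional[int] = None          # flush once this many items are buffered
    batch_window: Optional[float] = None       # flush this long after the first item
    items: list = field(default_factory=list)
    first_at: Optional[float] = None
    last_at: Optional[float] = None

    def add(self, item: Any, now: float) -> None:
        if not self.items:
            self.first_at = now  # the batch window starts at the first item
        self.items.append(item)
        self.last_at = now       # every new item resets the idle timer

    def ready(self, now: float) -> bool:
        if not self.items:
            return False
        if self.batch_count is not None and len(self.items) >= self.batch_count:
            return True
        if self.batch_window is not None and now - self.first_at >= self.batch_window:
            return True
        if self.batch_idle_window is not None and now - self.last_at >= self.batch_idle_window:
            return True
        return False

    def flush(self) -> list:
        batch, self.items = self.items, []
        self.first_at = self.last_at = None
        return batch


idle = BatchBuffer(batch_idle_window=60)
idle.add("a", now=0)
idle.add("b", now=45)                    # resets the idle timer
idle_ready_early = idle.ready(now=100)   # only 55 quiet seconds so far
idle_ready_late = idle.ready(now=105)    # 60 quiet seconds reached
idle_batch = idle.flush()

capped = BatchBuffer(batch_count=20, batch_window=30)
capped.add("x", now=0)
```

The idle window measures time since the most recent event, while the batch window measures time since the first buffered event, so a steady trickle of events can delay an idle-window flush indefinitely but not a batch-window flush.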

Scheduled Listener

Interval is defined using a datetime.timedelta object.

    from datetime import timedelta

    from event_manager import EventManager

    em = EventManager()

    # Schedule a function to be run daily
    @em.schedule(interval=timedelta(days=1))
    def run_daily():
        ...

    # Schedule a function to be run hourly
    @em.schedule(interval=timedelta(hours=1))
    def run_hourly():
        ...
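
For reference, `timedelta` arithmetic makes it easy to preview when an interval-based schedule would fire. `next_runs` is a hypothetical helper, and the sketch assumes the first run happens one full interval after the start; the README does not say whether the library also runs the function immediately.

```python
from datetime import datetime, timedelta


def next_runs(start: datetime, interval: timedelta, count: int) -> list[datetime]:
    # Assumes the first run happens one full interval after `start`.
    return [start + interval * n for n in range(1, count + 1)]


start = datetime(2024, 1, 1, 0, 0)
hourly = next_runs(start, timedelta(hours=1), 3)
seconds_between_daily_runs = timedelta(days=1).total_seconds()
```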

Using A Custom Queue for Batch Listener

    from datetime import datetime
    from typing import Any

    from event_manager import EventManager, ForkType, QueueInterface

    class MyCustomQueue(QueueInterface):
        last_updated: datetime | None

        # Accept whatever constructor arguments your queue needs
        def __init__(self, *args: Any, **kwargs: Any):
            self.last_updated = None

        def __len__(self):
            length = 0
            ...
            return length

        def empty(self) -> bool:
            # Check if the queue is empty
            num_items = 0
            ...
            if num_items > 0:
                return False

            return True

        def get(self) -> Any:
            # Get an item from the queue
            item = {}
            ...
            return item

        def get_all(self) -> list[Any]:
            # Get all items from the queue
            items = []
            ...
            return items

        def put(self, item: Any):
            # Put item to queue
            ...

    em = EventManager()

    # Add a batched listener and pass in our custom Queue implementation
    @em.on_batch(
        event="category.some_event",
        batch_idle_window=60,
        queue_type=MyCustomQueue,
    )
    def handle_batch_process(data: list[Any]):
        ...

    # Add a batched listener configured to use Threading with our custom Queue implementation
    @em.on_batch(
        event="category.some_event",
        fork_type=ForkType.THREAD,
        batch_idle_window=60,
        queue_type=MyCustomQueue,
    )
    def handle_batch_process(data: list[Any]):
        ...
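
As a concrete reference for the skeleton above, here is a minimal in-memory queue with the same method names, backed by a `deque`. It is a standalone sketch: `InMemoryQueue` is a hypothetical name, it deliberately does not import `QueueInterface` (a real implementation would subclass it), and because it relies on a `threading.Lock` it would only be suitable for thread-based listeners, not for sharing data across processes.

```python
from collections import deque
from datetime import datetime
from threading import Lock
from typing import Any, Optional


class InMemoryQueue:
    """Hypothetical example; subclass QueueInterface to use it with the library."""

    def __init__(self) -> None:
        self.last_updated: Optional[datetime] = None
        self._items: deque = deque()
        self._lock = Lock()

    def __len__(self) -> int:
        with self._lock:
            return len(self._items)

    def empty(self) -> bool:
        return len(self) == 0

    def get(self) -> Any:
        # Remove and return the oldest item (raises IndexError if empty).
        with self._lock:
            return self._items.popleft()

    def get_all(self) -> list:
        # Drain the queue and return everything in insertion order.
        with self._lock:
            items = list(self._items)
            self._items.clear()
            return items

    def put(self, item: Any) -> None:
        with self._lock:
            self._items.append(item)
            self.last_updated = datetime.now()


queue = InMemoryQueue()
for value in (1, 2, 3):
    queue.put(value)

first = queue.get()
rest = queue.get_all()
```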

API Documentation
