Skip to main content

Pydantic-based event management system for Python.

Project description

PyEventManager

Read the Docs!

PyEventManager is a simple event-based routing package allowing code to be structured in a pseudo-decentralized manner.

Instead of calling functions directly, you can emit an event and have one or more functions (listeners) configured to listen on that event execute. These listeners can currently be run either in a new thread or a new process, allowing the creation of background tasks for long-running jobs off an API event, for example.

Wrapped listeners return a Future that can be waited on to recieve the response(s) back from the listener.

There are multiple execution options when registering a listener:

  • Simple: Execute the function (listener) when the specified event is emitted.
  • Batch: Batch up the data from mulitple event emissions until no new events occur for interval seconds, then execute the function with all of the received data.
  • Scheduled: Run a function on an interval with no inputs.

For each listener type, there are multiple execution options determining how the function will be executed; determined by fork_type

  • Process(Default): The listener is run in a new Process
  • Thread: The listener is run in a new Thread

Todo

  • Clean up/improve docstrings
  • Add tests
  • Add support for async execution within an existing event loop
  • Add support for external data stores (redis, rabbitmq?, etc.) for persistence of event data / batching

Installation

Install via pip

pip install pyeventmanager


Usage

Simple Listener

    from event_manager import EventManager

    em = EventManager()

    # Use a decorator to register a simple listener

    @em.on(event="somecategory.event")
    def handle_some_event(data: MyDataType):
        ...

    # Register a function to handle all events in the system
    @em.on(event="*")
    def handle_all_events(data: Any):
        ...

    # Register a function to handle all events for a category using wildcard
    @em.on(event="somecategory.*")
    def handle_all_somecategory_events(data: Any):
        ...

    # Register a simple listener using callback syntax
    def also_handle_some_event(data: MyDataType):
        ...

    em.on(event="somecategory.event", func=also_handle_some_event)

    # Emit an event passing data
    ## *args, **kwargs are passed through to listener,
    ##  so any fields can be used as long as the matching listener accepts the same
    em.emit(event="somecategory.event", data=MyDataType(...))

    # Emit an event, wait for jobs to finish, and get the results
    from concurrent.futures import wait

    futures = em.emit(event="somecategory.event, data=MyDataType(...))
    wait(futures)

    results = [f.result() for f in futures]

Simple Listener With Threading

    from event_manager import EventManager, ForkType

    em = EventManager()

    # Use Threading instead of Processing
    @em.on(event="something.*", fork_type=ForkType.THREAD)
    def handle_something():
        ...

Batch Listener

    from event_manager import EventManager, ForkType, ThreadQueue

    em = EventManager()

    # Batch all data for `category.some_event` until no new events occur for 60 seconds
    @em.on_batch(event="category.some_event", batch_idle_window=60)
    def handle_some_event_batch(data: list[MyDataType]):
        ...

    # Same batch, using Threading.
    # The queue type will be auto-detected if not specified, but better to be explicit.
    @em.on(
        event="category.some_event",
        fork_type=ForkType.THREAD,
        batch_idle_window=60,
        queue_type=ThreadQueue,
    )
    def handle_some_event_batch(data: list[MyDataType]):
        ...

    # Batch data until 30 seconds pass, or 20 events come through, whichever happends first
    @em.on_batch(event="category.some_event", batch_count=20, batch_window=30)
    def handle_some_event_batch(data: list[MyDataType]):
        ...

Scheduled Listener

Interval is defined using a datetime.timedelta object.

    from datetime import timedelta

    from event_manager import EventManager

    em = EventManager()

    # Schedule a function to be run daily
    @em.schedule(interval=timedelta(days=1))
    def run_daily():
        ...

    # Schedule a function to be run hourly
    @em.schedule(interval=timedelta(hours=1))
    def run_hourly():
        ...

Using A Custom Queue for Batch Listener

    from datetime import datetime
    from typing import Any

    from event_manager import EventManager, ForkType, QueueInterface

    class MyCustomQueue(QueueInterface):
        last_updated: datetime | None

        def __init__(self, ...):
            self.last_updated = None

        def __len__(self):
            length = 0
            ...
            return length

        def empty(self) -> bool:
            # Check if the queue is empty
            num_items = 0
            ...
            if num_items > 0:
                return False

            return True

        def get(self) -> Any:
            # Get an item from the queue
            item = {}
            ...
            return item

        def get_all(self) -> list[Any]:
            # Get all items from the queue
            items = []
            ...
            return items

        def put(self, item: Any):
            # Put item to queue
            ...

    em = EventManager()

    # Add a batched listener and pass in our custom Queue implementation
    @em.on(
        event="category.some_event",
        batch_idle_window=60,
        queue_type=MyCustomQueue,
    )
    def handle_batch_process(data: list[Any]):
        ...

    # Add a batched listener configured to use Threading with our custom Queue implementation
    @em.on(
        event="category.some_event",
        fork_type=ForkType.THREAD,
        batch_idle_window=60,
        queue_type=MyCustomQueue,
    )
    def handle_batch_process(data: list[Any]):
        ...

API Documentation

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyeventmanager-0.5.7.tar.gz (9.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

pyeventmanager-0.5.7-py3-none-any.whl (13.5 kB view details)

Uploaded Python 3

File details

Details for the file pyeventmanager-0.5.7.tar.gz.

File metadata

  • Download URL: pyeventmanager-0.5.7.tar.gz
  • Upload date:
  • Size: 9.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for pyeventmanager-0.5.7.tar.gz
Algorithm Hash digest
SHA256 f433ad12b69905dd0a05f712b2159b0e9b249df8844db5ce7035fac4bc41199c
MD5 c853a323427153403c1082e9b337505c
BLAKE2b-256 53bd067c99441fa9ac905f1ddbae7f2fccf32b7b981c678046e2ca9a58dd45dd

See more details on using hashes here.

File details

Details for the file pyeventmanager-0.5.7-py3-none-any.whl.

File metadata

  • Download URL: pyeventmanager-0.5.7-py3-none-any.whl
  • Upload date:
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: uv/0.11.6 {"installer":{"name":"uv","version":"0.11.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":{"name":"Ubuntu","version":"24.04","id":"noble","libc":null},"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":true}

File hashes

Hashes for pyeventmanager-0.5.7-py3-none-any.whl
Algorithm Hash digest
SHA256 10db3c8fdf68ea9763bcc396597dcd7a73a913d679f66db755eb5833da3de7e7
MD5 c3b2190975988e6438cf87225a9235eb
BLAKE2b-256 dad1f00858d0c04a209cf9bf2d0c03b012851424796cdd9f3c747e55011aaa59

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page