Skip to main content

Event management system for Python with support for Threading and Multiprocessing for task running.

Project description

PyEventManager

Read the Docs!

PyEventManager is a simple event-based routing package allowing code to be structured in a pseudo-decentralized manner.

Instead of calling functions directly, you can emit an event and have one or more functions (listeners) configured to listen on that event execute. These listeners can currently be run either in a new thread or a new process, allowing the creation of background tasks for long-running jobs off an API event, for example.

Wrapped listeners return a Future that can be waited on to recieve the response(s) back from the listener.

There are multiple execution options when registering a listener:

  • Simple: Execute the function (listener) when the specified event is emitted.
  • Batch: Batch up the data from mulitple event emissions until no new events occur for interval seconds, then execute the function with all of the received data.
  • Scheduled: Run a function on an interval with no inputs.

For each listener type, there are multiple execution options determining how the function will be executed; determined by fork_type

  • Process(Default): The listener is run in a new Process
  • Thread: The listener is run in a new Thread

Todo

  • Clean up/improve docstrings
  • Add tests
  • Add support for async execution within an existing event loop
  • Add support for external data stores (redis, rabbitmq?, etc.) for persistence of event data / batching

Installation

Install via pip

pip install pyeventmanager


Usage

Simple Listener

    from event_manager import EventManager

    em = EventManager()

    # Use a decorator to register a simple listener

    @em.on(event="somecategory.event")
    def handle_some_event(data: MyDataType):
        ...

    # Register a function to handle all events in the system
    @em.on(event="*")
    def handle_all_events(data: Any):
        ...

    # Register a function to handle all events for a category using wildcard
    @em.on(event="somecategory.*")
    def handle_all_somecategory_events(data: Any):
        ...

    # Register a simple listener using callback syntax
    def also_handle_some_event(data: MyDataType):
        ...

    em.on(event="somecategory.event", func=also_handle_some_event)

    # Emit an event passing data
    ## *args, **kwargs are passed through to listener,
    ##  so any fields can be used as long as the matching listener accepts the same
    em.emit(event="somecategory.event", data=MyDataType(...))

    # Emit an event, wait for jobs to finish, and get the results
    from concurrent.futures import wait

    futures = em.emit(event="somecategory.event, data=MyDataType(...))
    wait(futures)

    results = [f.result() for f in futures]

Simple Listener With Threading

    from event_manager import EventManager, ForkType

    em = EventManager()

    # Use Threading instead of Processing
    @em.on(event="something.*", fork_type=ForkType.THREAD)
    def handle_something():
        ...

Batch Listener

    from event_manager import EventManager, ForkType, ThreadQueue

    em = EventManager()

    # Batch all data for `category.some_event` until no new events occur for 60 seconds
    @em.on_batch(event="category.some_event", batch_idle_window=60)
    def handle_some_event_batch(data: list[MyDataType]):
        ...

    # Same batch, using Threading.
    # The queue type will be auto-detected if not specified, but better to be explicit.
    @em.on(
        event="category.some_event",
        fork_type=ForkType.THREAD,
        batch_idle_window=60,
        queue_type=ThreadQueue,
    )
    def handle_some_event_batch(data: list[MyDataType]):
        ...

    # Batch data until 30 seconds pass, or 20 events come through, whichever happends first
    @em.on_batch(event="category.some_event", batch_count=20, batch_window=30)
    def handle_some_event_batch(data: list[MyDataType]):
        ...

Scheduled Listener

Interval is defined using a datetime.timedelta object.

    from datetime import timedelta

    from event_manager import EventManager

    em = EventManager()

    # Schedule a function to be run daily
    @em.schedule(interval=timedelta(days=1))
    def run_daily():
        ...

    # Schedule a function to be run hourly
    @em.schedule(interval=timedelta(hours=1))
    def run_hourly():
        ...

Using A Custom Queue for Batch Listener

    from datetime import datetime
    from typing import Any

    from event_manager import EventManager, ForkType, QueueInterface

    class MyCustomQueue(QueueInterface):
        last_updated: datetime | None

        def __init__(self, ...):
            self.last_updated = None

        def __len__(self):
            length = 0
            ...
            return length

        def empty(self) -> bool:
            # Check if the queue is empty
            num_items = 0
            ...
            if num_items > 0:
                return False

            return True

        def get(self) -> Any:
            # Get an item from the queue
            item = {}
            ...
            return item

        def get_all(self) -> list[Any]:
            # Get all items from the queue
            items = []
            ...
            return items

        def put(self, item: Any):
            # Put item to queue
            ...

    em = EventManager()

    # Add a batched listener and pass in our custom Queue implementation
    @em.on(
        event="category.some_event",
        batch_idle_window=60,
        queue_type=MyCustomQueue,
    )
    def handle_batch_process(data: list[Any]):
        ...

    # Add a batched listener configured to use Threading with our custom Queue implementation
    @em.on(
        event="category.some_event",
        fork_type=ForkType.THREAD,
        batch_idle_window=60,
        queue_type=MyCustomQueue,
    )
    def handle_batch_process(data: list[Any]):
        ...

API Documentation

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyeventmanager-0.5.6.tar.gz (13.2 kB view details)

Uploaded Source

Built Distribution

pyeventmanager-0.5.6-py3-none-any.whl (13.4 kB view details)

Uploaded Python 3

File details

Details for the file pyeventmanager-0.5.6.tar.gz.

File metadata

  • Download URL: pyeventmanager-0.5.6.tar.gz
  • Upload date:
  • Size: 13.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: python-httpx/0.27.0

File hashes

Hashes for pyeventmanager-0.5.6.tar.gz
Algorithm Hash digest
SHA256 73024af2ad24b34e75a05161d45e0e2ae79c30c9482476dbef18e05230043672
MD5 a8713e981a88bfc4565d6a52bf626f7a
BLAKE2b-256 a567c7585a24599fbd34811475598b17b5ae3a58e9aaa01eb3541f4e64c40080

See more details on using hashes here.

File details

Details for the file pyeventmanager-0.5.6-py3-none-any.whl.

File metadata

File hashes

Hashes for pyeventmanager-0.5.6-py3-none-any.whl
Algorithm Hash digest
SHA256 248d94791f5067b95e6d633c678747a8183e842555e895ff6fb7ddc8a68e52bf
MD5 3c2af95b808b749ff4a33a2839d0ec30
BLAKE2b-256 13c1f2a1964c7ee662e87b088404be938a6ea61f64851386a4bad4ab4facc66c

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page