Skip to main content

This Python module implements an event handling framework (`EventEmitter`) with support for prioritization (`PriorityEvent`), event filtering, and processing using threading, queues (`queue` module), heap operations (`heapq` module), and logging (`logging` module) capabilities.

Project description

Event Handling Framework with EventEmitter

This Python module implements an event handling framework (EventEmitter) that supports prioritization (PriorityEvent), event filtering, and processing using threading, queues (queue module), heap operations (heapq module), and logging (logging module) capabilities.

Classes

EventHandler

Defines an interface for handling events.

NotificationHandler

Extends EventHandler and handles notifications with specific ID and Message attributes.

Notification

Represents a notification entity with attributes ID, Message, and CreatedAt.

PriorityEvent

Represents an event with its associated priority for queueing and processing.

EventEmitter

Manages event registration, emission, and handling. Features include:

  • Logging: Enables or disables logging and logs events with their priorities.
  • Event Registration: Registers listeners and handlers for specific events.
  • Event Emission: Emits events with context for cancellation and timeout.
  • Event Processing: Processes events in priority order using heap operations.
  • Event Handling: Dispatches events to registered handlers.

Example Usage

main.py

import threading
import time
import queue
import logging
from pyeventhub import EventEmitter, NotificationHandler, Notification

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Example usage
    emitter = EventEmitter(logging=True)

    # Register notification handlers
    handler1 = NotificationHandler(name="EmailHandler")
    handler2 = NotificationHandler(name="SMSHandler")

    emitter.on_event("email_notification", handler1)
    emitter.on_event("sms_notification", handler2)

    # Registering listeners with filters
    def email_filter(data):
        if isinstance(data, Notification):
            return data.ID > 0  # Example filter: process only notifications with ID > 0
        return False

    chEmail = emitter.on("email_notification", email_filter)
    chSMS = emitter.on("sms_notification")

    # Simulate sending notifications with priority
    def send_notifications():
        emitter.emit_with_context(None, "email_notification", Notification(ID=1, Message="New email received", CreatedAt=time.time()), 2)  # Higher priority
        emitter.emit_with_context(None, "sms_notification", Notification(ID=2, Message="You have a new SMS", CreatedAt=time.time()), 1)  # Lower priority

    # Handle notifications asynchronously
    def handle_notifications():
        while True:
            if not chEmail.empty():
                notification = chEmail.get()
                print("Received email notification")
                handler1.handle(notification)
            if not chSMS.empty():
                notification = chSMS.get()
                print("Received SMS notification")
                handler2.handle(notification)

    send_thread = threading.Thread(target=send_notifications)
    handle_thread = threading.Thread(target=handle_notifications)

    send_thread.start()
    handle_thread.start()

    # Allow some time for notifications to be processed
    time.sleep(2)

    # Clean up
    emitter.close()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pyeventhub-0.1.2.tar.gz (4.0 kB view details)

Uploaded Source

Built Distribution

pyeventhub-0.1.2-py3-none-any.whl (6.6 kB view details)

Uploaded Python 3

File details

Details for the file pyeventhub-0.1.2.tar.gz.

File metadata

  • Download URL: pyeventhub-0.1.2.tar.gz
  • Upload date:
  • Size: 4.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.2 Linux/6.1.0-18-amd64

File hashes

Hashes for pyeventhub-0.1.2.tar.gz
Algorithm Hash digest
SHA256 9667b57746ed236a57248c85e60e8560700d62bf7d25f82e49ac57afd0c2e8a6
MD5 f0ca488284802a1742ad2130415a0530
BLAKE2b-256 539ee59d5308d8940bb0f20a6724a27d23f722bc3359e8ce9916dc1e3b53101e

See more details on using hashes here.

File details

Details for the file pyeventhub-0.1.2-py3-none-any.whl.

File metadata

  • Download URL: pyeventhub-0.1.2-py3-none-any.whl
  • Upload date:
  • Size: 6.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.11.2 Linux/6.1.0-18-amd64

File hashes

Hashes for pyeventhub-0.1.2-py3-none-any.whl
Algorithm Hash digest
SHA256 c87888d95b54fff4f4567dd23e199141f64478baba431ddd7bc68ca5da37966f
MD5 ead7669043ec4dae20335d282f093d24
BLAKE2b-256 aa67bf1060d13dd123b866365f6c7eb7c9252e7032ec8d49952fd85b49c32e2c

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page