wexample-event

Version: 0.0.80

Status & Compatibility

Maturity: Production-ready

Python Support: >=3.10

OS Support: Linux, macOS, Windows

Status: Actively maintained

Quickstart

Basic Event Dispatching

from wexample_event.dataclass.event import Event
from wexample_event.common.dispatcher import EventDispatcherMixin

# Create a dispatcher
class MyApp(EventDispatcherMixin):
    pass

app = MyApp()

# Add a listener
def on_user_login(event: Event) -> None:
    print(f"User logged in: {event.payload['username']}")

app.add_event_listener("user.login", on_user_login)

# Dispatch an event
app.dispatch("user.login", payload={"username": "john"})

Using the Listener Mixin

from wexample_event.dataclass.event import Event
from wexample_event.common.listener import EventListenerMixin
from wexample_event.common.dispatcher import EventDispatcherMixin

class MyDispatcher(EventDispatcherMixin):
    pass

class MyListener(EventListenerMixin):
    @EventListenerMixin.on("user.login")
    def handle_login(self, event: Event) -> None:
        print(f"Login handled: {event.payload['username']}")
    
    @EventListenerMixin.on("user.logout")
    def handle_logout(self, event: Event) -> None:
        print("User logged out")

# Setup
dispatcher = MyDispatcher()
listener = MyListener()
listener.bind_to_dispatcher(dispatcher)

# Dispatch events
dispatcher.dispatch("user.login", payload={"username": "jane"})
dispatcher.dispatch("user.logout")

Async Events

import asyncio
from wexample_event.dataclass.event import Event
from wexample_event.common.dispatcher import EventDispatcherMixin

class AsyncApp(EventDispatcherMixin):
    pass

app = AsyncApp()

async def async_handler(event: Event) -> None:
    await asyncio.sleep(0.1)
    print(f"Async handler: {event.name}")

app.add_event_listener("async.event", async_handler)

# Use dispatch_async for async handlers; it must be awaited inside a coroutine
async def main() -> None:
    await app.dispatch_async("async.event")

asyncio.run(main())

Priority and Once

from wexample_event.dataclass.event import Event
from wexample_event.common.priority import EventPriority
from wexample_event.common.dispatcher import EventDispatcherMixin

app = EventDispatcherMixin()

# High priority listener (runs first)
def high_priority(event: Event) -> None:
    print("High priority")

# Low priority listener (runs last)
def low_priority(event: Event) -> None:
    print("Low priority")

# One-time listener
def once_handler(event: Event) -> None:
    print("This runs only once")

app.add_event_listener("test", high_priority, priority=EventPriority.HIGH)
app.add_event_listener("test", low_priority, priority=EventPriority.LOW)
app.add_event_listener("test", once_handler, once=True)

app.dispatch("test")  # All three run
app.dispatch("test")  # Only high and low priority run

API Reference

Core Classes

Event

Immutable dataclass representing an event.

@dataclass(frozen=True, slots=True)
class Event:
    name: str
    payload: Mapping[str, Any] | None = None
    metadata: Mapping[str, Any] | None = None
    source: Any | None = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

Methods:

  • with_update(**changes) - Returns a copy with updated fields
  • derive(name=None, **changes) - Creates a derived event, optionally with a new name

EventDispatcherMixin

Mixin providing event dispatching capabilities.

Methods:

  • add_event_listener(name, callback, *, once=False, priority=DEFAULT_PRIORITY) - Register a listener
  • remove_event_listener(name, callback) - Remove a listener (returns bool)
  • clear_event_listeners(name=None) - Clear listeners (all or for specific event)
  • has_event_listeners(name) - Check if event has listeners
  • dispatch(event, *, payload=None, metadata=None, source=_UNSET) - Dispatch synchronously
  • dispatch_async(event, *, payload=None, metadata=None, source=_UNSET) - Dispatch asynchronously
  • dispatch_event(...) - Alias for dispatch
  • dispatch_event_async(...) - Alias for dispatch_async
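
To make the listener lifecycle concrete, here is a minimal, synchronous-only sketch of a dispatcher with the same method names. SketchDispatcher is a hypothetical stand-in: it ignores priorities, metadata and async callbacks, and its internals are an assumption rather than the mixin's actual code.

```python
from collections import defaultdict
from typing import Any, Callable

Callback = Callable[[Any], None]


class SketchDispatcher:
    """Hypothetical, synchronous-only stand-in for the documented mixin."""

    def __init__(self) -> None:
        # Each entry is a (callback, once) pair, kept in registration order.
        self._listeners: dict[str, list[tuple[Callback, bool]]] = defaultdict(list)

    def add_event_listener(self, name: str, callback: Callback, *, once: bool = False) -> None:
        self._listeners[name].append((callback, once))

    def remove_event_listener(self, name: str, callback: Callback) -> bool:
        before = len(self._listeners[name])
        self._listeners[name] = [e for e in self._listeners[name] if e[0] != callback]
        # True only if at least one entry was actually removed.
        return len(self._listeners[name]) < before

    def has_event_listeners(self, name: str) -> bool:
        return bool(self._listeners.get(name))

    def dispatch(self, name: str, payload: Any = None) -> None:
        # Iterate over a copy so once-listeners can be dropped safely.
        for entry in list(self._listeners[name]):
            callback, once = entry
            if once:
                self._listeners[name].remove(entry)
            callback(payload)


log: list[str] = []
bus = SketchDispatcher()
bus.add_event_listener("test", lambda p: log.append("always"))
bus.add_event_listener("test", lambda p: log.append("once"), once=True)

bus.dispatch("test")
bus.dispatch("test")
print(log)  # → ['always', 'once', 'always']
```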

EventListenerMixin

Mixin for declarative event listeners using decorators.

Class Method:

  • @on(event_name, *, priority=DEFAULT_PRIORITY, once=False) - Decorator to mark methods as listeners

Methods:

  • bind_to_dispatcher(dispatcher) - Bind all decorated methods to a dispatcher
  • unbind_from_dispatcher() - Unbind from current dispatcher
  • get_bound_dispatcher() - Get the currently bound dispatcher
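
One common way to implement decorator-based listeners is to tag each method with the event name and collect the tagged methods at bind time. The sketch below shows that pattern in isolation. The names on, bind, _listens_to and the plain-dict registry are all hypothetical, and the library may do this differently.

```python
from typing import Any, Callable

Registry = dict[str, list[Callable[[Any], None]]]


def on(event_name: str) -> Callable:
    """Hypothetical decorator: tags a function with the event it handles."""

    def decorator(func: Callable) -> Callable:
        func._listens_to = event_name  # marker attribute, name is illustrative
        return func

    return decorator


class SketchListener:
    @on("user.login")
    def handle_login(self, event: Any) -> None:
        self.seen = event


def bind(listener: object, registry: Registry) -> None:
    # dir() also lists inherited attributes, so base-class handlers are found.
    for attr in dir(listener):
        method = getattr(listener, attr)
        event_name = getattr(method, "_listens_to", None)
        if callable(method) and event_name is not None:
            registry.setdefault(event_name, []).append(method)


registry: Registry = {}
listener = SketchListener()
bind(listener, registry)

# Calling the collected bound method reaches the listener instance.
for callback in registry["user.login"]:
    callback({"username": "jane"})
```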

EventPriority

Enum for common priority values.

class EventPriority(IntEnum):
    LOW = -100
    NORMAL = 0
    HIGH = 100

Constant:

  • DEFAULT_PRIORITY = EventPriority.NORMAL
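
Since EventPriority is an IntEnum, its members compare like plain integers, which is why a custom value such as priority=50 can sit between NORMAL and HIGH. The sketch re-declares the enum locally with the documented values. The "higher runs first" ordering is an assumption taken from the Quickstart comments.

```python
from enum import IntEnum


class EventPriority(IntEnum):
    # Re-declared locally with the values documented above.
    LOW = -100
    NORMAL = 0
    HIGH = 100


# IntEnum members behave like ints, so custom values slot in between them.
priorities = [EventPriority.LOW, 50, EventPriority.HIGH, EventPriority.NORMAL]

# Assumption: higher values run first, as the Quickstart comments suggest.
run_order = sorted(priorities, reverse=True)
print([int(p) for p in run_order])  # → [100, 50, 0, -100]
```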

Type Aliases

EventCallback = Callable[[Event], Awaitable[None] | None]
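
The alias allows a callback to return either None or an awaitable. A dispatcher that accepts both can call the function first and await the result only when needed. The following is a sketch of that check using inspect.isawaitable. The helper name invoke and the local alias SketchCallback are hypothetical, and the event is typed as Any so the example does not depend on the package.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Optional

# Local stand-in for the documented alias (Optional[X] is the same as X | None).
SketchCallback = Callable[[Any], Optional[Awaitable[None]]]

calls: list[str] = []


def sync_listener(event: Any) -> None:
    calls.append(f"sync:{event}")


async def async_listener(event: Any) -> None:
    await asyncio.sleep(0)
    calls.append(f"async:{event}")


async def invoke(callback: SketchCallback, event: Any) -> None:
    # Call first, then await only if the result is awaitable.
    result = callback(event)
    if inspect.isawaitable(result):
        await result


async def main() -> None:
    for callback in (sync_listener, async_listener):
        await invoke(callback, "demo")


asyncio.run(main())
print(calls)  # → ['sync:demo', 'async:demo']
```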

Dataclasses

ListenerRecord

Internal dataclass storing listener information.

@dataclass(slots=True)
class ListenerRecord:
    callback: EventCallback
    once: bool
    priority: int
    order: int

ListenerSpec

Internal dataclass for decorator metadata.

@dataclass(frozen=True, slots=True)
class ListenerSpec:
    name: str
    priority: int
    once: bool

ListenerState

Internal class managing listener binding state.

class ListenerState:
    dispatcher: EventDispatcherMixin | None
    bindings: List[tuple[str, EventCallback]]

Usage Patterns

Synchronous Listener

def my_listener(event: Event) -> None:
    print(event.name)

Asynchronous Listener

async def my_async_listener(event: Event) -> None:
    await some_async_operation()

Decorator Pattern

class MyListener(EventListenerMixin):
    @EventListenerMixin.on("event.name", priority=EventPriority.HIGH)
    def handle_event(self, event: Event) -> None:
        pass

Custom Priority

dispatcher.add_event_listener("event", callback, priority=50)

Once Listener

dispatcher.add_event_listener("event", callback, once=True)

Examples

Plugin System

from wexample_event import Event, EventDispatcherMixin, EventListenerMixin

class Application(EventDispatcherMixin):
    def start(self) -> None:
        self.dispatch("app.startup")
        print("Application started")
    
    def stop(self) -> None:
        self.dispatch("app.shutdown")
        print("Application stopped")

class LoggingPlugin(EventListenerMixin):
    @EventListenerMixin.on("app.startup")
    def on_startup(self, event: Event) -> None:
        print("[LOG] Application is starting...")
    
    @EventListenerMixin.on("app.shutdown")
    def on_shutdown(self, event: Event) -> None:
        print("[LOG] Application is shutting down...")

class MetricsPlugin(EventListenerMixin):
    @EventListenerMixin.on("app.startup")
    def on_startup(self, event: Event) -> None:
        print("[METRICS] Recording startup time")

# Setup
app = Application()
logging_plugin = LoggingPlugin()
metrics_plugin = MetricsPlugin()

logging_plugin.bind_to_dispatcher(app)
metrics_plugin.bind_to_dispatcher(app)

# Run
app.start()
app.stop()

State Change Notifications

from typing import Any

from wexample_event import Event, EventDispatcherMixin

class DataStore(EventDispatcherMixin):
    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
    
    def set(self, key: str, value: Any) -> None:
        # Remember the previous value so listeners can compare old and new
        old_value = self._data.get(key)
        self._data[key] = value
        
        self.dispatch(
            "data.changed",
            payload={"key": key, "old": old_value, "new": value}
        )
    
    def get(self, key: str) -> Any:
        return self._data.get(key)

# Create store and add listeners
store = DataStore()

def on_data_changed(event: Event) -> None:
    payload = event.payload
    print(f"Data changed: {payload['key']} = {payload['new']}")

store.add_event_listener("data.changed", on_data_changed)

# Use the store
store.set("username", "alice")
store.set("username", "bob")

Request/Response Pipeline

from wexample_event import Event, EventDispatcherMixin, EventPriority

class RequestPipeline(EventDispatcherMixin):
    def process(self, request: dict) -> dict:
        # The event itself is immutable, but listeners share this response
        # dict through the payload and mutate it in place
        response = {"status": "pending", "data": request}
        
        self.dispatch(
            "request.process",
            payload={"request": request, "response": response}
        )
        
        return response

pipeline = RequestPipeline()

# Authentication middleware (high priority)
def authenticate(event: Event) -> None:
    response = event.payload["response"]
    request = event.payload["request"]
    
    if not request.get("auth_token"):
        response["status"] = "error"
        response["message"] = "Authentication required"
    else:
        response["authenticated"] = True

# Validation middleware (normal priority)
def validate(event: Event) -> None:
    response = event.payload["response"]
    if response["status"] == "error":
        return  # Skip if already errored
    
    request = event.payload["request"]
    if not request.get("data"):
        response["status"] = "error"
        response["message"] = "Data required"

# Processing (low priority)
def process_data(event: Event) -> None:
    response = event.payload["response"]
    if response["status"] == "error":
        return
    
    response["status"] = "success"
    response["processed"] = True

pipeline.add_event_listener("request.process", authenticate, priority=EventPriority.HIGH)
pipeline.add_event_listener("request.process", validate, priority=EventPriority.NORMAL)
pipeline.add_event_listener("request.process", process_data, priority=EventPriority.LOW)

# Process requests
result1 = pipeline.process({"auth_token": "abc", "data": {"value": 123}})
print(result1)  # Success

result2 = pipeline.process({"data": {"value": 456}})
print(result2)  # Error: Authentication required

Event Inheritance

from wexample_event import Event, EventDispatcherMixin, EventListenerMixin

class BaseListener(EventListenerMixin):
    @EventListenerMixin.on("base.event")
    def handle_base(self, event: Event) -> None:
        print("Base handler")

class ExtendedListener(BaseListener):
    @EventListenerMixin.on("extended.event")
    def handle_extended(self, event: Event) -> None:
        print("Extended handler")

dispatcher = EventDispatcherMixin()
listener = ExtendedListener()
listener.bind_to_dispatcher(dispatcher)

# Both base and extended events work
dispatcher.dispatch("base.event")      # Prints: Base handler
dispatcher.dispatch("extended.event")  # Prints: Extended handler

Dynamic Event Names

from wexample_event import Event, EventDispatcherMixin

class EventBus(EventDispatcherMixin):
    def emit(self, event_type: str, **data) -> None:
        self.dispatch(f"event.{event_type}", payload=data)

bus = EventBus()

# Register one handler under each concrete event name
# (no wildcard matching is involved here)
def handle_user_events(event: Event) -> None:
    print(f"User event: {event.name}")

bus.add_event_listener("event.user.login", handle_user_events)
bus.add_event_listener("event.user.logout", handle_user_events)

# Emit events
bus.emit("user.login", username="alice")
bus.emit("user.logout", username="alice")

Tests

This project uses pytest for testing and pytest-cov for code coverage analysis.

Installation

First, install the required testing dependencies:

.venv/bin/python -m pip install pytest pytest-cov

Basic Usage

Run all tests with coverage:

.venv/bin/python -m pytest --cov --cov-report=html

Common Commands

# Run tests with coverage for a specific module
.venv/bin/python -m pytest --cov=your_module

# Show which lines are not covered
.venv/bin/python -m pytest --cov=your_module --cov-report=term-missing

# Generate an HTML coverage report
.venv/bin/python -m pytest --cov=your_module --cov-report=html

# Combine terminal and HTML reports
.venv/bin/python -m pytest --cov=your_module --cov-report=term-missing --cov-report=html

# Run specific test file with coverage
.venv/bin/python -m pytest tests/test_file.py --cov=your_module --cov-report=term-missing

Viewing HTML Reports

After generating an HTML report, open htmlcov/index.html in your browser to view detailed line-by-line coverage information.

Coverage Threshold

To enforce a minimum coverage percentage:

.venv/bin/python -m pytest --cov=your_module --cov-fail-under=80

This will cause the test suite to fail if coverage drops below 80%.

Known Limitations & Roadmap

Current limitations and planned features are tracked in the GitHub issues.

See the project roadmap for upcoming features and improvements.
