Skip to main content

No project description provided

Project description

event

Version: 6.0.23

Table of Contents

Tests

This project uses pytest for testing and pytest-cov for code coverage analysis.

Installation

First, install the required testing dependencies:

.venv/bin/python -m pip install pytest pytest-cov

Basic Usage

Run all tests with coverage:

.venv/bin/python -m pytest --cov --cov-report=html

Common Commands

# Run tests with coverage for a specific module
.venv/bin/python -m pytest --cov=your_module

# Show which lines are not covered
.venv/bin/python -m pytest --cov=your_module --cov-report=term-missing

# Generate an HTML coverage report
.venv/bin/python -m pytest --cov=your_module --cov-report=html

# Combine terminal and HTML reports
.venv/bin/python -m pytest --cov=your_module --cov-report=term-missing --cov-report=html

# Run specific test file with coverage
.venv/bin/python -m pytest tests/test_file.py --cov=your_module --cov-report=term-missing

Viewing HTML Reports

After generating an HTML report, open htmlcov/index.html in your browser to view detailed line-by-line coverage information.

Coverage Threshold

To enforce a minimum coverage percentage:

.venv/bin/python -m pytest --cov=your_module --cov-fail-under=80

This will cause the test suite to fail if coverage drops below 80%.

Integration in the Suite

This package is part of the Wexample Suite — a collection of high-quality, modular tools designed to work seamlessly together across multiple languages and environments.

Related Packages

The suite includes packages for configuration management, file handling, prompts, and more. Each package can be used independently or as part of the integrated suite.

Visit the Wexample Suite documentation for the complete package ecosystem.

Dependencies

  • wexample-helpers: >=6.9.0

Versioning & Compatibility Policy

Wexample packages follow Semantic Versioning (SemVer):

  • MAJOR: Breaking changes
  • MINOR: New features, backward compatible
  • PATCH: Bug fixes, backward compatible

We maintain backward compatibility within major versions and provide clear migration guides for breaking changes.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Free to use in both personal and commercial projects.

Integration in the Suite

This package is part of the Wexample Suite — a collection of high-quality, modular tools designed to work seamlessly together across multiple languages and environments.

Related Packages

The suite includes packages for configuration management, file handling, prompts, and more. Each package can be used independently or as part of the integrated suite.

Visit the Wexample Suite documentation for the complete package ecosystem.

About us

Wexample stands as a cornerstone of the digital ecosystem — a collective of seasoned engineers, researchers, and creators driven by a relentless pursuit of technological excellence. More than a media platform, it has grown into a vibrant community where innovation meets craftsmanship, and where every line of code reflects a commitment to clarity, durability, and shared intelligence.

This packages suite embodies this spirit. Trusted by professionals and enthusiasts alike, it delivers a consistent, high-quality foundation for modern development — open, elegant, and battle-tested. Its reputation is built on years of collaboration, refinement, and rigorous attention to detail, making it a natural choice for those who demand both robustness and beauty in their tools.

Wexample cultivates a culture of mastery. Each package, each contribution carries the mark of a community that values precision, ethics, and innovation — a community proud to shape the future of digital craftsmanship.

API Reference

Core Classes

Event

Immutable dataclass representing an event.

@dataclass(frozen=True, slots=True)
class Event:
    name: str
    payload: Mapping[str, Any] | None = None
    metadata: Mapping[str, Any] | None = None
    source: Any | None = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

Methods:

  • with_update(**changes) - Returns a copy with updated fields
  • derive(name=None, **changes) - Creates a derived event, optionally with a new name

EventDispatcherMixin

Mixin providing event dispatching capabilities.

Methods:

  • add_event_listener(name, callback, *, once=False, priority=DEFAULT_PRIORITY) - Register a listener
  • remove_event_listener(name, callback) - Remove a listener (returns bool)
  • clear_event_listeners(name=None) - Clear listeners (all or for specific event)
  • has_event_listeners(name) - Check if event has listeners
  • dispatch(event, *, payload=None, metadata=None, source=_UNSET) - Dispatch synchronously
  • dispatch_async(event, *, payload=None, metadata=None, source=_UNSET) - Dispatch asynchronously
  • dispatch_event(...) - Alias for dispatch
  • dispatch_event_async(...) - Alias for dispatch_async

EventListenerMixin

Mixin for declarative event listeners using decorators.

Class Method:

  • @on(event_name, *, priority=DEFAULT_PRIORITY, once=False) - Decorator to mark methods as listeners

Methods:

  • bind_to_dispatcher(dispatcher) - Bind all decorated methods to a dispatcher
  • unbind_from_dispatcher() - Unbind from current dispatcher
  • get_bound_dispatcher() - Get the currently bound dispatcher

EventPriority

Enum for common priority values.

class EventPriority(IntEnum):
    LOW = -100
    NORMAL = 0
    HIGH = 100

Constant:

  • DEFAULT_PRIORITY = EventPriority.NORMAL

Type Aliases

EventCallback = Callable[[Event], Awaitable[None] | None]

Dataclasses

ListenerRecord

Internal dataclass storing listener information.

@dataclass(slots=True)
class ListenerRecord:
    callback: EventCallback
    once: bool
    priority: int
    order: int

ListenerSpec

Internal dataclass for decorator metadata.

@dataclass(frozen=True, slots=True)
class ListenerSpec:
    name: str
    priority: int
    once: bool

ListenerState

Internal class managing listener binding state.

class ListenerState:
    dispatcher: EventDispatcherMixin | None
    bindings: List[tuple[str, EventCallback]]

Usage Patterns

Synchronous Listener

def my_listener(event: Event) -> None:
    print(event.name)

Asynchronous Listener

async def my_async_listener(event: Event) -> None:
    await some_async_operation()

Decorator Pattern

class MyListener(EventListenerMixin):
    @EventListenerMixin.on("event.name", priority=EventPriority.HIGH)
    def handle_event(self, event: Event) -> None:
        pass

Custom Priority

dispatcher.add_event_listener("event", callback, priority=50)

Once Listener

dispatcher.add_event_listener("event", callback, once=True)

Examples

Plugin System

from wexample_event import Event, EventDispatcherMixin, EventListenerMixin

class Application(EventDispatcherMixin):
    def start(self) -> None:
        self.dispatch("app.startup")
        print("Application started")
    
    def stop(self) -> None:
        self.dispatch("app.shutdown")
        print("Application stopped")

class LoggingPlugin(EventListenerMixin):
    @EventListenerMixin.on("app.startup")
    def on_startup(self, event: Event) -> None:
        print("[LOG] Application is starting...")
    
    @EventListenerMixin.on("app.shutdown")
    def on_shutdown(self, event: Event) -> None:
        print("[LOG] Application is shutting down...")

class MetricsPlugin(EventListenerMixin):
    @EventListenerMixin.on("app.startup")
    def on_startup(self, event: Event) -> None:
        print("[METRICS] Recording startup time")

# Setup
app = Application()
logging_plugin = LoggingPlugin()
metrics_plugin = MetricsPlugin()

logging_plugin.bind_to_dispatcher(app)
metrics_plugin.bind_to_dispatcher(app)

# Run
app.start()
app.stop()

State Change Notifications

from wexample_event import Event, EventDispatcherMixin

class DataStore(EventDispatcherMixin):
    def __init__(self) -> None:
        self._data: dict = {}
    
    def set(self, key: str, value: any) -> None:
        old_value = self._data.get(key)
        self._data[key] = value
        
        self.dispatch(
            "data.changed",
            payload={"key": key, "old": old_value, "new": value}
        )
    
    def get(self, key: str) -> any:
        return self._data.get(key)

# Create store and add listeners
store = DataStore()

def on_data_changed(event: Event) -> None:
    payload = event.payload
    print(f"Data changed: {payload['key']} = {payload['new']}")

store.add_event_listener("data.changed", on_data_changed)

# Use the store
store.set("username", "alice")
store.set("username", "bob")

Request/Response Pipeline

from wexample_event import Event, EventDispatcherMixin, EventPriority

class RequestPipeline(EventDispatcherMixin):
    def process(self, request: dict) -> dict:
        # Create event with mutable response
        response = {"status": "pending", "data": request}
        
        self.dispatch(
            "request.process",
            payload={"request": request, "response": response}
        )
        
        return response

pipeline = RequestPipeline()

# Authentication middleware (high priority)
def authenticate(event: Event) -> None:
    response = event.payload["response"]
    request = event.payload["request"]
    
    if not request.get("auth_token"):
        response["status"] = "error"
        response["message"] = "Authentication required"
    else:
        response["authenticated"] = True

# Validation middleware (normal priority)
def validate(event: Event) -> None:
    response = event.payload["response"]
    if response["status"] == "error":
        return  # Skip if already errored
    
    request = event.payload["request"]
    if not request.get("data"):
        response["status"] = "error"
        response["message"] = "Data required"

# Processing (low priority)
def process_data(event: Event) -> None:
    response = event.payload["response"]
    if response["status"] == "error":
        return
    
    response["status"] = "success"
    response["processed"] = True

pipeline.add_event_listener("request.process", authenticate, priority=EventPriority.HIGH)
pipeline.add_event_listener("request.process", validate, priority=EventPriority.NORMAL)
pipeline.add_event_listener("request.process", process_data, priority=EventPriority.LOW)

# Process requests
result1 = pipeline.process({"auth_token": "abc", "data": {"value": 123}})
print(result1)  # Success

result2 = pipeline.process({"data": {"value": 456}})
print(result2)  # Error: Authentication required

Event Inheritance

from wexample_event import Event, EventDispatcherMixin, EventListenerMixin

class BaseListener(EventListenerMixin):
    @EventListenerMixin.on("base.event")
    def handle_base(self, event: Event) -> None:
        print("Base handler")

class ExtendedListener(BaseListener):
    @EventListenerMixin.on("extended.event")
    def handle_extended(self, event: Event) -> None:
        print("Extended handler")

dispatcher = EventDispatcherMixin()
listener = ExtendedListener()
listener.bind_to_dispatcher(dispatcher)

# Both base and extended events work
dispatcher.dispatch("base.event")      # Prints: Base handler
dispatcher.dispatch("extended.event")  # Prints: Extended handler

Dynamic Event Names

from wexample_event import Event, EventDispatcherMixin

class EventBus(EventDispatcherMixin):
    def emit(self, event_type: str, **data) -> None:
        self.dispatch(f"event.{event_type}", payload=data)

bus = EventBus()

# Add wildcard-style handlers
def handle_user_events(event: Event) -> None:
    print(f"User event: {event.name}")

bus.add_event_listener("event.user.login", handle_user_events)
bus.add_event_listener("event.user.logout", handle_user_events)

# Emit events
bus.emit("user.login", username="alice")
bus.emit("user.logout", username="alice")

event

Version: 0.0.90

Table of Contents

Tests

This project uses pytest for testing and pytest-cov for code coverage analysis.

Installation

First, install the required testing dependencies:

.venv/bin/python -m pip install pytest pytest-cov

Basic Usage

Run all tests with coverage:

.venv/bin/python -m pytest --cov --cov-report=html

Common Commands

# Run tests with coverage for a specific module
.venv/bin/python -m pytest --cov=your_module

# Show which lines are not covered
.venv/bin/python -m pytest --cov=your_module --cov-report=term-missing

# Generate an HTML coverage report
.venv/bin/python -m pytest --cov=your_module --cov-report=html

# Combine terminal and HTML reports
.venv/bin/python -m pytest --cov=your_module --cov-report=term-missing --cov-report=html

# Run specific test file with coverage
.venv/bin/python -m pytest tests/test_file.py --cov=your_module --cov-report=term-missing

Viewing HTML Reports

After generating an HTML report, open htmlcov/index.html in your browser to view detailed line-by-line coverage information.

Coverage Threshold

To enforce a minimum coverage percentage:

.venv/bin/python -m pytest --cov=your_module --cov-fail-under=80

This will cause the test suite to fail if coverage drops below 80%.

Integration in the Suite

This package is part of the Wexample Suite — a collection of high-quality, modular tools designed to work seamlessly together across multiple languages and environments.

Related Packages

The suite includes packages for configuration management, file handling, prompts, and more. Each package can be used independently or as part of the integrated suite.

Visit the Wexample Suite documentation for the complete package ecosystem.

Dependencies

  • wexample-helpers: >=2.0.0

Versioning & Compatibility Policy

Wexample packages follow Semantic Versioning (SemVer):

  • MAJOR: Breaking changes
  • MINOR: New features, backward compatible
  • PATCH: Bug fixes, backward compatible

We maintain backward compatibility within major versions and provide clear migration guides for breaking changes.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Free to use in both personal and commercial projects.

Integration in the Suite

This package is part of the Wexample Suite — a collection of high-quality, modular tools designed to work seamlessly together across multiple languages and environments.

Related Packages

The suite includes packages for configuration management, file handling, prompts, and more. Each package can be used independently or as part of the integrated suite.

Visit the Wexample Suite documentation for the complete package ecosystem.

About us

Wexample stands as a cornerstone of the digital ecosystem — a collective of seasoned engineers, researchers, and creators driven by a relentless pursuit of technological excellence. More than a media platform, it has grown into a vibrant community where innovation meets craftsmanship, and where every line of code reflects a commitment to clarity, durability, and shared intelligence.

This packages suite embodies this spirit. Trusted by professionals and enthusiasts alike, it delivers a consistent, high-quality foundation for modern development — open, elegant, and battle-tested. Its reputation is built on years of collaboration, refinement, and rigorous attention to detail, making it a natural choice for those who demand both robustness and beauty in their tools.

Wexample cultivates a culture of mastery. Each package, each contribution carries the mark of a community that values precision, ethics, and innovation — a community proud to shape the future of digital craftsmanship.

API Reference

Core Classes

Event

Immutable dataclass representing an event.

@dataclass(frozen=True, slots=True)
class Event:
    name: str
    payload: Mapping[str, Any] | None = None
    metadata: Mapping[str, Any] | None = None
    source: Any | None = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

Methods:

  • with_update(**changes) - Returns a copy with updated fields
  • derive(name=None, **changes) - Creates a derived event, optionally with a new name

EventDispatcherMixin

Mixin providing event dispatching capabilities.

Methods:

  • add_event_listener(name, callback, *, once=False, priority=DEFAULT_PRIORITY) - Register a listener
  • remove_event_listener(name, callback) - Remove a listener (returns bool)
  • clear_event_listeners(name=None) - Clear listeners (all or for specific event)
  • has_event_listeners(name) - Check if event has listeners
  • dispatch(event, *, payload=None, metadata=None, source=_UNSET) - Dispatch synchronously
  • dispatch_async(event, *, payload=None, metadata=None, source=_UNSET) - Dispatch asynchronously
  • dispatch_event(...) - Alias for dispatch
  • dispatch_event_async(...) - Alias for dispatch_async

EventListenerMixin

Mixin for declarative event listeners using decorators.

Class Method:

  • @on(event_name, *, priority=DEFAULT_PRIORITY, once=False) - Decorator to mark methods as listeners

Methods:

  • bind_to_dispatcher(dispatcher) - Bind all decorated methods to a dispatcher
  • unbind_from_dispatcher() - Unbind from current dispatcher
  • get_bound_dispatcher() - Get the currently bound dispatcher

EventPriority

Enum for common priority values.

class EventPriority(IntEnum):
    LOW = -100
    NORMAL = 0
    HIGH = 100

Constant:

  • DEFAULT_PRIORITY = EventPriority.NORMAL

Type Aliases

EventCallback = Callable[[Event], Awaitable[None] | None]

Dataclasses

ListenerRecord

Internal dataclass storing listener information.

@dataclass(slots=True)
class ListenerRecord:
    callback: EventCallback
    once: bool
    priority: int
    order: int

ListenerSpec

Internal dataclass for decorator metadata.

@dataclass(frozen=True, slots=True)
class ListenerSpec:
    name: str
    priority: int
    once: bool

ListenerState

Internal class managing listener binding state.

class ListenerState:
    dispatcher: EventDispatcherMixin | None
    bindings: List[tuple[str, EventCallback]]

Usage Patterns

Synchronous Listener

def my_listener(event: Event) -> None:
    print(event.name)

Asynchronous Listener

async def my_async_listener(event: Event) -> None:
    await some_async_operation()

Decorator Pattern

class MyListener(EventListenerMixin):
    @EventListenerMixin.on("event.name", priority=EventPriority.HIGH)
    def handle_event(self, event: Event) -> None:
        pass

Custom Priority

dispatcher.add_event_listener("event", callback, priority=50)

Once Listener

dispatcher.add_event_listener("event", callback, once=True)

Examples

Plugin System

from wexample_event import Event, EventDispatcherMixin, EventListenerMixin

class Application(EventDispatcherMixin):
    def start(self) -> None:
        self.dispatch("app.startup")
        print("Application started")
    
    def stop(self) -> None:
        self.dispatch("app.shutdown")
        print("Application stopped")

class LoggingPlugin(EventListenerMixin):
    @EventListenerMixin.on("app.startup")
    def on_startup(self, event: Event) -> None:
        print("[LOG] Application is starting...")
    
    @EventListenerMixin.on("app.shutdown")
    def on_shutdown(self, event: Event) -> None:
        print("[LOG] Application is shutting down...")

class MetricsPlugin(EventListenerMixin):
    @EventListenerMixin.on("app.startup")
    def on_startup(self, event: Event) -> None:
        print("[METRICS] Recording startup time")

# Setup
app = Application()
logging_plugin = LoggingPlugin()
metrics_plugin = MetricsPlugin()

logging_plugin.bind_to_dispatcher(app)
metrics_plugin.bind_to_dispatcher(app)

# Run
app.start()
app.stop()

State Change Notifications

from wexample_event import Event, EventDispatcherMixin

class DataStore(EventDispatcherMixin):
    def __init__(self) -> None:
        self._data: dict = {}
    
    def set(self, key: str, value: any) -> None:
        old_value = self._data.get(key)
        self._data[key] = value
        
        self.dispatch(
            "data.changed",
            payload={"key": key, "old": old_value, "new": value}
        )
    
    def get(self, key: str) -> any:
        return self._data.get(key)

# Create store and add listeners
store = DataStore()

def on_data_changed(event: Event) -> None:
    payload = event.payload
    print(f"Data changed: {payload['key']} = {payload['new']}")

store.add_event_listener("data.changed", on_data_changed)

# Use the store
store.set("username", "alice")
store.set("username", "bob")

Request/Response Pipeline

from wexample_event import Event, EventDispatcherMixin, EventPriority

class RequestPipeline(EventDispatcherMixin):
    def process(self, request: dict) -> dict:
        # Create event with mutable response
        response = {"status": "pending", "data": request}
        
        self.dispatch(
            "request.process",
            payload={"request": request, "response": response}
        )
        
        return response

pipeline = RequestPipeline()

# Authentication middleware (high priority)
def authenticate(event: Event) -> None:
    response = event.payload["response"]
    request = event.payload["request"]
    
    if not request.get("auth_token"):
        response["status"] = "error"
        response["message"] = "Authentication required"
    else:
        response["authenticated"] = True

# Validation middleware (normal priority)
def validate(event: Event) -> None:
    response = event.payload["response"]
    if response["status"] == "error":
        return  # Skip if already errored
    
    request = event.payload["request"]
    if not request.get("data"):
        response["status"] = "error"
        response["message"] = "Data required"

# Processing (low priority)
def process_data(event: Event) -> None:
    response = event.payload["response"]
    if response["status"] == "error":
        return
    
    response["status"] = "success"
    response["processed"] = True

pipeline.add_event_listener("request.process", authenticate, priority=EventPriority.HIGH)
pipeline.add_event_listener("request.process", validate, priority=EventPriority.NORMAL)
pipeline.add_event_listener("request.process", process_data, priority=EventPriority.LOW)

# Process requests
result1 = pipeline.process({"auth_token": "abc", "data": {"value": 123}})
print(result1)  # Success

result2 = pipeline.process({"data": {"value": 456}})
print(result2)  # Error: Authentication required

Event Inheritance

from wexample_event import Event, EventDispatcherMixin, EventListenerMixin

class BaseListener(EventListenerMixin):
    @EventListenerMixin.on("base.event")
    def handle_base(self, event: Event) -> None:
        print("Base handler")

class ExtendedListener(BaseListener):
    @EventListenerMixin.on("extended.event")
    def handle_extended(self, event: Event) -> None:
        print("Extended handler")

dispatcher = EventDispatcherMixin()
listener = ExtendedListener()
listener.bind_to_dispatcher(dispatcher)

# Both base and extended events work
dispatcher.dispatch("base.event")      # Prints: Base handler
dispatcher.dispatch("extended.event")  # Prints: Extended handler

Dynamic Event Names

from wexample_event import Event, EventDispatcherMixin

class EventBus(EventDispatcherMixin):
    def emit(self, event_type: str, **data) -> None:
        self.dispatch(f"event.{event_type}", payload=data)

bus = EventBus()

# Add wildcard-style handlers
def handle_user_events(event: Event) -> None:
    print(f"User event: {event.name}")

bus.add_event_listener("event.user.login", handle_user_events)
bus.add_event_listener("event.user.logout", handle_user_events)

# Emit events
bus.emit("user.login", username="alice")
bus.emit("user.logout", username="alice")

A Python toolkit for managing events and event-driven workflows.

Quickstart

Basic Event Dispatching

from wexample_event.dataclass.event import Event
from wexample_event.common.dispatcher import EventDispatcherMixin

# Create a dispatcher
class MyApp(EventDispatcherMixin):
    pass

app = MyApp()

# Add a listener
def on_user_login(event: Event) -> None:
    print(f"User logged in: {event.payload['username']}")

app.add_event_listener("user.login", on_user_login)

# Dispatch an event
app.dispatch("user.login", payload={"username": "john"})

Using the Listener Mixin

from wexample_event.dataclass.event import Event
from wexample_event.common.listener import EventListenerMixin
from wexample_event.common.dispatcher import EventDispatcherMixin

class MyDispatcher(EventDispatcherMixin):
    pass

class MyListener(EventListenerMixin):
    @EventListenerMixin.on("user.login")
    def handle_login(self, event: Event) -> None:
        print(f"Login handled: {event.payload['username']}")
    
    @EventListenerMixin.on("user.logout")
    def handle_logout(self, event: Event) -> None:
        print("User logged out")

# Setup
dispatcher = MyDispatcher()
listener = MyListener()
listener.bind_to_dispatcher(dispatcher)

# Dispatch events
dispatcher.dispatch("user.login", payload={"username": "jane"})
dispatcher.dispatch("user.logout")

Async Events

import asyncio
from wexample_event.dataclass.event import Event
from wexample_event.common.dispatcher import EventDispatcherMixin

class AsyncApp(EventDispatcherMixin):
    pass

app = AsyncApp()

async def async_handler(event: Event) -> None:
    await asyncio.sleep(0.1)
    print(f"Async handler: {event.name}")

app.add_event_listener("async.event", async_handler)

# Use dispatch_async for async handlers
await app.dispatch_async("async.event")

Priority and Once

from wexample_event.dataclass.event import Event
from wexample_event.common.priority import EventPriority
from wexample_event.common.dispatcher import EventDispatcherMixin

app = EventDispatcherMixin()

# High priority listener (runs first)
def high_priority(event: Event) -> None:
    print("High priority")

# Low priority listener (runs last)
def low_priority(event: Event) -> None:
    print("Low priority")

# One-time listener
def once_handler(event: Event) -> None:
    print("This runs only once")

app.add_event_listener("test", high_priority, priority=EventPriority.HIGH)
app.add_event_listener("test", low_priority, priority=EventPriority.LOW)
app.add_event_listener("test", once_handler, once=True)

app.dispatch("test")  # All three run
app.dispatch("test")  # Only high and low priority run

Known Limitations & Roadmap

Current limitations and planned features are tracked in the GitHub issues.

See the project roadmap for upcoming features and improvements.

Status & Compatibility

Maturity: Production-ready

Python Support: >=3.10

OS Support: Linux, macOS, Windows

Status: Actively maintained

Useful Links

Migration Notes

When upgrading between major versions, refer to the migration guides in the documentation.

Breaking changes are clearly documented with upgrade paths and examples.

Quickstart

Basic Event Dispatching

from wexample_event.dataclass.event import Event
from wexample_event.common.dispatcher import EventDispatcherMixin

# Create a dispatcher
class MyApp(EventDispatcherMixin):
    pass

app = MyApp()

# Add a listener
def on_user_login(event: Event) -> None:
    print(f"User logged in: {event.payload['username']}")

app.add_event_listener("user.login", on_user_login)

# Dispatch an event
app.dispatch("user.login", payload={"username": "john"})

Using the Listener Mixin

from wexample_event.dataclass.event import Event
from wexample_event.common.listener import EventListenerMixin
from wexample_event.common.dispatcher import EventDispatcherMixin

class MyDispatcher(EventDispatcherMixin):
    pass

class MyListener(EventListenerMixin):
    @EventListenerMixin.on("user.login")
    def handle_login(self, event: Event) -> None:
        print(f"Login handled: {event.payload['username']}")
    
    @EventListenerMixin.on("user.logout")
    def handle_logout(self, event: Event) -> None:
        print("User logged out")

# Setup
dispatcher = MyDispatcher()
listener = MyListener()
listener.bind_to_dispatcher(dispatcher)

# Dispatch events
dispatcher.dispatch("user.login", payload={"username": "jane"})
dispatcher.dispatch("user.logout")

Async Events

import asyncio
from wexample_event.dataclass.event import Event
from wexample_event.common.dispatcher import EventDispatcherMixin

class AsyncApp(EventDispatcherMixin):
    pass

app = AsyncApp()

async def async_handler(event: Event) -> None:
    await asyncio.sleep(0.1)
    print(f"Async handler: {event.name}")

app.add_event_listener("async.event", async_handler)

# Use dispatch_async for async handlers
await app.dispatch_async("async.event")

Priority and Once

from wexample_event.dataclass.event import Event
from wexample_event.common.priority import EventPriority
from wexample_event.common.dispatcher import EventDispatcherMixin

app = EventDispatcherMixin()

# High priority listener (runs first)
def high_priority(event: Event) -> None:
    print("High priority")

# Low priority listener (runs last)
def low_priority(event: Event) -> None:
    print("Low priority")

# One-time listener
def once_handler(event: Event) -> None:
    print("This runs only once")

app.add_event_listener("test", high_priority, priority=EventPriority.HIGH)
app.add_event_listener("test", low_priority, priority=EventPriority.LOW)
app.add_event_listener("test", once_handler, once=True)

app.dispatch("test")  # All three run
app.dispatch("test")  # Only high and low priority run

Known Limitations & Roadmap

Current limitations and planned features are tracked in the GitHub issues.

See the project roadmap for upcoming features and improvements.

Status & Compatibility

Maturity: Production-ready

Python Support: >=3.10

OS Support: Linux, macOS, Windows

Status: Actively maintained

Useful Links

Migration Notes

When upgrading between major versions, refer to the migration guides in the documentation.

Breaking changes are clearly documented with upgrade paths and examples.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

wexample_event-6.0.23.tar.gz (20.5 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

wexample_event-6.0.23-py3-none-any.whl (12.5 kB view details)

Uploaded Python 3

File details

Details for the file wexample_event-6.0.23.tar.gz.

File metadata

  • Download URL: wexample_event-6.0.23.tar.gz
  • Upload date:
  • Size: 20.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.25.9 CPython/3.12.3 Linux/6.8.0-110-generic

File hashes

Hashes for wexample_event-6.0.23.tar.gz
Algorithm Hash digest
SHA256 6b8eef4966a14b37a642d7935fc6ea4fac9f6101d3925603639928f66450752c
MD5 94df3ed2d8c742bb71bf58063dd9e34c
BLAKE2b-256 ab539fc3451839681f0c6d698416432f07f0226a78250d875fb92b8abdbaae29

See more details on using hashes here.

File details

Details for the file wexample_event-6.0.23-py3-none-any.whl.

File metadata

  • Download URL: wexample_event-6.0.23-py3-none-any.whl
  • Upload date:
  • Size: 12.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: pdm/2.25.9 CPython/3.12.3 Linux/6.8.0-110-generic

File hashes

Hashes for wexample_event-6.0.23-py3-none-any.whl
Algorithm Hash digest
SHA256 947ee55c64daec63cd290bbbeb17388e1b35a571bbd12572c7ce4bfc40b30e87
MD5 b036ce52a22bbb0befdb430240abf1d1
BLAKE2b-256 f2407ed14779b6045ebe346cac7439b68576682e4f2f58795680ef4fb1dc0817

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page