Type-safe event emitter with sync and async listener support

philiprehberger-event-emitter

Installation

pip install philiprehberger-event-emitter

Usage

Basic Events

from philiprehberger_event_emitter import EventEmitter

emitter = EventEmitter()

def on_user_created(user):
    print(f"User created: {user['name']}")

emitter.on("user:created", on_user_created)
emitter.emit("user:created", {"name": "Alice"})

Unsubscribe

# Using the returned unsubscribe function
unsubscribe = emitter.on("event", handler)
unsubscribe()

# Or manually
emitter.off("event", handler)
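
The returned unsubscribe function can be pictured as a closure over the event name and the listener. The following is a minimal sketch of that pattern, not the library's actual implementation; `TinyEmitter` is a hypothetical stand-in written only for illustration.

```python
from collections import defaultdict


class TinyEmitter:
    """Hypothetical stand-in used for illustration; not the library's code."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def on(self, event, listener):
        self._listeners[event].append(listener)

        def unsubscribe():
            # Safe to call more than once: only remove if still registered.
            if listener in self._listeners[event]:
                self._listeners[event].remove(listener)

        return unsubscribe

    def emit(self, event, *args, **kwargs):
        # Iterate over a copy so listeners may unsubscribe while running.
        for listener in list(self._listeners[event]):
            listener(*args, **kwargs)


received = []
tiny = TinyEmitter()
unsubscribe = tiny.on("event", received.append)
tiny.emit("event", 1)   # delivered
unsubscribe()
tiny.emit("event", 2)   # no listener left, nothing delivered
```

Returning a closure means the caller does not need to keep a reference to the handler in order to remove it later.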

One-Time Listeners

emitter.once("init", lambda: print("Only fires once"))
emitter.emit("init")  # prints
emitter.emit("init")  # nothing
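
One common way to get one-time behavior is to wrap the listener so that it removes itself before running. The sketch below shows that idea with a plain list; `make_once` and `registered` are hypothetical names, and the library may implement `once` differently.

```python
def make_once(listener, remove):
    """Wrap `listener` so it deregisters itself on its first call."""

    def wrapper(*args, **kwargs):
        # Remove first, so a listener that raises still fires only once.
        remove(wrapper)
        return listener(*args, **kwargs)

    return wrapper


registered = []
fired = []
registered.append(make_once(lambda: fired.append("init"), registered.remove))

# Simulate two emissions; iterate over a copy because wrappers mutate the list.
for _ in range(2):
    for registered_listener in list(registered):
        registered_listener()
```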

Prepend Listeners

# Insert listener at the front of the queue (fires before existing listeners)
emitter.on("data", second_handler)
emitter.prepend("data", first_handler)

emitter.emit("data", payload)  # first_handler runs before second_handler

# One-shot version
emitter.prepend_once("data", one_time_first_handler)

Middleware / Interceptors

# Middleware receives (event, args, kwargs) and can modify or cancel emissions
def logging_middleware(event, args, kwargs):
    print(f"Event fired: {event}")
    return True  # allow emission to proceed

def block_middleware(event, args, kwargs):
    if event == "secret":
        return False  # cancel emission
    return True

remove_logger = emitter.use(logging_middleware)
emitter.use(block_middleware)

emitter.emit("hello", "world")   # logged, listeners fire
emitter.emit("secret", "data")   # blocked, no listeners fire

remove_logger()  # remove the logging middleware
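
To make the allow/cancel contract concrete, here is a minimal sketch of how a middleware chain could gate an emission. It is an assumption-laden illustration, not the library's code: `run_middleware` is a hypothetical helper, and treating only an explicit `False` as a cancellation is an assumption.

```python
def run_middleware(middlewares, event, args, kwargs):
    """Return True only if no middleware cancels the emission."""
    for middleware in middlewares:
        # Assumption: only an explicit False cancels; later middleware is skipped.
        if middleware(event, args, kwargs) is False:
            return False
    return True


seen = []


def audit(event, args, kwargs):
    seen.append(event)
    return True


def block_secret(event, args, kwargs):
    return event != "secret"


chain = [audit, block_secret]
allowed_hello = run_middleware(chain, "hello", ("world",), {})
allowed_secret = run_middleware(chain, "secret", ("data",), {})
```

In this sketch the order of registration matters: `audit` still sees the `"secret"` event because it runs before `block_secret`.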

Async Listeners

import asyncio

async def async_handler(data):
    await save_to_db(data)  # save_to_db is your own coroutine

emitter.on("data:received", async_handler)

async def main():
    # Use async_emit to await async listeners
    await emitter.async_emit("data:received", {"key": "value"})

asyncio.run(main())
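
An emitter that supports both sync and async listeners has to decide, per call, whether the result needs awaiting. A minimal sketch of that check follows; `call_listeners` is a hypothetical helper, and awaiting listeners one after another (rather than concurrently) is an assumption, not a statement about how `async_emit` behaves.

```python
import asyncio
import inspect


async def call_listeners(listeners, *args, **kwargs):
    """Call each listener in order, awaiting results that are awaitable."""
    results = []
    for listener in listeners:
        result = listener(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        results.append(result)
    return results


def sync_listener(data):
    return f"sync:{data}"


async def async_listener(data):
    await asyncio.sleep(0)  # yield to the event loop once
    return f"async:{data}"


mixed_results = asyncio.run(call_listeners([sync_listener, async_listener], "x"))
```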

Wait for an Event

import asyncio

async def main():
    emitter = EventEmitter()

    # Schedule an emission after a delay
    async def delayed_emit():
        await asyncio.sleep(0.1)
        emitter.emit("ready", "payload")

    asyncio.create_task(delayed_emit())

    # Block until "ready" fires (with optional timeout)
    args, kwargs = await emitter.wait_for("ready", timeout=5.0)
    print(args[0])  # "payload"

asyncio.run(main())
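
Waiting for an event can be built from a one-time listener that resolves an `asyncio` future. The sketch below illustrates that pattern under stated assumptions: `wait_for_event` and its `register_once` callback are hypothetical, and the library's `wait_for` may work differently internally.

```python
import asyncio


async def wait_for_event(register_once, timeout=None):
    """Resolve with (args, kwargs) the first time the listener is called."""
    future = asyncio.get_running_loop().create_future()

    def listener(*args, **kwargs):
        if not future.done():
            future.set_result((args, kwargs))

    register_once(listener)
    # Raises asyncio.TimeoutError if nothing arrives within `timeout` seconds.
    return await asyncio.wait_for(future, timeout)


async def demo():
    pending = []  # stands in for an emitter's listener list

    async def delayed_emit():
        await asyncio.sleep(0.01)
        for pending_listener in pending:
            pending_listener("payload", source="demo")

    task = asyncio.create_task(delayed_emit())
    result = await wait_for_event(pending.append, timeout=1.0)
    await task
    return result


demo_args, demo_kwargs = asyncio.run(demo())
```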

Emit with Timeout

import asyncio

async def slow_handler(data):
    await asyncio.sleep(10)
    return "done"

emitter.on("process", slow_handler)

async def main():
    # Only returns results from listeners that complete within the timeout
    results = await emitter.emit_with_timeout("process", timeout=2.0, data="value")
    print(results)  # [] (slow_handler timed out)

asyncio.run(main())
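
A per-listener timeout can be sketched with `asyncio.wait_for`, dropping any listener that does not finish in time. This is an illustration only: `call_with_timeout` is a hypothetical helper, it assumes every listener is a coroutine function, and running the listeners concurrently is an assumption about behavior the source does not specify.

```python
import asyncio


async def call_with_timeout(listeners, timeout, *args, **kwargs):
    """Run async listeners concurrently; keep results that beat the timeout."""

    async def guarded(listener):
        try:
            value = await asyncio.wait_for(listener(*args, **kwargs), timeout)
            return True, value
        except asyncio.TimeoutError:
            # wait_for cancels the listener before raising.
            return False, None

    outcomes = await asyncio.gather(*(guarded(l) for l in listeners))
    return [value for finished, value in outcomes if finished]


async def fast(data):
    return f"fast:{data}"


async def slow(data):
    await asyncio.sleep(1)
    return "slow"


timeout_results = asyncio.run(call_with_timeout([fast, slow], 0.05, data="value"))
```

Each listener gets its own timeout budget here, so one slow listener does not discard the results of the fast ones.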

Max Listeners

# Warn when too many listeners are added (helps detect memory leaks)
emitter = EventEmitter(max_listeners=10)
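
The source does not say how the warning is delivered or whether the limit is counted per event. As a rough sketch of the idea only, the hypothetical `CountingRegistry` below assumes a per-event limit and uses the standard `warnings` module.

```python
import warnings


class CountingRegistry:
    """Hypothetical stand-in; assumes a per-event limit and warnings.warn."""

    def __init__(self, max_listeners=None):
        self.max_listeners = max_listeners
        self._listeners = {}

    def on(self, event, listener):
        bucket = self._listeners.setdefault(event, [])
        bucket.append(listener)
        if self.max_listeners is not None and len(bucket) > self.max_listeners:
            warnings.warn(
                f"{len(bucket)} listeners registered for {event!r} "
                f"(max_listeners={self.max_listeners}); possible leak",
                RuntimeWarning,
                stacklevel=2,
            )


registry = CountingRegistry(max_listeners=2)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    for _ in range(3):
        registry.on("data", lambda: None)

warning_count = len(caught)  # only the third registration exceeds the limit
```

The listener is still registered in this sketch; the warning is a diagnostic, not a hard cap.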

Management

emitter.listener_count("event")      # number of listeners
emitter.event_names()                 # list of events with listeners
emitter.remove_all_listeners("event") # remove all for one event
emitter.remove_all_listeners()        # remove all listeners

API

| Function / Class | Description |
| --- | --- |
| `EventEmitter(max_listeners=None)` | Create a new emitter |
| `.on(event, listener)` | Register listener, returns unsubscribe function |
| `.once(event, listener)` | Register one-time listener |
| `.prepend(event, listener)` | Insert listener at front of queue, returns unsubscribe function |
| `.prepend_once(event, listener)` | One-shot prepend listener |
| `.off(event, listener)` | Remove a listener |
| `.use(middleware)` | Register middleware that can modify/cancel emissions, returns remove function |
| `.emit(event, *args, **kwargs)` | Emit event synchronously |
| `.async_emit(event, *args, **kwargs)` | Emit event, awaiting async listeners |
| `.wait_for(event, timeout=None)` | Async wait for an event, returns `(args, kwargs)` |
| `.emit_with_timeout(event, timeout, *args, **kwargs)` | Emit with per-listener timeout, returns list of results |
| `.listener_count(event)` | Count listeners for an event |
| `.event_names()` | List events with listeners |
| `.remove_all_listeners(event?)` | Remove all or event-specific listeners |

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this package useful, consider giving it a star on GitHub. It helps motivate continued maintenance and development.

License

MIT
