Skip to main content

Type-safe event emitter with sync and async listener support

Project description

philiprehberger-event-emitter

Tests PyPI version Last updated

Type-safe event emitter with sync and async listener support.

Installation

pip install philiprehberger-event-emitter

Usage

Basic Events

from philiprehberger_event_emitter import EventEmitter

emitter = EventEmitter()

def on_user_created(user):
    print(f"User created: {user['name']}")

emitter.on("user:created", on_user_created)
emitter.emit("user:created", {"name": "Alice"})

Unsubscribe

# Using the returned unsubscribe function
unsubscribe = emitter.on("event", handler)
unsubscribe()

# Or manually
emitter.off("event", handler)

One-Time Listeners

emitter.once("init", lambda: print("Only fires once"))
emitter.emit("init")  # prints
emitter.emit("init")  # nothing

Prepend Listeners

# Insert listener at the front of the queue (fires before existing listeners)
emitter.on("data", second_handler)
emitter.prepend("data", first_handler)

emitter.emit("data", payload)  # first_handler runs before second_handler

# One-shot version
emitter.prepend_once("data", one_time_first_handler)

Middleware / Interceptors

# Middleware receives (event, args, kwargs) and can modify or cancel emissions
def logging_middleware(event, args, kwargs):
    print(f"Event fired: {event}")
    return True  # allow emission to proceed

def block_middleware(event, args, kwargs):
    if event == "secret":
        return False  # cancel emission
    return True

remove_logger = emitter.use(logging_middleware)
emitter.use(block_middleware)

emitter.emit("hello", "world")   # logged, listeners fire
emitter.emit("secret", "data")   # blocked, no listeners fire

remove_logger()  # remove the logging middleware

Async Listeners

async def async_handler(data):
    await save_to_db(data)

emitter.on("data:received", async_handler)

# Use async_emit to await async listeners
await emitter.async_emit("data:received", {"key": "value"})

Wait for an Event

import asyncio

async def main():
    emitter = EventEmitter()

    # Schedule an emission after a delay
    async def delayed_emit():
        await asyncio.sleep(0.1)
        emitter.emit("ready", "payload")

    asyncio.create_task(delayed_emit())

    # Block until "ready" fires (with optional timeout)
    args, kwargs = await emitter.wait_for("ready", timeout=5.0)
    print(args[0])  # "payload"

asyncio.run(main())

Collect return values

# Sync: collect listener return values in registration order
emitter.on("compute", lambda x: x * 2)
emitter.on("compute", lambda x: x + 100)
results = emitter.emit_and_collect("compute", 5)
print(results)  # [10, 105]

# Async: awaits coroutine listeners and collects mixed sync + async returns
import asyncio

async def main():
    emitter = EventEmitter()

    def sync_handler(x):
        return f"sync:{x}"

    async def async_handler(x):
        return f"async:{x}"

    emitter.on("evt", sync_handler)
    emitter.on("evt", async_handler)
    results = await emitter.async_emit_and_collect("evt", 1)
    print(results)  # ["sync:1", "async:1"]

asyncio.run(main())

emit_and_collect is sync-only and raises TypeError if any registered listener is a coroutine function — use async_emit_and_collect in that case.

Emit with Timeout

import asyncio

async def slow_handler(data):
    await asyncio.sleep(10)
    return "done"

emitter.on("process", slow_handler)

# Only returns results from listeners that complete within the timeout
results = await emitter.emit_with_timeout("process", timeout=2.0, data="value")
print(results)  # [] (slow_handler timed out)

Max Listeners

# Warn when too many listeners are added (helps detect memory leaks)
emitter = EventEmitter(max_listeners=10)

Management

emitter.listener_count("event")      # number of listeners
emitter.event_names()                 # list of events with listeners
emitter.remove_all_listeners("event") # remove all for one event
emitter.remove_all_listeners()        # remove all listeners

API

Function / Class Description
EventEmitter(max_listeners=None) Create a new emitter
.on(event, listener) Register listener, returns unsubscribe function
.once(event, listener) Register one-time listener
.prepend(event, listener) Insert listener at front of queue, returns unsubscribe function
.prepend_once(event, listener) One-shot prepend listener
.off(event, listener) Remove a listener
.use(middleware) Register middleware that can modify/cancel emissions, returns remove function
.emit(event, *args, **kwargs) Emit event synchronously
.emit_and_collect(event, *args, **kwargs) Emit synchronously and return list of listener return values (raises if any listener is async)
.async_emit(event, *args, **kwargs) Emit event, awaiting async listeners
.async_emit_and_collect(event, *args, **kwargs) Emit, awaiting coroutines, and return list of listener return values
.wait_for(event, timeout=None) Async wait for an event, returns (args, kwargs)
.emit_with_timeout(event, timeout, *args, **kwargs) Emit with per-listener timeout, returns list of results
.listener_count(event) Count listeners for an event
.event_names() List events with listeners
.remove_all_listeners(event?) Remove all or event-specific listeners

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

philiprehberger_event_emitter-0.5.0.tar.gz (9.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

philiprehberger_event_emitter-0.5.0-py3-none-any.whl (7.0 kB view details)

Uploaded Python 3

File details

Details for the file philiprehberger_event_emitter-0.5.0.tar.gz.

File metadata

File hashes

Hashes for philiprehberger_event_emitter-0.5.0.tar.gz
Algorithm Hash digest
SHA256 3bdbb1d0919b0b53ef233e2cd4abf06df2d343674518d70308ec2eb8fecd56fb
MD5 497df2586d0731b5be5441ce3119010d
BLAKE2b-256 de2e7055b418006f5c32095eb878fc4960c6dcc08f222b275f029bc2c362bfe2

See more details on using hashes here.

File details

Details for the file philiprehberger_event_emitter-0.5.0-py3-none-any.whl.

File metadata

File hashes

Hashes for philiprehberger_event_emitter-0.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 2eb30d3f687e7ed852f6ab617dcc9e31b32fc12d67bd8a4a3f6461316e45d68c
MD5 9ca12a07c4e6d2ec32a9ed2502a839c9
BLAKE2b-256 03e82a411fa4a9c324f51bdb6922d1654725c54c3ecf2e32d0085c93133041a1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page