Skip to main content

Type-safe event emitter with sync and async listener support

Project description

philiprehberger-event-emitter

Tests PyPI version Last updated

Type-safe event emitter with sync and async listener support.

Installation

pip install philiprehberger-event-emitter

Usage

Basic Events

from philiprehberger_event_emitter import EventEmitter

emitter = EventEmitter()

def on_user_created(user):
    print(f"User created: {user['name']}")

emitter.on("user:created", on_user_created)
emitter.emit("user:created", {"name": "Alice"})

Unsubscribe

# Using the returned unsubscribe function
unsubscribe = emitter.on("event", handler)
unsubscribe()

# Or manually
emitter.off("event", handler)

One-Time Listeners

emitter.once("init", lambda: print("Only fires once"))
emitter.emit("init")  # prints
emitter.emit("init")  # nothing

Prepend Listeners

# Insert listener at the front of the queue (fires before existing listeners)
emitter.on("data", second_handler)
emitter.prepend("data", first_handler)

emitter.emit("data", payload)  # first_handler runs before second_handler

# One-shot version
emitter.prepend_once("data", one_time_first_handler)

Middleware / Interceptors

# Middleware receives (event, args, kwargs) and can modify or cancel emissions
def logging_middleware(event, args, kwargs):
    print(f"Event fired: {event}")
    return True  # allow emission to proceed

def block_middleware(event, args, kwargs):
    if event == "secret":
        return False  # cancel emission
    return True

remove_logger = emitter.use(logging_middleware)
emitter.use(block_middleware)

emitter.emit("hello", "world")   # logged, listeners fire
emitter.emit("secret", "data")   # blocked, no listeners fire

remove_logger()  # remove the logging middleware

Async Listeners

async def async_handler(data):
    await save_to_db(data)

emitter.on("data:received", async_handler)

# Use async_emit to await async listeners
await emitter.async_emit("data:received", {"key": "value"})

Wait for an Event

import asyncio

async def main():
    emitter = EventEmitter()

    # Schedule an emission after a delay
    async def delayed_emit():
        await asyncio.sleep(0.1)
        emitter.emit("ready", "payload")

    asyncio.create_task(delayed_emit())

    # Block until "ready" fires (with optional timeout)
    args, kwargs = await emitter.wait_for("ready", timeout=5.0)
    print(args[0])  # "payload"

asyncio.run(main())

Emit with Timeout

import asyncio

async def slow_handler(data):
    await asyncio.sleep(10)
    return "done"

emitter.on("process", slow_handler)

# Only returns results from listeners that complete within the timeout
results = await emitter.emit_with_timeout("process", timeout=2.0, data="value")
print(results)  # [] (slow_handler timed out)

Max Listeners

# Warn when too many listeners are added (helps detect memory leaks)
emitter = EventEmitter(max_listeners=10)

Management

emitter.listener_count("event")      # number of listeners
emitter.event_names()                 # list of events with listeners
emitter.remove_all_listeners("event") # remove all for one event
emitter.remove_all_listeners()        # remove all listeners

API

Function / Class Description
EventEmitter(max_listeners=None) Create a new emitter
.on(event, listener) Register listener, returns unsubscribe function
.once(event, listener) Register one-time listener
.prepend(event, listener) Insert listener at front of queue, returns unsubscribe function
.prepend_once(event, listener) One-shot prepend listener
.off(event, listener) Remove a listener
.use(middleware) Register middleware that can modify/cancel emissions, returns remove function
.emit(event, *args, **kwargs) Emit event synchronously
.async_emit(event, *args, **kwargs) Emit event, awaiting async listeners
.wait_for(event, timeout=None) Async wait for an event, returns (args, kwargs)
.emit_with_timeout(event, timeout, *args, **kwargs) Emit with per-listener timeout, returns list of results
.listener_count(event) Count listeners for an event
.event_names() List events with listeners
.remove_all_listeners(event?) Remove all or event-specific listeners

Development

pip install -e .
python -m pytest tests/ -v

Support

If you find this project useful:

Star the repo

🐛 Report issues

💡 Suggest features

❤️ Sponsor development

🌐 All Open Source Projects

💻 GitHub Profile

🔗 LinkedIn Profile

License

MIT

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

philiprehberger_event_emitter-0.4.1.tar.gz (8.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

File details

Details for the file philiprehberger_event_emitter-0.4.1.tar.gz.

File metadata

File hashes

Hashes for philiprehberger_event_emitter-0.4.1.tar.gz
Algorithm Hash digest
SHA256 adca935a5cb40247a7148e37372e08fd0f30bfad8f7c6ee5a7d911c8ca9f432f
MD5 13fb9db419ba23b44624aab61671fd70
BLAKE2b-256 0670573ef17416194eae24a82219f9ff23ccc74e9dfea73ec5af417e5202bbdc

See more details on using hashes here.

File details

Details for the file philiprehberger_event_emitter-0.4.1-py3-none-any.whl.

File metadata

File hashes

Hashes for philiprehberger_event_emitter-0.4.1-py3-none-any.whl
Algorithm Hash digest
SHA256 ce20ebf4da03c2cd68e3b8968a741fb965f72f1da4136f1260b2a3fba44c3594
MD5 e6b9a4cb9a021085c0a1d34f0fcfa9ff
BLAKE2b-256 3d39621c5fad089947b663627169faf5a18dea498031e9b6a1b3dd7a499de94b

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page