Thread-safe multi-channel event bus with glob filtering, backpressure, and sync/async iteration.
codechu-events
Thread-safe multi-channel event bus for Python. Pure stdlib, ~150 LOC.
pip install codechu-events
What it gives you
- Multi-channel pub/sub with glob-pattern filtering: ["scan.*", "ui.click"]
- Thread-safe: emit from any thread, never blocks
- Bounded queues per subscriber: slow consumers drop events, fast publishers never wait
- Sync + async iteration: for ev in sub: or async for ev in sub.aiter():
- Context manager for clean unsubscribe: with subscribe_ctx([...]) as sub:
- Heartbeat support for dead-connection detection on idle channels
- Resource limits: max subscribers + max queue depth, bounded by design
- Stats for monitoring (subscriber count, drop count, queue depth)
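The "bounded queue, drop on overflow" behaviour listed above can be illustrated with the standard library alone. This is a minimal sketch, not the package's actual code: `BoundedInbox`, `push`, and `drain` are hypothetical names, and it assumes the newest event is the one dropped when the queue is full (the package may use a different policy).

```python
import queue
import threading


class BoundedInbox:
    """Hypothetical per-subscriber inbox: a full queue drops instead of blocking."""

    def __init__(self, maxsize: int) -> None:
        self._q = queue.Queue(maxsize=maxsize)
        self._lock = threading.Lock()  # protects the two counters
        self.received = 0
        self.dropped = 0

    def push(self, event: dict) -> bool:
        """Try to enqueue without waiting; return False if the event was dropped."""
        try:
            self._q.put_nowait(event)  # never blocks the publisher
        except queue.Full:
            with self._lock:
                self.dropped += 1
            return False
        with self._lock:
            self.received += 1
        return True

    def drain(self) -> list:
        """Return everything currently queued (for demonstration only)."""
        items = []
        while True:
            try:
                items.append(self._q.get_nowait())
            except queue.Empty:
                return items


inbox = BoundedInbox(maxsize=2)
results = [inbox.push({"event": "scan.progress", "count": n}) for n in range(5)]
print(results, inbox.received, inbox.dropped)
# [True, True, False, False, False] 2 3
```

The publisher's cost stays constant regardless of how slow the consumer is; the price is that a slow consumer loses events, which is why a drop counter is worth exposing.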
Quick examples
Default global bus (simple programs)
    import codechu_events as events

    # Subscriber (sync iteration)
    def consume():
        with events.subscribe_ctx(["scan.*", "ui.click"]) as sub:
            for ev in sub:
                print(ev["event"], ev)

    # Publisher (any thread, never blocks)
    events.emit("scan.started", path="/home")
    events.emit("scan.progress", count=42)
    events.emit("scan.finished", count=128, ok=True)
    events.emit("ui.click", button="cancel")  # also delivered
    events.emit("foo.bar")                    # filtered out
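To see why `ui.click` is delivered and `foo.bar` is not, here is a small sketch of glob matching against a pattern list. It assumes `fnmatch`-style semantics from the standard library; the package's own matcher may differ in details, and `matches_any` is a hypothetical helper.

```python
from fnmatch import fnmatchcase


def matches_any(patterns, event_type):
    """Return True if event_type matches at least one glob pattern."""
    return any(fnmatchcase(event_type, pattern) for pattern in patterns)


patterns = ["scan.*", "ui.click"]
for name in ["scan.started", "scan.progress", "ui.click", "ui.hover", "foo.bar"]:
    print(f"{name:15} {matches_any(patterns, name)}")
```

Note that with `fnmatch` semantics `*` also matches dots, so `scan.*` would accept a nested name such as `scan.disk.started`.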
Multiple isolated buses
A single process can run multiple independent buses — useful for separating domains (e.g. UI events vs telemetry) or for testing:
    from codechu_events import Bus

    ui_bus = Bus()
    telemetry_bus = Bus(max_subscribers=128)  # larger cap for telemetry

    ui_sub = ui_bus.subscribe(["ui.*"])
    telemetry_sub = telemetry_bus.subscribe(["metric.*"])

    ui_bus.emit("ui.click", button="ok")
    telemetry_bus.emit("metric.fps", value=58)
    # ui_sub only sees ui.click; telemetry_sub only sees metric.fps
Custom subscription class (field-based filter)
For filtering beyond glob, subclass Subscription and override matches():
    from codechu_events import Bus, Subscription

    class PanelFilter(Subscription):
        """Only events with event['panel'] == 'suggestion'."""

        def matches(self, event_type, event=None):
            if event is None:
                return True  # cheap type-check pass; final check at push
            return event.get("panel") == "suggestion"

    bus = Bus()
    sub = bus.subscribe(["*"], subscription_class=PanelFilter)
    bus.emit("scan.started", panel="suggestion")  # delivered
    bus.emit("scan.started", panel="treemap")     # rejected
Async iteration
    import asyncio
    import codechu_events as events

    async def consume():
        with events.subscribe_ctx(["scan.*"], heartbeat_sec=5.0) as sub:
            async for ev in sub.aiter():
                print(ev)

    asyncio.run(consume())
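One common way to consume a thread-safe queue from a coroutine is to run the blocking `get()` in a worker thread, so the event loop stays responsive. The sketch below shows that technique with the standard library only (Python 3.9+ for `asyncio.to_thread`). It is not necessarily how `sub.aiter()` is implemented; `aiter_queue`, `publisher`, and the `_CLOSED` sentinel are hypothetical names.

```python
import asyncio
import queue
import threading

_CLOSED = object()  # hypothetical sentinel marking the end of the stream


async def aiter_queue(q):
    """Yield items from a thread-safe queue without blocking the event loop."""
    while True:
        # The blocking get() runs in a worker thread; the loop keeps running.
        item = await asyncio.to_thread(q.get)
        if item is _CLOSED:
            return
        yield item


def publisher(q):
    """Plain thread that publishes three events, then closes the stream."""
    for n in range(3):
        q.put({"event": "scan.progress", "count": n})
    q.put(_CLOSED)


async def main():
    q = queue.Queue()
    threading.Thread(target=publisher, args=(q,), daemon=True).start()
    return [ev["count"] async for ev in aiter_queue(q)]


counts = asyncio.run(main())
print(counts)  # [0, 1, 2]
```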
API reference
| Function | Purpose |
|---|---|
| emit(event_type, **fields) | Publish event. Never blocks. |
| subscribe(types=["*"], heartbeat_sec=5.0) | Create a Subscription. Caller must unsubscribe(). |
| subscribe_ctx(types, heartbeat_sec=5.0) | Same as subscribe(), but a context manager (auto-unsubscribe). |
| stats() | Returns dict with subscriber count, total emitted, drop counts. |
Subscription API
| Member | Purpose |
|---|---|
| for ev in sub: | Sync blocking iteration. Auto-emits _keepalive on idle. |
| async for ev in sub.aiter(): | Async iteration with event loop. |
| sub.dropped | Count of events dropped due to slow consumer. |
| sub.received | Count of events accepted. |
| sub.close() | Stop iteration (sentinel injected). |
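The idle keepalive and the close sentinel described in the table can be sketched as a generator over a standard `queue.Queue`. This is an illustration under assumptions, not the package's code: the `{"event": "_keepalive"}` shape, the `_CLOSED` sentinel, and the `iterate` function are hypothetical.

```python
import queue

_CLOSED = object()  # hypothetical sentinel, injected to stop iteration


def iterate(q, heartbeat_sec):
    """Yield queued events; yield a keepalive whenever the queue stays idle."""
    while True:
        try:
            item = q.get(timeout=heartbeat_sec)
        except queue.Empty:
            # Nothing arrived within the heartbeat interval.
            yield {"event": "_keepalive"}
            continue
        if item is _CLOSED:
            return  # sentinel seen: end the iteration cleanly
        yield item


q = queue.Queue()
q.put({"event": "scan.started"})

it = iterate(q, heartbeat_sec=0.05)
first = next(it)   # the real event
second = next(it)  # queue is idle, so a keepalive is produced
q.put(_CLOSED)
rest = list(it)    # the sentinel ends the loop without yielding anything
print(first, second, rest)
```

A consumer that writes every yielded item to a socket or stream learns about a dead peer on the next keepalive, even when no real events are flowing.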
Resource limits
| Constant | Default | Description |
|---|---|---|
| QUEUE_MAX | 200 | Max events per subscriber queue (drops on overflow) |
| MAX_SUBSCRIBERS | 64 | Max concurrent subscribers |
| DEFAULT_HEARTBEAT_SEC | 5.0 | Idle keepalive interval (seconds) |
Exceeding MAX_SUBSCRIBERS raises SubscriberLimitExceeded.
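A subscriber cap of this kind is typically enforced at registration time. The following is a stand-alone sketch of the idea, not the package's implementation: `Registry` is hypothetical, and the exception defined here is a local stand-in whose base class (`RuntimeError`) is an assumption.

```python
import threading


class SubscriberLimitExceeded(RuntimeError):
    """Local stand-in for the package's exception of the same name."""


class Registry:
    """Hypothetical subscriber registry with a hard cap."""

    def __init__(self, max_subscribers):
        self._max = max_subscribers
        self._subs = set()
        self._lock = threading.Lock()

    def add(self, sub):
        with self._lock:
            if len(self._subs) >= self._max:
                raise SubscriberLimitExceeded(
                    f"limit of {self._max} subscribers reached"
                )
            self._subs.add(sub)

    def remove(self, sub):
        with self._lock:
            self._subs.discard(sub)


registry = Registry(max_subscribers=2)
registry.add("a")
registry.add("b")
try:
    registry.add("c")
    rejected = False
except SubscriberLimitExceeded:
    rejected = True

registry.remove("a")  # unsubscribing frees a slot
registry.add("c")     # now succeeds
print(rejected)       # True
```

This is also why forgetting to unsubscribe matters: leaked subscriptions keep occupying slots until the cap is hit.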
License
MIT — see LICENSE.
Part of Codechu.
File details
Details for the file codechu_events-0.1.0.tar.gz.
File metadata
- Download URL: codechu_events-0.1.0.tar.gz
- Upload date:
- Size: 12.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | e2caa0a201fae918b69ecccd94825ca8277fda1ac5505f5a4dfb07cb2d294cba |
| MD5 | d5316cebf2e8aa9f8fd62dc66aac2916 |
| BLAKE2b-256 | 3ff0d9da3413c8496ca7bb09defaf428b2720033ed62099b35a59a4b015362f0 |
File details
Details for the file codechu_events-0.1.0-py3-none-any.whl.
File metadata
- Download URL: codechu_events-0.1.0-py3-none-any.whl
- Upload date:
- Size: 9.1 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.2.0 CPython/3.12.3
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | dc9f180ba501a65e896656f8f86f7dd24394ea72146923938f774c60956c8e5f |
| MD5 | a67cabde3b947afe067bfc4d4fb2782b |
| BLAKE2b-256 | 912aa8f36c37dab620adaec611d5015271d5ec1c4967b53926a3d99609550fa3 |