Thread-safe multi-channel event bus with glob filtering, backpressure, and sync/async iteration.
Project description
. . c o d e c h u . .
. \ | / e v e n t s \ | .
((((( ── ((•)) ──────────── ((•)) ── )))))
' / | \ / | '
' ' scan.* ui.click '
Thread-safe multi-channel pub/sub — emit from anywhere, listen everywhere.
codechu-events
Thread-safe multi-channel event bus for Python. Pure stdlib, ~150 LOC.
pip install codechu-events
What it gives you
- Multi-channel pub/sub with glob-pattern filtering:
["scan.*", "ui.click"] - Thread-safe — emit from any thread, never blocks
- Bounded queues per subscriber — slow consumers drop events, fast publishers never wait
- Sync + async iteration —
for ev in sub:orasync for ev in sub.aiter(): - Context manager for clean unsubscribe:
with bus.subscribe_ctx([...]) as sub: - Heartbeat support for dead-connection detection on idle channels
- Resource limits — max subscribers + max queue depth, bounded by design
- Stats for monitoring (subscriber count, drop count, queue depth)
Quick examples
Basic usage
Construct a Bus() explicitly — there is no module-level default, so
ownership and lifetime stay in the caller's hands.
from codechu_events import Bus
bus = Bus()
with bus.subscribe_ctx(["scan.*"]) as sub:
bus.emit("scan.started", path="/home")
bus.emit("scan.progress", count=42)
bus.emit("scan.finished", count=128, ok=True)
bus.emit("foo.bar") # filtered out
for ev in sub:
print(ev["event"], ev)
Multiple isolated buses
A single process can run multiple independent buses — useful for separating domains (e.g. UI events vs telemetry) or for testing:
from codechu_events import Bus
ui_bus = Bus()
telemetry_bus = Bus(max_subscribers=128) # larger cap for telemetry
ui_sub = ui_bus.subscribe(["ui.*"])
telemetry_sub = telemetry_bus.subscribe(["metric.*"])
ui_bus.emit("ui.click", button="ok")
telemetry_bus.emit("metric.fps", value=58)
# ui_sub only sees ui.click; telemetry_sub only sees metric.fps
Custom subscription class (field-based filter)
For filtering beyond glob, subclass Subscription and override matches():
from codechu_events import Bus, Subscription
class PanelFilter(Subscription):
"""Only events with event['panel'] == 'suggestion'."""
def matches(self, event_type, event=None):
if event is None:
return True # cheap type-check pass; final check at push
return event.get("panel") == "suggestion"
bus = Bus()
sub = bus.subscribe(["*"], subscription_class=PanelFilter)
bus.emit("scan.started", panel="suggestion") # delivered
bus.emit("scan.started", panel="treemap") # rejected
Async iteration
import asyncio
from codechu_events import Bus
bus = Bus()
async def consume():
with bus.subscribe_ctx(["scan.*"], heartbeat_sec=5.0) as sub:
async for ev in sub.aiter():
print(ev)
asyncio.run(consume())
Documentation
- API reference — every public symbol, glob syntax, heartbeat semantics.
- Migration guide — v0.1 → v0.2 (module-level shims removed).
- Recipes — singleton bus, isolated buses, async, backpressure, heartbeat, tests.
API reference
Bus
| Method | Purpose |
|---|---|
Bus(max_subscribers=64, queue_max=200) |
Construct an independent bus. |
bus.emit(event_type, **fields) |
Publish event. Never blocks. |
bus.subscribe(types=["*"], heartbeat_sec=5.0) |
Create a Subscription. Caller must unsubscribe(). |
bus.subscribe_ctx(types, heartbeat_sec=5.0) |
Context manager (auto-unsubscribe on exit). |
bus.unsubscribe(sub) |
Idempotent removal + sub.close(). |
bus.stats() |
Dict with subscriber count, total emitted, drop counts. |
bus.subscriber_count() |
Active subscription count. |
bus.reset() |
Close all subscriptions and zero counters. |
Subscription
| Member | Purpose |
|---|---|
for ev in sub: |
Sync blocking iteration. Auto-emits _keepalive on idle. |
async for ev in sub.aiter(): |
Async iteration with event loop. |
sub.dropped |
Count of events dropped due to slow consumer. |
sub.received |
Count of events accepted. |
sub.close() |
Stop iteration (sentinel injected). |
Resource limits
| Constant | Default | Tweakable |
|---|---|---|
QUEUE_MAX |
200 | Max events per subscriber queue (drops on overflow) |
MAX_SUBSCRIBERS |
64 | Max concurrent subscribers |
DEFAULT_HEARTBEAT_SEC |
5.0 | Idle keepalive interval |
Exceeding MAX_SUBSCRIBERS raises SubscriberLimitExceeded.
License
MIT — see LICENSE.
Part of Codechu.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file codechu_events-0.3.0.tar.gz.
File metadata
- Download URL: codechu_events-0.3.0.tar.gz
- Upload date:
- Size: 14.8 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
057ad1be022595ffff10744c58c7a4eef21cdf00421f755ef71bf223ee862f25
|
|
| MD5 |
c0be3ca0beeca6baec75f73fbe0d8845
|
|
| BLAKE2b-256 |
4d6deebf8ad9c9cc3250ada1e227339c1766711d94f7ad608ed0a9e8d87299a9
|
Provenance
The following attestation bundles were made for codechu_events-0.3.0.tar.gz:
Publisher:
release.yml on codechu/events-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
codechu_events-0.3.0.tar.gz -
Subject digest:
057ad1be022595ffff10744c58c7a4eef21cdf00421f755ef71bf223ee862f25 - Sigstore transparency entry: 1582031843
- Sigstore integration time:
-
Permalink:
codechu/events-py@9d29b1bdb22ce3ffd6611c69f12e98ad198e7c88 -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/codechu
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@9d29b1bdb22ce3ffd6611c69f12e98ad198e7c88 -
Trigger Event:
push
-
Statement type:
File details
Details for the file codechu_events-0.3.0-py3-none-any.whl.
File metadata
- Download URL: codechu_events-0.3.0-py3-none-any.whl
- Upload date:
- Size: 11.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
5cf8275b5b67b6daf7484a3422049578681f01d35fd9649576e02f5c23caf8de
|
|
| MD5 |
80c0771b9c45c630abbfb87fa051090b
|
|
| BLAKE2b-256 |
69e3f4bf2d8c91853bd56f1d7434b21c8e2ddf773668c03cadda5f367b78b32c
|
Provenance
The following attestation bundles were made for codechu_events-0.3.0-py3-none-any.whl:
Publisher:
release.yml on codechu/events-py
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
codechu_events-0.3.0-py3-none-any.whl -
Subject digest:
5cf8275b5b67b6daf7484a3422049578681f01d35fd9649576e02f5c23caf8de - Sigstore transparency entry: 1582031961
- Sigstore integration time:
-
Permalink:
codechu/events-py@9d29b1bdb22ce3ffd6611c69f12e98ad198e7c88 -
Branch / Tag:
refs/tags/v0.3.0 - Owner: https://github.com/codechu
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@9d29b1bdb22ce3ffd6611c69f12e98ad198e7c88 -
Trigger Event:
push
-
Statement type: