Skip to main content

Thread-safe multi-channel event bus with glob filtering, backpressure, and sync/async iteration.

Project description

codechu-events

Thread-safe multi-channel event bus for Python. Pure stdlib, ~150 LOC.

pip install codechu-events

What it gives you

  • Multi-channel pub/sub with glob-pattern filtering: ["scan.*", "ui.click"]
  • Thread-safe — emit from any thread, never blocks
  • Bounded queues per subscriber — slow consumers drop events, fast publishers never wait
  • Sync + async iterationfor ev in sub: or async for ev in sub.aiter():
  • Context manager for clean unsubscribe: with subscribe_ctx([...]) as sub:
  • Heartbeat support for dead-connection detection on idle channels
  • Resource limits — max subscribers + max queue depth, bounded by design
  • Stats for monitoring (subscriber count, drop count, queue depth)

Quick examples

Default global bus (simple programs)

import codechu_events as events

# Subscriber (sync iteration)
def consume():
    with events.subscribe_ctx(["scan.*", "ui.click"]) as sub:
        for ev in sub:
            print(ev["event"], ev)

# Publisher (any thread, never blocks)
events.emit("scan.started", path="/home")
events.emit("scan.progress", count=42)
events.emit("scan.finished", count=128, ok=True)
events.emit("ui.click", button="cancel")     # also delivered
events.emit("foo.bar")                        # filtered out

Multiple isolated buses

A single process can run multiple independent buses — useful for separating domains (e.g. UI events vs telemetry) or for testing:

from codechu_events import Bus

ui_bus = Bus()
telemetry_bus = Bus(max_subscribers=128)  # larger cap for telemetry

ui_sub = ui_bus.subscribe(["ui.*"])
telemetry_sub = telemetry_bus.subscribe(["metric.*"])

ui_bus.emit("ui.click", button="ok")
telemetry_bus.emit("metric.fps", value=58)
# ui_sub only sees ui.click; telemetry_sub only sees metric.fps

Custom subscription class (field-based filter)

For filtering beyond glob, subclass Subscription and override matches():

from codechu_events import Bus, Subscription

class PanelFilter(Subscription):
    """Only events with event['panel'] == 'suggestion'."""

    def matches(self, event_type, event=None):
        if event is None:
            return True  # cheap type-check pass; final check at push
        return event.get("panel") == "suggestion"

bus = Bus()
sub = bus.subscribe(["*"], subscription_class=PanelFilter)
bus.emit("scan.started", panel="suggestion")   # delivered
bus.emit("scan.started", panel="treemap")      # rejected

Async iteration

import asyncio, codechu_events as events

async def consume():
    with events.subscribe_ctx(["scan.*"], heartbeat_sec=5.0) as sub:
        async for ev in sub.aiter():
            print(ev)

asyncio.run(consume())

API reference

Function Purpose
emit(event_type, **fields) Publish event. Never blocks.
subscribe(types=["*"], heartbeat_sec=5.0) Create a Subscription. Caller must unsubscribe().
subscribe_ctx(types, heartbeat_sec=5.0) Same as subscribe(), but a context manager (auto-unsubscribe).
stats() Returns dict with subscriber count, total emitted, drop counts.

Subscription API

Member Purpose
for ev in sub: Sync blocking iteration. Auto-emits _keepalive on idle.
async for ev in sub.aiter(): Async iteration with event loop.
sub.dropped Count of events dropped due to slow consumer.
sub.received Count of events accepted.
sub.close() Stop iteration (sentinel injected).

Resource limits

Constant Default Tweakable
QUEUE_MAX 200 Max events per subscriber queue (drops on overflow)
MAX_SUBSCRIBERS 64 Max concurrent subscribers
DEFAULT_HEARTBEAT_SEC 5.0 Idle keepalive interval

Exceeding MAX_SUBSCRIBERS raises SubscriberLimitExceeded.

License

MIT — see LICENSE.

Part of Codechu.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

codechu_events-0.1.0.tar.gz (12.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

codechu_events-0.1.0-py3-none-any.whl (9.1 kB view details)

Uploaded Python 3

File details

Details for the file codechu_events-0.1.0.tar.gz.

File metadata

  • Download URL: codechu_events-0.1.0.tar.gz
  • Upload date:
  • Size: 12.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for codechu_events-0.1.0.tar.gz
Algorithm Hash digest
SHA256 e2caa0a201fae918b69ecccd94825ca8277fda1ac5505f5a4dfb07cb2d294cba
MD5 d5316cebf2e8aa9f8fd62dc66aac2916
BLAKE2b-256 3ff0d9da3413c8496ca7bb09defaf428b2720033ed62099b35a59a4b015362f0

See more details on using hashes here.

File details

Details for the file codechu_events-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: codechu_events-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 9.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.3

File hashes

Hashes for codechu_events-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 dc9f180ba501a65e896656f8f86f7dd24394ea72146923938f774c60956c8e5f
MD5 a67cabde3b947afe067bfc4d4fb2782b
BLAKE2b-256 912aa8f36c37dab620adaec611d5015271d5ec1c4967b53926a3d99609550fa3

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page