Skip to main content

Thread-safe multi-channel event bus with glob filtering, backpressure, and sync/async iteration.

Project description

              .    .  c o d e c h u  .    .
           .   \  |  /  e v e n t s  \  |   .
        ((((( ── ((•)) ──────────── ((•)) ── )))))
           '   /  |  \                /  |   '
              '    '   scan.*   ui.click    '

Thread-safe multi-channel pub/sub — emit from anywhere, listen everywhere.

codechu-events

Thread-safe multi-channel event bus for Python. Pure stdlib, ~150 LOC.

pip install codechu-events

What it gives you

  • Multi-channel pub/sub with glob-pattern filtering: ["scan.*", "ui.click"]
  • Thread-safe — emit from any thread, never blocks
  • Bounded queues per subscriber — slow consumers drop events, fast publishers never wait
  • Sync + async iterationfor ev in sub: or async for ev in sub.aiter():
  • Context manager for clean unsubscribe: with bus.subscribe_ctx([...]) as sub:
  • Heartbeat support for dead-connection detection on idle channels
  • Resource limits — max subscribers + max queue depth, bounded by design
  • Stats for monitoring (subscriber count, drop count, queue depth)

Quick examples

Basic usage

Construct a Bus() explicitly — there is no module-level default, so ownership and lifetime stay in the caller's hands.

from codechu_events import Bus

bus = Bus()

with bus.subscribe_ctx(["scan.*"]) as sub:
    bus.emit("scan.started", path="/home")
    bus.emit("scan.progress", count=42)
    bus.emit("scan.finished", count=128, ok=True)
    bus.emit("foo.bar")  # filtered out
    for ev in sub:
        print(ev["event"], ev)

Multiple isolated buses

A single process can run multiple independent buses — useful for separating domains (e.g. UI events vs telemetry) or for testing:

from codechu_events import Bus

ui_bus = Bus()
telemetry_bus = Bus(max_subscribers=128)  # larger cap for telemetry

ui_sub = ui_bus.subscribe(["ui.*"])
telemetry_sub = telemetry_bus.subscribe(["metric.*"])

ui_bus.emit("ui.click", button="ok")
telemetry_bus.emit("metric.fps", value=58)
# ui_sub only sees ui.click; telemetry_sub only sees metric.fps

Custom subscription class (field-based filter)

For filtering beyond glob, subclass Subscription and override matches():

from codechu_events import Bus, Subscription

class PanelFilter(Subscription):
    """Only events with event['panel'] == 'suggestion'."""

    def matches(self, event_type, event=None):
        if event is None:
            return True  # cheap type-check pass; final check at push
        return event.get("panel") == "suggestion"

bus = Bus()
sub = bus.subscribe(["*"], subscription_class=PanelFilter)
bus.emit("scan.started", panel="suggestion")   # delivered
bus.emit("scan.started", panel="treemap")      # rejected

Async iteration

import asyncio
from codechu_events import Bus

bus = Bus()

async def consume():
    with bus.subscribe_ctx(["scan.*"], heartbeat_sec=5.0) as sub:
        async for ev in sub.aiter():
            print(ev)

asyncio.run(consume())

API reference

Bus

Method Purpose
Bus(max_subscribers=64, queue_max=200) Construct an independent bus.
bus.emit(event_type, **fields) Publish event. Never blocks.
bus.subscribe(types=["*"], heartbeat_sec=5.0) Create a Subscription. Caller must unsubscribe().
bus.subscribe_ctx(types, heartbeat_sec=5.0) Context manager (auto-unsubscribe on exit).
bus.unsubscribe(sub) Idempotent removal + sub.close().
bus.stats() Dict with subscriber count, total emitted, drop counts.
bus.subscriber_count() Active subscription count.
bus.reset() Close all subscriptions and zero counters.

Subscription

Member Purpose
for ev in sub: Sync blocking iteration. Auto-emits _keepalive on idle.
async for ev in sub.aiter(): Async iteration with event loop.
sub.dropped Count of events dropped due to slow consumer.
sub.received Count of events accepted.
sub.close() Stop iteration (sentinel injected).

Resource limits

Constant Default Tweakable
QUEUE_MAX 200 Max events per subscriber queue (drops on overflow)
MAX_SUBSCRIBERS 64 Max concurrent subscribers
DEFAULT_HEARTBEAT_SEC 5.0 Idle keepalive interval

Exceeding MAX_SUBSCRIBERS raises SubscriberLimitExceeded.

License

MIT — see LICENSE.

Part of Codechu.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

codechu_events-0.2.0.tar.gz (12.3 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

codechu_events-0.2.0-py3-none-any.whl (10.5 kB view details)

Uploaded Python 3

File details

Details for the file codechu_events-0.2.0.tar.gz.

File metadata

  • Download URL: codechu_events-0.2.0.tar.gz
  • Upload date:
  • Size: 12.3 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for codechu_events-0.2.0.tar.gz
Algorithm Hash digest
SHA256 fb0da1f7f276f825a5af1771f82334f310bfba97b1c993b6625ffaea26235dff
MD5 44ca0288efd95ee401cfba3712660f6a
BLAKE2b-256 15d878a04b5ca6952c8de5432417a5a1e45d66cc9e2e5d47f92d00413dcbe367

See more details on using hashes here.

Provenance

The following attestation bundles were made for codechu_events-0.2.0.tar.gz:

Publisher: release.yml on codechu/events-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

File details

Details for the file codechu_events-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: codechu_events-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 10.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? Yes
  • Uploaded via: twine/6.1.0 CPython/3.13.12

File hashes

Hashes for codechu_events-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9a0cfef7943176f9253acd464018d8156857df7eee835f7e98be3580c7032a5e
MD5 f2cd23d1a8b98fb83f1362cdebcdc10b
BLAKE2b-256 4dcd1ba57feec0f3b3ef9bfcf9e107eb78870a5b1c9b4b80fdd030a887a67e63

See more details on using hashes here.

Provenance

The following attestation bundles were made for codechu_events-0.2.0-py3-none-any.whl:

Publisher: release.yml on codechu/events-py

Attestations: Values shown here reflect the state when the release was signed and may no longer be current.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page