The Nervous System - A standalone event bus with correlation ID tracking, audit logging, and PII safety
Project description
Aperion Event Bus - The Nervous System
A standalone, production-ready event bus library for Python applications. Extracted from the Aperion Legendary AI ecosystem to serve as a foundational component for distributed systems.
Features
- Async & Sync Handlers - Support for both synchronous and asynchronous event handlers with priority ordering
- Pattern Matching - Subscribe to events using exact matches, wildcards (
chat.*), or catch-all (*) - JSONL Audit Logging - Persistent event log for compliance, debugging, and observability
- Log Rotation - Automatic log rotation with configurable size limits and backup count
- Correlation ID Propagation - End-to-end request tracing across distributed systems (Constitution D1)
- PII Redaction - Built-in redactor for 20+ sensitive data patterns (Constitution D4/B2)
- Handler Error Isolation - One handler's failure doesn't crash others; errors are audited
- Dead Letter Queue - Failed events stored for inspection and retry
- Backpressure Control - Configurable queue limits with DROP, RAISE, or BLOCK policies
- Thread Safety - Safe concurrent subscription/unsubscription during event emission
- Handler Timeouts - Configurable timeout for sync handlers to prevent blocking
- Event Validation - Enforce
{domain}.{action}naming convention at runtime - Metrics Collection - Pluggable metrics backends for observability
- OpenTelemetry Integration - Optional OTEL trace ID bridge for distributed tracing
Installation
# Install from local source
pip install -e .
# Install with development dependencies
pip install -e ".[dev]"
# Install with OpenTelemetry support
pip install -e ".[otel]"
# Install with Prometheus metrics export
pip install -e ".[prometheus]"
# Install all observability features
pip install -e ".[observability]"
Quick Start
from pathlib import Path
from aperion_event_bus import EventBus
# Create an event bus with audit logging
bus = EventBus(
enable_audit=True,
event_log_path=Path("events.jsonl")
)
# Subscribe to events
def on_user_login(event):
print(f"User logged in: {event.payload}")
bus.subscribe(on_user_login, "user.login", priority=100)
# Subscribe to all chat events using wildcard
def on_chat_event(event):
print(f"Chat event: {event.event_type}")
bus.subscribe(on_chat_event, "chat.*")
# Emit events
bus.emit("user.login", {"username": "alice"}, correlation_id="req-123")
bus.emit("chat.message", {"text": "Hello!"})
Event Naming Convention (Required)
All events MUST follow the {domain}.{action} format as mandated by Constitution D2:
| Domain | Examples |
|---|---|
user |
user.login, user.logout, user.created |
chat |
chat.user_input, chat.response, chat.error |
system |
system.startup, system.shutdown, system.handler_error |
session |
session.created, session.expired |
Correlation ID Tracking
The Event Bus enforces correlation ID propagation per Constitution D1:
from aperion_event_bus import EventBus, CorrelationContext
bus = EventBus()
# Option 1: Explicit correlation ID
bus.emit("user.login", {"user": "alice"}, correlation_id="req-abc-123")
# Option 2: Context-based (recommended)
with CorrelationContext("request-xyz-789"):
# All events in this block automatically get the correlation ID
bus.emit("user.login", {"user": "alice"})
bus.emit("session.created", {"session_id": "sess-001"})
# Events will have correlation_id="request-xyz-789"
HTTP Integration
from aperion_event_bus import extract_correlation_id_from_headers, CorrelationContext
# In your HTTP middleware
def middleware(request, call_next):
correlation_id = extract_correlation_id_from_headers(dict(request.headers))
with CorrelationContext(correlation_id):
response = call_next(request)
return response
Priority Ordering
Handlers execute in priority order (higher number runs first):
def critical_handler(event):
print("Runs first")
def normal_handler(event):
print("Runs second")
def low_priority_handler(event):
print("Runs last")
bus.subscribe(critical_handler, "alert.*", priority=300)
bus.subscribe(normal_handler, "alert.*", priority=100)
bus.subscribe(low_priority_handler, "alert.*", priority=10)
Error Handling
Handler errors are isolated - one failing handler doesn't affect others:
def failing_handler(event):
raise ValueError("This handler fails")
def working_handler(event):
print("This still runs!")
bus.subscribe(failing_handler, "test.event", priority=200)
bus.subscribe(working_handler, "test.event", priority=100)
# working_handler will execute despite failing_handler's exception
bus.emit("test.event", {"data": "test"}, wait_for_handlers=True)
Handler errors are automatically logged as system.handler_error events in the audit log.
PII Redaction
The Redactor class helps ensure no PII appears in audit logs (Constitution D4):
from aperion_event_bus import EventBus, Redactor
# Enable PII redaction in audit logs
redactor = Redactor(
enabled=True,
redact_keys={"password", "ssn", "credit_card", "email"}
)
bus = EventBus(
enable_audit=True,
event_log_path=Path("events.jsonl"),
redactor=redactor
)
# Payload PII will be redacted in the audit log
bus.emit("user.signup", {
"username": "alice",
"password": "secret123", # → "[REDACTED]"
"email": "alice@example.com" # → "[REDACTED_EMAIL]"
})
Statistics & Monitoring
stats = bus.get_stats()
print(stats)
# {
# "active_subscriptions": 5,
# "total_subscriptions": 5,
# "audit_enabled": True,
# "max_concurrent_handlers": 10,
# "event_types_subscribed": ["user.*", "chat.*", "system.*"],
# "total_events_logged": 1234,
# "log_file_size": 102400
# }
# Monitor backpressure
print(f"Pending events: {bus.get_pending_count()}")
Backpressure Control
Prevent memory exhaustion under high load:
from aperion_event_bus import EventBus, OverflowPolicy, BackpressureError
# DROP policy - silently drop events when queue is full
bus = EventBus(
max_pending_events=1000,
overflow_policy=OverflowPolicy.DROP
)
# RAISE policy - raise exception when queue is full
bus = EventBus(
max_pending_events=1000,
overflow_policy=OverflowPolicy.RAISE
)
try:
bus.emit("high.volume", {"data": "..."})
except BackpressureError:
print("Queue is full!")
# BLOCK policy - wait until space is available
bus = EventBus(
max_pending_events=1000,
overflow_policy=OverflowPolicy.BLOCK
)
Log Rotation
Prevent disk exhaustion with automatic log rotation:
bus = EventBus(
event_log_path=Path("events.jsonl"),
max_log_size_bytes=10 * 1024 * 1024, # 10 MB
max_log_files=5 # Keep 5 rotated files
)
# Creates: events.jsonl, events.1.jsonl, events.2.jsonl, ...
Handler Timeouts
Prevent slow handlers from blocking:
bus = EventBus(
handler_timeout=5.0 # 5 second timeout
)
def slow_handler(event):
time.sleep(30) # Will be killed after 5 seconds
bus.subscribe(slow_handler, "test.*")
bus.emit("test.event", {}, wait_for_handlers=True) # Returns after 5s, not 30s
Event Validation
Enforce the {domain}.{action} naming convention at runtime:
from aperion_event_bus import EventBus, EventValidator, ValidationError
# Enable strict validation
bus = EventBus(validate_events=True)
bus.emit("user.login", {"user": "alice"}) # OK
bus.emit("invalid", {"data": "test"}) # Raises ValidationError!
# Custom validator with allowed domains
validator = EventValidator(
strict=True,
allowed_domains={"user", "chat", "system"}
)
bus = EventBus(validate_events=True, validator=validator)
bus.emit("user.login", {}) # OK - domain is allowed
bus.emit("order.created", {}) # Raises ValidationError - domain not allowed
Validation Helpers
from aperion_event_bus import is_valid_event_type, extract_domain, extract_action
is_valid_event_type("user.login") # True
is_valid_event_type("invalid") # False
extract_domain("user.login") # "user"
extract_action("user.login") # "login"
extract_action("chat.message.sent") # "message.sent"
Metrics Collection
Collect observability metrics with pluggable backends:
from aperion_event_bus import EventBus, MetricsCollector, InMemoryMetrics
# Create a metrics collector with in-memory backend (good for testing)
backend = InMemoryMetrics()
metrics = MetricsCollector(backend)
bus = EventBus(metrics=metrics)
bus.subscribe(lambda e: None, "user.*")
bus.emit("user.login", {"user": "alice"}, wait_for_handlers=True)
# Get metrics snapshot
snapshot = metrics.get_snapshot()
print(snapshot)
# {
# "events_emitted": {"user.login": 1},
# "events_dropped": {},
# "total_events_emitted": 1,
# "handler_stats": {
# "<lambda>": {"total_calls": 1, "success_count": 1, "failure_count": 0, ...}
# }
# }
Custom Metrics Backend
from aperion_event_bus import MetricsCollector, CallbackMetrics
def my_metrics_callback(metric_type, name, value, tags):
# Send to your metrics system (Datadog, StatsD, etc.)
print(f"{metric_type}: {name}={value} tags={tags}")
metrics = MetricsCollector(CallbackMetrics(my_metrics_callback))
bus = EventBus(metrics=metrics)
Stats Include Metrics
stats = bus.get_stats()
# Now includes:
# - "metrics_enabled": True
# - "validation_enabled": True
# - "metrics": {...} # Full metrics snapshot
OpenTelemetry Integration
Bridge correlation IDs with OpenTelemetry trace IDs (optional):
# First, install opentelemetry: pip install -e ".[otel]"
from aperion_event_bus import is_otel_available, get_trace_correlation_id, traced
if is_otel_available():
# Use OTEL trace ID as correlation ID
correlation_id = get_trace_correlation_id()
bus.emit("user.login", {"user": "alice"}, correlation_id=correlation_id)
# Wrap functions with OTEL spans
@traced(name="process_user")
def process_user(user_id):
# This function will be traced
pass
OTEL Degradation
OpenTelemetry is optional. If not installed, all OTEL functions return no-op values:
from aperion_event_bus import is_otel_available, get_trace_correlation_id
if not is_otel_available():
# get_trace_correlation_id() returns None
# traced() decorator is a no-op
pass
Dead Letter Queue
Failed events are stored in a Dead Letter Queue for inspection and retry:
from aperion_event_bus import EventBus, DeadLetterQueue, FailureReason
# Create a DLQ with max 1000 entries
dlq = DeadLetterQueue(max_size=1000)
# Attach to EventBus
bus = EventBus(dead_letter_queue=dlq)
# Handler that sometimes fails
def flaky_handler(event):
if event.payload.get("fail"):
raise ValueError("Simulated failure")
bus.subscribe(flaky_handler, "test.*")
# This event will fail and go to DLQ
bus.emit("test.event", {"fail": True}, wait_for_handlers=True)
# Inspect failed events
print(f"DLQ size: {dlq.size()}")
for failed in dlq.get_all():
print(f"Failed: {failed.event.event_type} - {failed.error}")
print(f"Reason: {failed.reason}")
# Filter by reason
timeouts = dlq.get_by_reason(FailureReason.HANDLER_TIMEOUT)
# Retry a specific event
success = dlq.retry(failed.id, bus)
# Retry all events (with max retry limit)
results = dlq.retry_all(bus, max_retries=3)
# Clear old entries
dlq.clear_older_than(hours=24)
# Get DLQ stats (also included in bus.get_stats())
stats = dlq.get_stats()
# {
# "size": 5,
# "max_size": 1000,
# "by_reason": {"handler_error": 3, "handler_timeout": 2},
# "by_event_type": {"test.event": 5}
# }
Failure Reasons
| Reason | Description |
|---|---|
HANDLER_ERROR |
Handler raised an exception |
HANDLER_TIMEOUT |
Handler exceeded timeout limit |
ALL_HANDLERS_FAILED |
All handlers for event failed |
VALIDATION_ERROR |
Event failed validation |
SERIALIZATION_ERROR |
Event could not be serialized |
API Reference
EventBus
EventBus(
event_log_path: Optional[Path] = None,
max_concurrent_handlers: int = 10,
enable_audit: bool = True,
redactor: Optional[Redactor] = None,
handler_timeout: float = 30.0,
max_pending_events: Optional[int] = None,
overflow_policy: OverflowPolicy = OverflowPolicy.DROP,
max_log_size_bytes: Optional[int] = None,
max_log_files: int = 5,
metrics: Optional[MetricsCollector] = None,
validate_events: bool = False,
validator: Optional[EventValidator] = None,
dead_letter_queue: Optional[DeadLetterQueue] = None
)
Methods
| Method | Description |
|---|---|
subscribe(handler, event_types, priority=100) |
Subscribe to events. Returns subscription ID. |
unsubscribe(subscription_id) |
Remove a subscription. Returns True if found. |
emit(event_type, payload, source=None, correlation_id=None, wait_for_handlers=False) |
Emit an event. Returns event ID. |
get_events(event_type=None, since=None, limit=100) |
Read events from audit log. |
get_stats() |
Get event bus statistics. |
clear_log() |
Clear the audit log. |
shutdown() |
Shutdown the event bus. |
Event
Event(
event_type: str,
payload: dict[str, Any],
timestamp: float = <auto>,
event_id: str = <auto>,
source: Optional[str] = None,
correlation_id: Optional[str] = None
)
Immutable event record. Use event.to_jsonl() and Event.from_jsonl(line) for serialization.
Correlation Functions
| Function | Description |
|---|---|
get_correlation_id() |
Get current correlation ID from context |
set_correlation_id(id) |
Set correlation ID in current context |
CorrelationContext(id) |
Context manager for correlation ID scope |
extract_correlation_id_from_headers(headers) |
Extract or generate correlation ID from HTTP headers |
Development
# Install dev dependencies
pip install -e ".[dev]"
# Run tests
make test
# Run tests with coverage
make coverage
# Lint code
make lint
# Format code
make format
# Type check
make type-check
Architecture
event_bus/
├── __init__.py # Public API exports
├── bus.py # EventBus class
├── events.py # Event dataclass
├── correlation.py # Correlation ID management
├── audit.py # JSONL logging & Redactor
├── validation.py # Event type validation
├── metrics.py # Metrics collection backends
└── telemetry.py # OpenTelemetry integration
Constitution Enforcement
This library enforces the following constitutional requirements:
| Constitution | Requirement | Implementation |
|---|---|---|
| D1 | Correlation IDs | Auto-propagation via CorrelationContext |
| D2 | Event Naming | {domain}.{action} format enforced via validate_events=True |
| D4 | No Secrets/PII | Redactor class for audit log scrubbing |
| B2 | PII Safety | Redactor with configurable patterns |
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file aperion_event_bus-1.2.1.tar.gz.
File metadata
- Download URL: aperion_event_bus-1.2.1.tar.gz
- Upload date:
- Size: 74.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
17c5913395988302da324eb0b5e67cb9e75deeff82b49e9fa628f8f75c4fb7c9
|
|
| MD5 |
519a55c3cc2d3b9674cc8e3d9ea1084a
|
|
| BLAKE2b-256 |
3abb368cc8da4704603ee86bb6758aa34a678720546f3151169d82a33061f9fc
|
Provenance
The following attestation bundles were made for aperion_event_bus-1.2.1.tar.gz:
Publisher:
release.yml on invictustitan2/aperion-event-bus
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
aperion_event_bus-1.2.1.tar.gz -
Subject digest:
17c5913395988302da324eb0b5e67cb9e75deeff82b49e9fa628f8f75c4fb7c9 - Sigstore transparency entry: 941982910
- Sigstore integration time:
-
Permalink:
invictustitan2/aperion-event-bus@89611de91826f3e623ab2b1c3ba550863ec51900 -
Branch / Tag:
refs/tags/v1.2.1 - Owner: https://github.com/invictustitan2
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@89611de91826f3e623ab2b1c3ba550863ec51900 -
Trigger Event:
push
-
Statement type:
File details
Details for the file aperion_event_bus-1.2.1-py3-none-any.whl.
File metadata
- Download URL: aperion_event_bus-1.2.1-py3-none-any.whl
- Upload date:
- Size: 36.0 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
422a8e96098548e4b709e825b1bbeb408062e34a1cba2fe5a0a16f1e82246b2a
|
|
| MD5 |
9f19e781e7cbe60a0cb1df810ad67024
|
|
| BLAKE2b-256 |
b62caa3d2e5bab94908d4946d4a2250a2299394e2b96a65016a995397e753cbc
|
Provenance
The following attestation bundles were made for aperion_event_bus-1.2.1-py3-none-any.whl:
Publisher:
release.yml on invictustitan2/aperion-event-bus
-
Statement:
-
Statement type:
https://in-toto.io/Statement/v1 -
Predicate type:
https://docs.pypi.org/attestations/publish/v1 -
Subject name:
aperion_event_bus-1.2.1-py3-none-any.whl -
Subject digest:
422a8e96098548e4b709e825b1bbeb408062e34a1cba2fe5a0a16f1e82246b2a - Sigstore transparency entry: 941982950
- Sigstore integration time:
-
Permalink:
invictustitan2/aperion-event-bus@89611de91826f3e623ab2b1c3ba550863ec51900 -
Branch / Tag:
refs/tags/v1.2.1 - Owner: https://github.com/invictustitan2
-
Access:
public
-
Token Issuer:
https://token.actions.githubusercontent.com -
Runner Environment:
github-hosted -
Publication workflow:
release.yml@89611de91826f3e623ab2b1c3ba550863ec51900 -
Trigger Event:
push
-
Statement type: