Skip to main content

Python event bus library based on CloudEvents specification, supporting in-process and distributed event handling

Project description

EventBus

A Python event bus library based on the CloudEvents specification, supporting in-process and distributed event handling.

Features

  • CloudEvents Specification: Full implementation of CloudEvents v1.0 specification
  • Dual-Mode Event Handling:
    • PROCESS scope: Fast in-process event handling
    • APP scope: Distributed event distribution across instances
  • FastStream Compatible: Provides memory broker compatible with FastStream API
  • RPC Support: Built-in request-response pattern
  • FastAPI Integration: Seamless integration with FastAPI applications
  • Decorator Pattern: Clean event handler registration
  • Statistics and Introspection: Built-in event statistics and handler query functionality

Installation

# Install with uv (recommended)
uv pip install -e .

# Install with optional dependencies for examples
uv pip install -e ".[examples]"

# Install with Redis support for distributed events
uv pip install -e ".[redis]"

# Or install core dependencies only
pip install pydantic faststream

Quick Start

Basic Usage

import asyncio
from eventbus.memory_broker import AsyncQueueBroker
from eventbus.eventbus import EventBus
from eventbus.event import SkyEvent, EventScope

# Create brokers (both using AsyncQueueBroker for simplicity)
process_broker = AsyncQueueBroker()
app_broker = AsyncQueueBroker()

# Create EventBus
bus = EventBus(process_broker, app_broker)

# Define event class
class OrderCreatedEvent(SkyEvent):
    type: str = "order.created"
    order_id: str
    amount: float
    scope: EventScope = EventScope.PROCESS

# Define handler function
async def handle_order(event_data: dict):
    print(f"Processing order: {event_data['order_id']}")

# Subscribe to event
bus.subscribe("order.created", handle_order)

# Start EventBus
await bus.start()

# Publish event
event = OrderCreatedEvent(
    source="order-service",
    order_id="ORD-001",
    amount=99.9
)
await bus.publish(event)

# Stop EventBus
await bus.stop()

FastAPI Integration

from contextlib import asynccontextmanager
from fastapi import FastAPI
from eventbus.memory_broker import AsyncQueueBroker
from eventbus.eventbus import init_eventbus, event_handler
from eventbus.event import SkyEvent, EventScope

# Define event
class OrderCreatedEvent(SkyEvent):
    type: str = "order.created"
    order_id: str
    scope: EventScope = EventScope.PROCESS

# Register handler using decorator
@event_handler(OrderCreatedEvent)
async def handle_order(event_data: dict):
    print(f"Processing order: {event_data['order_id']}")

# FastAPI lifecycle management
@asynccontextmanager
async def lifespan(app: FastAPI):
    process_broker = AsyncQueueBroker()
    app_broker = AsyncQueueBroker()
    event_bus = init_eventbus(process_broker, app_broker)
    app.state.event_bus = event_bus
    await event_bus.start()
    yield
    await event_bus.stop()

app = FastAPI(lifespan=lifespan)

@app.post("/orders")
async def create_order(order_id: str, amount: float):
    event = OrderCreatedEvent(
        source="api",
        order_id=order_id,
        amount=amount
    )
    await app.state.event_bus.publish(event)
    return {"status": "created"}

Distributed Setup (Optional)

For distributed event handling across multiple instances, use FastStream brokers:

from eventbus.memory_broker import AsyncQueueBroker
from faststream.redis import RedisBroker
from eventbus.eventbus import EventBus

# Process broker: in-memory for fast local events
process_broker = AsyncQueueBroker()

# App broker: Redis for distributed events across instances
app_broker = RedisBroker("redis://localhost:6379")

bus = EventBus(process_broker, app_broker)

Note: Redis support requires the [redis] extra: pip install -e ".[redis]". For single-instance applications, you can use AsyncQueueBroker for both brokers.

Core Concepts

CloudEvent

Event class based on CloudEvents v1.0 specification.

Required Attributes:

  • id: Event unique identifier (auto-generated UUID)
  • source: Event source identifier (URI-reference format)
  • specversion: CloudEvents specification version (default "1.0")
  • type: Event type identifier

Optional Attributes:

  • datacontenttype: Data content type (default "application/json")
  • dataschema: Data schema URI
  • subject: Event subject
  • time: Event timestamp (auto-generated)
  • data: Event payload data
  • extensions: Extension attributes dictionary

SkyEvent

Extends CloudEvent with event scope functionality.

Additional Attributes:

  • scope: Event scope (EventScope.PROCESS or EventScope.APP)

EventScope Explanation:

  • PROCESS: In-process events, handled only in current instance, uses memory queue, fast response
  • APP: Application-level events, distributed to all instances via broker (e.g., Redis)

AsyncQueueBroker

FastStream API-compatible memory broker, suitable for:

  • Development and testing environments
  • Single-process applications
  • Scenarios without Redis requirement

Main Methods:

  • subscriber(channel): Create subscriber decorator
  • publisher(channel): Create publisher decorator
  • publish(message, channel): Publish message
  • request(message, channel, timeout): RPC request
  • start() / stop(): Start/stop broker
  • get_stats(): Get statistics
  • get_subscribers(): Get subscriber information

EventBus

Event bus core class, manages event subscription and publishing.

Main Methods:

  • subscribe(event_type, handler): Register event handler
  • publish(event): Publish event
  • start() / stop(): Start/stop event bus
  • get_handlers(): Get all registered handlers

Decorators:

  • @event_handler(EventClass): Auto-register event handler

Usage Examples

1. Basic Pub/Sub

from eventbus.memory_broker import AsyncQueueBroker

broker = AsyncQueueBroker()

@broker.subscriber("events.user.created")
async def handle_user(data: dict):
    print(f"New user: {data['username']}")

await broker.start()
await broker.publish({"username": "alice"}, channel="events.user.created")
await broker.stop()

2. Multiple Subscribers

@broker.subscriber("events.order.created")
async def send_email(data: dict):
    print(f"Sending email: Order {data['order_id']}")

@broker.subscriber("events.order.created")
async def update_inventory(data: dict):
    print(f"Updating inventory: Order {data['order_id']}")

await broker.publish({"order_id": "123"}, channel="events.order.created")

3. Publisher Decorator

@broker.publisher("events.result")
async def process_data(data: dict) -> dict:
    # Return value automatically published to events.result
    return {"status": "processed", "data": data}

@broker.subscriber("events.result")
async def handle_result(data: dict):
    print(f"Result: {data}")

await process_data({"value": 42})

4. RPC Pattern

@broker.subscriber("rpc.calculate")
async def calculate(data: dict) -> dict:
    result = data["a"] + data["b"]
    return {"result": result}

# Send request and wait for response
response = await broker.request(
    {"a": 10, "b": 20},
    channel="rpc.calculate",
    timeout=1.0
)
print(response)  # {"result": 30}

5. Context Manager

async with AsyncQueueBroker() as broker:
    @broker.subscriber("events.test")
    async def handler(data: dict):
        print(data)

    await broker.publish({"msg": "hello"}, channel="events.test")
    await asyncio.sleep(0.1)

6. Event Scopes

# In-process event (fast)
process_event = SkyEvent(
    type="cache.cleared",
    source="cache-service",
    scope=EventScope.PROCESS,
    data={"cache_key": "user_123"}
)
await bus.publish(process_event)

# Application-level event (distributed)
app_event = SkyEvent(
    type="user.registered",
    source="auth-service",
    scope=EventScope.APP,
    data={"user_id": "456"}
)
await bus.publish(app_event)

7. Statistics

stats = broker.get_stats()
print(f"Published: {stats['published']}")
print(f"Consumed: {stats['consumed']}")
print(f"Errors: {stats['errors']}")
print(f"Channels: {stats['channels']}")
print(f"Subscribers: {stats['subscribers']}")

8. Handler Introspection

handlers = bus.get_handlers()
for event_type, handler_list in handlers.items():
    print(f"{event_type}:")
    for h in handler_list:
        print(f"  - {h['function_name']} ({h['module']})")

Example Files

The examples directory contains complete example code:

AsyncQueueBroker Examples

EventBus Examples

Running examples:

# AsyncQueueBroker examples
python examples/01_basic_pubsub.py

# EventBus examples
python examples/08_eventbus_basic.py

# FastAPI integration (requires Redis)
python examples/15_fastapi_eventbus_integration.py

API Reference

CloudEvent

class CloudEvent(BaseModel):
    id: str                              # Auto-generated
    source: str                          # Required
    specversion: str = "1.0"            # Default value
    type: str                            # Required
    datacontenttype: Optional[str]       # Default "application/json"
    dataschema: Optional[str]
    subject: Optional[str]
    time: Optional[datetime]             # Auto-generated
    data: Optional[Dict[str, Any]]
    extensions: Dict[str, Any]           # Extension attributes

    def to_dict() -> Dict[str, Any]
    def to_json() -> str
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "CloudEvent"

SkyEvent

class SkyEvent(CloudEvent):
    scope: EventScope = EventScope.APP   # Event scope

    @property
    def event_id(self) -> str            # Compatibility property
    @property
    def event_type(self) -> str          # Compatibility property
    @property
    def timestamp(self) -> datetime      # Compatibility property

AsyncQueueBroker

class AsyncQueueBroker:
    def __init__(self, url: str = "", *, max_queue_size: int = 1000)

    async def start() -> None
    async def stop() -> None
    async def connect() -> None
    async def ping(timeout: Optional[float] = None) -> bool

    def subscriber(self, channel: str, **kwargs) -> InMemorySubscriber
    def publisher(self, channel: str, **kwargs) -> InMemoryPublisher

    async def publish(
        self,
        message: Any = None,
        channel: Optional[str] = None,
        *,
        headers: Optional[Dict[str, Any]] = None,
        correlation_id: Optional[str] = None,
        reply_to: str = "",
        **kwargs
    ) -> int

    async def request(
        self,
        message: Any = None,
        channel: Optional[str] = None,
        *,
        timeout: float = 0.5,
        **kwargs
    ) -> Any

    def get_stats() -> Dict[str, Any]
    def get_subscribers() -> Dict[str, List[Dict[str, str]]]

EventBus

class EventBus:
    def __init__(
        self,
        process_level_broker: Any,
        app_level_broker: Any,
        logger: Optional[logging.Logger] = None
    )

    async def start() -> None
    async def stop() -> None

    def subscribe(self, event_type: str, handler: Callable) -> None
    async def publish(self, event: SkyEvent) -> None

    def get_handlers() -> Dict[str, List[Dict[str, str]]]

# Decorator
def event_handler(event_class: Type[SkyEvent]) -> Callable

# Initialization function
def init_eventbus(
    process_level_broker: Any,
    app_level_broker: Any,
    logger: Optional[logging.Logger] = None
) -> EventBus

Best Practices

1. Choose Appropriate Event Scope

  • PROCESS Scope: Suitable for scenarios without cross-instance communication

    • Cache updates
    • Local state changes
    • Fast-response internal events
  • APP Scope: Suitable for scenarios requiring distributed processing

    • User registration/login
    • Order creation
    • Notification sending
    • Business requiring multi-instance coordination

2. Event Naming Convention

Use reverse domain name style:

# Good naming
"com.example.user.created"
"com.example.order.payment.completed"
"com.example.notification.email.sent"

# Avoid
"user_created"
"ORDER_CREATED"
"notification"

3. Event Data Structure

Keep event data concise, include only necessary information:

# Good practice
class UserCreatedEvent(SkyEvent):
    type: str = "user.created"
    user_id: str
    email: str
    created_at: datetime

# Avoid including large amounts of data in events
# If detailed information is needed, query in the handler

4. Error Handling

Exceptions in event handlers are caught and logged, won't affect other handlers:

@event_handler(OrderCreatedEvent)
async def handle_order(event_data: dict):
    try:
        # Business logic
        await process_order(event_data)
    except Exception as e:
        logger.error(f"Failed to process order: {e}")
        # Can publish error event or perform compensation

5. Testing

Use AsyncQueueBroker for unit testing:

import pytest
from eventbus.memory_broker import AsyncQueueBroker

@pytest.mark.asyncio
async def test_event_handling():
    broker = AsyncQueueBroker()
    received = []

    @broker.subscriber("test.event")
    async def handler(data: dict):
        received.append(data)

    await broker.start()
    await broker.publish({"value": 42}, channel="test.event")
    await asyncio.sleep(0.1)
    await broker.stop()

    assert len(received) == 1
    assert received[0]["value"] == 42

FastStream Integration

EventBus can integrate with various FastStream brokers:

Redis Broker

from faststream.redis import RedisBroker

process_broker = AsyncQueueBroker()
app_broker = RedisBroker("redis://localhost:6379")
event_bus = EventBus(process_broker, app_broker)

Kafka Broker

from faststream.kafka import KafkaBroker

process_broker = AsyncQueueBroker()
app_broker = KafkaBroker("localhost:9092")
event_bus = EventBus(process_broker, app_broker)

RabbitMQ Broker

from faststream.rabbit import RabbitBroker

process_broker = AsyncQueueBroker()
app_broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
event_bus = EventBus(process_broker, app_broker)

Performance Considerations

  • AsyncQueueBroker: Memory queue, highest performance, suitable for single process
  • In-process events (PROCESS): Direct invocation, lowest latency
  • Application-level events (APP): Distributed via broker, has network latency
  • Queue size: Default 1000, adjustable via max_queue_size
  • Concurrent processing: Each channel has independent consumer task

Troubleshooting

Events Not Being Processed

  1. Check if broker is started: await broker.start()
  2. Check if channel names match
  3. Check if handler is correctly registered
  4. Review log output

RPC Timeout

  1. Increase timeout parameter
  2. Check if handler returns value
  3. Confirm handler doesn't throw exceptions

High Memory Usage

  1. Reduce max_queue_size
  2. Check for message backlog
  3. Optimize handler performance

License

Apache License 2.0

Contributing

Issues and Pull Requests are welcome!

Related Resources

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

opensecflow_eventbus-0.1.0.tar.gz (22.7 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

opensecflow_eventbus-0.1.0-py3-none-any.whl (19.8 kB view details)

Uploaded Python 3

File details

Details for the file opensecflow_eventbus-0.1.0.tar.gz.

File metadata

  • Download URL: opensecflow_eventbus-0.1.0.tar.gz
  • Upload date:
  • Size: 22.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for opensecflow_eventbus-0.1.0.tar.gz
Algorithm Hash digest
SHA256 700635efc2377352d42a68d0297b2091c0162ffda5a8bef7d7bc9336c47b35ee
MD5 f981ee22c726b37f22223b7023e66371
BLAKE2b-256 a4cfe34aae27e50d9a66157d8e406a5c2bd620dae0f95a8c85dfd372e143d8f0

See more details on using hashes here.

File details

Details for the file opensecflow_eventbus-0.1.0-py3-none-any.whl.

File metadata

File hashes

Hashes for opensecflow_eventbus-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 5a75c8293917fdca454c6213c2e1fb7a2ee0c6bc505e3b3cb03d5b5764e82847
MD5 4903f96f1a5b59bb38e94112d57ddb42
BLAKE2b-256 1663ad5c8ed371d5024441d94552fc11aaf586e257c87382c315edcd4909b5a7

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page