Buz is a set of light, simple and extensible implementations of event, command and query buses.
Project description
Buz
Buz is a lightweight, simple, and extensible Python library that provides implementations of Event, Command, and Query buses following CQRS and Event-Driven Architecture patterns.
๐ Table of Contents
- Buz
โจ Key Features
- ๐ Bus Types: Event, Command, and Query buses for clean architecture
- ๐ Sync & Async Support: Both synchronous and asynchronous implementations
- ๐ง Middleware System: Extensible middleware for cross-cutting concerns
- ๐ฆ Message Brokers: Support for Kafka, RabbitMQ (via Kombu), and in-memory
- ๐ Transactional Outbox: Reliable event publishing with transactional guarantees
- ๐ฏ Dependency Injection: Built-in locator pattern for handler resolution
- ๐ Type Safety: Fully typed with mypy support
- ๐ชถ Lightweight: Minimal dependencies, maximum flexibility
๐ Quick Start
Installation
# Basic installation
pip install buz
# With Kafka support
pip install buz[aiokafka]
# With RabbitMQ support
pip install buz[kombu]
# With dependency injection
pip install buz[pypendency]
Basic Usage
Event Bus Example
from dataclasses import dataclass
from buz import Message
from buz.event import Event, BaseSubscriber
from buz.event.sync import SyncEventBus
from buz.locator.sync import InstanceLocator
@dataclass(frozen=True)
class UserCreated(Event):
user_id: str
email: str
class EmailSubscriber(BaseSubscriber):
def consume(self, event: UserCreated) -> None:
print(f"Sending welcome email to {event.email}")
class AnalyticsSubscriber(BaseSubscriber):
def consume(self, event: UserCreated) -> None:
print(f"Tracking user creation: {event.user_id}")
# Setup
locator: InstanceLocator = InstanceLocator()
locator.register(EmailSubscriber())
locator.register(AnalyticsSubscriber())
event_bus = SyncEventBus(locator)
# Usage
event = UserCreated(user_id="123", email="user@example.com")
event_bus.publish(event)
Command Bus Example
from dataclasses import dataclass
from buz.command import Command
from buz.command.synchronous import BaseCommandHandler
from buz.command.synchronous.self_process import SelfProcessCommandBus
from buz.locator.sync import InstanceLocator
@dataclass(frozen=True)
class CreateUser(Command):
email: str
name: str
class CreateUserCommandHandler(BaseCommandHandler):
def handle(self, command: CreateUser) -> None:
# Business logic here
print(f"Creating user: {command.name} ({command.email})")
# Setup
locator = InstanceLocator()
locator.register(CreateUserCommandHandler())
command_bus = SelfProcessCommandBus(locator)
# Usage
command = CreateUser(email="user@example.com", name="John Doe")
command_bus.handle(command)
Query Bus Example
from dataclasses import dataclass
from buz.query import Query, QueryResponse
from buz.query.synchronous import BaseQueryHandler
from buz.query.synchronous.self_process import SelfProcessQueryBus
from buz.locator.sync import InstanceLocator
@dataclass(frozen=True)
class GetUser(Query):
user_id: str
@dataclass(frozen=True)
class User:
user_id: str
name: str
email: str
class GetUserQueryHandler(BaseQueryHandler):
def handle(self, query: GetUser) -> QueryResponse:
# Business logic here
return QueryResponse(
content=User(
user_id=query.user_id,
name="John Doe",
email="john@example.com"
)
)
# Setup
locator = InstanceLocator()
locator.register(GetUserQueryHandler())
query_bus = SelfProcessQueryBus(locator)
# Usage
query = GetUser(user_id="123")
query_response = query_bus.handle(query)
user = query_response.content
print(f"User: {user.name}")
๐๏ธ Architecture
Buz implements the Command Query Responsibility Segregation (CQRS) pattern with distinct buses:
Event Bus
- Purpose: Publish domain events and notify multiple subscribers
- Pattern: Pub/Sub with multiple handlers per event
- Use Cases: Domain event broadcasting, eventual consistency, integration events
Command Bus
- Purpose: Execute business operations and commands
- Pattern: Single handler per command
- Use Cases: Business logic execution, write operations, state changes
Query Bus
- Purpose: Retrieve data and execute queries
- Pattern: Single handler per query with typed responses
- Use Cases: Data retrieval, read operations, projections
๐ง Advanced Features
Middleware System
Add cross-cutting concerns like logging, validation, and metrics:
from datetime import datetime
from buz.event import Event, Subscriber
from buz.event.middleware import BasePublishMiddleware, BaseConsumeMiddleware
from buz.event.infrastructure.models.execution_context import ExecutionContext
class LoggingPublishMiddleware(BasePublishMiddleware):
def _before_on_publish(self, event: Event) -> None:
print(f"Publishing event {event}")
def _after_on_publish(self, event: Event) -> None:
return
class MetricsConsumeMiddleware(BaseConsumeMiddleware):
def __init__(self) -> None:
self.__consumption_start_time: datetime = datetime.now()
def _before_on_consume(
self,
event: Event,
subscriber: Subscriber,
execution_context: ExecutionContext,
) -> None:
self.__consumption_start_time = datetime.now()
def _after_on_consume(
self,
event: Event,
subscriber: Subscriber,
execution_context: ExecutionContext,
) -> None:
consumption_time_ms = int((datetime.now() - self.__consumption_start_time).total_seconds() * 1000)
print(
f"Subscriber {subscriber.fqn()} consumed event {event.id} successfully in {consumption_time_ms} ms"
)
# Apply middleware
event_bus = SyncEventBus(
locator=locator,
publish_middlewares=[LoggingPublishMiddleware()],
consume_middlewares=[MetricsConsumeMiddleware()]
)
# Usage
event = UserCreated(user_id="123", email="user@example.com")
event_bus.publish(event)
Transactional Outbox Pattern
Ensure reliable event publishing with database transactions:
from buz.event.transactional_outbox import TransactionalOutboxEventBus
# Configure with your database and event bus
transactional_outbox_bus = TransactionalOutboxEventBus(
outbox_repository=your_outbox_repository,
event_to_outbox_record_translator=your_outbox_record_translator,
...
)
# Events are stored in database, published later by worker
transactional_outbox_bus.publish(event)
RabbitMQ
from buz.event.infrastructure.kombu.kombu_event_bus import KombuEventBus
kombu_event_bus = KombuEventBus(
connection=your_connection,
publish_strategy=your_publish_strategy,
publish_retry_policy=you_publish_retry_policy,
...
)
# Published and consumed in RabbitMQ
kombu_event_bus.publish(event)
Kafka Integration
from buz.kafka import BuzKafkaEventBus
kafka_bus = KafkaEventBus(
publish_strategy=your_publish_strategy,
producer=your_producer,
logger=your_logger,
...
)
# Published and consumed in Kafka
kafka_bus.publish(event)
Async Support
from buz.event.async_event_bus import AsyncEventBus
from buz.query.asynchronous import QueryBus as AsyncQueryBus
from buz.command.asynchronous import CommandHandler as AsyncCommandHandler
# Async event bus
async_event_bus = AsyncEventBus(locator)
await async_event_bus.publish(event)
# Async query bus
async_query_bus = AsyncQueryBus(locator)
await async_query_bus.handle(event)
# Async command bus
async_command_bus = AsyncCommandBus(locator)
await async_command_bus.handle(command)
๐ฆ Message Brokers
Supported Brokers
| Broker | Sync | Async | Installation |
|---|---|---|---|
| In-Memory | โ | โ | Built-in |
| Kafka | โ | โ | pip install buz[aiokafka] |
| RabbitMQ | โ | โ | pip install buz[kombu] |
๐งช Testing
Buz includes testing utilities for unit and integration tests:
from buz.event.sync import SyncEventBus
from buz.locator.sync import InstanceLocator
test_locator = InstanceLocator()
test_bus = SyncEventBus(test_locator)
test_locator.register(EmailSubscriber())
test_bus.publish(UserCreated(user_id="123", email="test@example.com"))
๐ Related Projects
- buz-fever-shared: Opinionated utilities and standards for Buz
- buz-basic-example: Complete example project with Docker setup
๐ Requirements
- Python 3.9+
- Optional dependencies based on features used
๐ค Contributing
We welcome contributions! Please see our Contributing Guidelines for details.
Development Setup
# Clone the repository
git clone https://github.com/Feverup/buz.git
cd buz
# Install with development dependencies
make build
# Run tests
make test
# Run linting
make lint
# Format code
make format
๐ License
This project is licensed under the MIT License - see the LICENSE file for details.
๐ Documentation
- Changelog - Release notes and version history
๐โโ๏ธ Support
- Create an Issue for bug reports or feature requests
Made with โค๏ธ by the Fever Platform Team
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file buz-2.16.0.tar.gz.
File metadata
- Download URL: buz-2.16.0.tar.gz
- Upload date:
- Size: 59.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.9.23 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
15526f9e3179658a590474090ad544bd1e77b45be5014d8160fd9ea1f60c9869
|
|
| MD5 |
ceff7a0c2c12c21c40dee615743f09a2
|
|
| BLAKE2b-256 |
6eac68e61cf12af4d7ace4447b40d50d3761460d42a0f58d56f9118662f85e52
|
File details
Details for the file buz-2.16.0-py3-none-any.whl.
File metadata
- Download URL: buz-2.16.0-py3-none-any.whl
- Upload date:
- Size: 138.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.8.3 CPython/3.9.23 Linux/6.11.0-1018-azure
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
57762beacb6917d6ff7963e3b16fd91ca8893b3cf754f5f81da2bd72573d3dfc
|
|
| MD5 |
cd3d9f02bc06049df23702d7733b12a0
|
|
| BLAKE2b-256 |
ff1c6af3a5da56940078cbc09076906c192c843df9f906ccca010835edd4c79e
|