Skip to main content

Buz is a set of light, simple and extensible implementations of event, command and query buses.

Project description

Buz

PyPI version Python Support License: MIT Code style: black

Buz is a lightweight, simple, and extensible Python library that provides implementations of Event, Command, and Query buses following CQRS and Event-Driven Architecture patterns.

๐Ÿ“‹ Table of Contents

โœจ Key Features

  • ๐ŸšŒ Bus Types: Event, Command, and Query buses for clean architecture
  • ๐Ÿ”„ Sync & Async Support: Both synchronous and asynchronous implementations
  • ๐Ÿ”ง Middleware System: Extensible middleware for cross-cutting concerns
  • ๐Ÿ“ฆ Message Brokers: Support for Kafka, RabbitMQ (via Kombu), and in-memory
  • ๐Ÿ”’ Transactional Outbox: Reliable event publishing with transactional guarantees
  • ๐ŸŽฏ Dependency Injection: Built-in locator pattern for handler resolution
  • ๐Ÿ“ Type Safety: Fully typed with mypy support
  • ๐Ÿชถ Lightweight: Minimal dependencies, maximum flexibility

๐Ÿš€ Quick Start

Installation

# Basic installation
pip install buz

# With Kafka support
pip install buz[aiokafka]

# With RabbitMQ support
pip install buz[kombu]

# With dependency injection
pip install buz[pypendency]

Basic Usage

Event Bus Example

from dataclasses import dataclass
from buz import Message
from buz.event import Event, BaseSubscriber
from buz.event.sync import SyncEventBus
from buz.locator.sync import InstanceLocator

@dataclass(frozen=True)
class UserCreated(Event):
    user_id: str
    email: str

class EmailSubscriber(BaseSubscriber):
    def consume(self, event: UserCreated) -> None:
        print(f"Sending welcome email to {event.email}")

class AnalyticsSubscriber(BaseSubscriber):
    def consume(self, event: UserCreated) -> None:
        print(f"Tracking user creation: {event.user_id}")

# Setup
locator: InstanceLocator = InstanceLocator()
locator.register(EmailSubscriber())
locator.register(AnalyticsSubscriber())

event_bus = SyncEventBus(locator)

# Usage
event = UserCreated(user_id="123", email="user@example.com")
event_bus.publish(event)

Command Bus Example

from dataclasses import dataclass
from buz.command import Command
from buz.command.synchronous import BaseCommandHandler
from buz.command.synchronous.self_process import SelfProcessCommandBus
from buz.locator.sync import InstanceLocator

@dataclass(frozen=True)
class CreateUser(Command):
    email: str
    name: str

class CreateUserCommandHandler(BaseCommandHandler):
    def handle(self, command: CreateUser) -> None:
        # Business logic here
        print(f"Creating user: {command.name} ({command.email})")

# Setup
locator = InstanceLocator()
locator.register(CreateUserCommandHandler())

command_bus = SelfProcessCommandBus(locator)

# Usage
command = CreateUser(email="user@example.com", name="John Doe")
command_bus.handle(command)

Query Bus Example

from dataclasses import dataclass
from buz.query import Query, QueryResponse
from buz.query.synchronous import BaseQueryHandler
from buz.query.synchronous.self_process import SelfProcessQueryBus
from buz.locator.sync import InstanceLocator

@dataclass(frozen=True)
class GetUser(Query):
    user_id: str

@dataclass(frozen=True)
class User:
    user_id: str
    name: str
    email: str

class GetUserQueryHandler(BaseQueryHandler):
    def handle(self, query: GetUser) -> QueryResponse:
        # Business logic here
        return QueryResponse(
            content=User(
                user_id=query.user_id,
                name="John Doe",
                email="john@example.com"
            )
        )

# Setup
locator = InstanceLocator()
locator.register(GetUserQueryHandler())

query_bus = SelfProcessQueryBus(locator)

# Usage
query = GetUser(user_id="123")
query_response = query_bus.handle(query)
user = query_response.content
print(f"User: {user.name}")

๐Ÿ—๏ธ Architecture

Buz implements the Command Query Responsibility Segregation (CQRS) pattern with distinct buses:

Event Bus

  • Purpose: Publish domain events and notify multiple subscribers
  • Pattern: Pub/Sub with multiple handlers per event
  • Use Cases: Domain event broadcasting, eventual consistency, integration events

Command Bus

  • Purpose: Execute business operations and commands
  • Pattern: Single handler per command
  • Use Cases: Business logic execution, write operations, state changes

Query Bus

  • Purpose: Retrieve data and execute queries
  • Pattern: Single handler per query with typed responses
  • Use Cases: Data retrieval, read operations, projections

๐Ÿ”ง Advanced Features

Middleware System

Add cross-cutting concerns like logging, validation, and metrics:

from datetime import datetime
from buz.event import Event, Subscriber
from buz.event.middleware import BasePublishMiddleware, BaseConsumeMiddleware
from buz.event.infrastructure.models.execution_context import ExecutionContext

class LoggingPublishMiddleware(BasePublishMiddleware):
    def _before_on_publish(self, event: Event) -> None:
        print(f"Publishing event {event}")

    def _after_on_publish(self, event: Event) -> None:
        return

class MetricsConsumeMiddleware(BaseConsumeMiddleware):
    def __init__(self) -> None:
        self.__consumption_start_time: datetime = datetime.now()

    def _before_on_consume(
        self,
        event: Event,
        subscriber: Subscriber,
        execution_context: ExecutionContext,
    ) -> None:
        self.__consumption_start_time = datetime.now()

    def _after_on_consume(
        self,
        event: Event,
        subscriber: Subscriber,
        execution_context: ExecutionContext,
    ) -> None:
        consumption_time_ms = int((datetime.now() - self.__consumption_start_time).total_seconds() * 1000)
        print(
            f"Subscriber {subscriber.fqn()} consumed event {event.id} successfully in {consumption_time_ms} ms"
        )

# Apply middleware
event_bus = SyncEventBus(
    locator=locator,
    publish_middlewares=[LoggingPublishMiddleware()],
    consume_middlewares=[MetricsConsumeMiddleware()]
)

# Usage
event = UserCreated(user_id="123", email="user@example.com")
event_bus.publish(event)

Transactional Outbox Pattern

Ensure reliable event publishing with database transactions:

from buz.event.transactional_outbox import TransactionalOutboxEventBus

# Configure with your database and event bus
transactional_outbox_bus = TransactionalOutboxEventBus(
    outbox_repository=your_outbox_repository,
    event_to_outbox_record_translator=your_outbox_record_translator,
    ...
)

# Events are stored in database, published later by worker
transactional_outbox_bus.publish(event)

RabbitMQ

from buz.event.infrastructure.kombu.kombu_event_bus import KombuEventBus

kombu_event_bus = KombuEventBus(
    connection=your_connection,
    publish_strategy=your_publish_strategy,
    publish_retry_policy=you_publish_retry_policy,
    ...
)

# Published and consumed in RabbitMQ
kombu_event_bus.publish(event)

Kafka Integration

from buz.kafka import BuzKafkaEventBus

kafka_bus = KafkaEventBus(
    publish_strategy=your_publish_strategy,
    producer=your_producer,
    logger=your_logger,
    ...
)

# Published and consumed in Kafka
kafka_bus.publish(event)

Async Support

from buz.event.async_event_bus import AsyncEventBus
from buz.query.asynchronous import QueryBus as AsyncQueryBus
from buz.command.asynchronous import CommandHandler as AsyncCommandHandler


# Async event bus
async_event_bus = AsyncEventBus(locator)
await async_event_bus.publish(event)

# Async query bus
async_query_bus = AsyncQueryBus(locator)
await async_query_bus.handle(event)

# Async command bus
async_command_bus = AsyncCommandBus(locator)
await async_command_bus.handle(command)

๐Ÿ“ฆ Message Brokers

Supported Brokers

Broker Sync Async Installation
In-Memory โœ… โœ… Built-in
Kafka โœ… โœ… pip install buz[aiokafka]
RabbitMQ โœ… โŒ pip install buz[kombu]

๐Ÿงช Testing

Buz includes testing utilities for unit and integration tests:

from buz.event.sync import SyncEventBus
from buz.locator.sync import InstanceLocator

test_locator = InstanceLocator()
test_bus = SyncEventBus(test_locator)

test_locator.register(EmailSubscriber())
test_bus.publish(UserCreated(user_id="123", email="test@example.com"))

๐Ÿ”— Related Projects

๐Ÿ“‹ Requirements

  • Python 3.9+
  • Optional dependencies based on features used

๐Ÿค Contributing

We welcome contributions! Please see our Contributing Guidelines for details.

Development Setup

# Clone the repository
git clone https://github.com/Feverup/buz.git
cd buz

# Install with development dependencies
make build

# Run tests
make test

# Run linting
make lint

# Format code
make format

๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ“š Documentation

  • Changelog - Release notes and version history

๐Ÿ™‹โ€โ™€๏ธ Support

  • Create an Issue for bug reports or feature requests

Made with โค๏ธ by the Fever Platform Team

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

buz-3.0.0rc1.tar.gz (67.2 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

buz-3.0.0rc1-py3-none-any.whl (161.6 kB view details)

Uploaded Python 3

File details

Details for the file buz-3.0.0rc1.tar.gz.

File metadata

  • Download URL: buz-3.0.0rc1.tar.gz
  • Upload date:
  • Size: 67.2 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.10.19 Linux/6.14.0-1017-azure

File hashes

Hashes for buz-3.0.0rc1.tar.gz
Algorithm Hash digest
SHA256 5c73d4a32f6c4e71a63e7f2fbe49dd4d390fbeea5fc5b6674dd317152763c19b
MD5 5bb0921454854cd06179558db5909386
BLAKE2b-256 b676aa5a1561d485454734cb972dae03af7b85f953481e5e02907db5568cab88

See more details on using hashes here.

File details

Details for the file buz-3.0.0rc1-py3-none-any.whl.

File metadata

  • Download URL: buz-3.0.0rc1-py3-none-any.whl
  • Upload date:
  • Size: 161.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.10.19 Linux/6.14.0-1017-azure

File hashes

Hashes for buz-3.0.0rc1-py3-none-any.whl
Algorithm Hash digest
SHA256 6e57d1238883576f986250789382a9841962923a8d55554fde3a149e3ed5108e
MD5 1f1a1516a80be14cf4cd3ade3a054c7d
BLAKE2b-256 8c5823e671269262bb431efd2b6c82e9bd4546325ca6bed867b1e4169469aafe

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page