Skip to main content

Buz is a set of light, simple and extensible implementations of event, command and query buses.

Project description

Buz

PyPI version Python Support License: MIT Code style: black

Buz is a lightweight, simple, and extensible Python library that provides implementations of Event, Command, and Query buses following CQRS and Event-Driven Architecture patterns.

๐Ÿ“‹ Table of Contents

โœจ Key Features

  • ๐ŸšŒ Bus Types: Event, Command, and Query buses for clean architecture
  • ๐Ÿ”„ Sync & Async Support: Both synchronous and asynchronous implementations
  • ๐Ÿ”ง Middleware System: Extensible middleware for cross-cutting concerns
  • ๐Ÿ“ฆ Message Brokers: Support for Kafka, RabbitMQ (via Kombu), and in-memory
  • ๐Ÿ”’ Transactional Outbox: Reliable event publishing with transactional guarantees
  • ๐ŸŽฏ Dependency Injection: Built-in locator pattern for handler resolution
  • ๐Ÿ“ Type Safety: Fully typed with mypy support
  • ๐Ÿชถ Lightweight: Minimal dependencies, maximum flexibility

๐Ÿš€ Quick Start

Installation

# Basic installation
pip install buz

# With Kafka support
pip install buz[aiokafka]

# With RabbitMQ support
pip install buz[kombu]

# With dependency injection
pip install buz[pypendency]

Basic Usage

Event Bus Example

from dataclasses import dataclass
from buz import Message
from buz.event import Event, BaseSubscriber
from buz.event.sync import SyncEventBus
from buz.locator.sync import InstanceLocator

@dataclass(frozen=True)
class UserCreated(Event):
    user_id: str
    email: str

class EmailSubscriber(BaseSubscriber):
    def consume(self, event: UserCreated) -> None:
        print(f"Sending welcome email to {event.email}")

class AnalyticsSubscriber(BaseSubscriber):
    def consume(self, event: UserCreated) -> None:
        print(f"Tracking user creation: {event.user_id}")

# Setup
locator: InstanceLocator = InstanceLocator()
locator.register(EmailSubscriber())
locator.register(AnalyticsSubscriber())

event_bus = SyncEventBus(locator)

# Usage
event = UserCreated(user_id="123", email="user@example.com")
event_bus.publish(event)

Command Bus Example

from dataclasses import dataclass
from buz.command import Command
from buz.command.synchronous import BaseCommandHandler
from buz.command.synchronous.self_process import SelfProcessCommandBus
from buz.locator.sync import InstanceLocator

@dataclass(frozen=True)
class CreateUser(Command):
    email: str
    name: str

class CreateUserCommandHandler(BaseCommandHandler):
    def handle(self, command: CreateUser) -> None:
        # Business logic here
        print(f"Creating user: {command.name} ({command.email})")

# Setup
locator = InstanceLocator()
locator.register(CreateUserCommandHandler())

command_bus = SelfProcessCommandBus(locator)

# Usage
command = CreateUser(email="user@example.com", name="John Doe")
command_bus.handle(command)

Query Bus Example

from dataclasses import dataclass
from buz.query import Query, QueryResponse
from buz.query.synchronous import BaseQueryHandler
from buz.query.synchronous.self_process import SelfProcessQueryBus
from buz.locator.sync import InstanceLocator

@dataclass(frozen=True)
class GetUser(Query):
    user_id: str

@dataclass(frozen=True)
class User:
    user_id: str
    name: str
    email: str

class GetUserQueryHandler(BaseQueryHandler):
    def handle(self, query: GetUser) -> QueryResponse:
        # Business logic here
        return QueryResponse(
            content=User(
                user_id=query.user_id,
                name="John Doe",
                email="john@example.com"
            )
        )

# Setup
locator = InstanceLocator()
locator.register(GetUserQueryHandler())

query_bus = SelfProcessQueryBus(locator)

# Usage
query = GetUser(user_id="123")
query_response = query_bus.handle(query)
user = query_response.content
print(f"User: {user.name}")

๐Ÿ—๏ธ Architecture

Buz implements the Command Query Responsibility Segregation (CQRS) pattern with distinct buses:

Event Bus

  • Purpose: Publish domain events and notify multiple subscribers
  • Pattern: Pub/Sub with multiple handlers per event
  • Use Cases: Domain event broadcasting, eventual consistency, integration events

Command Bus

  • Purpose: Execute business operations and commands
  • Pattern: Single handler per command
  • Use Cases: Business logic execution, write operations, state changes

Query Bus

  • Purpose: Retrieve data and execute queries
  • Pattern: Single handler per query with typed responses
  • Use Cases: Data retrieval, read operations, projections

๐Ÿ”ง Advanced Features

Middleware System

Add cross-cutting concerns like logging, validation, and metrics:

from datetime import datetime
from buz.event import Event, Subscriber
from buz.event.middleware import BasePublishMiddleware, BaseConsumeMiddleware
from buz.event.infrastructure.models.execution_context import ExecutionContext

class LoggingPublishMiddleware(BasePublishMiddleware):
    def _before_on_publish(self, event: Event) -> None:
        print(f"Publishing event {event}")

    def _after_on_publish(self, event: Event) -> None:
        return

class MetricsConsumeMiddleware(BaseConsumeMiddleware):
    def __init__(self) -> None:
        self.__consumption_start_time: datetime = datetime.now()

    def _before_on_consume(
        self,
        event: Event,
        subscriber: Subscriber,
        execution_context: ExecutionContext,
    ) -> None:
        self.__consumption_start_time = datetime.now()

    def _after_on_consume(
        self,
        event: Event,
        subscriber: Subscriber,
        execution_context: ExecutionContext,
    ) -> None:
        consumption_time_ms = int((datetime.now() - self.__consumption_start_time).total_seconds() * 1000)
        print(
            f"Subscriber {subscriber.fqn()} consumed event {event.id} successfully in {consumption_time_ms} ms"
        )

# Apply middleware
event_bus = SyncEventBus(
    locator=locator,
    publish_middlewares=[LoggingPublishMiddleware()],
    consume_middlewares=[MetricsConsumeMiddleware()]
)

# Usage
event = UserCreated(user_id="123", email="user@example.com")
event_bus.publish(event)

Transactional Outbox Pattern

Ensure reliable event publishing with database transactions:

from buz.event.transactional_outbox import TransactionalOutboxEventBus

# Configure with your database and event bus
transactional_outbox_bus = TransactionalOutboxEventBus(
    outbox_repository=your_outbox_repository,
    event_to_outbox_record_translator=your_outbox_record_translator,
    ...
)

# Events are stored in database, published later by worker
transactional_outbox_bus.publish(event)

RabbitMQ

from buz.event.infrastructure.kombu.kombu_event_bus import KombuEventBus

kombu_event_bus = KombuEventBus(
    connection=your_connection,
    publish_strategy=your_publish_strategy,
    publish_retry_policy=you_publish_retry_policy,
    ...
)

# Published and consumed in RabbitMQ
kombu_event_bus.publish(event)

Kafka Integration

from buz.kafka import BuzKafkaEventBus

kafka_bus = KafkaEventBus(
    publish_strategy=your_publish_strategy,
    producer=your_producer,
    logger=your_logger,
    ...
)

# Published and consumed in Kafka
kafka_bus.publish(event)

Async Support

from buz.event.async_event_bus import AsyncEventBus
from buz.query.asynchronous import QueryBus as AsyncQueryBus
from buz.command.asynchronous import CommandHandler as AsyncCommandHandler


# Async event bus
async_event_bus = AsyncEventBus(locator)
await async_event_bus.publish(event)

# Async query bus
async_query_bus = AsyncQueryBus(locator)
await async_query_bus.handle(event)

# Async command bus
async_command_bus = AsyncCommandBus(locator)
await async_command_bus.handle(command)

๐Ÿ“ฆ Message Brokers

Supported Brokers

Broker Sync Async Installation
In-Memory โœ… โœ… Built-in
Kafka โœ… โœ… pip install buz[aiokafka]
RabbitMQ โœ… โŒ pip install buz[kombu]

๐Ÿงช Testing

Buz includes testing utilities for unit and integration tests:

from buz.event.sync import SyncEventBus
from buz.locator.sync import InstanceLocator

test_locator = InstanceLocator()
test_bus = SyncEventBus(test_locator)

test_locator.register(EmailSubscriber())
test_bus.publish(UserCreated(user_id="123", email="test@example.com"))

๐Ÿ”— Related Projects

๐Ÿ“‹ Requirements

  • Python 3.9+
  • Optional dependencies based on features used

๐Ÿค Contributing

We welcome contributions! Please see our Contributing Guidelines for details.

Development Setup

# Clone the repository
git clone https://github.com/Feverup/buz.git
cd buz

# Install with development dependencies
make build

# Run tests
make test

# Run linting
make lint

# Format code
make format

๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.

๐Ÿ“š Documentation

  • Changelog - Release notes and version history

๐Ÿ™‹โ€โ™€๏ธ Support

  • Create an Issue for bug reports or feature requests

Made with โค๏ธ by the Fever Platform Team

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

buz-2.16.0.tar.gz (59.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

buz-2.16.0-py3-none-any.whl (138.9 kB view details)

Uploaded Python 3

File details

Details for the file buz-2.16.0.tar.gz.

File metadata

  • Download URL: buz-2.16.0.tar.gz
  • Upload date:
  • Size: 59.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.9.23 Linux/6.11.0-1018-azure

File hashes

Hashes for buz-2.16.0.tar.gz
Algorithm Hash digest
SHA256 15526f9e3179658a590474090ad544bd1e77b45be5014d8160fd9ea1f60c9869
MD5 ceff7a0c2c12c21c40dee615743f09a2
BLAKE2b-256 6eac68e61cf12af4d7ace4447b40d50d3761460d42a0f58d56f9118662f85e52

See more details on using hashes here.

File details

Details for the file buz-2.16.0-py3-none-any.whl.

File metadata

  • Download URL: buz-2.16.0-py3-none-any.whl
  • Upload date:
  • Size: 138.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.8.3 CPython/3.9.23 Linux/6.11.0-1018-azure

File hashes

Hashes for buz-2.16.0-py3-none-any.whl
Algorithm Hash digest
SHA256 57762beacb6917d6ff7963e3b16fd91ca8893b3cf754f5f81da2bd72573d3dfc
MD5 cd3d9f02bc06049df23702d7733b12a0
BLAKE2b-256 ff1c6af3a5da56940078cbc09076906c192c843df9f906ccca010835edd4c79e

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page