Skip to main content

Python CQRS pattern implementation

Project description

Python CQRS pattern implementation with Transaction Outbox supporting

Overview

This is a package for implementing the CQRS (Command Query Responsibility Segregation) pattern in Python applications. It provides a set of abstractions and utilities to help separate read and write use cases, ensuring better scalability, performance, and maintainability of the application.

This package is a fork of the diator project (documentation) with several enhancements:

  1. Support for Pydantic v2.*;
  2. Kafka support using aiokafka;
  3. Added EventMediator for handling Notification and ECST events coming from the bus;
  4. Redesigned the event and request mapping mechanism to handlers;
  5. Added bootstrap for easy setup;
  6. Added support for Transaction Outbox, ensuring that Notification and ECST events are sent to the broker;
  7. FastAPI supporting;
  8. FastStream supporting;
  9. Protobuf events supporting;
  10. StreamingRequestMediator and StreamingRequestHandler for handling streaming requests with real-time progress updates;
  11. Parallel event processing with configurable concurrency limits.

Request Handlers

Request handlers can be divided into two main types:

Command Handler

Command Handler executes the received command. The logic of the handler may include, for example, modifying the state of the domain model. As a result of executing the command, an event may be produced to the broker.

[!TIP] By default, the command handler does not return any result, but it is not mandatory.

from cqrs.requests.request_handler import RequestHandler, SyncRequestHandler
from cqrs.events.event import Event

class JoinMeetingCommandHandler(RequestHandler[JoinMeetingCommand, None]):

      def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
          self._meetings_api = meetings_api
          self.events: list[Event] = []

      @property
      def events(self) -> typing.List[events.Event]:
          return self._events

      async def handle(self, request: JoinMeetingCommand) -> None:
          await self._meetings_api.join_user(request.user_id, request.meeting_id)


class SyncJoinMeetingCommandHandler(SyncRequestHandler[JoinMeetingCommand, None]):

      def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
          self._meetings_api = meetings_api
          self.events: list[Event] = []

      @property
      def events(self) -> typing.List[events.Event]:
          return self._events

      def handle(self, request: JoinMeetingCommand) -> None:
          # do some sync logic
          ...

A complete example can be found in the documentation

Query handler

Query Handler returns a representation of the requested data, for example, from the read model.

[!TIP] The read model can be constructed based on domain events produced by the Command Handler.

from cqrs.requests.request_handler import RequestHandler
from cqrs.events.event import Event

class ReadMeetingQueryHandler(RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult]):

      def __init__(self, meetings_api: MeetingAPIProtocol) -> None:
          self._meetings_api = meetings_api
          self.events: list[Event] = []

      @property
      def events(self) -> typing.List[events.Event]:
          return self._events

      async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
          link = await self._meetings_api.get_link(request.meeting_id)
          return ReadMeetingQueryResult(link=link, meeting_id=request.meeting_id)

A complete example can be found in the documentation

Streaming Request Handler

Streaming Request Handler processes requests incrementally and yields results as they become available. This is particularly useful for processing large batches of items, file uploads, or any operation that benefits from real-time progress updates.

StreamingRequestHandler works with StreamingRequestMediator that streams results to clients in real-time.

import typing
from cqrs.requests.request_handler import StreamingRequestHandler
from cqrs.events.event import Event

class ProcessFilesCommandHandler(StreamingRequestHandler[ProcessFilesCommand, FileProcessedResult]):
    def __init__(self):
        self._events: list[Event] = []

    @property
    def events(self) -> list[Event]:
        return self._events.copy()

    async def handle(self, request: ProcessFilesCommand) -> typing.AsyncIterator[FileProcessedResult]:
        for file_id in request.file_ids:
            # Process file
            result = FileProcessedResult(file_id=file_id, status="completed", ...)
            # Emit events
            self._events.append(FileProcessedEvent(file_id=file_id, ...))
            yield result
            self._events.clear()

A complete example can be found in the documentation

Event Handlers

Event handlers are designed to process Notification and ECST events that are consumed from the broker. To configure event handling, you need to implement a broker consumer on the side of your application. Below is an example of Kafka event consuming that can be used in the Presentation Layer.

class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    def __init__(self):
        self._events = []

    @property
    def events(self):
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        STORAGE[request.meeting_id].append(request.user_id)
        self._events.append(
            UserJoined(user_id=request.user_id, meeting_id=request.meeting_id),
        )
        print(f"User {request.user_id} joined meeting {request.meeting_id}")


class UserJoinedEventHandler(cqrs.EventHandler[UserJoined]):
    async def handle(self, event: UserJoined) -> None:
        print(f"Handle user {event.user_id} joined meeting {event.meeting_id} event")

A complete example can be found in the documentation

Parallel Event Processing

Both RequestMediator and StreamingRequestMediator support parallel processing of domain events. You can control the number of event handlers that run simultaneously using the max_concurrent_event_handlers parameter.

This feature is especially useful when:

  • Multiple event handlers need to process events independently
  • You want to improve performance by processing events concurrently
  • You need to limit resource consumption by controlling concurrency

Configuration:

from cqrs.requests import bootstrap

mediator = bootstrap.bootstrap_streaming(
    di_container=container,
    commands_mapper=commands_mapper,
    domain_events_mapper=domain_events_mapper,
    message_broker=broker,
    max_concurrent_event_handlers=3,  # Process up to 3 events in parallel
    concurrent_event_handle_enable=True,  # Enable parallel processing
)

[!TIP]

  • Set max_concurrent_event_handlers to limit the number of simultaneously running event handlers
  • Set concurrent_event_handle_enable=False to disable parallel processing and process events sequentially
  • The default value for max_concurrent_event_handlers is 10 for StreamingRequestMediator and 1 for RequestMediator

Producing Notification Events

During the handling of a command, cqrs.NotificationEvent events may be generated and then sent to the broker.

class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    def __init__(self):
        self._events = []

    @property
    def events(self):
        return self._events

    async def handle(self, request: JoinMeetingCommand) -> None:
        print(f"User {request.user_id} joined meeting {request.meeting_id}")
        self._events.append(
            cqrs.NotificationEvent[UserJoinedNotificationPayload](
                event_name="UserJoined",
                topic="user_notification_events",
                payload=UserJoinedNotificationPayload(
                    user_id=request.user_id,
                    meeting_id=request.meeting_id,
                ),
            )
        )
        self._events.append(
            cqrs.NotificationEvent[UserJoinedECSTPayload](
                event_name="UserJoined",
                topic="user_ecst_events",
                payload=UserJoinedECSTPayload(
                    user_id=request.user_id,
                    meeting_id=request.meeting_id,
                ),
            )
        )

A complete example can be found in the documentation

After processing the command/request, if there are any Notification/ECST events, the EventEmitter is invoked to produce the events via the message broker.

[!WARNING] It is important to note that producing events using the events property parameter does not guarantee message delivery to the broker. In the event of broker unavailability or an exception occurring during message formation or sending, the message may be lost. This issue can potentially be addressed by configuring retry attempts for sending messages to the broker, but we recommend using the Transaction Outbox pattern, which is implemented in the current version of the python-cqrs package for this purpose.

Kafka broker

from cqrs.adapters import kafka as kafka_adapter
from cqrs.message_brokers import kafka as kafka_broker


producer = kafka_adapter.kafka_producer_factory(
    dsn="localhost:9092",
    topics=["test.topic1", "test.topic2"],
)
broker = kafka_broker.KafkaMessageBroker(producer)
await broker.send_message(...)

Transactional Outbox

The package implements the Transactional Outbox pattern, which ensures that messages are produced to the broker according to the at-least-once semantics.

def do_some_logic(meeting_room_id: int, session: sql_session.AsyncSession):
    """
    Make changes to the database
    """
    session.add(...)


class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
    def __init__(self, outbox: cqrs.OutboxedEventRepository):
        self.outbox = outbox

    @property
    def events(self):
        return []

    async def handle(self, request: JoinMeetingCommand) -> None:
        print(f"User {request.user_id} joined meeting {request.meeting_id}")
        async with self.outbox as session:
            do_some_logic(request.meeting_id, session) # business logic
            self.outbox.add(
                session,
                cqrs.NotificationEvent[UserJoinedNotificationPayload](
                    event_name="UserJoined",
                    topic="user_notification_events",
                    payload=UserJoinedNotificationPayload(
                        user_id=request.user_id,
                        meeting_id=request.meeting_id,
                    ),
                ),
            )
            self.outbox.add(
                session,
                cqrs.NotificationEvent[UserJoinedECSTPayload](
                    event_name="UserJoined",
                    topic="user_ecst_events",
                    payload=UserJoinedECSTPayload(
                        user_id=request.user_id,
                        meeting_id=request.meeting_id,
                    ),
                ),
            )
            await self.outbox.commit(session)

A complete example can be found in the documentation

[!TIP] You can specify the name of the Outbox table using the environment variable OUTBOX_SQLA_TABLE. By default, it is set to outbox.

[!TIP] If you use the protobuf events you should specify OutboxedEventRepository by protobuf serialize. A complete example can be found in the documentation

Producing Events from Outbox to Kafka

As an implementation of the Transactional Outbox pattern, the SqlAlchemyOutboxedEventRepository is available for use as an access repository to the Outbox storage. It can be utilized in conjunction with the KafkaMessageBroker.

import asyncio
import cqrs
from cqrs.message_brokers import kafka
from cqrs.adapters import kafka as kafka_adapters
from cqrs.compressors import zlib

session_factory = async_sessionmaker(
   create_async_engine(
      f"mysql+asyncmy://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{DATABASE}",
      isolation_level="REPEATABLE READ",
   )
)

broker = kafka.KafkaMessageBroker(
   producer=kafka_adapters.kafka_producer_factory(dsn="localhost:9092"),
)

producer = cqrs.EventProducer(broker, cqrs.SqlAlchemyOutboxedEventRepository(session_factory, zlib.ZlibCompressor()))


async def periodically_task():
   async for messages in producer.event_batch_generator():
      for message in messages:
         await producer.send_message(message)
      await producer.repository.commit()
      await asyncio.sleep(10)


loop = asyncio.get_event_loop()
loop.run_until_complete(periodically_task())

A complete example can be found in the documentation

Transaction log tailing

If the Outbox polling strategy does not suit your needs, I recommend exploring the Transaction Log Tailing pattern. The current version of the python-cqrs package does not support the implementation of this pattern.

[!TIP] However, it can be implemented using Debezium + Kafka Connect, which allows you to produce all newly created events within the Outbox storage directly to the corresponding topic in Kafka (or any other broker).

DI container

Use the following example to set up dependency injection in your command, query and event handlers. This will make dependency management simpler.

The package supports two DI container libraries:

di library

import di
...

def setup_di() -> di.Container:
    """
    Binds implementations to dependencies
    """
    container = di.Container()
    container.bind(
        di.bind_by_type(
            dependent.Dependent(cqrs.SqlAlchemyOutboxedEventRepository, scope="request"),
            cqrs.OutboxedEventRepository
        )
    )
    container.bind(
        di.bind_by_type(
            dependent.Dependent(MeetingAPIImplementaion, scope="request"),
            MeetingAPIProtocol
        )
    )
    return container

A complete example can be found in the documentation

dependency-injector library

The package also supports dependency-injector library. You can use DependencyInjectorCQRSContainer adapter to integrate dependency-injector containers with python-cqrs.

from dependency_injector import containers, providers
from cqrs.container.dependency_injector import DependencyInjectorCQRSContainer

class ApplicationContainer(containers.DeclarativeContainer):
    # Define your providers
    service = providers.Factory(ServiceImplementation)

# Create CQRS container adapter
cqrs_container = DependencyInjectorCQRSContainer(ApplicationContainer())

# Use with bootstrap
mediator = bootstrap.bootstrap(
    di_container=cqrs_container,
    commands_mapper=commands_mapper,
    ...
)

Complete examples can be found in:

Mapping

To bind commands, queries and events with specific handlers, you can use the registries EventMap and RequestMap.

from cqrs import requests, events

from app import commands, command_handlers
from app import queries, query_handlers
from app import events as event_models, event_handlers


def init_commands(mapper: requests.RequestMap) -> None:
    mapper.bind(commands.JoinMeetingCommand, command_handlers.JoinMeetingCommandHandler)

def init_queries(mapper: requests.RequestMap) -> None:
    mapper.bind(queries.ReadMeetingQuery, query_handlers.ReadMeetingQueryHandler)

def init_events(mapper: events.EventMap) -> None:
    mapper.bind(events.NotificationEvent[events_models.NotificationMeetingRoomClosed], event_handlers.MeetingRoomClosedNotificationHandler)
    mapper.bind(events.NotificationEvent[event_models.ECSTMeetingRoomClosed], event_handlers.UpdateMeetingRoomReadModelHandler)

Bootstrap

The python-cqrs package implements a set of bootstrap utilities designed to simplify the initial configuration of an application.

import functools

from cqrs.events import bootstrap as event_bootstrap
from cqrs.requests import bootstrap as request_bootstrap

from app import dependencies, mapping, orm


@functools.lru_cache
def mediator_factory():
    return request_bootstrap.bootstrap(
        di_container=dependencies.setup_di(),
        commands_mapper=mapping.init_commands,
        queries_mapper=mapping.init_queries,
        domain_events_mapper=mapping.init_events,
        on_startup=[orm.init_store_event_mapper],
    )


@functools.lru_cache
def event_mediator_factory():
    return event_bootstrap.bootstrap(
        di_container=dependencies.setup_di(),
        events_mapper=mapping.init_events,
        on_startup=[orm.init_store_event_mapper],
    )

Integration with presentation layers

[!TIP] I recommend reading the useful paper Onion Architecture Used in Software Development. Separating user interaction and use-cases into Application and Presentation layers is a good practice. This can improve the Testability, Maintainability, Scalability of the application. It also provides benefits such as Separation of Concerns.

FastAPI requests handling

If your application uses FastAPI (or any other asynchronous framework for creating APIs). In this case you can use python-cqrs to route requests to the appropriate handlers implementing specific use-cases.

import fastapi
import pydantic

from app import dependecies, commands

router = fastapi.APIRouter(prefix="/meetings")


@router.put("/{meeting_id}/{user_id}", status_code=status.HTTP_200_OK)
async def join_metting(
    meeting_id: pydantic.PositiveInt,
    user_id: typing.Text,
    mediator: cqrs.RequestMediator = fastapi.Depends(dependencies.mediator_factory),
):
    await mediator.send(commands.JoinMeetingCommand(meeting_id=meeting_id, user_id=user_id))
    return {"result": "ok"}

A complete example can be found in the documentation

Kafka events consuming

If you build interaction by events over broker like Kafka, you can to implement an event consumer on your application's side, which will call the appropriate handler for each event. An example of handling events from Kafka is provided below.

import cqrs

import pydantic
import faststream
from faststream import kafka

broker = kafka.KafkaBroker(bootstrap_servers=["localhost:9092"])
app = faststream.FastStream(broker)


class HelloWorldPayload(pydantic.BaseModel):
    hello: str = pydantic.Field(default="Hello")
    world: str = pydantic.Field(default="World")


class HelloWorldECSTEventHandler(cqrs.EventHandler[cqrs.NotificationEvent[HelloWorldPayload]]):
    async def handle(self, event: cqrs.NotificationEvent[HelloWorldPayload]) -> None:
        print(f"{event.payload.hello} {event.payload.world}")  # type: ignore


@broker.subscriber(
    "hello_world",
    group_id="examples",
    auto_commit=False,
    value_deserializer=value_deserializer,
    decoder=decoder,
)
async def hello_world_event_handler(
    body: cqrs.NotificationEvent[HelloWorldPayload] | None,
    msg: kafka.KafkaMessage,
    mediator: cqrs.EventMediator = faststream.Depends(mediator_factory),
):
    if body is not None:
        await mediator.send(body)
    await msg.ack()

A complete example can be found in the documentation

FastAPI SSE Streaming

StreamingRequestMediator is ready and designed for use with Server-Sent Events (SSE) in FastAPI applications. This allows you to stream results to clients in real-time as they are processed.

Example FastAPI endpoint with SSE:

import fastapi
import json
from cqrs.requests import bootstrap

def streaming_mediator_factory() -> cqrs.StreamingRequestMediator:
    return bootstrap.bootstrap_streaming(
        di_container=container,
        commands_mapper=commands_mapper,
        domain_events_mapper=domain_events_mapper,
        message_broker=broker,
        max_concurrent_event_handlers=3,
        concurrent_event_handle_enable=True,
    )

@app.post("/process-files")
async def process_files_stream(
    command: ProcessFilesCommand,
    mediator: cqrs.StreamingRequestMediator = fastapi.Depends(streaming_mediator_factory),
) -> fastapi.responses.StreamingResponse:
    async def generate_sse():
        yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\n\n"

        async for result in mediator.stream(command):
            sse_data = {
                "type": "progress",
                "data": result.model_dump(),
            }
            yield f"data: {json.dumps(sse_data)}\n\n"

        yield f"data: {json.dumps({'type': 'complete'})}\n\n"

    return fastapi.responses.StreamingResponse(
        generate_sse(),
        media_type="text/event-stream",
    )

A complete example can be found in the documentation

Protobuf messaging

The python-cqrs package supports integration with protobuf.
Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

python_cqrs-4.2.0.tar.gz (41.4 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

python_cqrs-4.2.0-py3-none-any.whl (46.0 kB view details)

Uploaded Python 3

File details

Details for the file python_cqrs-4.2.0.tar.gz.

File metadata

  • Download URL: python_cqrs-4.2.0.tar.gz
  • Upload date:
  • Size: 41.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for python_cqrs-4.2.0.tar.gz
Algorithm Hash digest
SHA256 ccc539332927c310c6a8260945095d56ef744eb31434a6bbf74254dde971c242
MD5 e0fbfc93b30a07ff3e01fe7bc7d690bf
BLAKE2b-256 a45944e4f09153c98e732a1080d3d1406ade51958afa811d9c243cb718818dee

See more details on using hashes here.

File details

Details for the file python_cqrs-4.2.0-py3-none-any.whl.

File metadata

  • Download URL: python_cqrs-4.2.0-py3-none-any.whl
  • Upload date:
  • Size: 46.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.12.12

File hashes

Hashes for python_cqrs-4.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f8d6b09a11c535cfc2643742e0d1e7f7f435478f801a61f7daa7c864d6cfb9f6
MD5 645280287848db1a0f0168c2a6805619
BLAKE2b-256 4b5bad7ce6c7d43ddda5a5e72f550627ec1589c353dd1983ef8dc2babdaabdf1

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page