Skip to main content

Qx CQRS / Mediator: commands, queries, events, pipeline behaviors

Project description

qx-cqrs

CQRS and Mediator implementation for the Qx framework. Dispatches commands, queries, domain events, integration events, and notifications through a composable pipeline of behaviors.

What lives here

  • qx.cqrs.Mediator — central dispatcher. Resolves handlers via the DI container; supports three registration modes: decorator scan, explicit register(), and type-based lookup.
  • qx.cqrs.Command / Query — base Pydantic models for all messages. Command[TResult] mutates state; Query[TResult] is read-only.
  • qx.cqrs.command_handler / query_handler / event_handler — decorators that bind a handler class to its message type.
  • qx.cqrs.Behavior — single-method interface (handle(request, next)) for pipeline stages.
  • Built-in behaviors: LoggingBehavior, TracingBehavior, MetricsBehavior, IdempotencyBehavior, ExceptionTranslationBehavior, TransactionBehavior, ValidationBehavior, RetryBehavior, AuthorizationBehavior.
  • qx.cqrs.compose — assemble a BehaviorChain from an ordered list of behaviors.

Usage

from qx.cqrs import Command, Mediator, command_handler
from qx.core import Result

class CreateUserCommand(Command[UserDto]):
    email: str
    name: str

@command_handler(CreateUserCommand)
class CreateUserHandler:
    def __init__(self, repo: UserRepository, uow: UnitOfWork) -> None: ...

    async def handle(self, cmd: CreateUserCommand) -> Result[UserDto]:
        result = User.register(cmd.email, cmd.name)
        ...
        return Result.success(dto)

# Bootstrap
mediator = Mediator(container)
mediator.register_decorated("myapp.application")

# Dispatch (in a route or test)
result = await mediator.send(CreateUserCommand(email="a@b.com", name="Ada"), scope=scope)

Pipeline composition

from qx.cqrs import compose, LoggingBehavior, TracingBehavior, ExceptionTranslationBehavior

pipeline = compose(
    LoggingBehavior(),
    TracingBehavior(tracer),
    ExceptionTranslationBehavior(),
)
mediator = Mediator(container, pipeline=pipeline)

Design rules

  • Handlers return Result[T] — they never raise. ExceptionTranslationBehavior converts unexpected exceptions to InfrastructureError before they escape the pipeline.
  • Commands and queries are plain Pydantic models; validation happens automatically before handle() is called.
  • The mediator resolves handlers lazily per send() call so the DI scope for that request is used correctly.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

qx_cqrs-0.1.0.tar.gz (18.1 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

qx_cqrs-0.1.0-py3-none-any.whl (17.8 kB view details)

Uploaded Python 3

File details

Details for the file qx_cqrs-0.1.0.tar.gz.

File metadata

  • Download URL: qx_cqrs-0.1.0.tar.gz
  • Upload date:
  • Size: 18.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for qx_cqrs-0.1.0.tar.gz
Algorithm Hash digest
SHA256 e6f2f2ed967a10298a4fea51831292a46ce257a8bb55f263033a1a0a1ad2f4c8
MD5 396808325392651837e312e5f8b89c74
BLAKE2b-256 780a4209a1448c2d563be5c633c4d3037cfd7d0642de028a806d0059f27ce858

See more details on using hashes here.

File details

Details for the file qx_cqrs-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: qx_cqrs-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 17.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.2.0 CPython/3.14.0

File hashes

Hashes for qx_cqrs-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 ab1e81e396f6b581d8b9526d5b32f17c07cfdc8afdc6ee550c692c381162d36f
MD5 8fe50ff5d7fda63cf6d24147261fb8f8
BLAKE2b-256 9dae6e0ca9152058e75a3da1836127b70301427a1fe07c4a5ad75118e46fb954

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page