Qx CQRS / Mediator: commands, queries, events, pipeline behaviors
Project description
qx-cqrs
CQRS and Mediator implementation for the Qx framework. Dispatches commands, queries, domain events, integration events, and notifications through a composable pipeline of behaviors.
What lives here
qx.cqrs.Mediator— central dispatcher. Resolves handlers via the DI container; supports three registration modes: decorator scan, explicitregister(), and type-based lookup.qx.cqrs.Command/Query— base Pydantic models for all messages.Command[TResult]mutates state;Query[TResult]is read-only.qx.cqrs.command_handler/query_handler/event_handler— decorators that bind a handler class to its message type.qx.cqrs.Behavior— single-method interface (handle(request, next)) for pipeline stages.- Built-in behaviors:
LoggingBehavior,TracingBehavior,MetricsBehavior,IdempotencyBehavior,ExceptionTranslationBehavior,TransactionBehavior,ValidationBehavior,RetryBehavior,AuthorizationBehavior. qx.cqrs.compose— assemble aBehaviorChainfrom an ordered list of behaviors.
Usage
from qx.cqrs import Command, Mediator, command_handler
from qx.core import Result
class CreateUserCommand(Command[UserDto]):
email: str
name: str
@command_handler(CreateUserCommand)
class CreateUserHandler:
def __init__(self, repo: UserRepository, uow: UnitOfWork) -> None: ...
async def handle(self, cmd: CreateUserCommand) -> Result[UserDto]:
result = User.register(cmd.email, cmd.name)
...
return Result.success(dto)
# Bootstrap
mediator = Mediator(container)
mediator.register_decorated("myapp.application")
# Dispatch (in a route or test)
result = await mediator.send(CreateUserCommand(email="a@b.com", name="Ada"), scope=scope)
Pipeline composition
from qx.cqrs import compose, LoggingBehavior, TracingBehavior, ExceptionTranslationBehavior
pipeline = compose(
LoggingBehavior(),
TracingBehavior(tracer),
ExceptionTranslationBehavior(),
)
mediator = Mediator(container, pipeline=pipeline)
Design rules
- Handlers return
Result[T]— they never raise.ExceptionTranslationBehaviorconverts unexpected exceptions toInfrastructureErrorbefore they escape the pipeline. - Commands and queries are plain Pydantic models; validation happens automatically before
handle()is called. - The mediator resolves handlers lazily per
send()call so the DI scope for that request is used correctly.
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
qx_cqrs-1.0.0.tar.gz
(19.3 kB
view details)
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
qx_cqrs-1.0.0-py3-none-any.whl
(18.2 kB
view details)
File details
Details for the file qx_cqrs-1.0.0.tar.gz.
File metadata
- Download URL: qx_cqrs-1.0.0.tar.gz
- Upload date:
- Size: 19.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
58bd69800d32148bb4b4d89a46fdf27bf7d4e5e5fbf3541b52605e4998979a83
|
|
| MD5 |
646411fa397cbcddaf2a12fc4b603624
|
|
| BLAKE2b-256 |
9fe7662248676c6234583ab41120da258fc5a6f26fc5b2b184772036e0d76e8e
|
File details
Details for the file qx_cqrs-1.0.0-py3-none-any.whl.
File metadata
- Download URL: qx_cqrs-1.0.0-py3-none-any.whl
- Upload date:
- Size: 18.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
13f714ad7b064d8c8ef71a2e8a161e93209c2a3bc857a728d90f24f1ea538775
|
|
| MD5 |
804004946cbbd53bb44c26c2330b27b9
|
|
| BLAKE2b-256 |
b36d54381750b2db6b9696445a36c578d14577195e36cba26fbdbdac626861f2
|