Reusable, composable design pattern helpers for Django REST APIs
Project description
dmr-toolkit
Reusable, composable design pattern helpers for Django REST APIs.
dmr-toolkit gives Django API developers first-class, opinionated implementations of the most common architectural patterns — Repository, Specification, State Machine, Observer/EventBus, Plugin Registry, Lazy Loader, Strategy, Cache Layer, Component, Command, Unit of Work, and Event Sourcing — all wired together and ready to use.
Installation
pip install dmr-toolkit
For OpenTelemetry observability support:
pip install "dmr-toolkit[otel]"
Requirements: Python 3.10+, Django 4.2+
Quick Start
Add dmr_toolkit_patterns to your INSTALLED_APPS to enable Django migrations for audit and event-sourcing models:
INSTALLED_APPS = [
...
"dmr_toolkit_patterns",
]
Run migrations:
python manage.py migrate
Optional configuration in settings.py:
DMR_TOOLKIT = {
"patterns": {
"audit": True, # Enable audit logs for StateMachine + Command
"cache_backend": "default", # Django cache alias
"event_bus": "default",
"event_store_backend": "django",
"plugin_group": "dmr_toolkit.plugins",
}
}
Patterns
Repository
Decouples domain logic from data access. Swap storage backends in tests with zero friction.
from dmr_toolkit.patterns import Repository, DjangoRepository, NotFound
# Abstract base — implement your own
class OrderRepository(Repository[Order, int]):
def get(self, id: int) -> Order: ...
def list(self, spec=None) -> list[Order]: ...
# ... add, update, remove, and async variants
# Concrete Django-backed implementation
class DjangoOrderRepository(DjangoRepository[Order, int]):
model = Order # binds to Order.objects manager
repo = DjangoOrderRepository()
try:
order = repo.get(42)
except NotFound:
print("Order not found")
orders = repo.list() # all orders
await repo.aget(42) # async variant
await repo.alist() # async list
Specification
Composable, testable query predicates. Build complex filters from small units without raw ORM expressions in business logic.
from dmr_toolkit.patterns import Specification
from django.db.models import Q
class ActiveSpec(Specification):
def is_satisfied_by(self, entity) -> bool:
return entity.status == "active"
def to_query(self) -> Q:
return Q(status="active")
class PremiumSpec(Specification):
def is_satisfied_by(self, entity) -> bool:
return entity.tier == "premium"
def to_query(self) -> Q:
return Q(tier="premium")
# Compose with operators
active_premium = ActiveSpec() & PremiumSpec()
active_or_premium = ActiveSpec() | PremiumSpec()
not_active = ~ActiveSpec()
# Use with Repository
repo.list(active_premium)
# Serialise / deserialise
data = active_premium.to_dict()
restored = Specification.from_dict(data)
State Machine
Declarative entity lifecycle management. Transitions are validated, audited, and impossible to bypass accidentally.
from dmr_toolkit.patterns import StateMachine, Transition
class OrderMachine(StateMachine):
class Meta:
states = ["pending", "paid", "shipped", "cancelled"]
state_field = "status"
audit = True # writes StateTransitionAudit records
transitions = [
Transition("pay", source="pending", target="paid"),
Transition("ship", source="paid", target="shipped"),
Transition("cancel", source="pending", target="cancelled",
guards=[lambda entity, user: entity.can_cancel]),
]
machine = OrderMachine()
machine.trigger(order, "pay", user=request.user) # updates order.status → "paid"
machine.trigger(order, "ship", user=request.user) # → "shipped"
# Raises InvalidTransition if source state is wrong
# Raises GuardFailed if a guard returns False
print(machine.diagram()) # Mermaid stateDiagram-v2 string
Observer / EventBus
Typed publish/subscribe. Decouple domain events from side-effect handlers without Django signal boilerplate.
from dmr_toolkit.patterns import EventBus
bus = EventBus()
@bus.on("order.paid")
def send_receipt(payload):
email_service.send(payload["email"], "Your receipt")
@bus.on("order.*") # wildcard — matches any order.* event
async def log_order_event(payload):
await audit_log.write(payload)
bus.emit("order.paid", {"email": "user@example.com", "amount": 99.99})
await bus.aemit("order.shipped", {"tracking": "ABC123"})
Fault isolation: if one handler raises, the rest still run. All errors are collected into a single EventDispatchError.
Plugin Registry
Runtime plugin discovery and management. Third-party packages extend behaviour without modifying core code.
from dmr_toolkit.patterns import Plugin, PluginRegistry
# Define the interface
class Plugin:
def activate(self): ...
def deactivate(self): ...
# Third-party plugin (declared in pyproject.toml entry points)
# [project.entry-points."dmr_toolkit.plugins"]
# my_plugin = "my_package:MyPlugin"
registry = PluginRegistry()
registry.load_entry_points("dmr_toolkit.plugins") # auto-discovers installed plugins
# Manual management
registry.register("my_plugin", MyPlugin)
plugin_class = registry.get("my_plugin")
registry.unregister("my_plugin")
for meta in registry.list_plugins():
print(meta.name, meta.version, meta.status)
Lazy Loader
Descriptor for deferred, cached resource initialisation. Expensive resources are initialised only on first access.
from dmr_toolkit.patterns import lazy
class MyService:
# Factory called once on first access; result cached on the instance
client = lazy(lambda: ExpensiveAPIClient())
# With TTL — re-initialises after 60 seconds
config = lazy(lambda: load_remote_config(), ttl=60.0)
svc = MyService()
svc.client # factory called here
svc.client # returns cached value — factory NOT called again
# Clear cache on demand
MyService.client.reset(svc)
svc.client # factory called again
Thread-safe: the factory is called at most once per instance under concurrent access.
Strategy
Interchangeable algorithms selected at runtime. No conditional branching in business logic.
from dmr_toolkit.patterns import Strategy
from abc import abstractmethod
class PricingStrategy(Strategy):
@abstractmethod
def execute(self, order) -> float: ...
@PricingStrategy.register("standard")
class StandardPricing(PricingStrategy):
def execute(self, order) -> float:
return order.subtotal
@PricingStrategy.register("premium")
class PremiumPricing(PricingStrategy):
def execute(self, order) -> float:
return order.subtotal * 0.9 # 10% discount
# Resolve at runtime
strategy_cls = PricingStrategy.resolve("premium")
price = strategy_cls().execute(order)
# Raises StrategyNotFound with available names if not registered
Cache Layer
Transparent, key-managed caching with minimal boilerplate.
from dmr_toolkit.patterns import cached, cache_invalidate
@cached(ttl=300, key_prefix="orders:", vary_on=["user_id"])
def get_user_orders(user_id: int) -> list:
return Order.objects.filter(user_id=user_id)
# Per-user cache isolation — different user_id → different cache entry
orders = get_user_orders(user_id=42)
@cache_invalidate("orders:*")
def create_order(user_id: int, data: dict) -> Order:
return Order.objects.create(user_id=user_id, **data)
# Calling create_order() invalidates all "orders:*" cache keys
create_order(user_id=42, data={"item": "Widget"})
Component
Self-contained, reusable API response fragments that fetch their own data and compose into larger responses.
from dmr_toolkit.patterns import Component, DMRComponentMixin
class UserCard(Component):
def render(self, context: dict) -> dict:
user = User.objects.get(id=context["user_id"])
return {"user": {"id": user.id, "name": user.name}}
class OrderSummary(Component):
def render(self, context: dict) -> dict:
orders = Order.objects.filter(user_id=context["user_id"])
return {"order_count": orders.count()}
class ProfilePage(Component):
children = [UserCard(), OrderSummary()]
cache_ttl = 60 # cache rendered output for 60 seconds
def render(self, context: dict) -> dict:
return {"page": "profile"}
# Async rendering — children resolved concurrently
page = ProfilePage()
result = await page.arender({"user_id": 42})
# result = {"page": "profile", "user": {...}, "order_count": 5}
# In a DMR_Controller
class ProfileController(DMRComponentMixin):
def get(self, request):
return self.component_response(ProfilePage())
Command
Encapsulated, auditable, optionally reversible business operations.
from dmr_toolkit.patterns import Command
class CreateOrderCommand(Command[Order]):
def __init__(self, user_id: int, items: list):
self.user_id = user_id
self.items = items
self._created_order = None
def execute(self) -> Order:
self._created_order = Order.objects.create(
user_id=self.user_id, items=self.items
)
return self._created_order
def undo(self) -> None:
if self._created_order:
self._created_order.delete()
cmd = CreateOrderCommand(user_id=42, items=["Widget"])
order = cmd.execute() # creates order, writes CommandAuditLog if audit=True
cmd.undo() # deletes the order
# Async variants
order = await cmd.aexecute()
await cmd.aundo()
Unit of Work
Groups multiple Repository operations into a single atomic transaction with automatic rollback on failure.
from dmr_toolkit.patterns import UnitOfWork
uow = UnitOfWork()
with uow:
order = order_repo.add(Order(user_id=42, total=99.99))
inventory_repo.update(item)
bus.emit("order.created", {"id": order.id})
# All operations committed atomically on exit
# EventBus events dispatched AFTER successful commit
# On exception — all operations rolled back, events suppressed
try:
with uow:
order_repo.add(bad_order)
raise ValueError("something went wrong")
except ValueError:
pass # transaction rolled back, no events dispatched
# Async context manager
async with uow:
await order_repo.aadd(order)
# Nested — inner block uses savepoints
with uow:
order_repo.add(order)
with uow: # inner savepoint
inventory_repo.update(item)
raise ValueError() # rolls back inner savepoint only
# outer transaction still intact
Event Store
Append-only domain event log for Event Sourcing and audit trails.
from dmr_toolkit.patterns import EventStore
store = EventStore()
# Append events
event = store.append(
aggregate_id="order-42",
aggregate_type="Order",
event_type="OrderCreated",
payload={"user_id": 42, "total": 99.99},
)
store.append("order-42", "Order", "OrderPaid", {"method": "card"})
store.append("order-42", "Order", "OrderShipped", {"tracking": "ABC"})
# Load and replay
events = store.load("order-42")
for e in events:
print(f"seq={e.sequence} type={e.event_type}")
# Optimistic concurrency
store.append("order-42", "Order", "OrderCancelled", {}, expected_seq=3)
# Raises ConcurrencyConflict if current head != 3
# Projections
store.project("OrderPaid", lambda e: update_revenue_report(e.payload))
# Snapshots — skip replaying old events
store.save_snapshot("order-42", state={"status": "shipped"}, seq=3)
snapshot = store.load_snapshot("order-42")
events_after = store.load("order-42", after_seq=snapshot.sequence)
# Human-readable replay log
print(store.replay_log("order-42"))
# seq=1 type=OrderCreated
# seq=2 type=OrderPaid
# seq=3 type=OrderShipped
Exception Hierarchy
All exceptions carry structured context for easy debugging:
PatternError (base)
├── NotFound — Repository.get on missing entity
├── InvalidTransition — StateMachine invalid source state
├── GuardFailed — StateMachine guard returned False
├── DuplicatePlugin — PluginRegistry duplicate name
├── StrategyNotFound — Strategy.resolve unknown name
├── ConcurrencyConflict — EventStore expected_seq mismatch
└── EventDispatchError — Observer aggregate of handler errors
from dmr_toolkit.patterns.exceptions import NotFound, InvalidTransition
try:
order = repo.get(999)
except NotFound as e:
print(repr(e)) # NotFound(model='Order', id=999)
Test Helpers
dmr_toolkit.testing ships in-memory doubles for every pattern so your unit tests never touch the database:
from dmr_toolkit.testing import (
InMemoryRepository,
FakeEventStore,
MockObserver,
StrategyTestCase,
)
# InMemoryRepository — full CRUD, raises NotFound correctly
repo = InMemoryRepository()
entity = repo.add(MyEntity(id=1, name="test"))
assert repo.get(1) == entity
# FakeEventStore — full append/load/snapshot, no DB
store = FakeEventStore()
store.append("agg-1", "Order", "Created", {})
events = store.load("agg-1")
# MockObserver — records emitted events for assertion
mock_bus = MockObserver()
mock_bus.emit("order.paid", {"id": 42})
mock_bus.assert_emitted("order.paid", {"id": 42})
# StrategyTestCase — isolates strategy registry per test
class TestMyStrategy(StrategyTestCase):
strategy_class = MyStrategy
def test_resolve(self):
# Registry is fresh for each test — no cross-test pollution
MyStrategy.register("test")(ConcreteStrategy)
assert MyStrategy.resolve("test") is ConcreteStrategy
OpenTelemetry Observability
Install the optional dependency:
pip install "dmr-toolkit[otel]"
When your application configures an OTel tracer provider, dmr-toolkit automatically emits spans for:
- Repository queries (
repository.get,repository.list, etc.) - Command executions (
command.execute,command.undo) - State Machine transitions (
state_machine.trigger) - Cache hits and misses (
cache.hit,cache.miss,cache.invalidate)
No configuration needed — spans are no-ops when OTel is not installed or not configured.
Settings Reference
DMR_TOOLKIT = {
"patterns": {
# Enable audit logging for StateMachine transitions and Command executions
# Writes StateTransitionAudit and CommandAuditLog records
"audit": False,
# Django cache alias used by @cached and Component.cache_ttl
"cache_backend": "default",
# EventBus backend: "default" (in-process) or "celery"
"event_bus": "default",
# EventStore backend: "django" (ORM) or "redis" (requires redis extra)
"event_store_backend": "django",
# Entry-point group for PluginRegistry.load_entry_points()
"plugin_group": "dmr_toolkit.plugins",
}
}
Django Migrations
The following models require migrations when using Django ORM persistence:
| Model | Purpose |
|---|---|
StateTransitionAudit |
Audit log for StateMachine transitions |
CommandAuditLog |
Audit log for Command executions |
DomainEvent |
Immutable event records for Event Sourcing |
Snapshot |
Aggregate snapshots for Event Sourcing |
python manage.py migrate dmr_toolkit_patterns
License
MIT
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file dmr_toolkit-0.1.0.tar.gz.
File metadata
- Download URL: dmr_toolkit-0.1.0.tar.gz
- Upload date:
- Size: 36.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
d654d33556113834c2f892fd34d4bfb62fcb6ff1f888cf7ad71202df494d3a3b
|
|
| MD5 |
b1a72772f9051f3260eeaebc63047a5c
|
|
| BLAKE2b-256 |
f7708dd9ccc29f4fd3faf6bd50c56b89e33670bbba641a994ebad362e010d723
|
File details
Details for the file dmr_toolkit-0.1.0-py3-none-any.whl.
File metadata
- Download URL: dmr_toolkit-0.1.0-py3-none-any.whl
- Upload date:
- Size: 46.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: uv/0.10.6 {"installer":{"name":"uv","version":"0.10.6","subcommand":["publish"]},"python":null,"implementation":{"name":null,"version":null},"distro":null,"system":{"name":null,"release":null},"cpu":null,"openssl_version":null,"setuptools_version":null,"rustc_version":null,"ci":null}
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
48c3b609a007f55dbeaf6facbdeb32708fec5298229f18a4fc0daaae8e450a97
|
|
| MD5 |
bacfeaeca4aa29c5ab42d7d8dc07b7d3
|
|
| BLAKE2b-256 |
2f2c946761c28a7f4c91578a3cb74e53c09f7663632af4b2c8458db04a04ee11
|