dmr-toolkit

Reusable, composable design pattern helpers for Django REST APIs.

dmr-toolkit gives Django API developers first-class, opinionated implementations of the most common architectural patterns — Repository, Specification, State Machine, Observer/EventBus, Plugin Registry, Lazy Loader, Strategy, Cache Layer, Component, Command, Unit of Work, and Event Sourcing — all wired together and ready to use.


Installation

pip install dmr-toolkit

For OpenTelemetry observability support:

pip install "dmr-toolkit[otel]"

Requirements: Python 3.10+, Django 4.2+


Quick Start

Add dmr_toolkit_patterns to your INSTALLED_APPS to enable Django migrations for audit and event-sourcing models:

INSTALLED_APPS = [
    ...
    "dmr_toolkit_patterns",
]

Run migrations:

python manage.py migrate

Optional configuration in settings.py:

DMR_TOOLKIT = {
    "patterns": {
        "audit": True,               # Enable audit logs for StateMachine + Command
        "cache_backend": "default",  # Django cache alias
        "event_bus": "default",
        "event_store_backend": "django",
        "plugin_group": "dmr_toolkit.plugins",
    }
}

Patterns

Repository

Decouples domain logic from data access, so tests can swap the storage backend without changing the calling code.

from dmr_toolkit.patterns import Repository, DjangoRepository, NotFound

# Abstract base — implement your own
class OrderRepository(Repository[Order, int]):
    def get(self, id: int) -> Order: ...
    def list(self, spec=None) -> list[Order]: ...
    # ... add, update, remove, and async variants

# Concrete Django-backed implementation
class DjangoOrderRepository(DjangoRepository[Order, int]):
    model = Order  # binds to Order.objects manager

repo = DjangoOrderRepository()

try:
    order = repo.get(42)
except NotFound:
    print("Order not found")

orders = repo.list()          # all orders
await repo.aget(42)           # async variant
await repo.alist()            # async list
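
To make the backend-swapping idea concrete, here is a minimal standalone sketch of a dict-backed repository with the same `get`/`list`/`add` surface. It is not dmr-toolkit's code: `DictRepository`, the `Order` dataclass, the local `NotFound`, and `order_total` are hypothetical stand-ins used only for illustration.

```python
from dataclasses import dataclass


class NotFound(Exception):
    """Local stand-in for the toolkit's NotFound exception."""


@dataclass
class Order:
    id: int
    total: float


class DictRepository:
    """Hypothetical in-memory repository keyed by entity id."""

    def __init__(self):
        self._rows = {}

    def add(self, entity):
        self._rows[entity.id] = entity
        return entity

    def get(self, id):
        try:
            return self._rows[id]
        except KeyError:
            raise NotFound(f"Order id={id}") from None

    def list(self):
        return list(self._rows.values())


def order_total(repo, order_id):
    # Domain code depends only on the repository interface, not on the ORM.
    return repo.get(order_id).total


repo = DictRepository()
repo.add(Order(id=42, total=99.99))
print(order_total(repo, 42))
```

Because `order_total` only calls `repo.get`, the same function would work unchanged with an ORM-backed repository that exposes the same method.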

Specification

Composable, testable query predicates. Build complex filters from small units without raw ORM expressions in business logic.

from dmr_toolkit.patterns import Specification
from django.db.models import Q

class ActiveSpec(Specification):
    def is_satisfied_by(self, entity) -> bool:
        return entity.status == "active"

    def to_query(self) -> Q:
        return Q(status="active")

class PremiumSpec(Specification):
    def is_satisfied_by(self, entity) -> bool:
        return entity.tier == "premium"

    def to_query(self) -> Q:
        return Q(tier="premium")

# Compose with operators
active_premium = ActiveSpec() & PremiumSpec()
active_or_premium = ActiveSpec() | PremiumSpec()
not_active = ~ActiveSpec()

# Use with Repository
repo.list(active_premium)

# Serialise / deserialise
data = active_premium.to_dict()
restored = Specification.from_dict(data)
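
The `&`, `|`, and `~` composition above can be pictured with a small standalone sketch built on Python's operator hooks. This is an assumption about the general technique, not the toolkit's implementation: the `Spec` class and the sample data are hypothetical, and the sketch covers only the in-memory `is_satisfied_by` side, not `to_query`.

```python
class Spec:
    """Hypothetical minimal specification wrapping a predicate function."""

    def __init__(self, predicate):
        self.predicate = predicate

    def is_satisfied_by(self, entity):
        return self.predicate(entity)

    def __and__(self, other):
        return Spec(lambda e: self.is_satisfied_by(e) and other.is_satisfied_by(e))

    def __or__(self, other):
        return Spec(lambda e: self.is_satisfied_by(e) or other.is_satisfied_by(e))

    def __invert__(self):
        return Spec(lambda e: not self.is_satisfied_by(e))


active = Spec(lambda e: e["status"] == "active")
premium = Spec(lambda e: e["tier"] == "premium")

customers = [
    {"name": "a", "status": "active", "tier": "premium"},
    {"name": "b", "status": "active", "tier": "basic"},
    {"name": "c", "status": "closed", "tier": "premium"},
]

active_premium = [c["name"] for c in customers if (active & premium).is_satisfied_by(c)]
not_active = [c["name"] for c in customers if (~active).is_satisfied_by(c)]
print(active_premium, not_active)
```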

State Machine

Declarative entity lifecycle management. Transitions are validated, audited, and impossible to bypass accidentally.

from dmr_toolkit.patterns import StateMachine, Transition

class OrderMachine(StateMachine):
    class Meta:
        states = ["pending", "paid", "shipped", "cancelled"]
        state_field = "status"
        audit = True  # writes StateTransitionAudit records
        transitions = [
            Transition("pay",    source="pending",  target="paid"),
            Transition("ship",   source="paid",     target="shipped"),
            Transition("cancel", source="pending",  target="cancelled",
                       guards=[lambda entity, user: entity.can_cancel]),
        ]

machine = OrderMachine()
machine.trigger(order, "pay", user=request.user)   # updates order.status → "paid"
machine.trigger(order, "ship", user=request.user)  # → "shipped"

# Raises InvalidTransition if source state is wrong
# Raises GuardFailed if a guard returns False

print(machine.diagram())  # Mermaid stateDiagram-v2 string
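
As a rough illustration of how source-state checks and guards interact, the following standalone sketch uses a plain transition table. It is not the toolkit's code: the `TRANSITIONS` dict, the `trigger` function, and the two local exception classes are hypothetical stand-ins that mirror the names used above.

```python
class InvalidTransition(Exception):
    """Local stand-in: the entity is not in the transition's source state."""


class GuardFailed(Exception):
    """Local stand-in: a guard callable returned False."""


# name -> (source, target, guards); mirrors the OrderMachine transitions above.
TRANSITIONS = {
    "pay": ("pending", "paid", []),
    "ship": ("paid", "shipped", []),
    "cancel": ("pending", "cancelled", [lambda entity, user: entity["can_cancel"]]),
}


def trigger(entity, name, user=None):
    source, target, guards = TRANSITIONS[name]
    if entity["status"] != source:
        raise InvalidTransition(f"{name}: expected {source!r}, got {entity['status']!r}")
    for guard in guards:
        if not guard(entity, user):
            raise GuardFailed(name)
    entity["status"] = target  # only reached when every check passed
    return entity


order = {"status": "pending", "can_cancel": False}
trigger(order, "pay")
print(order["status"])
```

The state is written only after the source check and all guards have passed, so a failed trigger leaves the entity unchanged.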

Observer / EventBus

Typed publish/subscribe. Decouple domain events from side-effect handlers without Django signal boilerplate.

from dmr_toolkit.patterns import EventBus

bus = EventBus()

@bus.on("order.paid")
def send_receipt(payload):
    email_service.send(payload["email"], "Your receipt")

@bus.on("order.*")          # wildcard — matches any order.* event
async def log_order_event(payload):
    await audit_log.write(payload)

bus.emit("order.paid", {"email": "user@example.com", "amount": 99.99})
await bus.aemit("order.shipped", {"tracking": "ABC123"})

Fault isolation: if one handler raises, the rest still run. All errors are collected into a single EventDispatchError.
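
The fault-isolation behaviour can be sketched in a few lines of standalone Python. This is an illustration under assumptions, not the toolkit's implementation: `MiniBus` and `DispatchError` are hypothetical names, and wildcard matching here uses `fnmatch.fnmatchcase`, whose `*` also matches across dots, which may differ from how dmr-toolkit matches patterns.

```python
from fnmatch import fnmatchcase


class DispatchError(Exception):
    """Local stand-in that aggregates handler errors."""

    def __init__(self, errors):
        super().__init__(f"{len(errors)} handler(s) failed")
        self.errors = errors


class MiniBus:
    def __init__(self):
        self._handlers = []  # list of (pattern, handler)

    def on(self, pattern):
        def decorator(handler):
            self._handlers.append((pattern, handler))
            return handler
        return decorator

    def emit(self, event, payload):
        errors = []
        for pattern, handler in self._handlers:
            if not fnmatchcase(event, pattern):
                continue
            try:
                handler(payload)
            except Exception as exc:  # keep going so later handlers still run
                errors.append(exc)
        if errors:
            raise DispatchError(errors)


bus = MiniBus()
seen = []


@bus.on("order.paid")
def broken(payload):
    raise RuntimeError("mail server down")


@bus.on("order.*")
def record(payload):
    seen.append(payload["id"])


try:
    bus.emit("order.paid", {"id": 42})
except DispatchError as err:
    print(len(err.errors), seen)
```

Even though `broken` raises first, `record` still runs, and the caller receives one aggregated error after all matching handlers have been tried.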


Plugin Registry

Runtime plugin discovery and management. Third-party packages extend behaviour without modifying core code.

from dmr_toolkit.patterns import Plugin, PluginRegistry

# Implement the Plugin interface (subclass the imported base rather than redefining it)
class MyPlugin(Plugin):
    def activate(self): ...
    def deactivate(self): ...

# Third-party plugin (declared in pyproject.toml entry points)
# [project.entry-points."dmr_toolkit.plugins"]
# my_plugin = "my_package:MyPlugin"

registry = PluginRegistry()
registry.load_entry_points("dmr_toolkit.plugins")  # auto-discovers installed plugins

# Manual management
registry.register("my_plugin", MyPlugin)
plugin_class = registry.get("my_plugin")
registry.unregister("my_plugin")

for meta in registry.list_plugins():
    print(meta.name, meta.version, meta.status)

Lazy Loader

Descriptor for deferred, cached resource initialisation. Expensive resources are initialised only on first access.

from dmr_toolkit.patterns import lazy

class MyService:
    # Factory called once on first access; result cached on the instance
    client = lazy(lambda: ExpensiveAPIClient())

    # With TTL — re-initialises after 60 seconds
    config = lazy(lambda: load_remote_config(), ttl=60.0)

svc = MyService()
svc.client   # factory called here
svc.client   # returns cached value — factory NOT called again

# Clear cache on demand
MyService.client.reset(svc)
svc.client   # factory called again

Thread-safe: the factory is called at most once per instance under concurrent access.
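
One way to get the "at most once per instance" guarantee is a descriptor that guards first access with a lock. The sketch below is a standalone illustration, not the toolkit's `lazy`: `lazy_attr` is a hypothetical name, TTL support is omitted, and a single lock is shared across all instances for simplicity.

```python
import threading


class lazy_attr:
    """Hypothetical descriptor: runs factory once per instance, then caches."""

    def __init__(self, factory):
        self._factory = factory
        self._lock = threading.Lock()

    def __set_name__(self, owner, name):
        self._slot = f"_lazy_{name}"

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # class access returns the descriptor itself
        with self._lock:  # serialise access so the factory runs only once
            if self._slot not in instance.__dict__:
                instance.__dict__[self._slot] = self._factory()
            return instance.__dict__[self._slot]

    def reset(self, instance):
        with self._lock:
            instance.__dict__.pop(self._slot, None)


calls = []


class Service:
    # The factory records each invocation, then returns a fresh object.
    client = lazy_attr(lambda: calls.append("init") or object())


svc = Service()
threads = [threading.Thread(target=lambda: svc.client) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(calls))
```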


Strategy

Interchangeable algorithms selected at runtime. No conditional branching in business logic.

from dmr_toolkit.patterns import Strategy
from abc import abstractmethod

class PricingStrategy(Strategy):
    @abstractmethod
    def execute(self, order) -> float: ...

@PricingStrategy.register("standard")
class StandardPricing(PricingStrategy):
    def execute(self, order) -> float:
        return order.subtotal

@PricingStrategy.register("premium")
class PremiumPricing(PricingStrategy):
    def execute(self, order) -> float:
        return order.subtotal * 0.9  # 10% discount

# Resolve at runtime
strategy_cls = PricingStrategy.resolve("premium")
price = strategy_cls().execute(order)

# Raises StrategyNotFound with available names if not registered
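
The register/resolve mechanics can be approximated with a class-level dict and a decorator factory. This standalone sketch is an assumption about the general technique rather than the toolkit's code: `PricingSketch`, its subclasses, and the local `StrategyNotFound` are hypothetical.

```python
class StrategyNotFound(KeyError):
    """Local stand-in for the toolkit exception."""


class PricingSketch:
    _registry = {}

    @classmethod
    def register(cls, name):
        def decorator(strategy_cls):
            cls._registry[name] = strategy_cls
            return strategy_cls
        return decorator

    @classmethod
    def resolve(cls, name):
        try:
            return cls._registry[name]
        except KeyError:
            available = ", ".join(sorted(cls._registry))
            raise StrategyNotFound(
                f"{name!r} is not registered; available: {available}"
            ) from None


@PricingSketch.register("standard")
class Standard(PricingSketch):
    def execute(self, subtotal):
        return subtotal


@PricingSketch.register("premium")
class Premium(PricingSketch):
    def execute(self, subtotal):
        return subtotal * 0.9  # 10% discount


price = PricingSketch.resolve("premium")().execute(100.0)
print(round(price, 2))
```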

Cache Layer

Transparent, key-managed caching with minimal boilerplate.

from dmr_toolkit.patterns import cached, cache_invalidate

@cached(ttl=300, key_prefix="orders:", vary_on=["user_id"])
def get_user_orders(user_id: int) -> list:
    return list(Order.objects.filter(user_id=user_id))  # evaluate the lazy QuerySet so a list is returned and cached

# Per-user cache isolation — different user_id → different cache entry
orders = get_user_orders(user_id=42)

@cache_invalidate("orders:*")
def create_order(user_id: int, data: dict) -> Order:
    return Order.objects.create(user_id=user_id, **data)

# Calling create_order() invalidates all "orders:*" cache keys
create_order(user_id=42, data={"item": "Widget"})

Component

Self-contained, reusable API response fragments that fetch their own data and compose into larger responses.

from dmr_toolkit.patterns import Component, DMRComponentMixin

class UserCard(Component):
    def render(self, context: dict) -> dict:
        user = User.objects.get(id=context["user_id"])
        return {"user": {"id": user.id, "name": user.name}}

class OrderSummary(Component):
    def render(self, context: dict) -> dict:
        orders = Order.objects.filter(user_id=context["user_id"])
        return {"order_count": orders.count()}

class ProfilePage(Component):
    children = [UserCard(), OrderSummary()]
    cache_ttl = 60  # cache rendered output for 60 seconds

    def render(self, context: dict) -> dict:
        return {"page": "profile"}

# Async rendering — children resolved concurrently
page = ProfilePage()
result = await page.arender({"user_id": 42})
# result = {"page": "profile", "user": {...}, "order_count": 5}

# In a DMR_Controller
class ProfileController(DMRComponentMixin):
    def get(self, request):
        return self.component_response(ProfilePage())
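
The concurrent child resolution described above can be pictured with `asyncio.gather`. The sketch is standalone and hypothetical: `user_card`, `order_summary`, and `render_page` are plain coroutines rather than `Component` subclasses, and merging child fragments with `dict.update` is an assumption inferred from the example result shown above.

```python
import asyncio


async def user_card(context):
    await asyncio.sleep(0.01)  # stands in for a database or HTTP call
    return {"user": {"id": context["user_id"]}}


async def order_summary(context):
    await asyncio.sleep(0.01)
    return {"order_count": 5}


async def render_page(context):
    result = {"page": "profile"}
    # gather() runs the child coroutines concurrently and preserves their order.
    fragments = await asyncio.gather(user_card(context), order_summary(context))
    for fragment in fragments:
        result.update(fragment)
    return result


rendered = asyncio.run(render_page({"user_id": 42}))
print(rendered)
```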

Command

Encapsulated, auditable, optionally reversible business operations.

from dmr_toolkit.patterns import Command

class CreateOrderCommand(Command[Order]):
    def __init__(self, user_id: int, items: list):
        self.user_id = user_id
        self.items = items
        self._created_order = None

    def execute(self) -> Order:
        self._created_order = Order.objects.create(
            user_id=self.user_id, items=self.items
        )
        return self._created_order

    def undo(self) -> None:
        if self._created_order:
            self._created_order.delete()

cmd = CreateOrderCommand(user_id=42, items=["Widget"])
order = cmd.execute()   # creates order, writes CommandAuditLog if audit=True
cmd.undo()              # deletes the order

# Async variants
order = await cmd.aexecute()
await cmd.aundo()
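
One reason to keep `undo` next to `execute` is that a batch of commands can be compensated when a later step fails. The standalone sketch below illustrates that idea; `AppendItem`, `Failing`, and `run_all` are hypothetical and do not use the toolkit's `Command` base class.

```python
class AppendItem:
    """Hypothetical command: appends an item to a list and can undo it."""

    def __init__(self, target, item):
        self.target = target
        self.item = item

    def execute(self):
        self.target.append(self.item)

    def undo(self):
        self.target.remove(self.item)


class Failing:
    """Hypothetical command whose execute always fails."""

    def execute(self):
        raise RuntimeError("boom")

    def undo(self):
        pass


def run_all(commands):
    """Execute commands in order; on failure undo the completed ones in reverse."""
    done = []
    try:
        for command in commands:
            command.execute()
            done.append(command)
    except Exception:
        for command in reversed(done):
            command.undo()
        raise


cart = []
run_all([AppendItem(cart, "widget"), AppendItem(cart, "gadget")])
try:
    run_all([AppendItem(cart, "gizmo"), Failing()])
except RuntimeError:
    pass
print(cart)
```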

Unit of Work

Groups multiple Repository operations into a single atomic transaction with automatic rollback on failure.

from dmr_toolkit.patterns import UnitOfWork

uow = UnitOfWork()

with uow:
    order = order_repo.add(Order(user_id=42, total=99.99))
    inventory_repo.update(item)
    bus.emit("order.created", {"id": order.id})
    # All operations committed atomically on exit
    # EventBus events dispatched AFTER successful commit

# On exception — all operations rolled back, events suppressed
try:
    with uow:
        order_repo.add(bad_order)
        raise ValueError("something went wrong")
except ValueError:
    pass  # transaction rolled back, no events dispatched

# Async context manager
async with uow:
    await order_repo.aadd(order)

# Nested — inner block uses savepoints
with uow:
    order_repo.add(order)
    try:
        with uow:                    # inner savepoint
            inventory_repo.update(item)
            raise ValueError()       # rolls back inner savepoint only
    except ValueError:
        pass                         # caught here, so the outer block continues
    # outer transaction still intact
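
The "events dispatched after successful commit" rule can be sketched with a context manager that buffers events and releases them only on a clean exit. This standalone example is not the toolkit's `UnitOfWork`: `MiniUnitOfWork` is hypothetical and involves no database. In Django itself, `transaction.atomic()` and `transaction.on_commit()` are the primitives that provide this behaviour against a real transaction.

```python
class MiniUnitOfWork:
    """Hypothetical sketch: buffers events and releases them only on success."""

    def __init__(self, dispatch):
        self._dispatch = dispatch
        self._pending = []

    def emit(self, name, payload):
        self._pending.append((name, payload))

    def __enter__(self):
        self._pending = []
        return self

    def __exit__(self, exc_type, exc, tb):
        pending, self._pending = self._pending, []
        if exc_type is None:
            for name, payload in pending:
                self._dispatch(name, payload)
        return False  # never swallow the exception


delivered = []
uow = MiniUnitOfWork(lambda name, payload: delivered.append((name, payload["id"])))

with uow:
    uow.emit("order.created", {"id": 1})

try:
    with uow:
        uow.emit("order.created", {"id": 2})
        raise ValueError("something went wrong")
except ValueError:
    pass

print(delivered)
```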

Event Store

Append-only domain event log for Event Sourcing and audit trails.

from dmr_toolkit.patterns import EventStore

store = EventStore()

# Append events
event = store.append(
    aggregate_id="order-42",
    aggregate_type="Order",
    event_type="OrderCreated",
    payload={"user_id": 42, "total": 99.99},
)

store.append("order-42", "Order", "OrderPaid", {"method": "card"})
store.append("order-42", "Order", "OrderShipped", {"tracking": "ABC"})

# Load and replay
events = store.load("order-42")
for e in events:
    print(f"seq={e.sequence} type={e.event_type}")

# Optimistic concurrency
store.append("order-42", "Order", "OrderCancelled", {}, expected_seq=3)
# Raises ConcurrencyConflict if current head != 3

# Projections
store.project("OrderPaid", lambda e: update_revenue_report(e.payload))

# Snapshots — skip replaying old events
store.save_snapshot("order-42", state={"status": "shipped"}, seq=3)
snapshot = store.load_snapshot("order-42")
events_after = store.load("order-42", after_seq=snapshot.sequence)

# Human-readable replay log
print(store.replay_log("order-42"))
# seq=1 type=OrderCreated
# seq=2 type=OrderPaid
# seq=3 type=OrderShipped
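
The interplay of sequence numbers, `expected_seq`, and snapshots can be shown with a small in-memory model. This is a standalone sketch under assumptions, not the toolkit's `EventStore`: `MiniEventStore`, `replay`, and `STATUS_BY_EVENT` are hypothetical, and sequences are assumed to start at 1 as in the replay log above.

```python
class ConcurrencyConflict(Exception):
    """Local stand-in for the toolkit exception."""


class MiniEventStore:
    def __init__(self):
        self._streams = {}  # aggregate_id -> list of (sequence, event_type, payload)

    def append(self, aggregate_id, event_type, payload, expected_seq=None):
        stream = self._streams.setdefault(aggregate_id, [])
        head = len(stream)  # sequences start at 1, so head equals the last sequence
        if expected_seq is not None and expected_seq != head:
            raise ConcurrencyConflict(f"expected {expected_seq}, head is {head}")
        stream.append((head + 1, event_type, payload))
        return head + 1

    def load(self, aggregate_id, after_seq=0):
        return [e for e in self._streams.get(aggregate_id, []) if e[0] > after_seq]


STATUS_BY_EVENT = {"OrderCreated": "pending", "OrderPaid": "paid", "OrderShipped": "shipped"}


def replay(events, state=None):
    """Fold events into a state dict; later events overwrite the status."""
    state = dict(state or {})
    for _seq, event_type, _payload in events:
        state["status"] = STATUS_BY_EVENT[event_type]
    return state


store = MiniEventStore()
store.append("order-42", "OrderCreated", {"total": 99.99})
store.append("order-42", "OrderPaid", {"method": "card"})
store.append("order-42", "OrderShipped", {"tracking": "ABC"}, expected_seq=2)

full = replay(store.load("order-42"))
snapshot = {"status": "paid"}  # state captured at sequence 2
from_snapshot = replay(store.load("order-42", after_seq=2), snapshot)
print(full, from_snapshot)
```

Replaying from the snapshot touches only the events after sequence 2 yet arrives at the same state as a full replay, which is the point of snapshotting.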

Exception Hierarchy

All exceptions carry structured context for easy debugging:

PatternError (base)
├── NotFound                  — Repository.get on missing entity
├── InvalidTransition         — StateMachine invalid source state
├── GuardFailed               — StateMachine guard returned False
├── DuplicatePlugin           — PluginRegistry duplicate name
├── StrategyNotFound          — Strategy.resolve unknown name
├── ConcurrencyConflict       — EventStore expected_seq mismatch
└── EventDispatchError        — Observer aggregate of handler errors

from dmr_toolkit.patterns.exceptions import NotFound, InvalidTransition

try:
    order = repo.get(999)
except NotFound as e:
    print(repr(e))  # NotFound(model='Order', id=999)

Test Helpers

dmr_toolkit.testing ships in-memory doubles for every pattern so your unit tests never touch the database:

from dmr_toolkit.testing import (
    InMemoryRepository,
    FakeEventStore,
    MockObserver,
    StrategyTestCase,
)

# InMemoryRepository — full CRUD, raises NotFound correctly
repo = InMemoryRepository()
entity = repo.add(MyEntity(id=1, name="test"))
assert repo.get(1) == entity

# FakeEventStore — full append/load/snapshot, no DB
store = FakeEventStore()
store.append("agg-1", "Order", "Created", {})
events = store.load("agg-1")

# MockObserver — records emitted events for assertion
mock_bus = MockObserver()
mock_bus.emit("order.paid", {"id": 42})
mock_bus.assert_emitted("order.paid", {"id": 42})

# StrategyTestCase — isolates strategy registry per test
class TestMyStrategy(StrategyTestCase):
    strategy_class = MyStrategy

    def test_resolve(self):
        # Registry is fresh for each test — no cross-test pollution
        MyStrategy.register("test")(ConcreteStrategy)
        assert MyStrategy.resolve("test") is ConcreteStrategy

OpenTelemetry Observability

Install the optional dependency:

pip install "dmr-toolkit[otel]"

When your application configures an OTel tracer provider, dmr-toolkit automatically emits spans for:

  • Repository queries (repository.get, repository.list, etc.)
  • Command executions (command.execute, command.undo)
  • State Machine transitions (state_machine.trigger)
  • Cache hits and misses (cache.hit, cache.miss, cache.invalidate)

No configuration needed — spans are no-ops when OTel is not installed or not configured.


Settings Reference

DMR_TOOLKIT = {
    "patterns": {
        # Enable audit logging for StateMachine transitions and Command executions
        # Writes StateTransitionAudit and CommandAuditLog records
        "audit": False,

        # Django cache alias used by @cached and Component.cache_ttl
        "cache_backend": "default",

        # EventBus backend: "default" (in-process) or "celery"
        "event_bus": "default",

        # EventStore backend: "django" (ORM) or "redis" (requires redis extra)
        "event_store_backend": "django",

        # Entry-point group for PluginRegistry.load_entry_points()
        "plugin_group": "dmr_toolkit.plugins",
    }
}

Django Migrations

The following models require migrations when using Django ORM persistence:

Model                 Purpose
StateTransitionAudit  Audit log for StateMachine transitions
CommandAuditLog       Audit log for Command executions
DomainEvent           Immutable event records for Event Sourcing
Snapshot              Aggregate snapshots for Event Sourcing

python manage.py migrate dmr_toolkit_patterns

License

MIT
