
A small Python library for defining domain-driven design aggregates and domain events.


Aggregate Root

Aggregate Root is a Python library that provides a base implementation for aggregate roots and domain events in a Domain-Driven Design (DDD) context. This library simplifies the creation and management of aggregate roots and domain events, enabling developers to focus on business logic and domain rules.

Installation

You can install the aggregate-root library from PyPI using pip:

pip install aggregate-root

Usage

AggregateRoot and DomainEvent

AggregateRoot is the base class for aggregate roots, and DomainEvent is the base class for domain events. Together, they enable the creation and management of Domain-Driven Design (DDD) aggregates.
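The example that follows declares each event as a frozen dataclass. As a small, library-independent illustration of what that buys you, the sketch below uses only the standard library; `TitleChanged` is a hypothetical event that does not inherit from the library's `DomainEvent`, so it shows plain dataclass behaviour rather than anything specific to aggregate-root.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class TitleChanged:
    """Hypothetical event, declared without the library's DomainEvent base."""

    title: str


event = TitleChanged("Dune")

# Frozen dataclasses compare by field values, so two events carrying the
# same payload are equal.
assert event == TitleChanged("Dune")

# They also reject attribute assignment after construction, which keeps a
# recorded event from being altered later.
try:
    event.title = "Other"
except FrozenInstanceError:
    print("events are immutable")
```

Immutability matters because an event records something that has already happened; consumers can then store or forward it without defensive copying.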

Example

import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

from aggregate_root import AggregateRoot, DomainEvent

# Domain Events (should be idempotent, past-tense verbs)
# It's not necessary to include the aggregate ID in the event payload
# as an aggregate's ID is not permitted to change over the course of
# its life. When persisting state or publishing the events, the aggregate
# ID should be obtained from the aggregate instance itself.


# In this example, a design choice was made to exclude initial state from
# the BookCreated event, in favour of producing multiple events in the
# Book aggregate's factory method.

@dataclass(frozen=True)
class BookCreated(DomainEvent):
    pass


@dataclass(frozen=True)
class BookTitleUpdated(DomainEvent):
    title: str


@dataclass(frozen=True)
class BookAuthorUpdated(DomainEvent):
    author: str


@dataclass(frozen=True)
class BookYearPublishedUpdated(DomainEvent):
    year_published: int


@dataclass(frozen=True)
class CopyAdded(DomainEvent):
    barcode: str
    date: datetime


@dataclass(frozen=True)
class CopyRemoved(DomainEvent):
    barcode: str
    date: datetime


@dataclass(frozen=True)
class CopyBorrowed(DomainEvent):
    barcode: str
    date: datetime
    borrower_id: str
    due_at: datetime


@dataclass(frozen=True)
class CopyReturned(DomainEvent):
    barcode: str
    date: datetime


# The Book aggregate is defined by a Book AggregateRoot that holds references
# to zero or more BookCopy instances. The root entity describes the common
# publication information, and each BookCopy holds information pertaining to
# a specific physical copy. This makes sense within the inventory bounded
# context of a library's inventory management subdomain.

class Book(AggregateRoot):
    def __init__(self, isbn: str):
        super().__init__(id=isbn)
        self._copies: list[BookCopy] = []
        self._title: str = ""
        self._author: str = ""
        self._year_published: int = 0

    @property
    def title(self):
        return self._title

    @property
    def author(self):
        return self._author

    @property
    def year_published(self):
        return self._year_published

    @property
    def copies(self):
        return self._copies

    @classmethod
    def create(cls, isbn: str, title: str, author: str, year_published: int) -> "Book":
        """
        Factory method to create a new book.
        """
        # fullmatch with an explicit [0-9] class rejects trailing newlines and non-ASCII digits
        if not re.fullmatch(r"[0-9]{13}", isbn):
            raise ValueError("ISBN must be 13 digits")
        if not title:
            raise ValueError("Title cannot be empty")
        if not author:
            raise ValueError("Author cannot be empty")
        if year_published < 0 or year_published > datetime.now().year:
            raise ValueError("Invalid year published")

        instance = cls(isbn)

        # Produce multiple events on create.
        # By not including initial state in the BookCreated event, we allow for each piece of
        # data in the aggregate to be represented by a single event type only. This may be
        # helpful to external event consumers (like read model updaters, or other microservices).
        instance.produce_events(
            BookCreated(),
            BookTitleUpdated(title),
            BookAuthorUpdated(author),
            BookYearPublishedUpdated(year_published),
        )
        return instance

    # Methods decorated with @AggregateRoot.produces_events should not directly affect state.
    # This is where domain invariants and business rules should be validated before returning
    # one or more DomainEvent subclass instances.

    @AggregateRoot.produces_events
    def add_copy(self, barcode: str) -> CopyAdded:
        """
        Add a copy of the book
        """
        if not barcode:
            raise ValueError("Barcode cannot be empty")
        if any(copy.barcode == barcode for copy in self._copies):
            raise ValueError("Copy already exists")
        return CopyAdded(barcode, datetime.now())

    @AggregateRoot.produces_events
    def update_title(self, title: str) -> BookTitleUpdated:
        """
        Update the title of the book
        """
        if not title:
            raise ValueError("Title cannot be empty")
        return BookTitleUpdated(title)

    @AggregateRoot.produces_events
    def update_author(self, author: str) -> BookAuthorUpdated:
        """
        Update the author of the book
        """
        if not author:
            raise ValueError("Author cannot be empty")
        return BookAuthorUpdated(author)

    @AggregateRoot.produces_events
    def update_year_published(self, year_published: int) -> BookYearPublishedUpdated:
        """
        Update the year published of the book
        """
        if year_published < 0 or year_published > datetime.now().year:
            raise ValueError("Invalid year published")
        return BookYearPublishedUpdated(year_published)

    @AggregateRoot.produces_events
    def remove_copy(self, barcode: str) -> CopyRemoved:
        """
        Remove a copy of the book
        """
        for copy in self._copies:
            if copy.barcode == barcode:
                return CopyRemoved(barcode, datetime.now())
        raise ValueError("Copy not found")

    @AggregateRoot.produces_events
    def borrow_copy(self, barcode: str, borrower_id: str) -> CopyBorrowed:
        """
        Borrow a copy of the book
        """
        for copy in self._copies:
            if copy.barcode == barcode:
                if copy.borrowed:
                    raise ValueError("Copy is already borrowed")
                now = datetime.now()
                due_date = now + timedelta(days=14)
                return CopyBorrowed(barcode, now, borrower_id, due_date)
        raise ValueError("Copy not found")

    @AggregateRoot.produces_events
    def return_copy(self, barcode: str) -> CopyReturned:
        """
        Return a copy of the book
        """
        for copy in self._copies:
            if copy.barcode == barcode:
                if not copy.borrowed:
                    raise ValueError("Copy is not borrowed")
                return CopyReturned(barcode, datetime.now())
        raise ValueError("Copy not found")


    # Methods decorated with @AggregateRoot.handles_events are responsible for updating aggregate
    # state. They must not raise any exceptions. This allows for backwards compatibility in
    # event-sourced systems that rehydrate aggregates from an event store using 
    # AggregateRoot.apply_event() as opposed to setting state from a simple CRUD data model.
    # I.e., if business logic changes over time, we do not want historical events failing to be
    # applied when loading an aggregate that was previously updated using old business logic.

    @AggregateRoot.handles_events(BookTitleUpdated)
    def _handle_book_title_updated(self, event: BookTitleUpdated):
        self._title = event.title

    @AggregateRoot.handles_events(BookAuthorUpdated)
    def _handle_book_author_updated(self, event: BookAuthorUpdated):
        self._author = event.author

    @AggregateRoot.handles_events(BookYearPublishedUpdated)
    def _handle_book_year_published_updated(self, event: BookYearPublishedUpdated):
        self._year_published = event.year_published

    @AggregateRoot.handles_events(CopyAdded)
    def _handle_copy_added(self, event: CopyAdded):
        self._copies.append(BookCopy(event.barcode))

    @AggregateRoot.handles_events(CopyRemoved)
    def _handle_copy_removed(self, event: CopyRemoved):
        self._copies = [copy for copy in self._copies if copy.barcode != event.barcode]

    @AggregateRoot.handles_events(CopyBorrowed)
    def _handle_copy_borrowed(self, event: CopyBorrowed):
        for copy in self._copies:
            if copy.barcode == event.barcode:
                copy.check_out(event.borrower_id)

    @AggregateRoot.handles_events(CopyReturned)
    def _handle_copy_returned(self, event: CopyReturned):
        for copy in self._copies:
            if copy.barcode == event.barcode:
                copy.check_in()


# BookCopy is part of the Book aggregate, but is not a subclass of AggregateRoot.
# This is because in DDD, all interactions with a domain aggregate must come through the root.
# I.e., the Book AggregateRoot is responsible for managing its BookCopy instances.
# A domain aggregate is a transactional boundary. Whenever an interface wishes to execute a
# command that impacts a book copy, it must load the entire Book aggregate and subsequently
# save the entire aggregate in a single transaction.
class BookCopy:
    def __init__(self, barcode: str):
        self._barcode: str = barcode
        self.borrower_id: Optional[str] = None

    def check_out(self, borrower_id: str):
        self.borrower_id = borrower_id

    def check_in(self):
        self.borrower_id = None

    @property
    def borrowed(self):
        return self.borrower_id is not None

    @property
    def barcode(self):
        return self._barcode


# Example usage:
# In real-world applications, interactions with the domain would come through application
# services and persistence would be abstracted via repositories.

# Use the factory method to create a new book record
book = Book.create("9781617294549", "Microservices Patterns", "Chris Richardson", 2019)

# Add some physical copies
book.add_copy("BARCODE_1")
book.add_copy("BARCODE_2")

# Borrow a copy
book.borrow_copy("BARCODE_1", "USER_1")

# Return a copy
book.return_copy("BARCODE_1")

# Remove a copy from the shelves
book.remove_copy("BARCODE_1")

# In a repository, obtain the pending events for saving
events = book.pending_events
# ... save the events or use the events to update the data model
book.clear_events()

# Or, to automatically clear the pending events after saving:
with book.flush() as events:
    # ... save the events or use the events to update the data model
    pass
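
The comments in the example mention two ideas it does not show: persistence sitting behind a repository, and event-sourced systems rehydrating an aggregate by replaying stored events through handlers that never raise. The following is a minimal sketch of both, using only the standard library. It deliberately does not use the aggregate-root API: `Counter`, `Incremented` and `InMemoryEventStore` are hypothetical stand-ins, and the way `Counter` tracks its pending events is an assumption made for illustration, not a description of how `AggregateRoot` works internally.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Incremented:
    """Hypothetical event: the counter was increased by `amount`."""

    amount: int


class Counter:
    """Stand-in aggregate; NOT the aggregate-root base class."""

    def __init__(self, id: str):
        self.id = id
        self.value = 0
        self.pending_events: list = []

    def increment(self, amount: int) -> None:
        # Validate the business rule first, then record and apply the event.
        if amount <= 0:
            raise ValueError("Amount must be positive")
        event = Incremented(amount)
        self.pending_events.append(event)
        self.apply(event)

    def apply(self, event: Incremented) -> None:
        # State change only, with no validation, so that historical events
        # still replay if the business rules change later.
        self.value += event.amount


class InMemoryEventStore:
    """Hypothetical repository: an append-only event list per aggregate ID."""

    def __init__(self):
        self._streams = defaultdict(list)

    def save(self, aggregate: Counter) -> None:
        # The aggregate ID comes from the aggregate itself, not the events.
        self._streams[aggregate.id].extend(aggregate.pending_events)
        aggregate.pending_events.clear()

    def load(self, id: str) -> Counter:
        # Rehydrate by replaying history through the state-only handler.
        aggregate = Counter(id)
        for event in self._streams[id]:
            aggregate.apply(event)
        return aggregate


store = InMemoryEventStore()
counter = Counter("counter-1")
counter.increment(2)
counter.increment(3)
store.save(counter)

reloaded = store.load("counter-1")
print(reloaded.value)  # 5
```

Note that `load` calls `apply` directly rather than `increment`, so replaying history neither re-validates old events nor produces new pending events.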
