Domain modeling types for the Aett event store
Project description
Æt (Aett) is an Event Store for Python.
Aett Domain provides the base classes Aggregate and Saga, which encapsulate business rules and business processes, respectively.
Usage
The Aggregate class is abstract. A concrete aggregate subclass implements the external interface and the internal behavior, and defines which events are raised in response to input.
from aett.domain.Domain import Aggregate
from dataclasses import dataclass
from aett.eventstore.EventStream import EventStream, Memento, DomainEvent
import datetime


@dataclass(frozen=True, kw_only=True)
class SampleEvent(DomainEvent):
    value: int


class ExampleAggregate(Aggregate[Memento]):
    def __init__(self, event_stream: EventStream, memento: Memento | None = None):
        self.value = 0
        super().__init__(event_stream, memento)

    def apply_memento(self, memento: Memento) -> None:
        if self.id != memento.id:
            raise ValueError("Memento id does not match aggregate id")
        # get_memento stores the value under 'key', so read it back from the same key.
        self.value = memento.payload['key']

    def get_memento(self) -> Memento:
        """
        The memento is a snapshot of the aggregate state. It is used to rehydrate the aggregate.
        It is backed by the Python __getstate__ and __setstate__ methods.
        """
        return Memento(id=self.id, version=self.version, payload={'key': self.value})

    def set_value(self, value: int) -> None:
        # Public command: record the change as an event instead of mutating state directly.
        self.raise_event(
            SampleEvent(value=value, id=self.id, version=self.version,
                        timestamp=datetime.datetime.now(datetime.timezone.utc)))

    def _apply(self, event: SampleEvent) -> None:
        """
        _apply is the conventionally named method that applies an event to the aggregate.
        It is called from the raise_event method using multiple dispatch.
        """
        self.value = event.value
The Saga class is likewise abstract. A concrete saga subclass implements the external interface and the internal behavior, in the same way as an Aggregate subclass.
from aett.domain.Domain import Saga
from aett.eventstore.EventStream import DomainEvent
from dataclasses import dataclass


@dataclass(frozen=True, kw_only=True)
class SampleEvent(DomainEvent):
    value: int


class SampleSaga(Saga):
    def _apply(self, event: SampleEvent) -> None:
        pass