Python client library for Angzarr gRPC services
Project description
⚠️ Notice: This repository was recently extracted from the angzarr monorepo and has not yet been validated as a standalone project. Expect rough edges. See the Angzarr documentation for more information.
title: Python SDK sidebar_label: Python
angzarr-client
Python client library for Angzarr CQRS/ES framework.
:::tip Unified Documentation For cross-language API reference with side-by-side comparisons, see the SDK Documentation. :::
Installation
pip install angzarr-client
Client Usage
Contracts
</code></pre>
<blockquote>
<p>Source: <a href="../features/aggregate_client.feature"><code>aggregate_client.feature</code></a></p>
</blockquote>
<pre lang="gherkin"><code>
Source:
query_client.feature
from angzarr_client import DomainClient
# Connect to a domain's aggregate coordinator
client = DomainClient("localhost:1310")
# Build and execute a command
response = client.command("order", root_id) \
.with_command("CreateOrder", create_order_msg) \
.execute()
# Query events
events = client.query("order", root_id) \
.get_event_book()
Aggregate Implementation
Two approaches for implementing aggregates:
1. Rich Domain Model (Recommended)
Use Aggregate ABC with @handles decorator for OO-style aggregates:
from angzarr_client import Aggregate, handles
from angzarr_client.errors import CommandRejectedError
@dataclass
class _PlayerState:
player_id: str = ""
bankroll: int = 0
class Player(Aggregate[_PlayerState]):
domain = "player" # Required
def _create_empty_state(self) -> _PlayerState:
return _PlayerState()
def _apply_event(self, state: _PlayerState, event_any) -> None:
if event_any.type_url.endswith("PlayerRegistered"):
event = PlayerRegistered()
event_any.Unpack(event)
state.player_id = event.player_id
@handles(RegisterPlayer)
def register(self, cmd: RegisterPlayer) -> PlayerRegistered:
if self.exists:
raise CommandRejectedError("Player already exists")
return PlayerRegistered(player_id=cmd.player_id, ...)
@handles(DepositFunds)
def deposit(self, cmd: DepositFunds) -> FundsDeposited:
...
@property
def exists(self) -> bool:
return bool(self._get_state().player_id)
Features:
@handles(CommandType)validates type hints at decoration time- Dispatch table built automatically at class definition
domainattribute required, enforced at class creation- Abstract methods
_create_empty_state()and_apply_event()enforced
gRPC Server:
from angzarr_client import run_aggregate_server
run_aggregate_server(Player, "50303")
2. Function-Based (CommandRouter)
Use CommandRouter with standalone handler functions:
from angzarr_client import CommandRouter
from angzarr_client.proto.angzarr import types_pb2 as types
def rebuild_state(event_book: types.EventBook) -> PlayerState:
state = PlayerState()
if event_book:
for page in event_book.pages:
apply_event(state, page.event)
return state
def handle_register(cb, cmd_any, state, seq) -> types.EventBook:
cmd = RegisterPlayer()
cmd_any.Unpack(cmd)
if state.exists:
raise CommandRejectedError("Player already exists")
event = PlayerRegistered(player_id=cmd.player_id, ...)
return pack_event(event, seq)
router = CommandRouter("player", rebuild_state) \
.on("RegisterPlayer", handle_register) \
.on("DepositFunds", handle_deposit)
gRPC Server:
from angzarr_client import run_aggregate_server
run_aggregate_server(router, "50303")
Comparison
| Aspect | Rich Domain Model | Function-Based |
|---|---|---|
| Pattern | OO, encapsulated | Procedural, explicit |
| State | Internal, lazy rebuild | External, passed in |
| Commands | Method per command | Function per command |
| Validation | @handles decorator |
Manual type unpacking |
| Topology | Auto from domain + @handles |
Auto from CommandRouter.on() |
Testing Aggregates
Both patterns support unit testing without infrastructure:
# Rich Domain Model
def test_register_creates_player():
player = Player() # Empty event book
event = player.register(RegisterPlayer(player_id="alice"))
assert event.player_id == "alice"
assert player.exists
# With prior state (rehydration)
def test_deposit_increases_bankroll():
event_book = build_event_book([PlayerRegistered(...)])
player = Player(event_book)
event = player.deposit(DepositFunds(amount=100))
assert player.bankroll == 100
Error Handling
Contract
</code></pre>
<blockquote>
<p>Source: <a href="../features/error_handling.feature"><code>error_handling.feature</code></a></p>
</blockquote>
<pre lang="python"><code>from angzarr_client.errors import GRPCError, ConnectionError, ClientError
try:
response = client.aggregate.handle(command)
except GRPCError as e:
if e.is_not_found():
# Aggregate doesn't exist
pass
elif e.is_precondition_failed():
# Sequence mismatch (optimistic locking failure)
pass
elif e.is_invalid_argument():
# Invalid command arguments
pass
except ConnectionError as e:
# Network/transport error
pass
Speculative Execution
Test commands without persisting to the event store:
from angzarr_client import SpeculativeClient
from angzarr_client.proto.angzarr import SpeculateAggregateRequest
client = SpeculativeClient.connect("localhost:1310")
# Build speculative request with temporal state
request = SpeculateAggregateRequest(
command=command_book,
events=prior_events
)
# Execute without persistence
response = client.aggregate(request)
# Inspect projected events
for page in response.events.pages:
print(f"Would produce: {page.event.type_url}")
client.close()
Client Types
| Client | Purpose |
|---|---|
QueryClient |
Query events from aggregates |
AggregateClient |
Send commands to aggregates |
SpeculativeClient |
Dry-run commands, test projectors/sagas |
DomainClient |
Combined query + aggregate for a domain |
Client |
Full client with all capabilities |
Error Types
| Error | Description |
|---|---|
ClientError |
Base class for all errors |
CommandRejectedError |
Business logic rejection |
GRPCError |
gRPC transport failure (has introspection methods) |
ConnectionError |
Connection failure |
TransportError |
Transport-level failure |
InvalidArgumentError |
Invalid input |
InvalidTimestampError |
Timestamp parse failure |
Saga/PM Implementation
Sagas
Stateless event translators (events → commands):
from angzarr_client import SagaRouter, SagaDomainHandler
class OrderFulfillmentHandler(SagaDomainHandler):
def event_types(self) -> list[str]:
return ["OrderCreated"]
def handle(self, source: EventBook, event: Any, destinations: Destinations):
# Translate event to command
cmd = CreateReservation(order_id=event.order_id)
seq = destinations.sequence_for("inventory")
return SagaHandlerResponse(commands=[stamp(cmd, seq)])
router = SagaRouter("saga-order-inventory", "order", handler)
Process Managers
Stateful multi-domain orchestrators:
from angzarr_client import ProcessManagerRouter
class BuyInPmHandler(ProcessManagerDomainHandler):
def event_types(self) -> list[str]:
return ["BuyInRequested", "PlayerSeated", "SeatingRejected"]
def prepare(self, trigger, state, event) -> list[Cover]:
# Declare destinations needed
return [Cover(domain="table", root=event.table_root)]
def handle(self, trigger, state, event, destinations: Destinations):
# Emit commands or facts
seq = destinations.sequence_for("table")
return ProcessManagerResponse(commands=[cmd], facts=[])
router = ProcessManagerRouter("pmg-buy-in", "pmg-buy-in", rebuild_state)
.domain("player", BuyInPmHandler())
.domain("table", BuyInPmHandler())
Saga/PM Design Philosophy
Sagas and PMs are coordinators, NOT decision makers.
| Output | When to Use |
|---|---|
| Commands (preferred) | Normal flow - aggregate validates and decides |
| Facts | Inject external data aggregate can't derive |
Key Principles:
- Don't rebuild destination state - Use
Destinationsfor sequences only - Let aggregates decide - Business logic in aggregates, not coordinators
- Prefer commands with sync mode - Use
SyncMode.SIMPLEfor immediate feedback - Use facts sparingly - Only for external data injection
License
BSD-3-Clause
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file angzarr_client-0.3.1.tar.gz.
File metadata
- Download URL: angzarr_client-0.3.1.tar.gz
- Upload date:
- Size: 186.0 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
7925756d6decb407857d9f526a482f294e01d98409cafe3b266d6736663af488
|
|
| MD5 |
79a1a81ae1947e0e4a78654df05c675c
|
|
| BLAKE2b-256 |
526a04ba1d43d0f64c3d13c6b768be317a045bc80dfc3b386345b32a05b97990
|
File details
Details for the file angzarr_client-0.3.1-py3-none-any.whl.
File metadata
- Download URL: angzarr_client-0.3.1-py3-none-any.whl
- Upload date:
- Size: 155.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/6.1.0 CPython/3.13.7
File hashes
| Algorithm | Hash digest | |
|---|---|---|
| SHA256 |
c63db718ed44052671e12b56ea25172e8d1637bbf95a638aba677507e7062d70
|
|
| MD5 |
64fb30674cd4557e0f1b37e8ef94e9a9
|
|
| BLAKE2b-256 |
0ae33cbb82ddac01aff0499db164d78ee5333b791329db2d31b4d55a9a2c630b
|