
Python client library for Angzarr gRPC services


⚠️ Notice: This repository was recently extracted from the angzarr monorepo and has not yet been validated as a standalone project. Expect rough edges. See the Angzarr documentation for more information.



angzarr-client

Python client library for the Angzarr CQRS/ES framework.

Tip: For a cross-language API reference with side-by-side comparisons, see the SDK Documentation.

Installation

pip install angzarr-client

Client Usage

Contracts

Source: aggregate_client.feature

Source: query_client.feature

from angzarr_client import DomainClient

# Connect to a domain's aggregate coordinator
client = DomainClient("localhost:1310")

# Build and execute a command
response = client.command("order", root_id) \
    .with_command("CreateOrder", create_order_msg) \
    .execute()

# Query events
events = client.query("order", root_id) \
    .get_event_book()

Aggregate Implementation

Two approaches for implementing aggregates:

1. Rich Domain Model (Recommended)

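Use the Aggregate ABC with the @handles decorator for OO-style aggregates:

from dataclasses import dataclass

from angzarr_client import Aggregate, handles
from angzarr_client.errors import CommandRejectedError

# RegisterPlayer, PlayerRegistered, DepositFunds and FundsDeposited are
# your own generated protobuf messages; import them from your proto package.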

@dataclass
class _PlayerState:
    player_id: str = ""
    bankroll: int = 0

class Player(Aggregate[_PlayerState]):
    domain = "player"  # Required

    def _create_empty_state(self) -> _PlayerState:
        return _PlayerState()

    def _apply_event(self, state: _PlayerState, event_any) -> None:
        if event_any.type_url.endswith("PlayerRegistered"):
            event = PlayerRegistered()
            event_any.Unpack(event)
            state.player_id = event.player_id

    @handles(RegisterPlayer)
    def register(self, cmd: RegisterPlayer) -> PlayerRegistered:
        if self.exists:
            raise CommandRejectedError("Player already exists")
        return PlayerRegistered(player_id=cmd.player_id, ...)

    @handles(DepositFunds)
    def deposit(self, cmd: DepositFunds) -> FundsDeposited:
        ...

    @property
    def exists(self) -> bool:
        return bool(self._get_state().player_id)

Features:

  • @handles(CommandType) validates type hints at decoration time
  • Dispatch table built automatically at class definition
  • domain attribute required, enforced at class creation
  • Abstract methods _create_empty_state() and _apply_event() enforced
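To make the first three features concrete, here is a minimal standard-library sketch of how decoration-time validation and a class-definition-time dispatch table can work in general. It is an illustration only, not the angzarr_client implementation: `SketchAggregate`, the `_handles` tag, the `dispatch` method and the two message dataclasses are all hypothetical names, and the sketch assumes the command parameter is called `cmd`.

```python
from dataclasses import dataclass
from typing import Callable, get_type_hints


def handles(command_type: type) -> Callable:
    """Tag a method as the handler for command_type, checking its annotation."""
    def decorator(func: Callable) -> Callable:
        hints = get_type_hints(func)
        # Validation happens here, when the class body is executed.
        if hints.get("cmd") is not command_type:
            raise TypeError(
                f"{func.__name__}: 'cmd' must be annotated as {command_type.__name__}"
            )
        func._handles = command_type  # read later by __init_subclass__
        return func
    return decorator


class SketchAggregate:
    """Hypothetical base class; not the angzarr_client Aggregate."""
    domain: str = ""
    _dispatch: dict = {}

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if not cls.domain:
            raise TypeError(f"{cls.__name__} must set a 'domain' attribute")
        # Collect tagged methods into a per-class dispatch table.
        cls._dispatch = {
            member._handles: name
            for name, member in vars(cls).items()
            if hasattr(member, "_handles")
        }

    def dispatch(self, cmd):
        """Route a command instance to the method registered for its type."""
        return getattr(self, self._dispatch[type(cmd)])(cmd)


@dataclass
class RegisterPlayer:  # stand-in for a protobuf command
    player_id: str


@dataclass
class PlayerRegistered:  # stand-in for a protobuf event
    player_id: str


class Player(SketchAggregate):
    domain = "player"

    @handles(RegisterPlayer)
    def register(self, cmd: RegisterPlayer) -> PlayerRegistered:
        return PlayerRegistered(player_id=cmd.player_id)


event = Player().dispatch(RegisterPlayer(player_id="alice"))
print(event)
```

Because both checks run while the class is being defined, a mismatched annotation or a missing `domain` fails at import time rather than when the first command arrives.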

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(Player, "50303")

2. Function-Based (CommandRouter)

Use CommandRouter with standalone handler functions:

from angzarr_client import CommandRouter
from angzarr_client.errors import CommandRejectedError
from angzarr_client.proto.angzarr import types_pb2 as types

def rebuild_state(event_book: types.EventBook) -> PlayerState:
    state = PlayerState()
    if event_book:
        for page in event_book.pages:
            apply_event(state, page.event)
    return state

def handle_register(cb, cmd_any, state, seq) -> types.EventBook:
    cmd = RegisterPlayer()
    cmd_any.Unpack(cmd)
    if state.exists:
        raise CommandRejectedError("Player already exists")
    event = PlayerRegistered(player_id=cmd.player_id, ...)
    return pack_event(event, seq)

router = CommandRouter("player", rebuild_state) \
    .on("RegisterPlayer", handle_register) \
    .on("DepositFunds", handle_deposit)

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(router, "50303")
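The `rebuild_state` function above relies on an `apply_event` helper that is not shown. The following standard-library sketch illustrates the underlying idea, folding an event history into state, using plain dataclasses instead of protobuf messages. The event classes and the shape of `PlayerState` are assumptions made for illustration; the real helper would unpack `Any` payloads as in the Rich Domain Model example.

```python
from dataclasses import dataclass


@dataclass
class PlayerRegistered:  # hypothetical event
    player_id: str


@dataclass
class FundsDeposited:  # hypothetical event
    amount: int


@dataclass
class PlayerState:
    player_id: str = ""
    bankroll: int = 0

    @property
    def exists(self) -> bool:
        return bool(self.player_id)


def apply_event(state: PlayerState, event) -> None:
    """Mutate state in place according to a single event."""
    if isinstance(event, PlayerRegistered):
        state.player_id = event.player_id
    elif isinstance(event, FundsDeposited):
        state.bankroll += event.amount


def rebuild_state(events) -> PlayerState:
    """Fold the history, oldest first, into a fresh state."""
    state = PlayerState()
    for event in events or []:
        apply_event(state, event)
    return state


history = [PlayerRegistered("alice"), FundsDeposited(50), FundsDeposited(25)]
state = rebuild_state(history)
print(state)
```

Handlers then receive this state as an argument, which is what the "External, passed in" row of the comparison refers to.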

Comparison

| Aspect | Rich Domain Model | Function-Based |
|---|---|---|
| Pattern | OO, encapsulated | Procedural, explicit |
| State | Internal, lazy rebuild | External, passed in |
| Commands | Method per command | Function per command |
| Validation | @handles decorator | Manual type unpacking |
| Topology | Auto from domain + @handles | Auto from CommandRouter.on() |

Testing Aggregates

Both patterns support unit testing without infrastructure:

# Rich Domain Model
def test_register_creates_player():
    player = Player()  # Empty event book
    event = player.register(RegisterPlayer(player_id="alice"))
    assert event.player_id == "alice"
    assert player.exists

# With prior state (rehydration)
def test_deposit_increases_bankroll():
    event_book = build_event_book([PlayerRegistered(...)])
    player = Player(event_book)
    event = player.deposit(DepositFunds(amount=100))
    assert player.bankroll == 100

Error Handling

Contract

Source: error_handling.feature

from angzarr_client.errors import GRPCError, ConnectionError, ClientError

try:
    response = client.aggregate.handle(command)
except GRPCError as e:
    if e.is_not_found():
        # Aggregate doesn't exist
        pass
    elif e.is_precondition_failed():
        # Sequence mismatch (optimistic locking failure)
        pass
    elif e.is_invalid_argument():
        # Invalid command arguments
        pass
except ConnectionError as e:
    # Network/transport error
    pass
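One possible response to the sequence-mismatch branch above is to rebuild the command against fresh state and try again. The sketch below shows that retry loop with the standard library only. `SequenceMismatch`, `execute_with_retry`, `send` and `build_command` are hypothetical names, and the source does not say whether retrying is appropriate for any given command, so treat this as one option rather than the recommended pattern.

```python
import time


class SequenceMismatch(Exception):
    """Stand-in for an error where is_precondition_failed() would be true."""


def execute_with_retry(send, build_command, attempts: int = 3, delay: float = 0.0):
    """Call send(build_command()) and retry while the sequence is stale.

    build_command is invoked again on every attempt so that it can re-read
    the current sequence before the command is rebuilt.
    """
    for attempt in range(1, attempts + 1):
        try:
            return send(build_command())
        except SequenceMismatch:
            if attempt == attempts:
                raise  # give up and surface the last failure
            time.sleep(delay)


# Demonstration with a fake transport that fails twice, then succeeds.
calls = []


def flaky_send(command):
    calls.append(command)
    if len(calls) < 3:
        raise SequenceMismatch("stale sequence")
    return "accepted"


result = execute_with_retry(flaky_send, lambda: {"seq": len(calls)})
print(result, calls)
```

In real code the `except` clause would catch `GRPCError`, check `is_precondition_failed()`, and re-raise anything else.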

Speculative Execution

Test commands without persisting to the event store:

from angzarr_client import SpeculativeClient
from angzarr_client.proto.angzarr import SpeculateAggregateRequest

client = SpeculativeClient.connect("localhost:1310")

# Build speculative request with temporal state
request = SpeculateAggregateRequest(
    command=command_book,
    events=prior_events
)

# Execute without persistence
response = client.aggregate(request)

# Inspect projected events
for page in response.events.pages:
    print(f"Would produce: {page.event.type_url}")

client.close()
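In the example above, `client.close()` is skipped if any earlier line raises. `contextlib.closing` from the standard library guarantees the call for any object that has a `close()` method. The sketch uses a stand-in class, `FakeSpeculativeClient`, because the source does not say whether the real client supports `with` directly.

```python
from contextlib import closing


class FakeSpeculativeClient:
    """Stand-in exposing the same close() method as the client above."""

    def __init__(self):
        self.closed = False

    def aggregate(self, request):
        raise RuntimeError("simulated failure")

    def close(self):
        self.closed = True


client = FakeSpeculativeClient()
try:
    # closing() calls client.close() on exit, even when the body raises.
    with closing(client) as c:
        c.aggregate(request={})
except RuntimeError:
    pass

print(client.closed)
```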

Client Types

| Client | Purpose |
|---|---|
| QueryClient | Query events from aggregates |
| AggregateClient | Send commands to aggregates |
| SpeculativeClient | Dry-run commands, test projectors/sagas |
| DomainClient | Combined query + aggregate for a domain |
| Client | Full client with all capabilities |

Error Types

| Error | Description |
|---|---|
| ClientError | Base class for all errors |
| CommandRejectedError | Business logic rejection |
| GRPCError | gRPC transport failure (has introspection methods) |
| ConnectionError | Connection failure |
| TransportError | Transport-level failure |
| InvalidArgumentError | Invalid input |
| InvalidTimestampError | Timestamp parse failure |

Saga/PM Implementation

Sagas

Stateless event translators (events → commands):

from angzarr_client import SagaRouter, SagaDomainHandler

class OrderFulfillmentHandler(SagaDomainHandler):
    def event_types(self) -> list[str]:
        return ["OrderCreated"]

    def handle(self, source: EventBook, event: Any, destinations: Destinations):
        # Translate event to command
        cmd = CreateReservation(order_id=event.order_id)
        seq = destinations.sequence_for("inventory")
        return SagaHandlerResponse(commands=[stamp(cmd, seq)])

router = SagaRouter("saga-order-inventory", "order", OrderFulfillmentHandler())
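The core of a saga handler is a pure function from one event to a list of commands. The standard-library sketch below isolates that translation step so it can be read and tested without the framework. `StampedCommand`, `translate` and the `next_sequences` dictionary are hypothetical stand-ins for `stamp`, `handle` and `Destinations`, whose exact behavior is not shown in this document.

```python
from dataclasses import dataclass


@dataclass
class OrderCreated:  # hypothetical event
    order_id: str


@dataclass
class CreateReservation:  # hypothetical command
    order_id: str


@dataclass
class StampedCommand:  # hypothetical: a command plus its target sequence
    domain: str
    sequence: int
    command: object


def translate(event: OrderCreated, next_sequences: dict[str, int]) -> list[StampedCommand]:
    """Pure translation: no state is kept and no business rule is checked."""
    seq = next_sequences["inventory"]
    return [StampedCommand("inventory", seq, CreateReservation(event.order_id))]


commands = translate(OrderCreated("order-42"), {"inventory": 7})
print(commands)
```

Calling `translate` twice with the same inputs yields the same commands, which is what "stateless" means here.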

Process Managers

Stateful multi-domain orchestrators:

from angzarr_client import ProcessManagerRouter

class BuyInPmHandler(ProcessManagerDomainHandler):
    def event_types(self) -> list[str]:
        return ["BuyInRequested", "PlayerSeated", "SeatingRejected"]

    def prepare(self, trigger, state, event) -> list[Cover]:
        # Declare destinations needed
        return [Cover(domain="table", root=event.table_root)]

    def handle(self, trigger, state, event, destinations: Destinations):
        # Emit commands or facts
        seq = destinations.sequence_for("table")
        return ProcessManagerResponse(commands=[cmd], facts=[])

router = ProcessManagerRouter("pmg-buy-in", "pmg-buy-in", rebuild_state) \
    .domain("player", BuyInPmHandler()) \
    .domain("table", BuyInPmHandler())

Saga/PM Design Philosophy

Sagas and PMs are coordinators, NOT decision makers.

| Output | When to Use |
|---|---|
| Commands (preferred) | Normal flow - aggregate validates and decides |
| Facts | Inject external data aggregate can't derive |

Key Principles:

  1. Don't rebuild destination state - Use Destinations for sequences only
  2. Let aggregates decide - Business logic in aggregates, not coordinators
  3. Prefer commands with sync mode - Use SyncMode.SIMPLE for immediate feedback
  4. Use facts sparingly - Only for external data injection
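The first two principles can be illustrated with a small standard-library sketch: the coordinator only translates, and the aggregate that owns the rule accepts or rejects. Every name here (`TableAggregate`, `SeatPlayer`, `CommandRejected`, `coordinator`) is hypothetical and chosen to echo the buy-in example; none of it is angzarr_client API.

```python
from dataclasses import dataclass


class CommandRejected(Exception):
    """Stand-in for a business-rule rejection raised by an aggregate."""


@dataclass
class SeatPlayer:  # hypothetical command
    player_id: str
    buy_in: int


@dataclass
class TableAggregate:  # hypothetical aggregate that owns the seating rules
    min_buy_in: int
    seats_left: int

    def handle(self, cmd: SeatPlayer) -> str:
        if cmd.buy_in < self.min_buy_in:
            raise CommandRejected("buy-in below table minimum")
        if self.seats_left == 0:
            raise CommandRejected("table is full")
        self.seats_left -= 1
        return "PlayerSeated"


def coordinator(player_id: str, buy_in: int) -> SeatPlayer:
    """Only translates; it never inspects table state or checks limits."""
    return SeatPlayer(player_id=player_id, buy_in=buy_in)


table = TableAggregate(min_buy_in=100, seats_left=1)
outcomes = []
for player, amount in [("alice", 150), ("bob", 50), ("carol", 200)]:
    try:
        outcomes.append(table.handle(coordinator(player, amount)))
    except CommandRejected as exc:
        outcomes.append(f"rejected: {exc}")

print(outcomes)
```

The coordinator stays unchanged if the seating rules change, because the rules live only in the aggregate.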

License

BSD-3-Clause
