Skip to main content

Python client library for Angzarr gRPC services

Project description

⚠️ Notice: This repository was recently extracted from the angzarr monorepo and has not yet been validated as a standalone project. Expect rough edges. See the Angzarr documentation for more information.


title: Python SDK sidebar_label: Python

angzarr-client

Python client library for Angzarr CQRS/ES framework.

:::tip Unified Documentation For cross-language API reference with side-by-side comparisons, see the SDK Documentation. :::

Installation

pip install angzarr-client

Client Usage

Contracts

</code></pre>
<blockquote>
<p>Source: <a href="../features/aggregate_client.feature"><code>aggregate_client.feature</code></a></p>
</blockquote>
<pre lang="gherkin"><code>

Source: query_client.feature

from angzarr_client import DomainClient

# Connect to a domain's aggregate coordinator
client = DomainClient("localhost:1310")

# Build and execute a command
response = client.command("order", root_id) \
    .with_command("CreateOrder", create_order_msg) \
    .execute()

# Query events
events = client.query("order", root_id) \
    .get_event_book()

Aggregate Implementation

Two approaches for implementing aggregates:

1. Rich Domain Model (Recommended)

Use Aggregate ABC with @handles decorator for OO-style aggregates:

from angzarr_client import Aggregate, handles
from angzarr_client.errors import CommandRejectedError

@dataclass
class _PlayerState:
    player_id: str = ""
    bankroll: int = 0

class Player(Aggregate[_PlayerState]):
    domain = "player"  # Required

    def _create_empty_state(self) -> _PlayerState:
        return _PlayerState()

    def _apply_event(self, state: _PlayerState, event_any) -> None:
        if event_any.type_url.endswith("PlayerRegistered"):
            event = PlayerRegistered()
            event_any.Unpack(event)
            state.player_id = event.player_id

    @handles(RegisterPlayer)
    def register(self, cmd: RegisterPlayer) -> PlayerRegistered:
        if self.exists:
            raise CommandRejectedError("Player already exists")
        return PlayerRegistered(player_id=cmd.player_id, ...)

    @handles(DepositFunds)
    def deposit(self, cmd: DepositFunds) -> FundsDeposited:
        ...

    @property
    def exists(self) -> bool:
        return bool(self._get_state().player_id)

Features:

  • @handles(CommandType) validates type hints at decoration time
  • Dispatch table built automatically at class definition
  • domain attribute required, enforced at class creation
  • Abstract methods _create_empty_state() and _apply_event() enforced

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(Player, "50303")

2. Function-Based (CommandRouter)

Use CommandRouter with standalone handler functions:

from angzarr_client import CommandRouter
from angzarr_client.proto.angzarr import types_pb2 as types

def rebuild_state(event_book: types.EventBook) -> PlayerState:
    state = PlayerState()
    if event_book:
        for page in event_book.pages:
            apply_event(state, page.event)
    return state

def handle_register(cb, cmd_any, state, seq) -> types.EventBook:
    cmd = RegisterPlayer()
    cmd_any.Unpack(cmd)
    if state.exists:
        raise CommandRejectedError("Player already exists")
    event = PlayerRegistered(player_id=cmd.player_id, ...)
    return pack_event(event, seq)

router = CommandRouter("player", rebuild_state) \
    .on("RegisterPlayer", handle_register) \
    .on("DepositFunds", handle_deposit)

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(router, "50303")

Comparison

Aspect Rich Domain Model Function-Based
Pattern OO, encapsulated Procedural, explicit
State Internal, lazy rebuild External, passed in
Commands Method per command Function per command
Validation @handles decorator Manual type unpacking
Topology Auto from domain + @handles Auto from CommandRouter.on()

Testing Aggregates

Both patterns support unit testing without infrastructure:

# Rich Domain Model
def test_register_creates_player():
    player = Player()  # Empty event book
    event = player.register(RegisterPlayer(player_id="alice"))
    assert event.player_id == "alice"
    assert player.exists

# With prior state (rehydration)
def test_deposit_increases_bankroll():
    event_book = build_event_book([PlayerRegistered(...)])
    player = Player(event_book)
    event = player.deposit(DepositFunds(amount=100))
    assert player.bankroll == 100

Error Handling

Contract

</code></pre>
<blockquote>
<p>Source: <a href="../features/error_handling.feature"><code>error_handling.feature</code></a></p>
</blockquote>
<pre lang="python"><code>from angzarr_client.errors import GRPCError, ConnectionError, ClientError

try:
    response = client.aggregate.handle(command)
except GRPCError as e:
    if e.is_not_found():
        # Aggregate doesn't exist
        pass
    elif e.is_precondition_failed():
        # Sequence mismatch (optimistic locking failure)
        pass
    elif e.is_invalid_argument():
        # Invalid command arguments
        pass
except ConnectionError as e:
    # Network/transport error
    pass

Speculative Execution

Test commands without persisting to the event store:

from angzarr_client import SpeculativeClient
from angzarr_client.proto.angzarr import SpeculateAggregateRequest

client = SpeculativeClient.connect("localhost:1310")

# Build speculative request with temporal state
request = SpeculateAggregateRequest(
    command=command_book,
    events=prior_events
)

# Execute without persistence
response = client.aggregate(request)

# Inspect projected events
for page in response.events.pages:
    print(f"Would produce: {page.event.type_url}")

client.close()

Client Types

Client Purpose
QueryClient Query events from aggregates
AggregateClient Send commands to aggregates
SpeculativeClient Dry-run commands, test projectors/sagas
DomainClient Combined query + aggregate for a domain
Client Full client with all capabilities

Error Types

Error Description
ClientError Base class for all errors
CommandRejectedError Business logic rejection
GRPCError gRPC transport failure (has introspection methods)
ConnectionError Connection failure
TransportError Transport-level failure
InvalidArgumentError Invalid input
InvalidTimestampError Timestamp parse failure

License

BSD-3-Clause

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

angzarr_client-0.2.2.tar.gz (171.6 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

angzarr_client-0.2.2-py3-none-any.whl (139.8 kB view details)

Uploaded Python 3

File details

Details for the file angzarr_client-0.2.2.tar.gz.

File metadata

  • Download URL: angzarr_client-0.2.2.tar.gz
  • Upload date:
  • Size: 171.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for angzarr_client-0.2.2.tar.gz
Algorithm Hash digest
SHA256 c323314297b5944457e47ad7ce242a55997bb3feafb8960f72fbf40f28a25c22
MD5 ddb2cf9d696e9bf2685146721a5b9656
BLAKE2b-256 df547aed6fe232bb73f1fca9fe7b212bc54bad906b3225ae6a7f303907cfee22

See more details on using hashes here.

File details

Details for the file angzarr_client-0.2.2-py3-none-any.whl.

File metadata

  • Download URL: angzarr_client-0.2.2-py3-none-any.whl
  • Upload date:
  • Size: 139.8 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for angzarr_client-0.2.2-py3-none-any.whl
Algorithm Hash digest
SHA256 188a4e7d2fbcb036757823392224b7572b29867fb7fecbe0069b7c02d27aa000
MD5 fe07d7157623c3b7b1e306e7fd112460
BLAKE2b-256 09e0a6d9f9f68e32d136838cb62aa5332e715c0ba48dfc5b67a38e15108692ef

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page