Skip to main content

Python client library for Angzarr gRPC services

Project description

⚠️ Notice: This repository was recently extracted from the angzarr monorepo and has not yet been validated as a standalone project. Expect rough edges. See the Angzarr documentation for more information.


title: Python SDK sidebar_label: Python

angzarr-client

Python client library for Angzarr CQRS/ES framework.

:::tip Unified Documentation For cross-language API reference with side-by-side comparisons, see the SDK Documentation. :::

Installation

pip install angzarr-client

Client Usage

Contracts

</code></pre>
<blockquote>
<p>Source: <a href="../features/aggregate_client.feature"><code>aggregate_client.feature</code></a></p>
</blockquote>
<pre lang="gherkin"><code>

Source: query_client.feature

from angzarr_client import DomainClient

# Connect to a domain's aggregate coordinator
client = DomainClient("localhost:1310")

# Build and execute a command
response = client.command("order", root_id) \
    .with_command("CreateOrder", create_order_msg) \
    .execute()

# Query events
events = client.query("order", root_id) \
    .get_event_book()

Aggregate Implementation

Two approaches for implementing aggregates:

1. Rich Domain Model (Recommended)

Use Aggregate ABC with @handles decorator for OO-style aggregates:

from angzarr_client import Aggregate, handles
from angzarr_client.errors import CommandRejectedError

@dataclass
class _PlayerState:
    player_id: str = ""
    bankroll: int = 0

class Player(Aggregate[_PlayerState]):
    domain = "player"  # Required

    def _create_empty_state(self) -> _PlayerState:
        return _PlayerState()

    def _apply_event(self, state: _PlayerState, event_any) -> None:
        if event_any.type_url.endswith("PlayerRegistered"):
            event = PlayerRegistered()
            event_any.Unpack(event)
            state.player_id = event.player_id

    @handles(RegisterPlayer)
    def register(self, cmd: RegisterPlayer) -> PlayerRegistered:
        if self.exists:
            raise CommandRejectedError("Player already exists")
        return PlayerRegistered(player_id=cmd.player_id, ...)

    @handles(DepositFunds)
    def deposit(self, cmd: DepositFunds) -> FundsDeposited:
        ...

    @property
    def exists(self) -> bool:
        return bool(self._get_state().player_id)

Features:

  • @handles(CommandType) validates type hints at decoration time
  • Dispatch table built automatically at class definition
  • domain attribute required, enforced at class creation
  • Abstract methods _create_empty_state() and _apply_event() enforced

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(Player, "50303")

2. Function-Based (CommandRouter)

Use CommandRouter with standalone handler functions:

from angzarr_client import CommandRouter
from angzarr_client.proto.angzarr import types_pb2 as types

def rebuild_state(event_book: types.EventBook) -> PlayerState:
    state = PlayerState()
    if event_book:
        for page in event_book.pages:
            apply_event(state, page.event)
    return state

def handle_register(cb, cmd_any, state, seq) -> types.EventBook:
    cmd = RegisterPlayer()
    cmd_any.Unpack(cmd)
    if state.exists:
        raise CommandRejectedError("Player already exists")
    event = PlayerRegistered(player_id=cmd.player_id, ...)
    return pack_event(event, seq)

router = CommandRouter("player", rebuild_state) \
    .on("RegisterPlayer", handle_register) \
    .on("DepositFunds", handle_deposit)

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(router, "50303")

Comparison

Aspect Rich Domain Model Function-Based
Pattern OO, encapsulated Procedural, explicit
State Internal, lazy rebuild External, passed in
Commands Method per command Function per command
Validation @handles decorator Manual type unpacking
Topology Auto from domain + @handles Auto from CommandRouter.on()

Testing Aggregates

Both patterns support unit testing without infrastructure:

# Rich Domain Model
def test_register_creates_player():
    player = Player()  # Empty event book
    event = player.register(RegisterPlayer(player_id="alice"))
    assert event.player_id == "alice"
    assert player.exists

# With prior state (rehydration)
def test_deposit_increases_bankroll():
    event_book = build_event_book([PlayerRegistered(...)])
    player = Player(event_book)
    event = player.deposit(DepositFunds(amount=100))
    assert player.bankroll == 100

Error Handling

Contract

</code></pre>
<blockquote>
<p>Source: <a href="../features/error_handling.feature"><code>error_handling.feature</code></a></p>
</blockquote>
<pre lang="python"><code>from angzarr_client.errors import GRPCError, ConnectionError, ClientError

try:
    response = client.aggregate.handle(command)
except GRPCError as e:
    if e.is_not_found():
        # Aggregate doesn't exist
        pass
    elif e.is_precondition_failed():
        # Sequence mismatch (optimistic locking failure)
        pass
    elif e.is_invalid_argument():
        # Invalid command arguments
        pass
except ConnectionError as e:
    # Network/transport error
    pass

Speculative Execution

Test commands without persisting to the event store:

from angzarr_client import SpeculativeClient
from angzarr_client.proto.angzarr import SpeculateAggregateRequest

client = SpeculativeClient.connect("localhost:1310")

# Build speculative request with temporal state
request = SpeculateAggregateRequest(
    command=command_book,
    events=prior_events
)

# Execute without persistence
response = client.aggregate(request)

# Inspect projected events
for page in response.events.pages:
    print(f"Would produce: {page.event.type_url}")

client.close()

Client Types

Client Purpose
QueryClient Query events from aggregates
AggregateClient Send commands to aggregates
SpeculativeClient Dry-run commands, test projectors/sagas
DomainClient Combined query + aggregate for a domain
Client Full client with all capabilities

Error Types

Error Description
ClientError Base class for all errors
CommandRejectedError Business logic rejection
GRPCError gRPC transport failure (has introspection methods)
ConnectionError Connection failure
TransportError Transport-level failure
InvalidArgumentError Invalid input
InvalidTimestampError Timestamp parse failure

License

BSD-3-Clause

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

angzarr_client-0.2.1.tar.gz (145.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

angzarr_client-0.2.1-py3-none-any.whl (80.2 kB view details)

Uploaded Python 3

File details

Details for the file angzarr_client-0.2.1.tar.gz.

File metadata

  • Download URL: angzarr_client-0.2.1.tar.gz
  • Upload date:
  • Size: 145.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for angzarr_client-0.2.1.tar.gz
Algorithm Hash digest
SHA256 2a0b4856f47890a71edf35d54536e344d3d1ba705ea8edf7cf237e2581cb392e
MD5 04783549c97d1667a842f99bf777a8a5
BLAKE2b-256 d51a51bee0d5a9d4e4e9063cff5e2c09e051b459f77ecd537c4806f7beb71193

See more details on using hashes here.

File details

Details for the file angzarr_client-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: angzarr_client-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 80.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for angzarr_client-0.2.1-py3-none-any.whl
Algorithm Hash digest
SHA256 9d31c42a232ff6183fd5b7751365daf497e4bf097614d871c8e0a2383e779d38
MD5 52488343d3a1ea4c82af26604c5caab7
BLAKE2b-256 26c5ac624a6a89b32793a1ad3604319f3f0031f64245d84cace742c2624a001c

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page