Skip to main content

Python client library for Angzarr gRPC services

Project description

⚠️ Notice: This repository was recently extracted from the angzarr monorepo and has not yet been validated as a standalone project. Expect rough edges. See the Angzarr documentation for more information.


title: Python SDK sidebar_label: Python

angzarr-client

Python client library for Angzarr CQRS/ES framework.

:::tip Unified Documentation For cross-language API reference with side-by-side comparisons, see the SDK Documentation. :::

Installation

pip install angzarr-client

Client Usage

Contracts

</code></pre>
<blockquote>
<p>Source: <a href="../features/aggregate_client.feature"><code>aggregate_client.feature</code></a></p>
</blockquote>
<pre lang="gherkin"><code>

Source: query_client.feature

from angzarr_client import DomainClient

# Connect to a domain's aggregate coordinator
client = DomainClient("localhost:1310")

# Build and execute a command
response = client.command("order", root_id) \
    .with_command("CreateOrder", create_order_msg) \
    .execute()

# Query events
events = client.query("order", root_id) \
    .get_event_book()

Aggregate Implementation

Two approaches for implementing aggregates:

1. Rich Domain Model (Recommended)

Use Aggregate ABC with @handles decorator for OO-style aggregates:

from angzarr_client import Aggregate, handles
from angzarr_client.errors import CommandRejectedError

@dataclass
class _PlayerState:
    player_id: str = ""
    bankroll: int = 0

class Player(Aggregate[_PlayerState]):
    domain = "player"  # Required

    def _create_empty_state(self) -> _PlayerState:
        return _PlayerState()

    def _apply_event(self, state: _PlayerState, event_any) -> None:
        if event_any.type_url.endswith("PlayerRegistered"):
            event = PlayerRegistered()
            event_any.Unpack(event)
            state.player_id = event.player_id

    @handles(RegisterPlayer)
    def register(self, cmd: RegisterPlayer) -> PlayerRegistered:
        if self.exists:
            raise CommandRejectedError("Player already exists")
        return PlayerRegistered(player_id=cmd.player_id, ...)

    @handles(DepositFunds)
    def deposit(self, cmd: DepositFunds) -> FundsDeposited:
        ...

    @property
    def exists(self) -> bool:
        return bool(self._get_state().player_id)

Features:

  • @handles(CommandType) validates type hints at decoration time
  • Dispatch table built automatically at class definition
  • domain attribute required, enforced at class creation
  • Abstract methods _create_empty_state() and _apply_event() enforced

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(Player, "50303")

2. Function-Based (CommandRouter)

Use CommandRouter with standalone handler functions:

from angzarr_client import CommandRouter
from angzarr_client.proto.angzarr import types_pb2 as types

def rebuild_state(event_book: types.EventBook) -> PlayerState:
    state = PlayerState()
    if event_book:
        for page in event_book.pages:
            apply_event(state, page.event)
    return state

def handle_register(cb, cmd_any, state, seq) -> types.EventBook:
    cmd = RegisterPlayer()
    cmd_any.Unpack(cmd)
    if state.exists:
        raise CommandRejectedError("Player already exists")
    event = PlayerRegistered(player_id=cmd.player_id, ...)
    return pack_event(event, seq)

router = CommandRouter("player", rebuild_state) \
    .on("RegisterPlayer", handle_register) \
    .on("DepositFunds", handle_deposit)

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(router, "50303")

Comparison

Aspect Rich Domain Model Function-Based
Pattern OO, encapsulated Procedural, explicit
State Internal, lazy rebuild External, passed in
Commands Method per command Function per command
Validation @handles decorator Manual type unpacking
Topology Auto from domain + @handles Auto from CommandRouter.on()

Testing Aggregates

Both patterns support unit testing without infrastructure:

# Rich Domain Model
def test_register_creates_player():
    player = Player()  # Empty event book
    event = player.register(RegisterPlayer(player_id="alice"))
    assert event.player_id == "alice"
    assert player.exists

# With prior state (rehydration)
def test_deposit_increases_bankroll():
    event_book = build_event_book([PlayerRegistered(...)])
    player = Player(event_book)
    event = player.deposit(DepositFunds(amount=100))
    assert player.bankroll == 100

Error Handling

Contract

</code></pre>
<blockquote>
<p>Source: <a href="../features/error_handling.feature"><code>error_handling.feature</code></a></p>
</blockquote>
<pre lang="python"><code>from angzarr_client.errors import GRPCError, ConnectionError, ClientError

try:
    response = client.aggregate.handle(command)
except GRPCError as e:
    if e.is_not_found():
        # Aggregate doesn't exist
        pass
    elif e.is_precondition_failed():
        # Sequence mismatch (optimistic locking failure)
        pass
    elif e.is_invalid_argument():
        # Invalid command arguments
        pass
except ConnectionError as e:
    # Network/transport error
    pass

Speculative Execution

Test commands without persisting to the event store:

from angzarr_client import SpeculativeClient
from angzarr_client.proto.angzarr import SpeculateAggregateRequest

client = SpeculativeClient.connect("localhost:1310")

# Build speculative request with temporal state
request = SpeculateAggregateRequest(
    command=command_book,
    events=prior_events
)

# Execute without persistence
response = client.aggregate(request)

# Inspect projected events
for page in response.events.pages:
    print(f"Would produce: {page.event.type_url}")

client.close()

Client Types

Client Purpose
QueryClient Query events from aggregates
AggregateClient Send commands to aggregates
SpeculativeClient Dry-run commands, test projectors/sagas
DomainClient Combined query + aggregate for a domain
Client Full client with all capabilities

Error Types

Error Description
ClientError Base class for all errors
CommandRejectedError Business logic rejection
GRPCError gRPC transport failure (has introspection methods)
ConnectionError Connection failure
TransportError Transport-level failure
InvalidArgumentError Invalid input
InvalidTimestampError Timestamp parse failure

License

BSD-3-Clause

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

angzarr_client-0.2.0.tar.gz (145.0 kB view details)

Uploaded Source

Built Distribution

If you're not sure about the file name format, learn more about wheel file names.

angzarr_client-0.2.0-py3-none-any.whl (80.2 kB view details)

Uploaded Python 3

File details

Details for the file angzarr_client-0.2.0.tar.gz.

File metadata

  • Download URL: angzarr_client-0.2.0.tar.gz
  • Upload date:
  • Size: 145.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for angzarr_client-0.2.0.tar.gz
Algorithm Hash digest
SHA256 8515cc63f8161cf751327425b13d51758a783322f4e5b37d42f5cd683a0f4b01
MD5 da848d7967fb9a4cf9523f5dd8eda9e4
BLAKE2b-256 7a0fd3137a1dc004e05d88522015e1be0ea00e499d5a054936f6ea9e64cb47b1

See more details on using hashes here.

File details

Details for the file angzarr_client-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: angzarr_client-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 80.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/6.1.0 CPython/3.13.7

File hashes

Hashes for angzarr_client-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 6cfde2d9ea0febc5dee9005737f6f1b53e7747452f0fd90ec80d2174fa30a79c
MD5 1fff2a4110d76cee065aba04ffaa5f31
BLAKE2b-256 d9a1ed57ac3a3bf91fe2e366a1ca060d37e70f8da75b4ea584c9a7fb087a63f4

See more details on using hashes here.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Depot Continuous Integration Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page