Skip to main content

angzarr-client

Python client library for Angzarr CQRS/ES framework.

Unified Documentation

For cross-language API reference with side-by-side comparisons, see the SDK Documentation.

Installation

pip install angzarr-client

Client Usage

Contracts

Feature: AggregateClient - Command Execution
The AggregateClient sends commands to aggregates for processing.
Commands are validated, processed, and result in events being persisted.
Supports async (fire-and-forget), sync, and speculative modes.

Without command execution, the system cannot accept user actions or
change aggregate state.

Source: aggregate_client.feature

Feature: QueryClient - Event Retrieval
The QueryClient provides read access to aggregate event histories.
It supports various query modes: full history, range queries, temporal
queries, and correlation-based queries across aggregates.

Without query access, clients cannot reconstruct aggregate state,
catch up projectors, or debug event flows.

Source: query_client.feature

from angzarr_client import DomainClient

# Connect to a domain's aggregate coordinator
client = DomainClient("localhost:1310")

# Build and execute a command
response = client.command("order", root_id) \
.with_command("CreateOrder", create_order_msg) \
.execute()

# Query events
events = client.query("order", root_id) \
.get_event_book()

Aggregate Implementation

Two approaches for implementing aggregates:

Use Aggregate ABC with @handles decorator for OO-style aggregates:

from angzarr_client import Aggregate, handles
from angzarr_client.errors import CommandRejectedError

@dataclass
class _PlayerState:
player_id: str = ""
bankroll: int = 0

class Player(Aggregate[_PlayerState]):
domain = "player" # Required

def _create_empty_state(self) -> _PlayerState:
return _PlayerState()

def _apply_event(self, state: _PlayerState, event_any) -> None:
if event_any.type_url.endswith("PlayerRegistered"):
event = PlayerRegistered()
event_any.Unpack(event)
state.player_id = event.player_id

@handles(RegisterPlayer)
def register(self, cmd: RegisterPlayer) -> PlayerRegistered:
if self.exists:
raise CommandRejectedError("Player already exists")
return PlayerRegistered(player_id=cmd.player_id, ...)

@handles(DepositFunds)
def deposit(self, cmd: DepositFunds) -> FundsDeposited:
...

@property
def exists(self) -> bool:
return bool(self._get_state().player_id)

Features:

  • @handles(CommandType) validates type hints at decoration time
  • Dispatch table built automatically at class definition
  • domain attribute required, enforced at class creation
  • Abstract methods _create_empty_state() and _apply_event() enforced

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(Player, "50303")

2. Function-Based (CommandRouter)

Use CommandRouter with standalone handler functions:

from angzarr_client import CommandRouter
from angzarr_client.proto.angzarr import types_pb2 as types

def rebuild_state(event_book: types.EventBook) -> PlayerState:
state = PlayerState()
if event_book:
for page in event_book.pages:
apply_event(state, page.event)
return state

def handle_register(cb, cmd_any, state, seq) -> types.EventBook:
cmd = RegisterPlayer()
cmd_any.Unpack(cmd)
if state.exists:
raise CommandRejectedError("Player already exists")
event = PlayerRegistered(player_id=cmd.player_id, ...)
return pack_event(event, seq)

router = CommandRouter("player", rebuild_state) \
.on("RegisterPlayer", handle_register) \
.on("DepositFunds", handle_deposit)

gRPC Server:

from angzarr_client import run_aggregate_server

run_aggregate_server(router, "50303")

Comparison

AspectRich Domain ModelFunction-Based
PatternOO, encapsulatedProcedural, explicit
StateInternal, lazy rebuildExternal, passed in
CommandsMethod per commandFunction per command
Validation@handles decoratorManual type unpacking
TopologyAuto from domain + @handlesAuto from CommandRouter.on()

Testing Aggregates

Both patterns support unit testing without infrastructure:

# Rich Domain Model
def test_register_creates_player():
player = Player() # Empty event book
event = player.register(RegisterPlayer(player_id="alice"))
assert event.player_id == "alice"
assert player.exists

# With prior state (rehydration)
def test_deposit_increases_bankroll():
event_book = build_event_book([PlayerRegistered(...)])
player = Player(event_book)
event = player.deposit(DepositFunds(amount=100))
assert player.bankroll == 100

Error Handling

Contract

Feature: Error Handling - Client Error Introspection
Client errors provide structured information for retry logic,
user feedback, and debugging. Errors are categorized by type
(connection, validation, business rule) with introspection methods.

Proper error handling enables:
- Automatic retry on transient failures
- User-friendly error messages
- Optimistic concurrency conflict resolution
- Debugging and logging

Source: error_handling.feature

from angzarr_client.errors import GRPCError, ConnectionError, ClientError

try:
response = client.aggregate.handle(command)
except GRPCError as e:
if e.is_not_found():
# Aggregate doesn't exist
pass
elif e.is_precondition_failed():
# Sequence mismatch (optimistic locking failure)
pass
elif e.is_invalid_argument():
# Invalid command arguments
pass
except ConnectionError as e:
# Network/transport error
pass

Speculative Execution

Test commands without persisting to the event store:

from angzarr_client import SpeculativeClient
from angzarr_client.proto.angzarr import SpeculateAggregateRequest

client = SpeculativeClient.connect("localhost:1310")

# Build speculative request with temporal state
request = SpeculateAggregateRequest(
command=command_book,
events=prior_events
)

# Execute without persistence
response = client.aggregate(request)

# Inspect projected events
for page in response.events.pages:
print(f"Would produce: {page.event.type_url}")

client.close()

Client Types

ClientPurpose
QueryClientQuery events from aggregates
AggregateClientSend commands to aggregates
SpeculativeClientDry-run commands, test projectors/sagas
DomainClientCombined query + aggregate for a domain
ClientFull client with all capabilities

Error Types

ErrorDescription
ClientErrorBase class for all errors
CommandRejectedErrorBusiness logic rejection
GRPCErrorgRPC transport failure (has introspection methods)
ConnectionErrorConnection failure
TransportErrorTransport-level failure
InvalidArgumentErrorInvalid input
InvalidTimestampErrorTimestamp parse failure

License

AGPL-3.0-only