Skip to content

Aggregate Examples

Real handler examples from the poker domain. All code is from the actual examples/ directory.

All aggregates in all six languages use the OO pattern: a class derived from the client library’s CommandHandler<State> with @handles for command dispatch and @applies for event-to-state application (naming adapted per language idiom).


@handles(CommandType) on a method registers it as the handler for that command. The method returns the event; the framework persists it and advances state.

@handles(table_proto.CreateTable)
def handle_create_table(
    self,
    cmd: table_proto.CreateTable,
    state: _TableState | None = None,
    seq: int | None = None,
) -> table_proto.TableCreated:
    """Create a new table.

    Rejects the command when the table already exists or when a field fails
    validation; otherwise builds the ``TableCreated`` event, falling back to
    an action timeout of 30 when the command carries a falsy value.

    Passing ``state`` switches the handler to router mode: the state is bound
    with ``_router_bind`` for the duration of the call, the event is returned
    without calling ``_emit``, and whatever ``_router_bind`` returned is
    written back to ``_state`` on the way out. Without ``state`` the event is
    passed to ``_emit`` before it is returned. ``seq`` is accepted but not
    used by this handler.

    Raises:
        CommandRejectedError: the table exists or a field is invalid.
    """
    via_router = state is not None
    if via_router:
        previous_state = self._router_bind(state)
    else:
        previous_state = None

    try:
        # Validation order matters: the first failing rule decides the error.
        if self.exists:
            raise CommandRejectedError("Table already exists")
        if not cmd.table_name:
            raise CommandRejectedError("table_name is required")
        if cmd.small_blind <= 0:
            raise CommandRejectedError.invalid_argument(
                "small_blind must be positive"
            )
        if cmd.big_blind <= 0 or cmd.big_blind < cmd.small_blind:
            raise CommandRejectedError("big_blind must be >= small_blind")
        if cmd.min_buy_in <= 0:
            raise CommandRejectedError.invalid_argument(
                "min_buy_in must be positive"
            )
        if cmd.min_buy_in > cmd.max_buy_in:
            raise CommandRejectedError("max_buy_in must be >= min_buy_in")
        if not 2 <= cmd.max_players <= 10:
            raise CommandRejectedError("max_players must be 2-10")

        created = table_proto.TableCreated(
            table_name=cmd.table_name,
            game_variant=cmd.game_variant,
            small_blind=cmd.small_blind,
            big_blind=cmd.big_blind,
            min_buy_in=cmd.min_buy_in,
            max_buy_in=cmd.max_buy_in,
            max_players=cmd.max_players,
            action_timeout_seconds=cmd.action_timeout_seconds or 30,
            created_at=now(),
        )

        # In router mode the event is only returned, never emitted here.
        if via_router:
            return created
        self._emit(created)
        return created
    finally:
        # Undo the temporary binding whether the command passed or was rejected.
        if via_router:
            self._state = previous_state

The Player aggregate demonstrates two-phase reservation. Funds are reserved before joining a table, then released if the join fails.

def handle_reserve_funds(
    cmd: player.ReserveFunds, state: PlayerState
) -> player.FundsReserved:
    """Reserve funds for a table buy-in.

    Runs three steps in order: ``reserve_funds_guard`` receives the state,
    ``reserve_funds_validate`` receives the command and state and yields the
    amount, and ``reserve_funds_compute`` produces the returned result from
    the command, state, and that amount.
    """
    reserve_funds_guard(state)
    reserved_amount = reserve_funds_validate(cmd, state)
    return reserve_funds_compute(cmd, state, reserved_amount)

@applies(EventType) on a method registers it as the reducer for that event. State is rebuilt by replaying the event stream: starting from a fresh state instance, the appliers run in event order, each one mutating that state.

@handles(table_proto.CreateTable)
def handle_create_table(
    self,
    cmd: table_proto.CreateTable,
    state: _TableState | None = None,
    seq: int | None = None,
) -> table_proto.TableCreated:
    """Create a new table.

    Rejects the command when the table already exists or when a field fails
    validation; otherwise builds the ``TableCreated`` event, falling back to
    an action timeout of 30 when the command carries a falsy value.

    Passing ``state`` switches the handler to router mode: the state is bound
    with ``_router_bind`` for the duration of the call, the event is returned
    without calling ``_emit``, and whatever ``_router_bind`` returned is
    written back to ``_state`` on the way out. Without ``state`` the event is
    passed to ``_emit`` before it is returned. ``seq`` is accepted but not
    used by this handler.

    Raises:
        CommandRejectedError: the table exists or a field is invalid.
    """
    via_router = state is not None
    if via_router:
        previous_state = self._router_bind(state)
    else:
        previous_state = None

    try:
        # Validation order matters: the first failing rule decides the error.
        if self.exists:
            raise CommandRejectedError("Table already exists")
        if not cmd.table_name:
            raise CommandRejectedError("table_name is required")
        if cmd.small_blind <= 0:
            raise CommandRejectedError.invalid_argument(
                "small_blind must be positive"
            )
        if cmd.big_blind <= 0 or cmd.big_blind < cmd.small_blind:
            raise CommandRejectedError("big_blind must be >= small_blind")
        if cmd.min_buy_in <= 0:
            raise CommandRejectedError.invalid_argument(
                "min_buy_in must be positive"
            )
        if cmd.min_buy_in > cmd.max_buy_in:
            raise CommandRejectedError("max_buy_in must be >= min_buy_in")
        if not 2 <= cmd.max_players <= 10:
            raise CommandRejectedError("max_players must be 2-10")

        created = table_proto.TableCreated(
            table_name=cmd.table_name,
            game_variant=cmd.game_variant,
            small_blind=cmd.small_blind,
            big_blind=cmd.big_blind,
            min_buy_in=cmd.min_buy_in,
            max_buy_in=cmd.max_buy_in,
            max_players=cmd.max_players,
            action_timeout_seconds=cmd.action_timeout_seconds or 30,
            created_at=now(),
        )

        # In router mode the event is only returned, never emitted here.
        if via_router:
            return created
        self._emit(created)
        return created
    finally:
        # Undo the temporary binding whether the command passed or was rejected.
        if via_router:
            self._state = previous_state

The table example interleaves @applies and @handles methods in one class — the base class indexes them by reflection at construction time. No separate router to register or compose.


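Register the handler with a Router and pass the built router to the gRPC servicer. Note that the example below passes a handler instance (`Table()`) to `with_handler`, not the class. The framework rebuilds the aggregate's state from the event log for each command.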

"""Table bounded context gRPC server.
Uses the unified Router API with the @command_handler class decorator.
"""
import sys
from pathlib import Path

import structlog

# Put the "angzarr" directory, three levels above this file, at the front of
# the import path before the client library is imported.
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "angzarr"))

from angzarr_client import (
    CommandHandlerGrpc,
    Router,
    configure_logging,
    run_server,
)
from angzarr_client.proto.angzarr import command_handler_pb2_grpc

from .handlers import Table

# Built at import time so the router is available as a module attribute.
router = Router("table").with_handler(Table()).build()

if __name__ == "__main__":
    # Logging is configured first so the logger created next picks it up.
    configure_logging()
    logger = structlog.get_logger()
    servicer = CommandHandlerGrpc(router)
    run_server(
        command_handler_pb2_grpc.add_CommandHandlerServiceServicer_to_server,
        servicer,
        service_name="table-agg",
        domain="table",
        default_port="50402",
        logger=logger,
    )

⍼ Angzarr uses Gherkin feature files as living specifications. The same feature files run against all language implementations, guaranteeing identical business behavior.

Example Gherkin feature:

# Specification of how a player's funds are reserved ahead of a table buy-in.
Feature: Player fund reservation
Players must reserve funds when joining a table to ensure
they can cover their buy-in before sitting down.
# Success case: the reserved $200 moves out of the available balance.
Scenario: Reserve funds for table buy-in
Given a player with $500 available
When the player reserves $200 for a table
Then the player's available balance is $300
And the player's reserved balance is $200
# Rejection case: an over-limit request fails and the available balance is unchanged.
Scenario: Cannot reserve more than available
Given a player with $500 available
When the player tries to reserve $600
Then the request fails with "insufficient funds"
And the player's available balance remains $500

Because @handles methods are ordinary methods on an aggregate instance, they can be tested directly: construct the aggregate, seed its state, invoke the method, and assert on the returned event.

def test_deposit_increases_bankroll():
    """A 500 deposit on a registered player holding 1000 reports 1500."""
    # Arrange: seed the aggregate state directly, without replaying events.
    aggregate = Player()
    aggregate.state.registered = True
    aggregate.state.bankroll = 1000

    # Act: call the handler as a plain method.
    deposited = aggregate.handle_deposit_funds(DepositFunds(amount=500))

    # Assert on the event the handler returned.
    assert deposited.new_bankroll == 1500

See Testing for the full three-level testing strategy.