Aggregates

A command handler (also called an aggregate) processes commands for a domain, validates business rules against current state, and emits events. It is the consistency boundary for the domain's objects.

There is exactly one aggregate codebase per domain. The "player" domain has one aggregate that handles all player-related commands (RegisterPlayer, DepositFunds, ReserveFunds, etc.). This single codebase scales horizontally across many processes.


Handler Pattern: guard/validate/compute

All aggregate command handlers follow a three-function pattern that keeps business logic in pure functions, so it can be unit tested without mocks:

| Step | Purpose | Pure Function |
|------|---------|---------------|
| guard(state) | Check state preconditions (aggregate exists, correct phase) | state → Result |
| validate(cmd, state) | Validate command inputs against state | cmd + state → Result |
| compute(cmd, state) | Build the resulting event | cmd + state → Event |

The public handle_* function is thin orchestration: unpack protobuf, call guard → validate → compute, pack event.


Example: Deposit Handler

Guard

Checks state preconditions before processing:

@staticmethod
def _guard_deposit(state: _PlayerState) -> None:
    if not state.player_id:
        raise CommandRejectedError("Player does not exist")

Validate

Validates command inputs and extracts validated data:

@staticmethod
def _validate_deposit(cmd: player_proto.DepositFunds) -> int:
    amount = cmd.amount.amount if cmd.amount else 0
    if amount <= 0:
        raise CommandRejectedError("amount must be positive")
    return amount

Compute

Builds the resulting event from validated inputs:

@staticmethod
def _compute_deposit(
    cmd: player_proto.DepositFunds, state: _PlayerState, amount: int
) -> player_proto.FundsDeposited:
    new_balance = state.bankroll + amount
    return player_proto.FundsDeposited(
        amount=cmd.amount,
        new_balance=poker_types.Currency(amount=new_balance, currency_code="CHIPS"),
        deposited_at=now(),
    )
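
The three steps above can be wired together by a thin handle_* function. The following is a minimal sketch, not the project's actual handler: plain dataclasses stand in for the protobuf messages, so PlayerState, DepositFunds, FundsDeposited, and the handle_deposit_funds signature shown here are assumptions made for illustration.

```python
from dataclasses import dataclass


class CommandRejectedError(Exception):
    """Raised when a command violates a precondition or business rule."""


@dataclass(frozen=True)
class PlayerState:  # hypothetical stand-in for the rebuilt aggregate state
    player_id: str = ""
    bankroll: int = 0


@dataclass(frozen=True)
class DepositFunds:  # hypothetical stand-in for the protobuf command
    amount: int = 0


@dataclass(frozen=True)
class FundsDeposited:  # hypothetical stand-in for the protobuf event
    amount: int
    new_balance: int


def guard_deposit(state: PlayerState) -> None:
    # Precondition on state only: the player must already exist.
    if not state.player_id:
        raise CommandRejectedError("Player does not exist")


def validate_deposit(cmd: DepositFunds) -> int:
    # Input validation: returns the validated amount.
    if cmd.amount <= 0:
        raise CommandRejectedError("amount must be positive")
    return cmd.amount


def compute_deposit(state: PlayerState, amount: int) -> FundsDeposited:
    # Builds the event from already-validated inputs.
    return FundsDeposited(amount=amount, new_balance=state.bankroll + amount)


def handle_deposit_funds(cmd: DepositFunds, state: PlayerState) -> FundsDeposited:
    """Thin orchestration: guard, then validate, then compute."""
    guard_deposit(state)
    amount = validate_deposit(cmd)
    return compute_deposit(state, amount)
```

Because the handler only sequences the three steps, any rejection surfaces as a CommandRejectedError before an event is built. A real handler would also unpack the command from, and pack the event into, protobuf messages.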

State Reconstruction

State is rebuilt by applying events in sequence:

Prior Events:
[0] PlayerRegistered { username: "Alice", initial_bankroll: 1000 }
[1] FundsDeposited { amount: 500, new_bankroll: 1500 }
[2] FundsReserved { amount: 200, new_available: 1300, new_reserved: 200 }

Rebuild:
state = PlayerState::default()
apply(PlayerRegistered) → state.registered = true, state.bankroll = 1000
apply(FundsDeposited) → state.bankroll = 1500
apply(FundsReserved) → state.reserved = 200, state.available = 1300

Result:
PlayerState { registered: true, bankroll: 1500, reserved: 200, available: 1300 }
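
The replay above amounts to a left fold of an apply function over the event list. Here is a minimal Python sketch of that idea, using the field names from the trace. The dataclasses are hypothetical stand-ins for the real protobuf events, and rebuild is an illustrative helper rather than a framework function.

```python
from dataclasses import dataclass, replace
from functools import reduce


@dataclass(frozen=True)
class PlayerState:
    registered: bool = False
    bankroll: int = 0
    reserved: int = 0
    available: int = 0


# Hypothetical event stand-ins; real events would be protobuf messages.
@dataclass(frozen=True)
class PlayerRegistered:
    username: str
    initial_bankroll: int


@dataclass(frozen=True)
class FundsDeposited:
    amount: int
    new_bankroll: int


@dataclass(frozen=True)
class FundsReserved:
    amount: int
    new_available: int
    new_reserved: int


def apply(state: PlayerState, event: object) -> PlayerState:
    """Return a new state with one event applied; never mutates the input."""
    if isinstance(event, PlayerRegistered):
        return replace(state, registered=True, bankroll=event.initial_bankroll)
    if isinstance(event, FundsDeposited):
        return replace(state, bankroll=event.new_bankroll)
    if isinstance(event, FundsReserved):
        return replace(state, reserved=event.new_reserved, available=event.new_available)
    return state  # unknown event types leave the state unchanged


def rebuild(events: list) -> PlayerState:
    """Fold the events, in order, over the default state."""
    return reduce(apply, events, PlayerState())


events = [
    PlayerRegistered(username="Alice", initial_bankroll=1000),
    FundsDeposited(amount=500, new_bankroll=1500),
    FundsReserved(amount=200, new_available=1300, new_reserved=200),
]
state = rebuild(events)
```

Keeping apply free of side effects means the same event list always rebuilds the same state, which is what makes replay safe to repeat.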

Unit Testing

Each function is testable in isolation:

def test_deposit_increases_bankroll():
    state = PlayerState(registered=True, bankroll=1000)
    cmd = DepositFunds(amount=500)

    event = compute_deposit(cmd, state)

    assert event.new_bankroll == 1500
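
Rejection paths can be tested the same way, one step at a time. The sketch below is illustrative only: the state and the two functions are simplified stand-ins (validate_deposit takes a bare integer here), and rejection_message is a hypothetical helper that avoids depending on any test framework.

```python
from dataclasses import dataclass


class CommandRejectedError(Exception):
    """Hypothetical stand-in for the framework's rejection error."""


@dataclass(frozen=True)
class PlayerState:  # simplified stand-in for the aggregate state
    player_id: str = ""
    bankroll: int = 0


def guard_deposit(state: PlayerState) -> None:
    if not state.player_id:
        raise CommandRejectedError("Player does not exist")


def validate_deposit(amount: int) -> int:
    if amount <= 0:
        raise CommandRejectedError("amount must be positive")
    return amount


def rejection_message(fn, *args) -> str:
    """Run fn and return its rejection message, or "" if it was accepted."""
    try:
        fn(*args)
    except CommandRejectedError as err:
        return str(err)
    return ""


def test_guard_rejects_unknown_player():
    assert rejection_message(guard_deposit, PlayerState()) == "Player does not exist"


def test_guard_accepts_existing_player():
    assert rejection_message(guard_deposit, PlayerState(player_id="p1")) == ""


def test_validate_rejects_non_positive_amount():
    assert rejection_message(validate_deposit, 0) == "amount must be positive"


def test_validate_returns_positive_amount():
    assert validate_deposit(500) == 500
```

None of these tests needs an event store, a router, or a mock: each step takes plain values and either returns or raises.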

CommandRouter

Use the CommandRouter to register handlers and dispatch commands:

router = (
    CommandRouter("player", state_from_event_book)
    .on(name(player.RegisterPlayer), handle_register_player)
    .on(name(player.DepositFunds), handle_deposit_funds)
    .on(name(player.WithdrawFunds), handle_withdraw_funds)
    .on(name(player.ReserveFunds), handle_reserve_funds)
    .on(name(player.ReleaseFunds), handle_release_funds)
    .on(name(player.RequestAction), handle_request_action)
)
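
To make the chaining above less opaque, here is a rough sketch of how a router of this shape could work internally. MiniRouter and its dispatch method are hypothetical and much simpler than the real CommandRouter: they ignore protobuf packing and state rebuilding, and exist only to show the idea of a name-to-handler registry with a fluent on() method.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Any, Any], Any]


class MiniRouter:
    """Hypothetical, simplified router; not the framework's CommandRouter."""

    def __init__(self, domain: str) -> None:
        self.domain = domain
        self._handlers: Dict[str, Handler] = {}

    def on(self, command_name: str, handler: Handler) -> "MiniRouter":
        # Register the handler and return self so calls can be chained.
        self._handlers[command_name] = handler
        return self

    def dispatch(self, command_name: str, cmd: Any, state: Any) -> Any:
        # Look up the handler by command name and invoke it.
        handler = self._handlers.get(command_name)
        if handler is None:
            raise LookupError(f"no handler for {command_name} in domain {self.domain}")
        return handler(cmd, state)


def handle_deposit(cmd: dict, state: dict) -> dict:
    return {"new_balance": state["bankroll"] + cmd["amount"]}


def handle_withdraw(cmd: dict, state: dict) -> dict:
    return {"new_balance": state["bankroll"] - cmd["amount"]}


router = (
    MiniRouter("player")
    .on("DepositFunds", handle_deposit)
    .on("WithdrawFunds", handle_withdraw)
)

result = router.dispatch("DepositFunds", {"amount": 500}, {"bankroll": 1000})
```

Returning self from on() is what allows the parenthesized chain; the registry itself is just a dictionary keyed by command name.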

OO Alternative: Aggregate Base Class

All languages also support an OO approach where handlers are methods on an Aggregate subclass, using decorators/attributes/macros for dispatch:

# --- Deposit: guard/validate/compute ---

# docs:start:deposit_guard
@staticmethod
def _guard_deposit(state: _PlayerState) -> None:
    if not state.player_id:
        raise CommandRejectedError("Player does not exist")
# docs:end:deposit_guard

# docs:start:deposit_validate
@staticmethod
def _validate_deposit(cmd: player_proto.DepositFunds) -> int:
    amount = cmd.amount.amount if cmd.amount else 0
    if amount <= 0:
        raise CommandRejectedError("amount must be positive")
    return amount
# docs:end:deposit_validate

# docs:start:deposit_compute
@staticmethod
def _compute_deposit(
    cmd: player_proto.DepositFunds, state: _PlayerState, amount: int
) -> player_proto.FundsDeposited:
    new_balance = state.bankroll + amount
    return player_proto.FundsDeposited(
        amount=cmd.amount,
        new_balance=poker_types.Currency(amount=new_balance, currency_code="CHIPS"),
        deposited_at=now(),
    )
# docs:end:deposit_compute

@handles(player_proto.RegisterPlayer)
def register(self, cmd: player_proto.RegisterPlayer) -> player_proto.PlayerRegistered:
    """Register a new player."""
    if self.exists:
        raise CommandRejectedError("Player already exists")
    if not cmd.display_name:
        raise CommandRejectedError("display_name is required")
    if not cmd.email:
        raise CommandRejectedError("email is required")

    return player_proto.PlayerRegistered(
        display_name=cmd.display_name,
        email=cmd.email,
        player_type=cmd.player_type,
        ai_model_id=cmd.ai_model_id,
        registered_at=now(),
    )

@handles(player_proto.DepositFunds)
def deposit(self, cmd: player_proto.DepositFunds) -> player_proto.FundsDeposited:
    """Deposit funds into player's bankroll."""
    state = self._get_state()
    self._guard_deposit(state)
    amount = self._validate_deposit(cmd)
    return self._compute_deposit(cmd, state, amount)

@handles(player_proto.WithdrawFunds)
def withdraw(self, cmd: player_proto.WithdrawFunds) -> player_proto.FundsWithdrawn:
    """Withdraw funds from player's bankroll."""
    if not self.exists:
        raise CommandRejectedError("Player does not exist")

    amount = cmd.amount.amount if cmd.amount else 0
    if amount <= 0:
        raise CommandRejectedError("amount must be positive")
    if amount > self.available_balance:
        raise CommandRejectedError("Insufficient funds")

    new_balance = self.bankroll - amount
    return player_proto.FundsWithdrawn(
        amount=cmd.amount,
        new_balance=poker_types.Currency(amount=new_balance, currency_code="CHIPS"),
        withdrawn_at=now(),
    )

# docs:start:reserve_funds_oo
@handles(player_proto.ReserveFunds)
def reserve(self, cmd: player_proto.ReserveFunds) -> player_proto.FundsReserved:
    """Reserve funds for a table buy-in."""
    if not self.exists:
        raise CommandRejectedError("Player does not exist")

    amount = cmd.amount.amount if cmd.amount else 0
    if amount <= 0:
        raise CommandRejectedError("amount must be positive")

    table_key = cmd.table_root.hex()
    if table_key in self.table_reservations:
        raise CommandRejectedError("Funds already reserved for this table")

    if amount > self.available_balance:
        raise CommandRejectedError("Insufficient funds")

    new_reserved = self.reserved_funds + amount
    new_available = self.bankroll - new_reserved
    return player_proto.FundsReserved(
        amount=cmd.amount,
        table_root=cmd.table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        reserved_at=now(),
    )
# docs:end:reserve_funds_oo

@handles(player_proto.ReleaseFunds)
def release(self, cmd: player_proto.ReleaseFunds) -> player_proto.FundsReleased:
    """Release reserved funds when leaving a table."""
    if not self.exists:
        raise CommandRejectedError("Player does not exist")

    table_key = cmd.table_root.hex()
    reserved_for_table = self.table_reservations.get(table_key, 0)
    if reserved_for_table == 0:
        raise CommandRejectedError("No funds reserved for this table")

    new_reserved = self.reserved_funds - reserved_for_table
    new_available = self.bankroll - new_reserved
    return player_proto.FundsReleased(
        amount=poker_types.Currency(amount=reserved_for_table, currency_code="CHIPS"),
        table_root=cmd.table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        released_at=now(),
    )

Both patterns produce identical behavior—choose based on team preference. The functional CommandRouter is more explicit; the OO approach integrates state and handlers in one class.
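
For readers curious how decorator-based dispatch of this kind can work in Python, here is one possible mechanism. It is an assumption-laden sketch, not the library's implementation: this handles decorator, the MiniAggregate base class, and the Ping and Echo types are all invented for illustration.

```python
from typing import Any, Callable, Dict, Type


def handles(command_type: Type) -> Callable:
    """Tag a method with the command type it handles (hypothetical sketch)."""
    def mark(method: Callable) -> Callable:
        method._handles = command_type
        return method
    return mark


class MiniAggregate:
    """Hypothetical base class: collects tagged methods and dispatches by type."""

    _dispatch: Dict[Type, str] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Build a command-type -> method-name table once per subclass.
        cls._dispatch = {
            method._handles: name
            for name, method in vars(cls).items()
            if callable(method) and hasattr(method, "_handles")
        }

    def handle(self, cmd: Any) -> Any:
        name = self._dispatch.get(type(cmd))
        if name is None:
            raise LookupError(f"no handler for {type(cmd).__name__}")
        return getattr(self, name)(cmd)


class Ping:  # hypothetical command type
    pass


class Echo(MiniAggregate):
    @handles(Ping)
    def ping(self, cmd: Ping) -> str:
        return "pong"


reply = Echo().handle(Ping())
```

The dispatch table plays the same role as the functional router's registrations; the difference is only where the mapping is declared.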


Event Sequencing

Each event has a sequence field. The aggregate computes the next sequence from prior events:

def next_sequence(event_book: EventBook) -> int:
    if event_book.pages:
        return event_book.pages[-1].sequence + 1
    if event_book.snapshot:
        return event_book.snapshot.sequence + 1
    return 0

Events with incorrect sequences are rejected (optimistic concurrency control).
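
The rejection rule can be pictured with a toy in-memory store. This is only a sketch of optimistic concurrency in general, under the assumption that the store accepts an event only when its sequence is exactly the next expected one. InMemoryEventStore and SequenceConflictError are hypothetical names, not part of the framework.

```python
from dataclasses import dataclass, field
from typing import List


class SequenceConflictError(Exception):
    """Raised when an event's sequence is not the next expected one."""


@dataclass
class InMemoryEventStore:
    """Hypothetical store that tracks only the sequences it has accepted."""

    sequences: List[int] = field(default_factory=list)

    def expected_next(self) -> int:
        return self.sequences[-1] + 1 if self.sequences else 0

    def append(self, sequence: int) -> None:
        expected = self.expected_next()
        if sequence != expected:
            raise SequenceConflictError(
                f"expected sequence {expected}, got {sequence}"
            )
        self.sequences.append(sequence)


store = InMemoryEventStore()
store.append(0)

# Two writers both read the stream at this point and compute sequence 1.
writer_a_seq = store.expected_next()
writer_b_seq = store.expected_next()

store.append(writer_a_seq)  # the first writer wins

try:
    store.append(writer_b_seq)  # the second writer is stale and is rejected
    conflict = False
except SequenceConflictError:
    conflict = True
```

The losing writer would typically reload the events, rebuild state, and retry the command against the newer state.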


Snapshots

For aggregates with many events, enable snapshots to avoid full replay:

  1. Define state as a protobuf message
  2. Return the updated state in EventBook.snapshot_state
  3. Angzarr persists snapshots automatically

On subsequent commands, only events after the snapshot are loaded.
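
The effect on replay can be sketched as follows. This is a simplified illustration, not Angzarr's loading code: Snapshot, Deposit, and load_state are hypothetical, and the state is reduced to a single bankroll integer.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:  # hypothetical: state captured as of a given sequence
    sequence: int
    bankroll: int


@dataclass(frozen=True)
class Deposit:  # hypothetical event with its position in the stream
    sequence: int
    amount: int


def load_state(snapshot: Optional[Snapshot], events: List[Deposit]) -> Tuple[int, int]:
    """Return (bankroll, events_replayed), skipping events the snapshot covers."""
    start_after = snapshot.sequence if snapshot else -1
    bankroll = snapshot.bankroll if snapshot else 0
    replayed = 0
    for event in events:
        if event.sequence > start_after:
            bankroll += event.amount
            replayed += 1
    return bankroll, replayed


events = [Deposit(0, 100), Deposit(1, 50), Deposit(2, 25)]

full_replay = load_state(None, events)
from_snapshot = load_state(Snapshot(sequence=1, bankroll=150), events)
```

Both calls arrive at the same bankroll, but the second replays only the one event recorded after the snapshot. A real store would not even load the earlier events; the filter here stands in for that.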


Next Steps

  • Sagas — Cross-domain command orchestration
  • Projectors — Building read models
  • Testing — Three-level testing strategy