Aggregates
A command handler (also called aggregate) processes commands for a domain, validates business rules against current state, and emits events. This is the consistency boundary for domain objects.
There is exactly one aggregate codebase per domain. The "player" domain has one aggregate that handles all player-related commands (RegisterPlayer, DepositFunds, ReserveFunds, etc.). This single codebase scales horizontally across many processes.
Handler Pattern: guard/validate/compute
All aggregate command handlers follow a three-function pattern that makes business logic 100% unit testable without mocking:
| Step | Purpose | Pure Function |
|---|---|---|
| guard(state) | Check state preconditions (aggregate exists, correct phase) | state → Result |
| validate(cmd, state) | Validate command inputs against state | cmd + state → Result |
| compute(cmd, state) | Build the resulting event | cmd + state → Event |
The public handle_* function is thin orchestration: unpack protobuf, call guard → validate → compute, pack event.
Example: Deposit Handler
Guard
Checks state preconditions before processing:
- Python
- Go
- Rust
- Java
- C#
- C++
@staticmethod
def _guard_deposit(state: _PlayerState) -> None:
if not state.player_id:
raise CommandRejectedError("Player does not exist")
func guardDepositFunds(state PlayerState) error {
if !state.Exists() {
return angzarr.NewCommandRejectedError("Player does not exist")
}
return nil
}
fn guard(state: &PlayerState) -> CommandResult<()> {
if !state.exists() {
return Err(CommandRejectedError::new("Player does not exist"));
}
Ok(())
}
static void guard(PlayerState state) {
if (!state.exists()) {
throw Errors.CommandRejectedError.preconditionFailed("Player does not exist");
}
}
internal static void Guard(PlayerState state)
{
if (!state.Exists)
throw CommandRejectedError.PreconditionFailed("Player does not exist");
}
void guard(const PlayerState& state) {
if (!state.exists()) {
throw angzarr::CommandRejectedError::precondition_failed("Player does not exist");
}
}
Validate
Validates command inputs and extracts validated data:
- Python
- Go
- Rust
- Java
- C#
- C++
@staticmethod
def _validate_deposit(cmd: player_proto.DepositFunds) -> int:
amount = cmd.amount.amount if cmd.amount else 0
if amount <= 0:
raise CommandRejectedError("amount must be positive")
return amount
func validateDepositFunds(cmd *examples.DepositFunds) (int64, error) {
amount := int64(0)
if cmd.Amount != nil {
amount = cmd.Amount.Amount
}
if amount <= 0 {
return 0, angzarr.NewCommandRejectedError("amount must be positive")
}
return amount, nil
}
fn validate(cmd: &DepositFunds) -> CommandResult<i64> {
let amount = cmd.amount.as_ref().map(|c| c.amount).unwrap_or(0);
if amount <= 0 {
return Err(CommandRejectedError::new("amount must be positive"));
}
Ok(amount)
}
static long validate(DepositFunds cmd) {
long amount = cmd.hasAmount() ? cmd.getAmount().getAmount() : 0;
if (amount <= 0) {
throw Errors.CommandRejectedError.invalidArgument("amount must be positive");
}
return amount;
}
internal static long Validate(DepositFunds cmd)
{
var amount = cmd.Amount?.Amount ?? 0;
if (amount <= 0)
throw CommandRejectedError.InvalidArgument("amount must be positive");
return amount;
}
int64_t validate(const examples::DepositFunds& cmd) {
int64_t amount = cmd.has_amount() ? cmd.amount().amount() : 0;
if (amount <= 0) {
throw angzarr::CommandRejectedError::invalid_argument("amount must be positive");
}
return amount;
}
Compute
Builds the resulting event from validated inputs:
- Python
- Go
- Rust
- Java
- C#
- C++
@staticmethod
def _compute_deposit(
cmd: player_proto.DepositFunds, state: _PlayerState, amount: int
) -> player_proto.FundsDeposited:
new_balance = state.bankroll + amount
return player_proto.FundsDeposited(
amount=cmd.amount,
new_balance=poker_types.Currency(amount=new_balance, currency_code="CHIPS"),
deposited_at=now(),
)
func computeFundsDeposited(cmd *examples.DepositFunds, state PlayerState, amount int64) *examples.FundsDeposited {
newBalance := state.Bankroll + amount
return &examples.FundsDeposited{
Amount: cmd.Amount,
NewBalance: &examples.Currency{Amount: newBalance, CurrencyCode: "CHIPS"},
DepositedAt: timestamppb.New(time.Now()),
}
}
fn compute(cmd: &DepositFunds, state: &PlayerState, amount: i64) -> FundsDeposited {
let new_balance = state.bankroll + amount;
FundsDeposited {
amount: cmd.amount.clone(),
new_balance: Some(Currency {
amount: new_balance,
currency_code: "CHIPS".to_string(),
}),
deposited_at: Some(angzarr_client::now()),
}
}
static FundsDeposited compute(DepositFunds cmd, PlayerState state, long amount) {
long newBalance = state.getBankroll() + amount;
return FundsDeposited.newBuilder()
.setAmount(cmd.getAmount())
.setNewBalance(Currency.newBuilder()
.setAmount(newBalance)
.setCurrencyCode("CHIPS"))
.setDepositedAt(now())
.build();
}
internal static FundsDeposited Compute(DepositFunds cmd, PlayerState state, long amount)
{
var newBalance = state.Bankroll + amount;
return new FundsDeposited
{
Amount = cmd.Amount,
NewBalance = new Currency { Amount = newBalance, CurrencyCode = "CHIPS" },
DepositedAt = Timestamp.FromDateTime(DateTime.UtcNow)
};
}
examples::FundsDeposited compute(const examples::DepositFunds& cmd, const PlayerState& state, int64_t amount) {
int64_t new_balance = state.bankroll + amount;
examples::FundsDeposited event;
event.mutable_amount()->CopyFrom(cmd.amount());
event.mutable_new_balance()->set_amount(new_balance);
event.mutable_new_balance()->set_currency_code("CHIPS");
auto now = std::chrono::system_clock::now();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
event.mutable_deposited_at()->set_seconds(seconds);
return event;
}
State Reconstruction
State is rebuilt by applying events in sequence:
Prior Events:
[0] PlayerRegistered { username: "Alice", initial_bankroll: 1000 }
[1] FundsDeposited { amount: 500, new_bankroll: 1500 }
[2] FundsReserved { amount: 200, new_available: 1300, new_reserved: 200 }
Rebuild:
state = PlayerState::default()
apply(PlayerRegistered) → state.registered = true, state.bankroll = 1000
apply(FundsDeposited) → state.bankroll = 1500
apply(FundsReserved) → state.reserved = 200, state.available = 1300
Result:
PlayerState { registered: true, bankroll: 1500, reserved: 200, available: 1300 }
Unit Testing
Each function is testable in isolation:
- Python
- Go
- Rust
- Java
- C#
- C++
def test_deposit_increases_bankroll():
state = PlayerState(registered=True, bankroll=1000)
cmd = DepositFunds(amount=500)
event = compute_deposit(cmd, state)
assert event.new_bankroll == 1500
func TestDepositIncreasesBankroll(t *testing.T) {
state := &PlayerState{Registered: true, Bankroll: 1000}
cmd := &DepositFunds{Amount: 500}
event := computeDeposit(cmd, state)
assert.Equal(t, int32(1500), event.NewBankroll)
}
#[test]
fn test_deposit_increases_bankroll() {
let state = PlayerState { registered: true, bankroll: 1000, ..Default::default() };
let cmd = DepositFunds { amount: 500 };
let event = compute_deposit(&cmd, &state);
assert_eq!(event.new_bankroll, 1500);
}
@Test
void testDepositIncreasesBankroll() {
PlayerState state = new PlayerState();
state.registered = true;
state.bankroll = 1000;
DepositFunds cmd = DepositFunds.newBuilder().setAmount(500).build();
FundsDeposited event = computeDeposit(cmd, state);
assertEquals(1500, event.getNewBankroll());
}
[Fact]
public void TestDepositIncreasesBankroll()
{
var state = new PlayerState { Registered = true, Bankroll = 1000 };
var cmd = new DepositFunds { Amount = 500 };
var @event = ComputeDeposit(cmd, state);
Assert.Equal(1500, @event.NewBankroll);
}
TEST(DepositTest, IncreasesBankroll) {
PlayerState state;
state.registered = true;
state.bankroll = 1000;
DepositFunds cmd;
cmd.set_amount(500);
FundsDeposited event = compute_deposit(cmd, state);
EXPECT_EQ(event.new_bankroll(), 1500);
}
CommandRouter
Use the CommandRouter to register handlers and dispatch commands:
- Python
- Go
- Rust
- Java
- C#
- C++
router = (
CommandRouter("player", state_from_event_book)
.on(name(player.RegisterPlayer), handle_register_player)
.on(name(player.DepositFunds), handle_deposit_funds)
.on(name(player.WithdrawFunds), handle_withdraw_funds)
.on(name(player.ReserveFunds), handle_reserve_funds)
.on(name(player.ReleaseFunds), handle_release_funds)
.on(name(player.RequestAction), handle_request_action)
)
router := angzarr.NewCommandRouter("player", handlers.RebuildState).
On("RegisterPlayer", handlers.HandleRegisterPlayer).
On("DepositFunds", handlers.HandleDepositFunds).
On("WithdrawFunds", handlers.HandleWithdrawFunds).
On("ReserveFunds", handlers.HandleReserveFunds).
On("ReleaseFunds", handlers.HandleReleaseFunds).
OnRejected("table", "JoinTable", handlers.HandleTableJoinRejected)
let router = CommandRouter::new("player", state::rebuild_state)
.on("examples.RegisterPlayer", handlers::handle_register_player)
.on("examples.DepositFunds", handlers::handle_deposit_funds)
.on("examples.WithdrawFunds", handlers::handle_withdraw_funds)
.on("examples.ReserveFunds", handlers::handle_reserve_funds)
.on("examples.ReleaseFunds", handlers::handle_release_funds)
.on_rejected("table", "examples.JoinTable", handlers::handle_join_rejected);
public static CommandRouter<PlayerState> create() {
return new CommandRouter<PlayerState>("player", StateBuilder::fromEventBook)
.on("RegisterPlayer", RegisterPlayer.class, RegisterHandler::handle)
.on("DepositFunds", DepositFunds.class, DepositHandler::handle)
.on("WithdrawFunds", WithdrawFunds.class, WithdrawHandler::handle)
.on("ReserveFunds", ReserveFunds.class, ReserveHandler::handle)
.on("ReleaseFunds", ReleaseFunds.class, ReleaseHandler::handle);
}
public static CommandRouter Create()
{
return new CommandRouter("player", eb => PlayerState.FromEventBook(eb))
.On<RegisterPlayer>((cmd, state) => RegisterHandler.Handle(cmd, (PlayerState)state))
.On<DepositFunds>((cmd, state) => DepositHandler.Handle(cmd, (PlayerState)state))
.On<WithdrawFunds>((cmd, state) => WithdrawHandler.Handle(cmd, (PlayerState)state))
.On<ReserveFunds>((cmd, state) => ReserveHandler.Handle(cmd, (PlayerState)state))
.On<ReleaseFunds>((cmd, state) => ReleaseHandler.Handle(cmd, (PlayerState)state))
.On<TransferFunds>((cmd, state) => TransferHandler.Handle(cmd, (PlayerState)state));
}
angzarr::CommandRouter<player::PlayerState> create_router() {
return angzarr::CommandRouter<player::PlayerState>(PLAYER_DOMAIN, player::PlayerState::from_event_book)
.on<examples::RegisterPlayer, examples::PlayerRegistered>(player::handlers::handle_register)
.on<examples::DepositFunds, examples::FundsDeposited>(player::handlers::handle_deposit)
.on<examples::WithdrawFunds, examples::FundsWithdrawn>(player::handlers::handle_withdraw)
.on<examples::ReserveFunds, examples::FundsReserved>(player::handlers::handle_reserve)
.on<examples::ReleaseFunds, examples::FundsReleased>(player::handlers::handle_release)
.on<examples::TransferFunds, examples::FundsTransferred>(player::handlers::handle_transfer);
}
OO Alternative: Aggregate Base Class
All languages also support an OO approach where handlers are methods on an Aggregate subclass, using decorators/attributes/macros for dispatch:
- Python
- Java
- C#
- C++
- Go
- Rust
# --- Deposit: guard/validate/compute ---
# docs:start:deposit_guard
@staticmethod
def _guard_deposit(state: _PlayerState) -> None:
if not state.player_id:
raise CommandRejectedError("Player does not exist")
# docs:end:deposit_guard
# docs:start:deposit_validate
@staticmethod
def _validate_deposit(cmd: player_proto.DepositFunds) -> int:
amount = cmd.amount.amount if cmd.amount else 0
if amount <= 0:
raise CommandRejectedError("amount must be positive")
return amount
# docs:end:deposit_validate
# docs:start:deposit_compute
@staticmethod
def _compute_deposit(
cmd: player_proto.DepositFunds, state: _PlayerState, amount: int
) -> player_proto.FundsDeposited:
new_balance = state.bankroll + amount
return player_proto.FundsDeposited(
amount=cmd.amount,
new_balance=poker_types.Currency(amount=new_balance, currency_code="CHIPS"),
deposited_at=now(),
)
# docs:end:deposit_compute
@handles(player_proto.RegisterPlayer)
def register(self, cmd: player_proto.RegisterPlayer) -> player_proto.PlayerRegistered:
"""Register a new player."""
if self.exists:
raise CommandRejectedError("Player already exists")
if not cmd.display_name:
raise CommandRejectedError("display_name is required")
if not cmd.email:
raise CommandRejectedError("email is required")
return player_proto.PlayerRegistered(
display_name=cmd.display_name,
email=cmd.email,
player_type=cmd.player_type,
ai_model_id=cmd.ai_model_id,
registered_at=now(),
)
@handles(player_proto.DepositFunds)
def deposit(self, cmd: player_proto.DepositFunds) -> player_proto.FundsDeposited:
"""Deposit funds into player's bankroll."""
state = self._get_state()
self._guard_deposit(state)
amount = self._validate_deposit(cmd)
return self._compute_deposit(cmd, state, amount)
@handles(player_proto.WithdrawFunds)
def withdraw(self, cmd: player_proto.WithdrawFunds) -> player_proto.FundsWithdrawn:
"""Withdraw funds from player's bankroll."""
if not self.exists:
raise CommandRejectedError("Player does not exist")
amount = cmd.amount.amount if cmd.amount else 0
if amount <= 0:
raise CommandRejectedError("amount must be positive")
if amount > self.available_balance:
raise CommandRejectedError("Insufficient funds")
new_balance = self.bankroll - amount
return player_proto.FundsWithdrawn(
amount=cmd.amount,
new_balance=poker_types.Currency(amount=new_balance, currency_code="CHIPS"),
withdrawn_at=now(),
)
# docs:start:reserve_funds_oo
@handles(player_proto.ReserveFunds)
def reserve(self, cmd: player_proto.ReserveFunds) -> player_proto.FundsReserved:
"""Reserve funds for a table buy-in."""
if not self.exists:
raise CommandRejectedError("Player does not exist")
amount = cmd.amount.amount if cmd.amount else 0
if amount <= 0:
raise CommandRejectedError("amount must be positive")
table_key = cmd.table_root.hex()
if table_key in self.table_reservations:
raise CommandRejectedError("Funds already reserved for this table")
if amount > self.available_balance:
raise CommandRejectedError("Insufficient funds")
new_reserved = self.reserved_funds + amount
new_available = self.bankroll - new_reserved
return player_proto.FundsReserved(
amount=cmd.amount,
table_root=cmd.table_root,
new_available_balance=poker_types.Currency(
amount=new_available, currency_code="CHIPS"
),
new_reserved_balance=poker_types.Currency(
amount=new_reserved, currency_code="CHIPS"
),
reserved_at=now(),
)
# docs:end:reserve_funds_oo
@handles(player_proto.ReleaseFunds)
def release(self, cmd: player_proto.ReleaseFunds) -> player_proto.FundsReleased:
"""Release reserved funds when leaving a table."""
if not self.exists:
raise CommandRejectedError("Player does not exist")
table_key = cmd.table_root.hex()
reserved_for_table = self.table_reservations.get(table_key, 0)
if reserved_for_table == 0:
raise CommandRejectedError("No funds reserved for this table")
new_reserved = self.reserved_funds - reserved_for_table
new_available = self.bankroll - new_reserved
return player_proto.FundsReleased(
amount=poker_types.Currency(amount=reserved_for_table, currency_code="CHIPS"),
table_root=cmd.table_root,
new_available_balance=poker_types.Currency(
amount=new_available, currency_code="CHIPS"
),
new_reserved_balance=poker_types.Currency(
amount=new_reserved, currency_code="CHIPS"
),
released_at=now(),
)
@Handles(RegisterPlayer.class)
public PlayerRegistered register(RegisterPlayer cmd) {
// Guard
if (exists()) {
throw Errors.CommandRejectedError.preconditionFailed("Player already exists");
}
// Validate
if (cmd.getDisplayName().isEmpty()) {
throw Errors.CommandRejectedError.invalidArgument("display_name is required");
}
if (cmd.getEmail().isEmpty()) {
throw Errors.CommandRejectedError.invalidArgument("email is required");
}
// Compute
return PlayerRegistered.newBuilder()
.setDisplayName(cmd.getDisplayName())
.setEmail(cmd.getEmail())
.setPlayerType(cmd.getPlayerType())
.setAiModelId(cmd.getAiModelId())
.setRegisteredAt(now())
.build();
}
@Handles(DepositFunds.class)
public FundsDeposited deposit(DepositFunds cmd) {
// Guard
if (!exists()) {
throw Errors.CommandRejectedError.preconditionFailed("Player does not exist");
}
// Validate
long amount = cmd.hasAmount() ? cmd.getAmount().getAmount() : 0;
if (amount <= 0) {
throw Errors.CommandRejectedError.invalidArgument("amount must be positive");
}
// Compute
long newBalance = getBankroll() + amount;
return FundsDeposited.newBuilder()
.setAmount(cmd.getAmount())
.setNewBalance(Currency.newBuilder()
.setAmount(newBalance)
.setCurrencyCode("CHIPS"))
.setDepositedAt(now())
.build();
}
@Handles(WithdrawFunds.class)
public FundsWithdrawn withdraw(WithdrawFunds cmd) {
// Guard
if (!exists()) {
throw Errors.CommandRejectedError.preconditionFailed("Player does not exist");
}
// Validate
long amount = cmd.hasAmount() ? cmd.getAmount().getAmount() : 0;
if (amount <= 0) {
throw Errors.CommandRejectedError.invalidArgument("amount must be positive");
}
if (amount > getAvailableBalance()) {
throw Errors.CommandRejectedError.preconditionFailed("Insufficient funds");
}
// Compute
long newBalance = getBankroll() - amount;
return FundsWithdrawn.newBuilder()
.setAmount(cmd.getAmount())
.setNewBalance(Currency.newBuilder()
.setAmount(newBalance)
.setCurrencyCode("CHIPS"))
.setWithdrawnAt(now())
.build();
}
// docs:start:reserve_funds_oo
@Handles(ReserveFunds.class)
public FundsReserved reserve(ReserveFunds cmd) {
// Guard
if (!exists()) {
throw Errors.CommandRejectedError.preconditionFailed("Player does not exist");
}
// Validate
long amount = cmd.hasAmount() ? cmd.getAmount().getAmount() : 0;
if (amount <= 0) {
throw Errors.CommandRejectedError.invalidArgument("amount must be positive");
}
String tableKey = bytesToHex(cmd.getTableRoot().toByteArray());
if (getState().hasReservationFor(tableKey)) {
throw Errors.CommandRejectedError.preconditionFailed("Funds already reserved for this table");
}
if (amount > getAvailableBalance()) {
throw Errors.CommandRejectedError.preconditionFailed("Insufficient funds");
}
// Compute
long newReserved = getReservedFunds() + amount;
long newAvailable = getBankroll() - newReserved;
return FundsReserved.newBuilder()
.setAmount(cmd.getAmount())
.setTableRoot(cmd.getTableRoot())
.setNewAvailableBalance(Currency.newBuilder()
.setAmount(newAvailable)
.setCurrencyCode("CHIPS"))
.setNewReservedBalance(Currency.newBuilder()
.setAmount(newReserved)
.setCurrencyCode("CHIPS"))
.setReservedAt(now())
.build();
}
// docs:end:reserve_funds_oo
@Handles(ReleaseFunds.class)
public FundsReleased release(ReleaseFunds cmd) {
// Guard
if (!exists()) {
throw Errors.CommandRejectedError.preconditionFailed("Player does not exist");
}
// Validate
String tableKey = bytesToHex(cmd.getTableRoot().toByteArray());
long reservedForTable = getState().getReservationForTable(tableKey);
if (reservedForTable == 0) {
throw Errors.CommandRejectedError.preconditionFailed("No funds reserved for this table");
}
// Compute
long newReserved = getReservedFunds() - reservedForTable;
long newAvailable = getBankroll() - newReserved;
return FundsReleased.newBuilder()
.setAmount(Currency.newBuilder()
.setAmount(reservedForTable)
.setCurrencyCode("CHIPS"))
.setTableRoot(cmd.getTableRoot())
.setNewAvailableBalance(Currency.newBuilder()
.setAmount(newAvailable)
.setCurrencyCode("CHIPS"))
.setNewReservedBalance(Currency.newBuilder()
.setAmount(newReserved)
.setCurrencyCode("CHIPS"))
.setReleasedAt(now())
.build();
}
[Handles(typeof(RegisterPlayer))]
public PlayerRegistered HandleRegister(RegisterPlayer cmd)
{
if (Exists)
throw CommandRejectedError.PreconditionFailed("Player already exists");
if (string.IsNullOrEmpty(cmd.DisplayName))
throw CommandRejectedError.InvalidArgument("display_name is required");
if (string.IsNullOrEmpty(cmd.Email))
throw CommandRejectedError.InvalidArgument("email is required");
return new PlayerRegistered
{
DisplayName = cmd.DisplayName,
Email = cmd.Email,
PlayerType = cmd.PlayerType,
AiModelId = cmd.AiModelId,
RegisteredAt = Timestamp.FromDateTime(DateTime.UtcNow)
};
}
[Handles(typeof(DepositFunds))]
public FundsDeposited HandleDeposit(DepositFunds cmd)
{
if (!Exists)
throw CommandRejectedError.PreconditionFailed("Player does not exist");
var amount = cmd.Amount?.Amount ?? 0;
if (amount <= 0)
throw CommandRejectedError.InvalidArgument("amount must be positive");
var newBalance = Bankroll + amount;
return new FundsDeposited
{
Amount = cmd.Amount,
NewBalance = new Currency { Amount = newBalance, CurrencyCode = "CHIPS" },
DepositedAt = Timestamp.FromDateTime(DateTime.UtcNow)
};
}
[Handles(typeof(WithdrawFunds))]
public FundsWithdrawn HandleWithdraw(WithdrawFunds cmd)
{
if (!Exists)
throw CommandRejectedError.PreconditionFailed("Player does not exist");
var amount = cmd.Amount?.Amount ?? 0;
if (amount <= 0)
throw CommandRejectedError.InvalidArgument("amount must be positive");
if (amount > AvailableBalance)
throw CommandRejectedError.PreconditionFailed("Insufficient funds");
var newBalance = Bankroll - amount;
return new FundsWithdrawn
{
Amount = cmd.Amount,
NewBalance = new Currency { Amount = newBalance, CurrencyCode = "CHIPS" },
WithdrawnAt = Timestamp.FromDateTime(DateTime.UtcNow)
};
}
// docs:start:reserve_funds_oo
[Handles(typeof(ReserveFunds))]
public FundsReserved HandleReserve(ReserveFunds cmd)
{
if (!Exists)
throw CommandRejectedError.PreconditionFailed("Player does not exist");
var amount = cmd.Amount?.Amount ?? 0;
if (amount <= 0)
throw CommandRejectedError.InvalidArgument("amount must be positive");
var tableKey = Convert.ToHexString(cmd.TableRoot.ToByteArray()).ToLowerInvariant();
if (TableReservations.ContainsKey(tableKey))
throw CommandRejectedError.PreconditionFailed("Funds already reserved for this table");
if (amount > AvailableBalance)
throw CommandRejectedError.PreconditionFailed("Insufficient funds");
var newReserved = ReservedFunds + amount;
var newAvailable = Bankroll - newReserved;
return new FundsReserved
{
Amount = cmd.Amount,
TableRoot = cmd.TableRoot,
NewAvailableBalance = new Currency { Amount = newAvailable, CurrencyCode = "CHIPS" },
NewReservedBalance = new Currency { Amount = newReserved, CurrencyCode = "CHIPS" },
ReservedAt = Timestamp.FromDateTime(DateTime.UtcNow)
};
}
// docs:end:reserve_funds_oo
[Handles(typeof(ReleaseFunds))]
public FundsReleased HandleRelease(ReleaseFunds cmd)
{
if (!Exists)
throw CommandRejectedError.PreconditionFailed("Player does not exist");
var tableKey = Convert.ToHexString(cmd.TableRoot.ToByteArray()).ToLowerInvariant();
if (!TableReservations.TryGetValue(tableKey, out var reservedForTable) || reservedForTable == 0)
throw CommandRejectedError.PreconditionFailed("No funds reserved for this table");
var newReserved = ReservedFunds - reservedForTable;
var newAvailable = Bankroll - newReserved;
return new FundsReleased
{
Amount = new Currency { Amount = reservedForTable, CurrencyCode = "CHIPS" },
TableRoot = cmd.TableRoot,
NewAvailableBalance = new Currency { Amount = newAvailable, CurrencyCode = "CHIPS" },
NewReservedBalance = new Currency { Amount = newReserved, CurrencyCode = "CHIPS" },
ReleasedAt = Timestamp.FromDateTime(DateTime.UtcNow)
};
}
[Handles(typeof(TransferFunds))]
public FundsTransferred HandleTransfer(TransferFunds cmd)
{
if (!Exists)
throw CommandRejectedError.PreconditionFailed("Player does not exist");
var amount = cmd.Amount?.Amount ?? 0;
var newBalance = Bankroll + amount;
return new FundsTransferred
{
FromPlayerRoot = cmd.FromPlayerRoot,
ToPlayerRoot = Google.Protobuf.ByteString.CopyFromUtf8(PlayerId),
Amount = cmd.Amount,
HandRoot = cmd.HandRoot,
Reason = cmd.Reason,
NewBalance = new Currency { Amount = newBalance, CurrencyCode = "CHIPS" },
TransferredAt = Timestamp.FromDateTime(DateTime.UtcNow)
};
}
Player::Player() {
// Register command handlers
register_handler<examples::RegisterPlayer, examples::PlayerRegistered>(
[this](const examples::RegisterPlayer& cmd, const PlayerState&) {
return handle_register(cmd);
}
);
register_handler<examples::DepositFunds, examples::FundsDeposited>(
[this](const examples::DepositFunds& cmd, const PlayerState&) {
return handle_deposit(cmd);
}
);
register_handler<examples::WithdrawFunds, examples::FundsWithdrawn>(
[this](const examples::WithdrawFunds& cmd, const PlayerState&) {
return handle_withdraw(cmd);
}
);
register_handler<examples::ReserveFunds, examples::FundsReserved>(
[this](const examples::ReserveFunds& cmd, const PlayerState&) {
return handle_reserve(cmd);
}
);
register_handler<examples::ReleaseFunds, examples::FundsReleased>(
[this](const examples::ReleaseFunds& cmd, const PlayerState&) {
return handle_release(cmd);
}
);
register_handler<examples::TransferFunds, examples::FundsTransferred>(
[this](const examples::TransferFunds& cmd, const PlayerState&) {
return handle_transfer(cmd);
}
);
}
examples::PlayerRegistered Player::handle_register(const examples::RegisterPlayer& cmd) {
return handlers::handle_register(cmd, state_);
}
examples::FundsDeposited Player::handle_deposit(const examples::DepositFunds& cmd) {
return handlers::handle_deposit(cmd, state_);
}
examples::FundsWithdrawn Player::handle_withdraw(const examples::WithdrawFunds& cmd) {
return handlers::handle_withdraw(cmd, state_);
}
examples::FundsReserved Player::handle_reserve(const examples::ReserveFunds& cmd) {
return handlers::handle_reserve(cmd, state_);
}
examples::FundsReleased Player::handle_release(const examples::ReleaseFunds& cmd) {
return handlers::handle_release(cmd, state_);
}
examples::FundsTransferred Player::handle_transfer(const examples::TransferFunds& cmd) {
return handlers::handle_transfer(cmd, state_);
}
Go uses method chaining on AggregateBase rather than struct tags:
// NewHand creates a new Hand aggregate with prior events for state reconstruction.
func NewHand(eventBook *pb.EventBook) *Hand {
h := &Hand{}
h.Init(eventBook, func() HandState {
return HandState{
Players: make(map[string]*PlayerHandState),
Pots: []*PotState{{PotType: "main"}},
}
})
h.SetDomain("hand")
// Register event appliers
h.Applies("CardsDealt", h.applyCardsDealt)
h.Applies("BlindPosted", h.applyBlindPosted)
h.Applies("ActionTaken", h.applyActionTaken)
h.Applies("BettingRoundComplete", h.applyBettingRoundComplete)
h.Applies("CommunityCardsDealt", h.applyCommunityCardsDealt)
h.Applies("DrawCompleted", h.applyDrawCompleted)
h.Applies("ShowdownStarted", h.applyShowdownStarted)
h.Applies("CardsRevealed", h.applyCardsRevealed)
h.Applies("CardsMucked", h.applyCardsMucked)
h.Applies("PotAwarded", h.applyPotAwarded)
h.Applies("HandComplete", h.applyHandComplete)
// Register command handlers
h.Handles("DealCards", h.dealCards)
h.Handles("PostBlind", h.postBlind)
h.Handles("PlayerAction", h.playerAction)
h.Handles("DealCommunityCards", h.dealCommunityCards)
h.Handles("RequestDraw", h.requestDraw)
h.Handles("RevealCards", h.revealCards)
h.HandlesMulti("AwardPot", h.awardPot)
return h
}
Rust uses proc macros on impl blocks:
#[aggregate(domain = "table", state = TableState)]
impl TableAggregate {
// ==========================================================================
// Event Appliers
// ==========================================================================
#[applies(TableCreated)]
fn apply_created(state: &mut TableState, event: TableCreated) {
state.table_id = format!("table_{}", event.table_name);
state.table_name = event.table_name;
state.game_variant = GameVariant::try_from(event.game_variant).unwrap_or_default();
state.small_blind = event.small_blind;
state.big_blind = event.big_blind;
state.min_buy_in = event.min_buy_in;
state.max_buy_in = event.max_buy_in;
state.max_players = event.max_players;
state.action_timeout_seconds = event.action_timeout_seconds;
state.status = "waiting".to_string();
}
#[applies(PlayerJoined)]
fn apply_player_joined(state: &mut TableState, event: PlayerJoined) {
state.seats.insert(
event.seat_position,
SeatState {
position: event.seat_position,
player_root: event.player_root,
stack: event.stack,
is_sitting_out: false,
},
);
}
#[applies(PlayerLeft)]
fn apply_player_left(state: &mut TableState, event: PlayerLeft) {
state.seats.remove(&event.seat_position);
}
#[applies(HandStarted)]
fn apply_hand_started(state: &mut TableState, event: HandStarted) {
state.current_hand_root = event.hand_root;
state.hand_count = event.hand_number;
state.dealer_position = event.dealer_position;
state.status = "in_hand".to_string();
}
#[applies(HandEnded)]
fn apply_hand_ended(state: &mut TableState, event: HandEnded) {
state.current_hand_root.clear();
state.status = "waiting".to_string();
for (player_hex, delta) in &event.stack_changes {
for seat in state.seats.values_mut() {
if hex::encode(&seat.player_root) == *player_hex {
seat.stack += delta;
break;
}
}
}
}
// ==========================================================================
// Command Handlers
// ==========================================================================
#[handles(CreateTable)]
pub fn create(
&self,
cb: &CommandBook,
cmd: CreateTable,
state: &TableState,
seq: u32,
) -> CommandResult<EventBook> {
// Guard
if state.exists() {
return Err(CommandRejectedError::new("Table already exists"));
}
// Validate
if cmd.table_name.is_empty() {
return Err(CommandRejectedError::new("table_name is required"));
}
if cmd.small_blind <= 0 {
return Err(CommandRejectedError::new("small_blind must be positive"));
}
if cmd.big_blind < cmd.small_blind {
return Err(CommandRejectedError::new("big_blind must be >= small_blind"));
}
if cmd.max_players < 2 || cmd.max_players > 10 {
return Err(CommandRejectedError::new("max_players must be 2-10"));
}
// Compute
let event = TableCreated {
table_name: cmd.table_name,
game_variant: cmd.game_variant,
small_blind: cmd.small_blind,
big_blind: cmd.big_blind,
min_buy_in: cmd.min_buy_in,
max_buy_in: cmd.max_buy_in,
max_players: cmd.max_players,
action_timeout_seconds: cmd.action_timeout_seconds,
created_at: Some(angzarr_client::now()),
};
Ok(new_event_book(cb, seq, &event, "examples.TableCreated"))
}
Both patterns produce identical behavior—choose based on team preference. The functional CommandRouter is more explicit; the OO approach integrates state and handlers in one class.
Event Sequencing
Each event has a sequence field. The aggregate computes the next sequence from prior events:
def next_sequence(event_book: EventBook) -> int:
if event_book.pages:
return event_book.pages[-1].sequence + 1
if event_book.snapshot:
return event_book.snapshot.sequence + 1
return 0
Events with incorrect sequences are rejected (optimistic concurrency control).
Snapshots
For aggregates with many events, enable snapshots to avoid full replay:
- Define state as a protobuf message
- Return the updated state in
EventBook.snapshot_state - Angzarr persists snapshots automatically
On subsequent commands, only events after the snapshot are loaded.
Next Steps
- Sagas — Cross-domain command orchestration
- Projectors — Building read models
- Testing — Three-level testing strategy