Aggregates
A command handler (also called an aggregate) processes commands for a domain, validates business rules against current state, and emits events. It is the consistency boundary for domain objects.
There is exactly one aggregate codebase per domain. The “player” domain has one aggregate that handles all player-related commands (RegisterPlayer, DepositFunds, ReserveFunds, etc.). This single codebase scales horizontally across many processes.
The OO Handler Pattern
Every aggregate is a class derived from the client library’s CommandHandler[State] (Python) / CommandHandler<State> (Java, C#) / CommandHandler<State> (Rust generic, Go embedded struct, C++ template). Handler methods are decorated/annotated per language idiom:
| Decorator | Purpose |
|---|---|
| @handles(CommandType) | Marks the method as the command handler; returns the resulting event |
| @applies(EventType) | Marks the method as the event applier; mutates state in place |
The base class discovers decorated methods by reflection at construction time. There is no separate router to build or compose.
Handler method names match the proto-generated command type in the language’s idiomatic casing — handle_register_player, HandleRegisterPlayer, RegisterPlayerHandler. Appliers follow the same convention — apply_player_registered, ApplyPlayerRegistered, etc.
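As a rough illustration of the reflection-based discovery described above, the sketch below tags methods with decorators and collects them when the instance is constructed. Everything here (`SketchHandler`, the `_handles`/`_applies` attributes, the `dispatch` method) is a hypothetical stand-in written for this example and is not the client library's actual implementation.

```python
import inspect
from dataclasses import dataclass


def handles(command_type):
    """Hypothetical decorator: tag a method as the handler for command_type."""
    def mark(fn):
        fn._handles = command_type
        return fn
    return mark


def applies(event_type):
    """Hypothetical decorator: tag a method as the applier for event_type."""
    def mark(fn):
        fn._applies = event_type
        return fn
    return mark


class SketchHandler:
    """Stand-in base class (an assumption, not the real CommandHandler)."""

    def __init__(self):
        self.handlers = {}
        self.appliers = {}
        # Reflect over bound methods once, at construction time.
        for _, method in inspect.getmembers(self, predicate=inspect.ismethod):
            if hasattr(method, "_handles"):
                self.handlers[method._handles] = method
            if hasattr(method, "_applies"):
                self.appliers[method._applies] = method

    def dispatch(self, command):
        # Look up the handler by the command's concrete type.
        return self.handlers[type(command)](command)


@dataclass
class RegisterPlayer:
    username: str


@dataclass
class PlayerRegistered:
    username: str


class Player(SketchHandler):
    def __init__(self):
        self.registered = False
        super().__init__()

    @handles(RegisterPlayer)
    def handle_register_player(self, cmd):
        return PlayerRegistered(username=cmd.username)

    @applies(PlayerRegistered)
    def apply_player_registered(self, event):
        self.registered = True


player = Player()
event = player.dispatch(RegisterPlayer(username="Alice"))
player.appliers[type(event)](event)
```

Because the lookup tables are built inside the base class constructor, the subclass only declares decorated methods; no routing table is written by hand.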
Querying External Systems
Aggregates may query external systems when deciding how to handle a command, for example checking inventory with a warehouse API, validating payment details with a processor, or fetching exchange rates. This is an integration point where your event-sourced domain connects with non-DDD systems, legacy applications, or third-party services.
However, aggregates should only read from external systems, never write. Side effects to external systems belong in projectors, which react to committed events. This separation ensures that failed commands don’t leave partial writes in external systems.
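One way to keep a handler read-only is to give it a dependency whose interface has no write methods. The sketch below assumes a hypothetical `RateReader` port and stand-in command, event, and error types; none of these names come from the client library.

```python
from dataclasses import dataclass
from typing import Protocol


class RateReader(Protocol):
    """Hypothetical read-only port: it exposes a lookup and nothing else."""

    def rate(self, currency: str) -> float: ...


@dataclass
class DepositFunds:
    amount: float
    currency: str


@dataclass
class FundsDeposited:
    amount_usd: float


class CommandRejected(Exception):
    """Stand-in for the library's rejection error."""


class FixedRates:
    """In-memory reader used here in place of a real exchange-rate service."""

    def __init__(self, table):
        self._table = table

    def rate(self, currency):
        return self._table[currency]


def handle_deposit_funds(cmd: DepositFunds, rates: RateReader) -> FundsDeposited:
    if cmd.amount <= 0:
        raise CommandRejected("amount must be positive")
    # Read only: the lookup informs the decision and changes nothing remotely.
    usd = cmd.amount * rates.rate(cmd.currency)
    # The converted amount is stored in the event, so replay needs no lookup.
    return FundsDeposited(amount_usd=usd)


event = handle_deposit_funds(DepositFunds(100, "EUR"), FixedRates({"EUR": 2.0}))

try:
    handle_deposit_funds(DepositFunds(0, "EUR"), FixedRates({"EUR": 2.0}))
    rejected = False
except CommandRejected:
    rejected = True
```

If the command is rejected, nothing outside the aggregate has changed, which is the property the read-only rule is meant to protect.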
Example: Command Handler + Applier on One Class
    @handles(table_proto.CreateTable)
    def handle_create_table(
        self,
        cmd: table_proto.CreateTable,
        state: _TableState | None = None,
        seq: int | None = None,
    ) -> table_proto.TableCreated:
        """Create a new table."""
        router_mode = state is not None
        saved = self._router_bind(state) if router_mode else None
        try:
            if self.exists:
                raise CommandRejectedError("Table already exists")
            if not cmd.table_name:
                raise CommandRejectedError("table_name is required")
            if cmd.small_blind <= 0:
                raise CommandRejectedError.invalid_argument(
                    "small_blind must be positive"
                )
            if cmd.big_blind <= 0 or cmd.big_blind < cmd.small_blind:
                raise CommandRejectedError("big_blind must be >= small_blind")
            if cmd.min_buy_in <= 0:
                raise CommandRejectedError.invalid_argument(
                    "min_buy_in must be positive"
                )
            if cmd.max_buy_in < cmd.min_buy_in:
                raise CommandRejectedError("max_buy_in must be >= min_buy_in")
            if cmd.max_players < 2 or cmd.max_players > 10:
                raise CommandRejectedError("max_players must be 2-10")

            event = table_proto.TableCreated(
                table_name=cmd.table_name,
                game_variant=cmd.game_variant,
                small_blind=cmd.small_blind,
                big_blind=cmd.big_blind,
                min_buy_in=cmd.min_buy_in,
                max_buy_in=cmd.max_buy_in,
                max_players=cmd.max_players,
                action_timeout_seconds=cmd.action_timeout_seconds or 30,
                created_at=now(),
            )
            if not router_mode:
                self._emit(event)
            return event
        finally:
            if router_mode:
                self._state = saved

The same class carries both @handles command methods and @applies event methods. Encapsulating state, rules, and reducers on a single aggregate instance is the consistent pattern across all six languages.
State Reconstruction
State is rebuilt by applying events in sequence. Replay starts from a fresh state instance, and each @applies method mutates that instance in turn; the base class orchestrates replay.
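A minimal runnable sketch of such a replay loop follows, using the three-event player history from this section. The dataclasses, the applier bodies, and the `rebuild` helper are stand-ins assumed for illustration; the real base class performs this orchestration itself.

```python
from dataclasses import dataclass


@dataclass
class PlayerState:
    registered: bool = False
    bankroll: int = 0
    reserved: int = 0
    available: int = 0


@dataclass
class PlayerRegistered:
    username: str
    initial_bankroll: int


@dataclass
class FundsDeposited:
    amount: int
    new_bankroll: int


@dataclass
class FundsReserved:
    amount: int
    new_available: int
    new_reserved: int


# Appliers mutate the state they are given; they never validate or reject.
def apply_player_registered(state, event):
    state.registered = True
    state.bankroll = event.initial_bankroll


def apply_funds_deposited(state, event):
    state.bankroll = event.new_bankroll


def apply_funds_reserved(state, event):
    state.reserved = event.new_reserved
    state.available = event.new_available


APPLIERS = {
    PlayerRegistered: apply_player_registered,
    FundsDeposited: apply_funds_deposited,
    FundsReserved: apply_funds_reserved,
}


def rebuild(events):
    """Fold events, in order, into one fresh state instance."""
    state = PlayerState()
    for event in events:
        APPLIERS[type(event)](state, event)
    return state


history = [
    PlayerRegistered(username="Alice", initial_bankroll=1000),
    FundsDeposited(amount=500, new_bankroll=1500),
    FundsReserved(amount=200, new_available=1300, new_reserved=200),
]
state = rebuild(history)
```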
    Prior Events:
      [0] PlayerRegistered { username: "Alice", initial_bankroll: 1000 }
      [1] FundsDeposited   { amount: 500, new_bankroll: 1500 }
      [2] FundsReserved    { amount: 200, new_available: 1300, new_reserved: 200 }

    Rebuild:
      state = PlayerState()                       # fresh instance
      agg.apply_player_registered(state, event0)  # state.registered = true, state.bankroll = 1000
      agg.apply_funds_deposited(state, event1)    # state.bankroll = 1500
      agg.apply_funds_reserved(state, event2)     # state.reserved = 200, state.available = 1300

    Result:
      PlayerState { registered: true, bankroll: 1500, reserved: 200, available: 1300 }

Unit Testing
Instantiate the aggregate, seed state, invoke a @handles method, assert on the returned event. No router composition, no mocks for framework internals.
    def test_deposit_increases_bankroll():
        agg = Player()
        agg.state.registered = True
        agg.state.bankroll = 1000

        event = agg.handle_deposit_funds(DepositFunds(amount=500))

        assert event.new_bankroll == 1500

See Testing for the full three-level strategy (unit, Gherkin feature, integration).
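The same style extends to rejection paths. The sketch below is self-contained, so it defines a stand-in `Player`, `PlayerState`, and `CommandRejectedError` instead of importing the real ones; the "Invalid amount" rule is assumed to match the deposit handler shown elsewhere on this page.

```python
from dataclasses import dataclass


class CommandRejectedError(Exception):
    """Stand-in for the client library's rejection error."""


@dataclass
class DepositFunds:
    amount: int


@dataclass
class FundsDeposited:
    new_bankroll: int


@dataclass
class PlayerState:
    registered: bool = False
    bankroll: int = 0


class Player:
    """Stand-in aggregate with no framework base class."""

    def __init__(self):
        self.state = PlayerState()

    def handle_deposit_funds(self, cmd):
        if cmd.amount <= 0:
            raise CommandRejectedError("Invalid amount")
        return FundsDeposited(new_bankroll=self.state.bankroll + cmd.amount)


def test_deposit_rejects_non_positive_amount():
    agg = Player()
    agg.state.registered = True
    agg.state.bankroll = 1000

    try:
        agg.handle_deposit_funds(DepositFunds(amount=0))
    except CommandRejectedError as err:
        assert str(err) == "Invalid amount"
    else:
        raise AssertionError("expected the command to be rejected")

    # A rejected command must leave state untouched.
    assert agg.state.bankroll == 1000


test_deposit_rejects_non_positive_amount()
```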
Server Wiring
Pass the handler class (not an instance) to the server factory. The framework constructs one aggregate per command, rebuilding state from the event log.
    """Table bounded context gRPC server.

    Uses the unified Router API with the @command_handler class decorator."""

    import sys
    from pathlib import Path

    import structlog

    sys.path.insert(0, str(Path(__file__).parent.parent.parent / "angzarr"))

    from angzarr_client import (
        CommandHandlerGrpc,
        Router,
        configure_logging,
        run_server,
    )
    from angzarr_client.proto.angzarr import command_handler_pb2_grpc

    from .handlers import Table

    router = Router("table").with_handler(Table()).build()

    if __name__ == "__main__":
        configure_logging()
        logger = structlog.get_logger()
        servicer = CommandHandlerGrpc(router)
        run_server(
            command_handler_pb2_grpc.add_CommandHandlerServiceServicer_to_server,
            servicer,
            service_name="table-agg",
            domain="table",
            default_port="50402",
            logger=logger,
        )

Receiving Facts
Aggregates receive facts in addition to commands. Facts are events injected directly: they bypass command validation because they represent external reality the aggregate cannot reject.
    class Player(CommandHandler[PlayerState]):
        domain = "player"

        @handles(DepositFunds)
        def handle_deposit_funds(self, cmd: DepositFunds) -> FundsDeposited:
            if cmd.amount <= 0:
                raise CommandRejectedError("Invalid amount")  # can reject
            return FundsDeposited(new_bankroll=self.state.bankroll + cmd.amount)

        @handles_fact(TurnAssigned)
        def handle_turn_assigned(self, fact: TurnAssigned) -> None:
            self.state.current_hand = fact.hand_id  # cannot reject
            self.state.is_my_turn = True

Facts arrive from:
- Sagas/PMs asserting cross-domain decisions
- External systems (payment confirmations, IoT sensors)
- Human overrides (floor manager rulings)
The aggregate’s only job is to record the fact and update state. See Commands vs Facts for details.
Event Sequencing
Each event has a sequence field. The aggregate computes the next sequence from prior events:
    def next_sequence(event_book: EventBook) -> int:
        if event_book.pages:
            return event_book.pages[-1].sequence + 1
        if event_book.snapshot:
            return event_book.snapshot.sequence + 1
        return 0

Events with incorrect sequences are rejected (optimistic concurrency control).
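To make the rejection rule concrete, here is a small in-memory sketch of an append path that accepts an event only when its sequence is the next one expected. `InMemoryLog`, `Page`, and `SequenceConflict` are hypothetical names for this illustration and say nothing about how the actual event store is implemented.

```python
from dataclasses import dataclass, field


class SequenceConflict(Exception):
    """Raised when an event carries a sequence other than the next expected."""


@dataclass
class Page:
    sequence: int
    payload: str


@dataclass
class InMemoryLog:
    pages: list = field(default_factory=list)

    def next_sequence(self) -> int:
        return self.pages[-1].sequence + 1 if self.pages else 0

    def append(self, page: Page) -> None:
        expected = self.next_sequence()
        if page.sequence != expected:
            raise SequenceConflict(f"expected {expected}, got {page.sequence}")
        self.pages.append(page)


log = InMemoryLog()
log.append(Page(0, "PlayerRegistered"))

# Two writers read the same history and compute the same next sequence.
seq_a = log.next_sequence()
seq_b = log.next_sequence()

log.append(Page(seq_a, "FundsDeposited"))  # first writer wins
try:
    log.append(Page(seq_b, "FundsReserved"))  # second writer is stale
    conflict = False
except SequenceConflict:
    conflict = True
```

The losing writer would typically reload the history, re-run its business rules against the new state, and retry with a fresh sequence.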
Snapshots
For aggregates with many events, enable snapshots to avoid full replay:
- Define state as a protobuf message
- Return the updated state in EventBook.snapshot_state
- Angzarr persists snapshots automatically
On subsequent commands, only events after the snapshot are loaded.
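The effect on replay cost can be sketched with plain data. The example assumes a snapshot records the sequence of the last event it covers, which is consistent with the `next_sequence` logic above but is otherwise an assumption; `Snapshot`, `Deposited`, and `load_bankroll` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    sequence: int  # sequence of the last event folded into this snapshot
    bankroll: int


@dataclass
class Deposited:
    sequence: int
    amount: int


def load_bankroll(events, snapshot=None):
    """Return (bankroll, number_of_events_replayed)."""
    bankroll = snapshot.bankroll if snapshot else 0
    start = snapshot.sequence + 1 if snapshot else 0
    # Only events after the snapshot need to be applied.
    tail = [e for e in events if e.sequence >= start]
    for e in tail:
        bankroll += e.amount
    return bankroll, len(tail)


events = [Deposited(sequence=i, amount=100) for i in range(5)]

full = load_bankroll(events)
fast = load_bankroll(events, Snapshot(sequence=2, bankroll=300))
```

Both calls arrive at the same bankroll; the snapshot path simply replays fewer events.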
Next Steps
- Sagas: Cross-domain command orchestration
- Projectors: Building read models
- Commands vs Facts: When to use facts over commands
- Testing: Three-level testing strategy