Skip to main content

Sagas

A saga is a message translator that bridges bounded contexts. When an event occurs in one domain, a saga reacts by issuing commands to another domain.

Sagas are the bridge between domains. Each domain has its own aggregate, but aggregates don't communicate directly. Instead, sagas listen to events from one domain and generate commands for other domains.


Single Domain Subscription

Sagas should subscribe to ONE domain.

Multi-domain subscription creates:

  • Ordering ambiguity (which event triggers first?)
  • Duplicate processing
  • Race conditions

If you need multi-domain subscription, use a Process Manager.


Saga Pattern

Every saga follows this pattern:

  1. Receive EventBook with domain events
  2. Filter for events this saga cares about
  3. Extract data needed to build commands or facts
  4. Create CommandBooks or FactBooks targeting other aggregates
  5. Return commands/facts (which Angzarr dispatches)

The dashed domain represents any target domain—sagas always bridge from one domain to another.


Example: Table-Hand Saga

When a table starts a hand, issue a DealCards command to the hand domain:

EventRouter with explicit handler registration:

@domain("table")
@output_domain("hand")
class TableHandSaga(Saga):
"""Saga that translates HandStarted events to DealCards commands.

Uses the OO pattern with @domain, @output_domain, @prepares, and @handles decorators.
"""

name = "saga-table-hand"

@prepares(table.HandStarted)
def prepare_hand_started(self, event: table.HandStarted) -> list[types.Cover]:
"""Declare the hand aggregate as destination."""
return [
types.Cover(
domain="hand",
root=types.UUID(value=event.hand_root),
)
]

@handles(table.HandStarted)
def handle_hand_started(
self,
event: table.HandStarted,
destinations: list[types.EventBook],
) -> types.CommandBook:
"""Translate HandStarted -> DealCards."""
# Get next sequence from destination state
dest_seq = next_sequence(destinations[0]) if destinations else 0

# Convert SeatSnapshot to PlayerInHand
players = [
hand.PlayerInHand(
player_root=seat.player_root,
position=seat.position,
stack=seat.stack,
)
for seat in event.active_players
]

# Build DealCards command
deal_cards = hand.DealCards(
table_root=event.hand_root,
hand_number=event.hand_number,
game_variant=event.game_variant,
dealer_position=event.dealer_position,
small_blind=event.small_blind,
big_blind=event.big_blind,
)
deal_cards.players.extend(players)

# Return pre-packed CommandBook for full control
from google.protobuf.any_pb2 import Any

cmd_any = Any()
cmd_any.Pack(deal_cards, type_url_prefix="type.googleapis.com/")

return types.CommandBook(
cover=types.Cover(
domain="hand",
root=types.UUID(value=event.hand_root),
),
pages=[
types.CommandPage(
sequence=dest_seq,
command=cmd_any,
)
],
)


EventRouter Registration

if __name__ == "__main__":
handler = SagaHandler(TableHandSaga)
run_saga_server("saga-table-hand", "50411", handler, logger=logger)

Splitter Pattern

When one event should trigger commands to multiple different aggregates, return multiple CommandBook entries — one per target aggregate root. This is the splitter pattern.

Example: When a table settles, distribute payouts to multiple players:

def handle_table_settled(
event: table.TableSettled, context: SagaContext
) -> list[types.CommandBook]:
"""Split one event into commands for multiple player aggregates."""
commands = []

for payout in event.payouts:
cmd = player.TransferFunds(
table_root=event.table_root,
amount=payout.amount,
)

target_seq = context.get_sequence("player", payout.player_root)

commands.append(
types.CommandBook(
cover=types.Cover(
domain="player", root=types.UUID(value=payout.player_root)
),
pages=[types.CommandPage(sequence=target_seq, command=pack_any(cmd))],
)
)

return commands # One CommandBook per player


Each CommandBook targets a different aggregate root. The framework dispatches them independently — if one fails, others may still succeed (handle via compensation).


Compensation Flow

When a saga command is rejected (e.g., table is full), Angzarr routes a Notification back to the source aggregate:

illustrative - compensation flow diagram
1. Player emits FundsReserved


2. Saga issues JoinTable → Table


3. Table rejects: "table_full"


4. Notification sent to Player


5. Player handles rejection → emits FundsReleased

The source aggregate decides how to compensate based on the rejection reason.

def handle_join_rejected(
notification: types.Notification,
state: PlayerState,
) -> types.EventBook | None:
"""Handle JoinTable rejection by releasing reserved funds.

Called when the JoinTable command (issued by saga-player-table after
FundsReserved) is rejected by the Table aggregate.
"""
from google.protobuf.any_pb2 import Any

# Extract rejection details from the notification payload
rejection = types.RejectionNotification()
if notification.payload:
notification.payload.Unpack(rejection)

# Extract table_root from the rejected command
table_root = b""
if rejection.rejected_command and rejection.rejected_command.cover:
if rejection.rejected_command.cover.root:
table_root = rejection.rejected_command.cover.root.value

# Release the funds that were reserved for this table
table_key = table_root.hex()
reserved_amount = state.table_reservations.get(table_key, 0)
new_reserved = state.reserved_funds - reserved_amount
new_available = state.bankroll - new_reserved

event = player.FundsReleased(
amount=poker_types.Currency(amount=reserved_amount, currency_code="CHIPS"),
table_root=table_root,
new_available_balance=poker_types.Currency(
amount=new_available, currency_code="CHIPS"
),
new_reserved_balance=poker_types.Currency(
amount=new_reserved, currency_code="CHIPS"
),
released_at=now(),
)

# Pack the event
event_any = Any()
event_any.Pack(event, type_url_prefix="type.googleapis.com/")

# Build the EventBook using the notification's cover for routing
return types.EventBook(
cover=notification.cover,
pages=[types.EventPage(header=types.PageHeader(sequence=0), event=event_any)],
)


Advanced Compensation

For complex scenarios (DLQ routing, escalation webhooks, RevocationResponse flags), see Error Recovery.


Commands vs Facts

Sagas can emit either commands or facts to target domains:

OutputCan RejectUse Case
CommandYesRequest action that may fail validation
FactNoAssert external reality the aggregate must accept

Commands go through the target aggregate's guard/validate/compute flow and can be rejected. Facts bypass validation entirely—the aggregate must accept them.

illustrative
# Command: request an action (can be rejected)
def handle_hand_started(event: HandStarted, destinations: list[EventBook]):
return CommandBook(
cover=Cover(domain="player", root=event.player_id),
pages=[CommandPage(sequence=dest_seq, command=DeductBlinds(amount=50))],
)

# Fact: assert reality (cannot be rejected)
def handle_turn_assigned(event: TurnAssigned, destinations: list[EventBook]):
return FactBook(
cover=Cover(domain="player", root=event.player_id, external_id=f"turn:{event.hand_id}"),
pages=[FactPage(fact=FactSequence(source="hand"), event=YourTurn(hand_id=event.hand_id))],
)

Use facts when the source domain has authority the target must accept—tournament seating, dealer rulings, external system confirmations. See Commands vs Facts for details.


Sequence Handling

Sagas MUST set correct sequence numbers on commands. The framework validates sequences for optimistic concurrency.

Two-Phase Saga Flow

The saga coordinator uses a two-phase flow to provide target domain context:

illustrative - two-phase flow
Phase 1: Coordinator receives source event

Coordinator calls prepare handler to get destination covers

Coordinator fetches EventBooks for declared destinations

Phase 2: Coordinator invokes your saga handler with:
- Source event
- Destination EventBooks (target domain states)

The SagaContext contains the target domain(s) aggregate states—not the source domain. This allows your saga to:

  • Get correct sequence numbers for optimistic concurrency
  • Make routing decisions based on target state
  • Avoid stale sequence errors
illustrative
# SagaContext contains target domain state (table), not source (player)
# Fetched by coordinator before invoking your handler

target_seq = context.get_sequence("table", table_id)

# Use in command
CommandPage(sequence=target_seq, command=cmd_any)

Commands with incorrect sequences are rejected, triggering automatic retry with fresh state.


Transactional Guarantees

CQRS/ES architectures cannot provide distributed ACID transactions. Instead:

  1. Design for success: Saga commands should not fail under normal operation
  2. Handle exceptions: Compensation flow handles the rare rejection cases
  3. Eventual consistency: Accept that cross-domain operations settle over time

If saga commands frequently fail, reconsider your domain boundaries.


Further Reading


Next Steps