Sagas

A saga is a message translator that bridges bounded contexts. When an event occurs in one domain, a saga reacts by issuing commands to another domain.

Each domain has its own aggregate, but aggregates don't communicate directly. Instead, a saga listens to events from one domain and generates commands for other domains.


Single Domain Subscription

Sagas should subscribe to ONE domain.

Multi-domain subscription creates:

  • Ordering ambiguity (which event triggers first?)
  • Duplicate processing
  • Race conditions

If you need multi-domain subscription, use a Process Manager.


Saga Pattern

Every saga follows this pattern:

  1. Receive EventBook with domain events
  2. Filter for events this saga cares about
  3. Extract data needed to build commands
  4. Create CommandBooks targeting other aggregates
  5. Return commands (which Angzarr dispatches)

The dashed domain represents any target domain—sagas always bridge from one domain to another.
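
The five steps can be sketched with plain dataclasses. Everything in this sketch (`Event`, `EventBook`, `Command`, `CommandBook`, `run_saga`) is a hypothetical stand-in chosen for illustration, not Angzarr's actual types or API; it only shows the shape of the translation.

```python
from dataclasses import dataclass, field


# Hypothetical stand-ins for illustration only; not Angzarr's real types.
@dataclass
class Event:
    type_name: str
    data: dict


@dataclass
class EventBook:
    domain: str
    root: str
    events: list[Event] = field(default_factory=list)


@dataclass
class Command:
    type_name: str
    data: dict


@dataclass
class CommandBook:
    domain: str
    root: str
    commands: list[Command] = field(default_factory=list)


def run_saga(book: EventBook) -> list[CommandBook]:
    """Translate table events into commands for the hand domain."""
    out: list[CommandBook] = []
    for event in book.events:                # 1. receive the EventBook
        if event.type_name != "HandStarted":
            continue                         # 2. filter for relevant events
        hand_root = event.data["hand_root"]  # 3. extract the data needed
        cmd = Command("DealCards", {"hand_number": event.data["hand_number"]})
        out.append(CommandBook("hand", hand_root, [cmd]))  # 4. build a CommandBook
    return out                               # 5. return commands for dispatch


book = EventBook(
    domain="table",
    root="table-1",
    events=[
        Event("PlayerSeated", {"seat": 3}),
        Event("HandStarted", {"hand_root": "hand-7", "hand_number": 7}),
    ],
)
commands = run_saga(book)
print(commands)
```

Only the `HandStarted` event produces a command here; the unrelated `PlayerSeated` event is filtered out in step 2.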


Example: Table-Hand Saga

When a table starts a hand, issue a DealCards command to the hand domain:

def handle_hand_started(
    event: Any,
    root: types.UUID | None,
    correlation_id: str,
    destinations: list[types.EventBook],
) -> list[types.CommandBook]:
    """Translate HandStarted -> DealCards."""
    hand_started = table.HandStarted()
    event.Unpack(hand_started)

    # Get next sequence from destination state
    dest_seq = next_sequence(destinations[0]) if destinations else 0

    # Convert SeatSnapshot to PlayerInHand
    players = [
        hand.PlayerInHand(
            player_root=seat.player_root,
            position=seat.position,
            stack=seat.stack,
        )
        for seat in hand_started.active_players
    ]

    # Build DealCards command
    deal_cards = hand.DealCards(
        table_root=hand_started.hand_root,
        hand_number=hand_started.hand_number,
        game_variant=hand_started.game_variant,
        dealer_position=hand_started.dealer_position,
        small_blind=hand_started.small_blind,
        big_blind=hand_started.big_blind,
    )
    deal_cards.players.extend(players)

    cmd_any = Any()
    cmd_any.Pack(deal_cards, type_url_prefix="type.googleapis.com/")

    return [
        types.CommandBook(
            cover=types.Cover(
                domain="hand",
                root=types.UUID(value=hand_started.hand_root),
                correlation_id=correlation_id,
            ),
            pages=[
                types.CommandPage(
                    sequence=dest_seq,
                    command=cmd_any,
                )
            ],
        )
    ]

EventRouter Registration

router = (
    EventRouter("saga-table-hand")
    .domain("table")
    .prepare("HandStarted", prepare_hand_started)
    .on("HandStarted", handle_hand_started)
)

Splitter Pattern

When one event should trigger commands to multiple different aggregates, return multiple CommandBook entries — one per target aggregate root. This is the splitter pattern.

Example: When a table settles, distribute payouts to multiple players:

def handle_table_settled(event: table.TableSettled, context: SagaContext) -> list[types.CommandBook]:
    """Split one event into commands for multiple player aggregates."""
    commands = []

    for payout in event.payouts:
        cmd = player.TransferFunds(
            table_root=event.table_root,
            amount=payout.amount,
        )

        target_seq = context.get_sequence("player", payout.player_root)

        commands.append(types.CommandBook(
            cover=types.Cover(domain="player", root=types.UUID(value=payout.player_root)),
            pages=[types.CommandPage(sequence=target_seq, command=pack_any(cmd))],
        ))

    return commands  # One CommandBook per player

Each CommandBook targets a different aggregate root. The framework dispatches them independently — if one fails, others may still succeed (handle via compensation).
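
Independent dispatch can be pictured as a loop that records each outcome separately instead of aborting on the first failure. This is a toy sketch under that assumption; `dispatch_all`, `fake_send`, and `DispatchResult` are hypothetical names and do not reflect how the framework dispatches internally.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class DispatchResult:
    root: str
    ok: bool
    reason: str = ""


def dispatch_all(
    roots: list[str],
    send: Callable[[str], None],
) -> list[DispatchResult]:
    """Send one command per target root; a failure does not stop the others."""
    results = []
    for root in roots:
        try:
            send(root)
            results.append(DispatchResult(root, ok=True))
        except ValueError as exc:
            # Record the rejection so compensation can deal with it later.
            results.append(DispatchResult(root, ok=False, reason=str(exc)))
    return results


def fake_send(root: str) -> None:
    # Hypothetical target aggregate that rejects one specific player.
    if root == "player-2":
        raise ValueError("account_closed")


results = dispatch_all(["player-1", "player-2", "player-3"], fake_send)
rejected = [r for r in results if not r.ok]
print(rejected)
```

The rejection for `player-2` is isolated: the commands for the other two players still go through, and only the rejected one needs compensation.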


Compensation Flow

When a saga command is rejected (e.g., table is full), Angzarr routes a Notification back to the source aggregate:

  1. Player emits FundsReserved
  2. Saga issues JoinTable → Table
  3. Table rejects: "table_full"
  4. Notification sent to Player
  5. Player handles rejection → emits FundsReleased

The source aggregate decides how to compensate based on the rejection reason.

@rejected("table", "JoinTable")
def handle_join_rejected(state: PlayerState, notification: Notification) -> FundsReleased:
    # Release the funds that were reserved for this failed join
    return FundsReleased(
        amount=state.reserved_amount,
        reason=f"Join failed: {notification.rejection_reason}",
        new_available=state.bankroll,
        new_reserved=0,
    )

Advanced Compensation

For complex scenarios (DLQ routing, escalation webhooks, RevocationResponse flags), see Error Recovery.


Sequence Handling

Sagas MUST set correct sequence numbers on commands. The framework validates sequences for optimistic concurrency.

Two-Phase Saga Flow

The saga coordinator uses a two-phase flow to provide target domain context:

Phase 1: Coordinator receives source event

  • Coordinator calls prepare handler to get destination covers
  • Coordinator fetches EventBooks for declared destinations

Phase 2: Coordinator invokes your saga handler with:

  • Source event
  • Destination EventBooks (target domain states)
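
A minimal sketch of the two phases, assuming an in-memory event store. The `Coordinator` class, the dictionary-based store, and the rule that the next sequence equals the number of stored events are all assumptions made for illustration; they mirror the description above rather than Angzarr's actual implementation.

```python
from dataclasses import dataclass
from typing import Callable

Cover = tuple[str, str]  # (domain, root), a simplified stand-in for a cover


@dataclass
class Coordinator:
    # Hypothetical in-memory store: cover -> events already persisted.
    store: dict[Cover, list[str]]
    prepare: Callable[[dict], list[Cover]]
    handle: Callable[[dict, dict[Cover, list[str]]], list[dict]]

    def on_event(self, event: dict) -> list[dict]:
        # Phase 1: ask the prepare handler which destinations are needed,
        # then fetch the current events for each declared destination.
        covers = self.prepare(event)
        destinations = {c: self.store.get(c, []) for c in covers}
        # Phase 2: invoke the saga handler with the source event plus
        # the destination state it declared.
        return self.handle(event, destinations)


def prepare_hand_started(event: dict) -> list[Cover]:
    return [("hand", event["hand_root"])]


def handle_hand_started(
    event: dict, destinations: dict[Cover, list[str]]
) -> list[dict]:
    cover = ("hand", event["hand_root"])
    # Assumption for this sketch: next sequence == number of existing events.
    sequence = len(destinations[cover])
    return [
        {"domain": cover[0], "root": cover[1], "sequence": sequence, "type": "DealCards"}
    ]


coordinator = Coordinator(
    store={("hand", "hand-7"): ["HandCreated"]},
    prepare=prepare_hand_started,
    handle=handle_hand_started,
)
commands = coordinator.on_event({"type": "HandStarted", "hand_root": "hand-7"})
print(commands)
```

Splitting the work this way keeps the saga handler free of I/O: it declares what it needs in the prepare step and receives the fetched state as a plain argument.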

The SagaContext contains the aggregate states of the target domain(s), not the source domain. This allows your saga to:

  • Get correct sequence numbers for optimistic concurrency
  • Make routing decisions based on target state
  • Avoid stale sequence errors

# SagaContext contains target domain state (table), not source (player)
# Fetched by coordinator before invoking your handler

target_seq = context.get_sequence("table", table_id)

# Use in command
CommandPage(sequence=target_seq, command=cmd_any)

Commands with incorrect sequences are rejected, triggering automatic retry with fresh state.
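
The reject-and-retry cycle can be illustrated with a toy aggregate that accepts a command only when its sequence matches the next expected one. `Aggregate`, `SequenceMismatch`, and `send_with_retry` are hypothetical names; in Angzarr the retry is performed by the framework, and its exact sequence semantics may differ from this sketch.

```python
class SequenceMismatch(Exception):
    """Raised when a command carries a stale or future sequence number."""


class Aggregate:
    """Toy aggregate: accepts a command only at the next expected sequence."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def next_sequence(self) -> int:
        return len(self.events)

    def handle(self, sequence: int, command: str) -> None:
        if sequence != self.next_sequence():
            raise SequenceMismatch(
                f"expected {self.next_sequence()}, got {sequence}"
            )
        self.events.append(f"{command}:applied")


def send_with_retry(
    agg: Aggregate, command: str, stale_sequence: int, max_attempts: int = 3
) -> int:
    """Try a possibly stale sequence first, then retry with fresh state.

    Returns the number of attempts that were needed.
    """
    sequence = stale_sequence
    for attempt in range(1, max_attempts + 1):
        try:
            agg.handle(sequence, command)
            return attempt
        except SequenceMismatch:
            sequence = agg.next_sequence()  # re-read fresh state before retrying
    raise RuntimeError("gave up after repeated sequence conflicts")


table = Aggregate()
table.events.append("TableCreated")  # another writer advanced the aggregate
attempts = send_with_retry(table, "JoinTable", stale_sequence=0)
print(attempts, table.events)
```

The first attempt is rejected because the saga's view of the table was stale; the retry re-reads the sequence and succeeds.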


Transactional Guarantees

CQRS/ES architectures cannot provide distributed ACID transactions. Instead:

  1. Design for success: Saga commands should not fail under normal operation
  2. Handle exceptions: Compensation flow handles the rare rejection cases
  3. Eventual consistency: Accept that cross-domain operations settle over time

If saga commands frequently fail, reconsider your domain boundaries.

