Sagas
A saga is a message translator that bridges bounded contexts: when an event occurs in one domain, the saga reacts by issuing commands to another domain.
Each domain has its own aggregate, and aggregates do not communicate directly. Sagas carry that interaction instead, listening to events from one domain and generating commands for other domains.
Single Domain Subscription
Sagas should subscribe to ONE domain.
Multi-domain subscription creates:
- Ordering ambiguity (which event triggers first?)
- Duplicate processing
- Race conditions
If you need multi-domain subscription, use a Process Manager.
Saga Pattern
Every saga follows this pattern:
- Receive EventBook with domain events
- Filter for events this saga cares about
- Extract data needed to build commands
- Create CommandBooks targeting other aggregates
- Return commands (which Angzarr dispatches)
In the accompanying diagram, the dashed domain represents any target domain: sagas always bridge from one domain to another.
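The five steps above can be sketched without any framework code. This is a minimal illustration, not Angzarr's API: the `Event`, `EventBook`, and `CommandBook` dataclasses are simplified stand-ins invented for this example, and the payload fields are assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    type_name: str
    data: dict


@dataclass
class EventBook:  # hypothetical stand-in, not the Angzarr type
    domain: str
    root: str
    events: list[Event] = field(default_factory=list)


@dataclass
class CommandBook:  # hypothetical stand-in, not the Angzarr type
    domain: str
    root: str
    command_name: str
    payload: dict


def table_hand_saga(book: EventBook) -> list[CommandBook]:
    """Receive an EventBook, filter, extract, build commands, return them."""
    commands = []
    for event in book.events:                     # 1. receive domain events
        if event.type_name != "HandStarted":      # 2. filter
            continue
        hand_root = event.data["hand_root"]       # 3. extract
        commands.append(                          # 4. create a CommandBook
            CommandBook(
                domain="hand",
                root=hand_root,
                command_name="DealCards",
                payload={"hand_number": event.data["hand_number"]},
            )
        )
    return commands                               # 5. return for dispatch


book = EventBook(
    domain="table",
    root="table-1",
    events=[
        Event("PlayerSeated", {"seat": 3}),
        Event("HandStarted", {"hand_root": "hand-42", "hand_number": 7}),
    ],
)
commands = table_hand_saga(book)
```

Only the `HandStarted` event produces a command; the unrelated `PlayerSeated` event is filtered out.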
Example: Table-Hand Saga
When a table starts a hand, issue a DealCards command to the hand domain:
- Python
- Go
- Rust
- Java
- C#
- C++
from google.protobuf.any_pb2 import Any  # protobuf Any, not typing.Any

def handle_hand_started(
    event: Any,
    root: types.UUID | None,
    correlation_id: str,
    destinations: list[types.EventBook],
) -> list[types.CommandBook]:
    """Translate HandStarted -> DealCards."""
    hand_started = table.HandStarted()
    event.Unpack(hand_started)
    # Get next sequence from destination state
    dest_seq = next_sequence(destinations[0]) if destinations else 0
    # Convert SeatSnapshot to PlayerInHand
    players = [
        hand.PlayerInHand(
            player_root=seat.player_root,
            position=seat.position,
            stack=seat.stack,
        )
        for seat in hand_started.active_players
    ]
    # Build DealCards command
    deal_cards = hand.DealCards(
        table_root=hand_started.hand_root,
        hand_number=hand_started.hand_number,
        game_variant=hand_started.game_variant,
        dealer_position=hand_started.dealer_position,
        small_blind=hand_started.small_blind,
        big_blind=hand_started.big_blind,
    )
    deal_cards.players.extend(players)
    cmd_any = Any()
    cmd_any.Pack(deal_cards, type_url_prefix="type.googleapis.com/")
    return [
        types.CommandBook(
            cover=types.Cover(
                domain="hand",
                root=types.UUID(value=hand_started.hand_root),
                correlation_id=correlation_id,
            ),
            pages=[
                types.CommandPage(
                    sequence=dest_seq,
                    command=cmd_any,
                )
            ],
        )
    ]
// handleHandStarted translates HandStarted → DealCards.
func handleHandStarted(source *pb.EventBook, event *anypb.Any, destinations []*pb.EventBook) ([]*pb.CommandBook, error) {
    var handStarted examples.HandStarted
    if err := proto.Unmarshal(event.Value, &handStarted); err != nil {
        return nil, err
    }
    // Get next sequence from destination state
    var destSeq uint32
    if len(destinations) > 0 {
        destSeq = angzarr.NextSequence(destinations[0])
    }
    // Get correlation ID from source
    var correlationID string
    if source.Cover != nil {
        correlationID = source.Cover.CorrelationId
    }
    // Convert SeatSnapshot to PlayerInHand
    players := make([]*examples.PlayerInHand, len(handStarted.ActivePlayers))
    for i, seat := range handStarted.ActivePlayers {
        players[i] = &examples.PlayerInHand{
            PlayerRoot: seat.PlayerRoot,
            Position: seat.Position,
            Stack: seat.Stack,
        }
    }
    // Build DealCards command
    dealCards := &examples.DealCards{
        TableRoot: handStarted.HandRoot,
        HandNumber: handStarted.HandNumber,
        GameVariant: handStarted.GameVariant,
        Players: players,
        DealerPosition: handStarted.DealerPosition,
        SmallBlind: handStarted.SmallBlind,
        BigBlind: handStarted.BigBlind,
    }
    cmdAny, err := anypb.New(dealCards)
    if err != nil {
        return nil, err
    }
    return []*pb.CommandBook{
        {
            Cover: &pb.Cover{
                Domain: "hand",
                Root: &pb.UUID{Value: handStarted.HandRoot},
                CorrelationId: correlationID,
            },
            Pages: []*pb.CommandPage{
                {
                    Sequence: destSeq,
                    Command: cmdAny,
                },
            },
        },
    }, nil
}
/// Execute handler: translate HandStarted → DealCards.
fn handle_hand_started(
    _source: &EventBook,
    event_any: &Any,
    destinations: &[EventBook],
) -> CommandResult<Option<CommandBook>> {
    let event: HandStarted = event_any
        .unpack()
        .map_err(|e| CommandRejectedError::new(format!("Failed to decode HandStarted: {}", e)))?;
    // Get the destination's next sequence
    let dest_seq = destinations
        .first()
        .map(|eb| eb.next_sequence)
        .unwrap_or(0);
    // Convert SeatSnapshot to PlayerInHand
    let players: Vec<PlayerInHand> = event
        .active_players
        .iter()
        .map(|seat| PlayerInHand {
            player_root: seat.player_root.clone(),
            position: seat.position,
            stack: seat.stack,
        })
        .collect();
    // Build DealCards command
    let deal_cards = DealCards {
        table_root: event.hand_root.clone(), // The hand_root becomes the table_root reference
        hand_number: event.hand_number,
        game_variant: event.game_variant,
        players,
        dealer_position: event.dealer_position,
        small_blind: event.small_blind,
        big_blind: event.big_blind,
        deck_seed: vec![], // Let the aggregate generate a random seed
    };
    let command_any = Any {
        type_url: "type.googleapis.com/examples.DealCards".to_string(),
        value: deal_cards.encode_to_vec(),
    };
    Ok(Some(CommandBook {
        cover: Some(Cover {
            domain: "hand".to_string(),
            root: Some(Uuid { value: event.hand_root }),
            ..Default::default()
        }),
        pages: vec![CommandPage {
            sequence: dest_seq,
            payload: Some(command_page::Payload::Command(command_any)),
            ..Default::default()
        }],
        saga_origin: None,
    }))
}
public static CommandBook handleHandStarted(HandStarted event, List<EventBook> destinations) {
    int destSeq = EventRouter.nextSequence(destinations.isEmpty() ? null : destinations.get(0));
    List<PlayerInHand> players = new ArrayList<>();
    for (SeatSnapshot seat : event.getActivePlayersList()) {
        players.add(PlayerInHand.newBuilder()
            .setPlayerRoot(seat.getPlayerRoot())
            .setPosition(seat.getPosition())
            .setStack(seat.getStack())
            .build());
    }
    DealCards dealCards = DealCards.newBuilder()
        .setTableRoot(event.getHandRoot())
        .setHandNumber(event.getHandNumber())
        .setGameVariant(event.getGameVariant())
        .setDealerPosition(event.getDealerPosition())
        .setSmallBlind(event.getSmallBlind())
        .setBigBlind(event.getBigBlind())
        .addAllPlayers(players)
        .build();
    return CommandBook.newBuilder()
        .setCover(Cover.newBuilder()
            .setDomain("hand")
            .setRoot(UUID.newBuilder().setValue(event.getHandRoot())))
        .addPages(CommandPage.newBuilder()
            .setSequence(destSeq)
            .setCommand(EventRouter.packCommand(dealCards)))
        .build();
}
private static object HandleHandStarted(HandStarted evt, List<EventBook> destinations)
{
    var destSeq = EventRouter.NextSequence(destinations.FirstOrDefault());
    var players = evt.ActivePlayers.Select(seat => new PlayerInHand
    {
        PlayerRoot = seat.PlayerRoot,
        Position = seat.Position,
        Stack = seat.Stack
    }).ToList();
    var dealCards = new DealCards
    {
        TableRoot = evt.HandRoot,
        HandNumber = evt.HandNumber,
        GameVariant = evt.GameVariant,
        DealerPosition = evt.DealerPosition,
        SmallBlind = evt.SmallBlind,
        BigBlind = evt.BigBlind
    };
    dealCards.Players.AddRange(players);
    var cmdAny = EventRouter.PackCommand(dealCards);
    return new CommandBook
    {
        Cover = new Cover
        {
            Domain = "hand",
            Root = new UUID { Value = evt.HandRoot }
        },
        Pages = { new CommandPage { Sequence = destSeq, Command = cmdAny } }
    };
}
angzarr::CommandBook handle_hand_started(
    const examples::HandStarted& event,
    const std::vector<angzarr::EventBook>& destinations) {
    int dest_seq = destinations.empty() ? 0 : destinations[0].next_sequence();
    // Build DealCards command from HandStarted event
    examples::DealCards deal_cards;
    deal_cards.set_table_root(event.hand_root());
    deal_cards.set_hand_number(event.hand_number());
    deal_cards.set_game_variant(event.game_variant());
    deal_cards.set_dealer_position(event.dealer_position());
    deal_cards.set_small_blind(event.small_blind());
    deal_cards.set_big_blind(event.big_blind());
    // Add players from active players
    for (const auto& seat : event.active_players()) {
        auto* player = deal_cards.add_players();
        player->set_player_root(seat.player_root());
        player->set_position(seat.position());
        player->set_stack(seat.stack());
    }
    // Pack command
    google::protobuf::Any cmd_any;
    cmd_any.PackFrom(deal_cards, "type.googleapis.com/");
    // Build command book
    angzarr::CommandBook cmd_book;
    cmd_book.mutable_cover()->set_domain("hand");
    cmd_book.mutable_cover()->mutable_root()->set_value(event.hand_root());
    auto* page = cmd_book.add_pages();
    page->set_sequence(dest_seq);
    page->mutable_command()->CopyFrom(cmd_any);
    return cmd_book;
}
EventRouter Registration
- Python
- Go
- Rust
- Java
- C#
- C++
router = (
    EventRouter("saga-table-hand")
    .domain("table")
    .prepare("HandStarted", prepare_hand_started)
    .on("HandStarted", handle_hand_started)
)
router := angzarr.NewEventRouter("saga-table-hand").
    Domain("table").
    Prepare("HandStarted", prepareHandStarted).
    On("HandStarted", handleHandStarted)
let router = EventRouter::new("saga-table-hand")
    .domain("table")
    .prepare("examples.HandStarted", prepare_hand_started)
    .on("examples.HandStarted", handle_hand_started);
public static EventRouter createRouter() {
    return new EventRouter("saga-table-hand")
        .domain("table")
        .prepare(HandStarted.class, TableHandRouter::prepareHandStarted)
        .on(HandStarted.class, TableHandRouter::handleHandStarted);
}
public static EventRouter Create()
{
    return new EventRouter("saga-table-hand")
        .Domain("table")
        .Prepare<HandStarted>(PrepareHandStarted)
        .On<HandStarted>(HandleHandStarted);
}
angzarr::EventRouter create_table_hand_router() {
    return angzarr::EventRouter("saga-table-hand")
        .domain("table")
        .prepare<examples::HandStarted>(prepare_hand_started)
        .on<examples::HandStarted>(handle_hand_started);
}
Splitter Pattern
When one event should trigger commands to multiple different aggregates, return multiple CommandBook entries — one per target aggregate root. This is the splitter pattern.
Example: When a table settles, distribute payouts to multiple players:
- Python
- Go
- Rust
- Java
- C#
- C++
def handle_table_settled(event: table.TableSettled, context: SagaContext) -> list[types.CommandBook]:
    """Split one event into commands for multiple player aggregates."""
    commands = []
    for payout in event.payouts:
        cmd = player.TransferFunds(
            table_root=event.table_root,
            amount=payout.amount,
        )
        target_seq = context.get_sequence("player", payout.player_root)
        commands.append(types.CommandBook(
            cover=types.Cover(domain="player", root=types.UUID(value=payout.player_root)),
            pages=[types.CommandPage(sequence=target_seq, command=pack_any(cmd))],
        ))
    return commands  # One CommandBook per player
func handleTableSettled(event *examples.TableSettled, ctx *angzarr.SagaContext) ([]*pb.CommandBook, error) {
    // Split one event into commands for multiple player aggregates
    var commands []*pb.CommandBook
    for _, payout := range event.Payouts {
        cmd := &examples.TransferFunds{
            TableRoot: event.TableRoot,
            Amount: payout.Amount,
        }
        targetSeq := ctx.GetSequence("player", payout.PlayerRoot)
        // Propagate packing errors instead of silently dropping them
        cmdAny, err := anypb.New(cmd)
        if err != nil {
            return nil, err
        }
        commands = append(commands, &pb.CommandBook{
            Cover: &pb.Cover{Domain: "player", Root: &pb.UUID{Value: payout.PlayerRoot}},
            Pages: []*pb.CommandPage{{
                Sequence: &pb.CommandPage_Num{Num: targetSeq},
                Command: cmdAny,
            }},
        })
    }
    return commands, nil // One CommandBook per player
}
fn handle_table_settled(event: &TableSettled, context: &SagaContext) -> Vec<CommandBook> {
    // Split one event into commands for multiple player aggregates
    event.payouts.iter().map(|payout| {
        let cmd = TransferFunds {
            table_root: event.table_root.clone(),
            amount: payout.amount.clone(),
        };
        let target_seq = context.get_sequence("player", &payout.player_root);
        CommandBook {
            cover: Some(Cover {
                domain: "player".into(),
                root: Some(Uuid { value: payout.player_root.clone() }),
                ..Default::default()
            }),
            pages: vec![CommandPage {
                sequence: Some(angzarr_client::proto::angzarr::command_page::Sequence::Num(target_seq)),
                command: Some(Any::from_msg(&cmd).unwrap()),
                ..Default::default()
            }],
            ..Default::default()
        }
    }).collect() // One CommandBook per player
}
class SplitterExample {
    List<CommandBook> handleTableSettled(TableSettled event, SagaContext ctx) {
        // Split one event into commands for multiple player aggregates
        List<CommandBook> commands = new ArrayList<>();
        for (var payout : event.getPayoutsList()) {
            var cmd = TransferFunds.newBuilder()
                .setTableRoot(event.getTableRoot())
                .setAmount(payout.getAmount())
                .build();
            long targetSeq = ctx.getSequence("player", payout.getPlayerRoot());
            commands.add(CommandBook.newBuilder()
                .setCover(Cover.newBuilder()
                    .setDomain("player")
                    .setRoot(UUID.newBuilder().setValue(payout.getPlayerRoot()).build())
                    .build())
                .addPages(CommandPage.newBuilder()
                    .setNum((int) targetSeq)
                    .setCommand(Any.pack(cmd))
                    .build())
                .build());
        }
        return commands; // One CommandBook per player
    }
}
public static class SplitterExample
{
    public static IEnumerable<CommandBook> HandleTableSettled(TableSettled @event, SagaContext ctx)
    {
        // Split one event into commands for multiple player aggregates
        foreach (var payout in @event.Payouts)
        {
            var cmd = new TransferFunds
            {
                TableRoot = @event.TableRoot,
                Amount = payout.Amount
            };
            var targetSeq = ctx.GetSequence("player", payout.PlayerRoot);
            yield return new CommandBook
            {
                Cover = new Cover
                {
                    Domain = "player",
                    Root = new UUID { Value = payout.PlayerRoot }
                },
                Pages = {
                    new CommandPage
                    {
                        Num = (uint)targetSeq,
                        Command = Any.Pack(cmd)
                    }
                }
            };
        }
        // One CommandBook per player
    }
}
std::vector<CommandBook> handle_table_settled(
    const TableSettled& event, const SagaContext& ctx) {
    // Split one event into commands for multiple player aggregates
    std::vector<CommandBook> commands;
    for (const auto& payout : event.payouts()) {
        TransferFunds cmd;
        cmd.set_table_root(event.table_root());
        *cmd.mutable_amount() = payout.amount();
        uint32_t target_seq = ctx.get_sequence("player", payout.player_root());
        CommandBook book;
        auto* cover = book.mutable_cover();
        cover->set_domain("player");
        cover->mutable_root()->set_value(payout.player_root());
        auto* page = book.add_pages();
        page->set_num(target_seq);
        page->mutable_command()->PackFrom(cmd);
        commands.push_back(book);
    }
    return commands; // One CommandBook per player
}
Each CommandBook targets a different aggregate root. The framework dispatches them independently — if one fails, others may still succeed (handle via compensation).
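To make "dispatched independently" concrete, here is a small sketch of per-book dispatch where one rejection does not block the others. It is an illustration only: `send_to_aggregate`, `dispatch_all`, the `Rejected` exception, and the simplified `CommandBook` are hypothetical names invented for this example, and the rejection rule is an assumption.

```python
from dataclasses import dataclass


@dataclass
class CommandBook:  # hypothetical stand-in, not the Angzarr type
    domain: str
    root: str
    amount: int


class Rejected(Exception):
    """Raised by the stand-in aggregate when it refuses a command."""


def send_to_aggregate(book: CommandBook) -> None:
    # Assumed aggregate rule for the example: reject non-positive transfers.
    if book.amount <= 0:
        raise Rejected(f"invalid amount for {book.root}")


def dispatch_all(books: list[CommandBook]) -> tuple[list[str], dict[str, str]]:
    """Dispatch each book on its own; a rejection does not stop the rest."""
    accepted: list[str] = []
    rejected: dict[str, str] = {}
    for book in books:
        try:
            send_to_aggregate(book)
            accepted.append(book.root)
        except Rejected as exc:
            # In a real system this is where compensation would be triggered.
            rejected[book.root] = str(exc)
    return accepted, rejected


accepted, rejected = dispatch_all([
    CommandBook("player", "alice", 50),
    CommandBook("player", "bob", 0),
    CommandBook("player", "carol", 25),
])
```

Here the commands for `alice` and `carol` are accepted while `bob`'s is rejected, so the outcome is a partial success that the source aggregate must be prepared to compensate for.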
Compensation Flow
When a saga command is rejected (e.g., table is full), Angzarr routes a Notification back to the source aggregate:
1. Player emits FundsReserved
│
▼
2. Saga issues JoinTable → Table
│
▼
3. Table rejects: "table_full"
│
▼
4. Notification sent to Player
│
▼
5. Player handles rejection → emits FundsReleased
The source aggregate decides how to compensate based on the rejection reason.
- Python
- Rust
- Go
- Java
- C#
- C++
@rejected("table", "JoinTable")
def handle_join_rejected(state: PlayerState, notification: Notification) -> FundsReleased:
    # Release the funds that were reserved for this failed join
    return FundsReleased(
        amount=state.reserved_amount,
        reason=f"Join failed: {notification.rejection_reason}",
        new_available=state.bankroll,
        new_reserved=0,
    )
fn handle_join_rejected(
    state: &PlayerState,
    notification: &Notification,
) -> Result<FundsReleased, Status> {
    Ok(FundsReleased {
        amount: state.reserved_amount,
        reason: format!("Join failed: {}", notification.rejection_reason),
        new_available: state.bankroll,
        new_reserved: 0,
    })
}
func handleJoinRejected(state *PlayerState, notification *Notification) (*FundsReleased, error) {
    return &FundsReleased{
        Amount: state.ReservedAmount,
        Reason: fmt.Sprintf("Join failed: %s", notification.RejectionReason),
        NewAvailable: state.Bankroll,
        NewReserved: 0,
    }, nil
}
@Rejected(domain = "table", command = "JoinTable")
public FundsReleased handleJoinRejected(PlayerState state, Notification notification) {
    return FundsReleased.newBuilder()
        .setAmount(state.getReservedAmount())
        .setReason("Join failed: " + notification.getRejectionReason())
        .setNewAvailable(state.getBankroll())
        .setNewReserved(0)
        .build();
}
[Rejected("table", "JoinTable")]
public FundsReleased HandleJoinRejected(PlayerState state, Notification notification)
{
    return new FundsReleased
    {
        Amount = state.ReservedAmount,
        Reason = $"Join failed: {notification.RejectionReason}",
        NewAvailable = state.Bankroll,
        NewReserved = 0
    };
}
FundsReleased handle_join_rejected(
    const PlayerState& state, const Notification& notification) {
    FundsReleased event;
    event.set_amount(state.reserved_amount());
    event.set_reason("Join failed: " + notification.rejection_reason());
    event.set_new_available(state.bankroll());
    event.set_new_reserved(0);
    return event;
}
For complex scenarios (DLQ routing, escalation webhooks, RevocationResponse flags), see Error Recovery.
Sequence Handling
Sagas MUST set correct sequence numbers on commands. The framework validates sequences for optimistic concurrency.
Two-Phase Saga Flow
The saga coordinator uses a two-phase flow to provide target domain context:
Phase 1: Coordinator receives source event
↓
Coordinator calls prepare handler to get destination covers
↓
Coordinator fetches EventBooks for declared destinations
↓
Phase 2: Coordinator invokes your saga handler with:
- Source event
- Destination EventBooks (target domain states)
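The two phases can be sketched as three plain functions. This is a simplified model under stated assumptions, not the coordinator's actual implementation: `Cover`, `EventBook`, `Command`, the in-memory `STORE`, and the `coordinate` function are all hypothetical stand-ins, and the rule that unknown aggregates start at sequence 0 is an assumption for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cover:  # hypothetical stand-in: identifies one aggregate
    domain: str
    root: str


@dataclass
class EventBook:  # hypothetical stand-in: destination state
    cover: Cover
    next_sequence: int


@dataclass
class Command:
    cover: Cover
    sequence: int
    name: str


# Stand-in event store: next expected sequence per aggregate.
STORE = {Cover("hand", "hand-42"): 3}


def prepare(event: dict) -> list[Cover]:
    """Phase 1: declare which destination aggregates the handler needs."""
    return [Cover("hand", event["hand_root"])]


def handle(event: dict, destinations: list[EventBook]) -> list[Command]:
    """Phase 2: build commands using the fetched destination state."""
    dest = destinations[0]
    return [Command(dest.cover, dest.next_sequence, "DealCards")]


def coordinate(event: dict) -> list[Command]:
    covers = prepare(event)
    # Fetch destination state; assume unknown aggregates start at sequence 0.
    books = [EventBook(c, STORE.get(c, 0)) for c in covers]
    return handle(event, books)


commands = coordinate({"hand_root": "hand-42"})
new_commands = coordinate({"hand_root": "hand-99"})
```

Splitting `prepare` from `handle` lets the handler stay a pure function of the source event and the destination state, with all fetching done by the coordinator.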
The SagaContext contains the aggregate state of the target domain(s), not the source domain. This allows your saga to:
- Get correct sequence numbers for optimistic concurrency
- Make routing decisions based on target state
- Avoid stale sequence errors
# SagaContext contains target domain state (table), not source (player)
# Fetched by coordinator before invoking your handler
target_seq = context.get_sequence("table", table_id)
# Use in command
CommandPage(sequence=target_seq, command=cmd_any)
Commands with incorrect sequences are rejected, triggering automatic retry with fresh state.
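The reject-and-retry behaviour can be illustrated with a toy aggregate that accepts a command only at its next expected sequence. This is a sketch of optimistic concurrency in general, not Angzarr's retry mechanism: `Aggregate`, `StaleSequence`, `send_with_retry`, and the `max_attempts` limit are hypothetical names and choices made for this example.

```python
class StaleSequence(Exception):
    """Raised when a command carries a sequence the aggregate no longer expects."""


class Aggregate:
    """Stand-in aggregate that accepts a command only at its next sequence."""

    def __init__(self) -> None:
        self.events: list[str] = []

    @property
    def next_sequence(self) -> int:
        return len(self.events)

    def execute(self, sequence: int, command: str) -> None:
        if sequence != self.next_sequence:
            raise StaleSequence(f"expected {self.next_sequence}, got {sequence}")
        self.events.append(command)


def send_with_retry(agg: Aggregate, command: str, stale_seq: int,
                    max_attempts: int = 3) -> int:
    """Send with a possibly stale sequence; on rejection, refresh and retry."""
    sequence = stale_seq
    for attempt in range(1, max_attempts + 1):
        try:
            agg.execute(sequence, command)
            return attempt
        except StaleSequence:
            sequence = agg.next_sequence  # refresh from current state
    raise RuntimeError("gave up after repeated stale sequences")


table = Aggregate()
table.execute(0, "CreateTable")  # another writer advanced the aggregate first
attempts = send_with_retry(table, "JoinTable", stale_seq=0)
```

The first attempt is rejected because sequence 0 is already taken; the second attempt uses the refreshed sequence and succeeds.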
Transactional Guarantees
CQRS/ES architectures cannot provide distributed ACID transactions. Instead:
- Design for success: Saga commands should not fail under normal operation
- Handle exceptions: Compensation flow handles the rare rejection cases
- Eventual consistency: Accept that cross-domain operations settle over time
If saga commands frequently fail, reconsider your domain boundaries.
Further Reading
- Message Translator Pattern — The pattern sagas implement
- Messages between Bounded Context — Cross-domain communication
- Choreography vs Orchestration — Sagas use choreography; Process Managers use orchestration
Next Steps
- Projectors — Building read models
- Process Managers — Stateful multi-domain coordination
- Error Recovery — DLQ, retry, compensation details
- Compensation Flow — Detailed compensation patterns