Sagas
A saga is a message translator that bridges bounded contexts. When an event occurs in one domain, a saga reacts by issuing commands to another domain.
Each domain has its own aggregate, but aggregates don't communicate directly. Instead, a saga listens to events from one domain and generates commands for other domains.
Single Domain Subscription
Sagas should subscribe to ONE domain.
Multi-domain subscription creates:
- Ordering ambiguity (which event triggers first?)
- Duplicate processing
- Race conditions
If you need multi-domain subscription, use a Process Manager.
Saga Pattern
Every saga follows this pattern:
- Receive EventBook with domain events
- Filter for events this saga cares about
- Extract data needed to build commands or facts
- Create CommandBooks or FactBooks targeting other aggregates
- Return commands/facts (which Angzarr dispatches)
The target can be any domain: sagas always bridge from one domain to another.
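As a rough sketch of the five steps above, the following uses plain dataclasses as hypothetical stand-ins for EventBook and CommandBook. These are not the Angzarr types, and every field name here is an assumption made for illustration.

```python
from dataclasses import dataclass, field


# Hypothetical stand-ins for the framework types, for illustration only.
@dataclass
class Event:
    type_name: str
    data: dict


@dataclass
class EventBook:
    domain: str
    events: list[Event] = field(default_factory=list)


@dataclass
class CommandBook:
    domain: str
    root: str
    command: str
    payload: dict


def translate(book: EventBook) -> list[CommandBook]:
    """Walk the five steps: receive, filter, extract, create, return."""
    commands = []
    for event in book.events:                     # 1. receive domain events
        if event.type_name != "HandStarted":      # 2. filter
            continue
        hand_root = event.data["hand_root"]       # 3. extract needed data
        commands.append(                          # 4. create a command
            CommandBook(
                domain="hand",
                root=hand_root,
                command="DealCards",
                payload={"hand_number": event.data["hand_number"]},
            )
        )
    return commands                               # 5. return for dispatch


book = EventBook(
    domain="table",
    events=[
        Event("PlayerSeated", {"seat": 3}),  # invented event, filtered out
        Event("HandStarted", {"hand_root": "hand-42", "hand_number": 7}),
    ],
)
result = translate(book)
```

Only the HandStarted event produces a command; the other event is ignored by the filter step.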
Example: Table-Hand Saga
When a table starts a hand, issue a DealCards command to the hand domain:
Two styles are shown: functional and OO.
Functional style: EventRouter with explicit handler registration. Examples follow in this order:
- Python
- C#
- Rust
- Java
- Go
- C++
@domain("table")
@output_domain("hand")
class TableHandSaga(Saga):
    """Saga that translates HandStarted events to DealCards commands.

    Uses the OO pattern with @domain, @output_domain, @prepares, and @handles decorators.
    """

    name = "saga-table-hand"

    @prepares(table.HandStarted)
    def prepare_hand_started(self, event: table.HandStarted) -> list[types.Cover]:
        """Declare the hand aggregate as destination."""
        return [
            types.Cover(
                domain="hand",
                root=types.UUID(value=event.hand_root),
            )
        ]

    @handles(table.HandStarted)
    def handle_hand_started(
        self,
        event: table.HandStarted,
        destinations: list[types.EventBook],
    ) -> types.CommandBook:
        """Translate HandStarted -> DealCards."""
        # Get next sequence from destination state
        dest_seq = next_sequence(destinations[0]) if destinations else 0

        # Convert SeatSnapshot to PlayerInHand
        players = [
            hand.PlayerInHand(
                player_root=seat.player_root,
                position=seat.position,
                stack=seat.stack,
            )
            for seat in event.active_players
        ]

        # Build DealCards command
        deal_cards = hand.DealCards(
            table_root=event.hand_root,
            hand_number=event.hand_number,
            game_variant=event.game_variant,
            dealer_position=event.dealer_position,
            small_blind=event.small_blind,
            big_blind=event.big_blind,
        )
        deal_cards.players.extend(players)

        # Return pre-packed CommandBook for full control
        from google.protobuf.any_pb2 import Any

        cmd_any = Any()
        cmd_any.Pack(deal_cards, type_url_prefix="type.googleapis.com/")

        return types.CommandBook(
            cover=types.Cover(
                domain="hand",
                root=types.UUID(value=event.hand_root),
            ),
            pages=[
                types.CommandPage(
                    sequence=dest_seq,
                    command=cmd_any,
                )
            ],
        )
private static object HandleHandStarted(HandStarted evt, List<EventBook> destinations)
{
// Sagas are stateless - destinations not used, framework stamps sequences
var players = evt
.ActivePlayers.Select(seat => new PlayerInHand
{
PlayerRoot = seat.PlayerRoot,
Position = seat.Position,
Stack = seat.Stack,
})
.ToList();
var dealCards = new DealCards
{
TableRoot = evt.HandRoot,
HandNumber = evt.HandNumber,
GameVariant = evt.GameVariant,
DealerPosition = evt.DealerPosition,
SmallBlind = evt.SmallBlind,
BigBlind = evt.BigBlind,
};
dealCards.Players.AddRange(players);
var cmdAny = EventRouter.PackCommand(dealCards);
return new CommandBook
{
Cover = new Cover
{
Domain = "hand",
Root = new UUID { Value = evt.HandRoot },
},
Pages =
{
new CommandPage
{
Header = new PageHeader { AngzarrDeferred = new AngzarrDeferredSequence() },
Command = cmdAny,
},
},
};
}
/// Saga handler for Table → Hand domain translation.
#[derive(Clone)]
struct TableHandSagaHandler;
impl SagaDomainHandler for TableHandSagaHandler {
fn event_types(&self) -> Vec<String> {
vec!["HandStarted".into()]
}
fn handle(&self, source: &EventBook, event: &Any) -> CommandResult<SagaHandlerResponse> {
if event.type_url.ends_with("HandStarted") {
return Self::handle_hand_started(source, event);
}
Ok(SagaHandlerResponse::default())
}
}
impl TableHandSagaHandler {
/// Translate HandStarted → DealCards.
///
/// Commands use deferred sequences - framework assigns on delivery.
fn handle_hand_started(
_source: &EventBook,
event_any: &Any,
) -> CommandResult<SagaHandlerResponse> {
let event: HandStarted = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode HandStarted: {}", e))
})?;
// Convert SeatSnapshot to PlayerInHand
let players: Vec<PlayerInHand> = event
.active_players
.iter()
.map(|seat| PlayerInHand {
player_root: seat.player_root.clone(),
position: seat.position,
stack: seat.stack,
})
.collect();
// Build DealCards command
let deal_cards = DealCards {
table_root: event.hand_root.clone(), // The hand_root becomes the table_root reference
hand_number: event.hand_number,
game_variant: event.game_variant,
players,
dealer_position: event.dealer_position,
small_blind: event.small_blind,
big_blind: event.big_blind,
deck_seed: vec![], // Let the aggregate generate a random seed
};
let command_any = Any {
type_url: "type.googleapis.com/examples.DealCards".to_string(),
value: deal_cards.encode_to_vec(),
};
Ok(SagaHandlerResponse {
commands: vec![CommandBook {
cover: Some(Cover {
domain: "hand".to_string(),
root: Some(Uuid {
value: event.hand_root,
}),
..Default::default()
}),
// Framework will stamp angzarr_deferred with source info
// and assign sequence on delivery
pages: vec![CommandPage {
payload: Some(command_page::Payload::Command(command_any)),
..Default::default()
}],
}],
events: vec![],
})
}
}
public static CommandBook handleHandStarted(HandStarted event, List<EventBook> destinations) {
// Sagas are stateless - destinations not used, framework stamps sequences
List<PlayerInHand> players = new ArrayList<>();
for (SeatSnapshot seat : event.getActivePlayersList()) {
players.add(
PlayerInHand.newBuilder()
.setPlayerRoot(seat.getPlayerRoot())
.setPosition(seat.getPosition())
.setStack(seat.getStack())
.build());
}
DealCards dealCards =
DealCards.newBuilder()
.setTableRoot(event.getHandRoot())
.setHandNumber(event.getHandNumber())
.setGameVariant(event.getGameVariant())
.setDealerPosition(event.getDealerPosition())
.setSmallBlind(event.getSmallBlind())
.setBigBlind(event.getBigBlind())
.addAllPlayers(players)
.build();
return CommandBook.newBuilder()
.setCover(
Cover.newBuilder()
.setDomain("hand")
.setRoot(UUID.newBuilder().setValue(event.getHandRoot())))
.addPages(
CommandPage.newBuilder()
.setHeader(
PageHeader.newBuilder()
.setAngzarrDeferred(AngzarrDeferredSequence.newBuilder().build())
.build())
.setCommand(EventRouter.packCommand(dealCards)))
.build();
}
// NewTableHandSaga creates a new TableHandSaga with registered handlers.
func NewTableHandSaga() *TableHandSaga {
s := &TableHandSaga{}
s.Init("saga-table-hand", "table", "hand")
// Register event handler
s.Handles(s.handleHandStarted)
return s
}
// handleHandStarted translates HandStarted → DealCards.
// Sagas are stateless translators - framework handles sequence stamping.
func (s *TableHandSaga) handleHandStarted(
event *examples.HandStarted,
) (*pb.CommandBook, error) {
// Convert SeatSnapshot to PlayerInHand
players := make([]*examples.PlayerInHand, len(event.ActivePlayers))
for i, seat := range event.ActivePlayers {
players[i] = &examples.PlayerInHand{
PlayerRoot: seat.PlayerRoot,
Position: seat.Position,
Stack: seat.Stack,
}
}
// Build DealCards command
dealCards := &examples.DealCards{
TableRoot: event.HandRoot,
HandNumber: event.HandNumber,
GameVariant: event.GameVariant,
Players: players,
DealerPosition: event.DealerPosition,
SmallBlind: event.SmallBlind,
BigBlind: event.BigBlind,
}
cmdAny, err := anypb.New(dealCards)
if err != nil {
return nil, err
}
// Use angzarr_deferred - framework stamps sequence on delivery
return &pb.CommandBook{
Cover: &pb.Cover{
Domain: "hand",
Root: &pb.UUID{Value: event.HandRoot},
},
Pages: []*pb.CommandPage{
{
Header: &pb.PageHeader{SequenceType: &pb.PageHeader_AngzarrDeferred{AngzarrDeferred: &pb.AngzarrDeferredSequence{}}},
Payload: &pb.CommandPage_Command{Command: cmdAny},
},
},
}, nil
}
// Handle HandStarted: produce DealCards command for hand.
// Sagas are stateless translators - framework handles sequence stamping.
static std::vector<angzarr::CommandBook> handle_hand_started(
const google::protobuf::Any& event_any, const std::string& source_root,
const std::string& correlation_id, const std::vector<angzarr::EventBook>& destinations) {
(void)source_root;
(void)destinations; // Sagas are stateless - destinations not used
examples::HandStarted event;
if (!event_any.UnpackTo(&event)) {
return {};  // Payload is not a HandStarted: emit no commands
}
// Build DealCards command from HandStarted event
examples::DealCards deal_cards;
deal_cards.set_table_root(event.hand_root());
deal_cards.set_hand_number(event.hand_number());
deal_cards.set_game_variant(event.game_variant());
deal_cards.set_dealer_position(event.dealer_position());
deal_cards.set_small_blind(event.small_blind());
deal_cards.set_big_blind(event.big_blind());
// Add players from active players
for (const auto& seat : event.active_players()) {
auto* player = deal_cards.add_players();
player->set_player_root(seat.player_root());
player->set_position(seat.position());
player->set_stack(seat.stack());
}
// Pack command
google::protobuf::Any cmd_any;
cmd_any.PackFrom(deal_cards, "type.googleapis.com/");
// Build command book
angzarr::CommandBook cmd_book;
cmd_book.mutable_cover()->set_domain("hand");
cmd_book.mutable_cover()->mutable_root()->set_value(event.hand_root());
cmd_book.mutable_cover()->set_correlation_id(correlation_id);
auto* page = cmd_book.add_pages();
// Framework handles sequence stamping
page->mutable_header()->mutable_angzarr_deferred();
page->mutable_command()->CopyFrom(cmd_any);
return {std::move(cmd_book)};
}
OO style: Saga class with @prepares/@handles decorators. Examples follow in this order:
- Python
- C#
- Rust
- Java
- Go
- C++
@domain("table")
@output_domain("hand")
class TableHandSaga(Saga):
    """Saga that translates HandStarted events to DealCards commands.

    Uses the OO pattern with @domain, @output_domain, @prepares, and @handles decorators.
    """

    name = "saga-table-hand"

    @prepares(table.HandStarted)
    def prepare_hand_started(self, event: table.HandStarted) -> list[types.Cover]:
        """Declare the hand aggregate as destination."""
        return [
            types.Cover(
                domain="hand",
                root=types.UUID(value=event.hand_root),
            )
        ]

    @handles(table.HandStarted)
    def handle_hand_started(
        self,
        event: table.HandStarted,
        destinations: list[types.EventBook],
    ) -> types.CommandBook:
        """Translate HandStarted -> DealCards."""
        # Get next sequence from destination state
        dest_seq = next_sequence(destinations[0]) if destinations else 0

        # Convert SeatSnapshot to PlayerInHand
        players = [
            hand.PlayerInHand(
                player_root=seat.player_root,
                position=seat.position,
                stack=seat.stack,
            )
            for seat in event.active_players
        ]

        # Build DealCards command
        deal_cards = hand.DealCards(
            table_root=event.hand_root,
            hand_number=event.hand_number,
            game_variant=event.game_variant,
            dealer_position=event.dealer_position,
            small_blind=event.small_blind,
            big_blind=event.big_blind,
        )
        deal_cards.players.extend(players)

        # Return pre-packed CommandBook for full control
        from google.protobuf.any_pb2 import Any

        cmd_any = Any()
        cmd_any.Pack(deal_cards, type_url_prefix="type.googleapis.com/")

        return types.CommandBook(
            cover=types.Cover(
                domain="hand",
                root=types.UUID(value=event.hand_root),
            ),
            pages=[
                types.CommandPage(
                    sequence=dest_seq,
                    command=cmd_any,
                )
            ],
        )
/// <summary>
/// Saga: Table -> Hand (OO Pattern)
///
/// Reacts to HandStarted events from Table domain.
/// Sends DealCards commands to Hand domain.
/// Sagas are stateless translators - framework handles sequence stamping.
///
/// Uses annotation-based handler registration with:
/// - [Handles(typeof(EventType))] for handle phase handlers
/// </summary>
public class TableHandSaga : Saga
{
public override string Name => "saga-table-hand";
public override string InputDomain => "table";
public override string OutputDomain => "hand";
/// <summary>
/// Handle phase: translate Table.HandStarted -> Hand.DealCards.
///
/// Called with the source event. Framework handles sequence stamping.
/// </summary>
[Handles(typeof(HandStarted))]
public CommandBook HandleHandStarted(HandStarted evt, List<EventBook> destinations)
{
// Sagas are stateless - destinations not used, framework stamps sequences
// Convert SeatSnapshot to PlayerInHand
var players = evt
.ActivePlayers.Select(seat => new PlayerInHand
{
PlayerRoot = seat.PlayerRoot,
Position = seat.Position,
Stack = seat.Stack,
})
.ToList();
// Build DealCards command
var dealCards = new DealCards
{
TableRoot = evt.HandRoot,
HandNumber = evt.HandNumber,
GameVariant = evt.GameVariant,
DealerPosition = evt.DealerPosition,
SmallBlind = evt.SmallBlind,
BigBlind = evt.BigBlind,
};
dealCards.Players.AddRange(players);
return new CommandBook
{
Cover = new Cover
{
Domain = "hand",
Root = new UUID { Value = evt.HandRoot },
},
Pages =
{
new CommandPage
{
Header = new PageHeader { AngzarrDeferred = new AngzarrDeferredSequence() },
Command = PackCommand(dealCards),
},
},
};
}
}
/// Saga handler for Table → Hand domain translation.
#[derive(Clone)]
struct TableHandSagaHandler;
impl SagaDomainHandler for TableHandSagaHandler {
fn event_types(&self) -> Vec<String> {
vec!["HandStarted".into()]
}
fn handle(&self, source: &EventBook, event: &Any) -> CommandResult<SagaHandlerResponse> {
if event.type_url.ends_with("HandStarted") {
return Self::handle_hand_started(source, event);
}
Ok(SagaHandlerResponse::default())
}
}
impl TableHandSagaHandler {
/// Translate HandStarted → DealCards.
///
/// Commands use deferred sequences - framework assigns on delivery.
fn handle_hand_started(
_source: &EventBook,
event_any: &Any,
) -> CommandResult<SagaHandlerResponse> {
let event: HandStarted = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode HandStarted: {}", e))
})?;
// Convert SeatSnapshot to PlayerInHand
let players: Vec<PlayerInHand> = event
.active_players
.iter()
.map(|seat| PlayerInHand {
player_root: seat.player_root.clone(),
position: seat.position,
stack: seat.stack,
})
.collect();
// Build DealCards command
let deal_cards = DealCards {
table_root: event.hand_root.clone(), // The hand_root becomes the table_root reference
hand_number: event.hand_number,
game_variant: event.game_variant,
players,
dealer_position: event.dealer_position,
small_blind: event.small_blind,
big_blind: event.big_blind,
deck_seed: vec![], // Let the aggregate generate a random seed
};
let command_any = Any {
type_url: "type.googleapis.com/examples.DealCards".to_string(),
value: deal_cards.encode_to_vec(),
};
Ok(SagaHandlerResponse {
commands: vec![CommandBook {
cover: Some(Cover {
domain: "hand".to_string(),
root: Some(Uuid {
value: event.hand_root,
}),
..Default::default()
}),
// Framework will stamp angzarr_deferred with source info
// and assign sequence on delivery
pages: vec![CommandPage {
payload: Some(command_page::Payload::Command(command_any)),
..Default::default()
}],
}],
events: vec![],
})
}
}
/**
* Saga: Table -> Hand (OO Pattern)
*
* <p>Reacts to HandStarted events from Table domain. Sends DealCards commands to Hand domain.
*/
public class TableHandSaga extends Saga {
public TableHandSaga() {
super("saga-table-hand", "table", "hand");
}
@Prepares(HandStarted.class)
public List<Cover> prepareHandStarted(HandStarted event) {
return List.of(
Cover.newBuilder()
.setDomain("hand")
.setRoot(UUID.newBuilder().setValue(event.getHandRoot()))
.build());
}
@Handles(HandStarted.class)
public CommandBook handleHandStarted(HandStarted event, List<EventBook> destinations) {
int destSeq = Saga.nextSequence(destinations.isEmpty() ? null : destinations.get(0));
// Convert SeatSnapshot to PlayerInHand
List<PlayerInHand> players = new ArrayList<>();
for (SeatSnapshot seat : event.getActivePlayersList()) {
players.add(
PlayerInHand.newBuilder()
.setPlayerRoot(seat.getPlayerRoot())
.setPosition(seat.getPosition())
.setStack(seat.getStack())
.build());
}
// Build DealCards command
DealCards dealCards =
DealCards.newBuilder()
.setTableRoot(event.getHandRoot())
.setHandNumber(event.getHandNumber())
.setGameVariant(event.getGameVariant())
.setDealerPosition(event.getDealerPosition())
.setSmallBlind(event.getSmallBlind())
.setBigBlind(event.getBigBlind())
.addAllPlayers(players)
.build();
return CommandBook.newBuilder()
.setCover(
Cover.newBuilder()
.setDomain("hand")
.setRoot(UUID.newBuilder().setValue(event.getHandRoot())))
.addPages(
CommandPage.newBuilder()
.setHeader(PageHeader.newBuilder().setSequence(destSeq).build())
.setCommand(Any.pack(dealCards, "type.googleapis.com/")))
.build();
}
}
// NewTableHandSaga creates a new TableHandSaga with registered handlers.
func NewTableHandSaga() *TableHandSaga {
s := &TableHandSaga{}
s.Init("saga-table-hand", "table", "hand")
// Register event handler
s.Handles(s.handleHandStarted)
return s
}
// handleHandStarted translates HandStarted → DealCards.
// Sagas are stateless translators - framework handles sequence stamping.
func (s *TableHandSaga) handleHandStarted(
event *examples.HandStarted,
) (*pb.CommandBook, error) {
// Convert SeatSnapshot to PlayerInHand
players := make([]*examples.PlayerInHand, len(event.ActivePlayers))
for i, seat := range event.ActivePlayers {
players[i] = &examples.PlayerInHand{
PlayerRoot: seat.PlayerRoot,
Position: seat.Position,
Stack: seat.Stack,
}
}
// Build DealCards command
dealCards := &examples.DealCards{
TableRoot: event.HandRoot,
HandNumber: event.HandNumber,
GameVariant: event.GameVariant,
Players: players,
DealerPosition: event.DealerPosition,
SmallBlind: event.SmallBlind,
BigBlind: event.BigBlind,
}
cmdAny, err := anypb.New(dealCards)
if err != nil {
return nil, err
}
// Use angzarr_deferred - framework stamps sequence on delivery
return &pb.CommandBook{
Cover: &pb.Cover{
Domain: "hand",
Root: &pb.UUID{Value: event.HandRoot},
},
Pages: []*pb.CommandPage{
{
Header: &pb.PageHeader{SequenceType: &pb.PageHeader_AngzarrDeferred{AngzarrDeferred: &pb.AngzarrDeferredSequence{}}},
Payload: &pb.CommandPage_Command{Command: cmdAny},
},
},
}, nil
}
// Handle HandStarted: produce DealCards command for hand.
// Sagas are stateless translators - framework handles sequence stamping.
static std::vector<angzarr::CommandBook> handle_hand_started(
const google::protobuf::Any& event_any, const std::string& source_root,
const std::string& correlation_id, const std::vector<angzarr::EventBook>& destinations) {
(void)source_root;
(void)destinations; // Sagas are stateless - destinations not used
examples::HandStarted event;
if (!event_any.UnpackTo(&event)) {
return {};  // Payload is not a HandStarted: emit no commands
}
// Build DealCards command from HandStarted event
examples::DealCards deal_cards;
deal_cards.set_table_root(event.hand_root());
deal_cards.set_hand_number(event.hand_number());
deal_cards.set_game_variant(event.game_variant());
deal_cards.set_dealer_position(event.dealer_position());
deal_cards.set_small_blind(event.small_blind());
deal_cards.set_big_blind(event.big_blind());
// Add players from active players
for (const auto& seat : event.active_players()) {
auto* player = deal_cards.add_players();
player->set_player_root(seat.player_root());
player->set_position(seat.position());
player->set_stack(seat.stack());
}
// Pack command
google::protobuf::Any cmd_any;
cmd_any.PackFrom(deal_cards, "type.googleapis.com/");
// Build command book
angzarr::CommandBook cmd_book;
cmd_book.mutable_cover()->set_domain("hand");
cmd_book.mutable_cover()->mutable_root()->set_value(event.hand_root());
cmd_book.mutable_cover()->set_correlation_id(correlation_id);
auto* page = cmd_book.add_pages();
// Framework handles sequence stamping
page->mutable_header()->mutable_angzarr_deferred();
page->mutable_command()->CopyFrom(cmd_any);
return {std::move(cmd_book)};
}
EventRouter Registration
- Python
- C#
- Rust
- Java
- Go
- C++
if __name__ == "__main__":
    handler = SagaHandler(TableHandSaga)
    run_saga_server("saga-table-hand", "50411", handler, logger=logger)
public static EventRouter Create()
{
return new EventRouter("saga-table-hand")
.Domain("table")
.On<HandStarted>(HandleHandStarted);
}
let router = SagaRouter::new("saga-table-hand", "table", TableHandSagaHandler);
public static EventRouter createRouter() {
return new EventRouter("saga-table-hand")
.domain("table")
.on(HandStarted.class, TableHandRouter::handleHandStarted);
}
func main() {
saga := NewTableHandSaga()
angzarr.RunOOSagaServer("saga-table-hand", "50211", saga)
}
angzarr::EventRouter create_table_hand_router() {
return angzarr::EventRouter("saga-table-hand")
.domain("table")
.on("HandStarted", handle_hand_started);
}
Splitter Pattern
When one event should trigger commands to multiple different aggregates, return multiple CommandBook entries — one per target aggregate root. This is the splitter pattern.
Example: When a table settles, distribute payouts to multiple players:
def handle_table_settled(
    event: table.TableSettled, context: SagaContext
) -> list[types.CommandBook]:
    """Split one event into commands for multiple player aggregates."""
    commands = []
    for payout in event.payouts:
        cmd = player.TransferFunds(
            table_root=event.table_root,
            amount=payout.amount,
        )
        target_seq = context.get_sequence("player", payout.player_root)
        commands.append(
            types.CommandBook(
                cover=types.Cover(
                    domain="player", root=types.UUID(value=payout.player_root)
                ),
                pages=[types.CommandPage(sequence=target_seq, command=pack_any(cmd))],
            )
        )
    return commands  # One CommandBook per player
Each CommandBook targets a different aggregate root. The framework dispatches them independently: if one is rejected, the others may still succeed, and the rejection is handled through the compensation flow.
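To illustrate what independent dispatch means for a splitter, here is a minimal sketch. The `dispatch_all` function and the `send` callback are hypothetical and not part of Angzarr; they only model the behaviour described above, where one rejected command does not block the others.

```python
from typing import Callable


def dispatch_all(
    commands: list[dict],
    send: Callable[[dict], None],
) -> tuple[list[dict], list[tuple[dict, str]]]:
    """Send every command on its own; collect rejections instead of aborting."""
    accepted = []
    rejected = []
    for command in commands:
        try:
            send(command)
            accepted.append(command)
        except ValueError as err:
            # A rejection of one target does not stop the remaining targets.
            rejected.append((command, str(err)))
    return accepted, rejected


def fake_send(command: dict) -> None:
    """Hypothetical target that rejects one specific player."""
    if command["player_root"] == "player-2":
        raise ValueError("account_frozen")  # invented rejection reason


payouts = [
    {"player_root": "player-1", "amount": 100},
    {"player_root": "player-2", "amount": 50},
    {"player_root": "player-3", "amount": 25},
]
accepted, rejected = dispatch_all(payouts, fake_send)
```

In this run two payouts are accepted and one is rejected; the rejected entry is the case the compensation flow would then have to deal with.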
Compensation Flow
When a saga command is rejected (e.g., table is full), Angzarr routes a Notification back to the source aggregate:
1. Player emits FundsReserved
│
▼
2. Saga issues JoinTable → Table
│
▼
3. Table rejects: "table_full"
│
▼
4. Notification sent to Player
│
▼
5. Player handles rejection → emits FundsReleased
The source aggregate decides how to compensate based on the rejection reason.
- Python
- Rust
- Go
- Java
- C#
- C++
def handle_join_rejected(
    notification: types.Notification,
    state: PlayerState,
) -> types.EventBook | None:
    """Handle JoinTable rejection by releasing reserved funds.

    Called when the JoinTable command (issued by saga-player-table after
    FundsReserved) is rejected by the Table aggregate.
    """
    from google.protobuf.any_pb2 import Any

    # Extract rejection details from the notification payload
    rejection = types.RejectionNotification()
    if notification.payload:
        notification.payload.Unpack(rejection)

    # Extract table_root from the rejected command
    table_root = b""
    if rejection.rejected_command and rejection.rejected_command.cover:
        if rejection.rejected_command.cover.root:
            table_root = rejection.rejected_command.cover.root.value

    # Release the funds that were reserved for this table
    table_key = table_root.hex()
    reserved_amount = state.table_reservations.get(table_key, 0)
    new_reserved = state.reserved_funds - reserved_amount
    new_available = state.bankroll - new_reserved

    event = player.FundsReleased(
        amount=poker_types.Currency(amount=reserved_amount, currency_code="CHIPS"),
        table_root=table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        released_at=now(),
    )

    # Pack the event
    event_any = Any()
    event_any.Pack(event, type_url_prefix="type.googleapis.com/")

    # Build the EventBook using the notification's cover for routing
    return types.EventBook(
        cover=notification.cover,
        pages=[types.EventPage(header=types.PageHeader(sequence=0), event=event_any)],
    )
/// Handle JoinTable rejection by releasing reserved funds.
///
/// Called when the JoinTable command (issued by saga-player-table after
/// FundsReserved) is rejected by the Table aggregate.
pub fn handle_join_rejected(
notification: &Notification,
state: &PlayerState,
) -> CommandResult<RejectionHandlerResponse> {
// Extract rejection details from the notification payload
let rejection = notification
.payload
.as_ref()
.and_then(|any| any.unpack::<RejectionNotification>().ok())
.unwrap_or_default();
warn!(
rejection_reason = %rejection.rejection_reason,
"Player compensation for JoinTable rejection"
);
// Extract table_root from the rejected command
let table_root = rejection
.rejected_command
.as_ref()
.and_then(|cmd| cmd.cover.as_ref())
.map(|cover| {
cover
.root
.as_ref()
.map(|r| r.value.clone())
.unwrap_or_default()
})
.unwrap_or_default();
// Release the funds that were reserved for this table
let table_key = hex::encode(&table_root);
let reserved_amount = state
.table_reservations
.get(&table_key)
.copied()
.unwrap_or(0);
let new_reserved = state.reserved_funds - reserved_amount;
let new_available = state.bankroll - new_reserved;
let event = FundsReleased {
amount: Some(Currency {
amount: reserved_amount,
currency_code: "CHIPS".to_string(),
}),
table_root,
new_available_balance: Some(Currency {
amount: new_available,
currency_code: "CHIPS".to_string(),
}),
new_reserved_balance: Some(Currency {
amount: new_reserved,
currency_code: "CHIPS".to_string(),
}),
released_at: Some(now()),
};
let event_any = pack_event(&event, "examples.FundsReleased");
// Build the EventBook using the notification's cover for routing.
// Sequence 0 is a placeholder - framework assigns actual sequence during persist.
let event_book = EventBook {
cover: notification.cover.clone(),
pages: vec![event_page(0, event_any)],
snapshot: None,
next_sequence: 0,
};
Ok(RejectionHandlerResponse {
events: Some(event_book),
notification: None,
})
}
// HandleTableJoinRejected handles compensation when a table join fails.
//
// Called when a saga/PM command targeting the table aggregate's JoinTable
// command is rejected. This releases the funds that were reserved for the
// failed table join.
func HandleTableJoinRejected(notification *pb.Notification, state PlayerState) *pb.BusinessResponse {
ctx := angzarr.NewCompensationContext(notification)
log.Printf("Player compensation for JoinTable rejection: reason=%s",
ctx.RejectionReason)
// Extract table_root from the rejected command
var tableRoot []byte
if ctx.RejectedCommand != nil && ctx.RejectedCommand.Cover != nil && ctx.RejectedCommand.Cover.Root != nil {
tableRoot = ctx.RejectedCommand.Cover.Root.Value
}
// Release the funds that were reserved for this table
tableKey := hex.EncodeToString(tableRoot)
reservedAmount := state.TableReservations[tableKey]
newReserved := state.ReservedFunds - reservedAmount
newAvailable := state.Bankroll - newReserved
event := &examples.FundsReleased{
Amount: &examples.Currency{Amount: reservedAmount, CurrencyCode: "CHIPS"},
TableRoot: tableRoot,
NewAvailableBalance: &examples.Currency{Amount: newAvailable, CurrencyCode: "CHIPS"},
NewReservedBalance: &examples.Currency{Amount: newReserved, CurrencyCode: "CHIPS"},
ReleasedAt: timestamppb.Now(),
}
eventAny, _ := anypb.New(event)
eventBook := &pb.EventBook{
Cover: notification.Cover,
Pages: []*pb.EventPage{
{
Payload: &pb.EventPage_Event{Event: eventAny},
},
},
}
return angzarr.EmitCompensationEvents(eventBook)
}
@RejectionHandler("JoinTable")
public FundsReleased handleJoinRejected(RejectionNotification rejection, PlayerState state) {
byte[] tableRoot = rejection.getRejectedCommand().getCover().getRoot().getValue();
long reserved = state.getTableReservations().get(tableRoot);
return FundsReleased.newBuilder()
.setAmount(reserved)
.setTableRoot(tableRoot)
.build();
}
[RejectionHandler("JoinTable")]
public FundsReleased HandleJoinRejected(RejectionNotification rejection, PlayerState state)
{
var tableRoot = rejection.RejectedCommand.Cover.Root.Value;
var reserved = state.TableReservations[tableRoot];
return new FundsReleased { Amount = reserved, TableRoot = tableRoot };
}
examples::FundsReleased handle_join_rejected(const angzarr::Notification& notification,
const PlayerState& state) {
// Extract compensation context from the notification
auto ctx = angzarr::CompensationContext::from_notification(notification);
// Get table_root from the notification cover (the target aggregate)
std::string table_key;
if (notification.has_cover() && notification.cover().has_root() &&
!notification.cover().root().value().empty()) {
// Convert root bytes to hex string for lookup
const std::string& root = notification.cover().root().value();
table_key.reserve(root.size() * 2);
for (unsigned char c : root) {
static const char hex[] = "0123456789abcdef";
table_key.push_back(hex[c >> 4]);
table_key.push_back(hex[c & 0x0f]);
}
}
// Get the amount reserved for this table
int64_t reserved_amount = 0;
auto it = state.table_reservations.find(table_key);
if (it != state.table_reservations.end()) {
reserved_amount = it->second;
}
// Compute new balances after release
int64_t new_reserved = state.reserved_funds - reserved_amount;
int64_t new_available = state.bankroll - new_reserved;
// Build FundsReleased event
examples::FundsReleased event;
event.mutable_amount()->set_amount(reserved_amount);
event.mutable_amount()->set_currency_code("CHIPS");
if (notification.has_cover() && notification.cover().has_root()) {
event.set_table_root(notification.cover().root().value());
}
event.mutable_new_available_balance()->set_amount(new_available);
event.mutable_new_available_balance()->set_currency_code("CHIPS");
event.mutable_new_reserved_balance()->set_amount(new_reserved);
event.mutable_new_reserved_balance()->set_currency_code("CHIPS");
auto now = std::chrono::system_clock::now();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
event.mutable_released_at()->set_seconds(seconds);
return event;
}
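The balance arithmetic used by the handlers above can be checked with a small worked example. The numbers and table keys are made up for illustration; only the two formulas are taken from the handlers.

```python
# Illustrative player state (all values are invented for this example).
bankroll = 1000          # total chips the player owns
reserved_funds = 300     # chips currently reserved across all tables
table_reservations = {"a1b2": 200, "c3d4": 100}  # keyed by hex table root

# The JoinTable command for table "a1b2" was rejected.
reserved_amount = table_reservations.get("a1b2", 0)

# Same formulas as in the handlers above.
new_reserved = reserved_funds - reserved_amount
new_available = bankroll - new_reserved
```

After the release, 100 chips stay reserved for the other table and 900 chips are available.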
For complex scenarios (DLQ routing, escalation webhooks, RevocationResponse flags), see Error Recovery.
Commands vs Facts
Sagas can emit either commands or facts to target domains:
| Output | Can Reject | Use Case |
|---|---|---|
| Command | Yes | Request action that may fail validation |
| Fact | No | Assert external reality the aggregate must accept |
Commands go through the target aggregate's guard/validate/compute flow and can be rejected. Facts bypass validation entirely—the aggregate must accept them.
# Command: request an action (can be rejected)
def handle_hand_started(event: HandStarted, destinations: list[EventBook]):
    # Next sequence comes from the destination state supplied by the coordinator
    dest_seq = next_sequence(destinations[0]) if destinations else 0
    return CommandBook(
        cover=Cover(domain="player", root=event.player_id),
        pages=[CommandPage(sequence=dest_seq, command=DeductBlinds(amount=50))],
    )


# Fact: assert reality (cannot be rejected)
def handle_turn_assigned(event: TurnAssigned, destinations: list[EventBook]):
    return FactBook(
        cover=Cover(domain="player", root=event.player_id, external_id=f"turn:{event.hand_id}"),
        pages=[FactPage(fact=FactSequence(source="hand"), event=YourTurn(hand_id=event.hand_id))],
    )
Use facts when the source domain has authority the target must accept—tournament seating, dealer rulings, external system confirmations. See Commands vs Facts for details.
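The difference can be sketched with a toy aggregate. `ToyPlayer`, `handle_deduct_blinds` and `apply_fact` are hypothetical names chosen for illustration and do not reflect the Angzarr aggregate API.

```python
class CommandRejected(Exception):
    """Raised when a command fails validation."""


class ToyPlayer:
    """Toy aggregate: validates commands, accepts facts unconditionally."""

    def __init__(self, stack: int) -> None:
        self.stack = stack
        self.events: list[tuple[str, dict]] = []

    def handle_deduct_blinds(self, amount: int) -> None:
        # Commands go through validation and may be rejected.
        if amount > self.stack:
            raise CommandRejected("insufficient_stack")
        self.stack -= amount
        self.events.append(("BlindsDeducted", {"amount": amount}))

    def apply_fact(self, name: str, data: dict) -> None:
        # Facts skip validation: the aggregate records them as given.
        self.events.append((name, data))


player = ToyPlayer(stack=40)

try:
    player.handle_deduct_blinds(50)
    outcome = "accepted"
except CommandRejected as err:
    outcome = f"rejected: {err}"

player.apply_fact("YourTurn", {"hand_id": "hand-42"})
```

The command is rejected because the stack is too small and leaves the state untouched, while the fact is recorded without any check.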
Sequence Handling
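Commands that carry an explicit sequence number must carry the correct one, because the framework validates sequences for optimistic concurrency. Several examples above instead mark the page as angzarr_deferred and leave sequence assignment to the framework.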
Two-Phase Saga Flow
The saga coordinator uses a two-phase flow to provide target domain context:
Phase 1: Coordinator receives source event
↓
Coordinator calls prepare handler to get destination covers
↓
Coordinator fetches EventBooks for declared destinations
↓
Phase 2: Coordinator invokes your saga handler with:
- Source event
- Destination EventBooks (target domain states)
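The two phases can be sketched as a small coordinator function. Everything here is a hypothetical model: `run_two_phase`, the `store` dictionary and the tuple-based covers are invented for illustration, and the real coordinator lives inside the framework. The sketch also assumes that the next sequence equals the number of events already stored for the destination.

```python
from typing import Callable

Cover = tuple[str, str]  # (domain, root): hypothetical simplified cover


def run_two_phase(
    event: dict,
    prepare: Callable[[dict], list[Cover]],
    handle: Callable[[dict, list[list[dict]]], dict],
    store: dict[Cover, list[dict]],
) -> dict:
    """Model of the coordinator: prepare, fetch destinations, then handle."""
    # Phase 1: ask the saga which destinations it needs.
    covers = prepare(event)
    # Fetch the current event history for each declared destination.
    destinations = [store.get(cover, []) for cover in covers]
    # Phase 2: invoke the handler with the source event and destination state.
    return handle(event, destinations)


def prepare_hand_started(event: dict) -> list[Cover]:
    return [("hand", event["hand_root"])]


def handle_hand_started(event: dict, destinations: list[list[dict]]) -> dict:
    # Assumption: next sequence = number of events already in the destination.
    next_seq = len(destinations[0]) if destinations else 0
    return {
        "domain": "hand",
        "root": event["hand_root"],
        "sequence": next_seq,
        "command": "DealCards",
    }


# Invented destination history with two prior events.
store = {("hand", "hand-42"): [{"type": "HandCreated"}, {"type": "SeatsAssigned"}]}
command = run_two_phase(
    {"type": "HandStarted", "hand_root": "hand-42"},
    prepare_hand_started,
    handle_hand_started,
    store,
)
```

Because the destination already holds two events in this model, the handler stamps the command with sequence 2.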
The SagaContext contains the aggregate state of the target domain(s), not of the source domain. This allows your saga to:
- Get correct sequence numbers for optimistic concurrency
- Make routing decisions based on target state
- Avoid stale sequence errors
# SagaContext contains target domain state (table), not source (player)
# Fetched by coordinator before invoking your handler
target_seq = context.get_sequence("table", table_id)
# Use in command
CommandPage(sequence=target_seq, command=cmd_any)
Commands with incorrect sequences are rejected, triggering automatic retry with fresh state.
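A minimal sketch of that reject-and-retry loop, assuming a hypothetical in-memory aggregate whose expected sequence equals the number of events it has stored. `ToyAggregate`, `StaleSequence` and `send_with_retry` are illustrative names, not framework APIs, and the retry policy shown is an assumption.

```python
class StaleSequence(Exception):
    """Raised when a command carries an outdated sequence number."""


class ToyAggregate:
    def __init__(self) -> None:
        self.events: list[str] = []

    def next_sequence(self) -> int:
        return len(self.events)

    def handle(self, sequence: int, command: str) -> None:
        # Optimistic concurrency: the command must target the current sequence.
        if sequence != self.next_sequence():
            raise StaleSequence(f"expected {self.next_sequence()}, got {sequence}")
        self.events.append(command)


def send_with_retry(
    target: ToyAggregate, command: str, stale_seq: int, max_attempts: int = 3
) -> int:
    """Try with a possibly stale sequence, then retry with fresh state."""
    sequence = stale_seq
    for attempt in range(1, max_attempts + 1):
        try:
            target.handle(sequence, command)
            return attempt
        except StaleSequence:
            # Re-read the target state to pick up the current sequence.
            sequence = target.next_sequence()
    raise RuntimeError("gave up after repeated stale sequences")


table = ToyAggregate()
table.events.append("PlayerSeated")  # another writer got in first

attempts = send_with_retry(table, "JoinTable", stale_seq=0)
```

The first attempt is rejected because sequence 0 is stale; the second attempt uses the refreshed sequence and succeeds.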
Transactional Guarantees
CQRS/ES architectures cannot provide distributed ACID transactions. Instead:
- Design for success: Saga commands should not fail under normal operation
- Handle exceptions: Compensation flow handles the rare rejection cases
- Eventual consistency: Accept that cross-domain operations settle over time
If saga commands frequently fail, reconsider your domain boundaries.
Further Reading
- Message Translator Pattern — The pattern sagas implement
- Messages between Bounded Context — Cross-domain communication
- Choreography vs Orchestration — Sagas use choreography; Process Managers use orchestration
Next Steps
- Commands vs Facts — When to emit facts instead of commands
- Projectors — Building read models
- Process Managers — Stateful multi-domain coordination
- Error Recovery — DLQ, retry, compensation details
- Compensation Flow — Detailed compensation patterns