Process Managers
A process manager coordinates multi-domain workflows with state tracking. Unlike stateless sagas, process managers maintain their own event stream keyed by correlation ID, enabling complex orchestration patterns.
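To make "its own event stream keyed by correlation ID" concrete, here is a minimal in-memory sketch. It is not Angzarr's storage API: `InMemoryPMStore`, `FlowState`, and the string phase names are hypothetical and exist only for illustration. Each workflow instance gets its own append-only stream, and state is rebuilt by replaying that stream.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FlowState:
    """Hypothetical PM state: just the current phase name."""
    phase: str = "awaiting_deal"


class InMemoryPMStore:
    """Toy store (an assumption, not a framework class): one event list per correlation ID."""

    def __init__(self):
        self._streams = defaultdict(list)

    def append(self, correlation_id, phase):
        # Record a phase transition in this workflow's own stream.
        self._streams[correlation_id].append(phase)

    def load(self, correlation_id):
        # Rebuild state by replaying the stream from the beginning.
        state = FlowState()
        for phase in self._streams.get(correlation_id, []):
            state.phase = phase
        return state


store = InMemoryPMStore()
store.append("hand-42", "dealing")
store.append("hand-42", "blinds")
store.append("hand-7", "dealing")
```

Loading `"hand-42"` replays two transitions while `"hand-7"` replays one, so concurrent workflows never share state.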
When to Use Process Managers
| Use Case | Saga | Process Manager |
|---|---|---|
| Single event → single command | ✓ | |
| Fan-out to multiple domains | ✓ | |
| Multi-step workflow with state | | ✓ |
| Events from multiple domains | | ✓ |
| State machine transitions | | ✓ |
| Timeouts and retries | | ✓ |
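The dividing line is whether the decision needs memory of earlier events. The sketch below illustrates that with plain dictionaries rather than Angzarr types. The function and class names, and the `StartBetting` command, are hypothetical.

```python
from dataclasses import dataclass


def deal_on_hand_started(event: dict) -> list[dict]:
    """Saga-style: a pure function, one event in, commands out, no memory."""
    if event["type"] == "HandStarted":
        return [{"type": "DealCards", "hand_id": event["hand_id"]}]
    return []


@dataclass
class BlindsTracker:
    """PM-style: must remember earlier events to decide what happens next."""
    blinds_posted: int = 0

    def on_event(self, event: dict) -> list[dict]:
        if event["type"] != "BlindPosted":
            return []
        self.blinds_posted += 1
        # Only the second blind triggers the next step (hypothetical command).
        if self.blinds_posted == 2:
            return [{"type": "StartBetting", "hand_id": event["hand_id"]}]
        return []
```

The saga can run on any worker with no stored context. The tracker cannot, because the correct response to a `BlindPosted` event depends on how many arrived before it.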
Warning: Process Manager Anti-Patterns
Over-reliance on process managers is a foot-gun. If you find yourself:
- Putting significant business logic in a PM
- Building PMs with large, complex state
- Creating PMs for workflows within a single domain
- Reaching for PMs as a first solution
If any of these apply, your domain factoring is probably wrong.
Process managers should be lightweight state machines coordinating across domains—not business logic containers. They answer "what phase are we in?" and "what happens next?", not "how do we calculate X?"
| Symptom | Likely Problem |
|---|---|
| PM has complex validation logic | Logic belongs in domain aggregate |
| PM state mirrors aggregate state | Redundant—query the aggregate |
| PM handles single-domain workflow | Use saga or aggregate instead |
| PM event stream grows large | Too much responsibility—split domains |
Rule of thumb: If a PM does more than track phases and dispatch commands, reconsider your architecture. The PM's job is orchestration, not computation.
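One way to keep a PM this small is to express it as a transition table, so the only questions it can answer are "what phase are we in?" and "what happens next?". The following is a simplified sketch, not Angzarr code. The event and command names are borrowed from the Hand Flow example on this page, and the `TRANSITIONS` table and `step` function are assumptions made for illustration.

```python
from enum import Enum
from typing import Optional


class Phase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    COMPLETE = "complete"


# (current phase, event name) -> (next phase, command to dispatch)
TRANSITIONS = {
    (Phase.AWAITING_DEAL, "HandStarted"): (Phase.DEALING, "DealCards"),
    (Phase.DEALING, "CardsDealt"): (Phase.BLINDS, "PostBlinds"),
    (Phase.BLINDS, "HandComplete"): (Phase.COMPLETE, "EndHand"),
}


def step(phase: Phase, event_name: str) -> tuple[Phase, Optional[str]]:
    """Return the next phase and the command to send; ignore unexpected events."""
    return TRANSITIONS.get((phase, event_name), (phase, None))
```

There is no place in this structure to put a calculation. If a handler needs one, that is a hint the logic belongs in an aggregate.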
Implementation Styles
Angzarr supports two process manager implementation styles:
| Style | Description | Best For |
|---|---|---|
| Functional | ProcessManagerRouter with function handlers | Simple routing, explicit |
| OO | ProcessManager class with @handles/[Handles] decorators | Rich state, encapsulation |
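The difference between the two styles is mostly where handler registration lives. The toy sketch below shows the contrast in plain Python. `Router`, `toy_handles`, and `DecoratedPM` are hypothetical stand-ins, not Angzarr's `ProcessManagerRouter`, `@handles`, or `ProcessManager`, whose real signatures may differ.

```python
class Router:
    """Functional style: handlers are plain functions registered explicitly."""

    def __init__(self):
        self._handlers = {}

    def on(self, event_type, handler):
        self._handlers[event_type] = handler
        return self  # allow chaining

    def dispatch(self, event_type, event):
        handler = self._handlers.get(event_type)
        return handler(event) if handler else None


def toy_handles(event_type):
    """OO style: a decorator that tags a method with the event it handles."""
    def mark(method):
        method._event_type = event_type
        return method
    return mark


class DecoratedPM:
    """Base class that discovers tagged methods on the subclass."""

    def dispatch(self, event_type, event):
        for name in dir(self):
            method = getattr(self, name)
            if getattr(method, "_event_type", None) == event_type:
                return method(event)
        return None


class HandFlow(DecoratedPM):
    @toy_handles("HandStarted")
    def on_hand_started(self, event):
        return f"DealCards({event['hand_id']})"


router = Router().on("HandStarted", lambda e: f"DealCards({e['hand_id']})")
```

Both dispatch the same event to the same result. The router keeps the wiring visible in one place, while the decorated class keeps each handler next to the state it touches.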
Example: Hand Flow PM
The HandFlowPM orchestrates poker hand phases across table and hand domains:
Functional style: ProcessManagerRouter with explicit event handler registration:
State Definition
- Python
- Go
- Rust
- Java
- C#
- C++
from dataclasses import dataclass
from enum import Enum


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


@dataclass
class HandFlowState:
    """PM state - tracks workflow progress."""

    hand_id: str = ""
    phase: HandPhase = HandPhase.AWAITING_DEAL
    player_count: int = 0
// PMState is the PM's aggregate state (rebuilt from its own events).
// For simplicity in this example, we use a minimal state.
type PMState struct {
HandRoot []byte
HandInProgress bool
}
#[derive(Default, Clone)]
pub struct HandFlowState {
hand_root: Vec<u8>,
hand_number: i64,
phase: HandPhase,
blinds_posted: u32,
}
#[derive(Default, PartialEq, Clone, Copy)]
pub enum HandPhase {
#[default]
AwaitingDeal,
Dealing,
Blinds,
Betting,
Complete,
}
enum HandPhase {
AWAITING_DEAL, DEALING, BLINDS, BETTING, COMPLETE
}
class HandFlowState {
private String handId = "";
private HandPhase phase = HandPhase.AWAITING_DEAL;
private int playerCount = 0;
public String getHandId() { return handId; }
public void setHandId(String handId) { this.handId = handId; }
public HandPhase getPhase() { return phase; }
public void setPhase(HandPhase phase) { this.phase = phase; }
public int getPlayerCount() { return playerCount; }
public void setPlayerCount(int playerCount) { this.playerCount = playerCount; }
}
public enum HandPhase
{
AwaitingDeal,
Dealing,
Blinds,
Betting,
Complete,
}
public class HandFlowState
{
public string HandId { get; set; } = "";
public HandPhase Phase { get; set; } = HandPhase.AwaitingDeal;
public int PlayerCount { get; set; } = 0;
}
enum class HandPhase { AwaitingDeal, Dealing, Blinds, Betting, Complete };
struct HandFlowState {
std::string hand_id;
HandPhase phase = HandPhase::AwaitingDeal;
int32_t player_count = 0;
};
Handler Implementation
- Python
- Go
- Rust
- Java
- C#
- C++
class HandFlowPM(ProcessManager[HandFlowState]):
    """OO-style process manager using decorators."""

    name = "pmg-hand-flow"

    def _create_empty_state(self) -> HandFlowState:
        return HandFlowState()

    def _apply_event(self, state: HandFlowState, event_any: Any) -> None:
        """Apply PM's own events to rebuild state."""
        type_url = event_any.type_url
        if type_url.endswith("HandFlowStarted"):
            # In production, unpack and apply
            pass
        elif type_url.endswith("PhaseTransitioned"):
            pass

    @prepares(table.HandStarted)
    def prepare_hand_started(self, event: table.HandStarted) -> list[Cover]:
        """Declare hand destination needed when hand starts."""
        return [
            Cover(
                domain="hand",
                root=Uuid(value=event.hand_root),
            )
        ]

    @output_domain("hand")
    @handles(table.HandStarted, input_domain="table")
    def on_hand_started(
        self, event: table.HandStarted, destinations: list[EventBook]
    ) -> Optional[hand.DealCards]:
        """Table started a hand -> send DealCards to hand domain."""
        # Update local state
        self.state.hand_id = event.hand_id
        self.state.phase = HandPhase.DEALING
        self.state.player_count = event.player_count
        return hand.DealCards(
            hand_id=event.hand_id,
            player_count=event.player_count,
        )

    @output_domain("hand")
    @handles(hand.CardsDealt, input_domain="hand")
    def on_cards_dealt(self, event: hand.CardsDealt) -> Optional[hand.PostBlinds]:
        """Cards dealt -> post blinds."""
        self.state.phase = HandPhase.BLINDS
        return hand.PostBlinds(hand_id=self.state.hand_id)

    @output_domain("table")
    @handles(hand.HandComplete, input_domain="hand")
    def on_hand_complete(self, event: hand.HandComplete) -> Optional[table.EndHand]:
        """Hand complete -> end hand on table."""
        self.state.phase = HandPhase.COMPLETE
        return table.EndHand(
            hand_id=self.state.hand_id,
            winner_id=event.winner_id,
        )
package main
import (
angzarr "github.com/benjaminabbitt/angzarr/client/go"
pb "github.com/benjaminabbitt/angzarr/client/go/proto/angzarr"
"github.com/benjaminabbitt/angzarr/client/go/proto/examples"
)
// PMState is the PM's aggregate state (rebuilt from its own events).
// For simplicity in this example, we use a minimal state.
type PMState struct {
HandRoot []byte
HandInProgress bool
}
// HandFlowPM is the OO-style process manager for hand flow orchestration.
type HandFlowPM struct {
angzarr.ProcessManagerBase[*PMState]
}
// NewHandFlowPM creates a new HandFlowPM with all handlers registered.
func NewHandFlowPM() *HandFlowPM {
pm := &HandFlowPM{}
pm.Init("pmg-hand-flow", "pmg-hand-flow", []string{"table", "hand"})
pm.WithStateFactory(func() *PMState { return &PMState{} })
// Register prepare handlers
pm.Prepares(pm.prepareHandStarted)
// Register event handlers
pm.Handles(pm.handleHandStarted)
pm.Handles(pm.handleCardsDealt)
pm.Handles(pm.handleBlindPosted)
pm.Handles(pm.handleActionTaken)
pm.Handles(pm.handleCommunityDealt)
pm.Handles(pm.handlePotAwarded)
return pm
}
// prepareHandStarted declares the hand destination needed when a hand starts.
func (pm *HandFlowPM) prepareHandStarted(
trigger *pb.EventBook,
state *PMState,
event *examples.HandStarted,
) []*pb.Cover {
return []*pb.Cover{{
Domain: "hand",
Root: &pb.UUID{Value: event.HandRoot},
}}
}
// handleHandStarted processes the HandStarted event.
func (pm *HandFlowPM) handleHandStarted(
trigger *pb.EventBook,
state *PMState,
event *examples.HandStarted,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// Initialize hand process (not persisted in this simplified version).
// The saga-table-hand will send DealCards, so we don't emit commands here.
return nil, nil, nil
}
// handleCardsDealt processes the CardsDealt event.
func (pm *HandFlowPM) handleCardsDealt(
trigger *pb.EventBook,
state *PMState,
event *examples.CardsDealt,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// A full implementation would emit a post-small-blind command here,
// tracking state to know which blind to post.
// This simplified version assumes the hand aggregate tracks that and emits nothing.
return nil, nil, nil
}
// handleBlindPosted processes the BlindPosted event.
func (pm *HandFlowPM) handleBlindPosted(
trigger *pb.EventBook,
state *PMState,
event *examples.BlindPosted,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// In a full implementation, we'd check if both blinds are posted
// and then start the betting round.
return nil, nil, nil
}
// handleActionTaken processes the ActionTaken event.
func (pm *HandFlowPM) handleActionTaken(
trigger *pb.EventBook,
state *PMState,
event *examples.ActionTaken,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// In a full implementation, we'd check if betting is complete
// and advance to the next phase.
return nil, nil, nil
}
// handleCommunityDealt processes the CommunityCardsDealt event.
func (pm *HandFlowPM) handleCommunityDealt(
trigger *pb.EventBook,
state *PMState,
event *examples.CommunityCardsDealt,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// Start new betting round after community cards.
return nil, nil, nil
}
// handlePotAwarded processes the PotAwarded event.
func (pm *HandFlowPM) handlePotAwarded(
trigger *pb.EventBook,
state *PMState,
event *examples.PotAwarded,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// Hand is complete. Clean up.
return nil, nil, nil
}
/// Process manager handler for hand flow orchestration.
///
/// Listens to events from both table and hand domains to coordinate
/// the poker hand lifecycle.
struct HandFlowPmHandler;
impl ProcessManagerDomainHandler<HandFlowState> for HandFlowPmHandler {
fn event_types(&self) -> Vec<String> {
vec![
"HandStarted".into(),
"CardsDealt".into(),
"BlindPosted".into(),
"ActionTaken".into(),
"CommunityCardsDealt".into(),
"PotAwarded".into(),
"HandComplete".into(),
]
}
fn prepare(&self, _trigger: &EventBook, _state: &HandFlowState, event: &Any) -> Vec<Cover> {
// Declare destinations needed based on the triggering event
if event.type_url.ends_with("HandStarted") {
if let Ok(evt) = event.unpack::<HandStarted>() {
// We'll need access to the hand aggregate
return vec![Cover {
domain: "hand".to_string(),
root: Some(Uuid {
value: evt.hand_root,
}),
..Default::default()
}];
}
}
vec![]
}
fn handle(
&self,
_trigger: &EventBook,
state: &HandFlowState,
event: &Any,
_destinations: &[EventBook],
) -> CommandResult<ProcessManagerResponse> {
// Clone state for mutation (PM state is rebuilt from events, not mutated in-place)
let mut local_state = state.clone();
let type_url = &event.type_url;
if type_url.ends_with("HandStarted") {
return self.handle_hand_started(&mut local_state, event);
} else if type_url.ends_with("CardsDealt") {
return self.handle_cards_dealt(&mut local_state, event);
} else if type_url.ends_with("BlindPosted") {
return self.handle_blind_posted(&mut local_state, event);
} else if type_url.ends_with("ActionTaken") {
return self.handle_action_taken(&mut local_state, event);
} else if type_url.ends_with("CommunityCardsDealt") {
return self.handle_community_dealt(&mut local_state, event);
} else if type_url.ends_with("PotAwarded") {
return self.handle_pot_awarded(&mut local_state, event);
} else if type_url.ends_with("HandComplete") {
return self.handle_hand_complete(&mut local_state, event);
}
Ok(ProcessManagerResponse::default())
}
}
impl HandFlowPmHandler {
/// Handle HandStarted event from table domain.
///
/// Initializes the PM state for tracking this hand.
/// The saga-table-hand handles sending DealCards, so we don't emit commands here.
fn handle_hand_started(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let event: HandStarted = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode HandStarted: {}", e))
})?;
state.hand_root = event.hand_root;
state.hand_number = event.hand_number;
state.phase = HandPhase::Dealing;
// No commands - saga-table-hand sends DealCards
Ok(ProcessManagerResponse::default())
}
/// Handle CardsDealt event from hand domain.
///
/// Cards have been dealt, waiting for blinds.
fn handle_cards_dealt(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: CardsDealt = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode CardsDealt: {}", e))
})?;
state.phase = HandPhase::Blinds;
// Blinds are posted by players/coordinator, not by PM
Ok(ProcessManagerResponse::default())
}
/// Handle BlindPosted event from hand domain.
fn handle_blind_posted(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: BlindPosted = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode BlindPosted: {}", e))
})?;
state.blinds_posted += 1;
// In a full implementation, check if all blinds posted then start betting
if state.blinds_posted >= 2 {
state.phase = HandPhase::Betting;
}
Ok(ProcessManagerResponse::default())
}
/// Handle ActionTaken event from hand domain.
fn handle_action_taken(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: ActionTaken = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode ActionTaken: {}", e))
})?;
// In a full implementation, track betting round progress
// and advance phases when rounds complete
let _ = state; // State tracking would go here
Ok(ProcessManagerResponse::default())
}
/// Handle CommunityCardsDealt event from hand domain.
fn handle_community_dealt(
&self,
_state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: CommunityCardsDealt = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode CommunityCardsDealt: {}", e))
})?;
// New betting round starts after community cards
Ok(ProcessManagerResponse::default())
}
/// Handle PotAwarded event from hand domain.
fn handle_pot_awarded(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: PotAwarded = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode PotAwarded: {}", e))
})?;
state.phase = HandPhase::Complete;
Ok(ProcessManagerResponse::default())
}
/// Handle HandComplete event from hand domain.
fn handle_hand_complete(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: HandComplete = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode HandComplete: {}", e))
})?;
state.phase = HandPhase::Complete;
// The saga-hand-table handles sending EndHand to table domain
Ok(ProcessManagerResponse::default())
}
}
public class HandFlowPM extends ProcessManager<HandFlowState> {
@Handles(HandStarted.class)
public List<CommandBook> handleHandStarted(HandStarted event, HandFlowState state) {
state.setHandId(event.getHandId());
state.setPhase(HandPhase.DEALING);
state.setPlayerCount(event.getPlayerCount());
return List.of(buildCommand("hand", DealCards.newBuilder()
.setHandId(event.getHandId())
.setPlayerCount(event.getPlayerCount())
.build()));
}
@Handles(CardsDealt.class)
public List<CommandBook> handleCardsDealt(CardsDealt event, HandFlowState state) {
state.setPhase(HandPhase.BLINDS);
return List.of(buildCommand("hand", PostBlinds.newBuilder()
.setHandId(state.getHandId())
.build()));
}
@Handles(HandComplete.class)
public List<CommandBook> handleHandComplete(HandComplete event, HandFlowState state) {
state.setPhase(HandPhase.COMPLETE);
return List.of(buildCommand("table", EndHand.newBuilder()
.setHandId(state.getHandId())
.setWinnerId(event.getWinnerId())
.build()));
}
}
public class HandFlowPM : ProcessManager<HandFlowState>
{
[Handles(typeof(HandStarted))]
public IEnumerable<CommandBook> HandleHandStarted(HandStarted evt, HandFlowState state)
{
state.HandId = evt.HandId;
state.Phase = HandPhase.Dealing;
state.PlayerCount = evt.PlayerCount;
yield return BuildCommand(
"hand",
new DealCards { HandId = evt.HandId, PlayerCount = evt.PlayerCount }
);
}
[Handles(typeof(CardsDealt))]
public IEnumerable<CommandBook> HandleCardsDealt(CardsDealt evt, HandFlowState state)
{
state.Phase = HandPhase.Blinds;
yield return BuildCommand("hand", new PostBlinds { HandId = state.HandId });
}
[Handles(typeof(HandComplete))]
public IEnumerable<CommandBook> HandleHandComplete(HandComplete evt, HandFlowState state)
{
state.Phase = HandPhase.Complete;
yield return BuildCommand(
"table",
new EndHand { HandId = state.HandId, WinnerId = evt.WinnerId }
);
}
}
class HandFlowPM : public ProcessManager<HandFlowState> {
public:
std::vector<CommandBook> handle_hand_started(const HandStarted& event, HandFlowState& state) {
state.hand_id = event.hand_id();
state.phase = HandPhase::Dealing;
state.player_count = event.player_count();
DealCards cmd;
cmd.set_hand_id(event.hand_id());
cmd.set_player_count(event.player_count());
return {build_command("hand", cmd)};
}
std::vector<CommandBook> handle_cards_dealt(const CardsDealt& event, HandFlowState& state) {
state.phase = HandPhase::Blinds;
PostBlinds cmd;
cmd.set_hand_id(state.hand_id);
return {build_command("hand", cmd)};
}
std::vector<CommandBook> handle_hand_complete(const HandComplete& event, HandFlowState& state) {
state.phase = HandPhase::Complete;
EndHand cmd;
cmd.set_hand_id(state.hand_id);
cmd.set_winner_id(event.winner_id());
return {build_command("table", cmd)};
}
};
OO style: ProcessManager class with decorator-based handler registration:
State Definition
- Python
- Go
- Rust
- Java
- C#
- C++
from dataclasses import dataclass
from enum import Enum


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


@dataclass
class HandFlowState:
    """PM state - tracks workflow progress."""

    hand_id: str = ""
    phase: HandPhase = HandPhase.AWAITING_DEAL
    player_count: int = 0
// PMState is the PM's aggregate state (rebuilt from its own events).
// For simplicity in this example, we use a minimal state.
type PMState struct {
HandRoot []byte
HandInProgress bool
}
#[derive(Default, Clone)]
pub struct HandFlowState {
hand_root: Vec<u8>,
hand_number: i64,
phase: HandPhase,
blinds_posted: u32,
}
#[derive(Default, PartialEq, Clone, Copy)]
pub enum HandPhase {
#[default]
AwaitingDeal,
Dealing,
Blinds,
Betting,
Complete,
}
private enum HandPhase {
DEALING,
POSTING_BLINDS,
BETTING,
DEALING_COMMUNITY,
COMPLETE
}
private static class HandProcess {
String handId;
byte[] handRoot;
long handNumber;
int dealerPosition;
int smallBlindPosition;
int bigBlindPosition;
long smallBlind;
long bigBlind;
HandPhase phase = HandPhase.DEALING;
Map<Integer, PlayerState> players = new HashMap<>();
List<Integer> activePositions = new ArrayList<>();
boolean smallBlindPosted;
boolean bigBlindPosted;
long currentBet;
long minRaise;
long potTotal;
int actionOn;
int lastAggressor;
Instant actionStartedAt;
int communityCardCount;
int bettingPhase;
}
private static class PlayerState {
byte[] playerRoot;
int position;
long stack;
long betThisRound;
long totalInvested;
boolean hasActed;
boolean hasFolded;
boolean allIn;
}
/// <summary>
/// PM's aggregate state (rebuilt from its own events).
/// For simplicity in this example, we use a minimal state.
/// </summary>
public class PMState
{
public byte[]? HandRoot { get; set; }
public bool HandInProgress { get; set; }
}
/// Process manager state for a single hand.
struct PMState {
std::string hand_root;
bool hand_in_progress = false;
};
Handler Implementation
- Python
- Go
- Rust
- Java
- C#
- C++
class HandFlowPM(ProcessManager[HandFlowState]):
    """OO-style process manager using decorators."""

    name = "pmg-hand-flow"

    def _create_empty_state(self) -> HandFlowState:
        return HandFlowState()

    def _apply_event(self, state: HandFlowState, event_any: Any) -> None:
        """Apply PM's own events to rebuild state."""
        type_url = event_any.type_url
        if type_url.endswith("HandFlowStarted"):
            # In production, unpack and apply
            pass
        elif type_url.endswith("PhaseTransitioned"):
            pass

    @prepares(table.HandStarted)
    def prepare_hand_started(self, event: table.HandStarted) -> list[Cover]:
        """Declare hand destination needed when hand starts."""
        return [
            Cover(
                domain="hand",
                root=Uuid(value=event.hand_root),
            )
        ]

    @output_domain("hand")
    @handles(table.HandStarted, input_domain="table")
    def on_hand_started(
        self, event: table.HandStarted, destinations: list[EventBook]
    ) -> Optional[hand.DealCards]:
        """Table started a hand -> send DealCards to hand domain."""
        # Update local state
        self.state.hand_id = event.hand_id
        self.state.phase = HandPhase.DEALING
        self.state.player_count = event.player_count
        return hand.DealCards(
            hand_id=event.hand_id,
            player_count=event.player_count,
        )

    @output_domain("hand")
    @handles(hand.CardsDealt, input_domain="hand")
    def on_cards_dealt(self, event: hand.CardsDealt) -> Optional[hand.PostBlinds]:
        """Cards dealt -> post blinds."""
        self.state.phase = HandPhase.BLINDS
        return hand.PostBlinds(hand_id=self.state.hand_id)

    @output_domain("table")
    @handles(hand.HandComplete, input_domain="hand")
    def on_hand_complete(self, event: hand.HandComplete) -> Optional[table.EndHand]:
        """Hand complete -> end hand on table."""
        self.state.phase = HandPhase.COMPLETE
        return table.EndHand(
            hand_id=self.state.hand_id,
            winner_id=event.winner_id,
        )
package main
import (
angzarr "github.com/benjaminabbitt/angzarr/client/go"
pb "github.com/benjaminabbitt/angzarr/client/go/proto/angzarr"
"github.com/benjaminabbitt/angzarr/client/go/proto/examples"
)
// PMState is the PM's aggregate state (rebuilt from its own events).
// For simplicity in this example, we use a minimal state.
type PMState struct {
HandRoot []byte
HandInProgress bool
}
// HandFlowPM is the OO-style process manager for hand flow orchestration.
type HandFlowPM struct {
angzarr.ProcessManagerBase[*PMState]
}
// NewHandFlowPM creates a new HandFlowPM with all handlers registered.
func NewHandFlowPM() *HandFlowPM {
pm := &HandFlowPM{}
pm.Init("pmg-hand-flow", "pmg-hand-flow", []string{"table", "hand"})
pm.WithStateFactory(func() *PMState { return &PMState{} })
// Register prepare handlers
pm.Prepares(pm.prepareHandStarted)
// Register event handlers
pm.Handles(pm.handleHandStarted)
pm.Handles(pm.handleCardsDealt)
pm.Handles(pm.handleBlindPosted)
pm.Handles(pm.handleActionTaken)
pm.Handles(pm.handleCommunityDealt)
pm.Handles(pm.handlePotAwarded)
return pm
}
// prepareHandStarted declares the hand destination needed when a hand starts.
func (pm *HandFlowPM) prepareHandStarted(
trigger *pb.EventBook,
state *PMState,
event *examples.HandStarted,
) []*pb.Cover {
return []*pb.Cover{{
Domain: "hand",
Root: &pb.UUID{Value: event.HandRoot},
}}
}
// handleHandStarted processes the HandStarted event.
func (pm *HandFlowPM) handleHandStarted(
trigger *pb.EventBook,
state *PMState,
event *examples.HandStarted,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// Initialize hand process (not persisted in this simplified version).
// The saga-table-hand will send DealCards, so we don't emit commands here.
return nil, nil, nil
}
// handleCardsDealt processes the CardsDealt event.
func (pm *HandFlowPM) handleCardsDealt(
trigger *pb.EventBook,
state *PMState,
event *examples.CardsDealt,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// A full implementation would emit a post-small-blind command here,
// tracking state to know which blind to post.
// This simplified version assumes the hand aggregate tracks that and emits nothing.
return nil, nil, nil
}
// handleBlindPosted processes the BlindPosted event.
func (pm *HandFlowPM) handleBlindPosted(
trigger *pb.EventBook,
state *PMState,
event *examples.BlindPosted,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// In a full implementation, we'd check if both blinds are posted
// and then start the betting round.
return nil, nil, nil
}
// handleActionTaken processes the ActionTaken event.
func (pm *HandFlowPM) handleActionTaken(
trigger *pb.EventBook,
state *PMState,
event *examples.ActionTaken,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// In a full implementation, we'd check if betting is complete
// and advance to the next phase.
return nil, nil, nil
}
// handleCommunityDealt processes the CommunityCardsDealt event.
func (pm *HandFlowPM) handleCommunityDealt(
trigger *pb.EventBook,
state *PMState,
event *examples.CommunityCardsDealt,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// Start new betting round after community cards.
return nil, nil, nil
}
// handlePotAwarded processes the PotAwarded event.
func (pm *HandFlowPM) handlePotAwarded(
trigger *pb.EventBook,
state *PMState,
event *examples.PotAwarded,
dests []*pb.EventBook,
) ([]*pb.CommandBook, *pb.EventBook, error) {
// Hand is complete. Clean up.
return nil, nil, nil
}
/// Process manager handler for hand flow orchestration.
///
/// Listens to events from both table and hand domains to coordinate
/// the poker hand lifecycle.
struct HandFlowPmHandler;
impl ProcessManagerDomainHandler<HandFlowState> for HandFlowPmHandler {
fn event_types(&self) -> Vec<String> {
vec![
"HandStarted".into(),
"CardsDealt".into(),
"BlindPosted".into(),
"ActionTaken".into(),
"CommunityCardsDealt".into(),
"PotAwarded".into(),
"HandComplete".into(),
]
}
fn prepare(&self, _trigger: &EventBook, _state: &HandFlowState, event: &Any) -> Vec<Cover> {
// Declare destinations needed based on the triggering event
if event.type_url.ends_with("HandStarted") {
if let Ok(evt) = event.unpack::<HandStarted>() {
// We'll need access to the hand aggregate
return vec![Cover {
domain: "hand".to_string(),
root: Some(Uuid {
value: evt.hand_root,
}),
..Default::default()
}];
}
}
vec![]
}
fn handle(
&self,
_trigger: &EventBook,
state: &HandFlowState,
event: &Any,
_destinations: &[EventBook],
) -> CommandResult<ProcessManagerResponse> {
// Clone state for mutation (PM state is rebuilt from events, not mutated in-place)
let mut local_state = state.clone();
let type_url = &event.type_url;
if type_url.ends_with("HandStarted") {
return self.handle_hand_started(&mut local_state, event);
} else if type_url.ends_with("CardsDealt") {
return self.handle_cards_dealt(&mut local_state, event);
} else if type_url.ends_with("BlindPosted") {
return self.handle_blind_posted(&mut local_state, event);
} else if type_url.ends_with("ActionTaken") {
return self.handle_action_taken(&mut local_state, event);
} else if type_url.ends_with("CommunityCardsDealt") {
return self.handle_community_dealt(&mut local_state, event);
} else if type_url.ends_with("PotAwarded") {
return self.handle_pot_awarded(&mut local_state, event);
} else if type_url.ends_with("HandComplete") {
return self.handle_hand_complete(&mut local_state, event);
}
Ok(ProcessManagerResponse::default())
}
}
impl HandFlowPmHandler {
/// Handle HandStarted event from table domain.
///
/// Initializes the PM state for tracking this hand.
/// The saga-table-hand handles sending DealCards, so we don't emit commands here.
fn handle_hand_started(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let event: HandStarted = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode HandStarted: {}", e))
})?;
state.hand_root = event.hand_root;
state.hand_number = event.hand_number;
state.phase = HandPhase::Dealing;
// No commands - saga-table-hand sends DealCards
Ok(ProcessManagerResponse::default())
}
/// Handle CardsDealt event from hand domain.
///
/// Cards have been dealt, waiting for blinds.
fn handle_cards_dealt(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: CardsDealt = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode CardsDealt: {}", e))
})?;
state.phase = HandPhase::Blinds;
// Blinds are posted by players/coordinator, not by PM
Ok(ProcessManagerResponse::default())
}
/// Handle BlindPosted event from hand domain.
fn handle_blind_posted(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: BlindPosted = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode BlindPosted: {}", e))
})?;
state.blinds_posted += 1;
// In a full implementation, check if all blinds posted then start betting
if state.blinds_posted >= 2 {
state.phase = HandPhase::Betting;
}
Ok(ProcessManagerResponse::default())
}
/// Handle ActionTaken event from hand domain.
fn handle_action_taken(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: ActionTaken = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode ActionTaken: {}", e))
})?;
// In a full implementation, track betting round progress
// and advance phases when rounds complete
let _ = state; // State tracking would go here
Ok(ProcessManagerResponse::default())
}
/// Handle CommunityCardsDealt event from hand domain.
fn handle_community_dealt(
&self,
_state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: CommunityCardsDealt = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode CommunityCardsDealt: {}", e))
})?;
// New betting round starts after community cards
Ok(ProcessManagerResponse::default())
}
/// Handle PotAwarded event from hand domain.
fn handle_pot_awarded(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: PotAwarded = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode PotAwarded: {}", e))
})?;
state.phase = HandPhase::Complete;
Ok(ProcessManagerResponse::default())
}
/// Handle HandComplete event from hand domain.
fn handle_hand_complete(
&self,
state: &mut HandFlowState,
event_any: &Any,
) -> CommandResult<ProcessManagerResponse> {
let _event: HandComplete = event_any.unpack().map_err(|e| {
CommandRejectedError::new(format!("Failed to decode HandComplete: {}", e))
})?;
state.phase = HandPhase::Complete;
// The saga-hand-table handles sending EndHand to table domain
Ok(ProcessManagerResponse::default())
}
}
public class HandFlowPM extends ProcessManager<Struct> {
private final Map<String, HandProcess> processes = new HashMap<>();
public HandFlowPM() {
super("hand-flow");
}
@Override
protected Struct createEmptyState() {
return Struct.getDefaultInstance();
}
@Prepares(HandStarted.class)
public List<Cover> prepareHandStarted(HandStarted event) {
return List.of(
Cover.newBuilder()
.setDomain("hand")
.setRoot(dev.angzarr.UUID.newBuilder().setValue(event.getHandRoot()))
.build());
}
@Handles(HandStarted.class)
public List<CommandBook> handleHandStarted(HandStarted event) {
byte[] handRoot = event.getHandRoot().toByteArray();
String handId = bytesToHex(handRoot) + "_" + event.getHandNumber();
HandProcess process = new HandProcess();
process.handId = handId;
process.handRoot = handRoot;
process.handNumber = event.getHandNumber();
process.dealerPosition = event.getDealerPosition();
process.smallBlindPosition = event.getSmallBlindPosition();
process.bigBlindPosition = event.getBigBlindPosition();
process.smallBlind = event.getSmallBlind();
process.bigBlind = event.getBigBlind();
process.phase = HandPhase.DEALING;
for (SeatSnapshot player : event.getActivePlayersList()) {
PlayerState ps = new PlayerState();
ps.playerRoot = player.getPlayerRoot().toByteArray();
ps.position = player.getPosition();
ps.stack = player.getStack();
process.players.put(player.getPosition(), ps);
process.activePositions.add(player.getPosition());
}
Collections.sort(process.activePositions);
processes.put(handId, process);
return List.of(); // DealCards comes from saga
}
@Handles(CardsDealt.class)
public List<CommandBook> handleCardsDealt(CardsDealt event) {
String handId = bytesToHex(event.getTableRoot().toByteArray()) + "_" + event.getHandNumber();
HandProcess process = processes.get(handId);
if (process == null) return List.of();
process.phase = HandPhase.POSTING_BLINDS;
process.minRaise = process.bigBlind;
CommandBook cmd = buildPostBlindCommand(process);
return cmd != null ? List.of(cmd) : List.of();
}
@Handles(BlindPosted.class)
public List<CommandBook> handleBlindPosted(BlindPosted event) {
HandProcess process = findProcessByPlayer(event.getPlayerRoot().toByteArray());
if (process == null) return List.of();
for (PlayerState player : process.players.values()) {
if (Arrays.equals(player.playerRoot, event.getPlayerRoot().toByteArray())) {
player.stack = event.getPlayerStack();
player.betThisRound = event.getAmount();
player.totalInvested = event.getAmount();
break;
}
}
process.potTotal = event.getPotTotal();
if ("small".equals(event.getBlindType())) {
process.smallBlindPosted = true;
process.currentBet = event.getAmount();
CommandBook cmd = buildPostBlindCommand(process);
return cmd != null ? List.of(cmd) : List.of();
} else if ("big".equals(event.getBlindType())) {
process.bigBlindPosted = true;
process.currentBet = event.getAmount();
startBetting(process);
return List.of();
}
return List.of();
}
@Handles(ActionTaken.class)
public List<CommandBook> handleActionTaken(ActionTaken event) {
HandProcess process = findProcessByPlayer(event.getPlayerRoot().toByteArray());
if (process == null) return List.of();
for (PlayerState player : process.players.values()) {
if (Arrays.equals(player.playerRoot, event.getPlayerRoot().toByteArray())) {
player.stack = event.getPlayerStack();
player.hasActed = true;
if (event.getAction() == ActionType.FOLD) {
player.hasFolded = true;
} else if (event.getAction() == ActionType.ALL_IN) {
player.allIn = true;
player.betThisRound += event.getAmount();
player.totalInvested += event.getAmount();
} else if (event.getAction() == ActionType.CALL
|| event.getAction() == ActionType.BET
|| event.getAction() == ActionType.RAISE) {
player.betThisRound += event.getAmount();
player.totalInvested += event.getAmount();
}
if ((event.getAction() == ActionType.BET
|| event.getAction() == ActionType.RAISE
|| event.getAction() == ActionType.ALL_IN)
&& player.betThisRound > process.currentBet) {
long raiseAmount = player.betThisRound - process.currentBet;
process.currentBet = player.betThisRound;
process.minRaise = Math.max(process.minRaise, raiseAmount);
process.lastAggressor = player.position;
for (PlayerState p : process.players.values()) {
if (p.position != player.position && !p.hasFolded && !p.allIn) {
p.hasActed = false;
}
}
}
break;
}
}
process.potTotal = event.getPotTotal();
if (isBettingComplete(process)) {
CommandBook cmd = endBettingRound(process);
return cmd != null ? List.of(cmd) : List.of();
} else {
advanceAction(process);
return List.of();
}
}
@Handles(CommunityCardsDealt.class)
public List<CommandBook> handleCommunityDealt(CommunityCardsDealt event) {
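// The event is not matched to a specific hand here: the first process that is
// waiting for community cards is used. This assumes one hand in flight at a time.
for (HandProcess process : processes.values()) {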
if (process.phase == HandPhase.DEALING_COMMUNITY) {
process.communityCardCount = event.getAllCommunityCardsCount();
process.bettingPhase = event.getPhaseValue();
startBetting(process);
return List.of();
}
}
return List.of();
}
@Handles(PotAwarded.class)
public List<CommandBook> handlePotAwarded(PotAwarded event) {
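// Marks every tracked hand that is not yet complete as COMPLETE;
// like handleCommunityDealt, this assumes one hand in flight at a time.
for (HandProcess process : processes.values()) {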
if (process.phase != HandPhase.COMPLETE) {
process.phase = HandPhase.COMPLETE;
}
}
return List.of();
}
// --- Helper methods ---
private CommandBook buildPostBlindCommand(HandProcess process) {
PlayerState player;
String blindType;
long amount;
if (!process.smallBlindPosted) {
player = process.players.get(process.smallBlindPosition);
blindType = "small";
amount = process.smallBlind;
} else if (!process.bigBlindPosted) {
player = process.players.get(process.bigBlindPosition);
blindType = "big";
amount = process.bigBlind;
} else {
return null;
}
if (player == null) return null;
PostBlind cmd =
PostBlind.newBuilder()
.setPlayerRoot(ByteString.copyFrom(player.playerRoot))
.setBlindType(blindType)
.setAmount(amount)
.build();
return CommandBook.newBuilder()
.setCover(
Cover.newBuilder()
.setDomain("hand")
.setRoot(
dev.angzarr.UUID.newBuilder().setValue(ByteString.copyFrom(process.handRoot))))
.addPages(CommandPage.newBuilder().setCommand(Any.pack(cmd, "type.googleapis.com/")))
.build();
}
private void startBetting(HandProcess process) {
process.phase = HandPhase.BETTING;
for (PlayerState player : process.players.values()) {
player.betThisRound = 0;
player.hasActed = false;
}
process.currentBet = 0;
if (process.bettingPhase == BettingPhase.PREFLOP_VALUE) {
process.actionOn = findNextActive(process, process.bigBlindPosition);
} else {
process.actionOn = findNextActive(process, process.dealerPosition);
}
process.actionStartedAt = Instant.now();
}
private void advanceAction(HandProcess process) {
process.actionOn = findNextActive(process, process.actionOn);
process.actionStartedAt = Instant.now();
}
private int findNextActive(HandProcess process, int afterPosition) {
List<Integer> positions = process.activePositions;
int n = positions.size();
if (n == 0) return -1;
int startIdx = 0;
for (int i = 0; i < n; i++) {
if (positions.get(i) > afterPosition) {
startIdx = i;
break;
}
}
for (int i = 0; i < n; i++) {
int idx = (startIdx + i) % n;
int pos = positions.get(idx);
PlayerState player = process.players.get(pos);
if (player != null && !player.hasFolded && !player.allIn) {
return pos;
}
}
return -1;
}
private boolean isBettingComplete(HandProcess process) {
List<PlayerState> activePlayers = new ArrayList<>();
for (PlayerState p : process.players.values()) {
if (!p.hasFolded && !p.allIn) {
activePlayers.add(p);
}
}
if (activePlayers.size() <= 1) return true;
for (PlayerState player : activePlayers) {
if (!player.hasActed) return false;
// activePlayers already excludes all-in players, so no extra allIn check is needed.
if (player.betThisRound < process.currentBet) return false;
}
return true;
}
private CommandBook endBettingRound(HandProcess process) {
List<PlayerState> playersInHand = new ArrayList<>();
List<PlayerState> activePlayers = new ArrayList<>();
for (PlayerState p : process.players.values()) {
if (!p.hasFolded) {
playersInHand.add(p);
if (!p.allIn) activePlayers.add(p);
}
}
if (playersInHand.size() == 1) {
return awardPotToLastPlayer(process, playersInHand.get(0));
}
return advancePhase(process);
}
private CommandBook advancePhase(HandProcess process) {
if (process.bettingPhase == BettingPhase.PREFLOP_VALUE) {
process.phase = HandPhase.DEALING_COMMUNITY;
return buildDealCommunityCommand(process, 3);
} else if (process.bettingPhase == BettingPhase.FLOP_VALUE) {
process.phase = HandPhase.DEALING_COMMUNITY;
return buildDealCommunityCommand(process, 1);
} else if (process.bettingPhase == BettingPhase.TURN_VALUE) {
process.phase = HandPhase.DEALING_COMMUNITY;
return buildDealCommunityCommand(process, 1);
} else if (process.bettingPhase == BettingPhase.RIVER_VALUE) {
return autoAwardPot(process);
}
return null;
}
private CommandBook buildDealCommunityCommand(HandProcess process, int count) {
DealCommunityCards cmd = DealCommunityCards.newBuilder().setCount(count).build();
return CommandBook.newBuilder()
.setCover(
Cover.newBuilder()
.setDomain("hand")
.setRoot(
dev.angzarr.UUID.newBuilder().setValue(ByteString.copyFrom(process.handRoot))))
.addPages(CommandPage.newBuilder().setCommand(Any.pack(cmd, "type.googleapis.com/")))
.build();
}
private CommandBook awardPotToLastPlayer(HandProcess process, PlayerState winner) {
process.phase = HandPhase.COMPLETE;
AwardPot cmd =
AwardPot.newBuilder()
.addAwards(
PotAward.newBuilder()
.setPlayerRoot(ByteString.copyFrom(winner.playerRoot))
.setAmount(process.potTotal)
.setPotType("main"))
.build();
return CommandBook.newBuilder()
.setCover(
Cover.newBuilder()
.setDomain("hand")
.setRoot(
dev.angzarr.UUID.newBuilder().setValue(ByteString.copyFrom(process.handRoot))))
.addPages(CommandPage.newBuilder().setCommand(Any.pack(cmd, "type.googleapis.com/")))
.build();
}
private CommandBook autoAwardPot(HandProcess process) {
List<PlayerState> playersInHand = new ArrayList<>();
for (PlayerState p : process.players.values()) {
if (!p.hasFolded) playersInHand.add(p);
}
if (playersInHand.isEmpty()) return null;
long split = process.potTotal / playersInHand.size();
long remainder = process.potTotal % playersInHand.size();
AwardPot.Builder cmdBuilder = AwardPot.newBuilder();
for (int i = 0; i < playersInHand.size(); i++) {
PlayerState player = playersInHand.get(i);
long amount = split + (i < remainder ? 1 : 0);
cmdBuilder.addAwards(
PotAward.newBuilder()
.setPlayerRoot(ByteString.copyFrom(player.playerRoot))
.setAmount(amount)
.setPotType("main"));
}
process.phase = HandPhase.COMPLETE;
return CommandBook.newBuilder()
.setCover(
Cover.newBuilder()
.setDomain("hand")
.setRoot(
dev.angzarr.UUID.newBuilder().setValue(ByteString.copyFrom(process.handRoot))))
.addPages(
CommandPage.newBuilder()
.setCommand(Any.pack(cmdBuilder.build(), "type.googleapis.com/")))
.build();
}
private HandProcess findProcessByPlayer(byte[] playerRoot) {
for (HandProcess process : processes.values()) {
for (PlayerState player : process.players.values()) {
if (Arrays.equals(player.playerRoot, playerRoot)) {
return process;
}
}
}
return null;
}
private static String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
// --- Inner classes for state tracking ---
// docs:start:pm_state_oo
private enum HandPhase {
DEALING,
POSTING_BLINDS,
BETTING,
DEALING_COMMUNITY,
COMPLETE
}
private static class HandProcess {
String handId;
byte[] handRoot;
long handNumber;
int dealerPosition;
int smallBlindPosition;
int bigBlindPosition;
long smallBlind;
long bigBlind;
HandPhase phase = HandPhase.DEALING;
Map<Integer, PlayerState> players = new HashMap<>();
List<Integer> activePositions = new ArrayList<>();
boolean smallBlindPosted;
boolean bigBlindPosted;
long currentBet;
long minRaise;
long potTotal;
int actionOn;
int lastAggressor;
Instant actionStartedAt;
int communityCardCount;
int bettingPhase;
}
private static class PlayerState {
byte[] playerRoot;
int position;
long stack;
long betThisRound;
long totalInvested;
boolean hasActed;
boolean hasFolded;
boolean allIn;
}
// docs:end:pm_state_oo
}
/// <summary>
/// Hand Flow Process Manager using OO-style attributes.
///
/// This PM orchestrates poker hand flow by:
/// - Tracking when hands start and complete
/// - Coordinating between table and hand domains
/// </summary>
public class HandFlowPM : ProcessManager<PMState>
{
public override string Name => "hand-flow";
public HandFlowPM()
: base() { }
public HandFlowPM(EventBook? processState)
: base(processState) { }
protected override PMState CreateEmptyState() => new();
protected override void ApplyEvent(PMState state, Any eventAny)
{
// In this simplified example, we don't persist PM events.
}
/// <summary>
/// Declare the hand destination needed when a hand starts.
/// </summary>
[Prepares(typeof(HandStarted))]
public List<Cover> PrepareHandStarted(HandStarted evt)
{
return new List<Cover>
{
new Cover
{
Domain = "hand",
Root = new Angzarr.UUID { Value = evt.HandRoot },
},
};
}
/// <summary>
/// Process the HandStarted event.
///
/// Initialize hand process (not persisted in this simplified version).
/// The saga-table-hand will send DealCards, so we don't emit commands here.
/// </summary>
[Handles(typeof(HandStarted), InputDomain = "table")]
public List<CommandBook> HandleHandStarted(HandStarted evt, List<EventBook> destinations)
{
return new List<CommandBook>();
}
/// <summary>
/// Process the CardsDealt event.
///
/// A full implementation would emit the PostBlind command for the small blind
/// here, tracking state to know which blind to post next. This simplified
/// version emits no commands.
/// </summary>
[Handles(typeof(CardsDealt), InputDomain = "hand")]
public List<CommandBook> HandleCardsDealt(CardsDealt evt, List<EventBook> destinations)
{
return new List<CommandBook>();
}
/// <summary>
/// Process the BlindPosted event.
///
/// In a full implementation, we'd check if both blinds are posted
/// and then start the betting round.
/// </summary>
[Handles(typeof(BlindPosted), InputDomain = "hand")]
public List<CommandBook> HandleBlindPosted(BlindPosted evt, List<EventBook> destinations)
{
return new List<CommandBook>();
}
/// <summary>
/// Process the ActionTaken event.
///
/// In a full implementation, we'd check if betting is complete
/// and advance to the next phase.
/// </summary>
[Handles(typeof(ActionTaken), InputDomain = "hand")]
public List<CommandBook> HandleActionTaken(ActionTaken evt, List<EventBook> destinations)
{
return new List<CommandBook>();
}
/// <summary>
/// Process the CommunityCardsDealt event.
///
/// Start new betting round after community cards.
/// </summary>
[Handles(typeof(CommunityCardsDealt), InputDomain = "hand")]
public List<CommandBook> HandleCommunityDealt(
CommunityCardsDealt evt,
List<EventBook> destinations
)
{
return new List<CommandBook>();
}
/// <summary>
/// Process the PotAwarded event.
///
/// Hand is complete. Clean up.
/// </summary>
[Handles(typeof(PotAwarded), InputDomain = "hand")]
public List<CommandBook> HandlePotAwarded(PotAwarded evt, List<EventBook> destinations)
{
return new List<CommandBook>();
}
}
/// Hand Flow Process Manager using OO-style explicit registration.
///
/// This PM orchestrates poker hand flow by:
/// - Tracking when hands start and complete
/// - Coordinating between table and hand domains
class HandFlowPM {
public:
HandFlowPM() {
// Register prepare handlers
prepare_handlers_["HandStarted"] = [this](const google::protobuf::Any& any) {
examples::HandStarted evt;
any.UnpackTo(&evt);
return prepare_HandStarted(evt);
};
// Register event handlers
handlers_["HandStarted"] = [this](const google::protobuf::Any& any,
const std::string& corr_id) {
examples::HandStarted evt;
any.UnpackTo(&evt);
return handle_HandStarted(evt, corr_id);
};
handlers_["CardsDealt"] = [this](const google::protobuf::Any& any,
const std::string& corr_id) {
examples::CardsDealt evt;
any.UnpackTo(&evt);
return handle_CardsDealt(evt, corr_id);
};
handlers_["BlindPosted"] = [this](const google::protobuf::Any& any,
const std::string& corr_id) {
examples::BlindPosted evt;
any.UnpackTo(&evt);
return handle_BlindPosted(evt, corr_id);
};
handlers_["ActionTaken"] = [this](const google::protobuf::Any& any,
const std::string& corr_id) {
examples::ActionTaken evt;
any.UnpackTo(&evt);
return handle_ActionTaken(evt, corr_id);
};
handlers_["CommunityCardsDealt"] = [this](const google::protobuf::Any& any,
const std::string& corr_id) {
examples::CommunityCardsDealt evt;
any.UnpackTo(&evt);
return handle_CommunityCardsDealt(evt, corr_id);
};
handlers_["PotAwarded"] = [this](const google::protobuf::Any& any,
const std::string& corr_id) {
examples::PotAwarded evt;
any.UnpackTo(&evt);
return handle_PotAwarded(evt, corr_id);
};
// Register event appliers
appliers_["HandStarted"] = [this](PMState& state, const google::protobuf::Any& any) {
examples::HandStarted evt;
any.UnpackTo(&evt);
apply_HandStarted(state, evt);
};
appliers_["PotAwarded"] = [this](PMState& state, const google::protobuf::Any& any) {
examples::PotAwarded evt;
any.UnpackTo(&evt);
apply_PotAwarded(state, evt);
};
}
std::string name() const { return "pmg-hand-flow-oo"; }
std::vector<std::string> input_domains() const { return {"table", "hand"}; }
/// Prepare destinations for events (two-phase protocol).
std::vector<angzarr::Cover> prepare_destinations(const angzarr::EventBook& book) {
std::vector<angzarr::Cover> destinations;
for (const auto& page : book.pages()) {
if (!page.has_event()) continue;
auto suffix = angzarr::helpers::type_name_from_url(page.event().type_url());
auto it = prepare_handlers_.find(suffix);
if (it != prepare_handlers_.end()) {
auto covers = it->second(page.event());
destinations.insert(destinations.end(), covers.begin(), covers.end());
}
}
return destinations;
}
/// Dispatch events to handlers.
std::vector<angzarr::CommandBook> dispatch(
const angzarr::EventBook& book, const angzarr::EventBook* prior_events = nullptr,
const std::vector<angzarr::EventBook>& /* destinations */ = {}) {
rebuild_state(prior_events);
auto correlation_id = book.has_cover() ? book.cover().correlation_id() : "";
if (correlation_id.empty()) {
return {}; // PMs require correlation ID
}
std::vector<angzarr::CommandBook> commands;
for (const auto& page : book.pages()) {
if (!page.has_event()) continue;
auto suffix = angzarr::helpers::type_name_from_url(page.event().type_url());
// Apply event to state first
auto applier_it = appliers_.find(suffix);
if (applier_it != appliers_.end()) {
applier_it->second(state_, page.event());
}
// Dispatch to handler
auto it = handlers_.find(suffix);
if (it != handlers_.end()) {
auto cmds = it->second(page.event(), correlation_id);
commands.insert(commands.end(), cmds.begin(), cmds.end());
}
}
return commands;
}
const PMState& state() const { return state_; }
protected:
/// Declare the hand destination needed when a hand starts.
std::vector<angzarr::Cover> prepare_HandStarted(const examples::HandStarted& evt) {
angzarr::Cover cover;
cover.set_domain("hand");
cover.mutable_root()->set_value(evt.hand_root());
return {cover};
}
/// Process the HandStarted event from table domain.
std::vector<angzarr::CommandBook> handle_HandStarted(const examples::HandStarted& /* evt */,
const std::string& /* corr_id */) {
// No commands to emit - saga-table-hand handles the DealCards command
return {};
}
/// Apply HandStarted to state.
void apply_HandStarted(PMState& state, const examples::HandStarted& evt) {
state.hand_root = evt.hand_root();
state.hand_in_progress = true;
}
/// Process the CardsDealt event from hand domain.
std::vector<angzarr::CommandBook> handle_CardsDealt(const examples::CardsDealt& /* evt */,
const std::string& /* corr_id */) {
return {};
}
/// Process the BlindPosted event from hand domain.
std::vector<angzarr::CommandBook> handle_BlindPosted(const examples::BlindPosted& /* evt */,
const std::string& /* corr_id */) {
return {};
}
/// Process the ActionTaken event from hand domain.
std::vector<angzarr::CommandBook> handle_ActionTaken(const examples::ActionTaken& /* evt */,
const std::string& /* corr_id */) {
return {};
}
/// Process the CommunityCardsDealt event from hand domain.
std::vector<angzarr::CommandBook> handle_CommunityCardsDealt(
const examples::CommunityCardsDealt& /* evt */, const std::string& /* corr_id */) {
return {};
}
/// Process the PotAwarded event from hand domain.
std::vector<angzarr::CommandBook> handle_PotAwarded(const examples::PotAwarded& /* evt */,
const std::string& /* corr_id */) {
return {};
}
/// Apply PotAwarded to state.
void apply_PotAwarded(PMState& state, const examples::PotAwarded& /* evt */) {
state.hand_in_progress = false;
}
private:
void rebuild_state(const angzarr::EventBook* event_book) {
state_ = PMState{};
if (!event_book) return;
for (const auto& page : event_book->pages()) {
if (!page.has_event()) continue;
auto suffix = angzarr::helpers::type_name_from_url(page.event().type_url());
auto it = appliers_.find(suffix);
if (it != appliers_.end()) {
it->second(state_, page.event());
}
}
}
using EventHandler = std::function<std::vector<angzarr::CommandBook>(
const google::protobuf::Any&, const std::string&)>;
using PrepareHandler = std::function<std::vector<angzarr::Cover>(const google::protobuf::Any&)>;
using EventApplier = std::function<void(PMState&, const google::protobuf::Any&)>;
std::map<std::string, EventHandler> handlers_;
std::map<std::string, PrepareHandler> prepare_handlers_;
std::map<std::string, EventApplier> appliers_;
PMState state_;
};
Both styles are meant to produce the same behavior, so choose based on team preference. The functional ProcessManagerRouter makes handler registration explicit, while the OO approach keeps state and handlers together in one class.
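To make the contrast concrete, here is a minimal, self-contained sketch of the two registration styles. `MiniRouter`, `MiniProcessManager`, and `handles` are hypothetical stand-ins written for this illustration, not the Angzarr API, and events are plain dicts rather than protobuf messages.

```python
from typing import Callable, Dict, List

# Hypothetical stand-ins for illustration only; not the Angzarr API.
Command = str
Handler = Callable[[dict], List[Command]]


class MiniRouter:
    """Functional style: plain functions registered explicitly by event type."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def on(self, event_type: str, handler: Handler) -> "MiniRouter":
        self._handlers[event_type] = handler
        return self

    def dispatch(self, event_type: str, event: dict) -> List[Command]:
        handler = self._handlers.get(event_type)
        return handler(event) if handler else []


def handles(event_type: str):
    """OO style: tag a method with the event type it handles."""

    def mark(method):
        method._handles = event_type
        return method

    return mark


class MiniProcessManager:
    def dispatch(self, event_type: str, event: dict) -> List[Command]:
        # Look through the methods defined on the concrete class for a matching tag.
        for attr in vars(type(self)).values():
            if getattr(attr, "_handles", None) == event_type:
                return attr(self, event)
        return []


def post_small_blind(event: dict) -> List[Command]:
    return [f"PostBlind(small, hand={event['hand']})"]


# Functional: registration is visible in one place.
functional_pm = MiniRouter().on("CardsDealt", post_small_blind)


# OO: the handler lives on the class next to any state it needs.
class HandFlow(MiniProcessManager):
    @handles("CardsDealt")
    def on_cards_dealt(self, event: dict) -> List[Command]:
        return [f"PostBlind(small, hand={event['hand']})"]
```

Both `functional_pm.dispatch("CardsDealt", {"hand": "h1"})` and `HandFlow().dispatch("CardsDealt", {"hand": "h1"})` return the same single command, and both return an empty list for an event type with no handler.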
Correlation ID
Process managers use the correlation ID as their aggregate root. The ID travels in the correlation_id field of the Cover message:
message Cover {
string domain = 2;
UUID root = 1;
string correlation_id = 3; // Workflow correlation - flows through all commands/events
Edition edition = 4; // Edition for diverged timelines; empty name = main timeline
// Field 5 removed: external_id moved to ExternalDeferredSequence in PageHeader
}
All events in a workflow share the same correlation ID, allowing the PM to:
- Receive events from multiple domains
- Maintain workflow state across events
- Track progress through the state machine
- Emit commands or facts to target domains
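The grouping described above can be sketched as a small index keyed by correlation ID. This is an illustrative model only: `CorrelationIndex` and `Workflow` are hypothetical names, and the rule that events without a correlation ID are ignored mirrors the check in the C++ dispatch example rather than a documented guarantee.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class Workflow:
    """Per-workflow state accumulated across events (hypothetical shape)."""

    event_types: List[str] = field(default_factory=list)
    domains_seen: Set[str] = field(default_factory=set)


class CorrelationIndex:
    """Groups incoming events by correlation ID, regardless of source domain."""

    def __init__(self) -> None:
        self._workflows: Dict[str, Workflow] = {}

    def receive(self, domain: str, correlation_id: str, event_type: str) -> Optional[Workflow]:
        if not correlation_id:
            # Without a correlation ID there is no workflow to attach the event to.
            return None
        workflow = self._workflows.setdefault(correlation_id, Workflow())
        workflow.domains_seen.add(domain)
        workflow.event_types.append(event_type)
        return workflow


index = CorrelationIndex()
index.receive("table", "hand-abc-123", "HandStarted")
workflow = index.receive("hand", "hand-abc-123", "CardsDealt")
other = index.receive("hand", "hand-xyz-999", "CardsDealt")
```

After these calls, `workflow` has seen events from both the table and hand domains, while `other` is a separate workflow with its own state.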
Commands vs Facts
PMs can emit either commands (requests that can be rejected) or facts (assertions the target must accept):
| Output | Can Reject | Use Case |
|---|---|---|
| Command | Yes | Request action that may fail validation |
| Fact | No | Assert PM's orchestration decision |
Facts are useful when the PM has authority that the target aggregate must accept, such as phase transitions, tournament rulings, or dealer decisions:
# Command: request action, may be rejected
def handle_hand_complete(event: HandComplete, state: WorkflowState):
return CommandBook(domain="player", command=TransferChips(amount=event.pot))
# Fact: assert PM decision, cannot be rejected
def handle_phase_transition(event: AllPlayersReady, state: WorkflowState):
return FactBook(
domain="hand",
external_id=f"phase:{state.correlation_id}:{state.current_phase}",
event=PhaseAdvanced(phase=state.next_phase),
)
See Commands vs Facts for details on when to use each.
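From the target aggregate's side, the difference can be sketched as follows. `ToyHand` and `Rejected` are hypothetical names for this illustration; the player-count rule is an assumed validation, chosen to match the `invalid_player_count` rejection reason used elsewhere on this page.

```python
from typing import List


class Rejected(Exception):
    """Raised when a command fails validation."""


class ToyHand:
    """Toy target aggregate; not the real hand domain."""

    def __init__(self, player_count: int) -> None:
        self.player_count = player_count
        self.events: List[str] = []

    def handle_command(self, command: str) -> None:
        # Commands are requests: the aggregate validates them and may refuse.
        if command == "DealCards" and self.player_count < 2:
            raise Rejected("invalid_player_count")
        self.events.append(f"{command}:accepted")

    def accept_fact(self, event: str) -> None:
        # Facts are assertions: they are recorded without validation.
        self.events.append(event)


hand = ToyHand(player_count=1)
try:
    hand.handle_command("DealCards")
    outcome = "accepted"
except Rejected as exc:
    outcome = f"rejected: {exc}"

hand.accept_fact("PhaseAdvanced")
```

Here the command is rejected and leaves no event behind, while the fact is appended unconditionally.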
State Persistence
PM state is stored as events in the PM's own event stream:
PM Event Stream (correlation_id = "hand-abc-123"):
[0] WorkflowStarted { hand_id: "...", phase: "awaiting_deal" }
[1] PhaseTransitioned { from: "awaiting_deal", to: "dealing" }
[2] PhaseTransitioned { from: "dealing", to: "blinds" }
[3] WorkflowCompleted { winner_id: "..." }
On restart, the PM rebuilds state by replaying its own events.
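A minimal sketch of that replay, using the event names from the stream above. The `(type, payload)` tuple shape, the `WorkflowState` fields, and the placeholder IDs are assumptions made for this example, not the framework's types.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple

Event = Tuple[str, dict]  # (event type, payload); assumed shape for this sketch


@dataclass
class WorkflowState:
    hand_id: Optional[str] = None
    phase: Optional[str] = None
    completed: bool = False
    winner_id: Optional[str] = None


def apply_event(state: WorkflowState, event: Event) -> WorkflowState:
    """Fold one stored event into the state; unknown event types are ignored."""
    kind, data = event
    if kind == "WorkflowStarted":
        state.hand_id = data["hand_id"]
        state.phase = data["phase"]
    elif kind == "PhaseTransitioned":
        state.phase = data["to"]
    elif kind == "WorkflowCompleted":
        state.completed = True
        state.winner_id = data["winner_id"]
    return state


def rebuild(events: Iterable[Event]) -> WorkflowState:
    """Start from empty state and replay the stream in order."""
    state = WorkflowState()
    for event in events:
        apply_event(state, event)
    return state


stream = [
    ("WorkflowStarted", {"hand_id": "h-1", "phase": "awaiting_deal"}),
    ("PhaseTransitioned", {"from": "awaiting_deal", "to": "dealing"}),
    ("PhaseTransitioned", {"from": "dealing", "to": "blinds"}),
    ("WorkflowCompleted", {"winner_id": "p-7"}),
]
state = rebuild(stream)
```

Because the state is derived only from the stream, replaying a prefix of the events yields the state the PM had at that point in the workflow.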
Rejection Handling
When a PM-issued command is rejected, the PM receives a Notification before the source aggregate does:
1. PM issues DealCards → Hand rejects (invalid_player_count)
│
▼
2. PM receives Notification first
- Can update workflow state (mark step failed)
- Can decide to retry or abort
│
▼
3. Source aggregate receives Notification
- Emits compensation events
A Python handler for a rejected DealCards command:
@rejected("hand", "DealCards")
def handle_deal_rejected(self, state: HandFlowState, notification: Notification):
# Update workflow state
return WorkflowFailed(
hand_id=state.hand_id,
reason=f"Deal failed: {notification.rejection_reason}",
step="deal_cards",
)
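The "retry or abort" decision in step 2 could be modeled as below. This is a sketch under assumptions: the attempt limit, the `StepState` type, and the tuple return values are hypothetical, and a real PM would record attempts as events in its own stream rather than mutate an in-memory object.

```python
from dataclasses import dataclass
from typing import Tuple

MAX_ATTEMPTS = 3  # assumed policy for this sketch


@dataclass
class StepState:
    step: str
    attempts: int = 0


def on_rejection(state: StepState, reason: str) -> Tuple[str, str]:
    """Count the failed attempt, then retry until the limit is reached."""
    state.attempts += 1
    if state.attempts < MAX_ATTEMPTS:
        return ("retry", state.step)
    return ("abort", f"{state.step} failed: {reason}")


step = StepState(step="deal_cards")
decisions = [on_rejection(step, "invalid_player_count") for _ in range(3)]
```

With a limit of three attempts, the first two rejections produce a retry decision and the third aborts the workflow with the rejection reason.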
Timeouts
PM-scheduled timeouts (e.g., player action timers, turn clocks) are planned but not yet implemented.
See Roadmap for details.
PM vs Saga
| Aspect | Saga | Process Manager |
|---|---|---|
| State | Stateless | Stateful (own event stream) |
| Identity | None | correlation_id |
| Input domains | Single | Multiple |
| Persistence | No | Yes (workflow events) |
| Timeouts | No | Planned |
| Complexity | Low | Higher |
Rule of thumb: Start with sagas. Move to a PM only when you need state tracking or multi-domain input.
Next Steps
- Sagas — Simpler stateless coordination
- Commands vs Facts — When to emit facts instead of commands
- Why Poker — PM patterns in poker
- Testing — Testing PMs with Gherkin