Process Managers
A process manager coordinates multi-domain workflows with state tracking. Unlike stateless sagas, process managers maintain their own event stream keyed by correlation ID, enabling complex orchestration patterns.
When to Use Process Managers
| Use Case | Saga | Process Manager |
|---|---|---|
| Single event → single command | ✓ | |
| Fan-out to multiple domains | ✓ | |
| Multi-step workflow with state | | ✓ |
| Events from multiple domains | | ✓ |
| State machine transitions | | ✓ |
| Timeouts and retries | | ✓ |
Warning: Process Manager Anti-Patterns
Over-reliance on process managers is a foot-gun. If you find yourself doing any of the following, your domain factoring is probably wrong:
- Putting significant business logic in a PM
- Building PMs with large, complex state
- Creating PMs for workflows within a single domain
- Reaching for PMs as a first solution
Process managers should be lightweight state machines coordinating across domains—not business logic containers. They answer "what phase are we in?" and "what happens next?", not "how do we calculate X?"
| Symptom | Likely Problem |
|---|---|
| PM has complex validation logic | Logic belongs in domain aggregate |
| PM state mirrors aggregate state | Redundant—query the aggregate |
| PM handles single-domain workflow | Use saga or aggregate instead |
| PM event stream grows large | Too much responsibility—split domains |
Rule of thumb: If a PM does more than track phases and dispatch commands, reconsider your architecture. The PM's job is orchestration, not computation.
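As a minimal sketch of that rule of thumb, a PM's whole decision logic can fit in a lookup from (phase, event type) to (next phase, command name). The names `TRANSITIONS` and `next_step` are hypothetical and not part of any framework API. The phases and message names are borrowed from the Hand Flow example in this document, with plain strings standing in for real messages.

```python
from enum import Enum


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


# (current phase, event type) -> (next phase, command to dispatch).
# Hypothetical table for illustration only.
TRANSITIONS = {
    (HandPhase.AWAITING_DEAL, "HandStarted"): (HandPhase.DEALING, "DealCards"),
    (HandPhase.DEALING, "CardsDealt"): (HandPhase.BLINDS, "PostBlinds"),
}


def next_step(phase, event_type):
    """Return (next_phase, command_name), or None if the event is unexpected."""
    # HandComplete is accepted from any phase (the "* -> COMPLETE" transition).
    if event_type == "HandComplete":
        return (HandPhase.COMPLETE, "EndHand")
    return TRANSITIONS.get((phase, event_type))
```

Anything that does not fit a table like this, such as validation or calculation, is a hint that the logic belongs in an aggregate instead.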
Example: Hand Flow PM
The HandFlowPM orchestrates poker hand phases across the table and hand domains:
State Definition
The state definition is shown in Python, Go, Rust, Java, C#, and C++, in that order.
# Python
from dataclasses import dataclass
from enum import Enum


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


@dataclass
class HandFlowState:
    hand_id: str = ""
    phase: HandPhase = HandPhase.AWAITING_DEAL
    player_count: int = 0
// Go
type HandPhase int

const (
	AwaitingDeal HandPhase = iota
	Dealing
	Blinds
	Betting
	Complete
)

type HandFlowState struct {
	HandID      string
	Phase       HandPhase
	PlayerCount int32
}
// Rust
#[derive(Default)]
pub struct HandFlowState {
    hand_id: String,
    phase: HandPhase,
    player_count: u32,
}

#[derive(Default, PartialEq)]
pub enum HandPhase {
    #[default]
    AwaitingDeal,
    Dealing,
    Blinds,
    Betting,
    Complete,
}
// Java
enum HandPhase {
    AWAITING_DEAL, DEALING, BLINDS, BETTING, COMPLETE
}

class HandFlowState {
    private String handId = "";
    private HandPhase phase = HandPhase.AWAITING_DEAL;
    private int playerCount = 0;

    public String getHandId() { return handId; }
    public void setHandId(String handId) { this.handId = handId; }
    public HandPhase getPhase() { return phase; }
    public void setPhase(HandPhase phase) { this.phase = phase; }
    public int getPlayerCount() { return playerCount; }
    public void setPlayerCount(int playerCount) { this.playerCount = playerCount; }
}
// C#
public enum HandPhase { AwaitingDeal, Dealing, Blinds, Betting, Complete }

public class HandFlowState
{
    public string HandId { get; set; } = "";
    public HandPhase Phase { get; set; } = HandPhase.AwaitingDeal;
    public int PlayerCount { get; set; } = 0;
}
// C++
#include <cstdint>
#include <string>

enum class HandPhase { AwaitingDeal, Dealing, Blinds, Betting, Complete };

struct HandFlowState {
    std::string hand_id;
    HandPhase phase = HandPhase::AwaitingDeal;
    int32_t player_count = 0;
};
Handler Implementation
The handlers are shown in Python, Go, Rust, Java, C#, and C++, in that order.
# Python
class HandFlowPM(ProcessManager):
    def handle_hand_started(self, event: table.HandStarted, state: HandFlowState):
        # Transition: AWAITING_DEAL -> DEALING
        state.hand_id = event.hand_id
        state.phase = HandPhase.DEALING
        state.player_count = event.player_count
        # Emit command to hand domain
        return [hand.DealCards(
            hand_id=event.hand_id,
            player_count=event.player_count,
        )]

    def handle_cards_dealt(self, event: hand.CardsDealt, state: HandFlowState):
        # Transition: DEALING -> BLINDS
        state.phase = HandPhase.BLINDS
        return [hand.PostBlinds(hand_id=state.hand_id)]

    def handle_hand_complete(self, event: hand.HandComplete, state: HandFlowState):
        # Transition: * -> COMPLETE
        state.phase = HandPhase.COMPLETE
        # Signal table domain
        return [table.EndHand(
            hand_id=state.hand_id,
            winner_id=event.winner_id,
        )]
// Go
type HandFlowPM struct{}

func (pm *HandFlowPM) HandleHandStarted(event *examples.HandStarted, state *HandFlowState) []*pb.CommandBook {
	state.HandID = event.HandId
	state.Phase = Dealing
	state.PlayerCount = event.PlayerCount
	return []*pb.CommandBook{
		angzarr.BuildCommand("hand", &examples.DealCards{
			HandId:      event.HandId,
			PlayerCount: event.PlayerCount,
		}),
	}
}

func (pm *HandFlowPM) HandleCardsDealt(event *examples.CardsDealt, state *HandFlowState) []*pb.CommandBook {
	state.Phase = Blinds
	return []*pb.CommandBook{
		angzarr.BuildCommand("hand", &examples.PostBlinds{HandId: state.HandID}),
	}
}

func (pm *HandFlowPM) HandleHandComplete(event *examples.HandComplete, state *HandFlowState) []*pb.CommandBook {
	state.Phase = Complete
	return []*pb.CommandBook{
		angzarr.BuildCommand("table", &examples.EndHand{
			HandId:   state.HandID,
			WinnerId: event.WinnerId,
		}),
	}
}
// Rust
pub struct HandFlowPM;

impl HandFlowPM {
    pub fn handle_hand_started(
        &self,
        event: &HandStarted,
        state: &mut HandFlowState,
    ) -> Vec<CommandBook> {
        state.hand_id = event.hand_id.clone();
        state.phase = HandPhase::Dealing;
        state.player_count = event.player_count;
        vec![build_command("hand", DealCards {
            hand_id: event.hand_id.clone(),
            player_count: event.player_count,
        })]
    }

    pub fn handle_cards_dealt(
        &self,
        _event: &CardsDealt,
        state: &mut HandFlowState,
    ) -> Vec<CommandBook> {
        state.phase = HandPhase::Blinds;
        vec![build_command("hand", PostBlinds {
            hand_id: state.hand_id.clone(),
        })]
    }

    pub fn handle_hand_complete(
        &self,
        event: &HandComplete,
        state: &mut HandFlowState,
    ) -> Vec<CommandBook> {
        state.phase = HandPhase::Complete;
        vec![build_command("table", EndHand {
            hand_id: state.hand_id.clone(),
            winner_id: event.winner_id.clone(),
        })]
    }
}
// Java
public class HandFlowPM extends ProcessManager<HandFlowState> {
    @ReactsTo(HandStarted.class)
    public List<CommandBook> handleHandStarted(HandStarted event, HandFlowState state) {
        state.setHandId(event.getHandId());
        state.setPhase(HandPhase.DEALING);
        state.setPlayerCount(event.getPlayerCount());
        return List.of(buildCommand("hand", DealCards.newBuilder()
            .setHandId(event.getHandId())
            .setPlayerCount(event.getPlayerCount())
            .build()));
    }

    @ReactsTo(CardsDealt.class)
    public List<CommandBook> handleCardsDealt(CardsDealt event, HandFlowState state) {
        state.setPhase(HandPhase.BLINDS);
        return List.of(buildCommand("hand", PostBlinds.newBuilder()
            .setHandId(state.getHandId())
            .build()));
    }

    @ReactsTo(HandComplete.class)
    public List<CommandBook> handleHandComplete(HandComplete event, HandFlowState state) {
        state.setPhase(HandPhase.COMPLETE);
        return List.of(buildCommand("table", EndHand.newBuilder()
            .setHandId(state.getHandId())
            .setWinnerId(event.getWinnerId())
            .build()));
    }
}
// C#
public class HandFlowPM : ProcessManager<HandFlowState>
{
    [ReactsTo(typeof(HandStarted))]
    public IEnumerable<CommandBook> HandleHandStarted(HandStarted evt, HandFlowState state)
    {
        state.HandId = evt.HandId;
        state.Phase = HandPhase.Dealing;
        state.PlayerCount = evt.PlayerCount;
        yield return BuildCommand("hand", new DealCards
        {
            HandId = evt.HandId,
            PlayerCount = evt.PlayerCount
        });
    }

    [ReactsTo(typeof(CardsDealt))]
    public IEnumerable<CommandBook> HandleCardsDealt(CardsDealt evt, HandFlowState state)
    {
        state.Phase = HandPhase.Blinds;
        yield return BuildCommand("hand", new PostBlinds { HandId = state.HandId });
    }

    [ReactsTo(typeof(HandComplete))]
    public IEnumerable<CommandBook> HandleHandComplete(HandComplete evt, HandFlowState state)
    {
        state.Phase = HandPhase.Complete;
        yield return BuildCommand("table", new EndHand
        {
            HandId = state.HandId,
            WinnerId = evt.WinnerId
        });
    }
}
// C++
class HandFlowPM : public ProcessManager<HandFlowState> {
public:
    std::vector<CommandBook> handle_hand_started(
            const HandStarted& event, HandFlowState& state) {
        state.hand_id = event.hand_id();
        state.phase = HandPhase::Dealing;
        state.player_count = event.player_count();
        DealCards cmd;
        cmd.set_hand_id(event.hand_id());
        cmd.set_player_count(event.player_count());
        return {build_command("hand", cmd)};
    }

    std::vector<CommandBook> handle_cards_dealt(
            const CardsDealt& event, HandFlowState& state) {
        state.phase = HandPhase::Blinds;
        PostBlinds cmd;
        cmd.set_hand_id(state.hand_id);
        return {build_command("hand", cmd)};
    }

    std::vector<CommandBook> handle_hand_complete(
            const HandComplete& event, HandFlowState& state) {
        state.phase = HandPhase::Complete;
        EndHand cmd;
        cmd.set_hand_id(state.hand_id);
        cmd.set_winner_id(event.winner_id());
        return {build_command("table", cmd)};
    }
};
Correlation ID
Process managers use the correlation ID as their aggregate root:
message Cover {
  string domain = 2;
  UUID root = 1;
  string correlation_id = 3;  // Workflow correlation - flows through all commands/events
  Edition edition = 4;        // Edition for diverged timelines; empty name = main timeline
}
All events in a workflow share the same correlation ID, allowing the PM to:
- Receive events from multiple domains
- Maintain workflow state across events
- Track progress through the state machine
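To make the role of the correlation ID concrete, here is a minimal in-memory sketch of routing events from different domains to a single per-workflow state. `Event`, `WorkflowState`, and `CorrelationRouter` are hypothetical names invented for this illustration; they are not the framework's types, and a real PM would persist state rather than hold it in a dictionary.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Event:
    domain: str          # originating domain, e.g. "table" or "hand"
    correlation_id: str  # shared by every event in one workflow
    name: str            # event type name


@dataclass
class WorkflowState:
    # (domain, event name) pairs observed so far for one workflow
    seen: list = field(default_factory=list)


class CorrelationRouter:
    """Hypothetical router: one WorkflowState per correlation ID."""

    def __init__(self):
        self._states = defaultdict(WorkflowState)

    def dispatch(self, event):
        # The source domain does not matter for routing; only the
        # correlation ID selects which workflow state is updated.
        state = self._states[event.correlation_id]
        state.seen.append((event.domain, event.name))
        return state
```

Dispatching a `table` event and then a `hand` event with the same correlation ID updates the same `WorkflowState`, while an event with a different correlation ID gets a fresh one.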
State Persistence
PM state is stored as events in the PM's own event stream:
PM Event Stream (correlation_id = "hand-abc-123"):
  [0] WorkflowStarted { hand_id: "...", phase: "awaiting_deal" }
  [1] PhaseTransitioned { from: "awaiting_deal", to: "dealing" }
  [2] PhaseTransitioned { from: "dealing", to: "blinds" }
  [3] WorkflowCompleted { winner_id: "..." }
On restart, the PM rebuilds state by replaying its own events.
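Replay amounts to folding the stored events over an empty state. The sketch below assumes events are represented as `(kind, data)` tuples and that `WorkflowCompleted` moves the phase to `"complete"`. Both are assumptions made for illustration, as are the `apply` and `rebuild` helper names.

```python
from dataclasses import dataclass


@dataclass
class HandFlowState:
    hand_id: str = ""
    phase: str = "awaiting_deal"
    winner_id: str = ""


def apply(state, event):
    """Apply one stored PM event to the state (mutates and returns it)."""
    kind, data = event
    if kind == "WorkflowStarted":
        state.hand_id = data["hand_id"]
        state.phase = data["phase"]
    elif kind == "PhaseTransitioned":
        state.phase = data["to"]
    elif kind == "WorkflowCompleted":
        # Assumption: completion implies the terminal phase.
        state.phase = "complete"
        state.winner_id = data["winner_id"]
    return state


def rebuild(events):
    """Rebuild PM state from scratch by replaying its event stream in order."""
    state = HandFlowState()
    for event in events:
        apply(state, event)
    return state
```

Because `rebuild` starts from a default state and only reads the stream, replaying the same events always produces the same state.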
Rejection Handling
When a PM-issued command is rejected, the PM receives a Notification before the source aggregate:
1. PM issues DealCards → Hand rejects (invalid_player_count)
        │
        ▼
2. PM receives Notification first
   - Can update workflow state (mark step failed)
   - Can decide to retry or abort
        │
        ▼
3. Source aggregate receives Notification
   - Emits compensation events
# Python
@rejected("hand", "DealCards")
def handle_deal_rejected(self, state: HandFlowState, notification: Notification):
    # Update workflow state
    return WorkflowFailed(
        hand_id=state.hand_id,
        reason=f"Deal failed: {notification.rejection_reason}",
        step="deal_cards",
    )
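The "retry or abort" decision in step 2 can stay very small. The following is one possible sketch of a bounded-retry policy, not the framework's behaviour: `StepState`, `on_rejection`, `MAX_ATTEMPTS`, and the idea of a set of retryable reasons are all hypothetical.

```python
from dataclasses import dataclass

MAX_ATTEMPTS = 3  # hypothetical cap on attempts per step


@dataclass
class StepState:
    step: str
    attempts: int = 0   # number of rejections seen for this step
    failed: bool = False


def on_rejection(state, reason, retryable_reasons=frozenset({"timeout"})):
    """Return "retry" or "abort" for a rejected PM-issued command."""
    state.attempts += 1
    # Retry only transient failures, and only while under the cap.
    if reason in retryable_reasons and state.attempts < MAX_ATTEMPTS:
        return "retry"
    # Anything else marks the step as failed in the workflow state.
    state.failed = True
    return "abort"
```

A rejection such as `invalid_player_count` aborts immediately under this policy, since reissuing the same command would be rejected again.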
Timeouts
PMs can schedule timeouts for player actions:
# Python
def handle_action_required(self, event: ActionRequired, state: HandFlowState):
    # Schedule a 30-second timeout for the pending player action.
    # Assumes the PM state exposes the workflow's correlation_id;
    # the HandFlowState shown earlier does not declare that field.
    return [ScheduleTimeout(
        correlation_id=state.correlation_id,
        seconds=30,
        timeout_event=PlayerTimedOut(player_id=event.player_id),
    )]

def handle_player_timed_out(self, event: PlayerTimedOut, state: HandFlowState):
    # Auto-fold on timeout
    return [Fold(hand_id=state.hand_id, player_id=event.player_id)]
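How a scheduled timeout is delivered back to the PM is not described here. As a rough mental model only, the sketch below keeps pending timeouts in a min-heap ordered by due time and releases them once an explicitly supplied clock passes that time. `TimeoutQueue` and its methods are hypothetical stand-ins, not the framework's scheduler.

```python
import heapq
import itertools


class TimeoutQueue:
    """Hypothetical in-memory timeout store keyed by due time."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so entries with equal due times never compare payloads.
        self._counter = itertools.count()

    def schedule(self, now, seconds, correlation_id, timeout_event):
        """Register a timeout that becomes due at now + seconds."""
        entry = (now + seconds, next(self._counter), correlation_id, timeout_event)
        heapq.heappush(self._heap, entry)

    def due(self, now):
        """Pop and return every (correlation_id, event) due at or before now."""
        fired = []
        while self._heap and self._heap[0][0] <= now:
            _, _, correlation_id, event = heapq.heappop(self._heap)
            fired.append((correlation_id, event))
        return fired
```

Each fired entry carries the correlation ID, which is what lets the timeout event reach the same PM instance that scheduled it.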
PM vs Saga
| Aspect | Saga | Process Manager |
|---|---|---|
| State | Stateless | Stateful (own event stream) |
| Identity | None | correlation_id |
| Input domains | Single | Multiple |
| Persistence | No | Yes (workflow events) |
| Timeouts | No | Yes |
| Complexity | Low | Higher |
Rule of thumb: Start with sagas. Upgrade to a PM only when you need state tracking, timeouts, or input from multiple domains.