
Process Managers

A process manager coordinates multi-domain workflows with state tracking. Unlike stateless sagas, process managers maintain their own event stream keyed by correlation ID, enabling complex orchestration patterns.


When to Use Process Managers

| Use Case | Saga | Process Manager |
|---|---|---|
| Single event → single command | ✓ | |
| Fan-out to multiple domains | ✓ | |
| Multi-step workflow with state | | ✓ |
| Events from multiple domains | | ✓ |
| State machine transitions | | ✓ |
| Timeouts and retries | | ✓ |

Warning: Process Manager Anti-Patterns

Over-reliance on process managers is a foot-gun. If you find yourself:

  • Putting significant business logic in a PM
  • Building PMs with large, complex state
  • Creating PMs for workflows within a single domain
  • Reaching for PMs as a first solution

then your domain factoring is probably wrong.

Process managers should be lightweight state machines coordinating across domains—not business logic containers. They answer "what phase are we in?" and "what happens next?", not "how do we calculate X?"

| Symptom | Likely Problem |
|---|---|
| PM has complex validation logic | Logic belongs in domain aggregate |
| PM state mirrors aggregate state | Redundant; query the aggregate |
| PM handles single-domain workflow | Use saga or aggregate instead |
| PM event stream grows large | Too much responsibility; split domains |

Rule of thumb: If a PM does more than track phases and dispatch commands, reconsider your architecture. The PM's job is orchestration, not computation.
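To make the "track phases and dispatch commands" boundary concrete, here is a minimal sketch of a PM reduced to a transition table. The phase and message names echo the hand-flow example on this page, but events and commands are plain strings, and `TRANSITIONS`, `ANY` and `step` are hypothetical illustrations rather than the framework's API.

```python
from enum import Enum


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


ANY = None  # Wildcard: matches any current phase (like "* -> COMPLETE").

# Hypothetical table: (current phase, event type) -> (next phase, command to dispatch).
# Everything this PM "knows" fits in the mapping; no business rules live here.
TRANSITIONS = {
    (HandPhase.AWAITING_DEAL, "HandStarted"): (HandPhase.DEALING, "DealCards"),
    (HandPhase.DEALING, "CardsDealt"): (HandPhase.BLINDS, "PostBlinds"),
    (ANY, "HandComplete"): (HandPhase.COMPLETE, "EndHand"),
}


def step(phase: HandPhase, event_type: str) -> tuple[HandPhase, list[str]]:
    """Return the next phase and the commands to dispatch for one incoming event."""
    transition = TRANSITIONS.get((phase, event_type)) or TRANSITIONS.get((ANY, event_type))
    if transition is None:
        # Unknown or out-of-order event: stay put and dispatch nothing.
        return phase, []
    next_phase, command = transition
    return next_phase, [command]
```

If a workflow can't be expressed as a table like this without adding calculations or validation, that is the signal the rule of thumb describes: the extra logic probably belongs in an aggregate.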


Example: Hand Flow PM

The HandFlowPM orchestrates poker hand phases across the table and hand domains:

State Definition

from dataclasses import dataclass
from enum import Enum


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


@dataclass
class HandFlowState:
    hand_id: str = ""
    phase: HandPhase = HandPhase.AWAITING_DEAL
    player_count: int = 0

Handler Implementation

class HandFlowPM(ProcessManager):
    def handle_hand_started(self, event: table.HandStarted, state: HandFlowState):
        # Transition: AWAITING_DEAL -> DEALING
        state.hand_id = event.hand_id
        state.phase = HandPhase.DEALING
        state.player_count = event.player_count

        # Emit command to hand domain
        return [hand.DealCards(
            hand_id=event.hand_id,
            player_count=event.player_count,
        )]

    def handle_cards_dealt(self, event: hand.CardsDealt, state: HandFlowState):
        # Transition: DEALING -> BLINDS
        state.phase = HandPhase.BLINDS
        return [hand.PostBlinds(hand_id=state.hand_id)]

    def handle_hand_complete(self, event: hand.HandComplete, state: HandFlowState):
        # Transition: * -> COMPLETE
        state.phase = HandPhase.COMPLETE

        # Signal table domain
        return [table.EndHand(
            hand_id=state.hand_id,
            winner_id=event.winner_id,
        )]
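How the framework routes each event to the matching `handle_*` method isn't spelled out here. Purely as an illustration, and assuming routing by event type name (`HandStarted` → `handle_hand_started`), a hypothetical base class could dispatch like this. The `ProcessManager` base, `handler_name`, and the stand-in message classes are all invented for the sketch.

```python
import re
from dataclasses import dataclass


def handler_name(event: object) -> str:
    """Map an event class name like 'HandStarted' to 'handle_hand_started'."""
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", type(event).__name__).lower()
    return f"handle_{snake}"


class ProcessManager:
    """Hypothetical base class: route each event to the matching handle_* method."""

    def handle(self, event: object, state: object) -> list:
        method = getattr(self, handler_name(event), None)
        if method is None:
            return []  # Events this PM has no handler for are ignored.
        return method(event, state)


# Stand-in messages for illustration only.
@dataclass
class HandStarted:
    hand_id: str
    player_count: int


@dataclass
class DealCards:
    hand_id: str
    player_count: int


@dataclass
class FlowState:
    hand_id: str = ""
    phase: str = "awaiting_deal"


class DemoPM(ProcessManager):
    def handle_hand_started(self, event: HandStarted, state: FlowState) -> list:
        # Same shape as HandFlowPM.handle_hand_started: update phase, emit a command.
        state.hand_id = event.hand_id
        state.phase = "dealing"
        return [DealCards(hand_id=event.hand_id, player_count=event.player_count)]
```

Usage: `DemoPM().handle(HandStarted("h1", 4), FlowState())` returns a single `DealCards` command and moves the state to `"dealing"`.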

Correlation ID

Process managers use the correlation ID as their aggregate root:

message Cover {
  string domain = 2;
  UUID root = 1;
  string correlation_id = 3;  // Workflow correlation - flows through all commands/events
  Edition edition = 4;        // Edition for diverged timelines; empty name = main timeline
}

All events in a workflow share the same correlation ID, allowing the PM to:

  • Receive events from multiple domains
  • Maintain workflow state across events
  • Track progress through the state machine
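A minimal sketch of the first two points, assuming an in-memory map and a simplified event envelope. `Event`, `WorkflowState` and `CorrelationRouter` are hypothetical; in the framework the correlation ID travels in `Cover`.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical envelope: source domain, workflow correlation ID, event type name."""
    domain: str
    correlation_id: str
    type_name: str


@dataclass
class WorkflowState:
    seen: list[str] = field(default_factory=list)


class CorrelationRouter:
    """Keep one workflow state per correlation ID, regardless of the source domain."""

    def __init__(self) -> None:
        self.workflows: dict[str, WorkflowState] = defaultdict(WorkflowState)

    def route(self, event: Event) -> WorkflowState:
        # Events from the table and hand domains land in the same workflow
        # as long as they carry the same correlation ID.
        state = self.workflows[event.correlation_id]
        state.seen.append(f"{event.domain}.{event.type_name}")
        return state
```

Two events from different domains with correlation ID `"hand-abc-123"` end up in one `WorkflowState`, while a different hand gets its own.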

State Persistence

PM state is stored as events in the PM's own event stream:

PM Event Stream (correlation_id = "hand-abc-123"):
[0] WorkflowStarted { hand_id: "...", phase: "awaiting_deal" }
[1] PhaseTransitioned { from: "awaiting_deal", to: "dealing" }
[2] PhaseTransitioned { from: "dealing", to: "blinds" }
[3] WorkflowCompleted { winner_id: "..." }

On restart, the PM rebuilds state by replaying its own events.
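Rebuilding is a left fold over the stream, oldest event first. The sketch below mirrors the four example events with stand-in dataclasses (hypothetical; the real event types and fields aren't specified here), renaming `from`/`to` because `from` is a Python keyword.

```python
from dataclasses import dataclass


@dataclass
class WorkflowStarted:
    hand_id: str
    phase: str


@dataclass
class PhaseTransitioned:
    from_phase: str  # "from" in the stream above; renamed since it is a keyword
    to_phase: str


@dataclass
class WorkflowCompleted:
    winner_id: str


@dataclass
class RebuiltState:
    hand_id: str = ""
    phase: str = ""
    winner_id: str = ""


def replay(events: list) -> RebuiltState:
    """Fold the PM's own event stream, oldest first, into its current state."""
    state = RebuiltState()
    for event in events:
        if isinstance(event, WorkflowStarted):
            state.hand_id, state.phase = event.hand_id, event.phase
        elif isinstance(event, PhaseTransitioned):
            state.phase = event.to_phase
        elif isinstance(event, WorkflowCompleted):
            # Assumption: completion implies the "complete" phase.
            state.phase = "complete"
            state.winner_id = event.winner_id
    return state
```

Replaying events [0] through [2] of the example stream yields phase `"blinds"`; adding [3] moves it to `"complete"` and records the winner.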


Rejection Handling

When a PM-issued command is rejected, the PM receives a Notification before the source aggregate does:

1. PM issues DealCards → Hand rejects (invalid_player_count)
2. PM receives Notification first
   - Can update workflow state (mark step failed)
   - Can decide to retry or abort
3. Source aggregate receives Notification
   - Emits compensation events

@rejected("hand", "DealCards")
def handle_deal_rejected(self, state: HandFlowState, notification: Notification):
    # Update workflow state
    return WorkflowFailed(
        hand_id=state.hand_id,
        reason=f"Deal failed: {notification.rejection_reason}",
        step="deal_cards",
    )
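The delivery order in steps 2 and 3 can be sketched as a function that calls the PM first and the source aggregate second. `Notification` here is a stand-in, and `deliver_rejection`, `make_pm` and the three-attempt retry budget are hypothetical choices used only to illustrate "retry or abort".

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Notification:
    """Hypothetical rejection notice for a command the PM issued."""
    command: str
    rejection_reason: str


Handler = Callable[[Notification], list[str]]

MAX_ATTEMPTS = 3  # Hypothetical retry budget.


def deliver_rejection(notification: Notification, pm: Handler, source_aggregate: Handler) -> list[str]:
    """Deliver a rejection to the PM first, then to the source aggregate."""
    outputs = list(pm(notification))  # PM marks the step failed, decides retry/abort
    outputs += source_aggregate(notification)  # aggregate emits compensation events
    return outputs


def make_pm(attempts: dict[str, int]) -> Handler:
    """A PM handler that retries a rejected command until the budget runs out."""
    def pm(n: Notification) -> list[str]:
        attempts[n.command] = attempts.get(n.command, 0) + 1
        if attempts[n.command] < MAX_ATTEMPTS:
            return [f"Retry {n.command}"]
        return [f"WorkflowFailed: {n.rejection_reason}"]
    return pm
```

Because the PM sees the rejection first, its decision is recorded before any compensation from the aggregate is emitted.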

Timeouts

PMs can schedule timeouts for player actions:

def handle_action_required(self, event: ActionRequired, state: HandFlowState):
    # Schedule timeout. Assumes the PM's correlation ID is available on the state
    # (HandFlowState as defined earlier does not declare a correlation_id field).
    return [ScheduleTimeout(
        correlation_id=state.correlation_id,
        seconds=30,
        timeout_event=PlayerTimedOut(player_id=event.player_id),
    )]

def handle_player_timed_out(self, event: PlayerTimedOut, state: HandFlowState):
    # Auto-fold on timeout
    return [Fold(hand_id=state.hand_id, player_id=event.player_id)]
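How `ScheduleTimeout` is executed isn't described here. As one possible shape, a scheduler could keep pending timeouts in a min-heap ordered by deadline and hand each fired `timeout_event` back under its correlation ID. `TimeoutScheduler`, `PendingTimeout` and the explicit `now` argument are assumptions for the sketch; a real deployment would likely also need to persist pending timeouts across restarts.

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class PendingTimeout:
    deadline: float  # Only the deadline takes part in ordering.
    correlation_id: str = field(compare=False)
    timeout_event: object = field(compare=False)


class TimeoutScheduler:
    """Hypothetical in-memory scheduler; the clock is passed in for deterministic tests."""

    def __init__(self) -> None:
        self._heap: list[PendingTimeout] = []

    def schedule(self, now: float, correlation_id: str, seconds: float, timeout_event: object) -> None:
        heapq.heappush(self._heap, PendingTimeout(now + seconds, correlation_id, timeout_event))

    def cancel(self, correlation_id: str) -> None:
        """Drop pending timeouts for a workflow, e.g. when the player acts in time."""
        self._heap = [t for t in self._heap if t.correlation_id != correlation_id]
        heapq.heapify(self._heap)

    def due(self, now: float) -> list[tuple[str, object]]:
        """Pop every timeout whose deadline has passed, earliest first."""
        fired = []
        while self._heap and self._heap[0].deadline <= now:
            t = heapq.heappop(self._heap)
            fired.append((t.correlation_id, t.timeout_event))
        return fired
```

Cancelling on a player's action keeps a stale `PlayerTimedOut` from folding a player who already moved.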

PM vs Saga

| Aspect | Saga | Process Manager |
|---|---|---|
| State | Stateless | Stateful (own event stream) |
| Identity | None | correlation_id |
| Input domains | Single | Multiple |
| Persistence | No | Yes (workflow events) |
| Timeouts | No | Yes |
| Complexity | Low | Higher |

Rule of thumb: Start with sagas. Upgrade to PM when you need state tracking or multi-domain input.
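For contrast, a saga reduces to a pure function from one event to commands. The message classes below are stand-ins mirroring `hand.HandComplete` and `table.EndHand` from the HandFlowPM example, and `end_hand_saga` is a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class HandComplete:
    hand_id: str
    winner_id: str


@dataclass
class EndHand:
    hand_id: str
    winner_id: str


def end_hand_saga(event: HandComplete) -> list[EndHand]:
    """Stateless saga: one event in, commands out; nothing is stored between calls."""
    return [EndHand(hand_id=event.hand_id, winner_id=event.winner_id)]
```

With no state and no identity, the same input always yields the same commands. Once you need to remember which phase a hand is in, or to combine events from the table and hand domains, the HandFlowPM shape is the better fit.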


Next Steps

  • Sagas — Simpler stateless coordination
  • Why Poker — PM patterns in poker
  • Testing — Testing PMs with Gherkin