
Process Managers

A process manager coordinates multi-domain workflows with state tracking. Unlike stateless sagas, process managers maintain their own event stream keyed by correlation ID, enabling complex orchestration patterns.


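| Use Case | Saga | Process Manager |
|---|---|---|
| Single event → single command | Yes | Overkill |
| Fan-out to multiple domains | Yes | Yes |
| Multi-step workflow with state | No | Yes |
| Events from multiple domains | No | Yes |
| State machine transitions | No | Yes |
| Timeouts and retries | No | Planned |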

Over-reliance on process managers is a foot-gun. If you find yourself:

  • Putting significant business logic in a PM
  • Building PMs with large, complex state
  • Creating PMs for workflows within a single domain
  • Reaching for PMs as a first solution

Your domain factoring is probably wrong.

Process managers should be lightweight state machines coordinating across domains—not business logic containers. They answer “what phase are we in?” and “what happens next?”, not “how do we calculate X?”

| Symptom | Likely Problem |
|---|---|
| PM has complex validation logic | Logic belongs in domain aggregate |
| PM state mirrors aggregate state | Redundant; query the aggregate |
| PM handles single-domain workflow | Use saga or aggregate instead |
| PM event stream grows large | Too much responsibility; split domains |

Rule of thumb: If a PM does more than track phases and dispatch commands, reconsider your architecture. The PM’s job is orchestration, not computation.
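To make the "lightweight state machine" idea concrete, here is a minimal sketch in plain Python of a PM reduced to a lookup table from (current phase, incoming event) to (next phase, command to dispatch). The `Phase`, `Step`, `TRANSITIONS`, and `next_step` names are hypothetical and not part of the client library; the phases loosely follow the HandFlowPM example, with betting omitted.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Phase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    COMPLETE = "complete"


@dataclass(frozen=True)
class Step:
    next_phase: Phase
    command: str  # name of the command to dispatch
    target: str   # domain that receives the command


# (current phase, incoming event type) -> what happens next.
# Hypothetical table; no business rules live here, only phase tracking.
TRANSITIONS: dict[tuple[Phase, str], Step] = {
    (Phase.AWAITING_DEAL, "HandStarted"): Step(Phase.DEALING, "DealCards", "hand"),
    (Phase.DEALING, "CardsDealt"): Step(Phase.BLINDS, "PostBlinds", "hand"),
    (Phase.BLINDS, "HandComplete"): Step(Phase.COMPLETE, "EndHand", "table"),
}


def next_step(phase: Phase, event_type: str) -> Optional[Step]:
    """Answer "what happens next?" for an event; None means the PM ignores it."""
    return TRANSITIONS.get((phase, event_type))
```

Everything here is phase tracking and dispatch. Whether a deal or a payout is valid is decided by the hand and table aggregates that receive the commands, which is the boundary the rule of thumb describes.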


The HandFlowPM orchestrates poker hand phases across table and hand domains:

```mermaid
sequenceDiagram
    participant T as Table
    participant PM as HandFlowPM
    participant H as Hand

    T->>PM: HandStarted
    PM->>H: DealCards
    H->>PM: CardsDealt
    PM->>H: Phase transitions
    H->>PM: HandComplete
    PM->>T: EndHand
```

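PMs extend the client library’s ProcessManager base class, and handler methods are registered by decorator, attribute, or macro depending on the language. In the Python example below, the @process_manager decorator declares the PM’s name, source and target domains, and state type, and each @handles method receives the current HandFlowState and updates it in place.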

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

# process_manager, handles, Destinations, ProcessManagerResponse and
# _command_book come from the client library, and `table` / `hand` are the
# generated message modules for those domains; their imports are omitted here.


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


@dataclass
class HandFlowState:
    """PM state - tracks workflow progress."""

    hand_id: str = ""
    phase: HandPhase = HandPhase.AWAITING_DEAL
    player_count: int = 0


@process_manager(
    name="pmg-hand-flow",
    pm_domain="hand-flow",
    sources=["table", "hand"],  # domains whose events this PM consumes
    targets=["hand", "table"],  # domains this PM sends commands to
    state=HandFlowState,
)
class HandFlowPM:
    """OO-style process manager using unified Router decorators."""

    @handles(table.HandStarted)
    def on_hand_started(
        self,
        event: table.HandStarted,
        state: HandFlowState,
        destinations: Optional[Destinations],
    ) -> Optional[ProcessManagerResponse]:
        """Table started a hand -> send DealCards to hand domain."""
        state.hand_id = event.hand_id
        state.phase = HandPhase.DEALING
        state.player_count = event.player_count
        deal_cards = hand.DealCards(
            hand_id=event.hand_id,
            player_count=event.player_count,
        )
        # Sequence for the "hand" destination; fall back to 0 if none is known.
        seq = destinations.sequence_for("hand") if destinations else 0
        return ProcessManagerResponse(
            commands=[_command_book("hand", deal_cards, seq or 0)]
        )

    @handles(hand.CardsDealt)
    def handle_cards_dealt(
        self,
        event: hand.CardsDealt,
        state: HandFlowState,
        destinations: Optional[Destinations],
    ) -> Optional[ProcessManagerResponse]:
        """Cards dealt -> post blinds."""
        state.phase = HandPhase.BLINDS
        post_blinds = hand.PostBlinds(hand_id=state.hand_id)
        seq = destinations.sequence_for("hand") if destinations else 0
        return ProcessManagerResponse(
            commands=[_command_book("hand", post_blinds, seq or 0)]
        )

    @handles(hand.HandComplete)
    def handle_hand_complete(
        self,
        event: hand.HandComplete,
        state: HandFlowState,
        destinations: Optional[Destinations],
    ) -> Optional[ProcessManagerResponse]:
        """Hand complete -> end hand on table."""
        state.phase = HandPhase.COMPLETE
        end_hand = table.EndHand(
            hand_id=state.hand_id,
            winner_id=event.winner_id,
        )
        # The final command targets the table domain, so use its sequence.
        seq = destinations.sequence_for("table") if destinations else 0
        return ProcessManagerResponse(
            commands=[_command_book("table", end_hand, seq or 0)]
        )
```

Process managers use the correlation ID as their aggregate root:

```protobuf
message Cover {
  string domain = 2;
  UUID root = 1;
  string correlation_id = 3; // Workflow correlation - flows through all commands/events
  Edition edition = 4; // Edition for diverged timelines; empty name = main timeline
  // Field 5 removed: external_id moved to ExternalDeferredSequence in PageHeader
  // Client-supplied extension slot. Most consumers pack a nested
  // ``Cover`` here to express parent-aggregate relationships (e.g.
  // table events carry the tournament Cover; hand events carry the
  // table Cover). Helper ``unpack_parent_cover`` extracts the typed
  // Cover when present. Framework code treats this as opaque routing
  // metadata — it propagates without inspecting.
  //
  // WARNING — mass-propagating: the framework stamps this slot onto
  // EVERY event a child aggregate emits, so the payload is paid per
  // event. Keep what you pack small. The intended workload is a single
  // nested Cover (~50–80 bytes packed); larger payloads multiply
  // across the event stream and the bus.
  google.protobuf.Any ext = 6;
}
```

All events in a workflow share the same correlation ID, allowing the PM to:

  • Receive events from multiple domains
  • Maintain workflow state across events
  • Track progress through the state machine
  • Emit commands or facts to target domains
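The sketch below illustrates correlation-keyed routing in memory: events from any subscribed source domain that carry the same correlation ID update the same workflow state, while other workflows stay isolated. `Event`, `WorkflowState`, and `CorrelationRouter` are hypothetical stand-ins for what the framework does with `Cover.correlation_id`; persistence is deliberately left out.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Event:
    domain: str          # source domain, e.g. "table" or "hand"
    correlation_id: str  # shared by every event in one workflow
    kind: str            # event type name


@dataclass
class WorkflowState:
    seen: list[str] = field(default_factory=list)


class CorrelationRouter:
    """Hypothetical in-memory router: one PM state per correlation ID."""

    def __init__(self, sources: set[str]) -> None:
        self.sources = sources
        # A new correlation ID gets a fresh WorkflowState on first use.
        self.states: dict[str, WorkflowState] = defaultdict(WorkflowState)

    def dispatch(self, event: Event) -> bool:
        if event.domain not in self.sources:
            return False  # not a domain this PM subscribes to
        state = self.states[event.correlation_id]
        state.seen.append(f"{event.domain}.{event.kind}")
        return True
```

For example, a `HandStarted` from the table domain and a `CardsDealt` from the hand domain with correlation ID `"hand-abc-123"` both land in `router.states["hand-abc-123"]`, while a `CardsDealt` for another hand gets its own state.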

PMs can emit either commands (requests that can be rejected) or facts (assertions the target must accept):

| Output | Can Reject | Use Case |
|---|---|---|
| Command | Yes | Request action that may fail validation |
| Fact | No | Assert PM’s orchestration decision |

Facts are useful when the PM has authority the target aggregate must accept—phase transitions, tournament rulings, dealer decisions:

illustrative
```python
# Command: request action, may be rejected
def handle_hand_complete(event: HandComplete, state: WorkflowState):
    return CommandBook(domain="player", command=TransferChips(amount=event.pot))


# Fact: assert PM decision, cannot be rejected
def handle_phase_transition(event: AllPlayersReady, state: WorkflowState):
    return FactBook(
        domain="hand",
        external_id=f"phase:{state.correlation_id}:{state.current_phase}",
        event=PhaseAdvanced(phase=state.next_phase),
    )
```

See Commands vs Facts for details on when to use each.
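On the receiving side, the difference can be sketched with a hypothetical in-memory aggregate (not the framework's API): commands pass through the aggregate's validation and can raise `Rejected`, while facts are recorded without validation. Treating `external_id` as a deduplication key is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Command:
    name: str
    amount: int


@dataclass(frozen=True)
class Fact:
    name: str
    external_id: str


class Rejected(Exception):
    """Raised when a command fails the aggregate's validation."""


class PlayerChips:
    """Hypothetical target aggregate with a single invariant: no negative balance."""

    def __init__(self, balance: int) -> None:
        self.balance = balance
        self.events: list[str] = []
        self._seen_facts: set[str] = set()

    def receive(self, message: Union[Command, Fact]) -> None:
        if isinstance(message, Fact):
            # Facts skip validation; external_id only guards against duplicates (assumed).
            if message.external_id not in self._seen_facts:
                self._seen_facts.add(message.external_id)
                self.events.append(message.name)
            return
        # Commands are validated and may be rejected.
        if message.amount > self.balance:
            raise Rejected(f"{message.name}: insufficient chips")
        self.balance -= message.amount
        self.events.append(message.name)
```

A `Command("TransferChips", 500)` against a balance of 100 raises `Rejected`, whereas a `Fact` is always recorded once, which is why facts suit decisions the PM has authority over.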


PM state is stored as events in the PM’s own event stream:

illustrative - PM event stream
```text
PM Event Stream (correlation_id = "hand-abc-123"):
  [0] WorkflowStarted   { hand_id: "...", phase: "awaiting_deal" }
  [1] PhaseTransitioned { from: "awaiting_deal", to: "dealing" }
  [2] PhaseTransitioned { from: "dealing", to: "blinds" }
  [3] WorkflowCompleted { winner_id: "..." }
```

On restart, the PM rebuilds state by replaying its own events.
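Replaying amounts to a left fold over the PM's own stream, oldest event first, starting from an empty state. The sketch below assumes events shaped like the illustrative stream above, represented as plain dicts; `FlowState`, `apply`, and `rebuild` are hypothetical names, and the out-of-order check is an illustrative safeguard rather than documented framework behavior.

```python
from dataclasses import dataclass
from typing import Any, Iterable


@dataclass
class FlowState:
    hand_id: str = ""
    phase: str = "awaiting_deal"
    winner_id: str = ""
    completed: bool = False


def apply(state: FlowState, event: dict[str, Any]) -> FlowState:
    """Fold one PM event into the state (mutates and returns it)."""
    kind = event["type"]
    if kind == "WorkflowStarted":
        state.hand_id = event["hand_id"]
        state.phase = event["phase"]
    elif kind == "PhaseTransitioned":
        # A transition must start from the phase we are currently in.
        if event["from"] != state.phase:
            raise ValueError(f"out-of-order transition from {event['from']!r}")
        state.phase = event["to"]
    elif kind == "WorkflowCompleted":
        state.winner_id = event["winner_id"]
        state.completed = True
    return state


def rebuild(events: Iterable[dict[str, Any]]) -> FlowState:
    """Replay the PM's stream from an empty state."""
    state = FlowState()
    for event in events:
        apply(state, event)
    return state
```

Because the state is derived only from the PM's own events, a restarted PM reaches the same phase it had before the restart without querying other domains.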


When a command issued by a PM is rejected, the PM receives the rejection Notification before the source aggregate does:

illustrative - rejection flow
```text
1. PM issues DealCards → Hand rejects (invalid_player_count)
2. PM receives Notification first
   - Can update workflow state (mark step failed)
   - Can decide to retry or abort
3. Source aggregate receives Notification
   - Emits compensation events
```
illustrative
```python
@rejected("hand", "DealCards")
def handle_deal_rejected(self, state: HandFlowState, notification: Notification):
    # Record the failure as a PM event, which updates the workflow state
    return WorkflowFailed(
        hand_id=state.hand_id,
        reason=f"Deal failed: {notification.rejection_reason}",
        step="deal_cards",
    )
```
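The delivery order can be sketched as a function that hands the rejection to the PM's handler first and to the source aggregate's handler second. `Notification` here is a minimal stand-in carrying only the fields the example uses, and `deliver_rejection`, `pm_handler`, and `source_handler` are hypothetical; in practice the framework performs this routing.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Notification:
    domain: str            # domain that rejected the command
    command: str           # rejected command type
    rejection_reason: str


Handler = Callable[[Notification], str]


def deliver_rejection(note: Notification, pm: Handler, source: Handler) -> list[str]:
    """Deliver a rejection to the PM first, then to the source aggregate.

    Returns what each handler produced, in delivery order.
    """
    outputs = [pm(note)]          # PM updates workflow state (e.g. marks the step failed)
    outputs.append(source(note))  # source aggregate then emits compensation events
    return outputs


def pm_handler(note: Notification) -> str:
    return f"WorkflowFailed(step={note.command}, reason={note.rejection_reason})"


def source_handler(note: Notification) -> str:
    return f"Compensated({note.command})"
```

Delivering to the PM first means the workflow state already reflects the failed step by the time the source aggregate compensates, so the PM's retry-or-abort decision is not racing the compensation.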


| Aspect | Saga | Process Manager |
|---|---|---|
| State | Stateless | Stateful (own event stream) |
| Identity | None | correlation_id |
| Input domains | Single | Multiple |
| Persistence | No | Yes (workflow events) |
| Timeouts | No | Planned |
| Complexity | Low | Higher |

Rule of thumb: Start with sagas. Upgrade to PM when you need state tracking or multi-domain input.
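For contrast, a saga covering the "single event → single command" case can be a pure function of the incoming event, with no correlation-keyed state to store or replay. The event and command types below are hypothetical dataclasses, not generated messages.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class HandComplete:
    hand_id: str
    winner_id: str
    pot: int


@dataclass(frozen=True)
class TransferChips:
    player_id: str
    amount: int


def payout_saga(event: HandComplete) -> list[TransferChips]:
    """Stateless: the output depends only on the incoming event."""
    return [TransferChips(player_id=event.winner_id, amount=event.pot)]
```

If the payout later had to wait for events from a second domain, or track which phase the hand had reached, that would be the signal to move the logic into a process manager.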