
Process Managers

A process manager coordinates multi-domain workflows with state tracking. Unlike stateless sagas, process managers maintain their own event stream keyed by correlation ID, enabling complex orchestration patterns.


When to Use Process Managers

| Use Case | Saga | Process Manager |
|---|---|---|
| Single event → single command | ✓ | |
| Fan-out to multiple domains | ✓ | |
| Multi-step workflow with state | | ✓ |
| Events from multiple domains | | ✓ |
| State machine transitions | | ✓ |
| Timeouts and retries | | ✓ |

Warning: Process Manager Anti-Patterns

Over-reliance on process managers is a foot-gun. If you find yourself:

  • Putting significant business logic in a PM
  • Building PMs with large, complex state
  • Creating PMs for workflows within a single domain
  • Reaching for PMs as a first solution

Your domain factoring is probably wrong.

Process managers should be lightweight state machines coordinating across domains—not business logic containers. They answer "what phase are we in?" and "what happens next?", not "how do we calculate X?"

| Symptom | Likely Problem |
|---|---|
| PM has complex validation logic | Logic belongs in domain aggregate |
| PM state mirrors aggregate state | Redundant; query the aggregate |
| PM handles single-domain workflow | Use saga or aggregate instead |
| PM event stream grows large | Too much responsibility; split domains |

Rule of thumb: If a PM does more than track phases and dispatch commands, reconsider your architecture. The PM's job is orchestration, not computation.
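As a rough illustration of that rule of thumb, the sketch below models a PM that does nothing except track a phase and name the next command to dispatch. It is plain Python, not the Angzarr API: `TRANSITIONS`, `PhaseTracker`, and the `BlindsPosted` event are hypothetical, and the phase and command names are borrowed from the hand-flow example below.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical transition table:
# (current phase, incoming event type) -> (next phase, command to dispatch or None)
TRANSITIONS: dict[tuple[str, str], tuple[str, Optional[str]]] = {
    ("awaiting_deal", "HandStarted"): ("dealing", "DealCards"),
    ("dealing", "CardsDealt"): ("blinds", "PostBlinds"),
    ("blinds", "BlindsPosted"): ("betting", None),
    ("betting", "HandComplete"): ("complete", "EndHand"),
}


@dataclass
class PhaseTracker:
    """Tracks only the workflow phase; no business rules live here."""

    phase: str = "awaiting_deal"

    def handle(self, event_type: str) -> Optional[str]:
        """Advance the phase and return the command to dispatch, if any."""
        key = (self.phase, event_type)
        if key not in TRANSITIONS:
            # Event not expected in this phase: ignore it instead of computing anything.
            return None
        self.phase, command = TRANSITIONS[key]
        return command


pm = PhaseTracker()
print(pm.handle("HandStarted"))   # DealCards
print(pm.handle("HandComplete"))  # None (not expected while dealing)
print(pm.phase)                   # dealing
```

Because all behavior lives in a lookup table, anything that needs real computation (validating player counts, sizing blinds) has nowhere to go except the domain aggregates.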


Implementation Styles

Angzarr supports two process manager implementation styles:

| Style | Description | Best For |
|---|---|---|
| Functional | ProcessManagerRouter with function handlers | Simple routing, explicit |
| OO | ProcessManager class with @handles/[Handles] decorators | Rich state, encapsulation |
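To show what the two styles amount to, here is a toy sketch with no Angzarr dependency. `MiniRouter`, `handles_event`, and `MiniPM` are hypothetical stand-ins, not the real `ProcessManagerRouter` or `@handles` APIs: the functional style builds an event-type-to-handler map by explicit registration, while the OO style derives the same map from decorated methods.

```python
from typing import Callable, Optional

# A handler takes (state, event) and returns the name of a command to send, or None.
Handler = Callable[[dict, dict], Optional[str]]


class MiniRouter:
    """Functional style (hypothetical): handlers are registered explicitly."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def on(self, event_type: str, handler: Handler) -> "MiniRouter":
        self._handlers[event_type] = handler
        return self  # return self so registrations can be chained

    def dispatch(self, state: dict, event_type: str, event: dict) -> Optional[str]:
        handler = self._handlers.get(event_type)
        return handler(state, event) if handler else None


def on_hand_started(state: dict, event: dict) -> Optional[str]:
    state["hand_id"] = event["hand_id"]
    state["phase"] = "dealing"
    return "DealCards"


def handles_event(event_type: str):
    """OO style (hypothetical): tag a method with the event type it handles."""

    def mark(method):
        method.handles_event = event_type
        return method

    return mark


class MiniPM:
    def __init__(self) -> None:
        self.state = {"phase": "awaiting_deal"}
        # Collect the tagged methods into the same event-type -> handler map.
        self._handlers = {}
        for name in dir(self):
            member = getattr(self, name)
            if callable(member) and hasattr(member, "handles_event"):
                self._handlers[member.handles_event] = member

    @handles_event("HandStarted")
    def on_hand_started(self, event: dict) -> Optional[str]:
        self.state["hand_id"] = event["hand_id"]
        self.state["phase"] = "dealing"
        return "DealCards"

    def dispatch(self, event_type: str, event: dict) -> Optional[str]:
        handler = self._handlers.get(event_type)
        return handler(event) if handler else None


router = MiniRouter().on("HandStarted", on_hand_started)
state = {"phase": "awaiting_deal"}
print(router.dispatch(state, "HandStarted", {"hand_id": "h1"}))  # DealCards
print(MiniPM().dispatch("HandStarted", {"hand_id": "h1"}))       # DealCards
```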

Example: Hand Flow PM

The HandFlowPM orchestrates poker hand phases across table and hand domains:

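The example below uses the OO style (a ProcessManager subclass with decorated handlers); the functional style registers the same handlers explicitly on a ProcessManagerRouter.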

State Definition

from dataclasses import dataclass
from enum import Enum


class HandPhase(Enum):
    AWAITING_DEAL = "awaiting_deal"
    DEALING = "dealing"
    BLINDS = "blinds"
    BETTING = "betting"
    COMPLETE = "complete"


@dataclass
class HandFlowState:
    """PM state - tracks workflow progress."""

    hand_id: str = ""
    phase: HandPhase = HandPhase.AWAITING_DEAL
    player_count: int = 0


Handler Implementation

from typing import Any, Optional

# Imports for ProcessManager, the prepares/handles/output_domain decorators, Cover,
# Uuid, EventBook, and the generated `table`/`hand` message modules are omitted.


class HandFlowPM(ProcessManager[HandFlowState]):
    """OO-style process manager using decorators."""

    name = "pmg-hand-flow"

    def _create_empty_state(self) -> HandFlowState:
        return HandFlowState()

    def _apply_event(self, state: HandFlowState, event_any: Any) -> None:
        """Apply PM's own events to rebuild state."""
        type_url = event_any.type_url

        if type_url.endswith("HandFlowStarted"):
            # In production, unpack the Any into the concrete event and update state
            pass
        elif type_url.endswith("PhaseTransitioned"):
            pass

    @prepares(table.HandStarted)
    def prepare_hand_started(self, event: table.HandStarted) -> list[Cover]:
        """Declare hand destination needed when hand starts."""
        return [
            Cover(
                domain="hand",
                root=Uuid(value=event.hand_root),
            )
        ]

    @output_domain("hand")
    @handles(table.HandStarted, input_domain="table")
    def on_hand_started(
        self, event: table.HandStarted, destinations: list[EventBook]
    ) -> Optional[hand.DealCards]:
        """Table started a hand -> send DealCards to hand domain."""
        # Update local state
        self.state.hand_id = event.hand_id
        self.state.phase = HandPhase.DEALING
        self.state.player_count = event.player_count

        return hand.DealCards(
            hand_id=event.hand_id,
            player_count=event.player_count,
        )

    @output_domain("hand")
    @handles(hand.CardsDealt, input_domain="hand")
    def on_cards_dealt(self, event: hand.CardsDealt) -> Optional[hand.PostBlinds]:
        """Cards dealt -> post blinds."""
        self.state.phase = HandPhase.BLINDS
        return hand.PostBlinds(hand_id=self.state.hand_id)

    @output_domain("table")
    @handles(hand.HandComplete, input_domain="hand")
    def on_hand_complete(self, event: hand.HandComplete) -> Optional[table.EndHand]:
        """Hand complete -> end hand on table."""
        self.state.phase = HandPhase.COMPLETE
        return table.EndHand(
            hand_id=self.state.hand_id,
            winner_id=event.winner_id,
        )


Both patterns produce identical behavior—choose based on team preference. The functional ProcessManagerRouter is more explicit; the OO approach integrates state and handlers in one class.


Correlation ID

Process managers use the correlation ID as their aggregate root:

message Cover {
  string domain = 2;
  UUID root = 1;
  string correlation_id = 3;  // Workflow correlation - flows through all commands/events
  Edition edition = 4;        // Edition for diverged timelines; empty name = main timeline
  // Field 5 removed: external_id moved to ExternalDeferredSequence in PageHeader
}

All events in a workflow share the same correlation ID, allowing the PM to:

  • Receive events from multiple domains
  • Maintain workflow state across events
  • Track progress through the state machine
  • Emit commands or facts to target domains
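The sketch below illustrates the routing and propagation side of this under simplifying assumptions: `Envelope` is a hypothetical stand-in for an event or command plus the relevant `Cover` fields, and `CorrelationRouter` keeps PM state in memory rather than in an event stream. Events from any domain land in the state for their correlation ID, and outgoing commands copy that ID so the resulting events find their way back.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Envelope:
    """Hypothetical event/command wrapper carrying the Cover fields used here."""

    domain: str
    message_type: str
    correlation_id: str


@dataclass
class WorkflowState:
    # "domain/message_type" of every event seen for this workflow, in arrival order
    seen: list[str] = field(default_factory=list)


class CorrelationRouter:
    """Keeps one PM state per correlation ID, whatever domain an event comes from."""

    def __init__(self) -> None:
        self.workflows: dict[str, WorkflowState] = defaultdict(WorkflowState)

    def route(self, event: Envelope) -> WorkflowState:
        state = self.workflows[event.correlation_id]
        state.seen.append(f"{event.domain}/{event.message_type}")
        return state


def command_for(trigger: Envelope, domain: str, command_type: str) -> Envelope:
    """Outgoing commands inherit the trigger's correlation ID."""
    return Envelope(domain, command_type, trigger.correlation_id)


router = CorrelationRouter()
started = Envelope("table", "HandStarted", "hand-abc-123")
router.route(started)
deal = command_for(started, "hand", "DealCards")
# The hand domain's reply carries the correlation ID it received.
router.route(Envelope("hand", "CardsDealt", deal.correlation_id))
router.route(Envelope("table", "HandStarted", "hand-xyz-999"))
print(router.workflows["hand-abc-123"].seen)  # ['table/HandStarted', 'hand/CardsDealt']
print(len(router.workflows))                  # 2
```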

Commands vs Facts

PMs can emit either commands (requests that can be rejected) or facts (assertions the target must accept):

| Output | Can Reject | Use Case |
|---|---|---|
| Command | Yes | Request action that may fail validation |
| Fact | No | Assert PM's orchestration decision |

Facts are useful when the PM has authority the target aggregate must accept—phase transitions, tournament rulings, dealer decisions:

illustrative
# Command: request action, may be rejected
def handle_hand_complete(event: HandComplete, state: WorkflowState):
    return CommandBook(domain="player", command=TransferChips(amount=event.pot))

# Fact: assert PM decision, cannot be rejected
def handle_phase_transition(event: AllPlayersReady, state: WorkflowState):
    return FactBook(
        domain="hand",
        external_id=f"phase:{state.correlation_id}:{state.current_phase}",
        event=PhaseAdvanced(phase=state.next_phase),
    )

See Commands vs Facts for details on when to use each.
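To illustrate the difference from the target's side, here is a minimal sketch of a hypothetical `HandAggregate`. The 2-10 player limit is invented for the example, and deduplicating facts by their external ID is an assumption about how a target might keep fact delivery idempotent, not documented Angzarr behavior.

```python
from dataclasses import dataclass, field


class Rejected(Exception):
    """Raised when the aggregate refuses a command."""


@dataclass
class HandAggregate:
    """Hypothetical target aggregate that receives both commands and facts."""

    phase: str = "awaiting_deal"
    player_count: int = 0
    applied_fact_ids: set[str] = field(default_factory=set)

    def deal_cards(self, player_count: int) -> None:
        # Command: a request. The aggregate validates it and may reject it.
        if not 2 <= player_count <= 10:  # hypothetical table-size rule
            raise Rejected("invalid_player_count")
        self.player_count = player_count
        self.phase = "dealing"

    def apply_fact(self, external_id: str, phase: str) -> bool:
        # Fact: an assertion. No validation, but a redelivered fact with the
        # same external ID is ignored (assumed idempotency scheme).
        if external_id in self.applied_fact_ids:
            return False
        self.applied_fact_ids.add(external_id)
        self.phase = phase
        return True


hand = HandAggregate()
try:
    hand.deal_cards(player_count=1)
except Rejected as err:
    print("rejected:", err)  # rejected: invalid_player_count

fact_id = "phase:hand-abc-123:blinds"  # same shape as the FactBook example above
print(hand.apply_fact(fact_id, "betting"))  # True
print(hand.apply_fact(fact_id, "betting"))  # False (duplicate delivery ignored)
```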


State Persistence

PM state is stored as events in the PM's own event stream:

illustrative - PM event stream
PM Event Stream (correlation_id = "hand-abc-123"):
[0] WorkflowStarted { hand_id: "...", phase: "awaiting_deal" }
[1] PhaseTransitioned { from: "awaiting_deal", to: "dealing" }
[2] PhaseTransitioned { from: "dealing", to: "blinds" }
[3] WorkflowCompleted { winner_id: "..." }

On restart, the PM rebuilds state by replaying its own events.
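A minimal sketch of that replay, assuming PM events are plain dicts shaped like the stream above (the real PM unpacks protobuf `Any` messages in `_apply_event`). `FlowState`, `apply`, and `rebuild` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class FlowState:
    hand_id: str = ""
    phase: str = "awaiting_deal"
    winner_id: str = ""


def apply(state: FlowState, event: dict) -> None:
    """Fold one PM event into the state; unknown event types are ignored."""
    kind = event["type"]
    if kind == "WorkflowStarted":
        state.hand_id = event["hand_id"]
        state.phase = event["phase"]
    elif kind == "PhaseTransitioned":
        # Refuse to replay a transition that does not start from the current phase.
        if state.phase != event["from"]:
            raise ValueError(f"expected phase {event['from']!r}, found {state.phase!r}")
        state.phase = event["to"]
    elif kind == "WorkflowCompleted":
        state.phase = "complete"
        state.winner_id = event["winner_id"]


def rebuild(events: list[dict]) -> FlowState:
    """Start from empty state and replay the PM's stream in order."""
    state = FlowState()
    for event in events:
        apply(state, event)
    return state


stream = [
    {"type": "WorkflowStarted", "hand_id": "h1", "phase": "awaiting_deal"},
    {"type": "PhaseTransitioned", "from": "awaiting_deal", "to": "dealing"},
    {"type": "PhaseTransitioned", "from": "dealing", "to": "blinds"},
    {"type": "WorkflowCompleted", "winner_id": "p7"},
]
print(rebuild(stream))  # FlowState(hand_id='h1', phase='complete', winner_id='p7')
```

The phase check in `apply` is a design choice: it makes replaying an inconsistent stream fail loudly instead of silently producing the wrong phase.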


Rejection Handling

When a PM-issued command is rejected, the PM receives a Notification before the source aggregate:

illustrative - rejection flow
1. PM issues DealCards → Hand rejects (invalid_player_count)
2. PM receives Notification first
   - Can update workflow state (mark step failed)
   - Can decide to retry or abort
3. Source aggregate receives Notification
   - Emits compensation events

illustrative
@rejected("hand", "DealCards")
def handle_deal_rejected(self, state: HandFlowState, notification: Notification):
    # Record the failure as a PM event so the workflow state reflects it on replay
    return WorkflowFailed(
        hand_id=state.hand_id,
        reason=f"Deal failed: {notification.rejection_reason}",
        step="deal_cards",
    )
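The delivery order in the rejection flow can be sketched as follows. `Notification`, `FlowPM`, `SourceAggregate`, and `deliver_rejection` are hypothetical, and this toy dispatcher only records the order of delivery; it does not model the PM's retry-or-abort decision.

```python
from dataclasses import dataclass, field


@dataclass
class Notification:
    """Hypothetical rejection notice with the fields used in the handler above."""

    domain: str
    command: str
    rejection_reason: str


@dataclass
class FlowPM:
    events: list[str] = field(default_factory=list)

    def on_rejected(self, n: Notification) -> None:
        # Mark the step failed by recording a PM event (compare WorkflowFailed above).
        self.events.append(f"WorkflowFailed(step={n.command}, reason={n.rejection_reason})")


@dataclass
class SourceAggregate:
    compensations: list[str] = field(default_factory=list)

    def on_rejected(self, n: Notification) -> None:
        # Emit a compensation event in response to the rejection.
        self.compensations.append(f"Compensated({n.command}: {n.rejection_reason})")


def deliver_rejection(pm: FlowPM, source: SourceAggregate, n: Notification, log: list[str]) -> None:
    """Deliver one rejection in the documented order: PM first, then the source."""
    pm.on_rejected(n)
    log.append("pm")
    source.on_rejected(n)
    log.append("source")


log: list[str] = []
pm, table = FlowPM(), SourceAggregate()
deliver_rejection(pm, table, Notification("hand", "DealCards", "invalid_player_count"), log)
print(log)        # ['pm', 'source']
print(pm.events)  # ['WorkflowFailed(step=DealCards, reason=invalid_player_count)']
```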

Timeouts

Future Feature

PM-scheduled timeouts (e.g., player action timers, turn clocks) are planned but not yet implemented.

See Roadmap for details.


PM vs Saga

| Aspect | Saga | Process Manager |
|---|---|---|
| State | Stateless | Stateful (own event stream) |
| Identity | None | correlation_id |
| Input domains | Single | Multiple |
| Persistence | No | Yes (workflow events) |
| Timeouts | No | Planned |
| Complexity | Low | Higher |

Rule of thumb: Start with sagas. Upgrade to PM when you need state tracking or multi-domain input.
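The sketch below contrasts the two under hypothetical names: the saga is a pure function of one event, while `JoinPM` must remember what it has seen so that it acts only once events from both the table and hand domains have arrived. The `SeatsLocked`, `BlindsPosted`, and `StartBetting` messages are invented for the example, and `JoinPM` keeps its state in memory rather than in its own event stream.

```python
from dataclasses import dataclass, field
from typing import Optional


def saga(event_type: str) -> list[str]:
    """Stateless: the output depends only on the one event it is given."""
    return ["DealCards"] if event_type == "HandStarted" else []


@dataclass
class JoinPM:
    """Stateful: acts only after it has seen events from both domains."""

    required: frozenset[tuple[str, str]] = frozenset(
        {("table", "SeatsLocked"), ("hand", "BlindsPosted")}
    )
    seen: set[tuple[str, str]] = field(default_factory=set)

    def handle(self, domain: str, event_type: str) -> Optional[str]:
        was_ready = self.required <= self.seen
        self.seen.add((domain, event_type))
        # Emit exactly once, on the event that completes the required set.
        if not was_ready and self.required <= self.seen:
            return "StartBetting"
        return None


print(saga("HandStarted"))                # ['DealCards']
pm = JoinPM()
print(pm.handle("hand", "BlindsPosted"))  # None (still waiting on the table domain)
print(pm.handle("table", "SeatsLocked"))  # StartBetting
print(pm.handle("table", "SeatsLocked"))  # None (already emitted)
```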


Next Steps