
Patterns Reference

Common CQRS and Event Sourcing patterns used in angzarr.


Pattern Catalog

| Category | Patterns |
| --- | --- |
| Delivery | Outbox, Idempotent Consumer |
| Schema Evolution | Upcasting |
| Coordination | Correlation ID, Merge Strategy, Sync Mode |
| Query | Temporal Query |

Delivery Patterns

Outbox Pattern

You probably don't need this. Modern managed messaging (Kafka, SQS, Pub/Sub) already guarantees delivery. Only consider outbox if your messaging layer lacks durability.

The outbox pattern ensures atomicity between database writes and event publishing:

1. Events persisted to event store  }
2. Events written to outbox table   } ← Single transaction
3. Background process polls outbox
4. Events published to message bus
5. Outbox entries marked published
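The five steps above can be sketched with Python's built-in sqlite3 module. This is a minimal illustration, not angzarr's implementation: the table layout, `open_store`, `append_event`, and `relay_once` are all hypothetical names, and `publish` stands in for whatever message bus client is in use.

```python
import json
import sqlite3
from typing import Callable


def open_store() -> sqlite3.Connection:
    """Create an in-memory store with an events table and an outbox table (assumed schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (seq INTEGER PRIMARY KEY, payload TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox (seq INTEGER PRIMARY KEY, payload TEXT NOT NULL, "
        "published INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def append_event(conn: sqlite3.Connection, seq: int, event: dict) -> None:
    """Steps 1-2: write the event and its outbox row in a single transaction."""
    payload = json.dumps(event)
    with conn:  # commits both inserts together, or rolls both back on error
        conn.execute("INSERT INTO events (seq, payload) VALUES (?, ?)", (seq, payload))
        conn.execute("INSERT INTO outbox (seq, payload) VALUES (?, ?)", (seq, payload))


def relay_once(conn: sqlite3.Connection, publish: Callable[[dict], None]) -> int:
    """Steps 3-5: poll unpublished rows, publish each, then mark it published."""
    rows = conn.execute(
        "SELECT seq, payload FROM outbox WHERE published = 0 ORDER BY seq"
    ).fetchall()
    for seq, payload in rows:
        publish(json.loads(payload))  # step 4: hand the event to the bus
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE seq = ?", (seq,))  # step 5
    return len(rows)
```

If the relay crashes after publishing but before marking the row, the event is published again on the next poll. The outbox therefore gives at-least-once delivery, which is why consumers still need to be idempotent.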

When to use:

  • In-memory or non-durable message transport
  • Regulatory requirement for local audit trail

Skip when using:

  • Kafka with acks=all
  • AWS SQS/SNS
  • GCP Pub/Sub
  • RabbitMQ with persistent queues

Idempotent Consumer

Consumers must tolerate duplicate events. Design for natural idempotency:

| Operation | Idempotent? | Fix |
| --- | --- | --- |
| INSERT | No | Use INSERT ... ON CONFLICT DO NOTHING |
| UPDATE SET x = x + 1 | No | Use an absolute value: UPDATE SET x = $value |
| UPDATE SET x = $value | Yes | Already idempotent |
| DELETE WHERE id = $1 | Yes | Already idempotent |

⍼ Angzarr's sequence numbers ensure an event is never applied twice, so both delta events (amount: 50) and absolute-value events (new_balance: 150) are effectively idempotent.
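As a rough illustration of the table's fixes, the sketch below applies each event twice to a projection held in SQLite and ends up in the same state. The `players` table and both handler names are hypothetical, chosen only for this example.

```python
import sqlite3


def open_projection() -> sqlite3.Connection:
    """Create an in-memory read model (assumed schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE players (id TEXT PRIMARY KEY, balance INTEGER NOT NULL)")
    return conn


def on_player_registered(conn: sqlite3.Connection, player_id: str) -> None:
    """A duplicate delivery hits the primary key and is silently ignored."""
    with conn:
        conn.execute(
            "INSERT INTO players (id, balance) VALUES (?, 0) ON CONFLICT (id) DO NOTHING",
            (player_id,),
        )


def on_balance_changed(conn: sqlite3.Connection, player_id: str, new_balance: int) -> None:
    """Writing the absolute value twice leaves the same result as writing it once."""
    with conn:
        conn.execute("UPDATE players SET balance = ? WHERE id = ?", (new_balance, player_id))
```

Calling either handler a second time with the same arguments is a no-op, so redelivery by the message bus cannot corrupt the projection.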


Schema Evolution

Upcasting

Transform old event versions to current version when reading:

class PlayerRegisteredV1ToV2Upcaster:
    def can_upcast(self, event_type: str, version: int) -> bool:
        return event_type == "PlayerRegistered" and version == 1

    def upcast(self, event: dict) -> dict:
        # V1 had "name", V2 split into "first_name" and "last_name"
        name_parts = event["name"].split(" ", 1)
        return {
            "first_name": name_parts[0],
            "last_name": name_parts[1] if len(name_parts) > 1 else "",
            "email": event["email"],
        }

Key points:

  • Stored events remain unchanged
  • Transformation happens on read
  • Enables gradual schema migration
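One way to apply upcasters on read is to chain them until no further upcaster matches. The sketch below assumes a hypothetical registry keyed by event type and source version, where each upcaster advances the payload by exactly one version; angzarr's actual wiring may differ.

```python
from typing import Callable, Dict, Tuple

Upcaster = Callable[[dict], dict]


def player_registered_v1_to_v2(event: dict) -> dict:
    """V1 had "name"; V2 splits it into "first_name" and "last_name"."""
    first, _, last = event["name"].partition(" ")
    return {"first_name": first, "last_name": last, "email": event["email"]}


# Hypothetical registry: (event_type, from_version) -> upcaster producing from_version + 1
UPCASTERS: Dict[Tuple[str, int], Upcaster] = {
    ("PlayerRegistered", 1): player_registered_v1_to_v2,
}


def upcast_to_current(event_type: str, version: int, payload: dict) -> Tuple[int, dict]:
    """Apply upcasters in sequence; the stored payload itself is never modified."""
    while (event_type, version) in UPCASTERS:
        payload = UPCASTERS[(event_type, version)](payload)
        version += 1
    return version, payload
```

Adding a V2-to-V3 upcaster later only requires one more registry entry; V1 events then pass through both steps automatically.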

Coordination Patterns

Correlation ID

Links related events across aggregates:

message Cover {
  string domain = 2;
  UUID root = 1;
  string correlation_id = 3; // Workflow correlation - flows through all commands/events
  Edition edition = 4;       // Edition for diverged timelines; empty name = main timeline
}

Propagation rules:

  • Client provides correlation_id on initial command (if cross-domain tracking needed)
  • The framework does NOT auto-generate one; if the client omits it, the field stays empty
  • Once set, propagates through sagas and process managers
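The propagation rules can be pictured with a small stand-in for the Cover message. The dataclass and the `follow_up_cover` helper below are hypothetical and only model the copy-through behaviour described above; they are not angzarr's generated types.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cover:
    """Simplified stand-in for the Cover proto message (edition omitted)."""
    domain: str
    root: str
    correlation_id: str = ""  # empty unless the client supplied one


def follow_up_cover(source: Cover, target_domain: str, target_root: str) -> Cover:
    """Build the cover for a command a saga emits in reaction to an event.

    The correlation_id is copied as-is: a set value propagates, an empty one stays empty.
    """
    return Cover(domain=target_domain, root=target_root, correlation_id=source.correlation_id)
```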

Merge Strategy

Controls concurrency handling for optimistic locking:

| Strategy | Proto Value | Conflict Behavior | gRPC Code | Retryable? |
| --- | --- | --- | --- | --- |
| COMMUTATIVE | 0 (default) | Return fresh state | FAILED_PRECONDITION | Yes |
| STRICT | 1 | Immediate rejection | ABORTED | No |
| AGGREGATE_HANDLES | 2 | Aggregate decides | Varies | Varies |
| MANUAL | 3 | Route to DLQ | ABORTED | No |

When to use each:

| Use Case | Strategy | Why |
| --- | --- | --- |
| Financial operations | STRICT | Must see current balance |
| Counters, metrics | COMMUTATIVE | Order doesn't matter, safe to retry |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Audit-critical operations | MANUAL | Human review required |

# STRICT: Must see current balance; immediate rejection on conflict
@merge_strategy(MergeStrategy.STRICT)
def handle_reserve_funds(state, cmd):
    if cmd.amount > state.available():
        raise CommandRejectedError("insufficient_funds")
    return FundsReserved(amount=cmd.amount)

# COMMUTATIVE (default): Safe to retry with fresh state
@merge_strategy(MergeStrategy.COMMUTATIVE)
def handle_add_points(state, cmd):
    return PointsAdded(points=cmd.points)

# MANUAL: Route to DLQ for human review
@merge_strategy(MergeStrategy.MANUAL)
def handle_compliance_action(state, cmd):
    return ComplianceActionTaken(action=cmd.action)

See Error Recovery for retry behavior and DLQ routing details.
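On the caller's side, the gRPC code column suggests a simple retry rule: retry on FAILED_PRECONDITION, give up on ABORTED. The sketch below models that rule with a hypothetical `ConflictError` carrying the status code as a string; a real client would inspect the gRPC status instead and would typically rebuild the command from the fresh state before retrying.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class ConflictError(Exception):
    """Hypothetical stand-in for a gRPC error; `code` holds the status code name."""

    def __init__(self, code: str):
        super().__init__(code)
        self.code = code


# Per the strategy table: only COMMUTATIVE conflicts (FAILED_PRECONDITION) are retryable.
RETRYABLE_CODES = {"FAILED_PRECONDITION"}


def send_with_retry(send: Callable[[], T], max_attempts: int = 3) -> T:
    """Call `send`, retrying retryable conflicts up to `max_attempts` times in total."""
    for attempt in range(1, max_attempts + 1):
        try:
            return send()
        except ConflictError as err:
            if err.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise  # non-retryable (e.g. ABORTED) or out of attempts
    raise ValueError("max_attempts must be at least 1")
```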

Sync Mode

Controls when command processing returns to caller:

| Mode | Proto Value | Description | Use Case |
| --- | --- | --- | --- |
| UNSPECIFIED | 0 | Fire and forget (async) | Background tasks |
| SIMPLE | 1 | Wait for projectors only | Read-after-write consistency |
| CASCADE | 2 | Wait for projectors + saga cascade | Full sync (expensive) |

// Controls synchronous processing behavior
enum SyncMode {
  SYNC_MODE_UNSPECIFIED = 0; // Async: fire and forget (default)
  SYNC_MODE_SIMPLE = 1;      // Sync projectors only, no saga cascade
  SYNC_MODE_CASCADE = 2;     // Full sync: projectors + saga cascade (expensive)
}

Trade-offs:

| Mode | Latency | Consistency | Cost |
| --- | --- | --- | --- |
| UNSPECIFIED | Lowest | Eventual | Cheapest |
| SIMPLE | Medium | Read-your-writes | Moderate |
| CASCADE | Highest | Full | Expensive |
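Read as a decision rule, the trade-offs suggest picking the cheapest mode that still meets the caller's consistency needs. The helper below is a hypothetical sketch of that rule; the enum mirrors the proto values, but `choose_sync_mode` and its parameters are not part of angzarr.

```python
from enum import IntEnum


class SyncMode(IntEnum):
    """Python mirror of the SyncMode proto enum values."""
    UNSPECIFIED = 0  # async: fire and forget
    SIMPLE = 1       # wait for projectors only
    CASCADE = 2      # wait for projectors and the saga cascade


def choose_sync_mode(needs_read_after_write: bool, needs_saga_results: bool) -> SyncMode:
    """Return the cheapest mode that satisfies the stated requirements."""
    if needs_saga_results:
        return SyncMode.CASCADE
    if needs_read_after_write:
        return SyncMode.SIMPLE
    return SyncMode.UNSPECIFIED
```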

Query Patterns

Temporal Query

Reconstruct state at any point in history:

# Get player balance as of yesterday
events = event_store.get_events(
    domain="player",
    root=player_id,
    up_to_timestamp=yesterday,
)
state = build_state(events)
print(f"Balance at {yesterday}: {state.bankroll}")

Enabled by:

  • Immutable event stream
  • Events contain absolute state
  • No destructive updates
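To make the mechanics concrete, here is a self-contained sketch that filters an in-memory event list by timestamp and folds it into state. The `BankrollChanged` event, `PlayerState`, and `events_up_to` are hypothetical; they assume each event carries the absolute bankroll, as the list above describes.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List


@dataclass(frozen=True)
class BankrollChanged:
    """Hypothetical event carrying the absolute bankroll after the change."""
    sequence: int
    timestamp: datetime
    new_bankroll: int


@dataclass
class PlayerState:
    bankroll: int = 0


def events_up_to(events: Iterable[BankrollChanged], up_to_timestamp: datetime) -> List[BankrollChanged]:
    """Return events at or before the cutoff, ordered by sequence."""
    ordered = sorted(events, key=lambda e: e.sequence)
    return [e for e in ordered if e.timestamp <= up_to_timestamp]


def build_state(events: Iterable[BankrollChanged]) -> PlayerState:
    """Fold events into state; later events overwrite the bankroll."""
    state = PlayerState()
    for event in events:
        state.bankroll = event.new_bankroll
    return state
```

Because the stream is never rewritten, running the same query tomorrow with the same cutoff yields the same state.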

Advanced Patterns

Edition / Branching

Editions enable divergent timelines for speculative execution, historical analysis, or saga coordination:

// Edition identifier with optional explicit divergence points.
//
// Two modes:
// - Implicit (divergences empty): Divergence derived from first edition event's sequence
// - Explicit (divergences populated): Per-domain divergence points for historical branching,
//   saga coordination, or speculative execution
message Edition {
  string name = 1;                           // Edition name, e.g., "v2"; empty = main timeline
  repeated DomainDivergence divergences = 2; // Optional: explicit per-domain divergence points
}

// Explicit divergence point for a specific domain.
// Used when creating historical branches or coordinating saga writes across domains.
message DomainDivergence {
  string domain = 1;   // Domain name
  uint32 sequence = 2; // Divergence sequence number
}

Use cases:

| Scenario | How |
| --- | --- |
| What-if analysis | Branch at sequence N, apply hypothetical commands |
| Historical branches | Explore alternative histories |
| Saga coordination | Coordinate writes across domains with explicit divergence |
| Speculative execution | Execute commands speculatively, merge or discard |

Two modes:

  1. Implicit divergence: First event in edition determines divergence point
  2. Explicit divergence: Per-domain divergence points specified upfront

# Create a what-if branch
edition = Edition(name="bonus-scenario")
cover = Cover(
    domain="player",
    root=player_id,
    edition=edition,
)

# Commands on this cover write to the branch, not main timeline
cmd = CommandBook(cover=cover, pages=[...])
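One plausible reading of implicit divergence is that a branch's history is the main timeline up to the first edition event's sequence, followed by the edition's own events. The sketch below models that interpretation only; `StoredEvent` and `branch_view` are hypothetical, and the framework's actual resolution rules may differ.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class StoredEvent:
    edition: str   # "" means the main timeline
    sequence: int
    payload: str


def branch_view(events: Iterable[StoredEvent], edition_name: str) -> List[StoredEvent]:
    """Resolve the event history seen by an edition (assumed implicit-divergence semantics)."""
    events = list(events)
    main = sorted((e for e in events if e.edition == ""), key=lambda e: e.sequence)
    if edition_name == "":
        return main
    branch = sorted((e for e in events if e.edition == edition_name), key=lambda e: e.sequence)
    if not branch:
        return main  # nothing written to the branch yet, so it still mirrors main
    divergence = branch[0].sequence  # the first edition event marks the divergence point
    return [e for e in main if e.sequence < divergence] + branch
```

The main timeline is untouched by this resolution, so discarding a what-if branch amounts to ignoring its events.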

Snapshot Retention

Controls how snapshots are managed during compaction:

// Controls snapshot retention during cleanup
enum SnapshotRetention {
  RETENTION_DEFAULT = 0;   // Persist every 16 events, treated as TRANSIENT otherwise
  RETENTION_PERSIST = 1;   // Keep indefinitely (business milestone)
  RETENTION_TRANSIENT = 2; // Delete when newer snapshot written
}

When to use:

| Retention | Use Case | Example |
| --- | --- | --- |
| DEFAULT | Normal operation | Most events |
| PERSIST | Business milestones | End-of-day balance, audit checkpoints |
| TRANSIENT | Temporary optimization | Intermediate computation state |

Snapshot with retention:

# Mark a snapshot as a business milestone (never delete)
snapshot = Snapshot(
    sequence=current_sequence,
    state=state.SerializeToString(),
    retention=SnapshotRetention.RETENTION_PERSIST,
)
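The retention values can be read as a compaction rule, sketched below under two assumptions that the source does not confirm: "every 16 events" is taken to mean snapshots whose sequence is a multiple of 16, and the newest snapshot is always kept. `SnapshotMeta`, `is_kept`, and `compact` are hypothetical names.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import List


class SnapshotRetention(IntEnum):
    """Python mirror of the SnapshotRetention proto enum values."""
    RETENTION_DEFAULT = 0
    RETENTION_PERSIST = 1
    RETENTION_TRANSIENT = 2


@dataclass(frozen=True)
class SnapshotMeta:
    sequence: int
    retention: SnapshotRetention


PERSIST_INTERVAL = 16  # assumption: DEFAULT snapshots persist at multiples of 16


def is_kept(snapshot: SnapshotMeta, newest_sequence: int) -> bool:
    """Decide whether a snapshot survives compaction."""
    if snapshot.sequence == newest_sequence:
        return True  # nothing newer has replaced it yet
    if snapshot.retention == SnapshotRetention.RETENTION_PERSIST:
        return True  # business milestone, kept indefinitely
    if snapshot.retention == SnapshotRetention.RETENTION_DEFAULT:
        return snapshot.sequence % PERSIST_INTERVAL == 0
    return False  # TRANSIENT and superseded


def compact(snapshots: List[SnapshotMeta]) -> List[SnapshotMeta]:
    """Return the snapshots that remain after compaction, in their original order."""
    if not snapshots:
        return []
    newest = max(s.sequence for s in snapshots)
    return [s for s in snapshots if is_kept(s, newest)]
```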

Component Coordination

Saga vs Process Manager

| Aspect | Saga | Process Manager |
| --- | --- | --- |
| State | Stateless | Own event stream |
| Input | Single domain | Multiple domains |
| Identity | None | correlation_id |
| Timeouts | No | Yes |

Rule of thumb: Start with sagas. Upgrade to a process manager (PM) when you need state or multi-domain input.
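The difference shows up clearly in code. In the sketch below, the saga is a pure function of one event from one domain, while the process manager remembers what it has seen per correlation_id and reacts only once events from two domains have arrived. All domain names, event kinds, and class names are hypothetical, and timeouts are left out for brevity.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass(frozen=True)
class Event:
    domain: str
    kind: str
    correlation_id: str


def payment_saga(event: Event) -> List[str]:
    """Saga: stateless, single-domain input; output depends only on this event."""
    if event.domain == "order" and event.kind == "OrderPlaced":
        return ["ReserveFunds"]
    return []


class FulfillmentProcessManager:
    """Process manager: keeps state per correlation_id and joins multiple domains."""

    REQUIRED: Set[Tuple[str, str]] = {
        ("payment", "FundsReserved"),
        ("inventory", "StockReserved"),
    }

    def __init__(self) -> None:
        self._seen: Dict[str, Set[Tuple[str, str]]] = defaultdict(set)

    def handle(self, event: Event) -> List[str]:
        key = (event.domain, event.kind)
        seen = self._seen[event.correlation_id]
        if key not in self.REQUIRED or key in seen:
            return []  # irrelevant or duplicate event
        seen.add(key)
        return ["ShipOrder"] if seen == self.REQUIRED else []
```

The saga needs no identity because it holds nothing between calls; the process manager cannot work without correlation_id, since that is the key its state lives under.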


Next Steps