Patterns Reference
Common CQRS and Event Sourcing patterns used in angzarr.
Pattern Catalog
| Category | Patterns |
|---|---|
| Delivery | Outbox, Idempotent Consumer |
| Schema Evolution | Upcasting |
| Coordination | Correlation ID, Merge Strategy, Sync Mode |
| Query | Temporal Query |
Delivery Patterns
Outbox Pattern
You probably don’t need this. Modern managed messaging (Kafka, SQS, Pub/Sub) already provides durable delivery once a publish is acknowledged. Only consider an outbox if your messaging layer lacks durability.
The outbox pattern ensures atomicity between database writes and event publishing:

    1. Events persisted to event store   }
    2. Events written to outbox table    } ← Single transaction
    3. Background process polls outbox
    4. Events published to message bus
    5. Outbox entries marked published

When to use:
- In-memory or non-durable message transport
- Regulatory requirement for local audit trail
Skip when using:
- Kafka with `acks=all`
- AWS SQS/SNS
- GCP Pub/Sub
- RabbitMQ with persistent queues
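For cases where an outbox is warranted, the five steps above can be sketched roughly as follows. This is a minimal illustration using SQLite from the Python standard library, not angzarr's implementation: the table layout and the `open_store`, `append_events`, and `relay_once` names are assumptions made for the example.

```python
import json
import sqlite3


def open_store() -> sqlite3.Connection:
    """Create an in-memory database with an event table and an outbox table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox ("
        "id INTEGER PRIMARY KEY, payload TEXT NOT NULL, "
        "published INTEGER NOT NULL DEFAULT 0)"
    )
    return conn


def append_events(conn: sqlite3.Connection, events: list) -> None:
    """Steps 1-2: write event rows and outbox rows in a single transaction."""
    with conn:  # commits on success, rolls back if anything raises
        for event in events:
            payload = json.dumps(event)
            conn.execute("INSERT INTO events (payload) VALUES (?)", (payload,))
            conn.execute("INSERT INTO outbox (payload) VALUES (?)", (payload,))


def relay_once(conn: sqlite3.Connection, publish) -> int:
    """Steps 3-5: poll unpublished rows, publish each, then mark it published."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        publish(json.loads(payload))  # if this raises, the row stays unpublished
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


conn = open_store()
bus = []  # stand-in for a message bus
append_events(conn, [{"type": "PlayerRegistered"}, {"type": "FundsDeposited"}])
first_pass = relay_once(conn, bus.append)
second_pass = relay_once(conn, bus.append)
```

Because a row is marked published only after `publish` returns, a crash between the two steps causes the event to be published again on the next poll. An outbox therefore gives at-least-once delivery, and consumers still need to tolerate duplicates.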
Idempotent Consumer
Consumers must tolerate duplicate events. Design for natural idempotency:
| Operation | Idempotent? | Fix |
|---|---|---|
| `INSERT` | No | Use `INSERT ... ON CONFLICT DO NOTHING` |
| `UPDATE SET x = x + 1` | No | Use absolute: `UPDATE SET x = $value` |
| `UPDATE SET x = $value` | Yes | Already idempotent |
| `DELETE WHERE id = $1` | Yes | Already idempotent |
⍼ Angzarr’s sequence numbers ensure an event is never applied twice, so both deltas (`amount: 50`) and absolute values (`new_balance: 150`) are safe to use in events.
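To illustrate why sequence numbers make deltas safe, here is a small sketch of a consumer that remembers the last sequence it applied per aggregate root and skips anything at or below it. The `BalanceProjection` class is hypothetical, and the sketch assumes events for a given root arrive in sequence order.

```python
from dataclasses import dataclass, field


@dataclass
class BalanceProjection:
    """Hypothetical read model that tracks the last applied sequence per root."""

    balances: dict = field(default_factory=dict)
    last_sequence: dict = field(default_factory=dict)

    def apply(self, root: str, sequence: int, amount: int) -> bool:
        """Apply a delta once; return False when the event is a duplicate."""
        if sequence <= self.last_sequence.get(root, -1):
            return False  # already applied, so the delta is not counted again
        self.balances[root] = self.balances.get(root, 0) + amount
        self.last_sequence[root] = sequence
        return True


projection = BalanceProjection()
applied = [
    projection.apply("player-1", 0, 100),
    projection.apply("player-1", 1, 50),
    projection.apply("player-1", 1, 50),  # redelivered duplicate
]
```

In a real projector the balance and the last applied sequence would be written in the same database transaction, so the two cannot drift apart.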
Schema Evolution
Upcasting
Transform old event versions to current version when reading:

    class PlayerRegisteredV1ToV2Upcaster:
        def can_upcast(self, event_type: str, version: int) -> bool:
            return event_type == "PlayerRegistered" and version == 1

        def upcast(self, event: dict) -> dict:
            # V1 had "name", V2 split into "first_name" and "last_name"
            name_parts = event["name"].split(" ", 1)
            return {
                "first_name": name_parts[0],
                "last_name": name_parts[1] if len(name_parts) > 1 else "",
                "email": event["email"],
            }

Key points:
- Stored events remain unchanged
- Transformation happens on read
- Enables gradual schema migration
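One way to apply upcasters on read is to chain them one version at a time until no further upcaster matches. The sketch below assumes a simple registry keyed by event type and version; `UPCASTERS` and `upcast_to_current` are illustrative names, not part of angzarr's API.

```python
from typing import Callable

Upcaster = Callable[[dict], dict]


def player_registered_v1_to_v2(event: dict) -> dict:
    """V1 had "name"; V2 splits it into "first_name" and "last_name"."""
    first, _, last = event["name"].partition(" ")
    return {"first_name": first, "last_name": last, "email": event["email"]}


# Hypothetical registry: (event_type, from_version) -> function producing the next version.
UPCASTERS: dict[tuple[str, int], Upcaster] = {
    ("PlayerRegistered", 1): player_registered_v1_to_v2,
}


def upcast_to_current(event_type: str, version: int, event: dict) -> tuple[int, dict]:
    """Apply upcasters one version at a time until none matches."""
    while (event_type, version) in UPCASTERS:
        event = UPCASTERS[(event_type, version)](event)
        version += 1
    return version, event


stored = {"name": "Ada Lovelace", "email": "ada@example.com"}
version, current = upcast_to_current("PlayerRegistered", 1, stored)
```

Each upcaster returns a new dict, so the stored event is left untouched. Adding a V3 later only requires registering a V2-to-V3 step.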
Coordination Patterns
Correlation ID
Links related events across aggregates:

    message Cover {
      string domain = 2;
      UUID root = 1;
      string correlation_id = 3;  // Workflow correlation - flows through all commands/events
      Edition edition = 4;        // Edition for diverged timelines; empty name = main timeline
      // Field 5 removed: external_id moved to ExternalDeferredSequence in PageHeader

      // Client-supplied extension slot. Most consumers pack a nested
      // ``Cover`` here to express parent-aggregate relationships (e.g.
      // table events carry the tournament Cover; hand events carry the
      // table Cover). Helper ``unpack_parent_cover`` extracts the typed
      // Cover when present. Framework code treats this as opaque routing
      // metadata — it propagates without inspecting.
      //
      // WARNING — mass-propagating: the framework stamps this slot onto
      // EVERY event a child aggregate emits, so the payload is paid per
      // event. Keep what you pack small. The intended workload is a single
      // nested Cover (~50–80 bytes packed); larger payloads multiply
      // across the event stream and the bus.
      google.protobuf.Any ext = 6;
    }

Propagation rules:
- Client provides correlation_id on initial command (if cross-domain tracking needed)
- Framework does NOT auto-generate—if not provided, stays empty
- Once set, propagates through sagas and process managers
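The propagation rules can be pictured with a small sketch. The `Cover` dataclass below is a simplified stand-in for the proto message (three fields only), and `command_cover_for` is a hypothetical helper showing what a saga would do when it emits a command in reaction to an event.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cover:
    """Simplified stand-in for the Cover message."""

    domain: str
    root: str
    correlation_id: str = ""  # stays empty unless the client supplies one


def command_cover_for(source: Cover, target_domain: str, target_root: str) -> Cover:
    """Build the cover for a command emitted in reaction to an event.

    The correlation_id is copied as-is, so an empty value stays empty.
    """
    return Cover(
        domain=target_domain,
        root=target_root,
        correlation_id=source.correlation_id,
    )


tracked = Cover("table", "table-1", correlation_id="checkout-42")
untracked = Cover("table", "table-2")
tracked_cmd = command_cover_for(tracked, "player", "player-1")
untracked_cmd = command_cover_for(untracked, "player", "player-2")
```

Nothing in the helper generates an ID, which mirrors the rule that the framework never fills in a missing `correlation_id`.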
Merge Strategy
Controls concurrency handling for optimistic locking:
| Strategy | Proto Value | Conflict Behavior | gRPC Code | Retryable? |
|---|---|---|---|---|
| COMMUTATIVE | 0 (default) | Return fresh state | FAILED_PRECONDITION | Yes |
| STRICT | 1 | Immediate rejection | ABORTED | No |
| AGGREGATE_HANDLES | 2 | Aggregate decides | Varies | Varies |
| MANUAL | 3 | Route to DLQ | ABORTED | No |
When to use each:
| Use Case | Strategy | Why |
|---|---|---|
| Financial operations | STRICT | Must see current balance |
| Counters, metrics | COMMUTATIVE | Order doesn’t matter, safe to retry |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Audit-critical operations | MANUAL | Human review required |

    # STRICT: Must see current balance — immediate rejection on conflict
    @merge_strategy(MergeStrategy.STRICT)
    def handle_reserve_funds(state, cmd):
        if cmd.amount > state.available():
            raise CommandRejectedError("insufficient_funds")
        return FundsReserved(amount=cmd.amount)

    # COMMUTATIVE (default): Safe to retry with fresh state
    @merge_strategy(MergeStrategy.COMMUTATIVE)
    def handle_add_points(state, cmd):
        return PointsAdded(points=cmd.points)

    # MANUAL: Route to DLQ for human review
    @merge_strategy(MergeStrategy.MANUAL)
    def handle_compliance_action(state, cmd):
        return ComplianceActionTaken(action=cmd.action)

See Error Recovery for retry behavior and DLQ routing details.
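On the caller's side, the gRPC codes in the strategy table suggest a simple rule: retry on `FAILED_PRECONDITION`, give up on `ABORTED`. The sketch below is one possible shape for that rule. `ConflictError` and `send_with_retry` are hypothetical, and status codes are plain strings so the example runs without a gRPC dependency.

```python
# Per the strategy table: COMMUTATIVE conflicts are retryable, ABORTED is not.
RETRYABLE_CODES = {"FAILED_PRECONDITION"}


class ConflictError(Exception):
    """Hypothetical error carrying the gRPC status code name of a conflict."""

    def __init__(self, code: str):
        super().__init__(code)
        self.code = code


def send_with_retry(send, max_attempts: int = 3):
    """Call send(); retry only while the conflict is marked retryable."""
    for attempt in range(1, max_attempts + 1):
        try:
            return send()
        except ConflictError as err:
            if err.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise


attempts = []


def flaky_send():
    """Fails once with a retryable conflict, then succeeds."""
    attempts.append(1)
    if len(attempts) == 1:
        raise ConflictError("FAILED_PRECONDITION")
    return "accepted"


result = send_with_retry(flaky_send)
```

A real client would rebuild the command against the fresh state returned with the conflict before retrying; the sketch omits that step to keep the retry decision visible.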
Sync Mode
Controls when command processing returns to caller:
| Mode | Proto Value | Description | Use Case |
|---|---|---|---|
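| ASYNC | 0 (default) | Fire and forget (async) | Background tasks |
| SIMPLE | 1 | Wait for projectors only | Read-after-write consistency |
| CASCADE | 2 | Wait for projectors + saga cascade | Full sync (expensive) |
| DECISION | 3 | Wait for aggregate accept/reject only; projectors and sagas run async | Process managers coordinating cross-aggregate flows |
| ISOLATED | 4 | Wait for accept/reject and persist; no downstream processing, sync or async | Replay, migration, and recovery writes |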

    // Controls synchronous processing behavior.
    //
    // Impact ordering (lowest to highest):
    //   ASYNC < DECISION < SIMPLE < CASCADE
    //
    // Primary caller of DECISION is process managers coordinating cross-aggregate
    // flows: the PM issues a command to the target aggregate and needs to know
    // *only* whether the aggregate accepted or rejected, without paying the cost
    // of projector propagation or saga fan-out. The aggregate's accept path
    // (events persisted + returned) and reject path (CommandRejectedError) are
    // both observable; everything downstream runs asynchronously.
    enum SyncMode {
      SYNC_MODE_ASYNC = 0;     // Async: fire and forget (default)
      SYNC_MODE_SIMPLE = 1;    // Sync projectors only, no saga cascade
      SYNC_MODE_CASCADE = 2;   // Full sync: projectors + saga cascade (expensive)
      SYNC_MODE_DECISION = 3;  // Sync aggregate accept/reject only; projectors + sagas run async
      // Sync accept/reject + persist; NO downstream (sync OR async).
      // Replay / migration / recovery writes that must not trigger reactions.
      // Distinct from DECISION (which still publishes to bus for async downstream).
      SYNC_MODE_ISOLATED = 4;
    }

Trade-offs:
| Mode | Latency | Consistency | Cost |
|---|---|---|---|
| ASYNC | Lowest | Eventual | Cheapest |
| SIMPLE | Medium | Read-your-writes | Moderate |
| CASCADE | Highest | Full | Expensive |
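As a rough guide to the trade-offs, a caller can pick the cheapest mode that still meets its needs. The helper below is a hypothetical sketch, not an angzarr API: the enum values mirror the `SyncMode` proto, while `choose_sync_mode` and its parameter names are assumptions.

```python
from enum import Enum


class SyncMode(Enum):
    """Subset of the SyncMode proto enum, with the same numeric values."""

    ASYNC = 0
    SIMPLE = 1
    CASCADE = 2
    DECISION = 3


def choose_sync_mode(
    *,
    needs_decision: bool = False,
    needs_fresh_reads: bool = False,
    needs_saga_results: bool = False,
) -> SyncMode:
    """Return the lowest-impact mode that satisfies the caller's requirements."""
    if needs_saga_results:
        return SyncMode.CASCADE  # projectors + saga cascade
    if needs_fresh_reads:
        return SyncMode.SIMPLE  # projectors only
    if needs_decision:
        return SyncMode.DECISION  # accept/reject only
    return SyncMode.ASYNC  # fire and forget


background = choose_sync_mode()
ui_write = choose_sync_mode(needs_fresh_reads=True)
pm_command = choose_sync_mode(needs_decision=True)
```

The checks run from most to least expensive because each stronger mode also covers the weaker requirements.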
Query Patterns
Temporal Query
Reconstruct state at any point in history:

    # Get player balance as of yesterday
    events = event_store.get_events(
        domain="player",
        root=player_id,
        up_to_timestamp=yesterday,
    )
    state = build_state(events)
    print(f"Balance at {yesterday}: {state.bankroll}")

Enabled by:
- Immutable event stream
- Events contain absolute state
- No destructive updates
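The idea can be tried without an event store. The sketch below keeps events in a plain list and folds only those recorded at or before a cutoff. The `Event` shape and `bankroll_as_of` are hypothetical, and the events carry deltas purely to keep the example short.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Event:
    """Hypothetical event: a timestamp plus a bankroll delta."""

    at: datetime
    amount: int


def bankroll_as_of(events: list, cutoff: datetime) -> int:
    """Fold only the events recorded at or before the cutoff."""
    return sum(e.amount for e in events if e.at <= cutoff)


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
history = [
    Event(now - timedelta(days=3), 100),
    Event(now - timedelta(days=2), -30),
    Event(now, 500),
]
yesterday_balance = bankroll_as_of(history, now - timedelta(days=1))
current_balance = bankroll_as_of(history, now)
```

Because the list is never mutated, the same history answers both questions; only the cutoff changes.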
Advanced Patterns
Edition / Branching
Editions enable divergent timelines for speculative execution, historical analysis, or saga coordination:

    // Edition identifier with optional explicit divergence points.
    //
    // Two modes:
    // - Implicit (divergences empty): Divergence derived from first edition event's sequence
    // - Explicit (divergences populated): Per-domain divergence points for historical branching,
    //   saga coordination, or speculative execution
    message Edition {
      string name = 1;                            // Edition name, e.g., "v2"; empty = main timeline
      repeated DomainDivergence divergences = 2;  // Optional: explicit per-domain divergence points
    }

    // Explicit divergence point for a specific domain.
    // Used when creating historical branches or coordinating saga writes across domains.
    message DomainDivergence {
      string domain = 1;    // Domain name
      uint32 sequence = 2;  // Divergence sequence number
    }

Use cases:
| Scenario | How |
|---|---|
| What-if analysis | Branch at sequence N, apply hypothetical commands |
| Historical branches | Explore alternative histories |
| Saga coordination | Coordinate writes across domains with explicit divergence |
| Speculative execution | Execute commands speculatively, merge or discard |
Two modes:
- Implicit divergence — First event in edition determines divergence point
- Explicit divergence — Per-domain divergence points specified upfront
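Implicit divergence can be pictured as stitching two streams together: main-timeline events before the first edition event, followed by the edition's own events. The sketch below is an assumption about how such a view could be assembled, not a description of angzarr's storage. The `Event` shape and `edition_view` are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical stored event."""

    sequence: int
    edition: str  # "" means the main timeline
    payload: str


def edition_view(events: list, edition_name: str) -> list:
    """Main-timeline events before the divergence point, then the edition's events."""
    main = sorted((e for e in events if e.edition == ""), key=lambda e: e.sequence)
    branch = sorted(
        (e for e in events if e.edition == edition_name), key=lambda e: e.sequence
    )
    if not branch:
        return main  # nothing written to the edition yet
    divergence = branch[0].sequence  # implicit: first edition event's sequence
    return [e for e in main if e.sequence < divergence] + branch


events = [
    Event(0, "", "registered"),
    Event(1, "", "deposited 100"),
    Event(2, "", "withdrew 40"),
    Event(2, "bonus-scenario", "bonus 25"),
    Event(3, "bonus-scenario", "deposited 10"),
]
view = [e.payload for e in edition_view(events, "bonus-scenario")]
```

With explicit divergence, the `divergence` value would come from the matching `DomainDivergence.sequence` instead of the first edition event.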

    # Create a what-if branch
    edition = Edition(name="bonus-scenario")
    cover = Cover(
        domain="player",
        root=player_id,
        edition=edition,
    )

    # Commands on this cover write to the branch, not main timeline
    cmd = CommandBook(cover=cover, pages=[...])

Snapshot Retention
Controls how snapshots are managed during compaction:

    // Controls snapshot retention during cleanup
    enum SnapshotRetention {
      RETENTION_DEFAULT = 0;    // Persist every 16 events, treated as TRANSIENT otherwise
      RETENTION_PERSIST = 1;    // Keep indefinitely (business milestone)
      RETENTION_TRANSIENT = 2;  // Delete when newer snapshot written
    }

When to use:
| Retention | Use Case | Example |
|---|---|---|
| DEFAULT | Normal operation | Most events |
| PERSIST | Business milestones | End-of-day balance, audit checkpoints |
| TRANSIENT | Temporary optimization | Intermediate computation state |
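A cleanup pass following these rules might look like the sketch below. It is an illustration under stated assumptions: "every 16 events" is read as "sequence is a multiple of 16", the newest snapshot is always kept, and `Snapshot`, `is_kept`, and `compact` are hypothetical names.

```python
from dataclasses import dataclass
from enum import Enum


class SnapshotRetention(Enum):
    DEFAULT = 0
    PERSIST = 1
    TRANSIENT = 2


@dataclass(frozen=True)
class Snapshot:
    sequence: int
    retention: SnapshotRetention


PERSIST_INTERVAL = 16  # from the RETENTION_DEFAULT comment


def is_kept(snapshot: Snapshot, newest_sequence: int) -> bool:
    """Decide whether a snapshot survives cleanup."""
    if snapshot.sequence == newest_sequence:
        return True  # nothing newer has been written yet
    if snapshot.retention is SnapshotRetention.PERSIST:
        return True  # business milestone: keep indefinitely
    if snapshot.retention is SnapshotRetention.DEFAULT:
        # Assumption: "every 16 events" means the sequence is a multiple of 16.
        return snapshot.sequence % PERSIST_INTERVAL == 0
    return False  # TRANSIENT and superseded by a newer snapshot


def compact(snapshots: list) -> list:
    """Return the snapshots that remain after cleanup."""
    if not snapshots:
        return []
    newest = max(s.sequence for s in snapshots)
    return [s for s in snapshots if is_kept(s, newest)]


snapshots = [
    Snapshot(10, SnapshotRetention.DEFAULT),
    Snapshot(16, SnapshotRetention.DEFAULT),
    Snapshot(20, SnapshotRetention.PERSIST),
    Snapshot(25, SnapshotRetention.TRANSIENT),
    Snapshot(30, SnapshotRetention.DEFAULT),
]
kept = [s.sequence for s in compact(snapshots)]
```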
Snapshot with retention:

    # Mark a snapshot as a business milestone (never delete)
    snapshot = Snapshot(
        sequence=current_sequence,
        state=state.SerializeToString(),
        retention=SnapshotRetention.RETENTION_PERSIST,
    )

Component Coordination
Saga vs Process Manager
| Aspect | Saga | Process Manager |
|---|---|---|
| State | Stateless | Own event stream |
| Input | Single domain | Multiple domains |
| Identity | None | correlation_id |
| Timeouts | No | Yes |
Rule of thumb: Start with sagas. Upgrade to PM when you need state or multi-domain input.
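The difference in the table can be seen in a short sketch. The saga is a pure function of one event; the process manager remembers, per `correlation_id`, which inputs from different domains have arrived. All event and command names here are hypothetical, and the in-memory dict stands in for the process manager's own event stream.

```python
from dataclasses import dataclass, field


def winnings_saga(event: dict) -> list:
    """Stateless saga: one event from one domain in, commands out."""
    if event["type"] == "HandCompleted":
        return [{"type": "CreditWinnings", "player": event["winner"], "amount": event["pot"]}]
    return []


# Inputs the process manager waits for, arriving from two different domains.
REQUIRED = frozenset({"BuyInPaid", "SeatReserved"})


@dataclass
class SeatingProcessManager:
    """Stateful: tracks arrived inputs per correlation_id."""

    seen: dict = field(default_factory=dict)

    def handle(self, correlation_id: str, event_type: str) -> list:
        arrived = self.seen.setdefault(correlation_id, set())
        if event_type not in REQUIRED or event_type in arrived:
            return []  # irrelevant or duplicate input: nothing new to decide
        arrived.add(event_type)
        if arrived >= REQUIRED:
            return [{"type": "SeatPlayer", "correlation_id": correlation_id}]
        return []


saga_commands = winnings_saga({"type": "HandCompleted", "winner": "player-1", "pot": 80})
pm = SeatingProcessManager()
after_first = pm.handle("join-1", "BuyInPaid")
after_second = pm.handle("join-1", "SeatReserved")
after_duplicate = pm.handle("join-1", "SeatReserved")
```

The saga needs no identity because it keeps nothing between calls, whereas the process manager cannot work without a `correlation_id` to key its state.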
Next Steps
- Aggregates — Command handling
- Sagas — Cross-domain coordination
- Process Managers — Stateful orchestration