Patterns Reference
Common CQRS and Event Sourcing patterns used in angzarr.
Pattern Catalog
| Category | Patterns |
|---|---|
| Delivery | Outbox, Idempotent Consumer |
| Schema Evolution | Upcasting |
| Coordination | Correlation ID, Merge Strategy, Sync Mode |
| Query | Temporal Query |
Delivery Patterns
Outbox Pattern
You probably don't need this. Durable message brokers (Kafka, SQS, Pub/Sub) already provide at-least-once delivery once a publish is acknowledged. Consider the outbox only if your messaging layer lacks durability.
The outbox pattern ensures atomicity between database writes and event publishing:
1. Events persisted to event store }
2. Events written to outbox table } ← Single transaction
3. Background process polls outbox
4. Events published to message bus
5. Outbox entries marked published
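The five steps above can be sketched with SQLite from the Python standard library. This is a minimal illustration under stated assumptions, not Angzarr's implementation: the `events` and `outbox` table layouts, the `append_event` and `relay_once` helpers, and the list used as a message bus are all hypothetical.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY,
        payload TEXT NOT NULL,
        published INTEGER NOT NULL DEFAULT 0
    );
""")


def append_event(conn, event: dict) -> None:
    """Steps 1-2: write the event and its outbox row in one transaction."""
    payload = json.dumps(event)
    with conn:  # commits on success, rolls back if either insert fails
        conn.execute("INSERT INTO events (payload) VALUES (?)", (payload,))
        conn.execute("INSERT INTO outbox (payload) VALUES (?)", (payload,))


def relay_once(conn, publish) -> int:
    """Steps 3-5: poll unpublished rows, publish each, then mark it published."""
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE published = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        publish(json.loads(payload))  # if this raises, the row stays unpublished
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE id = ?", (row_id,))
    return len(rows)


bus = []  # stand-in for a real message bus
append_event(conn, {"type": "FundsReserved", "amount": 50})
relay_once(conn, bus.append)
```

A crash between publishing and marking the row would cause the same event to be published again on the next poll, which is why the outbox is usually paired with idempotent consumers.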
When to use:
- In-memory or non-durable message transport
- Regulatory requirement for local audit trail
Skip when using:
- Kafka with acks=all
- AWS SQS/SNS
- GCP Pub/Sub
- RabbitMQ with persistent queues
Idempotent Consumer
Consumers must tolerate duplicate events. Design for natural idempotency:
| Operation | Idempotent? | Fix |
|---|---|---|
| INSERT | No | Use INSERT ... ON CONFLICT DO NOTHING |
| UPDATE SET x = x + 1 | No | Use absolute: UPDATE SET x = $value |
| UPDATE SET x = $value | Yes | Already idempotent |
| DELETE WHERE id = $1 | Yes | Already idempotent |
Angzarr's sequence numbers ensure events are never applied twice, so both deltas (amount: 50) and absolute values (new_balance: 150) are safe under redelivery.
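As an illustration of why sequence numbers make delta events safe, the sketch below tracks the highest sequence a consumer has applied and ignores anything at or below it. The `BalanceProjection` class and its fields are hypothetical and are not part of Angzarr's API; the framework's own mechanism may differ.

```python
from dataclasses import dataclass


@dataclass
class BalanceProjection:
    balance: int = 0
    last_sequence: int = -1  # highest sequence applied so far

    def apply(self, sequence: int, amount: int) -> bool:
        """Apply a delta event once; return False if it was a duplicate."""
        if sequence <= self.last_sequence:
            return False  # already applied, so redelivery is a no-op
        self.balance += amount
        self.last_sequence = sequence
        return True


projection = BalanceProjection()
deliveries = [(0, 100), (1, 50), (1, 50), (0, 100)]  # includes redeliveries
results = [projection.apply(seq, amount) for seq, amount in deliveries]
```

The sketch assumes events for one aggregate arrive in sequence order; out-of-order delivery would need buffering or a gap check.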
Schema Evolution
Upcasting
Transform old event versions to current version when reading:
Examples in Python, Rust, Go, Java, C#, and C++:
class PlayerRegisteredV1ToV2Upcaster:
    def can_upcast(self, event_type: str, version: int) -> bool:
        return event_type == "PlayerRegistered" and version == 1

    def upcast(self, event: dict) -> dict:
        # V1 had "name", V2 split into "first_name" and "last_name"
        name_parts = event["name"].split(" ", 1)
        return {
            "first_name": name_parts[0],
            "last_name": name_parts[1] if len(name_parts) > 1 else "",
            "email": event["email"],
        }
impl Upcaster for PlayerRegisteredV1ToV2 {
    fn can_upcast(&self, event_type: &str, version: u32) -> bool {
        event_type == "PlayerRegistered" && version == 1
    }

    fn upcast(&self, event: Value) -> Value {
        let name = event["name"].as_str().unwrap_or("");
        let parts: Vec<&str> = name.splitn(2, ' ').collect();
        json!({
            "first_name": parts.get(0).unwrap_or(&""),
            "last_name": parts.get(1).unwrap_or(&""),
            "email": event["email"],
        })
    }
}
type PlayerRegisteredV1ToV2 struct{}

func (u *PlayerRegisteredV1ToV2) CanUpcast(eventType string, version int) bool {
	return eventType == "PlayerRegistered" && version == 1
}

func (u *PlayerRegisteredV1ToV2) Upcast(event map[string]interface{}) map[string]interface{} {
	// Comma-ok assertion: a missing or non-string "name" yields "" instead of a panic
	name, _ := event["name"].(string)
	parts := strings.SplitN(name, " ", 2)
	lastName := ""
	if len(parts) > 1 {
		lastName = parts[1]
	}
	return map[string]interface{}{
		"first_name": parts[0],
		"last_name":  lastName,
		"email":      event["email"],
	}
}
public class PlayerRegisteredV1ToV2 implements Upcaster {
    @Override
    public boolean canUpcast(String eventType, int version) {
        return "PlayerRegistered".equals(eventType) && version == 1;
    }

    @Override
    public Map<String, Object> upcast(Map<String, Object> event) {
        String name = (String) event.get("name");
        String[] parts = name.split(" ", 2);
        return Map.of(
            "first_name", parts[0],
            "last_name", parts.length > 1 ? parts[1] : "",
            "email", event.get("email")
        );
    }
}
public class PlayerRegisteredV1ToV2 : IUpcaster
{
    public bool CanUpcast(string eventType, int version)
        => eventType == "PlayerRegistered" && version == 1;

    public Dictionary<string, object> Upcast(Dictionary<string, object> @event)
    {
        var name = (string)@event["name"];
        var parts = name.Split(' ', 2);
        return new Dictionary<string, object>
        {
            ["first_name"] = parts[0],
            ["last_name"] = parts.Length > 1 ? parts[1] : "",
            ["email"] = @event["email"]
        };
    }
}
class PlayerRegisteredV1ToV2 : public Upcaster {
public:
    bool can_upcast(const std::string& event_type, int version) const override {
        return event_type == "PlayerRegistered" && version == 1;
    }

    nlohmann::json upcast(const nlohmann::json& event) const override {
        std::string name = event["name"];
        size_t pos = name.find(' ');
        return {
            {"first_name", pos != std::string::npos ? name.substr(0, pos) : name},
            {"last_name", pos != std::string::npos ? name.substr(pos + 1) : ""},
            {"email", event["email"]}
        };
    }
};
Key points:
- Stored events remain unchanged
- Transformation happens on read
- Enables gradual schema migration
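When several schema versions exist, upcasters are typically chained on read so each one only handles a single version step. The sketch below assumes every upcaster advances an event by exactly one version. The `upcast_to_current` helper and the `PlayerRegisteredV2ToV3` step (with its `marketing_opt_in` field) are hypothetical and only illustrate chaining.

```python
class PlayerRegisteredV1ToV2:
    def can_upcast(self, event_type: str, version: int) -> bool:
        return event_type == "PlayerRegistered" and version == 1

    def upcast(self, event: dict) -> dict:
        # V1 had "name", V2 split into "first_name" and "last_name"
        first, _, last = event["name"].partition(" ")
        return {"first_name": first, "last_name": last, "email": event["email"]}


class PlayerRegisteredV2ToV3:
    """Hypothetical second step, included only to show chaining."""

    def can_upcast(self, event_type: str, version: int) -> bool:
        return event_type == "PlayerRegistered" and version == 2

    def upcast(self, event: dict) -> dict:
        return {**event, "marketing_opt_in": False}


def upcast_to_current(event_type, version, event, upcasters):
    """Apply matching upcasters until none applies; return (version, event)."""
    while True:
        step = next((u for u in upcasters if u.can_upcast(event_type, version)), None)
        if step is None:
            return version, event
        event = step.upcast(event)  # returns a new dict; the stored event is untouched
        version += 1


chain = [PlayerRegisteredV1ToV2(), PlayerRegisteredV2ToV3()]
stored = {"name": "Ada Lovelace", "email": "ada@example.com"}
version, current = upcast_to_current("PlayerRegistered", 1, stored, chain)
```

Because each step builds a new dictionary, the stored V1 payload is left as written, matching the first key point above.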
Coordination Patterns
Correlation ID
Links related events across aggregates:
message Cover {
  string domain = 2;
  UUID root = 1;
  string correlation_id = 3;  // Workflow correlation - flows through all commands/events
  Edition edition = 4;        // Edition for diverged timelines; empty name = main timeline
}
Propagation rules:
- Client provides correlation_id on initial command (if cross-domain tracking needed)
- Framework does NOT auto-generate—if not provided, stays empty
- Once set, propagates through sagas and process managers
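The propagation rules can be illustrated with a simplified stand-in for `Cover`. In this sketch the dataclass uses plain strings instead of the protobuf `UUID` and `Edition` types, and `saga_cover` is a hypothetical helper, not a framework function.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Cover:
    domain: str
    root: str
    correlation_id: str = ""  # stays empty unless the client sets it


def saga_cover(source: Cover, target_domain: str, target_root: str) -> Cover:
    """Build the cover for a saga-emitted command, copying correlation_id as-is."""
    return Cover(target_domain, target_root, source.correlation_id)


tracked = Cover("order", "order-1", correlation_id="checkout-42")
untracked = Cover("order", "order-2")  # client chose not to track this workflow

reserve = saga_cover(tracked, "inventory", "sku-9")
silent = saga_cover(untracked, "inventory", "sku-9")
```

Note that nothing in the sketch generates an ID: an empty `correlation_id` on the initial command stays empty downstream.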
Merge Strategy
Controls concurrency handling for optimistic locking:
| Strategy | Proto Value | Conflict Behavior | gRPC Code | Retryable? |
|---|---|---|---|---|
| COMMUTATIVE | 0 (default) | Return fresh state | FAILED_PRECONDITION | Yes |
| STRICT | 1 | Immediate rejection | ABORTED | No |
| AGGREGATE_HANDLES | 2 | Aggregate decides | Varies | Varies |
| MANUAL | 3 | Route to DLQ | ABORTED | No |
When to use each:
| Use Case | Strategy | Why |
|---|---|---|
| Financial operations | STRICT | Must see current balance |
| Counters, metrics | COMMUTATIVE | Order doesn't matter, safe to retry |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Audit-critical operations | MANUAL | Human review required |
Examples in Python, Go, Rust, Java, C#, and C++:
# STRICT: Must see current balance; immediate rejection on conflict
@merge_strategy(MergeStrategy.STRICT)
def handle_reserve_funds(state, cmd):
    if cmd.amount > state.available():
        raise CommandRejectedError("insufficient_funds")
    return FundsReserved(amount=cmd.amount)


# COMMUTATIVE (default): Safe to retry with fresh state
@merge_strategy(MergeStrategy.COMMUTATIVE)
def handle_add_points(state, cmd):
    return PointsAdded(points=cmd.points)


# MANUAL: Route to DLQ for human review
@merge_strategy(MergeStrategy.MANUAL)
def handle_compliance_action(state, cmd):
    return ComplianceActionTaken(action=cmd.action)
// STRICT: Must see current balance; immediate rejection on conflict
func handleReserveFunds(state *State, cmd *ReserveFunds) (*FundsReserved, error) {
	if cmd.Amount > state.Available() {
		return nil, status.Error(codes.FailedPrecondition, "insufficient_funds")
	}
	return &FundsReserved{Amount: cmd.Amount}, nil
}

// Router configuration with merge strategies
router := CommandRouter("player").
	On("ReserveFunds", handleReserveFunds, MergeStrategy_STRICT).
	On("AddPoints", handleAddPoints, MergeStrategy_COMMUTATIVE).
	On("ComplianceAction", handleCompliance, MergeStrategy_MANUAL)
// STRICT: Must see current balance; immediate rejection on conflict
fn handle_reserve_funds(state: &State, cmd: &ReserveFunds) -> Result<FundsReserved, Status> {
    if cmd.amount > state.available() {
        return Err(Status::failed_precondition("insufficient_funds"));
    }
    Ok(FundsReserved { amount: cmd.amount })
}

// Router configuration with merge strategies
let router = CommandRouter::new("player")
    .on::<ReserveFunds>(handle_reserve_funds).merge_strategy(MergeStrategy::Strict)
    .on::<AddPoints>(handle_add_points).merge_strategy(MergeStrategy::Commutative)
    .on::<ComplianceAction>(handle_compliance).merge_strategy(MergeStrategy::Manual);
// STRICT: Must see current balance; immediate rejection on conflict
@MergeStrategy(MergeStrategy.STRICT)
@Handles(ReserveFunds.class)
public FundsReserved handleReserveFunds(State state, ReserveFunds cmd) {
    if (cmd.getAmount() > state.available()) {
        throw new CommandRejectedError("insufficient_funds");
    }
    return FundsReserved.newBuilder().setAmount(cmd.getAmount()).build();
}

// MANUAL: Route to DLQ for human review
@MergeStrategy(MergeStrategy.MANUAL)
@Handles(ComplianceAction.class)
public ComplianceActionTaken handleCompliance(State state, ComplianceAction cmd) {
    return ComplianceActionTaken.newBuilder().setAction(cmd.getAction()).build();
}
// STRICT: Must see current balance; immediate rejection on conflict
[MergeStrategy(MergeStrategy.Strict)]
[Handles(typeof(ReserveFunds))]
public FundsReserved HandleReserveFunds(State state, ReserveFunds cmd)
{
    if (cmd.Amount > state.Available)
        throw new CommandRejectedError("insufficient_funds");
    return new FundsReserved { Amount = cmd.Amount };
}

// MANUAL: Route to DLQ for human review
[MergeStrategy(MergeStrategy.Manual)]
[Handles(typeof(ComplianceAction))]
public ComplianceActionTaken HandleCompliance(State state, ComplianceAction cmd)
    => new ComplianceActionTaken { Action = cmd.Action };
// STRICT: Must see current balance; immediate rejection on conflict
FundsReserved handle_reserve_funds(const State& state, const ReserveFunds& cmd) {
    if (cmd.amount() > state.available()) {
        throw CommandRejectedError("insufficient_funds");
    }
    FundsReserved event;
    event.set_amount(cmd.amount());
    return event;
}

// Router configuration with merge strategies
auto router = CommandRouter("player")
    .on<ReserveFunds>(handle_reserve_funds, MergeStrategy::STRICT)
    .on<AddPoints>(handle_add_points, MergeStrategy::COMMUTATIVE)
    .on<ComplianceAction>(handle_compliance, MergeStrategy::MANUAL);
See Error Recovery for retry behavior and DLQ routing details.
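From the caller's side, the "Retryable?" column in the strategy table amounts to a retry policy keyed on the gRPC code. The sketch below is one possible reading of that table, not the framework's client: `CommandError`, `send_with_retry`, and the string status codes are hypothetical stand-ins for a real gRPC client and its error type.

```python
class CommandError(Exception):
    """Hypothetical error carrying the gRPC status code name."""

    def __init__(self, code: str):
        super().__init__(code)
        self.code = code


RETRYABLE_CODES = {"FAILED_PRECONDITION"}  # COMMUTATIVE conflicts per the table


def send_with_retry(send, build_command, max_attempts: int = 3):
    """Send a command, rebuilding and retrying only on retryable conflict codes."""
    for attempt in range(1, max_attempts + 1):
        try:
            return send(build_command())  # rebuild so the retry uses fresh state
        except CommandError as err:
            if err.code not in RETRYABLE_CODES or attempt == max_attempts:
                raise  # ABORTED (STRICT, MANUAL) or retries exhausted


calls = []


def flaky_send(command):
    """Fake transport: conflicts twice, then accepts."""
    calls.append(command)
    if len(calls) < 3:
        raise CommandError("FAILED_PRECONDITION")
    return "accepted"


result = send_with_retry(flaky_send, lambda: {"type": "AddPoints", "points": 10})
```

A real client would also need to tell sequence conflicts apart from business rejections that reuse the same status code, since retrying a rejection such as insufficient_funds cannot succeed.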
Sync Mode
Controls when command processing returns to caller:
| Mode | Proto Value | Description | Use Case |
|---|---|---|---|
| UNSPECIFIED | 0 | Fire and forget (async) | Background tasks |
| SIMPLE | 1 | Wait for projectors only | Read-after-write consistency |
| CASCADE | 2 | Wait for projectors + saga cascade | Full sync (expensive) |
// Controls synchronous processing behavior
enum SyncMode {
  SYNC_MODE_UNSPECIFIED = 0;  // Async: fire and forget (default)
  SYNC_MODE_SIMPLE = 1;       // Sync projectors only, no saga cascade
  SYNC_MODE_CASCADE = 2;      // Full sync: projectors + saga cascade (expensive)
}
Trade-offs:
| Mode | Latency | Consistency | Cost |
|---|---|---|---|
| UNSPECIFIED | Lowest | Eventual | Cheapest |
| SIMPLE | Medium | Read-your-writes | Moderate |
| CASCADE | Highest | Full | Expensive |
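The trade-off table can be read as a simple decision rule: pick the cheapest mode that still gives the consistency the caller needs. The helper below is a hypothetical sketch of that rule. `choose_sync_mode` and its parameters are not part of the framework, and the enum only mirrors the proto values shown above.

```python
from enum import IntEnum


class SyncMode(IntEnum):
    UNSPECIFIED = 0  # async, fire and forget
    SIMPLE = 1       # wait for projectors only
    CASCADE = 2      # wait for projectors and the saga cascade


def choose_sync_mode(needs_read_after_write: bool, needs_saga_results: bool) -> SyncMode:
    """Return the cheapest mode that satisfies the caller's consistency needs."""
    if needs_saga_results:
        return SyncMode.CASCADE
    if needs_read_after_write:
        return SyncMode.SIMPLE
    return SyncMode.UNSPECIFIED


background_job = choose_sync_mode(False, False)
ui_form_submit = choose_sync_mode(True, False)
```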
Query Patterns
Temporal Query
Reconstruct state at any point in history:
# Get player balance as of yesterday
events = event_store.get_events(
    domain="player",
    root=player_id,
    up_to_timestamp=yesterday,
)
state = build_state(events)
print(f"Balance at {yesterday}: {state.bankroll}")
Enabled by:
- Immutable event stream
- Events contain absolute state
- No destructive updates
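To make the replay concrete, here is a self-contained sketch of a temporal query over an in-memory event list. The `Event` and `PlayerState` shapes and the `state_as_of` helper are assumptions for illustration; a real event store would apply the timestamp filter server-side.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Event:
    sequence: int
    created_at: datetime
    bankroll: int  # absolute balance after this event


@dataclass
class PlayerState:
    bankroll: int = 0


def build_state(events) -> PlayerState:
    """Fold events in sequence order; the last absolute value wins."""
    state = PlayerState()
    for event in sorted(events, key=lambda e: e.sequence):
        state.bankroll = event.bankroll
    return state


def state_as_of(events, up_to_timestamp: datetime) -> PlayerState:
    """Rebuild state from only the events recorded at or before the timestamp."""
    return build_state(e for e in events if e.created_at <= up_to_timestamp)


now = datetime(2024, 1, 10, 12, 0, tzinfo=timezone.utc)
history = [
    Event(0, now - timedelta(days=3), 100),
    Event(1, now - timedelta(days=2), 150),
    Event(2, now - timedelta(hours=1), 90),
]
yesterday = now - timedelta(days=1)
past = state_as_of(history, yesterday)
```

The same `history` list answers both the historical and the current query, because no event is ever overwritten.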
Advanced Patterns
Edition / Branching
Editions enable divergent timelines for speculative execution, historical analysis, or saga coordination:
// Edition identifier with optional explicit divergence points.
//
// Two modes:
// - Implicit (divergences empty): Divergence derived from first edition event's sequence
// - Explicit (divergences populated): Per-domain divergence points for historical branching,
//   saga coordination, or speculative execution
message Edition {
  string name = 1;                            // Edition name, e.g., "v2"; empty = main timeline
  repeated DomainDivergence divergences = 2;  // Optional: explicit per-domain divergence points
}

// Explicit divergence point for a specific domain.
// Used when creating historical branches or coordinating saga writes across domains.
message DomainDivergence {
  string domain = 1;    // Domain name
  uint32 sequence = 2;  // Divergence sequence number
}
Use cases:
| Scenario | How |
|---|---|
| What-if analysis | Branch at sequence N, apply hypothetical commands |
| Historical branches | Explore alternative histories |
| Saga coordination | Coordinate writes across domains with explicit divergence |
| Speculative execution | Execute commands speculatively, merge or discard |
Two modes:
- Implicit divergence — First event in edition determines divergence point
- Explicit divergence — Per-domain divergence points specified upfront
# Create a what-if branch
edition = Edition(name="bonus-scenario")
cover = Cover(
    domain="player",
    root=player_id,
    edition=edition,
)
# Commands on this cover write to the branch, not main timeline
cmd = CommandBook(cover=cover, pages=[...])
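One way to picture how a branch is read back: the edition shares the main timeline up to the divergence point and then continues with its own events. The sketch below assumes that main-timeline events with a sequence below the divergence point are shared and that edition events start at that sequence. This boundary rule, the dictionary event shape, and the `edition_view` helper are all assumptions, not documented framework behavior.

```python
def edition_view(main_events, edition_events, divergence=None):
    """Return the events visible on an edition.

    divergence=None models implicit mode: the point is taken from the first
    edition event. Passing a number models an explicit divergence point.
    """
    if divergence is None:
        if not edition_events:
            return list(main_events)  # nothing has diverged yet
        divergence = min(e["sequence"] for e in edition_events)
    shared = [e for e in main_events if e["sequence"] < divergence]
    return shared + sorted(edition_events, key=lambda e: e["sequence"])


main = [
    {"sequence": 0, "type": "PlayerRegistered"},
    {"sequence": 1, "type": "FundsDeposited"},
    {"sequence": 2, "type": "FundsWithdrawn"},
]
branch = [{"sequence": 2, "type": "BonusGranted"}]  # written on "bonus-scenario"

what_if = [e["type"] for e in edition_view(main, branch)]
historical = [e["type"] for e in edition_view(main, [], divergence=1)]
```

The main timeline list is never modified, so discarding a speculative branch is just a matter of dropping its events.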
Snapshot Retention
Controls how snapshots are managed during compaction:
// Controls snapshot retention during cleanup
enum SnapshotRetention {
  RETENTION_DEFAULT = 0;    // Persist every 16 events, treated as TRANSIENT otherwise
  RETENTION_PERSIST = 1;    // Keep indefinitely (business milestone)
  RETENTION_TRANSIENT = 2;  // Delete when newer snapshot written
}
When to use:
| Retention | Use Case | Example |
|---|---|---|
| DEFAULT | Normal operation | Most snapshots |
| PERSIST | Business milestones | End-of-day balance, audit checkpoints |
| TRANSIENT | Temporary optimization | Intermediate computation state |
Snapshot with retention:
# Mark a snapshot as a business milestone (never delete)
snapshot = Snapshot(
    sequence=current_sequence,
    state=state.SerializeToString(),
    retention=SnapshotRetention.RETENTION_PERSIST,
)
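The retention values can be read as a compaction rule. The sketch below is one interpretation of the enum comments, assuming "every 16 events" means a DEFAULT snapshot is kept when its sequence is a multiple of 16 and that the newest snapshot always survives. The `compact` and `is_kept` helpers are hypothetical.

```python
from enum import IntEnum


class SnapshotRetention(IntEnum):
    RETENTION_DEFAULT = 0
    RETENTION_PERSIST = 1
    RETENTION_TRANSIENT = 2


PERSIST_INTERVAL = 16  # from the DEFAULT comment: "Persist every 16 events"


def is_kept(sequence: int, retention: SnapshotRetention, newest_sequence: int) -> bool:
    """Decide whether one snapshot survives compaction."""
    if sequence == newest_sequence:
        return True  # the newest snapshot is still needed for fast loads
    if retention == SnapshotRetention.RETENTION_PERSIST:
        return True
    if retention == SnapshotRetention.RETENTION_DEFAULT:
        return sequence % PERSIST_INTERVAL == 0  # assumed reading of "every 16"
    return False  # TRANSIENT and superseded


def compact(snapshots):
    """Filter (sequence, retention) pairs down to the ones that are kept."""
    if not snapshots:
        return []
    newest = max(sequence for sequence, _ in snapshots)
    return [(s, r) for s, r in snapshots if is_kept(s, r, newest)]


D, P, T = (
    SnapshotRetention.RETENTION_DEFAULT,
    SnapshotRetention.RETENTION_PERSIST,
    SnapshotRetention.RETENTION_TRANSIENT,
)
kept = compact([(10, D), (16, D), (20, P), (25, T), (30, D)])
```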
Component Coordination
Saga vs Process Manager
| Aspect | Saga | Process Manager |
|---|---|---|
| State | Stateless | Own event stream |
| Input | Single domain | Multiple domains |
| Identity | None | correlation_id |
| Timeouts | No | Yes |
Rule of thumb: Start with sagas. Upgrade to PM when you need state or multi-domain input.
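The difference in the State and Identity rows can be sketched as a pure function versus an object keyed by correlation_id. All event and command names here are hypothetical, and the in-memory dictionary stands in for the process manager's own event stream.

```python
def order_saga(event: dict) -> list:
    """Stateless: the output depends only on the single input event."""
    if event["type"] == "OrderPlaced":
        return [{"type": "ReserveStock", "correlation_id": event["correlation_id"]}]
    return []


class CheckoutProcessManager:
    """Stateful: waits for events from several domains, keyed by correlation_id."""

    REQUIRED = {"StockReserved", "PaymentCaptured"}

    def __init__(self):
        self.seen = {}  # correlation_id -> set of event types seen so far

    def handle(self, event: dict) -> list:
        seen = self.seen.setdefault(event["correlation_id"], set())
        was_complete = self.REQUIRED <= seen
        seen.add(event["type"])
        if not was_complete and self.REQUIRED <= seen:
            # Emit exactly once, when the last required event arrives
            return [{"type": "ShipOrder", "correlation_id": event["correlation_id"]}]
        return []


pm = CheckoutProcessManager()
first = pm.handle({"type": "StockReserved", "correlation_id": "c1"})
second = pm.handle({"type": "PaymentCaptured", "correlation_id": "c1"})
```

The saga can run anywhere with no storage, while the process manager has to remember what it has already seen, which is the cost the rule of thumb asks you to defer.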
Next Steps
- Aggregates — Command handling
- Sagas — Cross-domain coordination
- Process Managers — Stateful orchestration