Error Recovery
Angzarr provides layered error recovery: automatic retry for transient failures, dead letter queues for persistent failures, and compensation flows for saga rejections.
Overview
```mermaid
flowchart TB
    Fail[Command fails]
    Trans{Transient?}
    Retry[Retry with backoff]
    Still{Still failing?}
    DLQ1[DLQ]
    Seq{Sequence conflict?}
    MS[Check MergeStrategy]
    Comm[COMMUTATIVE:<br/>Retry with fresh state]
    Strict[STRICT:<br/>Reject immediately]
    AggH[AGGREGATE_HANDLES:<br/>Let aggregate decide]
    Manual[MANUAL:<br/>Send to DLQ]
    Biz{Business rejection?}
    Comp[Compensation flow]
    Fail --> Trans
    Trans -->|yes| Retry --> Still
    Still -->|yes| DLQ1
    Trans -->|no| Seq
    Seq -->|yes| MS
    MS --> Comm
    MS --> Strict
    MS --> AggH
    MS --> Manual
    Seq -->|no| Biz
    Biz -->|yes| Comp
```
Retry with Backoff
Transient failures (network issues, temporary unavailability) are automatically retried with exponential backoff and jitter.
Default Backoff Policies
| Context | Min Delay | Max Delay | Max Attempts | Jitter |
|---|---|---|---|---|
| Saga/PM commands | 10ms | 2s | 10 | Yes |
| gRPC connections | 100ms | 5s | 30 | Yes |
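A minimal sketch of how a schedule like the ones in this table could be computed. It assumes a doubling ceiling capped at the max delay, with jitter drawn uniformly between the min delay and that ceiling; angzarr's exact jitter formula is not documented here, and `backoff_delays` is a hypothetical helper.

```python
import random
from typing import List, Optional


def backoff_delays(
    min_delay: float,
    max_delay: float,
    max_attempts: int,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return one sleep duration (in seconds) per retry attempt.

    The ceiling doubles each attempt starting from min_delay and is capped
    at max_delay. Jitter draws each actual delay uniformly between min_delay
    and that ceiling, so concurrent retriers spread out instead of colliding.
    """
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_attempts):
        ceiling = min(max_delay, min_delay * (2 ** attempt))
        delays.append(rng.uniform(min_delay, ceiling))
    return delays


# Saga/PM command profile from the table: 10ms to 2s, 10 attempts
saga_delays = backoff_delays(0.010, 2.0, 10)
# gRPC connection profile: 100ms to 5s, 30 attempts
grpc_delays = backoff_delays(0.100, 5.0, 30)
```

Passing a seeded `random.Random` makes the schedule reproducible in tests.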
Retryable vs Non-Retryable Errors
| gRPC Code | Meaning | Retryable? | Action |
|---|---|---|---|
| FAILED_PRECONDITION | Sequence mismatch under MERGE_COMMUTATIVE (stale client state) | Yes | Fetch fresh state, rebuild command |
| ABORTED | Sequence conflict under MERGE_STRICT, or under MERGE_MANUAL (command sent to DLQ) | No | Manual intervention required |
| INVALID_ARGUMENT | Bad command data | No | Fix command, resubmit |
| NOT_FOUND | Aggregate doesn’t exist | No | Check aggregate ID |
| INTERNAL | Server error | No | Check server logs |
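The table can be encoded as a lookup so client code makes the retry decision in one place. This is an illustrative sketch: `classify` and `Action` are hypothetical names, and the keys are gRPC status code names (with `grpcio`, `err.code().name` yields strings such as `"FAILED_PRECONDITION"`).

```python
from enum import Enum
from typing import Dict, Optional, Tuple


class Action(Enum):
    REBUILD_AND_RETRY = "Fetch fresh state, rebuild command"
    MANUAL_INTERVENTION = "Manual intervention required"
    FIX_COMMAND = "Fix command, resubmit"
    CHECK_AGGREGATE_ID = "Check aggregate ID"
    CHECK_SERVER_LOGS = "Check server logs"


# (retryable?, action) per gRPC status code name, mirroring the table above.
_POLICY: Dict[str, Tuple[bool, Action]] = {
    "FAILED_PRECONDITION": (True, Action.REBUILD_AND_RETRY),
    "ABORTED": (False, Action.MANUAL_INTERVENTION),
    "INVALID_ARGUMENT": (False, Action.FIX_COMMAND),
    "NOT_FOUND": (False, Action.CHECK_AGGREGATE_ID),
    "INTERNAL": (False, Action.CHECK_SERVER_LOGS),
}


def classify(code_name: str) -> Tuple[bool, Optional[Action]]:
    """Return (retryable, action) for a gRPC status code name.

    Codes the table does not cover come back as (False, None); transport-level
    codes such as UNAVAILABLE need their own policy.
    """
    return _POLICY.get(code_name, (False, None))
```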
Retry Flow
1. Command fails with FAILED_PRECONDITION.
2. Error details contain the current EventBook.
3. Client extracts the EventBook from the error.
4. Client rebuilds state from the fresh EventBook.
5. Client re-evaluates business logic against the fresh state.
6. Client builds a new command with the correct sequence.
7. Client resubmits.
8. Repeat until success or max attempts.

Rebuild, Don’t Just Re-sequence
Simply updating the sequence number is insufficient. The command’s payload may depend on state that changed. You must rebuild the command by re-running your business logic against the fresh state.
```python
from __future__ import annotations

import asyncio
from typing import Callable

import grpc
from angzarr_client.proto import EventBook

# CommandIntent and AggregateClient stand for your own application types;
# they are not defined in this snippet.


def handle_sequence_error(error: grpc.RpcError) -> EventBook | None:
    """Extract fresh state from a sequence mismatch error."""
    if error.code() == grpc.StatusCode.FAILED_PRECONDITION:
        details = error.trailing_metadata() or ()
        for key, value in details:
            if key == "grpc-status-details-bin":
                return EventBook.FromString(value)
    return None


async def execute_with_retry(
    intent: CommandIntent,  # What the user wanted
    build_command: Callable,  # Function: (intent, state) -> CommandBook
    client: AggregateClient,
    max_attempts: int = 10,
) -> EventBook:
    """Execute a command with retry, rebuilding from fresh state each attempt."""
    delay = 0.01
    current_state = None

    for _ in range(max_attempts):
        # Fetch state on the first attempt, or when the last error carried none
        if current_state is None:
            current_state = await client.get_state(intent.root_id)

        # Rebuild the command from current state. A business rejection
        # (e.g. CommandRejectedError) propagates to the caller unchanged.
        command = build_command(intent, current_state)

        try:
            return await client.execute(command)
        except grpc.RpcError as e:
            if e.code() != grpc.StatusCode.FAILED_PRECONDITION:
                raise
            # Use the fresh state from the error for the next attempt
            current_state = handle_sequence_error(e)
            await asyncio.sleep(delay)
            delay = min(delay * 2, 2.0)

    raise RuntimeError("Max retry attempts exceeded")
```

Dead Letter Queue (DLQ)
Messages that cannot be processed after retries are routed to a dead letter queue for manual intervention.
When Messages Go to DLQ
| Scenario | MergeStrategy | Result |
|---|---|---|
| Sequence mismatch | MERGE_MANUAL | Immediate DLQ |
| Event handler failed | Any | DLQ after retries |
| Payload retrieval failed | Any | DLQ |
| Max retries exceeded | COMMUTATIVE | DLQ |
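One way to read this table is as a routing decision. The sketch below is an interpretation, not angzarr's code: it assumes the "Max retries exceeded" row refers to sequence conflicts under COMMUTATIVE, and it models payload retrieval failures as dead-lettered immediately because the table gives no retry count. `Scenario` and `goes_to_dlq` are hypothetical names.

```python
from enum import Enum


class Scenario(Enum):
    SEQUENCE_MISMATCH = "sequence_mismatch"
    HANDLER_FAILED = "event_handler_failed"
    PAYLOAD_RETRIEVAL_FAILED = "payload_retrieval_failed"


def goes_to_dlq(scenario: Scenario, strategy: str, retries_exhausted: bool) -> bool:
    """Decide whether a failure is dead-lettered, following the table above.

    `strategy` is a MergeStrategy name such as "MERGE_MANUAL".
    """
    if scenario is Scenario.SEQUENCE_MISMATCH:
        if strategy == "MERGE_MANUAL":
            return True  # immediate DLQ, no retry
        if strategy == "MERGE_COMMUTATIVE":
            return retries_exhausted  # retried with fresh state first
        return False  # STRICT rejects to the caller; AGGREGATE_HANDLES skips the check
    if scenario is Scenario.HANDLER_FAILED:
        return retries_exhausted  # any strategy: DLQ after retries
    if scenario is Scenario.PAYLOAD_RETRIEVAL_FAILED:
        return True  # modeled as immediate
    return False
```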
Topic Naming
DLQ topics are per-domain for isolation. The logical name is shown below; the concrete topic or exchange name varies by backend (see DLQ Backends):

angzarr.dlq.{domain}

Examples:

- angzarr.dlq.player: Player domain failures
- angzarr.dlq.hand: Hand domain failures
- angzarr.dlq.table: Table domain failures
Dead Letter Structure
```proto
// Dead letter queue entry for failed messages requiring manual intervention.
// Per-domain topics: angzarr.dlq.{domain}
message AngzarrDeadLetter {
  Cover cover = 1;  // Routing: domain, root, correlation_id
  oneof payload {
    CommandBook rejected_command = 2;  // Command that failed
    EventBook rejected_events = 9;     // Events that failed (saga/projector failures)
  }
  string rejection_reason = 3;  // Human-readable reason
  oneof rejection_details {
    SequenceMismatchDetails sequence_mismatch = 12;               // Sequence conflict details
    EventProcessingFailedDetails event_processing_failed = 13;    // Handler failure details
    PayloadRetrievalFailedDetails payload_retrieval_failed = 14;  // Payload store failure details
  }
  google.protobuf.Timestamp occurred_at = 7;
  map<string, string> metadata = 8;   // Additional context
  string source_component = 10;       // Which component sent to DLQ
  string source_component_type = 11;  // "aggregate" | "saga" | "projector" | "process_manager"
}
```

DLQ Backends
| Backend | Topic Format | Use Case |
|---|---|---|
| Channel | In-memory | Standalone mode, testing |
| AMQP | angzarr.dlq exchange | RabbitMQ production |
| Kafka | angzarr-dlq-{domain} | Kafka production |
| Pub/Sub | angzarr-dlq-{domain} | GCP production |
| SNS/SQS | angzarr-dlq-{domain} | AWS production |
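The per-backend formats in this table can be captured in a small helper. This is a hypothetical sketch keyed by the backend names accepted in the `dlq.backend` setting; how the single AMQP exchange isolates domains is not specified on this page, so the helper returns only the exchange name.

```python
from typing import Optional


def dlq_destination(backend: str, domain: str) -> Optional[str]:
    """Return the DLQ topic or exchange name for a backend, per the table above."""
    if backend in ("none", "channel"):
        return None  # DLQ disabled, or in-memory channel with no named topic
    if backend == "amqp":
        return "angzarr.dlq"  # one shared exchange for all domains
    if backend in ("kafka", "pubsub", "sns_sqs"):
        return f"angzarr-dlq-{domain}"  # one topic per domain
    raise ValueError(f"unknown DLQ backend: {backend!r}")
```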
Configuration
```yaml
dlq:
  backend: amqp  # none | channel | amqp | kafka | pubsub | sns_sqs
  amqp_url: "amqp://localhost:5672"
  # OR
  kafka_brokers: "localhost:9092"
  # OR
  aws_region: "us-east-1"
  aws_endpoint_url: "http://localhost:4566"  # LocalStack
```

Processing DLQ Messages
Dead letters require manual review. Common actions:
- Fix and replay — Correct the issue, resubmit command
- Skip — Mark as handled without replay (data not critical)
- Compensate — Emit compensating events manually
- Escalate — Route to operations team
Saga Compensation
When a saga-issued command is rejected, angzarr notifies the source aggregate (the one whose event triggered the saga) so it can emit compensation events.
Compensation Flow
1. Saga emits a command to the target aggregate.
2. Target aggregate rejects it (business rule violation).
3. Framework builds a RejectionNotification.
4. Framework sends a Notification to the source aggregate.
5. Source aggregate either emits compensation events or returns RevocationResponse flags.

Notification vs Event
| Aspect | Notification | Event |
|---|---|---|
| Persisted? | No | Yes |
| Sequence? | No | Yes |
| Replay? | No | Yes |
| Purpose | Signaling | State change |
Notifications are ephemeral signals — they trigger behavior but aren’t part of the event stream.
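The difference can be illustrated with a toy in-memory stream. This is a conceptual sketch only (`InMemoryAggregate` and `deliver_notification` are hypothetical): a notification is handed to a handler and then discarded, while any events the handler returns are appended and receive sequence numbers.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable, List


@dataclass
class InMemoryAggregate:
    """Toy event stream: only appended events get a sequence number."""
    events: List[str] = field(default_factory=list)

    @property
    def sequence(self) -> int:
        return len(self.events)

    def append(self, event: str) -> int:
        self.events.append(event)
        return len(self.events) - 1  # sequence number assigned to this event


def deliver_notification(
    agg: InMemoryAggregate,
    notification: str,
    handler: Callable[[str], Iterable[str]],
) -> None:
    """Pass a notification to a handler; the notification itself is never stored.

    Only the events the handler returns are appended (and so persisted,
    sequenced and replayable).
    """
    for event in handler(notification):
        agg.append(event)


agg = InMemoryAggregate()
agg.append("FundsReserved")
deliver_notification(agg, "JoinTable rejected", lambda n: ["FundsReleased"])
```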
RejectionNotification
```proto
// Notification payload for command rejection scenarios.
// Embedded in Notification.payload when a saga/PM command is rejected.
//
// Source info for compensation is in rejected_command.pages[].header.angzarr_deferred:
// - source.domain, source.root, source.edition → where to route rejection
// - source_seq → which event triggered the command
message RejectionNotification {
  CommandBook rejected_command = 1;  // The command that was rejected (full context)
  string rejection_reason = 2;       // Why: "insufficient_funds", "out_of_stock", etc.
  // Fields 3-6 removed: source info now in rejected_command.pages[].header.angzarr_deferred
}
```

Handling Rejections
The source aggregate registers rejection handlers using @rejected decorators. The framework dispatches on the rejected command’s domain and type, so no if/else chains are needed.
examples/python/player/agg/rejected.py
```python
# Imports (types, player, poker_types, PlayerState, now) are omitted;
# see the full example file.

def handle_table_join_rejected(
    notification: types.Notification,
    state: PlayerState,
) -> player.FundsReleased | None:
    """Handle JoinTable rejection by releasing reserved funds.

    Returns the FundsReleased event directly (packed into an EventBook by the
    router) or ``None`` if no reservation exists for the rejected table.
    """
    rejection = types.RejectionNotification()
    if notification.HasField("payload"):
        notification.payload.Unpack(rejection)

    # The rejected command's cover identifies the table that refused the join
    table_root = b""
    if rejection.HasField("rejected_command"):
        rc = rejection.rejected_command
        if rc.HasField("cover") and rc.cover.HasField("root"):
            table_root = rc.cover.root.value

    table_key = table_root.hex()
    reserved_amount = state.table_reservations.get(table_key, 0)
    if reserved_amount == 0:
        return None
    new_reserved = state.reserved_funds - reserved_amount
    new_available = state.bankroll - new_reserved

    return player.FundsReleased(
        amount=poker_types.Currency(amount=reserved_amount, currency_code="CHIPS"),
        table_root=table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        released_at=now(),
    )
```

RevocationResponse Flags
When an aggregate cannot handle a rejection, it returns flags:

| Flag | Effect |
|---|---|
| emit_system_revocation | Emit a SagaCompensationFailed event to the fallback domain |
| send_to_dead_letter_queue | Route to DLQ for manual review |
| escalate | Trigger webhook notification |
| abort | Stop saga chain, propagate error to caller |
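The fallback_* settings in the saga_compensation configuration supply defaults when the aggregate returns an empty response. A minimal sketch of that resolution, assuming "empty" means no flag is set; `RevocationFlags`, `CompensationConfig` and `effective_flags` are hypothetical names, with defaults copied from the documented configuration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RevocationFlags:
    emit_system_revocation: bool = False
    send_to_dead_letter_queue: bool = False
    escalate: bool = False
    abort: bool = False

    def is_empty(self) -> bool:
        return not (self.emit_system_revocation or self.send_to_dead_letter_queue
                    or self.escalate or self.abort)


@dataclass
class CompensationConfig:
    # Defaults mirror the saga_compensation configuration example
    fallback_emit_system_revocation: bool = True
    fallback_send_to_dlq: bool = True
    fallback_escalate: bool = False


def effective_flags(response: Optional[RevocationFlags], cfg: CompensationConfig) -> RevocationFlags:
    """Use the aggregate's flags if it set any, otherwise the configured fallbacks."""
    if response is None or response.is_empty():
        return RevocationFlags(
            emit_system_revocation=cfg.fallback_emit_system_revocation,
            send_to_dead_letter_queue=cfg.fallback_send_to_dlq,
            escalate=cfg.fallback_escalate,
        )
    return response
```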
Configuration
```yaml
saga_compensation:
  fallback_domain: "system"              # Domain for SagaCompensationFailed events
  fallback_emit_system_revocation: true  # Default when aggregate returns empty
  fallback_send_to_dlq: true
  fallback_escalate: false
  dead_letter_queue_url: "amqp://..."    # Optional DLQ URL
  escalation_webhook_url: "https://..."  # Optional webhook for alerts
```

MergeStrategy and Conflicts
The MergeStrategy enum controls how sequence conflicts are handled:

| Strategy | Proto Value | Behavior on Conflict |
|---|---|---|
| MERGE_COMMUTATIVE | 0 (default) | FAILED_PRECONDITION: retryable with fresh state |
| MERGE_STRICT | 1 | ABORTED: immediate rejection, no retry |
| MERGE_AGGREGATE_HANDLES | 2 | Bypass validation; aggregate decides |
| MERGE_MANUAL | 3 | ABORTED + route to DLQ |
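The same table as a function of the strategy, to make each outcome explicit. This is a sketch, not the coordinator's implementation: `MergeStrategy` here is a local IntEnum mirroring the proto values above, and `ConflictOutcome` and `on_sequence_conflict` are hypothetical.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional


class MergeStrategy(IntEnum):
    MERGE_COMMUTATIVE = 0
    MERGE_STRICT = 1
    MERGE_AGGREGATE_HANDLES = 2
    MERGE_MANUAL = 3


@dataclass(frozen=True)
class ConflictOutcome:
    status: Optional[str]  # gRPC status name for the caller; None if the command proceeds
    retryable: bool
    to_dlq: bool
    execute: bool  # whether the command handler still runs


def on_sequence_conflict(strategy: MergeStrategy) -> ConflictOutcome:
    if strategy is MergeStrategy.MERGE_COMMUTATIVE:
        return ConflictOutcome("FAILED_PRECONDITION", retryable=True, to_dlq=False, execute=False)
    if strategy is MergeStrategy.MERGE_STRICT:
        return ConflictOutcome("ABORTED", retryable=False, to_dlq=False, execute=False)
    if strategy is MergeStrategy.MERGE_AGGREGATE_HANDLES:
        return ConflictOutcome(None, retryable=False, to_dlq=False, execute=True)
    if strategy is MergeStrategy.MERGE_MANUAL:
        return ConflictOutcome("ABORTED", retryable=False, to_dlq=True, execute=False)
    raise ValueError(f"unknown strategy: {strategy!r}")
```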
When to Use Each
| Use Case | Strategy | Why |
|---|---|---|
| Financial operations (balance checks) | STRICT | Must see current state |
| Counters, metrics | COMMUTATIVE | Order doesn’t matter |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Auditable operations | MANUAL | Human review required |
MERGE_MANUAL Flow
1. Command arrives with MERGE_MANUAL.
2. Coordinator detects a sequence mismatch.
3. Command is NOT executed.
4. An AngzarrDeadLetter is created with SequenceMismatchDetails.
5. It is published to the DLQ topic.
6. ABORTED status is returned to the caller.

Escalation
For critical failures, angzarr can trigger alerts via webhooks:
```yaml
saga_compensation:
  escalation_webhook_url: "https://ops.example.com/alerts"
```

Webhook payload:

```json
{
  "saga_name": "saga-table-player",
  "triggering_aggregate": {
    "domain": "table",
    "root": "550e8400-e29b-41d4-a716-446655440000"
  },
  "triggering_sequence": 5,
  "rejection_reason": "insufficient_funds",
  "compensation_reason": "Aggregate returned empty response",
  "occurred_at": "2024-01-15T10:30:00Z"
}
```

Best Practices
1. Design for Idempotency
Handlers may be retried. Ensure repeated execution produces the same result.
```python
# BAD: Not idempotent
def apply_funds_deposited(state, event):
    state.balance += event.amount  # Adds again on retry!


# GOOD: Idempotent (events carry absolute values)
def apply_funds_deposited(state, event):
    state.balance = event.new_balance  # Same result on retry
```

2. Use Appropriate MergeStrategy
```python
# Financial operations: STRICT
@merge_strategy(MergeStrategy.STRICT)
def handle_transfer(state, cmd):
    if cmd.amount > state.available:
        raise CommandRejectedError("insufficient_funds")
    # ...otherwise build and return the transfer event


# Counters: COMMUTATIVE
@merge_strategy(MergeStrategy.COMMUTATIVE)
def handle_increment(state, cmd):
    return ValueIncremented(delta=cmd.delta)


# Audit-critical: MANUAL
@merge_strategy(MergeStrategy.MANUAL)
def handle_compliance_action(state, cmd):
    return ComplianceActionTaken(action=cmd.action)
```

3. Handle Notifications Explicitly
Don’t ignore a RejectionNotification. Either compensate or escalate:
```python
def handle_rejection(state, rejection):
    if can_compensate(rejection):
        return build_compensation_event(rejection)
    else:
        return RevocationResponse(
            send_to_dead_letter_queue=True,
            escalate=True,
            reason="Cannot compensate automatically",
        )
```

4. Monitor DLQ Depth
Set up alerts for DLQ message accumulation:
```yaml
# Prometheus alert
- alert: DLQBacklog
  expr: dlq_messages_total > 100
  for: 5m
  labels:
    severity: warning
```

Next Steps
- Patterns Reference: MergeStrategy, SyncMode details
- Sagas: Cross-domain coordination
- Payload Offloading: Large message handling