
Error Recovery

Angzarr provides layered error recovery: automatic retry for transient failures, dead letter queues for persistent failures, and compensation flows for saga rejections.


Overview


Retry with Backoff

Transient failures (network issues, temporary unavailability) are automatically retried with exponential backoff and jitter.

Default Backoff Policies

| Context | Min Delay | Max Delay | Max Attempts | Jitter |
|---|---|---|---|---|
| Saga/PM commands | 10ms | 2s | 10 | Yes |
| gRPC connections | 100ms | 5s | 30 | Yes |
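As a sketch, the delay schedule for the saga/PM policy above (10 ms base, 2 s cap, full jitter) could be computed like this; `build_delays` is an illustrative helper, not the framework API:

```python
import random

def build_delays(base_ms, cap_ms, max_attempts, jitter=True, factor=2.0):
    """Yield one delay (in milliseconds) per retry attempt:
    exponential growth capped at cap_ms, with optional full jitter."""
    for attempt in range(max_attempts):
        delay = min(cap_ms, base_ms * factor ** attempt)
        # Full jitter: pick uniformly in [0, delay] to avoid thundering herds
        yield random.uniform(0, delay) if jitter else delay

# Saga/PM policy from the table: 10 ms min, 2 s cap, 10 attempts
delays = list(build_delays(10, 2000, 10, jitter=False))
# Without jitter the schedule is 10, 20, 40, ... capped at 2000 ms
```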

Retryable vs Non-Retryable Errors

| gRPC Code | Meaning | Retryable? | Action |
|---|---|---|---|
| FAILED_PRECONDITION | Sequence mismatch (stale client state) | Yes | Fetch fresh state, rebuild command |
| ABORTED | Sent to DLQ (MERGE_MANUAL) | No | Manual intervention required |
| INVALID_ARGUMENT | Bad command data | No | Fix command, resubmit |
| NOT_FOUND | Aggregate doesn't exist | No | Check aggregate ID |
| INTERNAL | Server error | No | Check server logs |
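The table reduces to a small classifier. `should_retry` is an illustrative helper (not part of the framework), using a local enum in place of the real gRPC status codes:

```python
from enum import Enum

class Code(Enum):
    """Stand-in for the gRPC status codes in the table above."""
    FAILED_PRECONDITION = "FAILED_PRECONDITION"
    ABORTED = "ABORTED"
    INVALID_ARGUMENT = "INVALID_ARGUMENT"
    NOT_FOUND = "NOT_FOUND"
    INTERNAL = "INTERNAL"

# Per the table: only a sequence mismatch is worth retrying, and only
# after rebuilding the command from fresh state.
RETRYABLE = frozenset({Code.FAILED_PRECONDITION})

def should_retry(code: Code) -> bool:
    return code in RETRYABLE
```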

Retry Flow

1. Command fails with FAILED_PRECONDITION
2. Error details contain current EventBook
3. Client extracts EventBook from error
4. Client rebuilds state from fresh EventBook
5. Client re-evaluates business logic with fresh state
6. Client builds new command with correct sequence
7. Client resubmits
8. Repeat until success or max attempts

Rebuild, Don't Just Re-sequence

Simply updating the sequence number is insufficient. The command's payload may depend on state that changed. You must rebuild the command by re-running your business logic against the fresh state.

```rust
// Extract fresh state from sequence mismatch error
fn handle_sequence_error(status: &Status) -> Option<EventBook> {
    if status.code() == Code::FailedPrecondition {
        let details = status.details();
        if !details.is_empty() {
            return EventBook::decode(details).ok();
        }
    }
    None
}

// Retry loop with backoff - rebuilds command from fresh state
async fn execute_with_retry<F>(
    intent: &CommandIntent,       // Original intent (what user wanted)
    build_command: F,             // Function to build command from state
    client: &mut AggregateClient,
) -> Result<EventBook, Status>
where
    F: Fn(&CommandIntent, &EventBook) -> Result<CommandBook, CommandRejectedError>,
{
    let backoff = saga_backoff();
    let mut delays = backoff.build();
    let mut current_state: Option<EventBook> = None;

    loop {
        // Build command from current state (or fetch if first attempt)
        let state = match &current_state {
            Some(s) => s.clone(),
            None => client.get_state(intent.root_id()).await?,
        };

        let command = match build_command(intent, &state) {
            Ok(cmd) => cmd,
            Err(e) => return Err(e.into()), // Business logic rejected
        };

        match client.execute(command).await {
            Ok(events) => return Ok(events),
            Err(status) if status.code() == Code::FailedPrecondition => {
                if let Some(delay) = delays.next() {
                    // Extract fresh state for next iteration
                    current_state = handle_sequence_error(&status);
                    tokio::time::sleep(delay).await;
                } else {
                    return Err(status);
                }
            }
            Err(status) => return Err(status),
        }
    }
}
```

Dead Letter Queue (DLQ)

Messages that cannot be processed after retries are routed to a dead letter queue for manual intervention.

When Messages Go to DLQ

| Scenario | MergeStrategy | Result |
|---|---|---|
| Sequence mismatch | MERGE_MANUAL | Immediate DLQ |
| Event handler failed | Any | DLQ after retries |
| Payload retrieval failed | Any | DLQ |
| Max retries exceeded | COMMUTATIVE | DLQ |

Topic Naming

DLQ topics are per-domain for isolation:

angzarr.dlq.{domain}

Examples:

  • angzarr.dlq.player — Player domain failures
  • angzarr.dlq.hand — Hand domain failures
  • angzarr.dlq.table — Table domain failures

Dead Letter Structure

```protobuf
// Dead letter queue entry for failed messages requiring manual intervention.
// Per-domain topics: angzarr.dlq.{domain}
message AngzarrDeadLetter {
  Cover cover = 1;  // Routing: domain, root, correlation_id
  oneof payload {
    CommandBook rejected_command = 2;  // Command that failed
    EventBook rejected_events = 9;     // Events that failed (saga/projector failures)
  }
  string rejection_reason = 3;  // Human-readable reason
  oneof rejection_details {
    SequenceMismatchDetails sequence_mismatch = 12;               // Sequence conflict details
    EventProcessingFailedDetails event_processing_failed = 13;    // Handler failure details
    PayloadRetrievalFailedDetails payload_retrieval_failed = 14;  // Payload store failure details
  }
  google.protobuf.Timestamp occurred_at = 7;
  map<string, string> metadata = 8;   // Additional context
  string source_component = 10;       // Which component sent to DLQ
  string source_component_type = 11;  // "aggregate" | "saga" | "projector" | "process_manager"
}
```

DLQ Backends

| Backend | Topic Format | Use Case |
|---|---|---|
| Channel | In-memory | Standalone mode, testing |
| AMQP | angzarr.dlq exchange | RabbitMQ production |
| Kafka | angzarr-dlq-{domain} | Kafka production |
| Pub/Sub | angzarr-dlq-{domain} | GCP production |
| SNS/SQS | angzarr-dlq-{domain} | AWS production |
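Deriving the per-domain topic name from the backend (dotted keys for AMQP, dash-separated names for Kafka/Pub/Sub/SNS-SQS, per the table above) might look like this; `dlq_topic` is a hypothetical helper, not a framework function:

```python
def dlq_topic(domain: str, backend: str) -> str:
    """Per-domain DLQ topic name for a given backend (illustrative)."""
    if backend == "amqp":
        # Routed through the angzarr.dlq exchange with a dotted routing key
        return f"angzarr.dlq.{domain}"
    if backend in ("kafka", "pubsub", "sns_sqs"):
        # These brokers conventionally avoid dots in topic names
        return f"angzarr-dlq-{domain}"
    raise ValueError(f"no broker topic for backend {backend!r}")
```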

Configuration

```yaml
dlq:
  backend: amqp  # none | channel | amqp | kafka | pubsub | sns_sqs
  amqp_url: "amqp://localhost:5672"
  # OR
  kafka_brokers: "localhost:9092"
  # OR
  aws_region: "us-east-1"
  aws_endpoint_url: "http://localhost:4566"  # LocalStack
```

Processing DLQ Messages

Dead letters require manual review. Common actions:

  1. Fix and replay — Correct the issue, resubmit command
  2. Skip — Mark as handled without replay (data not critical)
  3. Compensate — Emit compensating events manually
  4. Escalate — Route to operations team
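The four actions can be framed as a triage function. The routing policy below is purely illustrative, and `DeadLetter` is a minimal stand-in for the `AngzarrDeadLetter` fields it uses:

```python
from dataclasses import dataclass

@dataclass
class DeadLetter:
    # Minimal stand-in for the AngzarrDeadLetter fields used here
    domain: str
    rejection_reason: str
    source_component_type: str

def triage(letter: DeadLetter) -> str:
    """Pick one of the four manual actions for a dead letter.
    The policy here is an example, not a recommendation."""
    if letter.rejection_reason == "sequence_mismatch":
        return "fix_and_replay"  # 1. correct the conflict, resubmit
    if letter.source_component_type == "projector":
        return "skip"            # 2. read model can be rebuilt later
    if letter.source_component_type in ("saga", "process_manager"):
        return "compensate"      # 3. emit compensating events manually
    return "escalate"            # 4. route to the operations team
```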

Saga Compensation

When a saga-issued command is rejected, angzarr notifies the original aggregate so it can emit compensation events.

Compensation Flow

1. Saga emits command to target aggregate
2. Target aggregate rejects (business rule violation)
3. Framework builds RejectionNotification
4. Framework sends Notification to source aggregate
5. Source aggregate emits compensation events, OR returns RevocationResponse flags

Notification vs Event

| Aspect | Notification | Event |
|---|---|---|
| Persisted? | No | Yes |
| Sequence? | No | Yes |
| Replay? | No | Yes |
| Purpose | Signaling | State change |

Notifications are ephemeral signals — they trigger behavior but aren't part of the event stream.

RejectionNotification

```protobuf
// Notification payload for command rejection scenarios.
// Embedded in Notification.payload when a saga/PM command is rejected.
//
// Source info for compensation is in rejected_command.pages[].header.angzarr_deferred:
// - source.domain, source.root, source.edition → where to route rejection
// - source_seq → which event triggered the command
message RejectionNotification {
  CommandBook rejected_command = 1;  // The command that was rejected (full context)
  string rejection_reason = 2;       // Why: "insufficient_funds", "out_of_stock", etc.
  // Fields 3-6 removed: source info now in rejected_command.pages[].header.angzarr_deferred
}
```

Handling Rejections

The source aggregate registers rejection handlers using @rejected decorators. The framework dispatches based on the rejected command's domain and type — no if/else chains needed.

examples/python/player/agg/rejected.py

```python
def handle_join_rejected(
    notification: types.Notification,
    state: PlayerState,
) -> types.EventBook | None:
    """Handle JoinTable rejection by releasing reserved funds.

    Called when the JoinTable command (issued by saga-player-table after
    FundsReserved) is rejected by the Table aggregate.
    """
    from google.protobuf.any_pb2 import Any

    # Extract rejection details from the notification payload
    rejection = types.RejectionNotification()
    if notification.payload:
        notification.payload.Unpack(rejection)

    # Extract table_root from the rejected command
    table_root = b""
    if rejection.rejected_command and rejection.rejected_command.cover:
        if rejection.rejected_command.cover.root:
            table_root = rejection.rejected_command.cover.root.value

    # Release the funds that were reserved for this table
    table_key = table_root.hex()
    reserved_amount = state.table_reservations.get(table_key, 0)
    new_reserved = state.reserved_funds - reserved_amount
    new_available = state.bankroll - new_reserved

    event = player.FundsReleased(
        amount=poker_types.Currency(amount=reserved_amount, currency_code="CHIPS"),
        table_root=table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        released_at=now(),
    )

    # Pack the event
    event_any = Any()
    event_any.Pack(event, type_url_prefix="type.googleapis.com/")

    # Build the EventBook using the notification's cover for routing
    return types.EventBook(
        cover=notification.cover,
        pages=[types.EventPage(header=types.PageHeader(sequence=0), event=event_any)],
    )
```


RevocationResponse Flags

When an aggregate cannot handle a rejection, it returns flags:

| Flag | Effect |
|---|---|
| emit_system_revocation | Emit SagaCompensationFailed event to fallback domain |
| send_to_dead_letter_queue | Route to DLQ for manual review |
| escalate | Trigger webhook notification |
| abort | Stop saga chain, propagate error to caller |

Configuration

```yaml
saga_compensation:
  fallback_domain: "system"              # Domain for SagaCompensationFailed events
  fallback_emit_system_revocation: true  # Default when aggregate returns empty
  fallback_send_to_dlq: true
  fallback_escalate: false
  dead_letter_queue_url: "amqp://..."    # Optional DLQ URL
  escalation_webhook_url: "https://..."  # Optional webhook for alerts
```

MergeStrategy and Conflicts

The MergeStrategy enum controls how sequence conflicts are handled:

| Strategy | Proto Value | Behavior on Conflict |
|---|---|---|
| MERGE_COMMUTATIVE | 0 (default) | FAILED_PRECONDITION — retryable with fresh state |
| MERGE_STRICT | 1 | ABORTED — immediate rejection, no retry |
| MERGE_AGGREGATE_HANDLES | 2 | Bypass validation — aggregate decides |
| MERGE_MANUAL | 3 | ABORTED + route to DLQ |

When to Use Each

| Use Case | Strategy | Why |
|---|---|---|
| Financial operations (balance checks) | STRICT | Must see current state |
| Counters, metrics | COMMUTATIVE | Order doesn't matter |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Auditable operations | MANUAL | Human review required |

MERGE_MANUAL Flow

1. Command arrives with MERGE_MANUAL
2. Coordinator detects sequence mismatch
3. Command NOT executed
4. AngzarrDeadLetter created with SequenceMismatchDetails
5. Published to DLQ topic
6. Returns ABORTED status to caller
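From the caller's side, this means a MERGE_MANUAL failure is terminal. A minimal sketch, where the `Aborted` exception stands in for a gRPC ABORTED status:

```python
class Aborted(Exception):
    """Stand-in for a gRPC ABORTED status."""

def submit_manual(execute, command):
    """Submit a MERGE_MANUAL command; never retry on ABORTED."""
    try:
        return execute(command)
    except Aborted:
        # The coordinator already published an AngzarrDeadLetter (step 5),
        # so retrying would only duplicate DLQ entries. Hand off to a human.
        return None
```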

Escalation

For critical failures, angzarr can trigger alerts via webhooks:

```yaml
saga_compensation:
  escalation_webhook_url: "https://ops.example.com/alerts"
```

Webhook payload:

```json
{
  "saga_name": "saga-table-player",
  "triggering_aggregate": {
    "domain": "table",
    "root": "550e8400-e29b-41d4-a716-446655440000"
  },
  "triggering_sequence": 5,
  "rejection_reason": "insufficient_funds",
  "compensation_reason": "Aggregate returned empty response",
  "occurred_at": "2024-01-15T10:30:00Z"
}
```
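On the receiving side, a webhook handler might sanity-check the payload shape before paging anyone. The required-field set below simply mirrors the sample payload and is an assumption, not a documented schema:

```python
import json

# Assumed required fields, taken from the sample payload above
REQUIRED = {"saga_name", "triggering_aggregate", "rejection_reason", "occurred_at"}

def parse_escalation(body: bytes) -> dict:
    """Decode an escalation webhook payload and check required fields."""
    payload = json.loads(body)
    missing = REQUIRED - payload.keys()
    if missing:
        raise ValueError(f"escalation payload missing fields: {sorted(missing)}")
    return payload
```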

Best Practices

1. Design for Idempotency

Handlers may be retried. Ensure repeated execution produces the same result.

```python
# BAD: Not idempotent
def apply_funds_deposited(state, event):
    state.balance += event.amount  # Adds again on retry!

# GOOD: Idempotent (events have absolute values)
def apply_funds_deposited(state, event):
    state.balance = event.new_balance  # Same result on retry
```
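A quick property to assert in tests: delivering the same event twice must leave state unchanged. Illustrative stand-ins for the handlers above:

```python
from dataclasses import dataclass

@dataclass
class State:
    balance: int = 0

@dataclass
class FundsDeposited:
    amount: int
    new_balance: int

def apply_bad(state, event):
    state.balance += event.amount      # drifts on redelivery

def apply_good(state, event):
    state.balance = event.new_balance  # fixed point on redelivery

s_good, s_bad = State(), State()
event = FundsDeposited(amount=50, new_balance=50)
for _ in range(2):  # simulate a retried delivery
    apply_good(s_good, event)
    apply_bad(s_bad, event)
# s_good.balance == 50 (correct); s_bad.balance == 100 (double-applied)
```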

2. Use Appropriate MergeStrategy

```python
# Financial operations: STRICT
@merge_strategy(MergeStrategy.STRICT)
def handle_transfer(state, cmd):
    if cmd.amount > state.available:
        raise CommandRejectedError("insufficient_funds")

# Counters: COMMUTATIVE
@merge_strategy(MergeStrategy.COMMUTATIVE)
def handle_increment(state, cmd):
    return ValueIncremented(delta=cmd.delta)

# Audit-critical: MANUAL
@merge_strategy(MergeStrategy.MANUAL)
def handle_compliance_action(state, cmd):
    return ComplianceActionTaken(action=cmd.action)
```

3. Handle Notifications Explicitly

Don't ignore RejectionNotification — either compensate or escalate:

```python
def handle_rejection(state, rejection):
    if can_compensate(rejection):
        return build_compensation_event(rejection)
    else:
        return RevocationResponse(
            send_to_dead_letter_queue=True,
            escalate=True,
            reason="Cannot compensate automatically",
        )
```

4. Monitor DLQ Depth

Set up alerts for DLQ message accumulation:

```yaml
# Prometheus alert
- alert: DLQBacklog
  expr: dlq_messages_total > 100
  for: 5m
  labels:
    severity: warning
```

Next Steps