Error Recovery

Angzarr provides layered error recovery: automatic retry for transient failures, dead letter queues for persistent failures, and compensation flows for saga rejections.


Retry with Backoff

Transient failures (network issues, temporary unavailability) are automatically retried with exponential backoff and jitter.

Default Backoff Policies

| Context | Min Delay | Max Delay | Max Attempts | Jitter |
|---|---|---|---|---|
| Saga/PM commands | 10ms | 2s | 10 | Yes |
| gRPC connections | 100ms | 5s | 30 | Yes |
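
For intuition, here is a minimal sketch (illustrative, not the framework's implementation) of how such a policy could generate delays: each attempt doubles the base delay from the minimum, caps it at the maximum, and applies full jitter.

```python
import random

def backoff_delays(min_delay: float, max_delay: float, max_attempts: int):
    """Yield one jittered delay (in seconds) per attempt."""
    for attempt in range(max_attempts):
        base = min(max_delay, min_delay * (2 ** attempt))
        # Full jitter: a uniform draw in [0, base] de-correlates concurrent retries
        yield random.uniform(0.0, base)

# Saga/PM command policy from the table: 10ms minimum, 2s cap, 10 attempts
delays = list(backoff_delays(0.010, 2.0, 10))
```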

Retryable vs Non-Retryable Errors

| gRPC Code | Meaning | Retryable? | Action |
|---|---|---|---|
| FAILED_PRECONDITION | Sequence mismatch (stale client state) | Yes | Fetch fresh state, rebuild command |
| ABORTED | Sent to DLQ (MERGE_MANUAL) | No | Manual intervention required |
| INVALID_ARGUMENT | Bad command data | No | Fix command, resubmit |
| NOT_FOUND | Aggregate doesn't exist | No | Check aggregate ID |
| INTERNAL | Server error | No | Check server logs |
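
A client-side helper encoding this table might look like the following sketch (the gRPC status codes are standard; the helper itself is illustrative):

```python
import grpc

def is_retryable(code: grpc.StatusCode) -> bool:
    """Per the table above, only sequence mismatches are worth retrying:
    fresh state lets the client rebuild the command. Everything else
    needs a code fix, a data fix, or a human."""
    return code == grpc.StatusCode.FAILED_PRECONDITION
```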

Retry Flow

1. Command fails with FAILED_PRECONDITION
2. Error details contain current EventBook
3. Client extracts EventBook from error
4. Client rebuilds command with correct sequence
5. Client resubmits
6. Repeat until success or max attempts
```rust
// Extract fresh state from a sequence mismatch error
fn handle_sequence_error(status: &Status) -> Option<EventBook> {
    if status.code() == Code::FailedPrecondition {
        // EventBook is serialized in status details
        let details = status.details();
        if !details.is_empty() {
            return EventBook::decode(details).ok();
        }
    }
    None
}
```

```rust
// Retry loop with backoff
async fn execute_with_retry(
    mut command: CommandBook,
    client: &mut AggregateClient,
) -> Result<EventBook, Status> {
    let backoff = saga_backoff();
    let mut delays = backoff.build();

    loop {
        match client.execute(command.clone()).await {
            Ok(events) => return Ok(events),
            Err(status) if status.code() == Code::FailedPrecondition => {
                if let Some(delay) = delays.next() {
                    // Extract fresh state and rebuild the command
                    if let Some(current) = handle_sequence_error(&status) {
                        command.pages[0].sequence = current.next_sequence;
                    }
                    tokio::time::sleep(delay).await;
                } else {
                    return Err(status); // Max retries exceeded
                }
            }
            Err(status) => return Err(status), // Non-retryable
        }
    }
}
```

Dead Letter Queue (DLQ)

Messages that cannot be processed after retries are routed to a dead letter queue for manual intervention.

When Messages Go to DLQ

| Scenario | MergeStrategy | Result |
|---|---|---|
| Sequence mismatch | MERGE_MANUAL | Immediate DLQ |
| Event handler failed | Any | DLQ after retries |
| Payload retrieval failed | Any | DLQ |
| Max retries exceeded | COMMUTATIVE | DLQ |

Topic Naming

DLQ topics are per-domain for isolation:

```
angzarr.dlq.{domain}
```

Examples:

  • angzarr.dlq.player — Player domain failures
  • angzarr.dlq.hand — Hand domain failures
  • angzarr.dlq.table — Table domain failures

Dead Letter Structure

```protobuf
// Dead letter queue entry for failed messages requiring manual intervention.
// Per-domain topics: angzarr.dlq.{domain}
message AngzarrDeadLetter {
  Cover cover = 1;                     // Routing: domain, root, correlation_id
  oneof payload {
    CommandBook rejected_command = 2;  // Command that failed
    EventBook rejected_events = 9;     // Events that failed (saga/projector failures)
  }
  string rejection_reason = 3;         // Human-readable reason
  oneof rejection_details {
    SequenceMismatchDetails sequence_mismatch = 12;               // Sequence conflict details
    EventProcessingFailedDetails event_processing_failed = 13;    // Handler failure details
    PayloadRetrievalFailedDetails payload_retrieval_failed = 14;  // Payload store failure details
  }
  google.protobuf.Timestamp occurred_at = 7;
  map<string, string> metadata = 8;    // Additional context
  string source_component = 10;        // Which component sent to DLQ
  string source_component_type = 11;   // "aggregate" | "saga" | "projector" | "process_manager"
}
```

DLQ Backends

| Backend | Topic Format | Use Case |
|---|---|---|
| Channel | In-memory | Standalone mode, testing |
| AMQP | angzarr.dlq exchange | RabbitMQ production |
| Kafka | angzarr-dlq-{domain} | Kafka production |
| Pub/Sub | angzarr-dlq-{domain} | GCP production |
| SNS/SQS | angzarr-dlq-{domain} | AWS production |
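
Note the separator differs by backend: dots for AMQP routing keys, dashes elsewhere. A hypothetical helper that derives the topic name from the table above:

```python
def dlq_topic(backend: str, domain: str) -> str:
    """Illustrative only: derive the per-domain DLQ topic for a backend."""
    if backend == "amqp":
        return f"angzarr.dlq.{domain}"  # routing key on the angzarr.dlq exchange
    if backend in ("kafka", "pubsub", "sns_sqs"):
        return f"angzarr-dlq-{domain}"
    raise ValueError(f"backend {backend!r} has no DLQ topic")
```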

Configuration

```yaml
dlq:
  backend: amqp  # none | channel | amqp | kafka | pubsub | sns_sqs
  amqp_url: "amqp://localhost:5672"
  # OR
  kafka_brokers: "localhost:9092"
  # OR
  aws_region: "us-east-1"
  aws_endpoint_url: "http://localhost:4566"  # LocalStack
```

Processing DLQ Messages

Dead letters require manual review. Common actions:

  1. Fix and replay — Correct the issue, resubmit command
  2. Skip — Mark as handled without replay (data not critical)
  3. Compensate — Emit compensating events manually
  4. Escalate — Route to operations team
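
A triage tool can branch on the rejection_details oneof to suggest one of these actions. A sketch, assuming a generated angzarr_pb2 module (the module path and action names are illustrative, not part of the documented API):

```python
from angzarr_pb2 import AngzarrDeadLetter  # assumed generated module

def triage(raw: bytes) -> str:
    """Suggest a manual action for a dead letter based on its details."""
    dead_letter = AngzarrDeadLetter()
    dead_letter.ParseFromString(raw)

    kind = dead_letter.WhichOneof("rejection_details")
    if kind == "sequence_mismatch":
        return "fix-and-replay"  # rebuild the command against fresh state
    if kind == "payload_retrieval_failed":
        return "escalate"        # likely an infrastructure problem
    return "review"              # handler failures need a human decision
```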

Saga Compensation

When a saga-issued command is rejected, angzarr notifies the original aggregate so it can emit compensation events.

Compensation Flow

1. Saga emits command to target aggregate
2. Target aggregate rejects (business rule violation)
3. Framework builds RejectionNotification
4. Framework sends Notification to source aggregate
5. Source aggregate emits compensation events, or returns RevocationResponse flags

Notification vs Event

| Aspect | Notification | Event |
|---|---|---|
| Persisted? | No | Yes |
| Sequence? | No | Yes |
| Replay? | No | Yes |
| Purpose | Signaling | State change |

Notifications are ephemeral signals — they trigger behavior but aren't part of the event stream.

RejectionNotification

```protobuf
// Notification payload for command rejection scenarios.
// Embedded in Notification.payload when a saga/PM command is rejected.
message RejectionNotification {
  CommandBook rejected_command = 1;  // The command that was rejected (full context)
  string rejection_reason = 2;       // Why: "insufficient_funds", "out_of_stock", etc.
  string issuer_name = 3;            // Saga/PM name that issued the command
  string issuer_type = 4;            // "saga" | "process_manager"
  Cover source_aggregate = 5;        // Aggregate that originally triggered the flow
  uint32 source_event_sequence = 6;  // Event sequence that triggered the saga/PM
}
```

Handling Rejections

The source aggregate registers rejection handlers using @rejected decorators. The framework dispatches based on the rejected command's domain and type — no if/else chains needed.

examples/python/player/agg/handlers/player.py


```python
@rejected(domain="table", command="JoinTable")
def handle_join_rejected(
    self, notification: types.Notification
) -> player_proto.FundsReleased:
    """Release reserved funds when table join fails.

    Called when the JoinTable command (issued by saga-player-table after
    FundsReserved) is rejected by the Table aggregate.
    """
    ctx = CompensationContext.from_notification(notification)

    logger.warning(
        "Player compensation for JoinTable rejection: reason=%s",
        ctx.rejection_reason,
    )

    # Extract table_root from the rejected command
    table_root = b""
    if ctx.rejected_command and ctx.rejected_command.cover:
        table_root = ctx.rejected_command.cover.root.value

    # Release the funds that were reserved for this table
    reserved_amount = self.table_reservations.get(table_root.hex(), 0)
    new_reserved = self.reserved_funds - reserved_amount
    new_available = self.bankroll - new_reserved

    return player_proto.FundsReleased(
        amount=poker_types.Currency(amount=reserved_amount, currency_code="CHIPS"),
        table_root=table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        released_at=now(),
    )
```

RevocationResponse Flags

When an aggregate cannot handle a rejection, it returns flags:

| Flag | Effect |
|---|---|
| emit_system_revocation | Emit SagaCompensationFailed event to fallback domain |
| send_to_dead_letter_queue | Route to DLQ for manual review |
| escalate | Trigger webhook notification |
| abort | Stop saga chain, propagate error to caller |

Configuration

```yaml
saga_compensation:
  fallback_domain: "system"              # Domain for SagaCompensationFailed events
  fallback_emit_system_revocation: true  # Default when aggregate returns empty
  fallback_send_to_dlq: true
  fallback_escalate: false
  dead_letter_queue_url: "amqp://..."    # Optional DLQ URL
  escalation_webhook_url: "https://..."  # Optional webhook for alerts
```

MergeStrategy and Conflicts

The MergeStrategy enum controls how sequence conflicts are handled:

| Strategy | Proto Value | Behavior on Conflict |
|---|---|---|
| MERGE_COMMUTATIVE | 0 (default) | FAILED_PRECONDITION — retryable with fresh state |
| MERGE_STRICT | 1 | ABORTED — immediate rejection, no retry |
| MERGE_AGGREGATE_HANDLES | 2 | Bypass validation — aggregate decides |
| MERGE_MANUAL | 3 | ABORTED + route to DLQ |

When to Use Each

| Use Case | Strategy | Why |
|---|---|---|
| Financial operations (balance checks) | STRICT | Must see current state |
| Counters, metrics | COMMUTATIVE | Order doesn't matter |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Auditable operations | MANUAL | Human review required |

MERGE_MANUAL Flow

1. Command arrives with MERGE_MANUAL
2. Coordinator detects sequence mismatch
3. Command NOT executed
4. AngzarrDeadLetter created with SequenceMismatchDetails
5. Published to DLQ topic
6. Returns ABORTED status to caller

Escalation

For critical failures, angzarr can trigger alerts via webhooks:

```yaml
saga_compensation:
  escalation_webhook_url: "https://ops.example.com/alerts"
```

Webhook payload:

```json
{
  "saga_name": "saga-table-player",
  "triggering_aggregate": {
    "domain": "table",
    "root": "550e8400-e29b-41d4-a716-446655440000"
  },
  "triggering_sequence": 5,
  "rejection_reason": "insufficient_funds",
  "compensation_reason": "Aggregate returned empty response",
  "occurred_at": "2024-01-15T10:30:00Z"
}
```
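
On the receiving side, any HTTP endpoint that accepts this JSON will do. A minimal stdlib sketch, assuming the payload shape shown above:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

class AlertHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = self.rfile.read(int(self.headers["Content-Length"]))
        alert = json.loads(body)
        # Field names match the webhook payload shown above
        print(
            f"saga {alert['saga_name']} needs attention: "
            f"{alert['rejection_reason']} / {alert['compensation_reason']}"
        )
        self.send_response(204)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("", 8080), AlertHandler).serve_forever()
```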

Best Practices

1. Design for Idempotency

Handlers may be retried. Ensure repeated execution produces the same result.

```python
# BAD: Not idempotent
def apply_funds_deposited(state, event):
    state.balance += event.amount  # Adds again on retry!

# GOOD: Idempotent (events have absolute values)
def apply_funds_deposited(state, event):
    state.balance = event.new_balance  # Same result on retry
```

2. Use Appropriate MergeStrategy

```python
# Financial operations: STRICT
@merge_strategy(MergeStrategy.STRICT)
def handle_transfer(state, cmd):
    if cmd.amount > state.available:
        raise CommandRejectedError("insufficient_funds")

# Counters: COMMUTATIVE
@merge_strategy(MergeStrategy.COMMUTATIVE)
def handle_increment(state, cmd):
    return ValueIncremented(delta=cmd.delta)

# Audit-critical: MANUAL
@merge_strategy(MergeStrategy.MANUAL)
def handle_compliance_action(state, cmd):
    return ComplianceActionTaken(action=cmd.action)
```

3. Handle Notifications Explicitly

Don't ignore RejectionNotification — either compensate or escalate:

```python
def handle_rejection(state, rejection):
    if can_compensate(rejection):
        return build_compensation_event(rejection)
    else:
        return RevocationResponse(
            send_to_dead_letter_queue=True,
            escalate=True,
            reason="Cannot compensate automatically",
        )
```

4. Monitor DLQ Depth

Set up alerts for DLQ message accumulation:

```yaml
# Prometheus alert
- alert: DLQBacklog
  expr: dlq_messages_total > 100
  for: 5m
  labels:
    severity: warning
```

Next Steps