Error Recovery

Angzarr provides layered error recovery: automatic retry for transient failures, dead letter queues for persistent failures, and compensation flows for saga rejections.


flowchart TB
    Fail[Command fails]
    Trans{Transient?}
    Retry[Retry with backoff]
    Still{Still failing?}
    DLQ1[DLQ]
    Seq{Sequence conflict?}
    MS[Check MergeStrategy]
    Comm[COMMUTATIVE:<br/>Retry with fresh state]
    Strict[STRICT:<br/>Reject immediately]
    AggH[AGGREGATE_HANDLES:<br/>Let aggregate decide]
    Manual[MANUAL:<br/>Send to DLQ]
    Biz{Business rejection?}
    Comp[Compensation flow]

    Fail --> Trans
    Trans -->|yes| Retry --> Still
    Still -->|yes| DLQ1
    Trans -->|no| Seq
    Seq -->|yes| MS
    MS --> Comm
    MS --> Strict
    MS --> AggH
    MS --> Manual
    Seq -->|no| Biz
    Biz -->|yes| Comp

Transient failures (network issues, temporary unavailability) are automatically retried with exponential backoff and jitter.

| Context | Min Delay | Max Delay | Max Attempts | Jitter |
| --- | --- | --- | --- | --- |
| Saga/PM commands | 10 ms | 2 s | 10 | Yes |
| gRPC connections | 100 ms | 5 s | 30 | Yes |

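To make the retry schedule concrete, here is a minimal sketch of exponential backoff with jitter using the Saga/PM values from the table. The function name `backoff_delays` is hypothetical, and the "full jitter" variant (a uniform draw between zero and the current ceiling) is an assumption; the source does not say which jitter strategy angzarr uses.

```python
import random


def backoff_delays(min_delay, max_delay, max_attempts, rng=None):
    """Yield one sleep duration (in seconds) per retry attempt.

    The ceiling starts at min_delay, doubles after each attempt, and is
    capped at max_delay. Full jitter (an assumed strategy) then draws a
    uniform value between 0 and the current ceiling.
    """
    rng = rng or random.Random()
    ceiling = min_delay
    for _ in range(max_attempts):
        yield rng.uniform(0, ceiling)
        ceiling = min(ceiling * 2, max_delay)


# Saga/PM command settings from the table: 10 ms min, 2 s max, 10 attempts.
delays = list(backoff_delays(0.010, 2.0, 10, rng=random.Random(42)))
```

Jitter spreads retries from many clients over time, so they do not all hit a recovering service at the same instant.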

| gRPC Code | Meaning | Retryable? | Action |
| --- | --- | --- | --- |
| FAILED_PRECONDITION | Sequence mismatch (stale client state) | Yes | Fetch fresh state, rebuild command |
| ABORTED | Sent to DLQ (MERGE_MANUAL) | No | Manual intervention required |
| INVALID_ARGUMENT | Bad command data | No | Fix command, resubmit |
| NOT_FOUND | Aggregate doesn't exist | No | Check aggregate ID |
| INTERNAL | Server error | No | Check server logs |

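A client can encode the table above as a lookup so that error handling stays in one place. This is a sketch only: `RECOVERY_ACTIONS` and `recovery_action` are hypothetical names, and status codes are represented as plain strings to keep the example free of dependencies.

```python
# Status code names and outcomes copied from the table above.
RECOVERY_ACTIONS = {
    "FAILED_PRECONDITION": (True, "Fetch fresh state, rebuild command"),
    "ABORTED": (False, "Manual intervention required"),
    "INVALID_ARGUMENT": (False, "Fix command, resubmit"),
    "NOT_FOUND": (False, "Check aggregate ID"),
    "INTERNAL": (False, "Check server logs"),
}


def recovery_action(code_name):
    """Return (retryable, action) for a listed code, or None if unlisted.

    Returning None leaves the decision for codes outside the table to the
    caller instead of guessing whether they are retryable.
    """
    return RECOVERY_ACTIONS.get(code_name)
```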
1. Command fails with FAILED_PRECONDITION
2. Error details contain current EventBook
3. Client extracts EventBook from error
4. Client rebuilds state from fresh EventBook
5. Client re-evaluates business logic with fresh state
6. Client builds new command with correct sequence
7. Client resubmits
8. Repeat until success or max attempts

Rebuild, Don’t Just Re-sequence

Simply updating the sequence number is insufficient. The command’s payload may depend on state that changed. You must rebuild the command by re-running your business logic against the fresh state.
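A toy illustration of the difference, assuming a hypothetical "withdraw everything" intent. The types `AccountState` and `WithdrawCommand` are invented for this sketch and are not part of angzarr.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccountState:
    balance: int
    next_sequence: int


@dataclass(frozen=True)
class WithdrawCommand:
    amount: int
    sequence: int


def build_withdraw_all(state):
    """Intent: withdraw everything. The amount depends on the current state."""
    return WithdrawCommand(amount=state.balance, sequence=state.next_sequence)


stale = AccountState(balance=100, next_sequence=5)
fresh = AccountState(balance=60, next_sequence=6)  # another withdrawal landed first

original = build_withdraw_all(stale)

# Re-sequencing only: the sequence now matches, but the amount is still
# derived from the stale balance and would overdraw the account.
resequenced = WithdrawCommand(amount=original.amount, sequence=fresh.next_sequence)

# Rebuilding: the business logic runs again against the fresh state.
rebuilt = build_withdraw_all(fresh)
```

Here `resequenced` asks for 100 against a balance of 60, while `rebuilt` asks for 60.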

import asyncio
from typing import Callable

import grpc
from angzarr_client.proto import EventBook

# CommandIntent, AggregateClient, and CommandRejectedError are assumed to be
# provided by your application or client library.


def handle_sequence_error(error: grpc.RpcError) -> EventBook | None:
    """Extract fresh state from sequence mismatch error."""
    if error.code() == grpc.StatusCode.FAILED_PRECONDITION:
        details = error.trailing_metadata()
        for key, value in details:
            if key == "grpc-status-details-bin":
                return EventBook.FromString(value)
    return None


async def execute_with_retry(
    intent: CommandIntent,  # What the user wanted
    build_command: Callable,  # Function: (intent, state) -> CommandBook
    client: AggregateClient,
    max_attempts: int = 10,
) -> EventBook:
    """Execute command with retry, rebuilding from fresh state each attempt."""
    delay = 0.01
    current_state = None
    for attempt in range(max_attempts):
        # Fetch state on the first attempt, or when the error carried none;
        # otherwise reuse the state extracted from the previous error.
        if current_state is None:
            current_state = await client.get_state(intent.root_id)

        # Rebuild command from current state
        try:
            command = build_command(intent, current_state)
        except CommandRejectedError:
            raise  # Business logic rejected with fresh state

        try:
            return await client.execute(command)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.FAILED_PRECONDITION:
                # Extract fresh state for next attempt
                current_state = handle_sequence_error(e)
                await asyncio.sleep(delay)
                delay = min(delay * 2, 2.0)  # Exponential backoff, capped at 2s
            else:
                raise
    raise RuntimeError("Max retry attempts exceeded")

Messages that cannot be processed after retries are routed to a dead letter queue for manual intervention.

| Scenario | MergeStrategy | Result |
| --- | --- | --- |
| Sequence mismatch | MERGE_MANUAL | Immediate DLQ |
| Event handler failed | Any | DLQ after retries |
| Payload retrieval failed | Any | DLQ |
| Max retries exceeded | COMMUTATIVE | DLQ |

DLQ topics are per-domain for isolation:

angzarr.dlq.{domain}

Examples:

  • angzarr.dlq.player — Player domain failures
  • angzarr.dlq.hand — Hand domain failures
  • angzarr.dlq.table — Table domain failures

// Dead letter queue entry for failed messages requiring manual intervention.
// Per-domain topics: angzarr.dlq.{domain}
message AngzarrDeadLetter {
  Cover cover = 1;  // Routing: domain, root, correlation_id
  oneof payload {
    CommandBook rejected_command = 2;  // Command that failed
    EventBook rejected_events = 9;     // Events that failed (saga/projector failures)
  }
  string rejection_reason = 3;  // Human-readable reason
  oneof rejection_details {
    SequenceMismatchDetails sequence_mismatch = 12;                // Sequence conflict details
    EventProcessingFailedDetails event_processing_failed = 13;     // Handler failure details
    PayloadRetrievalFailedDetails payload_retrieval_failed = 14;   // Payload store failure details
  }
  google.protobuf.Timestamp occurred_at = 7;
  map<string, string> metadata = 8;    // Additional context
  string source_component = 10;        // Which component sent to DLQ
  string source_component_type = 11;   // "aggregate" | "saga" | "projector" | "process_manager"
}

| Backend | Topic Format | Use Case |
| --- | --- | --- |
| Channel | In-memory | Standalone mode, testing |
| AMQP | angzarr.dlq exchange | RabbitMQ production |
| Kafka | angzarr-dlq-{domain} | Kafka production |
| Pub/Sub | angzarr-dlq-{domain} | GCP production |
| SNS/SQS | angzarr-dlq-{domain} | AWS production |

dlq:
  backend: amqp  # none | channel | amqp | kafka | pubsub | sns_sqs
  amqp_url: "amqp://localhost:5672"
  # OR
  kafka_brokers: "localhost:9092"
  # OR
  aws_region: "us-east-1"
  aws_endpoint_url: "http://localhost:4566"  # LocalStack
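Tooling that inspects dead letters needs to know where to look for a given backend. The helper below is a hypothetical sketch that maps the `backend` values from the config to the destination names in the table. How a domain is routed inside the AMQP exchange is not stated in this document, so the sketch returns only the exchange name.

```python
def dlq_destination(backend, domain):
    """Return the DLQ destination name for a backend, or None if unnamed.

    Backend keys follow the `dlq.backend` config values; destination names
    follow the backend table. This helper is illustrative, not angzarr API.
    """
    if backend in ("kafka", "pubsub", "sns_sqs"):
        return f"angzarr-dlq-{domain}"
    if backend == "amqp":
        # Exchange name only; per-domain routing is not specified here.
        return "angzarr.dlq"
    if backend in ("channel", "none"):
        # In-memory channel or DLQ disabled: no named topic to inspect.
        return None
    raise ValueError(f"unknown DLQ backend: {backend!r}")
```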

Dead letters require manual review. Common actions:

  1. Fix and replay — Correct the issue, resubmit command
  2. Skip — Mark as handled without replay (data not critical)
  3. Compensate — Emit compensating events manually
  4. Escalate — Route to operations team

When a saga-issued command is rejected, angzarr notifies the original aggregate so it can emit compensation events.

1. Saga emits command to target aggregate
2. Target aggregate rejects (business rule violation)
3. Framework builds RejectionNotification
4. Framework sends Notification to source aggregate
5. Source aggregate emits compensation events
   OR
   Source aggregate returns RevocationResponse flags

| Aspect | Notification | Event |
| --- | --- | --- |
| Persisted? | No | Yes |
| Sequence? | No | Yes |
| Replay? | No | Yes |
| Purpose | Signaling | State change |

Notifications are ephemeral signals — they trigger behavior but aren’t part of the event stream.
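The distinction can be sketched with a toy in-memory aggregate. `ToyAggregate` and its methods are hypothetical names for illustration; the point is that only the events a handler returns are appended and sequenced, while the notification itself never enters the stream.

```python
from dataclasses import dataclass, field


@dataclass
class ToyAggregate:
    """Hypothetical in-memory aggregate: only events are stored and sequenced."""

    stream: list = field(default_factory=list)

    def append_event(self, event):
        sequence = len(self.stream)
        self.stream.append((sequence, event))
        return sequence

    def deliver_notification(self, notification, handler):
        """Run the handler; persist the events it returns, not the notification."""
        for event in handler(notification):
            self.append_event(event)


player = ToyAggregate()
player.append_event("FundsReserved")
player.deliver_notification(
    {"rejection_reason": "insufficient_funds"},
    lambda notification: ["FundsReleased"],
)
```

Replaying `player.stream` yields the two events in order and no trace of the notification.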

// Notification payload for command rejection scenarios.
// Embedded in Notification.payload when a saga/PM command is rejected.
//
// Source info for compensation is in rejected_command.pages[].header.angzarr_deferred:
// - source.domain, source.root, source.edition → where to route rejection
// - source_seq → which event triggered the command
message RejectionNotification {
  CommandBook rejected_command = 1;  // The command that was rejected (full context)
  string rejection_reason = 2;       // Why: "insufficient_funds", "out_of_stock", etc.
  // Fields 3-6 removed: source info now in rejected_command.pages[].header.angzarr_deferred
}

The source aggregate registers rejection handlers using @rejected decorators. The framework dispatches based on the rejected command’s domain and type — no if/else chains needed.

examples/python/player/agg/rejected.py

def handle_table_join_rejected(
    notification: types.Notification,
    state: PlayerState,
) -> player.FundsReleased | None:
    """Handle JoinTable rejection by releasing reserved funds.

    Returns the FundsReleased event directly (packed into an EventBook by the
    router) or ``None`` if no reservation exists for the rejected table.
    """
    rejection = types.RejectionNotification()
    if notification.HasField("payload"):
        notification.payload.Unpack(rejection)

    table_root = b""
    if rejection.HasField("rejected_command"):
        rc = rejection.rejected_command
        if rc.HasField("cover") and rc.cover.HasField("root"):
            table_root = rc.cover.root.value

    table_key = table_root.hex()
    reserved_amount = state.table_reservations.get(table_key, 0)
    if reserved_amount == 0:
        return None

    new_reserved = state.reserved_funds - reserved_amount
    new_available = state.bankroll - new_reserved
    return player.FundsReleased(
        amount=poker_types.Currency(amount=reserved_amount, currency_code="CHIPS"),
        table_root=table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        released_at=now(),
    )

When an aggregate cannot handle a rejection, it returns flags:

| Flag | Effect |
| --- | --- |
| emit_system_revocation | Emit SagaCompensationFailed event to fallback domain |
| send_to_dead_letter_queue | Route to DLQ for manual review |
| escalate | Trigger webhook notification |
| abort | Stop saga chain, propagate error to caller |

saga_compensation:
  fallback_domain: "system"              # Domain for SagaCompensationFailed events
  fallback_emit_system_revocation: true  # Default when aggregate returns empty
  fallback_send_to_dlq: true
  fallback_escalate: false
  dead_letter_queue_url: "amqp://..."    # Optional DLQ URL
  escalation_webhook_url: "https://..."  # Optional webhook for alerts
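One way to read the `fallback_*` settings is as defaults that apply only when the aggregate sets no flags. The sketch below models that reading. `RevocationFlags` and `effective_flags` are hypothetical names, and treating "empty" as "no flag set" is an assumption based on the config comment.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RevocationFlags:
    emit_system_revocation: bool = False
    send_to_dead_letter_queue: bool = False
    escalate: bool = False
    abort: bool = False


# Defaults mirroring the fallback_* values in the saga_compensation config.
FALLBACK = RevocationFlags(
    emit_system_revocation=True,
    send_to_dead_letter_queue=True,
    escalate=False,
)


def effective_flags(response, fallback=FALLBACK):
    """Use the aggregate's flags if it set any, otherwise the configured fallback.

    Assumption: an "empty" response is either missing or has every flag unset.
    """
    if response is None or response == RevocationFlags():
        return fallback
    return response
```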

The MergeStrategy enum controls how sequence conflicts are handled:

| Strategy | Proto Value | Behavior on Conflict |
| --- | --- | --- |
| MERGE_COMMUTATIVE | 0 (default) | FAILED_PRECONDITION, retryable with fresh state |
| MERGE_STRICT | 1 | ABORTED, immediate rejection, no retry |
| MERGE_AGGREGATE_HANDLES | 2 | Bypass validation, aggregate decides |
| MERGE_MANUAL | 3 | ABORTED + route to DLQ |

| Use Case | Strategy | Why |
| --- | --- | --- |
| Financial operations (balance checks) | STRICT | Must see current state |
| Counters, metrics | COMMUTATIVE | Order doesn't matter |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Auditable operations | MANUAL | Human review required |

1. Command arrives with MERGE_MANUAL
2. Coordinator detects sequence mismatch
3. Command NOT executed
4. AngzarrDeadLetter created with SequenceMismatchDetails
5. Published to DLQ topic
6. Returns ABORTED status to caller
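The per-strategy behavior on a sequence conflict can be summarized as a small decision table. This sketch mirrors the strategy table; the enum values match the proto values listed there, while `ConflictOutcome` and `on_sequence_conflict` are hypothetical names rather than coordinator internals.

```python
from enum import IntEnum
from typing import NamedTuple, Optional


class MergeStrategy(IntEnum):
    MERGE_COMMUTATIVE = 0
    MERGE_STRICT = 1
    MERGE_AGGREGATE_HANDLES = 2
    MERGE_MANUAL = 3


class ConflictOutcome(NamedTuple):
    status: Optional[str]  # gRPC status name returned to the caller, if any
    send_to_dlq: bool      # whether an AngzarrDeadLetter is published
    run_aggregate: bool    # whether the command still reaches the aggregate


_OUTCOMES = {
    MergeStrategy.MERGE_COMMUTATIVE: ConflictOutcome("FAILED_PRECONDITION", False, False),
    MergeStrategy.MERGE_STRICT: ConflictOutcome("ABORTED", False, False),
    MergeStrategy.MERGE_AGGREGATE_HANDLES: ConflictOutcome(None, False, True),
    MergeStrategy.MERGE_MANUAL: ConflictOutcome("ABORTED", True, False),
}


def on_sequence_conflict(strategy):
    """Look up what happens when a command hits a sequence mismatch."""
    return _OUTCOMES[MergeStrategy(strategy)]
```

Note that MERGE_STRICT and MERGE_MANUAL both return ABORTED to the caller; only MERGE_MANUAL also publishes to the DLQ.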

For critical failures, angzarr can trigger alerts via webhooks:

saga_compensation:
  escalation_webhook_url: "https://ops.example.com/alerts"

Webhook payload:

{
  "saga_name": "saga-table-player",
  "triggering_aggregate": {
    "domain": "table",
    "root": "550e8400-e29b-41d4-a716-446655440000"
  },
  "triggering_sequence": 5,
  "rejection_reason": "insufficient_funds",
  "compensation_reason": "Aggregate returned empty response",
  "occurred_at": "2024-01-15T10:30:00Z"
}
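On the receiving side, a webhook consumer might parse this payload into a typed record before alerting. The sketch below assumes the payload has exactly the fields shown in the example. `EscalationAlert` and `parse_escalation` are hypothetical names, not part of angzarr.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class EscalationAlert:
    saga_name: str
    domain: str
    root: str
    triggering_sequence: int
    rejection_reason: str
    compensation_reason: str
    occurred_at: datetime


def parse_escalation(body: str) -> EscalationAlert:
    """Parse a webhook body shaped like the example payload."""
    data = json.loads(body)
    source = data["triggering_aggregate"]
    return EscalationAlert(
        saga_name=data["saga_name"],
        domain=source["domain"],
        root=source["root"],
        triggering_sequence=data["triggering_sequence"],
        rejection_reason=data["rejection_reason"],
        compensation_reason=data["compensation_reason"],
        # Replace the trailing "Z" so fromisoformat also works before Python 3.11.
        occurred_at=datetime.fromisoformat(data["occurred_at"].replace("Z", "+00:00")),
    )


sample = """{
  "saga_name": "saga-table-player",
  "triggering_aggregate": {"domain": "table", "root": "550e8400-e29b-41d4-a716-446655440000"},
  "triggering_sequence": 5,
  "rejection_reason": "insufficient_funds",
  "compensation_reason": "Aggregate returned empty response",
  "occurred_at": "2024-01-15T10:30:00Z"
}"""
alert = parse_escalation(sample)
```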

Handlers may be retried. Ensure repeated execution produces the same result.

# BAD: Not idempotent
def apply_funds_deposited(state, event):
    state.balance += event.amount  # Adds again on retry!


# GOOD: Idempotent (events have absolute values)
def apply_funds_deposited(state, event):
    state.balance = event.new_balance  # Same result on retry
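To see the difference, the sketch below applies the same event twice with each style of handler. `State` and `FundsDeposited` are simplified stand-ins invented for this example, with field names taken from the handlers above.

```python
from dataclasses import dataclass


@dataclass
class State:
    balance: int = 0


@dataclass(frozen=True)
class FundsDeposited:
    amount: int
    new_balance: int


def apply_incremental(state, event):
    state.balance += event.amount  # depends on how many times it runs


def apply_absolute(state, event):
    state.balance = event.new_balance  # same result however often it runs


event = FundsDeposited(amount=50, new_balance=150)

retried_incremental = State(balance=100)
apply_incremental(retried_incremental, event)
apply_incremental(retried_incremental, event)  # simulated retry

retried_absolute = State(balance=100)
apply_absolute(retried_absolute, event)
apply_absolute(retried_absolute, event)  # simulated retry
```

After the simulated retry, the incremental handler has counted the deposit twice, while the absolute handler still holds the balance recorded in the event.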

# Financial operations: STRICT
@merge_strategy(MergeStrategy.STRICT)
def handle_transfer(state, cmd):
    if cmd.amount > state.available:
        raise CommandRejectedError("insufficient_funds")


# Counters: COMMUTATIVE
@merge_strategy(MergeStrategy.COMMUTATIVE)
def handle_increment(state, cmd):
    return ValueIncremented(delta=cmd.delta)


# Audit-critical: MANUAL
@merge_strategy(MergeStrategy.MANUAL)
def handle_compliance_action(state, cmd):
    return ComplianceActionTaken(action=cmd.action)

Don’t ignore RejectionNotification — either compensate or escalate:

def handle_rejection(state, rejection):
    if can_compensate(rejection):
        return build_compensation_event(rejection)
    else:
        return RevocationResponse(
            send_to_dead_letter_queue=True,
            escalate=True,
            reason="Cannot compensate automatically",
        )

Set up alerts for DLQ message accumulation:

# Prometheus alert
- alert: DLQBacklog
  expr: dlq_messages_total > 100
  for: 5m
  labels:
    severity: warning