Error Recovery
Angzarr provides layered error recovery: automatic retry for transient failures, dead letter queues for persistent failures, and compensation flows for saga rejections.
Overview
Retry with Backoff
Transient failures (network issues, temporary unavailability) are automatically retried with exponential backoff and jitter.
Default Backoff Policies
| Context | Min Delay | Max Delay | Max Attempts | Jitter |
|---|---|---|---|---|
| Saga/PM commands | 10ms | 2s | 10 | Yes |
| gRPC connections | 100ms | 5s | 30 | Yes |
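To make the table concrete, the following is a minimal sketch of a capped exponential schedule with jitter. The helper name `backoff_delays`, the doubling factor, and the choice to draw each delay uniformly between the minimum delay and the current ceiling are assumptions for illustration; the table only specifies the minimum delay, maximum delay, attempt count, and that jitter is applied.

```python
import random
from typing import Iterator, Optional


def backoff_delays(
    min_delay: float,
    max_delay: float,
    max_attempts: int,
    rng: Optional[random.Random] = None,
) -> Iterator[float]:
    """Yield one sleep duration (seconds) per retry attempt.

    Hypothetical helper: the ceiling starts at min_delay, doubles after
    each attempt, and is capped at max_delay. Each delay is drawn
    uniformly from [min_delay, ceiling] so retries do not synchronize.
    """
    rng = rng or random.Random()
    ceiling = min_delay
    for _ in range(max_attempts):
        yield rng.uniform(min_delay, ceiling)
        ceiling = min(ceiling * 2, max_delay)


# Saga/PM defaults from the table: 10ms min, 2s max, 10 attempts.
saga_delays = list(backoff_delays(0.010, 2.0, 10, rng=random.Random(42)))
```

Passing a seeded `random.Random` keeps the schedule reproducible in tests; production code would normally leave `rng` unset.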
Retryable vs Non-Retryable Errors
| gRPC Code | Meaning | Retryable? | Action |
|---|---|---|---|
| FAILED_PRECONDITION | Sequence mismatch (stale client state) | Yes | Fetch fresh state, rebuild command |
| ABORTED | Sent to DLQ (MERGE_MANUAL) | No | Manual intervention required |
| INVALID_ARGUMENT | Bad command data | No | Fix command, resubmit |
| NOT_FOUND | Aggregate doesn't exist | No | Check aggregate ID |
| INTERNAL | Server error | No | Check server logs |
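A client can encode this table as a small lookup so that retry decisions live in one place. The sketch below is illustrative only: `RECOVERY_ACTIONS` and `is_retryable` are hypothetical names, not part of the angzarr client, and treating unlisted codes as non-retryable is an assumption.

```python
from typing import Dict, Tuple

# Hypothetical lookup mirroring the table: code name -> (retryable, action).
RECOVERY_ACTIONS: Dict[str, Tuple[bool, str]] = {
    "FAILED_PRECONDITION": (True, "Fetch fresh state, rebuild command"),
    "ABORTED": (False, "Manual intervention required"),
    "INVALID_ARGUMENT": (False, "Fix command, resubmit"),
    "NOT_FOUND": (False, "Check aggregate ID"),
    "INTERNAL": (False, "Check server logs"),
}


def is_retryable(code_name: str) -> bool:
    """Return True only for codes the table marks as retryable.

    Assumption: codes missing from the table are not retried.
    """
    retryable, _action = RECOVERY_ACTIONS.get(code_name, (False, ""))
    return retryable


def recommended_action(code_name: str) -> str:
    """Return the table's suggested action, or an empty string if unlisted."""
    _retryable, action = RECOVERY_ACTIONS.get(code_name, (False, ""))
    return action
```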
Retry Flow
1. Command fails with FAILED_PRECONDITION
2. Error details contain current EventBook
3. Client extracts EventBook from error
4. Client rebuilds command with correct sequence
5. Client resubmits
6. Repeat until success or max attempts
Examples in Rust, Python, and Go:
// Extract fresh state from sequence mismatch error
fn handle_sequence_error(status: &Status) -> Option<EventBook> {
    if status.code() == Code::FailedPrecondition {
        // EventBook is serialized in status details
        let details = status.details();
        if !details.is_empty() {
            return EventBook::decode(details).ok();
        }
    }
    None
}

// Retry loop with backoff
async fn execute_with_retry(
    mut command: CommandBook,
    client: &mut AggregateClient,
) -> Result<EventBook, Status> {
    let backoff = saga_backoff();
    let mut delays = backoff.build();
    loop {
        match client.execute(command.clone()).await {
            Ok(events) => return Ok(events),
            Err(status) if status.code() == Code::FailedPrecondition => {
                if let Some(delay) = delays.next() {
                    // Extract fresh state and rebuild command
                    if let Some(current) = handle_sequence_error(&status) {
                        command.pages[0].sequence = current.next_sequence;
                    }
                    tokio::time::sleep(delay).await;
                } else {
                    return Err(status); // Max retries exceeded
                }
            }
            Err(status) => return Err(status), // Non-retryable
        }
    }
}
import asyncio

import grpc
from angzarr_client.proto import EventBook

# CommandBook and AggregateClient are assumed to be imported from the
# angzarr client package as well.


def handle_sequence_error(error: grpc.RpcError) -> EventBook | None:
    """Extract fresh state from sequence mismatch error."""
    if error.code() == grpc.StatusCode.FAILED_PRECONDITION:
        details = error.trailing_metadata()
        for key, value in details:
            if key == "grpc-status-details-bin":
                return EventBook.FromString(value)
    return None


async def execute_with_retry(
    command: CommandBook,
    client: AggregateClient,
    max_attempts: int = 10,
) -> EventBook:
    """Execute command with exponential backoff retry."""
    delay = 0.01  # Start at 10ms
    for attempt in range(max_attempts):
        try:
            return await client.execute(command)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.FAILED_PRECONDITION:
                # Extract fresh state and rebuild
                if current := handle_sequence_error(e):
                    command.pages[0].sequence = current.next_sequence
                await asyncio.sleep(delay)
                delay = min(delay * 2, 2.0)  # Cap at 2s
            else:
                raise  # Non-retryable
    raise RuntimeError("Max retry attempts exceeded")
import (
    "context"
    "fmt"
    "time"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"

    pb "github.com/angzarr/angzarr/proto/angzarr"
)

// handleSequenceError extracts fresh state from error details
func handleSequenceError(err error) *pb.EventBook {
    st, ok := status.FromError(err)
    if !ok || st.Code() != codes.FailedPrecondition {
        return nil
    }
    details := st.Details()
    for _, detail := range details {
        if eb, ok := detail.(*pb.EventBook); ok {
            return eb
        }
    }
    return nil
}

// executeWithRetry retries with exponential backoff
func executeWithRetry(
    ctx context.Context,
    command *pb.CommandBook,
    client pb.AggregateClient,
) (*pb.EventBook, error) {
    delay := 10 * time.Millisecond
    maxDelay := 2 * time.Second
    maxAttempts := 10
    for attempt := 0; attempt < maxAttempts; attempt++ {
        events, err := client.Execute(ctx, command)
        if err == nil {
            return events, nil
        }
        st, _ := status.FromError(err)
        if st.Code() == codes.FailedPrecondition {
            // Extract fresh state and rebuild
            if current := handleSequenceError(err); current != nil {
                command.Pages[0].Sequence = current.NextSequence
            }
            time.Sleep(delay)
            delay = min(delay*2, maxDelay) // built-in min requires Go 1.21+
        } else {
            return nil, err // Non-retryable
        }
    }
    return nil, fmt.Errorf("max retry attempts exceeded")
}
Dead Letter Queue (DLQ)
Messages that cannot be processed after retries are routed to a dead letter queue for manual intervention.
When Messages Go to DLQ
| Scenario | MergeStrategy | Result |
|---|---|---|
| Sequence mismatch | MERGE_MANUAL | Immediate DLQ |
| Event handler failed | Any | DLQ after retries |
| Payload retrieval failed | Any | DLQ |
| Max retries exceeded | COMMUTATIVE | DLQ |
Topic Naming
DLQ topics are per-domain for isolation:
angzarr.dlq.{domain}
Examples:
- angzarr.dlq.player: Player domain failures
- angzarr.dlq.hand: Hand domain failures
- angzarr.dlq.table: Table domain failures
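The naming rule is simple enough to capture in a pair of helpers. This is a sketch with hypothetical function names (`dlq_topic`, `domain_from_topic`); it covers only the dotted `angzarr.dlq.{domain}` form shown here, not any backend-specific variants.

```python
DLQ_TOPIC_PREFIX = "angzarr.dlq."


def dlq_topic(domain: str) -> str:
    """Return the per-domain DLQ topic name, e.g. angzarr.dlq.player."""
    if not domain:
        raise ValueError("domain must be non-empty")
    return DLQ_TOPIC_PREFIX + domain


def domain_from_topic(topic: str) -> str:
    """Recover the domain from a DLQ topic name."""
    if not topic.startswith(DLQ_TOPIC_PREFIX):
        raise ValueError(f"not a DLQ topic: {topic!r}")
    return topic[len(DLQ_TOPIC_PREFIX):]
```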
Dead Letter Structure
// Dead letter queue entry for failed messages requiring manual intervention.
// Per-domain topics: angzarr.dlq.{domain}
message AngzarrDeadLetter {
  Cover cover = 1;                      // Routing: domain, root, correlation_id
  oneof payload {
    CommandBook rejected_command = 2;   // Command that failed
    EventBook rejected_events = 9;      // Events that failed (saga/projector failures)
  }
  string rejection_reason = 3;          // Human-readable reason
  oneof rejection_details {
    SequenceMismatchDetails sequence_mismatch = 12;                // Sequence conflict details
    EventProcessingFailedDetails event_processing_failed = 13;     // Handler failure details
    PayloadRetrievalFailedDetails payload_retrieval_failed = 14;   // Payload store failure details
  }
  google.protobuf.Timestamp occurred_at = 7;
  map<string, string> metadata = 8;     // Additional context
  string source_component = 10;         // Which component sent to DLQ
  string source_component_type = 11;    // "aggregate" | "saga" | "projector" | "process_manager"
}
DLQ Backends
| Backend | Topic Format | Use Case |
|---|---|---|
| Channel | In-memory | Standalone mode, testing |
| AMQP | angzarr.dlq exchange | RabbitMQ production |
| Kafka | angzarr-dlq-{domain} | Kafka production |
| Pub/Sub | angzarr-dlq-{domain} | GCP production |
| SNS/SQS | angzarr-dlq-{domain} | AWS production |
Configuration
dlq:
  backend: amqp  # none | channel | amqp | kafka | pubsub | sns_sqs
  amqp_url: "amqp://localhost:5672"
  # OR
  kafka_brokers: "localhost:9092"
  # OR
  aws_region: "us-east-1"
  aws_endpoint_url: "http://localhost:4566"  # LocalStack
Processing DLQ Messages
Dead letters require manual review. Common actions:
- Fix and replay — Correct the issue, resubmit command
- Skip — Mark as handled without replay (data not critical)
- Compensate — Emit compensating events manually
- Escalate — Route to operations team
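Before choosing one of these actions, it often helps to see where the backlog is concentrated. The sketch below groups dead letters by domain and by which rejection detail is set. `DeadLetterInfo` is a hypothetical flattened view of AngzarrDeadLetter, and the sample entries are invented for illustration; a real script would populate it from the decoded protobuf messages.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class DeadLetterInfo:
    """Hypothetical flattened view of an AngzarrDeadLetter."""
    domain: str                 # taken from cover
    details_kind: str           # which rejection_details field is set
    source_component_type: str  # "aggregate" | "saga" | "projector" | "process_manager"
    rejection_reason: str


def summarize(letters: Iterable[DeadLetterInfo]) -> Counter:
    """Count dead letters per (domain, details_kind) pair."""
    return Counter((letter.domain, letter.details_kind) for letter in letters)


# Invented sample backlog, for illustration only.
backlog = [
    DeadLetterInfo("player", "sequence_mismatch", "aggregate", "stale sequence"),
    DeadLetterInfo("player", "sequence_mismatch", "aggregate", "stale sequence"),
    DeadLetterInfo("table", "event_processing_failed", "saga", "handler error"),
]
summary = summarize(backlog)
worst_bucket, worst_count = summary.most_common(1)[0]
```

Here `worst_bucket` identifies the (domain, detail kind) pair with the most entries, which is a reasonable place to start manual review.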
Saga Compensation
When a saga-issued command is rejected, angzarr notifies the original aggregate so it can emit compensation events.
Compensation Flow
1. Saga emits command to target aggregate
2. Target aggregate rejects (business rule violation)
3. Framework builds RejectionNotification
4. Framework sends Notification to source aggregate
5. Source aggregate emits compensation events
OR
Source aggregate returns RevocationResponse flags
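The last two steps of this flow can be sketched as a dispatch on the rejected command's domain and type. Everything here is a simplified stand-in: `Rejection` and `Flags` model only a few fields of RejectionNotification and the RevocationResponse flags, events are represented by their names, the `DealCards` command is invented, and routing unhandled rejections to the DLQ is an assumption rather than documented framework behavior.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple, Union


@dataclass(frozen=True)
class Rejection:
    """Stand-in for RejectionNotification (only the fields used here)."""
    command_domain: str
    command_type: str
    rejection_reason: str


@dataclass(frozen=True)
class Flags:
    """Stand-in for RevocationResponse flags."""
    send_to_dead_letter_queue: bool = False
    escalate: bool = False


Outcome = Union[List[str], Flags]
Handler = Callable[[Rejection], Outcome]


def notify_source_aggregate(
    rejection: Rejection,
    handlers: Dict[Tuple[str, str], Handler],
) -> Outcome:
    """Steps 4-5: deliver the rejection and collect events or flags."""
    handler = handlers.get((rejection.command_domain, rejection.command_type))
    if handler is None:
        # Assumption: with no handler registered, defer to manual review.
        return Flags(send_to_dead_letter_queue=True)
    return handler(rejection)


def release_funds(rejection: Rejection) -> List[str]:
    # A real handler would build a FundsReleased event; its name suffices here.
    return ["FundsReleased"]


handlers = {("table", "JoinTable"): release_funds}
compensated = notify_source_aggregate(
    Rejection("table", "JoinTable", "table_full"), handlers
)
unhandled = notify_source_aggregate(
    Rejection("hand", "DealCards", "deck_empty"), handlers
)
```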
Notification vs Event
| Aspect | Notification | Event |
|---|---|---|
| Persisted? | No | Yes |
| Sequence? | No | Yes |
| Replay? | No | Yes |
| Purpose | Signaling | State change |
Notifications are ephemeral signals — they trigger behavior but aren't part of the event stream.
RejectionNotification
// Notification payload for command rejection scenarios.
// Embedded in Notification.payload when a saga/PM command is rejected.
message RejectionNotification {
  CommandBook rejected_command = 1;   // The command that was rejected (full context)
  string rejection_reason = 2;        // Why: "insufficient_funds", "out_of_stock", etc.
  string issuer_name = 3;             // Saga/PM name that issued the command
  string issuer_type = 4;             // "saga" | "process_manager"
  Cover source_aggregate = 5;         // Aggregate that originally triggered the flow
  uint32 source_event_sequence = 6;   // Event sequence that triggered the saga/PM
}
Handling Rejections
The source aggregate registers rejection handlers using @rejected decorators. The framework dispatches based on the rejected command's domain and type — no if/else chains needed.
Examples in Python, Rust, Go, Java, and C#:
examples/python/player/agg/handlers/player.py
@rejected(domain="table", command="JoinTable")
def handle_join_rejected(
    self, notification: types.Notification
) -> player_proto.FundsReleased:
    """Release reserved funds when table join fails.

    Called when the JoinTable command (issued by saga-player-table after
    FundsReserved) is rejected by the Table aggregate.
    """
    ctx = CompensationContext.from_notification(notification)
    logger.warning(
        "Player compensation for JoinTable rejection: reason=%s",
        ctx.rejection_reason,
    )

    # Extract table_root from the rejected command
    table_root = b""
    if ctx.rejected_command and ctx.rejected_command.cover:
        table_root = ctx.rejected_command.cover.root.value

    # Release the funds that were reserved for this table
    reserved_amount = self.table_reservations.get(table_root.hex(), 0)
    new_reserved = self.reserved_funds - reserved_amount
    new_available = self.bankroll - new_reserved

    return player_proto.FundsReleased(
        amount=poker_types.Currency(amount=reserved_amount, currency_code="CHIPS"),
        table_root=table_root,
        new_available_balance=poker_types.Currency(
            amount=new_available, currency_code="CHIPS"
        ),
        new_reserved_balance=poker_types.Currency(
            amount=new_reserved, currency_code="CHIPS"
        ),
        released_at=now(),
    )
examples/rust/player/agg/src/handlers/rejected.rs
/// Handle JoinTable rejection by releasing reserved funds.
///
/// Called when the JoinTable command (issued by saga-player-table after
/// FundsReserved) is rejected by the Table aggregate.
pub fn handle_join_rejected(
    notification: &Notification,
    state: &PlayerState,
) -> CommandResult<RejectionHandlerResponse> {
    // Extract rejection details from the notification payload
    let rejection = notification
        .payload
        .as_ref()
        .and_then(|any| any.unpack::<RejectionNotification>().ok())
        .unwrap_or_default();

    warn!(
        rejection_reason = %rejection.rejection_reason,
        "Player compensation for JoinTable rejection"
    );

    // Extract table_root from the rejected command
    let table_root = rejection
        .rejected_command
        .as_ref()
        .and_then(|cmd| cmd.cover.as_ref())
        .map(|cover| cover.root.as_ref().map(|r| r.value.clone()).unwrap_or_default())
        .unwrap_or_default();

    // Release the funds that were reserved for this table
    let table_key = hex::encode(&table_root);
    let reserved_amount = state.table_reservations.get(&table_key).copied().unwrap_or(0);
    let new_reserved = state.reserved_funds - reserved_amount;
    let new_available = state.bankroll - new_reserved;

    let event = FundsReleased {
        amount: Some(Currency {
            amount: reserved_amount,
            currency_code: "CHIPS".to_string(),
        }),
        table_root,
        new_available_balance: Some(Currency {
            amount: new_available,
            currency_code: "CHIPS".to_string(),
        }),
        new_reserved_balance: Some(Currency {
            amount: new_reserved,
            currency_code: "CHIPS".to_string(),
        }),
        released_at: Some(now()),
    };
    let event_any = pack_event(&event, "examples.FundsReleased");

    // Build the EventBook using the notification's cover for routing.
    // Sequence 0 is a placeholder - framework assigns actual sequence during persist.
    let event_book = EventBook {
        cover: notification.cover.clone(),
        pages: vec![event_page(0, event_any)],
        snapshot: None,
        next_sequence: 0,
    };

    Ok(RejectionHandlerResponse {
        events: Some(event_book),
        notification: None,
    })
}
examples/go/player/agg/handlers/revocation.go
// HandleTableJoinRejected handles compensation when a table join fails.
//
// Called when a saga/PM command targeting the table aggregate's JoinTable
// command is rejected. This releases the funds that were reserved for the
// failed table join.
func HandleTableJoinRejected(notification *pb.Notification, state PlayerState) *pb.BusinessResponse {
    ctx := angzarr.NewCompensationContext(notification)
    log.Printf("Player compensation for JoinTable rejection: reason=%s",
        ctx.RejectionReason)

    // Extract table_root from the rejected command
    var tableRoot []byte
    if ctx.RejectedCommand != nil && ctx.RejectedCommand.Cover != nil && ctx.RejectedCommand.Cover.Root != nil {
        tableRoot = ctx.RejectedCommand.Cover.Root.Value
    }

    // Release the funds that were reserved for this table
    tableKey := hex.EncodeToString(tableRoot)
    reservedAmount := state.TableReservations[tableKey]
    newReserved := state.ReservedFunds - reservedAmount
    newAvailable := state.Bankroll - newReserved

    event := &examples.FundsReleased{
        Amount:              &examples.Currency{Amount: reservedAmount, CurrencyCode: "CHIPS"},
        TableRoot:           tableRoot,
        NewAvailableBalance: &examples.Currency{Amount: newAvailable, CurrencyCode: "CHIPS"},
        NewReservedBalance:  &examples.Currency{Amount: newReserved, CurrencyCode: "CHIPS"},
        ReleasedAt:          timestamppb.Now(),
    }
    eventAny, _ := anypb.New(event)
    return angzarr.EmitCompensationEvents(angzarr.NewEventBookFromNotification(notification, eventAny))
}
examples/java/player/agg/src/main/java/dev/angzarr/examples/player/Player.java
@Rejected(domain = "table", command = "JoinTable")
public FundsReleased handleJoinRejected(Notification notification) {
    var ctx = CompensationContext.from(notification);
    logger.warning("Player compensation for JoinTable rejection: reason=" + ctx.getRejectionReason());

    // Extract table_root from the rejected command
    byte[] tableRoot = new byte[0];
    if (ctx.getRejectedCommand() != null
            && ctx.getRejectedCommand().getCover() != null
            && ctx.getRejectedCommand().getCover().hasRoot()) {
        tableRoot = ctx.getRejectedCommand().getCover().getRoot().getValue().toByteArray();
    }

    // Release the funds that were reserved for this table
    String tableKey = bytesToHex(tableRoot);
    long reservedAmount = getState().getReservationForTable(tableKey);
    long newReserved = getReservedFunds() - reservedAmount;
    long newAvailable = getBankroll() - newReserved;

    return FundsReleased.newBuilder()
        .setAmount(Currency.newBuilder()
            .setAmount(reservedAmount)
            .setCurrencyCode("CHIPS"))
        .setTableRoot(com.google.protobuf.ByteString.copyFrom(tableRoot))
        .setNewAvailableBalance(Currency.newBuilder()
            .setAmount(newAvailable)
            .setCurrencyCode("CHIPS"))
        .setNewReservedBalance(Currency.newBuilder()
            .setAmount(newReserved)
            .setCurrencyCode("CHIPS"))
        .setReleasedAt(now())
        .build();
}
examples/csharp/Player/Agg/Player.cs
[Rejected("table", "JoinTable")]
public FundsReleased HandleTableJoinRejected(Notification notification)
{
    var ctx = CompensationContext.From(notification);
    Console.WriteLine($"Player compensation for JoinTable rejection: reason={ctx.RejectionReason}");

    // Extract table_root from the rejected command
    var tableRoot = ctx.RejectedCommand?.Cover?.Root?.Value ?? ByteString.Empty;

    // Release the funds that were reserved for this table
    var tableKey = Convert.ToHexString(tableRoot.ToByteArray()).ToLowerInvariant();
    TableReservations.TryGetValue(tableKey, out var reservedAmount);
    var newReserved = ReservedFunds - reservedAmount;
    var newAvailable = Bankroll - newReserved;

    return new FundsReleased
    {
        Amount = new Currency { Amount = reservedAmount, CurrencyCode = "CHIPS" },
        TableRoot = tableRoot,
        NewAvailableBalance = new Currency { Amount = newAvailable, CurrencyCode = "CHIPS" },
        NewReservedBalance = new Currency { Amount = newReserved, CurrencyCode = "CHIPS" },
        ReleasedAt = Timestamp.FromDateTime(DateTime.UtcNow)
    };
}
RevocationResponse Flags
When an aggregate cannot handle a rejection, it returns flags:
| Flag | Effect |
|---|---|
| emit_system_revocation | Emit SagaCompensationFailed event to fallback domain |
| send_to_dead_letter_queue | Route to DLQ for manual review |
| escalate | Trigger webhook notification |
| abort | Stop saga chain, propagate error to caller |
Configuration
saga_compensation:
  fallback_domain: "system"               # Domain for SagaCompensationFailed events
  fallback_emit_system_revocation: true   # Default when aggregate returns empty
  fallback_send_to_dlq: true
  fallback_escalate: false
  dead_letter_queue_url: "amqp://..."     # Optional DLQ URL
  escalation_webhook_url: "https://..."   # Optional webhook for alerts
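One way to read this configuration: the fallback_* settings supply the flags when the aggregate returns nothing of its own. The sketch below models that reading. `RevocationFlags`, `effective_flags`, and `planned_effects` are hypothetical names, and representing an empty response as `None` is an assumption about how "returns empty" is detected.

```python
from dataclasses import dataclass
from typing import List, Mapping, Optional


@dataclass(frozen=True)
class RevocationFlags:
    """Stand-in for the RevocationResponse flags."""
    emit_system_revocation: bool = False
    send_to_dead_letter_queue: bool = False
    escalate: bool = False
    abort: bool = False


def effective_flags(
    response: Optional[RevocationFlags],
    config: Mapping[str, object],
) -> RevocationFlags:
    """Use the aggregate's flags if present, else the configured fallbacks."""
    if response is not None:
        return response
    return RevocationFlags(
        emit_system_revocation=bool(config.get("fallback_emit_system_revocation", False)),
        send_to_dead_letter_queue=bool(config.get("fallback_send_to_dlq", False)),
        escalate=bool(config.get("fallback_escalate", False)),
    )


def planned_effects(flags: RevocationFlags, fallback_domain: str) -> List[str]:
    """Describe, in order, what each set flag would trigger."""
    effects = []
    if flags.emit_system_revocation:
        effects.append(f"emit SagaCompensationFailed to {fallback_domain}")
    if flags.send_to_dead_letter_queue:
        effects.append("route to DLQ")
    if flags.escalate:
        effects.append("trigger webhook")
    if flags.abort:
        effects.append("abort saga chain")
    return effects


# Values copied from the configuration shown above.
config = {
    "fallback_domain": "system",
    "fallback_emit_system_revocation": True,
    "fallback_send_to_dlq": True,
    "fallback_escalate": False,
}
flags = effective_flags(None, config)
effects = planned_effects(flags, str(config["fallback_domain"]))
```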
MergeStrategy and Conflicts
The MergeStrategy enum controls how sequence conflicts are handled:
| Strategy | Proto Value | Behavior on Conflict |
|---|---|---|
| MERGE_COMMUTATIVE | 0 (default) | FAILED_PRECONDITION — retryable with fresh state |
| MERGE_STRICT | 1 | ABORTED — immediate rejection, no retry |
| MERGE_AGGREGATE_HANDLES | 2 | Bypass validation — aggregate decides |
| MERGE_MANUAL | 3 | ABORTED + route to DLQ |
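The table can be restated as a lookup from strategy to conflict outcome. The enum below reuses the proto names and values from the table, but it is a local stand-in rather than the generated angzarr type, and `ConflictOutcome` and `on_sequence_conflict` are hypothetical names.

```python
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional


class MergeStrategy(IntEnum):
    """Local stand-in using the proto values from the table."""
    MERGE_COMMUTATIVE = 0
    MERGE_STRICT = 1
    MERGE_AGGREGATE_HANDLES = 2
    MERGE_MANUAL = 3


@dataclass(frozen=True)
class ConflictOutcome:
    grpc_code: Optional[str]  # None: validation bypassed, aggregate decides
    retryable: bool
    routed_to_dlq: bool


_OUTCOMES = {
    MergeStrategy.MERGE_COMMUTATIVE: ConflictOutcome("FAILED_PRECONDITION", True, False),
    MergeStrategy.MERGE_STRICT: ConflictOutcome("ABORTED", False, False),
    MergeStrategy.MERGE_AGGREGATE_HANDLES: ConflictOutcome(None, False, False),
    MergeStrategy.MERGE_MANUAL: ConflictOutcome("ABORTED", False, True),
}


def on_sequence_conflict(strategy: int) -> ConflictOutcome:
    """Return what the table says happens when a sequence conflict occurs."""
    return _OUTCOMES[MergeStrategy(strategy)]
```

Because the default proto value is 0, a command that never sets a strategy falls into the retryable MERGE_COMMUTATIVE row.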
When to Use Each
| Use Case | Strategy | Why |
|---|---|---|
| Financial operations (balance checks) | STRICT | Must see current state |
| Counters, metrics | COMMUTATIVE | Order doesn't matter |
| CRDT-style operations | AGGREGATE_HANDLES | Aggregate merges conflicts |
| Auditable operations | MANUAL | Human review required |
MERGE_MANUAL Flow
1. Command arrives with MERGE_MANUAL
2. Coordinator detects sequence mismatch
3. Command NOT executed
4. AngzarrDeadLetter created with SequenceMismatchDetails
5. Published to DLQ topic
6. Returns ABORTED status to caller
Escalation
For critical failures, angzarr can trigger alerts via webhooks:
saga_compensation:
  escalation_webhook_url: "https://ops.example.com/alerts"
Webhook payload:
{
  "saga_name": "saga-table-player",
  "triggering_aggregate": {
    "domain": "table",
    "root": "550e8400-e29b-41d4-a716-446655440000"
  },
  "triggering_sequence": 5,
  "rejection_reason": "insufficient_funds",
  "compensation_reason": "Aggregate returned empty response",
  "occurred_at": "2024-01-15T10:30:00Z"
}
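For testing a webhook receiver, it can be handy to generate payloads of this shape locally. The sketch below reproduces the field names from the example payload; the function name `build_escalation_payload` is hypothetical, and the code only illustrates the structure rather than how angzarr builds or sends the request.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def build_escalation_payload(
    saga_name: str,
    domain: str,
    root: str,
    triggering_sequence: int,
    rejection_reason: str,
    compensation_reason: str,
    occurred_at: datetime,
) -> Dict[str, Any]:
    """Build a dict with the same fields as the example webhook payload."""
    return {
        "saga_name": saga_name,
        "triggering_aggregate": {"domain": domain, "root": root},
        "triggering_sequence": triggering_sequence,
        "rejection_reason": rejection_reason,
        "compensation_reason": compensation_reason,
        # Normalize to UTC and render as an ISO 8601 timestamp ending in Z.
        "occurred_at": occurred_at.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
    }


payload = build_escalation_payload(
    saga_name="saga-table-player",
    domain="table",
    root="550e8400-e29b-41d4-a716-446655440000",
    triggering_sequence=5,
    rejection_reason="insufficient_funds",
    compensation_reason="Aggregate returned empty response",
    occurred_at=datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc),
)
body = json.dumps(payload)
```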
Best Practices
1. Design for Idempotency
Handlers may be retried. Ensure repeated execution produces the same result.
# BAD: Not idempotent
def apply_funds_deposited(state, event):
    state.balance += event.amount  # Adds again on retry!

# GOOD: Idempotent (events have absolute values)
def apply_funds_deposited(state, event):
    state.balance = event.new_balance  # Same result on retry
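The difference shows up as soon as the same event is delivered twice. The following self-contained sketch simulates one retry; the `State` and `FundsDeposited` classes are simplified stand-ins carrying only the fields used by the two appliers.

```python
from dataclasses import dataclass


@dataclass
class FundsDeposited:
    """Simplified stand-in event with a delta and an absolute value."""
    amount: int
    new_balance: int


@dataclass
class State:
    balance: int = 0


def apply_additive(state: State, event: FundsDeposited) -> None:
    state.balance += event.amount  # result depends on delivery count


def apply_absolute(state: State, event: FundsDeposited) -> None:
    state.balance = event.new_balance  # same result on every delivery


event = FundsDeposited(amount=50, new_balance=150)
additive = State(balance=100)
absolute = State(balance=100)

# Deliver the same event twice, as a retry would.
for _ in range(2):
    apply_additive(additive, event)
    apply_absolute(absolute, event)
```

After the loop, the additive state has counted the deposit twice while the absolute state still holds the balance recorded in the event.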
2. Use Appropriate MergeStrategy
# Financial operations: STRICT
@merge_strategy(MergeStrategy.STRICT)
def handle_transfer(state, cmd):
    if cmd.amount > state.available:
        raise CommandRejectedError("insufficient_funds")

# Counters: COMMUTATIVE
@merge_strategy(MergeStrategy.COMMUTATIVE)
def handle_increment(state, cmd):
    return ValueIncremented(delta=cmd.delta)

# Audit-critical: MANUAL
@merge_strategy(MergeStrategy.MANUAL)
def handle_compliance_action(state, cmd):
    return ComplianceActionTaken(action=cmd.action)
3. Handle Notifications Explicitly
Don't ignore RejectionNotification — either compensate or escalate:
def handle_rejection(state, rejection):
    if can_compensate(rejection):
        return build_compensation_event(rejection)
    else:
        return RevocationResponse(
            send_to_dead_letter_queue=True,
            escalate=True,
            reason="Cannot compensate automatically",
        )
4. Monitor DLQ Depth
Set up alerts for DLQ message accumulation:
# Prometheus alert
- alert: DLQBacklog
  expr: dlq_messages_total > 100
  for: 5m
  labels:
    severity: warning
Next Steps
- Patterns Reference — MergeStrategy, SyncMode details
- Sagas — Cross-domain coordination
- Payload Offloading — Large message handling