Payload Offloading

When event or command payloads exceed message bus size limits, angzarr stores them externally using the claim check pattern.


Overview

Large Payload Flow:

1. Event with 5MB payload
2. Payload stored externally → returns PayloadReference
3. EventPage contains reference, not payload
4. Published to message bus (small message)
5. Consumer retrieves payload via reference
6. TTL reaper cleans up after retention period

The claim check pattern trades a retrieval round-trip (extra latency) for reliability: payloads that would exceed bus size limits are stored separately and fetched on demand. A minimal publish-side sketch follows.
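
The sketch below assumes the PayloadStore trait and PayloadReference message shown later on this page; the prost-style event_page::Payload oneof and the threshold_bytes argument are illustrative assumptions, not the exact angzarr API.

// Hedged sketch: embed small payloads inline, offload large ones via the claim check.
async fn choose_payload(
    store: &dyn PayloadStore,
    threshold_bytes: usize,
    event: prost_types::Any,
) -> Result<event_page::Payload, PayloadStoreError> {
    if event.value.len() <= threshold_bytes {
        // Small enough for the bus: embed directly in the EventPage.
        Ok(event_page::Payload::Event(event))
    } else {
        // Too large: store externally and publish only the reference.
        let reference = store.put(&event.value).await?;
        Ok(event_page::Payload::External(reference))
    }
}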


When to Use

Scenario                                   | Solution
-------------------------------------------|---------------------------------
Event payload > 256KB (typical bus limit)  | Payload offloading
Snapshot state > bus limit                 | Payload offloading
Binary attachments (images, documents)     | Payload offloading
Normal-sized events                        | Direct embedding (no offloading)

Storage Backends

Filesystem

Local storage for development and standalone mode:

payload_offload:
  enabled: true
  store_type: filesystem
  filesystem:
    base_path: /var/angzarr/payloads

Files stored as: /var/angzarr/payloads/{sha256-hash}.bin

Google Cloud Storage

For GCP deployments:

payload_offload:
  enabled: true
  store_type: gcs
  gcs:
    bucket: my-angzarr-payloads
    prefix: events/  # Optional path prefix

Files stored as: gs://my-angzarr-payloads/events/{sha256-hash}.bin

Amazon S3

For AWS deployments:

payload_offload:
  enabled: true
  store_type: s3
  s3:
    bucket: my-angzarr-payloads
    prefix: events/
    region: us-east-1
    endpoint: http://localhost:4566  # Optional (LocalStack)

Files stored as: s3://my-angzarr-payloads/events/{sha256-hash}.bin


Content-Addressable Storage

All backends use SHA-256 content hashing:

Benefit       | How
--------------|----------------------------------
Deduplication | Identical payloads share storage
Integrity     | Hash verified on retrieval
Immutability  | Same hash = same content forever

Payload: [binary data...]
Hash: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
URI: gs://bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.bin
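
As an illustration of how the object name is derived (not the actual angzarr code), the sketch below hashes the payload bytes with SHA-256 and builds a GCS-style URI; it assumes the sha2 and hex crates.

// Illustrative only: content-addressed object name shared by all backends.
use sha2::{Digest, Sha256};

fn object_uri(bucket: &str, prefix: &str, payload: &[u8]) -> String {
    // Hex-encoded SHA-256 of the payload bytes, with a .bin suffix.
    let hash = hex::encode(Sha256::digest(payload));
    format!("gs://{bucket}/{prefix}{hash}.bin")
}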

PayloadReference

Events and commands reference external payloads via PayloadReference:

// Storage backend type for externally stored payloads (claim check pattern).
enum PayloadStorageType {
  PAYLOAD_STORAGE_TYPE_UNSPECIFIED = 0;
  PAYLOAD_STORAGE_TYPE_FILESYSTEM = 1;
  PAYLOAD_STORAGE_TYPE_GCS = 2;
  PAYLOAD_STORAGE_TYPE_S3 = 3;
}

// Reference to externally stored payload (claim check pattern).
// Used when event/command payloads exceed message bus size limits.
message PayloadReference {
  PayloadStorageType storage_type = 1;
  // Location URI:
  //   - file:///var/angzarr/payloads/{hash}.bin
  //   - gs://bucket/prefix/{hash}.bin
  //   - s3://bucket/prefix/{hash}.bin
  string uri = 2;
  // Content hash for integrity verification and deduplication (SHA-256)
  bytes content_hash = 3;
  // Original serialized payload size in bytes
  uint64 original_size = 4;
  // Timestamp when payload was stored (for TTL cleanup)
  google.protobuf.Timestamp stored_at = 5;
}

message EventPage {
  uint32 sequence = 1;
  google.protobuf.Timestamp created_at = 2;
  oneof payload {
    google.protobuf.Any event = 3;
    PayloadReference external = 4;  // Claim check: payload stored externally
  }
}
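
On the consumer side, resolving the payload oneof looks roughly like the sketch below. It assumes prost-style generated types; PayloadStore is the trait shown under "Manual Offloading (Advanced)" further down.

// Hedged consumer-side sketch: use the inline event when present, otherwise
// fetch the bytes from external storage via the claim check reference.
async fn resolve_payload(
    store: &dyn PayloadStore,
    page: &EventPage,
) -> Result<Vec<u8>, PayloadStoreError> {
    match &page.payload {
        Some(event_page::Payload::Event(any)) => Ok(any.value.clone()),
        Some(event_page::Payload::External(reference)) => store.get(reference).await,
        // No payload set on this page.
        None => Ok(Vec::new()),
    }
}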

Threshold Configuration

Configure when offloading triggers:

payload_offload:
  enabled: true
  threshold_bytes: 262144  # 256KB - offload payloads larger than this
  store_type: gcs
  gcs:
    bucket: my-payloads

TTL and Cleanup

External payloads have a retention period. The TtlReaper background task cleans up expired payloads:

payload_offload:
  ttl_days: 30               # Delete payloads older than this
  reaper_interval_hours: 24  # Run cleanup every 24 hours

Cleanup Process

1. Reaper scans storage for payloads older than TTL
2. Cross-references with event store (are events still live?)
3. Deletes orphaned payloads
4. Logs cleanup metrics
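
The sketch below shows one pass of this process for the filesystem backend, reduced to steps 1 and 3 (it checks only file age; the real TtlReaper also cross-references the event store before deleting). It is a standalone illustration, not the actual reaper.

// Minimal filesystem reaper sketch: delete payload files older than the TTL.
use std::fs;
use std::path::Path;
use std::time::{Duration, SystemTime};

fn reap_filesystem_payloads(base_path: &Path, ttl: Duration) -> std::io::Result<u64> {
    let mut deleted = 0;
    for entry in fs::read_dir(base_path)? {
        let entry = entry?;
        let modified = entry.metadata()?.modified()?;
        let age = SystemTime::now().duration_since(modified).unwrap_or_default();
        if age > ttl {
            fs::remove_file(entry.path())?;
            deleted += 1;
        }
    }
    Ok(deleted)
}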

Manual Cleanup

For immediate cleanup:

# Using angzarr CLI
angzarr payload-store cleanup --older-than 7d

# Or via API
curl -X POST "http://localhost:9099/admin/payload-store/cleanup?age=7d"

Retrieval Failures

When payload retrieval fails, angzarr routes the message to the dead letter queue (DLQ). The DLQ entry carries PayloadRetrievalFailedDetails, shown here alongside the other DLQ detail types:

// Sequence mismatch details for DLQ entries.
// Contains expected vs actual sequence for debugging and replay.
message SequenceMismatchDetails {
  uint32 expected_sequence = 1;      // What the command expected
  uint32 actual_sequence = 2;        // What the aggregate was at
  MergeStrategy merge_strategy = 3;  // Strategy that triggered DLQ routing
}

// Event processing failure details for DLQ entries.
// Contains information about why a saga/projector failed to process events.
message EventProcessingFailedDetails {
  string error = 1;        // Error message from the handler
  uint32 retry_count = 2;  // Number of retry attempts before DLQ routing
  bool is_transient = 3;   // Whether the failure is considered transient
}

// Payload retrieval failure details for DLQ entries.
// Contains information about why an externally stored payload couldn't be retrieved.
message PayloadRetrievalFailedDetails {
  PayloadStorageType storage_type = 1;  // Storage backend type
  string uri = 2;                       // URI of the payload that couldn't be retrieved
  bytes content_hash = 3;               // Content hash for identification
  uint64 original_size = 4;             // Original payload size in bytes
  string error = 5;                     // Error message from the retrieval attempt
}

Common failure causes:

Error            | Cause                         | Resolution
-----------------|-------------------------------|-----------------------------
Object not found | TTL expired, manual deletion  | Restore from backup or skip
Integrity failed | Corruption, hash mismatch     | Restore from backup
Access denied    | Permissions changed           | Fix IAM/bucket policies
Timeout          | Network issues                | Retry or check connectivity
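
The "Integrity failed" row corresponds to a hash check on retrieval: the retrieved bytes are re-hashed and compared with PayloadReference.content_hash. A minimal sketch of that check, assuming the sha2 crate (the real verification happens inside the store's get):

// Illustrative integrity check: recompute SHA-256 over the retrieved bytes and
// compare it with the hash recorded in the PayloadReference.
use sha2::{Digest, Sha256};

fn payload_is_intact(content_hash: &[u8], payload: &[u8]) -> bool {
    Sha256::digest(payload).as_slice() == content_hash
}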

Usage in Handlers

Payload offloading is transparent to handlers — the framework handles storage and retrieval:

# Handler receives full payload regardless of storage location
def handle_large_document(state, cmd):
    # cmd.document_bytes is already retrieved
    # No special handling needed
    return DocumentUploaded(
        document_id=cmd.document_id,
        size=len(cmd.document_bytes),
        hash=compute_hash(cmd.document_bytes),
    )

Manual Offloading (Advanced)

For explicit control:

use angzarr::payload_store::PayloadStore;

async fn store_large_payload(
    store: &dyn PayloadStore,
    data: &[u8],
) -> Result<PayloadReference, PayloadStoreError> {
    store.put(data).await
}

async fn retrieve_payload(
    store: &dyn PayloadStore,
    reference: &PayloadReference,
) -> Result<Vec<u8>, PayloadStoreError> {
    store.get(reference).await
}
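
A round-trip using the two helpers above, with store being whichever backend the deployment configures:

// Usage sketch: offload a document, fetch it back via the reference, and
// confirm the bytes are identical.
async fn round_trip(store: &dyn PayloadStore, document: &[u8]) -> Result<(), PayloadStoreError> {
    let reference = store_large_payload(store, document).await?;
    let restored = retrieve_payload(store, &reference).await?;
    assert_eq!(restored, document);
    Ok(())
}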

Monitoring

Metrics

Metric                               | Description
-------------------------------------|----------------------------
payload_store_put_total              | Total payloads stored
payload_store_put_bytes_total        | Total bytes stored
payload_store_get_total              | Total payloads retrieved
payload_store_get_errors_total       | Retrieval failures
payload_store_cleanup_deleted_total  | Payloads deleted by reaper

Alerts

# Prometheus alerts
- alert: PayloadRetrievalErrors
  expr: rate(payload_store_get_errors_total[5m]) > 0.1
  labels:
    severity: warning

- alert: PayloadStorageGrowth
  expr: payload_store_put_bytes_total > 100e9  # 100GB
  labels:
    severity: info

Best Practices

1. Set Appropriate Thresholds

Match your message bus limits:

Bus      | Typical Limit | Recommended Threshold
---------|---------------|----------------------
Kafka    | 1MB default   | 512KB
RabbitMQ | Unlimited*    | 256KB
Pub/Sub  | 10MB          | 1MB
SNS/SQS  | 256KB         | 200KB

2. Use Appropriate TTL

Balance storage costs vs replay needs:

Use Case           | Recommended TTL
-------------------|----------------
Short-lived events | 7 days
Standard retention | 30 days
Compliance/audit   | 365+ days

3. Monitor Storage Growth

Payload stores can grow quickly with large events:

# Check storage size
gsutil du -s gs://my-payloads/
aws s3 ls s3://my-payloads/ --summarize

4. Handle Retrieval Failures Gracefully

DLQ entries for payload failures need manual intervention — the original payload may be unrecoverable.


Next Steps