Payload Offloading
When event or command payloads exceed message bus size limits, angzarr stores them externally using the claim check pattern.
Overview
Large Payload Flow:

1. Event with 5MB payload
2. Payload stored externally → returns PayloadReference
3. EventPage contains reference, not payload
4. Published to message bus (small message)
5. Consumer retrieves payload via reference
6. TTL reaper cleans up after retention period

The claim check pattern trades latency for reliability: payloads that would fail bus size limits are stored separately and retrieved on demand.
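The flow above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `PayloadRef`, `InMemoryStore`, and the `mem://` URI scheme are hypothetical stand-ins, not angzarr types, and a dictionary plays the role of the external store.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class PayloadRef:
    """Hypothetical stand-in for PayloadReference (subset of its fields)."""
    uri: str
    content_hash: bytes
    original_size: int


class InMemoryStore:
    """Toy payload store keyed by URI; a real backend would be disk, GCS, or S3."""

    def __init__(self) -> None:
        self._objects = {}

    def put(self, data: bytes) -> PayloadRef:
        digest = hashlib.sha256(data).digest()
        uri = f"mem://payloads/{digest.hex()}.bin"  # made-up scheme for the sketch
        self._objects[uri] = data
        return PayloadRef(uri=uri, content_hash=digest, original_size=len(data))

    def get(self, ref: PayloadRef) -> bytes:
        return self._objects[ref.uri]


store = InMemoryStore()
payload = b"x" * (5 * 1024 * 1024)         # step 1: a 5MB payload
ref = store.put(payload)                   # step 2: store externally, get a reference
message = {"external": ref}                # steps 3-4: the bus message carries only the reference
restored = store.get(message["external"])  # step 5: the consumer resolves the reference
```

The message placed on the bus stays small no matter how large the payload is, at the cost of one extra round trip to the store on the consumer side.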
When to Use
| Scenario | Solution |
|---|---|
| Event payload > 256KB (typical bus limit) | Payload offloading |
| Snapshot state > bus limit | Payload offloading |
| Binary attachments (images, documents) | Payload offloading |
| Normal-sized events | Direct embedding (no offloading) |
Storage Backends
Filesystem
Local storage for development:

    payload_offload:
      enabled: true
      store_type: filesystem
      filesystem:
        base_path: /var/angzarr/payloads

Files stored as: /var/angzarr/payloads/{sha256-hash}.bin
Google Cloud Storage
For GCP deployments:

    payload_offload:
      enabled: true
      store_type: gcs
      gcs:
        bucket: my-angzarr-payloads
        prefix: events/  # Optional path prefix

Files stored as: gs://my-angzarr-payloads/events/{sha256-hash}.bin
Amazon S3
For AWS deployments:

    payload_offload:
      enabled: true
      store_type: s3
      s3:
        bucket: my-angzarr-payloads
        prefix: events/
        region: us-east-1
        endpoint: http://localhost:4566  # Optional (LocalStack)

Files stored as: s3://my-angzarr-payloads/events/{sha256-hash}.bin
Content-Addressable Storage
All backends use SHA-256 content hashing:
| Benefit | How |
|---|---|
| Deduplication | Identical payloads share storage |
| Integrity | Hash verified on retrieval |
| Immutability | Same hash = same content forever |
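The deduplication and integrity rows of the table can be illustrated with a small sketch. `ContentAddressedStore` is a hypothetical in-memory class written for this example, not an angzarr API; it only shows how keying objects by their SHA-256 digest yields both properties.

```python
import hashlib


class ContentAddressedStore:
    """Toy store whose keys are the SHA-256 hex digests of the stored bytes."""

    def __init__(self) -> None:
        self._blobs = {}

    def put(self, data: bytes) -> str:
        key = hashlib.sha256(data).hexdigest()
        # Identical payloads hash to the same key, so they share one entry.
        self._blobs.setdefault(key, data)
        return key

    def get(self, key: str) -> bytes:
        data = self._blobs[key]
        # Re-hash on retrieval; a mismatch means the stored bytes changed.
        if hashlib.sha256(data).hexdigest() != key:
            raise ValueError(f"integrity check failed for {key}")
        return data
```

Storing the same bytes twice returns the same key and keeps a single copy, and any modification of the stored bytes is detected on the next `get`.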
For example:

    Payload: [binary data...]
    Hash:    e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
    URI:     gs://bucket/e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855.bin

PayloadReference
Events and commands reference external payloads via PayloadReference:

    // Storage backend type for externally stored payloads (claim check pattern).
    enum PayloadStorageType {
      PAYLOAD_STORAGE_TYPE_UNSPECIFIED = 0;
      PAYLOAD_STORAGE_TYPE_FILESYSTEM = 1;
      PAYLOAD_STORAGE_TYPE_GCS = 2;
      PAYLOAD_STORAGE_TYPE_S3 = 3;
    }

    // Reference to externally stored payload (claim check pattern).
    // Used when event/command payloads exceed message bus size limits.
    message PayloadReference {
      PayloadStorageType storage_type = 1;
      // Location URI:
      // - file:///var/angzarr/payloads/{hash}.bin
      // - gs://bucket/prefix/{hash}.bin
      // - s3://bucket/prefix/{hash}.bin
      string uri = 2;
      // Content hash for integrity verification and deduplication (SHA-256)
      bytes content_hash = 3;
      // Original serialized payload size in bytes
      uint64 original_size = 4;
      // Timestamp when payload was stored (for TTL cleanup)
      google.protobuf.Timestamp stored_at = 5;
    }

    message EventPage {
      PageHeader header = 1;  // Sequence type and provenance
      google.protobuf.Timestamp created_at = 2;
      oneof payload {
        google.protobuf.Any event = 3;
        PayloadReference external = 4;  // Claim check: payload stored externally
      }
      // Field 5 removed: FactSequence replaced by PageHeader.external_deferred

      // Two-phase commit support (Phase 1: 2PC Storage Model)
      bool no_commit = 6;  // true = pending 2PC (cascade), needs Confirmation; false (default) = immediately committed
      optional string cascade_id = 7;  // Groups related pending events for atomic commit/rollback (null if not in cascade)
    }

Threshold Configuration
Configure when offloading triggers:

    payload_offload:
      enabled: true
      threshold_bytes: 262144  # 256KB - offload payloads larger than this
      store_type: gcs
      gcs:
        bucket: my-payloads

TTL and Cleanup
External payloads have a retention period. The TtlReaper background task cleans up expired payloads:

    payload_offload:
      ttl_days: 30               # Delete payloads older than this
      reaper_interval_hours: 24  # Run cleanup every 24 hours

Cleanup Process
1. Reaper scans storage for payloads older than TTL
2. Cross-references with event store (are events still live?)
3. Deletes orphaned payloads
4. Logs cleanup metrics

Manual Cleanup
For immediate cleanup:

    # Using angzarr CLI
    angzarr payload-store cleanup --older-than 7d

    # Or via API (URL quoted so the shell does not interpret the "?")
    curl -X POST 'http://localhost:9099/admin/payload-store/cleanup?age=7d'

Retrieval Failures
When payload retrieval fails, angzarr routes the affected message to the dead letter queue (DLQ). The DLQ detail messages are:

    // Sequence mismatch details for DLQ entries.
    // Contains expected vs actual sequence for debugging and replay.
    message SequenceMismatchDetails {
      uint32 expected_sequence = 1;      // What the command expected
      uint32 actual_sequence = 2;        // What the aggregate was at
      MergeStrategy merge_strategy = 3;  // Strategy that triggered DLQ routing
    }

    // Event processing failure details for DLQ entries.
    // Contains information about why a saga/projector failed to process events.
    message EventProcessingFailedDetails {
      string error = 1;        // Error message from the handler
      uint32 retry_count = 2;  // Number of retry attempts before DLQ routing
      bool is_transient = 3;   // Whether the failure is considered transient
      // Flat array of captured errors representing the cause chain
      // (most-causal-first; the originating caught error is the LAST
      // element). Matches Sentry's `exception.values` shape; linkage via
      // each entry's `mechanism.exception_id` / `mechanism.parent_id`.
      // Empty when no capture was attempted. See sererr.fyi/spec/proto
      // for the schema rationale.
      repeated sererr.v1.CapturedError stack_trace = 4;
    }

    // Payload retrieval failure details for DLQ entries.
    // Contains information about why an externally stored payload couldn't be retrieved.
    message PayloadRetrievalFailedDetails {
      PayloadStorageType storage_type = 1;  // Storage backend type
      string uri = 2;                       // URI of the payload that couldn't be retrieved
      bytes content_hash = 3;               // Content hash for identification
      uint64 original_size = 4;             // Original payload size in bytes
      string error = 5;                     // Error message from the retrieval attempt
    }

Common failure causes:
| Error | Cause | Resolution |
|---|---|---|
| Object not found | TTL expired, manual deletion | Restore from backup or skip |
| Integrity failed | Corruption, hash mismatch | Restore from backup |
| Access denied | Permissions changed | Fix IAM/bucket policies |
| Timeout | Network issues | Retry or check connectivity |
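One way to act on the table above is to retry only the failures that retrying can fix and to capture the rest as a DLQ record. The sketch below is an assumption-laden illustration, not angzarr's implementation: `fetch_verified`, the `fetch` callable, and the mapping of causes onto Python's built-in `TimeoutError`, `FileNotFoundError`, and `PermissionError` are all hypothetical. `RetrievalFailure` mirrors the fields of PayloadRetrievalFailedDetails.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class RetrievalFailure:
    """Mirrors the fields of PayloadRetrievalFailedDetails."""
    storage_type: str
    uri: str
    content_hash: bytes
    original_size: int
    error: str


def fetch_verified(fetch, storage_type, uri, content_hash, original_size, max_attempts=3):
    """Return (payload, None) on success, or (None, RetrievalFailure) for the DLQ.

    `fetch` is a hypothetical callable taking a URI and returning bytes.
    """
    error = "no attempts made"
    for _ in range(max_attempts):
        try:
            data = fetch(uri)
        except TimeoutError as exc:
            error = f"TimeoutError: {exc}"
            continue  # transient: worth another attempt
        except (FileNotFoundError, PermissionError) as exc:
            error = f"{type(exc).__name__}: {exc}"
            break  # retrying will not restore a missing object or fix permissions
        if hashlib.sha256(data).digest() != content_hash:
            error = "integrity failed: hash mismatch"
            break
        return data, None
    return None, RetrievalFailure(storage_type, uri, content_hash, original_size, error)
```

Timeouts are retried up to `max_attempts` times; missing objects, permission errors, and hash mismatches go straight to the failure record, since they need the manual resolutions listed in the table.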
Usage in Handlers
Payload offloading is transparent to handlers; the framework handles storage and retrieval:

    # Handler receives full payload regardless of storage location.
    # DocumentUploaded and compute_hash are application-defined, not part of angzarr.
    def handle_large_document(state, cmd):
        # cmd.document_bytes is already retrieved
        # No special handling needed
        return DocumentUploaded(
            document_id=cmd.document_id,
            size=len(cmd.document_bytes),
            hash=compute_hash(cmd.document_bytes),
        )

Manual Offloading (Advanced)
For explicit control:

    use angzarr::payload_store::PayloadStore;

    async fn store_large_payload(
        store: &dyn PayloadStore,
        data: &[u8],
    ) -> Result<PayloadReference, PayloadStoreError> {
        store.put(data).await
    }

    async fn retrieve_payload(
        store: &dyn PayloadStore,
        reference: &PayloadReference,
    ) -> Result<Vec<u8>, PayloadStoreError> {
        store.get(reference).await
    }

Monitoring
Metrics
| Metric | Description |
|---|---|
| payload_store_put_total | Total payloads stored |
| payload_store_put_bytes_total | Total bytes stored |
| payload_store_get_total | Total payloads retrieved |
| payload_store_get_errors_total | Retrieval failures |
| payload_store_cleanup_deleted_total | Payloads deleted by reaper |
Alerts

    # Prometheus alerts
    - alert: PayloadRetrievalErrors
      expr: rate(payload_store_get_errors_total[5m]) > 0.1
      labels:
        severity: warning

    - alert: PayloadStorageGrowth
      expr: payload_store_put_bytes_total > 100e9  # 100GB
      labels:
        severity: info

Best Practices
1. Set Appropriate Thresholds
Match your message bus limits:
| Bus | Typical Limit | Recommended Threshold |
|---|---|---|
| Kafka | 1MB default | 512KB |
| RabbitMQ | Unlimited* | 256KB |
| Pub/Sub | 10MB | 1MB |
| SNS/SQS | 256KB | 200KB |
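The recommended thresholds above translate into byte values as follows. This is a minimal sketch: the dictionary keys and the `should_offload` helper are hypothetical names. It assumes 1KB = 1024 bytes, which matches the `threshold_bytes: 262144` value used for 256KB in the configuration example, and it offloads only payloads strictly larger than the threshold.

```python
# Recommended thresholds from the table, in bytes (1KB = 1024 bytes).
RECOMMENDED_THRESHOLD_BYTES = {
    "kafka": 512 * 1024,
    "rabbitmq": 256 * 1024,
    "pubsub": 1024 * 1024,
    "sns_sqs": 200 * 1024,
}


def should_offload(payload_size: int, bus: str) -> bool:
    """Offload payloads strictly larger than the threshold for the given bus."""
    return payload_size > RECOMMENDED_THRESHOLD_BYTES[bus]
```

Keeping the threshold below the bus limit leaves headroom for message headers and envelope fields that are added on top of the payload.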
2. Use Appropriate TTL
Balance storage costs vs replay needs:
| Use Case | Recommended TTL |
|---|---|
| Short-lived events | 7 days |
| Standard retention | 30 days |
| Compliance/audit | 365+ days |
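To make the effect of a TTL concrete, the selection step of a reaper can be sketched as below. `select_for_deletion` and its arguments are hypothetical, and the rule that an expired payload is kept while a live event still references it is an assumption drawn from the "deletes orphaned payloads" step of the cleanup process, not a confirmed description of TtlReaper.

```python
from datetime import datetime, timedelta, timezone


def select_for_deletion(stored_at_by_uri, live_uris, ttl_days, now):
    """Return the URIs that are past the TTL and not referenced by a live event.

    stored_at_by_uri maps each payload URI to its stored_at timestamp;
    live_uris is the set of URIs still referenced from the event store.
    """
    cutoff = now - timedelta(days=ttl_days)
    expired = {uri for uri, stored_at in stored_at_by_uri.items() if stored_at < cutoff}
    return sorted(expired - set(live_uris))
```

With `ttl_days=30`, a payload stored 40 days ago is selected only if nothing references it, while a payload stored 5 days ago is never selected.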
3. Monitor Storage Growth
Payload stores can grow quickly with large events:

    # Check storage size
    gsutil du -s gs://my-payloads/
    aws s3 ls s3://my-payloads/ --recursive --summarize

4. Handle Retrieval Failures Gracefully
DLQ entries for payload failures need manual intervention because the original payload may be unrecoverable.
Next Steps
- Error Recovery: DLQ and retry handling
- Infrastructure: Deployment configuration