Skip to main content

Framework Projectors

Angzarr provides several built-in projectors for common operational needs. These are framework-level services that process events without requiring custom business logic.

ProjectorPurposeOutput
LogServiceDebug loggingConsole (stdout)
EventServiceEvent storageDatabase (SQL)
OutboundServiceReal-time streaming + external publishinggRPC streams, HTTP/Kafka

LogService

Pretty-prints events to stdout with optional JSON decoding. Useful for development debugging and monitoring event flow.

Features

  • ANSI color-coded output by event type
  • JSON decoding via prost-reflect (if descriptors provided)
  • Hex dump fallback for unknown types

Configuration

VariableDescription
DESCRIPTOR_PATHPath to FileDescriptorSet for JSON decoding

Output Example

illustrative - LogService console output
────────────────────────────────────────────────────────
orders:abc123de:0000000005
OrderCreated
────────────────────────────────────────────────────────
{
"order_id": "ORD-12345",
"customer_id": "CUST-789",
"total": 9999
}

Color Coding

Event PatternColor
*CreatedGreen
*CompletedCyan
*Cancelled, *FailedRed
*Added, *AppliedYellow
OtherBlue

Usage (Rust)

illustrative - LogService usage
use angzarr::handlers::projectors::{LogService, LogServiceHandle};

let service = LogService::new();

// Or with descriptors
std::env::set_var("DESCRIPTOR_PATH", "/path/to/descriptors.bin");
let service = LogService::new();

// Use as gRPC service
let handle = LogServiceHandle(Arc::new(service));

EventService

Stores all events as JSON in a SQL database for querying, debugging, and audit trails.

Features

  • Stores events keyed by (domain, root_id, sequence)
  • JSON decoding via prost-reflect
  • Base64 fallback for unknown types
  • Indexes for efficient querying by domain, type, correlation, time

Database Support

  • SQLite (feature: sqlite)
  • PostgreSQL (feature: postgres)

Schema

illustrative - EventService schema
CREATE TABLE events (
domain TEXT NOT NULL,
root_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
event_type TEXT NOT NULL,
event_json TEXT NOT NULL,
correlation_id TEXT NOT NULL,
created_at TEXT NOT NULL,
PRIMARY KEY (domain, root_id, sequence)
);

-- Indexes
CREATE INDEX idx_events_domain_type ON events(domain, event_type);
CREATE INDEX idx_events_correlation ON events(correlation_id);
CREATE INDEX idx_events_created ON events(created_at);

Queries

illustrative - EventService queries
-- All events for an aggregate
SELECT * FROM events
WHERE domain = 'orders' AND root_id = 'abc123'
ORDER BY sequence;

-- Events by type
SELECT * FROM events
WHERE domain = 'orders' AND event_type = 'OrderCreated'
ORDER BY created_at DESC
LIMIT 100;

-- Trace a correlation flow
SELECT * FROM events
WHERE correlation_id = 'corr-xyz-789'
ORDER BY created_at;

Usage (Rust)

illustrative - EventService usage
use angzarr::handlers::projectors::{EventService, EventServiceHandle, connect_pool};

let pool = connect_pool("sqlite:events.db").await?;
let service = EventService::new(pool)
.load_descriptors("/path/to/descriptors.bin");

service.init().await?; // Create schema

let handle = EventServiceHandle(Arc::new(service));

OutboundService

Unified outbound projector for event streaming to both internal and external consumers. Combines gRPC streaming (for gateways) with CloudEvents publishing (for external systems).

Architecture

illustrative - OutboundService architecture
                                    ┌── gRPC stream (EventBook, multi-page)
EventBook ──→ [OutboundService] ────┼── HTTP (CloudEvents JSON/protobuf)
└── Kafka (CloudEvents JSON/protobuf)

Features

  • gRPC Streaming: Correlation-based filtering for internal consumers (gateways)
  • CloudEvents HTTP: Webhook publishing with JSON or protobuf encoding
  • CloudEvents Kafka: Topic publishing with message key ordering
  • Multiple subscribers per correlation ID
  • Automatic cleanup when subscribers disconnect
  • Configurable content type (JSON or protobuf)

Use Cases

  • WebSocket gateways pushing events to browser clients
  • Long-polling APIs waiting for saga completion
  • Webhook integrations with external systems
  • Event-driven microservices via Kafka

gRPC Service

// EventStreamService: streams events to registered subscribers
service EventStreamService {
// Subscribe to events matching correlation ID (required)
// Returns INVALID_ARGUMENT if correlation_id is empty
// REST: Server-Sent Events (SSE) stream
rpc Subscribe(EventStreamFilter) returns (stream EventBook) {
option (google.api.http) = {get: "/v1/stream/{correlation_id}"};
}
}
// Subscription filter for event streaming
message EventStreamFilter {
string correlation_id = 1;
}

Configuration

VariableDescriptionDefault
CLOUDEVENTS_SINKSink type: http, kafka, both, nullnull (gRPC only)
OUTBOUND_CONTENT_TYPEjson or protobufjson
CLOUDEVENTS_HTTP_ENDPOINTHTTP webhook URL(required if http)
CLOUDEVENTS_HTTP_TIMEOUTHTTP timeout in seconds30
CLOUDEVENTS_KAFKA_BROKERSKafka bootstrap servers(required if kafka)
CLOUDEVENTS_KAFKA_TOPICKafka topic namecloudevents

Usage (Rust)

illustrative - OutboundService usage
use angzarr::handlers::projectors::{OutboundService, OutboundEventHandler};
use angzarr::handlers::projectors::outbound;

// Create from environment config (with sinks)
let service = outbound::from_env()?;

// Or create with explicit configuration
let service = OutboundService::new() // gRPC only
.with_content_type(ContentType::Json);

// Create event handler for bus integration
let handler = OutboundEventHandler::new(Arc::new(service));

// Service implements EventStreamService gRPC trait

Client Example (gRPC)

illustrative - gRPC client streaming
async def stream_events(correlation_id: str):
async with grpc.aio.insecure_channel("localhost:1340") as channel:
stub = EventStreamServiceStub(channel)
request = EventStreamFilter(correlation_id=correlation_id)

async for event_book in stub.Subscribe(request):
print(f"Received: {event_book.cover.domain}")
for page in event_book.pages:
print(f" Event: {page.payload}")

CloudEvents Format

Each EventBook page becomes a CloudEvent with:

FieldValue
id{domain}:{root_id}:{sequence}
typeangzarr.{event_type}
sourceangzarr/{domain}
subjectRoot ID (hex)
dataSingle-page EventBook (protobuf bytes)
correlationidExtension: correlation ID

Comparison

FeatureLogEventOutbound
OutputConsoleDatabasegRPC + HTTP/Kafka
PurposeDebugAudit/QueryReal-time + Integration
StateStatelessPersistentIn-memory
FilteringNoneSQL queriesCorrelation
ProductionDev onlyYesYes

Standalone Mode Integration

In standalone mode, register framework projectors via RuntimeBuilder:

illustrative - standalone mode registration
let mut runtime = RuntimeBuilder::new()
.with_sqlite_memory()
.register_aggregate("orders", orders_aggregate)
.build()
.await?;

Next Steps