
Framework Projectors

Angzarr provides several built-in projectors for common operational needs. These are framework-level services that process events without requiring custom business logic.

| Projector | Purpose | Output |
|---|---|---|
| LogService | Debug logging | Console (stdout) |
| EventService | Event storage | Database (SQL) |
| OutboundService | Real-time streaming + external publishing | gRPC streams, HTTP/Kafka |

LogService pretty-prints events to stdout with optional JSON decoding. It is useful for debugging during development and for watching event flow.

  • ANSI color-coded output by event type
  • JSON decoding via prost-reflect (if descriptors provided)
  • Hex dump fallback for unknown types
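The decode-or-fallback behavior in the list above can be sketched roughly as follows. This is a minimal illustration, not the LogService implementation: `render_payload`, `hex_dump`, and the `decoders` mapping are hypothetical names, and the mapping stands in for whatever descriptor-based decoding the service performs.

```python
import json
from typing import Callable, Dict


def hex_dump(data: bytes, width: int = 16) -> str:
    """Render bytes as offset-prefixed rows of two-digit hex values."""
    rows = []
    for offset in range(0, len(data), width):
        chunk = data[offset:offset + width]
        hex_part = " ".join(f"{byte:02x}" for byte in chunk)
        rows.append(f"{offset:08x}  {hex_part}")
    return "\n".join(rows)


def render_payload(
    type_name: str,
    payload: bytes,
    decoders: Dict[str, Callable[[bytes], dict]],
) -> str:
    """Pretty-print as JSON when a decoder is known, else fall back to hex.

    `decoders` is a hypothetical stand-in for descriptor-based decoding.
    """
    decoder = decoders.get(type_name)
    if decoder is not None:
        return json.dumps(decoder(payload), indent=2)
    return hex_dump(payload)
```

With an empty `decoders` mapping every payload takes the hex path, which mirrors running without descriptors.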
| Variable | Description |
|---|---|
| DESCRIPTOR_PATH | Path to FileDescriptorSet for JSON decoding |
illustrative - LogService console output
────────────────────────────────────────────────────────
orders:abc123de:0000000005
OrderCreated
────────────────────────────────────────────────────────
{
  "order_id": "ORD-12345",
  "customer_id": "CUST-789",
  "total": 9999
}
| Event Pattern | Color |
|---|---|
| *Created | Green |
| *Completed | Cyan |
| *Cancelled, *Failed | Red |
| *Added, *Applied | Yellow |
| Other | Blue |
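The pattern-to-color table reads as a suffix match on the event type name. A minimal sketch under that assumption follows; `color_for` and `colorize` are hypothetical helpers, and the ANSI codes are the standard foreground escapes rather than values taken from the LogService source.

```python
# Standard ANSI foreground escape codes.
ANSI = {
    "green": "\033[32m",
    "cyan": "\033[36m",
    "red": "\033[31m",
    "yellow": "\033[33m",
    "blue": "\033[34m",
}
RESET = "\033[0m"

# Suffix groups in the order given by the table; first match wins.
SUFFIX_COLORS = [
    (("Created",), "green"),
    (("Completed",), "cyan"),
    (("Cancelled", "Failed"), "red"),
    (("Added", "Applied"), "yellow"),
]


def color_for(event_type: str) -> str:
    """Return the color name for an event type, defaulting to blue."""
    for suffixes, color in SUFFIX_COLORS:
        if event_type.endswith(suffixes):
            return color
    return "blue"


def colorize(event_type: str) -> str:
    """Wrap the event type name in the matching ANSI color codes."""
    return f"{ANSI[color_for(event_type)]}{event_type}{RESET}"
```

For example, `color_for("OrderCreated")` yields `"green"`, while a type with no listed suffix, such as `"OrderShipped"`, yields `"blue"`.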
illustrative - LogService usage
use std::sync::Arc;
use angzarr::handlers::projectors::{LogService, LogServiceHandle};

// Without descriptors
let service = LogService::new();

// Or with descriptors: set DESCRIPTOR_PATH before constructing the service
std::env::set_var("DESCRIPTOR_PATH", "/path/to/descriptors.bin");
let service = LogService::new();

// Wrap in the handle to use it as a gRPC service
let handle = LogServiceHandle(Arc::new(service));

EventService stores all events as JSON in a SQL database for querying, debugging, and audit trails.

  • Stores events keyed by (domain, root_id, sequence)
  • JSON decoding via prost-reflect
  • Base64 fallback for unknown types
  • Indexes for efficient querying by domain, type, correlation, time
Supported databases:

  • SQLite (feature: sqlite)
  • PostgreSQL (feature: postgres)
illustrative - EventService schema
CREATE TABLE events (
    domain TEXT NOT NULL,
    root_id TEXT NOT NULL,
    sequence INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    event_json TEXT NOT NULL,
    correlation_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    PRIMARY KEY (domain, root_id, sequence)
);

-- Indexes
CREATE INDEX idx_events_domain_type ON events(domain, event_type);
CREATE INDEX idx_events_correlation ON events(correlation_id);
CREATE INDEX idx_events_created ON events(created_at);
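To see what the composite primary key implies, the schema can be loaded into a throwaway in-memory SQLite database. This is only a sketch of the key's behavior: the row values (including the timestamp) are made-up sample data, `demo_composite_key` is a hypothetical helper, and none of this is a way to read production data.

```python
import sqlite3

SCHEMA = """
CREATE TABLE events (
    domain TEXT NOT NULL,
    root_id TEXT NOT NULL,
    sequence INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    event_json TEXT NOT NULL,
    correlation_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    PRIMARY KEY (domain, root_id, sequence)
);
CREATE INDEX idx_events_domain_type ON events(domain, event_type);
CREATE INDEX idx_events_correlation ON events(correlation_id);
CREATE INDEX idx_events_created ON events(created_at);
"""

INSERT = "INSERT INTO events VALUES (?, ?, ?, ?, ?, ?, ?)"


def demo_composite_key():
    """Insert the same (domain, root_id, sequence) twice; report the result."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    # Sample row; every value here is illustrative.
    row = ("orders", "abc123", 5, "OrderCreated",
           '{"order_id": "ORD-12345"}', "corr-xyz-789", "2024-01-01T00:00:00Z")
    conn.execute(INSERT, row)
    try:
        conn.execute(INSERT, row)
        duplicate_rejected = False
    except sqlite3.IntegrityError:
        # The primary key admits one row per (domain, root_id, sequence).
        duplicate_rejected = True
    count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
    conn.close()
    return duplicate_rejected, count
```

The second insert raises `sqlite3.IntegrityError`, so the table holds a single row for that key.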

EventService exposes its reads over gRPC (and the REST gateway). Use the generated client; never query the backing store directly.

illustrative - EventService queries
# All events for an aggregate
events = event_service.by_root(domain="orders", root_id="abc123")
# Events by type, newest first
recent = event_service.by_type(domain="orders", event_type="OrderCreated", limit=100)
# Trace a correlation flow
trace = event_service.by_correlation("corr-xyz-789")
illustrative - EventService usage
use std::sync::Arc;
use angzarr::handlers::projectors::{EventService, EventServiceHandle, connect_pool};

let pool = connect_pool("sqlite:events.db").await?;
let service = EventService::new(pool)
    .load_descriptors("/path/to/descriptors.bin");
service.init().await?; // Create schema

// Wrap in the handle to use it as a gRPC service
let handle = EventServiceHandle(Arc::new(service));

OutboundService is the unified outbound projector for streaming events to both internal and external consumers. It combines gRPC streaming (for gateways) with CloudEvents publishing (for external systems).

flowchart LR
    EB[EventBook] --> OS[OutboundService]
    OS --> G["gRPC stream<br/>(EventBook, multi-page)"]
    OS --> H["HTTP<br/>(CloudEvents JSON/protobuf)"]
    OS --> K["Kafka<br/>(CloudEvents JSON/protobuf)"]
  • gRPC Streaming: Correlation-based filtering for internal consumers (gateways)
  • CloudEvents HTTP: Webhook publishing with JSON or protobuf encoding
  • CloudEvents Kafka: Topic publishing with message key ordering
  • Multiple subscribers per correlation ID
  • Automatic cleanup when subscribers disconnect
  • Configurable content type (JSON or protobuf)
Use cases:

  • WebSocket gateways pushing events to browser clients
  • Long-polling APIs waiting for saga completion
  • Webhook integrations with external systems
  • Event-driven microservices via Kafka
// EventStreamService: streams events to registered subscribers
service EventStreamService {
  // Subscribe to events matching correlation ID (required)
  // Returns INVALID_ARGUMENT if correlation_id is empty
  // REST: Server-Sent Events (SSE) stream
  rpc Subscribe(EventStreamFilter) returns (stream EventBook) {
    option (google.api.http) = {get: "/v1/stream/{correlation_id}"};
  }
}

// Subscription filter for event streaming
message EventStreamFilter {
  string correlation_id = 1;
}
| Variable | Description | Default |
|---|---|---|
| CLOUDEVENTS_SINK | Sink type: http, kafka, both, null | null (gRPC only) |
| OUTBOUND_CONTENT_TYPE | json or protobuf | json |
| CLOUDEVENTS_HTTP_ENDPOINT | HTTP webhook URL | (required if http) |
| CLOUDEVENTS_HTTP_TIMEOUT | HTTP timeout in seconds | 30 |
| CLOUDEVENTS_KAFKA_BROKERS | Kafka bootstrap servers | (required if kafka) |
| CLOUDEVENTS_KAFKA_TOPIC | Kafka topic name | cloudevents |
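The defaults and "required if" rules in the table can be checked with a small loader like the one below. It is a sketch of the documented rules, not the framework's own `from_env` logic: `OutboundConfig` and `load_outbound_config` are hypothetical names, and treating `both` as requiring the HTTP and the Kafka settings is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class OutboundConfig:
    """Hypothetical container; field defaults follow the table."""
    sink: str = "null"
    content_type: str = "json"
    http_endpoint: Optional[str] = None
    http_timeout_secs: int = 30
    kafka_brokers: Optional[str] = None
    kafka_topic: str = "cloudevents"


def load_outbound_config(env: Mapping[str, str] = os.environ) -> OutboundConfig:
    """Read the documented variables, apply defaults, and validate them."""
    sink = env.get("CLOUDEVENTS_SINK", "null")
    if sink not in {"http", "kafka", "both", "null"}:
        raise ValueError(f"unknown CLOUDEVENTS_SINK: {sink!r}")
    content_type = env.get("OUTBOUND_CONTENT_TYPE", "json")
    if content_type not in {"json", "protobuf"}:
        raise ValueError(f"unknown OUTBOUND_CONTENT_TYPE: {content_type!r}")

    config = OutboundConfig(
        sink=sink,
        content_type=content_type,
        http_endpoint=env.get("CLOUDEVENTS_HTTP_ENDPOINT"),
        http_timeout_secs=int(env.get("CLOUDEVENTS_HTTP_TIMEOUT", "30")),
        kafka_brokers=env.get("CLOUDEVENTS_KAFKA_BROKERS"),
        kafka_topic=env.get("CLOUDEVENTS_KAFKA_TOPIC", "cloudevents"),
    )
    # Assumption: "both" needs the HTTP and the Kafka settings.
    if sink in {"http", "both"} and not config.http_endpoint:
        raise ValueError("CLOUDEVENTS_HTTP_ENDPOINT is required for the http sink")
    if sink in {"kafka", "both"} and not config.kafka_brokers:
        raise ValueError("CLOUDEVENTS_KAFKA_BROKERS is required for the kafka sink")
    return config
```

Passing a plain dict instead of `os.environ` makes the rules easy to exercise in a test without touching the process environment.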
illustrative - OutboundService usage
use std::sync::Arc;
use angzarr::handlers::projectors::{OutboundService, OutboundEventHandler};
use angzarr::handlers::projectors::outbound;
// ContentType must also be in scope; its module path is not shown here

// Create from environment config (with sinks)
let service = outbound::from_env()?;

// Or create with explicit configuration
let service = OutboundService::new() // gRPC only
    .with_content_type(ContentType::Json);

// Create event handler for bus integration
let handler = OutboundEventHandler::new(Arc::new(service));
// Service implements EventStreamService gRPC trait
illustrative - gRPC client streaming
import grpc
# EventStreamServiceStub and EventStreamFilter come from the generated gRPC modules

async def stream_events(correlation_id: str):
    async with grpc.aio.insecure_channel("localhost:1340") as channel:
        stub = EventStreamServiceStub(channel)
        request = EventStreamFilter(correlation_id=correlation_id)
        # Subscribe is server-streaming: iterate until the server closes the stream
        async for event_book in stub.Subscribe(request):
            print(f"Received: {event_book.cover.domain}")
            for page in event_book.pages:
                print(f"  Event: {page.payload}")

Each EventBook page becomes a CloudEvent with:

| Field | Value |
|---|---|
| id | {domain}:{root_id}:{sequence} |
| type | angzarr.{event_type} |
| source | angzarr/{domain} |
| subject | Root ID (hex) |
| data | Single-page EventBook (protobuf bytes) |
| correlationid | Extension: correlation ID |
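As a rough illustration of the mapping above, the following sketch builds a CloudEvent in the JSON event format. Several details are assumptions rather than documented behavior: the `to_cloudevent` helper is hypothetical, the root ID inside `id` is rendered as hex, the sequence is left unpadded, and `datacontenttype` is set to `application/protobuf`. The `specversion` attribute and the `data_base64` member for binary payloads come from the CloudEvents 1.0 specification.

```python
import base64


def to_cloudevent(
    domain: str,
    root_id: bytes,
    sequence: int,
    event_type: str,
    correlation_id: str,
    page_bytes: bytes,
) -> dict:
    """Build a JSON-format CloudEvent for one EventBook page (hypothetical helper)."""
    root_hex = root_id.hex()
    return {
        "specversion": "1.0",  # required by CloudEvents 1.0
        # Assumption: hex root ID and unpadded sequence inside the id.
        "id": f"{domain}:{root_hex}:{sequence}",
        "type": f"angzarr.{event_type}",
        "source": f"angzarr/{domain}",
        "subject": root_hex,
        "correlationid": correlation_id,  # extension attribute
        "datacontenttype": "application/protobuf",  # assumption
        # Binary data travels base64-encoded in the JSON event format.
        "data_base64": base64.b64encode(page_bytes).decode("ascii"),
    }
```

A consumer would reverse the last step with `base64.b64decode(event["data_base64"])` and then parse the bytes as a single-page EventBook.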

| Feature | Log | Event | Outbound |
|---|---|---|---|
| Output | Console | Database | gRPC + HTTP/Kafka |
| Purpose | Debug | Audit/Query | Real-time + Integration |
| State | Stateless | Persistent | In-memory |
| Filtering | None | SQL queries | Correlation |
| Production | Dev only | Yes | Yes |