
Framework Projectors

Angzarr provides several built-in projectors for common operational needs. These are framework-level services that process events without requiring custom business logic.

| Projector | Purpose | Output |
|---|---|---|
| LogService | Debug logging | Console (stdout) |
| EventService | Event storage | Database (SQL) |
| OutboundService | Real-time streaming + external publishing | gRPC streams, HTTP/Kafka |
| TopologyProjector | Component graph | REST API (Grafana) |

LogService

Pretty-prints events to stdout with optional JSON decoding. Useful for development debugging and monitoring event flow.

Features

  • ANSI color-coded output by event type
  • JSON decoding via prost-reflect (if descriptors provided)
  • Hex dump fallback for unknown types
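
The fallback behaviour in the list above can be sketched as follows. This is a minimal illustration, not LogService's actual code: `hex_dump` and `render_payload` are hypothetical names, and the 16-bytes-per-row layout is an assumption chosen for readability.

```rust
/// Hypothetical helper: format raw bytes as rows of 16 hex values
/// followed by a printable-ASCII column.
pub fn hex_dump(bytes: &[u8]) -> String {
    let mut out = String::new();
    for (row, chunk) in bytes.chunks(16).enumerate() {
        let hex: Vec<String> = chunk.iter().map(|b| format!("{b:02x}")).collect();
        let ascii: String = chunk
            .iter()
            .map(|&b| if b.is_ascii_graphic() || b == b' ' { b as char } else { '.' })
            .collect();
        // 16 two-digit values plus 15 separating spaces = 47 columns.
        out.push_str(&format!("{:08x}  {:<47}  |{}|\n", row * 16, hex.join(" "), ascii));
    }
    out
}

/// Hypothetical helper: prefer decoded JSON when a descriptor matched,
/// otherwise fall back to the hex dump of the raw payload.
pub fn render_payload(decoded_json: Option<&str>, raw: &[u8]) -> String {
    match decoded_json {
        Some(json) => json.to_string(),
        None => hex_dump(raw),
    }
}
```

Passing `None` as the decoded JSON models the case where no descriptor is available for the event type.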

Configuration

| Variable | Description |
|---|---|
| DESCRIPTOR_PATH | Path to FileDescriptorSet for JSON decoding |

Output Example

────────────────────────────────────────────────────────
orders:abc123de:0000000005
OrderCreated
────────────────────────────────────────────────────────
{
  "order_id": "ORD-12345",
  "customer_id": "CUST-789",
  "total": 9999
}

Color Coding

| Event Pattern | Color |
|---|---|
| *Created | Green |
| *Completed | Cyan |
| *Cancelled, *Failed | Red |
| *Added, *Applied | Yellow |
| Other | Blue |
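
As a rough illustration of the table above, the suffix-to-color mapping could look like the sketch below. The names `Color`, `color_for`, and `colorize` are hypothetical, and the sketch assumes the `*` patterns mean "event type name ends with"; the escape sequences are the standard ANSI SGR foreground codes.

```rust
/// Colors from the table above (hypothetical type for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Color {
    Green,
    Cyan,
    Red,
    Yellow,
    Blue,
}

impl Color {
    /// Standard ANSI SGR escape sequence for the foreground color.
    pub fn ansi_code(self) -> &'static str {
        match self {
            Color::Green => "\x1b[32m",
            Color::Cyan => "\x1b[36m",
            Color::Red => "\x1b[31m",
            Color::Yellow => "\x1b[33m",
            Color::Blue => "\x1b[34m",
        }
    }
}

/// Pick a color by event type suffix; anything unmatched is blue.
pub fn color_for(event_type: &str) -> Color {
    const RULES: &[(&str, Color)] = &[
        ("Created", Color::Green),
        ("Completed", Color::Cyan),
        ("Cancelled", Color::Red),
        ("Failed", Color::Red),
        ("Added", Color::Yellow),
        ("Applied", Color::Yellow),
    ];
    for (suffix, color) in RULES {
        if event_type.ends_with(*suffix) {
            return *color;
        }
    }
    Color::Blue
}

/// Wrap the event type name in its color and reset the terminal afterwards.
pub fn colorize(event_type: &str) -> String {
    format!("{}{}\x1b[0m", color_for(event_type).ansi_code(), event_type)
}
```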

Usage (Rust)

use std::sync::Arc;

use angzarr::handlers::projectors::{LogService, LogServiceHandle};

// Without descriptors
let service = LogService::new();

// Or with descriptors: set DESCRIPTOR_PATH before constructing the service
std::env::set_var("DESCRIPTOR_PATH", "/path/to/descriptors.bin");
let service = LogService::new();

// Use as gRPC service
let handle = LogServiceHandle(Arc::new(service));

EventService

Stores all events as JSON in a SQL database for querying, debugging, and audit trails.

Features

  • Stores events keyed by (domain, root_id, sequence)
  • JSON decoding via prost-reflect
  • Base64 fallback for unknown types
  • Indexes for efficient querying by domain, type, correlation, time

Database Support

  • SQLite (feature: sqlite)
  • PostgreSQL (feature: postgres)

Schema

CREATE TABLE events (
    domain TEXT NOT NULL,
    root_id TEXT NOT NULL,
    sequence INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    event_json TEXT NOT NULL,
    correlation_id TEXT NOT NULL,
    created_at TEXT NOT NULL,
    PRIMARY KEY (domain, root_id, sequence)
);

-- Indexes
CREATE INDEX idx_events_domain_type ON events(domain, event_type);
CREATE INDEX idx_events_correlation ON events(correlation_id);
CREATE INDEX idx_events_created ON events(created_at);

Queries

-- All events for an aggregate
SELECT * FROM events
WHERE domain = 'orders' AND root_id = 'abc123'
ORDER BY sequence;

-- Events by type
SELECT * FROM events
WHERE domain = 'orders' AND event_type = 'OrderCreated'
ORDER BY created_at DESC
LIMIT 100;

-- Trace a correlation flow
SELECT * FROM events
WHERE correlation_id = 'corr-xyz-789'
ORDER BY created_at;

Usage (Rust)

use std::sync::Arc;

use angzarr::handlers::projectors::{EventService, EventServiceHandle, connect_pool};

let pool = connect_pool("sqlite:events.db").await?;
let service = EventService::new(pool)
    .load_descriptors("/path/to/descriptors.bin");

service.init().await?; // Create schema

let handle = EventServiceHandle(Arc::new(service));

OutboundService

Unified outbound projector for event streaming to both internal and external consumers. Combines gRPC streaming (for gateways) with CloudEvents publishing (for external systems).

Architecture

                                    ┌── gRPC stream (EventBook, multi-page)
EventBook ──→ [OutboundService] ────┼── HTTP (CloudEvents JSON/protobuf)
                                    └── Kafka (CloudEvents JSON/protobuf)

Features

  • gRPC Streaming: Correlation-based filtering for internal consumers (gateways)
  • CloudEvents HTTP: Webhook publishing with JSON or protobuf encoding
  • CloudEvents Kafka: Topic publishing with message key ordering
  • Multiple subscribers per correlation ID
  • Automatic cleanup when subscribers disconnect
  • Configurable content type (JSON or protobuf)
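
The "multiple subscribers per correlation ID" and "automatic cleanup" features can be pictured with the following std-only sketch. It is an assumption-laden illustration, not OutboundService's implementation: `Book` stands in for an EventBook, `Registry` is a hypothetical type, and `std::sync::mpsc` channels replace the gRPC streams. Rejecting an empty correlation ID mirrors the documented INVALID_ARGUMENT behaviour of `Subscribe`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for an EventBook.
#[derive(Debug, Clone, PartialEq)]
pub struct Book {
    pub correlation_id: String,
    pub payload: String,
}

/// Hypothetical registry: several subscribers may share one correlation ID.
#[derive(Default)]
pub struct Registry {
    subscribers: HashMap<String, Vec<Sender<Book>>>,
}

impl Registry {
    /// Register a subscriber; an empty correlation ID is rejected.
    pub fn subscribe(&mut self, correlation_id: &str) -> Result<Receiver<Book>, String> {
        if correlation_id.is_empty() {
            return Err("correlation_id is required".to_string());
        }
        let (tx, rx) = channel();
        self.subscribers
            .entry(correlation_id.to_string())
            .or_default()
            .push(tx);
        Ok(rx)
    }

    /// Deliver a book to every live subscriber of its correlation ID.
    /// Senders whose receiver was dropped fail to send and are pruned,
    /// which models cleanup on disconnect. Returns the delivery count.
    pub fn publish(&mut self, book: &Book) -> usize {
        let Some(senders) = self.subscribers.get_mut(&book.correlation_id) else {
            return 0;
        };
        senders.retain(|tx| tx.send(book.clone()).is_ok());
        let delivered = senders.len();
        if delivered == 0 {
            self.subscribers.remove(&book.correlation_id);
        }
        delivered
    }

    /// Number of subscribers currently registered for a correlation ID.
    pub fn subscriber_count(&self, correlation_id: &str) -> usize {
        self.subscribers.get(correlation_id).map_or(0, Vec::len)
    }
}
```

In this sketch, cleanup is lazy: a disconnected subscriber is only noticed on the next publish for its correlation ID. Whether the real service cleans up eagerly is not stated in this document.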

Use Cases

  • WebSocket gateways pushing events to browser clients
  • Long-polling APIs waiting for saga completion
  • Webhook integrations with external systems
  • Event-driven microservices via Kafka

gRPC Service

// EventStreamService: streams events to registered subscribers
service EventStreamService {
  // Subscribe to events matching correlation ID (required)
  // Returns INVALID_ARGUMENT if correlation_id is empty
  rpc Subscribe (EventStreamFilter) returns (stream EventBook);
}

// Subscription filter for event streaming
message EventStreamFilter {
  string correlation_id = 1;
}

Configuration

| Variable | Description | Default |
|---|---|---|
| CLOUDEVENTS_SINK | Sink type: http, kafka, both, null | null (gRPC only) |
| OUTBOUND_CONTENT_TYPE | json or protobuf | json |
| CLOUDEVENTS_HTTP_ENDPOINT | HTTP webhook URL | (required if http) |
| CLOUDEVENTS_HTTP_TIMEOUT | HTTP timeout in seconds | 30 |
| CLOUDEVENTS_KAFKA_BROKERS | Kafka bootstrap servers | (required if kafka) |
| CLOUDEVENTS_KAFKA_TOPIC | Kafka topic name | cloudevents |
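
To make the defaults and "required if" rules in the table concrete, here is a hedged sketch of how such variables might be resolved. All types and the `parse_config` function are hypothetical and are not the behaviour of `outbound::from_env()`; the lookup is passed in as a closure so the logic can be exercised without touching the process environment.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContentType {
    Json,
    Protobuf,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpSink {
    pub endpoint: String,
    pub timeout_secs: u64,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KafkaSink {
    pub brokers: String,
    pub topic: String,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutboundConfig {
    pub content_type: ContentType,
    pub http: Option<HttpSink>,
    pub kafka: Option<KafkaSink>,
}

/// Resolve the documented variables, applying the documented defaults.
pub fn parse_config(get: impl Fn(&str) -> Option<String>) -> Result<OutboundConfig, String> {
    // Default sink is "null", i.e. gRPC streaming only.
    let sink = get("CLOUDEVENTS_SINK").unwrap_or_else(|| "null".to_string());
    let (want_http, want_kafka) = match sink.as_str() {
        "null" => (false, false),
        "http" => (true, false),
        "kafka" => (false, true),
        "both" => (true, true),
        other => return Err(format!("unknown CLOUDEVENTS_SINK: {other}")),
    };
    let content_type = match get("OUTBOUND_CONTENT_TYPE").as_deref() {
        None | Some("json") => ContentType::Json,
        Some("protobuf") => ContentType::Protobuf,
        Some(other) => return Err(format!("unknown OUTBOUND_CONTENT_TYPE: {other}")),
    };
    let http = if want_http {
        let endpoint = get("CLOUDEVENTS_HTTP_ENDPOINT")
            .ok_or("CLOUDEVENTS_HTTP_ENDPOINT is required for the http sink")?;
        let timeout_secs = match get("CLOUDEVENTS_HTTP_TIMEOUT") {
            Some(raw) => raw
                .parse::<u64>()
                .map_err(|e| format!("bad CLOUDEVENTS_HTTP_TIMEOUT: {e}"))?,
            None => 30,
        };
        Some(HttpSink { endpoint, timeout_secs })
    } else {
        None
    };
    let kafka = if want_kafka {
        let brokers = get("CLOUDEVENTS_KAFKA_BROKERS")
            .ok_or("CLOUDEVENTS_KAFKA_BROKERS is required for the kafka sink")?;
        let topic = get("CLOUDEVENTS_KAFKA_TOPIC").unwrap_or_else(|| "cloudevents".to_string());
        Some(KafkaSink { brokers, topic })
    } else {
        None
    };
    Ok(OutboundConfig { content_type, http, kafka })
}
```

Against the real environment the sketch would be called as `parse_config(|key| std::env::var(key).ok())`.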

Usage (Rust)

use std::sync::Arc;

use angzarr::handlers::projectors::{OutboundService, OutboundEventHandler};
use angzarr::handlers::projectors::outbound;
// ContentType must also be in scope; its import path is not shown here.

// Create from environment config (with sinks)
let service = outbound::from_env()?;

// Or create with explicit configuration
let service = OutboundService::new() // gRPC only
    .with_content_type(ContentType::Json);

// Create event handler for bus integration
let handler = OutboundEventHandler::new(Arc::new(service));

// Service implements EventStreamService gRPC trait

Client Example (gRPC)

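import grpc

# EventStreamServiceStub and EventStreamFilter come from the Python modules
# generated from the proto definition above; import them from wherever your
# build places the generated code.

async def stream_events(correlation_id: str):
    async with grpc.aio.insecure_channel("localhost:1340") as channel:
        stub = EventStreamServiceStub(channel)
        request = EventStreamFilter(correlation_id=correlation_id)

        async for event_book in stub.Subscribe(request):
            print(f"Received: {event_book.cover.domain}")
            for page in event_book.pages:
                print(f"  Event: {page.payload}")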

CloudEvents Format

Each EventBook page becomes a CloudEvent with:

| Field | Value |
|---|---|
| id | {domain}:{root_id}:{sequence} |
| type | angzarr.{event_type} |
| source | angzarr/{domain} |
| subject | Root ID (hex) |
| data | Single-page EventBook (protobuf bytes) |
| correlationid | Extension: correlation ID |
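
The attribute mapping in the table can be sketched as a small pure function. This is illustrative only: `CloudEventAttrs` and `attrs_for_page` are hypothetical names, and the sketch assumes the root ID inside `id` is hex-encoded like `subject` and that the sequence is written as a plain decimal number, neither of which is confirmed here. The `specversion` value "1.0" is the one required by the CloudEvents 1.0 specification; the `data` payload is omitted.

```rust
/// Hypothetical container for the context attributes of one CloudEvent.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CloudEventAttrs {
    pub specversion: &'static str,
    pub id: String,
    /// The CloudEvents `type` attribute (`type` is a Rust keyword).
    pub event_type: String,
    pub source: String,
    pub subject: String,
    /// Extension attribute carrying the correlation ID.
    pub correlationid: String,
}

/// Lowercase hex encoding of a byte slice.
fn to_hex(bytes: &[u8]) -> String {
    bytes.iter().map(|b| format!("{b:02x}")).collect()
}

/// Build the attributes for a single EventBook page.
pub fn attrs_for_page(
    domain: &str,
    root_id: &[u8],
    sequence: u64,
    event_type: &str,
    correlation_id: &str,
) -> CloudEventAttrs {
    let root_hex = to_hex(root_id);
    CloudEventAttrs {
        specversion: "1.0",
        // Assumption: hex root ID and unpadded decimal sequence.
        id: format!("{domain}:{root_hex}:{sequence}"),
        event_type: format!("angzarr.{event_type}"),
        source: format!("angzarr/{domain}"),
        subject: root_hex,
        correlationid: correlation_id.to_string(),
    }
}
```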

TopologyProjector

Builds a graph of runtime components from event observation. Serves the graph via REST API for visualization in Grafana's Node Graph panel.

Features

  • Graph structure inferred from event flow
  • Metrics from event observation (counts, last seen)
  • REST API for Grafana integration
  • K8s watcher mode for pod annotation discovery
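
A graph "inferred from event flow" can be pictured with the in-memory sketch below. It is not TopologyProjector's implementation: `Topology`, `NodeStats`, and `observe` are hypothetical, and the sketch assumes the caller already knows which component emitted an event and which one received it.

```rust
use std::collections::BTreeMap;

/// Per-node metrics gathered purely from observation.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct NodeStats {
    pub event_count: u64,
    pub last_event_type: String,
}

/// Hypothetical in-memory graph: nodes are components, edges are observed flows.
#[derive(Debug, Default)]
pub struct Topology {
    nodes: BTreeMap<String, NodeStats>,
    edges: BTreeMap<(String, String), u64>,
}

impl Topology {
    /// Record one event flowing from `source` to `target`.
    /// Unknown components are added to the graph on first sight.
    pub fn observe(&mut self, source: &str, target: &str, event_type: &str) {
        for name in [source, target] {
            let node = self.nodes.entry(name.to_string()).or_default();
            node.event_count += 1;
            node.last_event_type = event_type.to_string();
        }
        *self
            .edges
            .entry((source.to_string(), target.to_string()))
            .or_insert(0) += 1;
    }

    /// Metrics for one component, if it has been seen.
    pub fn node(&self, name: &str) -> Option<&NodeStats> {
        self.nodes.get(name)
    }

    /// All edges as (source, target, count), sorted by source then target.
    pub fn edges(&self) -> Vec<(&str, &str, u64)> {
        self.edges
            .iter()
            .map(|((s, t), c)| (s.as_str(), t.as_str(), *c))
            .collect()
    }
}
```

A persistent store and last-seen timestamps, both mentioned in this document, are left out to keep the sketch short.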

Discovery Modes

| Mode | Source | Use Case |
|---|---|---|
| Event observation | Runtime event flow | Default |
| K8s watcher | Pod annotations | Production K8s |

REST API

GET /api/v1/topology/nodes
GET /api/v1/topology/edges
GET /api/v1/topology/graph # Combined for Grafana

Grafana Integration

Configure a JSON API datasource that points to the topology REST endpoint. The Node Graph panel displays:

  • Nodes: Components (aggregates, sagas, projectors, PMs)
  • Edges: Event/command flow between components
  • Metrics: Event counts, last event type, timestamps

Usage (Rust)

use std::sync::Arc;

use angzarr::handlers::projectors::TopologyProjector;
use angzarr::handlers::projectors::topology::store::SqliteTopologyStore;

let store = SqliteTopologyStore::new("topology.db").await?;
let projector = TopologyProjector::new(Arc::new(store), 9099);

projector.init().await?; // Start REST server

// Process events (builds graph from observed traffic)
projector.process_event(&event_book).await?;

Configuration

| Variable | Description | Default |
|---|---|---|
| TOPOLOGY_REST_PORT | REST API port | 9099 |
| TOPOLOGY_DB_PATH | SQLite database path | topology.db |

Comparison

| Feature | Log | Event | Outbound | Topology |
|---|---|---|---|---|
| Output | Console | Database | gRPC + HTTP/Kafka | REST |
| Purpose | Debug | Audit/Query | Real-time + Integration | Visualization |
| State | Stateless | Persistent | In-memory | Persistent |
| Filtering | None | SQL queries | Correlation | Component |
| Production | Dev only | Yes | Yes | Yes |

Standalone Mode Integration

In standalone mode, register framework projectors via RuntimeBuilder:

let mut runtime = RuntimeBuilder::new()
    .with_sqlite_memory()
    .register_aggregate("orders", orders_aggregate)
    // Framework projectors
    .register_projector(
        "topology",
        topology_projector,
        ProjectorConfig::async_(),
    )
    .build()
    .await?;

Next Steps