Framework Projectors
Angzarr provides several built-in projectors for common operational needs. These are framework-level services that process events without requiring custom business logic.
| Projector | Purpose | Output |
|---|---|---|
| LogService | Debug logging | Console (stdout) |
| EventService | Event storage | Database (SQL) |
| OutboundService | Real-time streaming + external publishing | gRPC streams, HTTP/Kafka |
| TopologyProjector | Component graph | REST API (Grafana) |
LogService
Pretty-prints events to stdout with optional JSON decoding. Useful for development debugging and monitoring event flow.
Features
- ANSI color-coded output by event type
- JSON decoding via prost-reflect (if descriptors provided)
- Hex dump fallback for unknown types
Configuration
| Variable | Description |
|---|---|
DESCRIPTOR_PATH | Path to FileDescriptorSet for JSON decoding |
Output Example
────────────────────────────────────────────────────────
orders:abc123de:0000000005
OrderCreated
────────────────────────────────────────────────────────
{
"order_id": "ORD-12345",
"customer_id": "CUST-789",
"total": 9999
}
Color Coding
| Event Pattern | Color |
|---|---|
*Created | Green |
*Completed | Cyan |
*Cancelled, *Failed | Red |
*Added, *Applied | Yellow |
| Other | Blue |
Usage (Rust)
use angzarr::handlers::projectors::{LogService, LogServiceHandle};
let service = LogService::new();
// Or with descriptors
std::env::set_var("DESCRIPTOR_PATH", "/path/to/descriptors.bin");
let service = LogService::new();
// Use as gRPC service
let handle = LogServiceHandle(Arc::new(service));
EventService
Stores all events as JSON in a SQL database for querying, debugging, and audit trails.
Features
- Stores events keyed by
(domain, root_id, sequence) - JSON decoding via prost-reflect
- Base64 fallback for unknown types
- Indexes for efficient querying by domain, type, correlation, time
Database Support
- SQLite (feature:
sqlite) - PostgreSQL (feature:
postgres)
Schema
CREATE TABLE events (
domain TEXT NOT NULL,
root_id TEXT NOT NULL,
sequence INTEGER NOT NULL,
event_type TEXT NOT NULL,
event_json TEXT NOT NULL,
correlation_id TEXT NOT NULL,
created_at TEXT NOT NULL,
PRIMARY KEY (domain, root_id, sequence)
);
-- Indexes
CREATE INDEX idx_events_domain_type ON events(domain, event_type);
CREATE INDEX idx_events_correlation ON events(correlation_id);
CREATE INDEX idx_events_created ON events(created_at);
Queries
-- All events for an aggregate
SELECT * FROM events
WHERE domain = 'orders' AND root_id = 'abc123'
ORDER BY sequence;
-- Events by type
SELECT * FROM events
WHERE domain = 'orders' AND event_type = 'OrderCreated'
ORDER BY created_at DESC
LIMIT 100;
-- Trace a correlation flow
SELECT * FROM events
WHERE correlation_id = 'corr-xyz-789'
ORDER BY created_at;
Usage (Rust)
use angzarr::handlers::projectors::{EventService, EventServiceHandle, connect_pool};
let pool = connect_pool("sqlite:events.db").await?;
let service = EventService::new(pool)
.load_descriptors("/path/to/descriptors.bin");
service.init().await?; // Create schema
let handle = EventServiceHandle(Arc::new(service));
OutboundService
Unified outbound projector for event streaming to both internal and external consumers. Combines gRPC streaming (for gateways) with CloudEvents publishing (for external systems).
Architecture
┌── gRPC stream (EventBook, multi-page)
EventBook ──→ [OutboundService] ────┼── HTTP (CloudEvents JSON/protobuf)
└── Kafka (CloudEvents JSON/protobuf)
Features
- gRPC Streaming: Correlation-based filtering for internal consumers (gateways)
- CloudEvents HTTP: Webhook publishing with JSON or protobuf encoding
- CloudEvents Kafka: Topic publishing with message key ordering
- Multiple subscribers per correlation ID
- Automatic cleanup when subscribers disconnect
- Configurable content type (JSON or protobuf)
Use Cases
- WebSocket gateways pushing events to browser clients
- Long-polling APIs waiting for saga completion
- Webhook integrations with external systems
- Event-driven microservices via Kafka
gRPC Service
// EventStreamService: streams events to registered subscribers
service EventStreamService {
// Subscribe to events matching correlation ID (required)
// Returns INVALID_ARGUMENT if correlation_id is empty
rpc Subscribe (EventStreamFilter) returns (stream EventBook);
}
// Subscription filter for event streaming
message EventStreamFilter {
string correlation_id = 1;
}
Configuration
| Variable | Description | Default |
|---|---|---|
CLOUDEVENTS_SINK | Sink type: http, kafka, both, null | null (gRPC only) |
OUTBOUND_CONTENT_TYPE | json or protobuf | json |
CLOUDEVENTS_HTTP_ENDPOINT | HTTP webhook URL | (required if http) |
CLOUDEVENTS_HTTP_TIMEOUT | HTTP timeout in seconds | 30 |
CLOUDEVENTS_KAFKA_BROKERS | Kafka bootstrap servers | (required if kafka) |
CLOUDEVENTS_KAFKA_TOPIC | Kafka topic name | cloudevents |
Usage (Rust)
use angzarr::handlers::projectors::{OutboundService, OutboundEventHandler};
use angzarr::handlers::projectors::outbound;
// Create from environment config (with sinks)
let service = outbound::from_env()?;
// Or create with explicit configuration
let service = OutboundService::new() // gRPC only
.with_content_type(ContentType::Json);
// Create event handler for bus integration
let handler = OutboundEventHandler::new(Arc::new(service));
// Service implements EventStreamService gRPC trait
Client Example (gRPC)
async def stream_events(correlation_id: str):
async with grpc.aio.insecure_channel("localhost:1340") as channel:
stub = EventStreamServiceStub(channel)
request = EventStreamFilter(correlation_id=correlation_id)
async for event_book in stub.Subscribe(request):
print(f"Received: {event_book.cover.domain}")
for page in event_book.pages:
print(f" Event: {page.payload}")
CloudEvents Format
Each EventBook page becomes a CloudEvent with:
| Field | Value |
|---|---|
id | {domain}:{root_id}:{sequence} |
type | angzarr.{event_type} |
source | angzarr/{domain} |
subject | Root ID (hex) |
data | Single-page EventBook (protobuf bytes) |
correlationid | Extension: correlation ID |
TopologyProjector
Builds a graph of runtime components from event observation. Serves the graph via REST API for visualization in Grafana's Node Graph panel.
Features
- Graph structure inferred from event flow
- Metrics from event observation (counts, last seen)
- REST API for Grafana integration
- K8s watcher mode for pod annotation discovery
Discovery Modes
| Mode | Source | Use Case |
|---|---|---|
| Event observation | Runtime event flow | Default |
| K8s watcher | Pod annotations | Production K8s |
REST API
GET /api/v1/topology/nodes
GET /api/v1/topology/edges
GET /api/v1/topology/graph # Combined for Grafana
Grafana Integration
Configure as JSON API datasource pointing to the topology REST endpoint. The Node Graph panel displays:
- Nodes: Components (aggregates, sagas, projectors, PMs)
- Edges: Event/command flow between components
- Metrics: Event counts, last event type, timestamps
Usage (Rust)
use angzarr::handlers::projectors::TopologyProjector;
use angzarr::handlers::projectors::topology::store::SqliteTopologyStore;
let store = SqliteTopologyStore::new("topology.db").await?;
let projector = TopologyProjector::new(Arc::new(store), 9099);
projector.init().await?; // Start REST server
// Process events (builds graph from observed traffic)
projector.process_event(&event_book).await?;
Configuration
| Variable | Description | Default |
|---|---|---|
TOPOLOGY_REST_PORT | REST API port | 9099 |
TOPOLOGY_DB_PATH | SQLite database path | topology.db |
Comparison
| Feature | Log | Event | Outbound | Topology |
|---|---|---|---|---|
| Output | Console | Database | gRPC + HTTP/Kafka | REST |
| Purpose | Debug | Audit/Query | Real-time + Integration | Visualization |
| State | Stateless | Persistent | In-memory | Persistent |
| Filtering | None | SQL queries | Correlation | Component |
| Production | Dev only | Yes | Yes | Yes |
Standalone Mode Integration
In standalone mode, register framework projectors via RuntimeBuilder:
let mut runtime = RuntimeBuilder::new()
.with_sqlite_memory()
.register_aggregate("orders", orders_aggregate)
// Framework projectors
.register_projector(
"topology",
topology_projector,
ProjectorConfig::async_(),
)
.build()
.await?;
Next Steps
- Projectors — Custom projector patterns
- Process Managers — Cross-domain orchestration