Skip to main content

CloudEvents Projector

The CloudEvents projector transforms internal domain events into CloudEvents 1.0 format and publishes them to external systems via HTTP webhooks or Kafka.

This enables event-driven integrations where external consumers (partner APIs, analytics pipelines, notification services) receive filtered, public-facing events without accessing internal event stores.


Architecture

                    Client Code                        Framework
─────────── ─────────
EventBook ──→ [ProjectorHandler] ──→ Projection ──→ [CloudEventsCoordinator] ──→ Sink
(filters/transforms) (CloudEventsResponse (HTTP/Kafka)
packed in .projection)

Client projectors use existing projector libraries (ProjectorHandler, StateRouter) to:

  1. Receive internal events
  2. Filter sensitive fields (create "public" versions)
  3. Pack CloudEventsResponse into Projection.projection

Framework coordinator detects CloudEventsResponse by type URL and:

  1. Converts proto Any data to JSON via prost-reflect
  2. Fills envelope fields (id, source, time) from event metadata
  3. Publishes to configured sinks

Client Usage

Two patterns are available for building CloudEvents projectors:

PatternBest ForState Management
OO-styleSimple projectors, familiar class-based designInstance fields
RouterComplex routing, multiple event types, functional styleExplicit state parameter

Both patterns work in all supported languages. Choose based on team preference.

OO-Style (Class-Based)

Implement a projector class with handler methods. State lives in instance fields:

class PlayerCloudEventsProjector(CloudEventsProjector):
"""Publishes public player events to external consumers."""

def __init__(self):
super().__init__("prj-player-cloudevents", "player")

def on_player_registered(self, event: player.PlayerRegistered) -> CloudEvent | None:
# Filter sensitive fields, return public version
public = player.PublicPlayerRegistered(
display_name=event.display_name,
player_type=event.player_type,
# Omit: email (PII)
)
data = Any()
data.Pack(public)
return CloudEvent(type="com.poker.player.registered", data=data)

def on_funds_deposited(self, event: player.FundsDeposited) -> CloudEvent | None:
public = player.PublicFundsDeposited(
amount=event.amount,
)
data = Any()
data.Pack(public)
return CloudEvent(
type="com.poker.player.deposited",
data=data,
extensions={"priority": "normal"},
)

Router Pattern (Functional)

Register handlers with explicit state passing. Better for complex routing or when you prefer functional composition:

def handle_player_registered(event: player.PlayerRegistered) -> CloudEvent | None:
public = player.PublicPlayerRegistered(
display_name=event.display_name,
player_type=event.player_type,
)
data = Any()
data.Pack(public)
return CloudEvent(type="com.poker.player.registered", data=data)


def handle_funds_deposited(event: player.FundsDeposited) -> CloudEvent | None:
public = player.PublicFundsDeposited(amount=event.amount)
data = Any()
data.Pack(public)
return CloudEvent(
type="com.poker.player.deposited",
data=data,
extensions={"priority": "normal"},
)


router = (
CloudEventsRouter("prj-player-cloudevents", "player")
.on("PlayerRegistered", handle_player_registered)
.on("FundsDeposited", handle_funds_deposited)
)

Raw Handle (Low-Level)

For full control over the projection lifecycle, implement the raw handle method:

async fn handle(&self, events: &EventBook, _mode: ProjectionMode) -> Result<Projection, Status> {
let mut cloud_events = Vec::new();

for page in &events.pages {
if let Some(event) = &page.event {
if event.type_url.ends_with("OrderCreated") {
let order = OrderCreated::decode(&event.value[..])?;

// Create filtered public event
let public = PublicOrderCreated {
order_id: order.order_id.clone(),
total: order.total,
};
let data = prost_types::Any::from_msg(&public)?;

cloud_events.push(CloudEvent {
r#type: "com.example.order.created".to_string(),
data: Some(data),
..Default::default()
});
}
}
}

let response = CloudEventsResponse { events: cloud_events };
let projection_any = prost_types::Any::from_msg(&response)?;

Ok(Projection {
cover: events.cover.clone(),
projector: "prj-orders-cloudevents".to_string(),
projection: Some(projection_any),
..Default::default()
})
}

CloudEvent Proto

// CloudEvent represents a single event for external consumption.
//
// Client projectors create these by filtering/transforming internal events.
// Framework fills envelope fields (id, source, time) from Cover/EventPage
// if not explicitly set by the client.
//
// The `data` field is a protobuf Any that framework converts to JSON via
// prost-reflect using the descriptor pool. Clients should pack a "public"
// proto message that omits sensitive fields.
message CloudEvent {
// Event type (e.g., "com.example.order.created").
// Default: proto type_url suffix from original event.
string type = 1;

// Event payload as proto Any.
// Framework converts to JSON for CloudEvents output.
// Client should filter sensitive fields before packing.
google.protobuf.Any data = 2;

// Custom extension attributes.
// Keys should follow CloudEvents naming (lowercase, no dots).
// Framework adds correlationid automatically if present in Cover.
map<string, string> extensions = 3;

// Optional overrides. Framework uses Cover/EventPage values if not set.
optional string id = 4; // Default: {domain}:{root_id}:{sequence}
optional string source = 5; // Default: angzarr/{domain}
optional string subject = 6; // Default: aggregate root ID
}

// CloudEventsResponse is returned by client projectors in Projection.projection.
//
// Framework detects this type by checking projection.type_url and routes
// the events to configured sinks (HTTP webhook, Kafka).
//
// Client may return 0 events (skip), 1 event (typical), or N events
// (fan-out scenarios like multi-tenant notifications).
message CloudEventsResponse {
repeated CloudEvent events = 1;
}

Output Format

Framework produces CloudEvents 1.0 JSON:

{
"specversion": "1.0",
"id": "orders:abc123def456:5",
"type": "com.example.order.created",
"source": "angzarr/orders",
"time": "2024-01-15T10:30:00Z",
"datacontenttype": "application/json",
"subject": "abc123def456",
"correlationid": "corr-xyz-789",
"data": {
"order_id": "ORD-12345",
"total": 9999
}
}

Field Defaults

FieldDefault ValueOverride
id{domain}:{root_id}:{sequence}CloudEvent.id
sourceangzarr/{domain}CloudEvent.source
subjectAggregate root ID (hex)CloudEvent.subject
timeEvent creation timestampFrom EventPage
correlationidCover correlation IDAuto-added as extension

Extension Attributes

The framework supports custom CloudEvents extension attributes via the extensions map in CloudEvent.

Automatic Lowercasing

Per the CloudEvents spec, extension attribute names must be lowercase. The framework automatically lowercases all extension keys, so you can use any case in your client code:

# All of these become "traceid" in the output
CloudEvent(
type="com.example.order.created",
extensions={
"TraceID": "abc123", # → "traceid"
"PRIORITY": "high", # → "priority"
"customExt": "value", # → "customext"
}
)

This follows Postel's Law: "be tolerant of what you accept, strict in what you emit".

Built-in Extensions

The framework automatically adds:

ExtensionSourceDescription
correlationidCover.correlation_idCross-domain workflow identifier

Validation

The framework uses the official cloudevents-sdk crate for CloudEvents 1.0 validation. Invalid events (missing required fields, malformed URIs, etc.) are rejected with an error rather than silently published.


Configuration

Environment Variables

VariableDescriptionDefault
CLOUDEVENTS_SINKSink type: http, kafka, or bothhttp
CLOUDEVENTS_HTTP_ENDPOINTWebhook URLRequired if http
CLOUDEVENTS_HTTP_TIMEOUTRequest timeout (seconds)30
CLOUDEVENTS_BATCH_SIZEMax events per HTTP request100
CLOUDEVENTS_KAFKA_BROKERSKafka bootstrap serversRequired if kafka
CLOUDEVENTS_KAFKA_TOPICKafka topic namecloudevents
CLOUDEVENTS_KAFKA_TIMEOUTDelivery timeout (seconds)5

Kafka Authentication

VariableDescription
CLOUDEVENTS_KAFKA_SASL_USERNAMESASL username
CLOUDEVENTS_KAFKA_SASL_PASSWORDSASL password
CLOUDEVENTS_KAFKA_SASL_MECHANISMPLAIN, SCRAM-SHA-256, SCRAM-SHA-512
CLOUDEVENTS_KAFKA_SECURITY_PROTOCOLPLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
CLOUDEVENTS_KAFKA_SSL_CASSL CA certificate path

HTTP Sink

POSTs batches using application/cloudevents-batch+json content type:

POST /events HTTP/1.1
Host: webhook.example.com
Content-Type: application/cloudevents-batch+json

[
{"specversion": "1.0", "id": "...", "type": "...", "data": {...}},
{"specversion": "1.0", "id": "...", "type": "...", "data": {...}}
]

Retry behavior:

  • Retries on: connection errors, timeouts, 429 (rate limit), 5xx (server errors)
  • Exponential backoff: 100ms → 5s max, 5 attempts
  • No retry on: 4xx client errors (except 429)

Kafka Sink

Publishes JSON-serialized CloudEvents with:

  • Message key: subject field (aggregate root ID) for ordering
  • Idempotent producer: Enabled by default
  • Acks: all (wait for ISR acknowledgment)

Filtering Sensitive Data

CloudEvents are public-facing. Always create "public" proto messages that exclude:

  • PII (emails, phone numbers, addresses)
  • Payment details (card numbers, bank accounts)
  • Internal IDs and implementation details
  • Authentication tokens or secrets
# Internal event (has sensitive data)
class OrderCreated:
order_id: str
customer_email: str # PII
payment_token: str # Secret
total: int
internal_notes: str # Internal

# Public event (safe to expose)
class PublicOrderCreated:
order_id: str
total: int

Multiple Events per Source Event

A single source event can produce multiple CloudEvents (fan-out):

def transform_events(event_book):
cloud_events = []

for page in event_book.pages:
if page.event.type_url.endswith("OrderCreated"):
order = unpack(page.event)

# Event for order tracking
cloud_events.append(CloudEvent(
type="com.example.order.created",
data=pack(OrderPublic(order_id=order.order_id)),
))

# Event for each line item (inventory systems)
for item in order.items:
cloud_events.append(CloudEvent(
type="com.example.lineitem.ordered",
data=pack(LineItemPublic(sku=item.sku, quantity=item.qty)),
))

return pack_response(cloud_events)

Skipping Events

Return an empty CloudEventsResponse to skip publishing:

def transform_events(event_book):
cloud_events = []

for page in event_book.pages:
# Only publish certain event types
if page.event.type_url.endswith("OrderShipped"):
cloud_events.append(...)

# If no events match, returns empty response (nothing published)
return pack_response(cloud_events)

Framework Integration (Rust)

For direct framework usage (not via client projector):

use angzarr::handlers::projectors::{
CloudEventsCoordinator, HttpSink, HttpSinkConfig, NullSink
};

// Create HTTP sink
let sink = HttpSink::new(
HttpSinkConfig::default()
.with_endpoint("https://webhook.example.com/events".to_string())
.with_timeout(Duration::from_secs(30))
)?;

// Create coordinator
let coordinator = CloudEventsCoordinator::new(Arc::new(sink));

// Process projection (returns true if CloudEventsResponse was detected)
let was_cloudevents = coordinator.process(&projection, Some(&source_events)).await?;

Next Steps