CloudEvents Projector
The CloudEvents projector transforms internal domain events into CloudEvents 1.0 format and publishes them to external systems via HTTP webhooks or Kafka.
This enables event-driven integrations where external consumers (partner APIs, analytics pipelines, notification services) receive filtered, public-facing events without accessing internal event stores.
Architecture
Client Code Framework
─────────── ─────────
EventBook ──→ [ProjectorHandler] ──→ Projection ──→ [CloudEventsCoordinator] ──→ Sink
(filters/transforms) (CloudEventsResponse (HTTP/Kafka)
packed in .projection)
Client projectors use existing projector libraries (ProjectorHandler, StateRouter) to:
- Receive internal events
- Filter sensitive fields (create "public" versions)
- Pack
CloudEventsResponseintoProjection.projection
Framework coordinator detects CloudEventsResponse by type URL and:
- Converts proto
Anydata to JSON via prost-reflect - Fills envelope fields (id, source, time) from event metadata
- Publishes to configured sinks
Client Usage
Two patterns are available for building CloudEvents projectors:
| Pattern | Best For | State Management |
|---|---|---|
| OO-style | Simple projectors, familiar class-based design | Instance fields |
| Router | Complex routing, multiple event types, functional style | Explicit state parameter |
Both patterns work in all supported languages. Choose based on team preference.
OO-Style (Class-Based)
Implement a projector class with handler methods. State lives in instance fields:
- Python
- Go
- Rust
- Java
- C#
- C++
class PlayerCloudEventsProjector(CloudEventsProjector):
"""Publishes public player events to external consumers."""
def __init__(self):
super().__init__("prj-player-cloudevents", "player")
def on_player_registered(self, event: player.PlayerRegistered) -> CloudEvent | None:
# Filter sensitive fields, return public version
public = player.PublicPlayerRegistered(
display_name=event.display_name,
player_type=event.player_type,
# Omit: email (PII)
)
data = Any()
data.Pack(public)
return CloudEvent(type="com.poker.player.registered", data=data)
def on_funds_deposited(self, event: player.FundsDeposited) -> CloudEvent | None:
public = player.PublicFundsDeposited(
amount=event.amount,
)
data = Any()
data.Pack(public)
return CloudEvent(
type="com.poker.player.deposited",
data=data,
extensions={"priority": "normal"},
)
type PlayerCloudEventsProjector struct {
angzarr.CloudEventsProjectorBase
}
func NewPlayerCloudEventsProjector() *PlayerCloudEventsProjector {
p := &PlayerCloudEventsProjector{}
p.Init("prj-player-cloudevents", "player")
return p
}
func (p *PlayerCloudEventsProjector) OnPlayerRegistered(event *examples.PlayerRegistered) *pb.CloudEvent {
// Filter sensitive fields, return public version
public := &examples.PublicPlayerRegistered{
DisplayName: event.DisplayName,
PlayerType: event.PlayerType,
}
data, _ := anypb.New(public)
return &pb.CloudEvent{
Type: "com.poker.player.registered",
Data: data,
}
}
func (p *PlayerCloudEventsProjector) OnFundsDeposited(event *examples.FundsDeposited) *pb.CloudEvent {
public := &examples.PublicFundsDeposited{
Amount: event.Amount,
}
data, _ := anypb.New(public)
return &pb.CloudEvent{
Type: "com.poker.player.deposited",
Data: data,
Extensions: map[string]string{"priority": "normal"},
}
}
pub struct PlayerCloudEventsProjector;
impl CloudEventsProjector for PlayerCloudEventsProjector {
fn name(&self) -> &str { "prj-player-cloudevents" }
fn domain(&self) -> &str { "player" }
}
impl PlayerCloudEventsProjector {
pub fn on_player_registered(&self, event: &PlayerRegistered) -> Option<CloudEvent> {
// Filter sensitive fields, return public version
let public = PublicPlayerRegistered {
display_name: event.display_name.clone(),
player_type: event.player_type,
};
Some(CloudEvent {
r#type: "com.poker.player.registered".into(),
data: Some(Any::from_msg(&public).ok()?),
..Default::default()
})
}
pub fn on_funds_deposited(&self, event: &FundsDeposited) -> Option<CloudEvent> {
let public = PublicFundsDeposited {
amount: event.amount.clone(),
};
Some(CloudEvent {
r#type: "com.poker.player.deposited".into(),
data: Some(Any::from_msg(&public).ok()?),
extensions: [("priority".into(), "normal".into())].into(),
..Default::default()
})
}
}
public class PlayerCloudEventsProjector extends CloudEventsProjector {
public PlayerCloudEventsProjector() {
super("prj-player-cloudevents", "player");
}
@Publishes("PlayerRegistered")
public CloudEvent onPlayerRegistered(PlayerRegistered event) {
// Filter sensitive fields, return public version
var publicEvent = PublicPlayerRegistered.newBuilder()
.setDisplayName(event.getDisplayName())
.setPlayerType(event.getPlayerType())
.build();
return CloudEvent.newBuilder()
.setType("com.poker.player.registered")
.setData(Any.pack(publicEvent))
.build();
}
@Publishes("FundsDeposited")
public CloudEvent onFundsDeposited(FundsDeposited event) {
var publicEvent = PublicFundsDeposited.newBuilder()
.setAmount(event.getAmount())
.build();
return CloudEvent.newBuilder()
.setType("com.poker.player.deposited")
.setData(Any.pack(publicEvent))
.putExtensions("priority", "normal")
.build();
}
}
public class PlayerCloudEventsProjector : CloudEventsProjector
{
public PlayerCloudEventsProjector()
: base("prj-player-cloudevents", "player") { }
[Publishes(typeof(PlayerRegistered))]
public CloudEvent? OnPlayerRegistered(PlayerRegistered @event)
{
// Filter sensitive fields, return public version
var publicEvent = new PublicPlayerRegistered
{
DisplayName = @event.DisplayName,
PlayerType = @event.PlayerType
};
return new CloudEvent
{
Type = "com.poker.player.registered",
Data = Any.Pack(publicEvent)
};
}
[Publishes(typeof(FundsDeposited))]
public CloudEvent? OnFundsDeposited(FundsDeposited @event)
{
var publicEvent = new PublicFundsDeposited
{
Amount = @event.Amount
};
return new CloudEvent
{
Type = "com.poker.player.deposited",
Data = Any.Pack(publicEvent),
Extensions = { ["priority"] = "normal" }
};
}
}
class PlayerCloudEventsProjector : public CloudEventsProjector {
public:
PlayerCloudEventsProjector()
: CloudEventsProjector("prj-player-cloudevents", "player") {}
std::optional<CloudEvent> on_player_registered(const PlayerRegistered& event) {
// Filter sensitive fields, return public version
PublicPlayerRegistered public_event;
public_event.set_display_name(event.display_name());
public_event.set_player_type(event.player_type());
CloudEvent ce;
ce.set_type("com.poker.player.registered");
ce.mutable_data()->PackFrom(public_event);
return ce;
}
std::optional<CloudEvent> on_funds_deposited(const FundsDeposited& event) {
PublicFundsDeposited public_event;
*public_event.mutable_amount() = event.amount();
CloudEvent ce;
ce.set_type("com.poker.player.deposited");
ce.mutable_data()->PackFrom(public_event);
(*ce.mutable_extensions())["priority"] = "normal";
return ce;
}
};
Router Pattern (Functional)
Register handlers with explicit state passing. Better for complex routing or when you prefer functional composition:
- Python
- Go
- Rust
- Java
- C#
- C++
def handle_player_registered(event: player.PlayerRegistered) -> CloudEvent | None:
public = player.PublicPlayerRegistered(
display_name=event.display_name,
player_type=event.player_type,
)
data = Any()
data.Pack(public)
return CloudEvent(type="com.poker.player.registered", data=data)
def handle_funds_deposited(event: player.FundsDeposited) -> CloudEvent | None:
public = player.PublicFundsDeposited(amount=event.amount)
data = Any()
data.Pack(public)
return CloudEvent(
type="com.poker.player.deposited",
data=data,
extensions={"priority": "normal"},
)
router = (
CloudEventsRouter("prj-player-cloudevents", "player")
.on("PlayerRegistered", handle_player_registered)
.on("FundsDeposited", handle_funds_deposited)
)
func handlePlayerRegistered(event *examples.PlayerRegistered) *pb.CloudEvent {
public := &examples.PublicPlayerRegistered{
DisplayName: event.DisplayName,
PlayerType: event.PlayerType,
}
data, _ := anypb.New(public)
return &pb.CloudEvent{Type: "com.poker.player.registered", Data: data}
}
func handleFundsDeposited(event *examples.FundsDeposited) *pb.CloudEvent {
public := &examples.PublicFundsDeposited{Amount: event.Amount}
data, _ := anypb.New(public)
return &pb.CloudEvent{
Type: "com.poker.player.deposited",
Data: data,
Extensions: map[string]string{"priority": "normal"},
}
}
var router = angzarr.NewCloudEventsRouter("prj-player-cloudevents", "player").
On("PlayerRegistered", handlePlayerRegistered).
On("FundsDeposited", handleFundsDeposited)
fn handle_player_registered(event: &PlayerRegistered) -> Option<CloudEvent> {
let public = PublicPlayerRegistered {
display_name: event.display_name.clone(),
player_type: event.player_type,
};
Some(CloudEvent {
r#type: "com.poker.player.registered".into(),
data: Some(Any::from_msg(&public).ok()?),
..Default::default()
})
}
fn handle_funds_deposited(event: &FundsDeposited) -> Option<CloudEvent> {
let public = PublicFundsDeposited {
amount: event.amount.clone(),
};
Some(CloudEvent {
r#type: "com.poker.player.deposited".into(),
data: Some(Any::from_msg(&public).ok()?),
extensions: [("priority".into(), "normal".into())].into(),
..Default::default()
})
}
fn build_router() -> CloudEventsRouter {
CloudEventsRouter::new("prj-player-cloudevents", "player")
.on::<PlayerRegistered>(handle_player_registered)
.on::<FundsDeposited>(handle_funds_deposited)
}
class PlayerCloudEventsHandlers {
static CloudEvent handlePlayerRegistered(PlayerRegistered event) {
var publicEvent = PublicPlayerRegistered.newBuilder()
.setDisplayName(event.getDisplayName())
.setPlayerType(event.getPlayerType())
.build();
return CloudEvent.newBuilder()
.setType("com.poker.player.registered")
.setData(Any.pack(publicEvent))
.build();
}
static CloudEvent handleFundsDeposited(FundsDeposited event) {
var publicEvent = PublicFundsDeposited.newBuilder()
.setAmount(event.getAmount())
.build();
return CloudEvent.newBuilder()
.setType("com.poker.player.deposited")
.setData(Any.pack(publicEvent))
.putExtensions("priority", "normal")
.build();
}
static CloudEventsRouter buildRouter() {
return new CloudEventsRouter("prj-player-cloudevents", "player")
.on(PlayerRegistered.class, PlayerCloudEventsHandlers::handlePlayerRegistered)
.on(FundsDeposited.class, PlayerCloudEventsHandlers::handleFundsDeposited);
}
}
public static class PlayerCloudEventsHandlers
{
public static CloudEvent? HandlePlayerRegistered(PlayerRegistered @event)
{
var publicEvent = new PublicPlayerRegistered
{
DisplayName = @event.DisplayName,
PlayerType = @event.PlayerType
};
return new CloudEvent
{
Type = "com.poker.player.registered",
Data = Any.Pack(publicEvent)
};
}
public static CloudEvent? HandleFundsDeposited(FundsDeposited @event)
{
var publicEvent = new PublicFundsDeposited
{
Amount = @event.Amount
};
return new CloudEvent
{
Type = "com.poker.player.deposited",
Data = Any.Pack(publicEvent),
Extensions = { ["priority"] = "normal" }
};
}
public static CloudEventsRouter BuildRouter() =>
new CloudEventsRouter("prj-player-cloudevents", "player")
.On<PlayerRegistered>(HandlePlayerRegistered)
.On<FundsDeposited>(HandleFundsDeposited);
}
std::optional<CloudEvent> handle_player_registered(const PlayerRegistered& event) {
PublicPlayerRegistered public_event;
public_event.set_display_name(event.display_name());
public_event.set_player_type(event.player_type());
CloudEvent ce;
ce.set_type("com.poker.player.registered");
ce.mutable_data()->PackFrom(public_event);
return ce;
}
std::optional<CloudEvent> handle_funds_deposited(const FundsDeposited& event) {
PublicFundsDeposited public_event;
*public_event.mutable_amount() = event.amount();
CloudEvent ce;
ce.set_type("com.poker.player.deposited");
ce.mutable_data()->PackFrom(public_event);
(*ce.mutable_extensions())["priority"] = "normal";
return ce;
}
CloudEventsRouter build_router() {
return CloudEventsRouter("prj-player-cloudevents", "player")
.on<PlayerRegistered>(handle_player_registered)
.on<FundsDeposited>(handle_funds_deposited);
}
Raw Handle (Low-Level)
For full control over the projection lifecycle, implement the raw handle method:
async fn handle(&self, events: &EventBook, _mode: ProjectionMode) -> Result<Projection, Status> {
let mut cloud_events = Vec::new();
for page in &events.pages {
if let Some(event) = &page.event {
if event.type_url.ends_with("OrderCreated") {
let order = OrderCreated::decode(&event.value[..])?;
// Create filtered public event
let public = PublicOrderCreated {
order_id: order.order_id.clone(),
total: order.total,
};
let data = prost_types::Any::from_msg(&public)?;
cloud_events.push(CloudEvent {
r#type: "com.example.order.created".to_string(),
data: Some(data),
..Default::default()
});
}
}
}
let response = CloudEventsResponse { events: cloud_events };
let projection_any = prost_types::Any::from_msg(&response)?;
Ok(Projection {
cover: events.cover.clone(),
projector: "prj-orders-cloudevents".to_string(),
projection: Some(projection_any),
..Default::default()
})
}
CloudEvent Proto
// CloudEvent represents a single event for external consumption.
//
// Client projectors create these by filtering/transforming internal events.
// Framework fills envelope fields (id, source, time) from Cover/EventPage
// if not explicitly set by the client.
//
// The `data` field is a protobuf Any that framework converts to JSON via
// prost-reflect using the descriptor pool. Clients should pack a "public"
// proto message that omits sensitive fields.
message CloudEvent {
// Event type (e.g., "com.example.order.created").
// Default: proto type_url suffix from original event.
string type = 1;
// Event payload as proto Any.
// Framework converts to JSON for CloudEvents output.
// Client should filter sensitive fields before packing.
google.protobuf.Any data = 2;
// Custom extension attributes.
// Keys should follow CloudEvents naming (lowercase, no dots).
// Framework adds correlationid automatically if present in Cover.
map<string, string> extensions = 3;
// Optional overrides. Framework uses Cover/EventPage values if not set.
optional string id = 4; // Default: {domain}:{root_id}:{sequence}
optional string source = 5; // Default: angzarr/{domain}
optional string subject = 6; // Default: aggregate root ID
}
// CloudEventsResponse is returned by client projectors in Projection.projection.
//
// Framework detects this type by checking projection.type_url and routes
// the events to configured sinks (HTTP webhook, Kafka).
//
// Client may return 0 events (skip), 1 event (typical), or N events
// (fan-out scenarios like multi-tenant notifications).
message CloudEventsResponse {
repeated CloudEvent events = 1;
}
Output Format
Framework produces CloudEvents 1.0 JSON:
{
"specversion": "1.0",
"id": "orders:abc123def456:5",
"type": "com.example.order.created",
"source": "angzarr/orders",
"time": "2024-01-15T10:30:00Z",
"datacontenttype": "application/json",
"subject": "abc123def456",
"correlationid": "corr-xyz-789",
"data": {
"order_id": "ORD-12345",
"total": 9999
}
}
Field Defaults
| Field | Default Value | Override |
|---|---|---|
id | {domain}:{root_id}:{sequence} | CloudEvent.id |
source | angzarr/{domain} | CloudEvent.source |
subject | Aggregate root ID (hex) | CloudEvent.subject |
time | Event creation timestamp | From EventPage |
correlationid | Cover correlation ID | Auto-added as extension |
Extension Attributes
The framework supports custom CloudEvents extension attributes via the extensions map in CloudEvent.
Automatic Lowercasing
Per the CloudEvents spec, extension attribute names must be lowercase. The framework automatically lowercases all extension keys, so you can use any case in your client code:
# All of these become "traceid" in the output
CloudEvent(
type="com.example.order.created",
extensions={
"TraceID": "abc123", # → "traceid"
"PRIORITY": "high", # → "priority"
"customExt": "value", # → "customext"
}
)
This follows Postel's Law: "be tolerant of what you accept, strict in what you emit".
Built-in Extensions
The framework automatically adds:
| Extension | Source | Description |
|---|---|---|
correlationid | Cover.correlation_id | Cross-domain workflow identifier |
Validation
The framework uses the official cloudevents-sdk crate for CloudEvents 1.0 validation. Invalid events (missing required fields, malformed URIs, etc.) are rejected with an error rather than silently published.
Configuration
Environment Variables
| Variable | Description | Default |
|---|---|---|
CLOUDEVENTS_SINK | Sink type: http, kafka, or both | http |
CLOUDEVENTS_HTTP_ENDPOINT | Webhook URL | Required if http |
CLOUDEVENTS_HTTP_TIMEOUT | Request timeout (seconds) | 30 |
CLOUDEVENTS_BATCH_SIZE | Max events per HTTP request | 100 |
CLOUDEVENTS_KAFKA_BROKERS | Kafka bootstrap servers | Required if kafka |
CLOUDEVENTS_KAFKA_TOPIC | Kafka topic name | cloudevents |
CLOUDEVENTS_KAFKA_TIMEOUT | Delivery timeout (seconds) | 5 |
Kafka Authentication
| Variable | Description |
|---|---|
CLOUDEVENTS_KAFKA_SASL_USERNAME | SASL username |
CLOUDEVENTS_KAFKA_SASL_PASSWORD | SASL password |
CLOUDEVENTS_KAFKA_SASL_MECHANISM | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 |
CLOUDEVENTS_KAFKA_SECURITY_PROTOCOL | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
CLOUDEVENTS_KAFKA_SSL_CA | SSL CA certificate path |
HTTP Sink
POSTs batches using application/cloudevents-batch+json content type:
POST /events HTTP/1.1
Host: webhook.example.com
Content-Type: application/cloudevents-batch+json
[
{"specversion": "1.0", "id": "...", "type": "...", "data": {...}},
{"specversion": "1.0", "id": "...", "type": "...", "data": {...}}
]
Retry behavior:
- Retries on: connection errors, timeouts, 429 (rate limit), 5xx (server errors)
- Exponential backoff: 100ms → 5s max, 5 attempts
- No retry on: 4xx client errors (except 429)
Kafka Sink
Publishes JSON-serialized CloudEvents with:
- Message key:
subjectfield (aggregate root ID) for ordering - Idempotent producer: Enabled by default
- Acks:
all(wait for ISR acknowledgment)
Filtering Sensitive Data
CloudEvents are public-facing. Always create "public" proto messages that exclude:
- PII (emails, phone numbers, addresses)
- Payment details (card numbers, bank accounts)
- Internal IDs and implementation details
- Authentication tokens or secrets
# Internal event (has sensitive data)
class OrderCreated:
order_id: str
customer_email: str # PII
payment_token: str # Secret
total: int
internal_notes: str # Internal
# Public event (safe to expose)
class PublicOrderCreated:
order_id: str
total: int
Multiple Events per Source Event
A single source event can produce multiple CloudEvents (fan-out):
def transform_events(event_book):
cloud_events = []
for page in event_book.pages:
if page.event.type_url.endswith("OrderCreated"):
order = unpack(page.event)
# Event for order tracking
cloud_events.append(CloudEvent(
type="com.example.order.created",
data=pack(OrderPublic(order_id=order.order_id)),
))
# Event for each line item (inventory systems)
for item in order.items:
cloud_events.append(CloudEvent(
type="com.example.lineitem.ordered",
data=pack(LineItemPublic(sku=item.sku, quantity=item.qty)),
))
return pack_response(cloud_events)
Skipping Events
Return an empty CloudEventsResponse to skip publishing:
def transform_events(event_book):
cloud_events = []
for page in event_book.pages:
# Only publish certain event types
if page.event.type_url.endswith("OrderShipped"):
cloud_events.append(...)
# If no events match, returns empty response (nothing published)
return pack_response(cloud_events)
Framework Integration (Rust)
For direct framework usage (not via client projector):
use angzarr::handlers::projectors::{
CloudEventsCoordinator, HttpSink, HttpSinkConfig, NullSink
};
// Create HTTP sink
let sink = HttpSink::new(
HttpSinkConfig::default()
.with_endpoint("https://webhook.example.com/events".to_string())
.with_timeout(Duration::from_secs(30))
)?;
// Create coordinator
let coordinator = CloudEventsCoordinator::new(Arc::new(sink));
// Process projection (returns true if CloudEventsResponse was detected)
let was_cloudevents = coordinator.process(&projection, Some(&source_events)).await?;
Next Steps
- Projectors — Projector patterns and StateRouter
- Framework Projectors — Other built-in projectors