Projectors
A projector subscribes to events and performs side effects—typically building read models, writing to databases, or streaming to external systems. Projectors transform the event stream into query-optimized views.
Unlike aggregates and sagas, projectors are read-only from the event sourcing perspective. They observe events but never emit commands back to aggregates.
Common Use Cases
| Projector | Events | Output |
|---|---|---|
| Search indexer | Product events | Elasticsearch updates |
| Dashboard stream | All domain events | WebSocket push |
| Analytics ETL | Transaction events | Data warehouse |
| Cache warmer | Player events | Redis cache |
| Output renderer | Poker events | Console text |
Implementation Styles
Angzarr supports two projector implementation styles:
| Style | Description | Best For |
|---|---|---|
| Functional | StateRouter with function handlers | Simple projections, stateless |
| OO | Projector class with @projects/[Projects] decorators | Rich state, encapsulation |
Example: Output Projector
Transforms poker events into human-readable text:
- Functional
- OO
StateRouter with explicit event handler registration:
- Python
- C#
- Rust
- Java
- Go
- C++
class OutputProjector:
"""
Projector that subscribes to events from all domains and outputs text.
This is a read-side component that:
1. Receives events via saga routing
2. Unpacks them from Any wrappers
3. Renders them as human-readable text
4. Outputs to configured destination (console, file, etc.)
"""
def __init__(
self,
output_fn: Callable[[str], None] = print,
show_timestamps: bool = False,
):
self.renderer = TextRenderer()
self.output_fn = output_fn
self.show_timestamps = show_timestamps
def set_player_name(self, player_root: bytes, name: str):
"""Set display name for a player."""
self.renderer.set_player_name(player_root, name)
def handle_event(self, event_page: types.EventPage) -> None:
"""Handle a single event page from any domain."""
event_any = event_page.event
type_url = event_any.type_url
# Extract event type from type_url
# Format: "type.poker/examples.EventName"
event_type = type_url.split(".")[-1] if "." in type_url else type_url
if event_type not in EVENT_TYPES:
self.output_fn(f"[Unknown event type: {type_url}]")
return
# Unpack the event
event_class = EVENT_TYPES[event_type]
event = event_class()
event_any.Unpack(event)
# Render and output
text = self.renderer.render(event_type, event)
if text:
if self.show_timestamps and event_page.created_at:
from datetime import datetime, timezone
ts = datetime.fromtimestamp(
event_page.created_at.seconds, tz=timezone.utc
)
text = f"[{ts.strftime('%H:%M:%S')}] {text}"
self.output_fn(text)
def handle_event_book(self, event_book: types.EventBook) -> None:
"""Handle all events in an event book."""
for page in event_book.pages:
self.handle_event(page)
def project_from_stream(self, event_stream) -> None:
"""
Project events from a stream (generator or async iterator).
This is the main entry point for saga-routed events.
"""
for event_book in event_stream:
self.handle_event_book(event_book)
public static class OutputProjectorRouter
{
public static StateRouter BuildRouter()
{
var playerNames = new Dictionary<string, string>();
return new StateRouter("prj-output")
.Subscribes("player", new[] { "PlayerRegistered", "FundsDeposited" })
.Subscribes("hand", new[] { "CardsDealt", "ActionTaken", "PotAwarded" })
.On<PlayerRegistered>(evt =>
{
playerNames[evt.PlayerId] = evt.DisplayName;
Console.WriteLine($"[Player] {evt.DisplayName} registered");
})
.On<FundsDeposited>(evt =>
{
var name = playerNames.GetValueOrDefault(evt.PlayerId, evt.PlayerId);
Console.WriteLine($"[Player] {name} deposited ${evt.Amount?.Amount / 100.0:F2}");
})
.On<CardsDealt>(evt =>
{
foreach (var player in evt.PlayerCards)
{
var name = playerNames.GetValueOrDefault(player.PlayerId, player.PlayerId);
Console.WriteLine($"[Hand] {name} dealt cards");
}
});
}
}
#[allow(clippy::result_large_err)]
fn handle_events(events: &EventBook) -> Result<Projection, Status> {
let cover = events.cover.as_ref();
let domain = cover.map(|c| c.domain.as_str()).unwrap_or("");
let root_id = cover
.and_then(|c| c.root.as_ref())
.map(|u| {
if u.value.len() >= 4 {
hex::encode(&u.value[..4])
} else {
hex::encode(&u.value)
}
})
.unwrap_or_default();
let mut seq = 0u32;
for page in &events.pages {
let event_any = match &page.payload {
Some(event_page::Payload::Event(e)) => e,
_ => continue,
};
seq = get_sequence(page);
let type_url = &event_any.type_url;
let type_name = type_url.rsplit('.').next().unwrap_or(type_url);
let msg = format_event(domain, &root_id, type_name, &event_any.value);
write_log(&msg);
}
Ok(Projection {
cover: cover.cloned(),
projector: "output".to_string(),
sequence: seq,
projection: None,
})
}
public class OutputProjector {
private static final Map<String, Class<?>> EVENT_TYPES = new HashMap<>();
static {
// Player events
EVENT_TYPES.put("PlayerRegistered", PlayerRegistered.class);
EVENT_TYPES.put("FundsDeposited", FundsDeposited.class);
EVENT_TYPES.put("FundsWithdrawn", FundsWithdrawn.class);
EVENT_TYPES.put("FundsReserved", FundsReserved.class);
EVENT_TYPES.put("FundsReleased", FundsReleased.class);
// Table events
EVENT_TYPES.put("TableCreated", TableCreated.class);
EVENT_TYPES.put("PlayerJoined", PlayerJoined.class);
EVENT_TYPES.put("PlayerLeft", PlayerLeft.class);
EVENT_TYPES.put("HandStarted", HandStarted.class);
EVENT_TYPES.put("HandEnded", HandEnded.class);
// Hand events
EVENT_TYPES.put("CardsDealt", CardsDealt.class);
EVENT_TYPES.put("BlindPosted", BlindPosted.class);
EVENT_TYPES.put("ActionTaken", ActionTaken.class);
EVENT_TYPES.put("CommunityCardsDealt", CommunityCardsDealt.class);
EVENT_TYPES.put("PotAwarded", PotAwarded.class);
EVENT_TYPES.put("HandComplete", HandComplete.class);
}
private final TextRenderer renderer;
private final Consumer<String> outputFn;
private final boolean showTimestamps;
private final DateTimeFormatter timeFormatter;
public OutputProjector(Consumer<String> outputFn, boolean showTimestamps) {
this.renderer = new TextRenderer();
this.outputFn = outputFn;
this.showTimestamps = showTimestamps;
this.timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneOffset.UTC);
}
/** Get list of domains this projector subscribes to. */
public List<String> getInputDomains() {
return List.of("player", "table", "hand");
}
/** Set display name for a player root. */
public void setPlayerName(byte[] playerRoot, String name) {
renderer.setPlayerName(playerRoot, name);
}
/** Handle an event book and return a projection. */
public Projection handle(EventBook eventBook) {
handleEventBook(eventBook);
// Return a projection with the sequence number
int seq = 0;
if (eventBook.getPagesCount() > 0) {
EventPage lastPage = eventBook.getPages(eventBook.getPagesCount() - 1);
seq = lastPage.getHeader().getSequence();
}
return Projection.newBuilder()
.setCover(eventBook.getCover())
.setProjector("output")
.setSequence(seq)
.build();
}
/** Handle all events in an event book. */
public void handleEventBook(EventBook eventBook) {
for (EventPage page : eventBook.getPagesList()) {
handleEvent(page);
}
}
/** Handle a single event page from any domain. */
public void handleEvent(EventPage eventPage) {
Any eventAny = eventPage.getEvent();
String typeUrl = eventAny.getTypeUrl();
// Extract event type from type_url
String eventType = extractEventType(typeUrl);
if (!EVENT_TYPES.containsKey(eventType)) {
outputFn.accept("[Unknown event type: " + typeUrl + "]");
return;
}
try {
// Unpack the event
@SuppressWarnings("unchecked")
Class<? extends com.google.protobuf.Message> eventClass =
(Class<? extends com.google.protobuf.Message>) EVENT_TYPES.get(eventType);
Object event = eventAny.unpack(eventClass);
// Render and output
String text = renderer.render(eventType, event);
if (text != null && !text.isEmpty()) {
if (showTimestamps && eventPage.hasCreatedAt()) {
Instant ts =
Instant.ofEpochSecond(
eventPage.getCreatedAt().getSeconds(), eventPage.getCreatedAt().getNanos());
text = "[" + timeFormatter.format(ts) + "] " + text;
}
outputFn.accept(text);
}
} catch (InvalidProtocolBufferException e) {
outputFn.accept("[Failed to unpack: " + typeUrl + "]");
}
}
private String extractEventType(String typeUrl) {
int dotIndex = typeUrl.lastIndexOf('.');
if (dotIndex >= 0) {
return typeUrl.substring(dotIndex + 1);
}
return typeUrl;
}
/** Create a file-based projector. */
public static OutputProjector forFile(String path, boolean showTimestamps) throws IOException {
PrintWriter writer = new PrintWriter(new FileWriter(path, true));
return new OutputProjector(
line -> {
writer.println(line);
writer.flush();
},
showTimestamps);
}
/** Create a console-based projector. */
public static OutputProjector forConsole(boolean showTimestamps) {
return new OutputProjector(System.out::println, showTimestamps);
}
}
func handleEvents(events *pb.EventBook) (*pb.Projection, error) {
if events == nil || events.Cover == nil {
return &pb.Projection{}, nil
}
domain := events.Cover.Domain
rootID := angzarr.RootIDText(events)
var seq uint32
for _, page := range events.Pages {
event := page.GetEvent()
if event == nil {
continue
}
seq = getSequence(page)
typeURL := event.TypeUrl
typeName := typeURL[strings.LastIndex(typeURL, ".")+1:]
msg := formatEvent(domain, rootID, typeName, event.Value)
writeLog(msg)
}
return &pb.Projection{
Cover: events.Cover,
Projector: "output",
Sequence: seq,
}, nil
}
/// gRPC service implementation for output projector.
class OutputProjectorService final : public angzarr::ProjectorService::Service {
public:
explicit OutputProjectorService(const std::string& log_path, bool show_timestamps = true)
: log_path_(log_path),
log_file_(log_path, std::ios::app),
show_timestamps_(show_timestamps) {}
~OutputProjectorService() {
if (log_file_.is_open()) {
log_file_.close();
}
}
grpc::Status Handle(grpc::ServerContext* context, const angzarr::EventBook* request,
angzarr::Projection* response) override {
return process_event_book(*request, response);
}
grpc::Status HandleSpeculative(grpc::ServerContext* context, const angzarr::EventBook* request,
angzarr::Projection* response) override {
(void)context;
// Speculative mode - don't write to file
uint32_t seq = 0;
for (const auto& page : request->pages()) {
if (page.has_header() && page.header().has_sequence()) {
seq = page.header().sequence();
}
}
response->mutable_cover()->CopyFrom(request->cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);
return grpc::Status::OK;
}
private:
grpc::Status process_event_book(const angzarr::EventBook& event_book,
angzarr::Projection* response) {
uint32_t seq = 0;
for (const auto& page : event_book.pages()) {
const auto& event_any = page.event();
if (page.has_header() && page.header().has_sequence()) {
seq = page.header().sequence();
}
// Format and write event
std::string formatted = format_event(event_any, event_book.cover().domain());
if (!formatted.empty()) {
write_line(formatted);
}
}
response->mutable_cover()->CopyFrom(event_book.cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);
return grpc::Status::OK;
}
std::string format_event(const google::protobuf::Any& event_any, const std::string& domain) {
std::string prefix;
if (show_timestamps_) {
auto now = std::chrono::system_clock::now();
auto time = std::chrono::system_clock::to_time_t(now);
std::ostringstream ss;
ss << std::put_time(std::localtime(&time), "%H:%M:%S");
prefix = "[" + ss.str() + "] ";
}
const std::string& type_url = event_any.type_url();
// Player events
if (type_url.find("PlayerRegistered") != std::string::npos) {
examples::PlayerRegistered event;
event_any.UnpackTo(&event);
return prefix + "Player registered: " + event.display_name();
}
if (type_url.find("FundsDeposited") != std::string::npos) {
examples::FundsDeposited event;
event_any.UnpackTo(&event);
return prefix + "Funds deposited: " + std::to_string(event.new_balance().amount());
}
// Table events
if (type_url.find("TableCreated") != std::string::npos) {
examples::TableCreated event;
event_any.UnpackTo(&event);
return prefix + "Table created: " + event.table_name();
}
if (type_url.find("PlayerJoined") != std::string::npos) {
examples::PlayerJoined event;
event_any.UnpackTo(&event);
return prefix + "Player joined at position " + std::to_string(event.seat_position());
}
if (type_url.find("HandStarted") != std::string::npos) {
examples::HandStarted event;
event_any.UnpackTo(&event);
return prefix + "Hand started: dealer position " +
std::to_string(event.dealer_position());
}
// Hand events
if (type_url.find("CardsDealt") != std::string::npos) {
examples::CardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Cards dealt to " + std::to_string(event.players_size()) + " players";
}
if (type_url.find("BlindPosted") != std::string::npos) {
examples::BlindPosted event;
event_any.UnpackTo(&event);
return prefix + "Blind posted: " + std::to_string(event.amount());
}
if (type_url.find("ActionTaken") != std::string::npos) {
examples::ActionTaken event;
event_any.UnpackTo(&event);
return prefix + "Action: " + examples::ActionType_Name(event.action());
}
if (type_url.find("CommunityCardsDealt") != std::string::npos) {
examples::CommunityCardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Community cards dealt: " + std::to_string(event.cards_size()) +
" cards";
}
if (type_url.find("PotAwarded") != std::string::npos) {
examples::PotAwarded event;
event_any.UnpackTo(&event);
int64_t total = 0;
for (const auto& winner : event.winners()) {
total += winner.amount();
}
return prefix + "Pot awarded: " + std::to_string(total);
}
if (type_url.find("HandComplete") != std::string::npos) {
examples::HandComplete event;
event_any.UnpackTo(&event);
return prefix + "Hand complete";
}
// Return empty for unknown event types
return "";
}
void write_line(const std::string& text) {
if (log_file_.is_open()) {
log_file_ << text << std::endl;
log_file_.flush();
}
std::cout << text << std::endl;
}
std::string log_path_;
std::ofstream log_file_;
bool show_timestamps_;
};
Projector class with decorator-based handler registration:
- Python
- C#
- Rust
- Java
- Go
- C++
class OutputProjector(Projector):
"""Output projector using OO-style decorators with multi-domain support."""
name = "output"
@handles(player.PlayerRegistered, input_domain="player")
def project_registered(self, event: player.PlayerRegistered) -> types.Projection:
write_log(f"PLAYER registered: {event.display_name} ({event.email})")
return types.Projection(projector=self.name)
@handles(player.FundsDeposited, input_domain="player")
def project_deposited(self, event: player.FundsDeposited) -> types.Projection:
amount = event.amount.amount if event.HasField("amount") else 0
new_balance = event.new_balance.amount if event.HasField("new_balance") else 0
write_log(f"PLAYER deposited {amount}, balance: {new_balance}")
return types.Projection(projector=self.name)
@handles(table.TableCreated, input_domain="table")
def project_table_created(self, event: table.TableCreated) -> types.Projection:
write_log(f"TABLE created: {event.table_name} ({event.game_variant})")
return types.Projection(projector=self.name)
@handles(table.PlayerJoined, input_domain="table")
def project_player_joined(self, event: table.PlayerJoined) -> types.Projection:
player_id = truncate_id(event.player_root)
write_log(f"TABLE player {player_id} joined with {event.stack} chips")
return types.Projection(projector=self.name)
@handles(table.HandStarted, input_domain="table")
def project_hand_started(self, event: table.HandStarted) -> types.Projection:
write_log(
f"TABLE hand #{event.hand_number} started, "
f"{len(event.active_players)} players, dealer at position {event.dealer_position}"
)
return types.Projection(projector=self.name)
@handles(hand.CardsDealt, input_domain="hand")
def project_cards_dealt(self, event: hand.CardsDealt) -> types.Projection:
write_log(f"HAND cards dealt to {len(event.player_cards)} players")
return types.Projection(projector=self.name)
@handles(hand.BlindPosted, input_domain="hand")
def project_blind_posted(self, event: hand.BlindPosted) -> types.Projection:
player_id = truncate_id(event.player_root)
write_log(
f"HAND player {player_id} posted {event.blind_type} blind: {event.amount}"
)
return types.Projection(projector=self.name)
@handles(hand.ActionTaken, input_domain="hand")
def project_action_taken(self, event: hand.ActionTaken) -> types.Projection:
player_id = truncate_id(event.player_root)
write_log(f"HAND player {player_id}: {event.action} {event.amount}")
return types.Projection(projector=self.name)
@handles(hand.PotAwarded, input_domain="hand")
def project_pot_awarded(self, event: hand.PotAwarded) -> types.Projection:
winners = [
f"{truncate_id(w.player_root)} wins {w.amount}" for w in event.winners
]
write_log(f"HAND pot awarded: {', '.join(winners)}")
return types.Projection(projector=self.name)
@handles(hand.HandComplete, input_domain="hand")
def project_hand_complete(self, event: hand.HandComplete) -> types.Projection:
write_log(f"HAND #{event.hand_number} complete")
return types.Projection(projector=self.name)
/// <summary>
/// Projector: Output (OO Pattern)
///
/// Subscribes to player, table, and hand domain events.
/// Writes formatted game logs to a file.
///
/// This is the OO-style implementation using Projector base class with
/// [Projects(typeof(EventType))] annotated methods.
/// </summary>
public class OutputProjector : Projector
{
private static readonly string LogFile =
Environment.GetEnvironmentVariable("HAND_LOG_FILE") ?? "hand_log_oo.txt";
private static StreamWriter? _logWriter;
public override string Name => "output";
public override IReadOnlyList<string> InputDomains => new[] { "player", "table", "hand" };
private static StreamWriter GetLogWriter()
{
if (_logWriter == null)
{
_logWriter = new StreamWriter(LogFile, append: true);
}
return _logWriter;
}
private void WriteLog(string msg)
{
var writer = GetLogWriter();
var timestamp = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
writer.WriteLine($"[{timestamp}] {msg}");
writer.Flush();
}
private static string TruncateId(ByteString playerRoot)
{
var bytes = playerRoot.ToByteArray();
if (bytes.Length >= 4)
{
return $"{bytes[0]:x2}{bytes[1]:x2}{bytes[2]:x2}{bytes[3]:x2}";
}
return Convert.ToHexString(bytes).ToLowerInvariant();
}
[Projects(typeof(PlayerRegistered))]
public Projection ProjectRegistered(PlayerRegistered evt)
{
WriteLog($"PLAYER registered: {evt.DisplayName} ({evt.Email})");
return new Projection { Projector = Name };
}
[Projects(typeof(FundsDeposited))]
public Projection ProjectDeposited(FundsDeposited evt)
{
var amount = evt.Amount?.Amount ?? 0;
var newBalance = evt.NewBalance?.Amount ?? 0;
WriteLog($"PLAYER deposited {amount}, balance: {newBalance}");
return new Projection { Projector = Name };
}
[Projects(typeof(TableCreated))]
public Projection ProjectTableCreated(TableCreated evt)
{
WriteLog($"TABLE created: {evt.TableName} ({evt.GameVariant})");
return new Projection { Projector = Name };
}
[Projects(typeof(PlayerJoined))]
public Projection ProjectPlayerJoined(PlayerJoined evt)
{
var playerId = TruncateId(evt.PlayerRoot);
WriteLog($"TABLE player {playerId} joined with {evt.Stack} chips");
return new Projection { Projector = Name };
}
[Projects(typeof(HandStarted))]
public Projection ProjectHandStarted(HandStarted evt)
{
WriteLog(
$"TABLE hand #{evt.HandNumber} started, {evt.ActivePlayers.Count} players, dealer at position {evt.DealerPosition}"
);
return new Projection { Projector = Name };
}
[Projects(typeof(CardsDealt))]
public Projection ProjectCardsDealt(CardsDealt evt)
{
WriteLog($"HAND cards dealt to {evt.PlayerCards.Count} players");
return new Projection { Projector = Name };
}
[Projects(typeof(BlindPosted))]
public Projection ProjectBlindPosted(BlindPosted evt)
{
var playerId = TruncateId(evt.PlayerRoot);
WriteLog($"HAND player {playerId} posted {evt.BlindType} blind: {evt.Amount}");
return new Projection { Projector = Name };
}
[Projects(typeof(ActionTaken))]
public Projection ProjectActionTaken(ActionTaken evt)
{
var playerId = TruncateId(evt.PlayerRoot);
WriteLog($"HAND player {playerId}: {evt.Action} {evt.Amount}");
return new Projection { Projector = Name };
}
[Projects(typeof(PotAwarded))]
public Projection ProjectPotAwarded(PotAwarded evt)
{
var winners = string.Join(
", ",
evt.Winners.Select(w => $"{TruncateId(w.PlayerRoot)} wins {w.Amount}")
);
WriteLog($"HAND pot awarded: {winners}");
return new Projection { Projector = Name };
}
[Projects(typeof(HandComplete))]
public Projection ProjectHandComplete(HandComplete evt)
{
WriteLog($"HAND #{evt.HandNumber} complete");
return new Projection { Projector = Name };
}
}
/// Output projector using OO-style decorators with multi-domain support.
pub struct OutputProjector;
#[projector(name = "output", inputs = ["player", "table", "hand"])]
impl OutputProjector {
#[handles(PlayerRegistered, domain = "player")]
fn project_registered(&self, event: &PlayerRegistered) -> Projection {
write_log(&format!(
"PLAYER registered: {} ({})",
event.display_name, event.email
));
Projection::default()
}
#[handles(FundsDeposited, domain = "player")]
fn project_deposited(&self, event: &FundsDeposited) -> Projection {
let amount = event.amount.as_ref().map(|m| m.amount).unwrap_or(0);
let balance = event.new_balance.as_ref().map(|m| m.amount).unwrap_or(0);
write_log(&format!("PLAYER deposited {}, balance: {}", amount, balance));
Projection::default()
}
#[handles(TableCreated, domain = "table")]
fn project_table_created(&self, event: &TableCreated) -> Projection {
write_log(&format!(
"TABLE created: {} (variant {})",
event.table_name, event.game_variant
));
Projection::default()
}
#[handles(PlayerJoined, domain = "table")]
fn project_player_joined(&self, event: &PlayerJoined) -> Projection {
let player_id = truncate_id(&event.player_root);
write_log(&format!(
"TABLE player {} joined with {} chips",
player_id, event.stack
));
Projection::default()
}
#[handles(HandStarted, domain = "table")]
fn project_hand_started(&self, event: &HandStarted) -> Projection {
write_log(&format!(
"TABLE hand #{} started, {} players, dealer at position {}",
event.hand_number,
event.active_players.len(),
event.dealer_position
));
Projection::default()
}
#[handles(CardsDealt, domain = "hand")]
fn project_cards_dealt(&self, event: &CardsDealt) -> Projection {
write_log(&format!(
"HAND cards dealt to {} players",
event.player_cards.len()
));
Projection::default()
}
#[handles(BlindPosted, domain = "hand")]
fn project_blind_posted(&self, event: &BlindPosted) -> Projection {
let player_id = truncate_id(&event.player_root);
write_log(&format!(
"HAND player {} posted {:?} blind: {}",
player_id, event.blind_type, event.amount
));
Projection::default()
}
#[handles(ActionTaken, domain = "hand")]
fn project_action_taken(&self, event: &ActionTaken) -> Projection {
let player_id = truncate_id(&event.player_root);
write_log(&format!(
"HAND player {}: {:?} {}",
player_id, event.action, event.amount
));
Projection::default()
}
#[handles(PotAwarded, domain = "hand")]
fn project_pot_awarded(&self, event: &PotAwarded) -> Projection {
let winners: Vec<String> = event
.winners
.iter()
.map(|w| format!("{} wins {}", truncate_id(&w.player_root), w.amount))
.collect();
write_log(&format!("HAND pot awarded: {}", winners.join(", ")));
Projection::default()
}
#[handles(HandComplete, domain = "hand")]
fn project_hand_complete(&self, event: &HandComplete) -> Projection {
write_log(&format!("HAND #{} complete", event.hand_number));
Projection::default()
}
}
public class OutputProjector extends Projector {
private static final String LOG_FILE = System.getenv().getOrDefault(
"HAND_LOG_FILE", "hand_log_oo.txt"
);
private static PrintWriter logWriter;
public OutputProjector() {
super("output", List.of("player", "table", "hand"));
}
private static synchronized PrintWriter getLogWriter() {
if (logWriter == null) {
try {
logWriter = new PrintWriter(
new BufferedWriter(new FileWriter(LOG_FILE, true))
);
} catch (IOException e) {
System.err.println("Failed to open log file: " + e.getMessage());
}
}
return logWriter;
}
private void writeLog(String msg) {
PrintWriter writer = getLogWriter();
if (writer != null) {
String timestamp = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
writer.println("[" + timestamp + "] " + msg);
writer.flush();
}
}
private String truncateId(com.google.protobuf.ByteString playerRoot) {
byte[] bytes = playerRoot.toByteArray();
if (bytes.length >= 4) {
return String.format("%02x%02x%02x%02x",
bytes[0] & 0xFF, bytes[1] & 0xFF, bytes[2] & 0xFF, bytes[3] & 0xFF);
}
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b & 0xFF));
}
return sb.toString();
}
@Projects(PlayerRegistered.class)
public Projection projectRegistered(PlayerRegistered event) {
writeLog(String.format("PLAYER registered: %s (%s)",
event.getDisplayName(), event.getEmail()));
return Projection.upsert("log", "registered");
}
@Projects(FundsDeposited.class)
public Projection projectDeposited(FundsDeposited event) {
long amount = event.hasAmount() ? event.getAmount().getAmount() : 0;
long newBalance = event.hasNewBalance() ? event.getNewBalance().getAmount() : 0;
writeLog(String.format("PLAYER deposited %d, balance: %d", amount, newBalance));
return Projection.upsert("log", "deposited");
}
@Projects(TableCreated.class)
public Projection projectTableCreated(TableCreated event) {
writeLog(String.format("TABLE created: %s (%s)",
event.getTableName(), event.getGameVariant()));
return Projection.upsert("log", "table_created");
}
@Projects(PlayerJoined.class)
public Projection projectPlayerJoined(PlayerJoined event) {
String playerId = truncateId(event.getPlayerRoot());
writeLog(String.format("TABLE player %s joined with %d chips",
playerId, event.getStack()));
return Projection.upsert("log", "player_joined");
}
@Projects(HandStarted.class)
public Projection projectHandStarted(HandStarted event) {
writeLog(String.format("TABLE hand #%d started, %d players, dealer at position %d",
event.getHandNumber(), event.getActivePlayersCount(), event.getDealerPosition()));
return Projection.upsert("log", "hand_started");
}
@Projects(CardsDealt.class)
public Projection projectCardsDealt(CardsDealt event) {
writeLog(String.format("HAND cards dealt to %d players",
event.getPlayerCardsCount()));
return Projection.upsert("log", "cards_dealt");
}
@Projects(BlindPosted.class)
public Projection projectBlindPosted(BlindPosted event) {
String playerId = truncateId(event.getPlayerRoot());
writeLog(String.format("HAND player %s posted %s blind: %d",
playerId, event.getBlindType(), event.getAmount()));
return Projection.upsert("log", "blind_posted");
}
@Projects(ActionTaken.class)
public Projection projectActionTaken(ActionTaken event) {
String playerId = truncateId(event.getPlayerRoot());
writeLog(String.format("HAND player %s: %s %d",
playerId, event.getAction(), event.getAmount()));
return Projection.upsert("log", "action_taken");
}
@Projects(PotAwarded.class)
public Projection projectPotAwarded(PotAwarded event) {
String winners = event.getWinnersList().stream()
.map(w -> truncateId(w.getPlayerRoot()) + " wins " + w.getAmount())
.collect(Collectors.joining(", "));
writeLog(String.format("HAND pot awarded: %s", winners));
return Projection.upsert("log", "pot_awarded");
}
@Projects(HandComplete.class)
public Projection projectHandComplete(HandComplete event) {
writeLog(String.format("HAND #%d complete", event.getHandNumber()));
return Projection.upsert("log", "hand_complete");
}
}
// OutputProjector writes game events to a log file.
type OutputProjector struct {
angzarr.ProjectorBase
}
// NewOutputProjector creates a new OutputProjector with registered handlers.
func NewOutputProjector() *OutputProjector {
p := &OutputProjector{}
p.Init("output", []string{"player", "table", "hand"})
// Register projection handlers
p.Projects(p.projectRegistered)
p.Projects(p.projectDeposited)
p.Projects(p.projectTableCreated)
p.Projects(p.projectPlayerJoined)
p.Projects(p.projectHandStarted)
p.Projects(p.projectCardsDealt)
p.Projects(p.projectBlindPosted)
p.Projects(p.projectActionTaken)
p.Projects(p.projectPotAwarded)
p.Projects(p.projectHandComplete)
return p
}
func (p *OutputProjector) projectRegistered(event *examples.PlayerRegistered) *pb.Projection {
writeLog(fmt.Sprintf("PLAYER registered: %s (%s)", event.DisplayName, event.Email))
return nil
}
func (p *OutputProjector) projectDeposited(event *examples.FundsDeposited) *pb.Projection {
amount := int64(0)
newBalance := int64(0)
if event.Amount != nil {
amount = event.Amount.Amount
}
if event.NewBalance != nil {
newBalance = event.NewBalance.Amount
}
writeLog(fmt.Sprintf("PLAYER deposited %d, balance: %d", amount, newBalance))
return nil
}
func (p *OutputProjector) projectTableCreated(event *examples.TableCreated) *pb.Projection {
writeLog(fmt.Sprintf("TABLE created: %s (%s)", event.TableName, event.GameVariant.String()))
return nil
}
func (p *OutputProjector) projectPlayerJoined(event *examples.PlayerJoined) *pb.Projection {
playerID := angzarr.BytesToUUIDText(event.PlayerRoot)
writeLog(fmt.Sprintf("TABLE player %s joined with %d chips", playerID, event.Stack))
return nil
}
func (p *OutputProjector) projectHandStarted(event *examples.HandStarted) *pb.Projection {
writeLog(fmt.Sprintf("TABLE hand #%d started, %d players, dealer at position %d",
event.HandNumber, len(event.ActivePlayers), event.DealerPosition))
return nil
}
func (p *OutputProjector) projectCardsDealt(event *examples.CardsDealt) *pb.Projection {
writeLog(fmt.Sprintf("HAND cards dealt to %d players", len(event.PlayerCards)))
return nil
}
func (p *OutputProjector) projectBlindPosted(event *examples.BlindPosted) *pb.Projection {
playerID := angzarr.BytesToUUIDText(event.PlayerRoot)
writeLog(fmt.Sprintf("HAND player %s posted %s blind: %d", playerID, event.BlindType, event.Amount))
return nil
}
func (p *OutputProjector) projectActionTaken(event *examples.ActionTaken) *pb.Projection {
playerID := angzarr.BytesToUUIDText(event.PlayerRoot)
writeLog(fmt.Sprintf("HAND player %s: %s %d", playerID, event.Action.String(), event.Amount))
return nil
}
func (p *OutputProjector) projectPotAwarded(event *examples.PotAwarded) *pb.Projection {
winners := make([]string, len(event.Winners))
for i, w := range event.Winners {
winners[i] = fmt.Sprintf("%s wins %d", angzarr.BytesToUUIDText(w.PlayerRoot), w.Amount)
}
writeLog(fmt.Sprintf("HAND pot awarded: %v", winners))
return nil
}
func (p *OutputProjector) projectHandComplete(event *examples.HandComplete) *pb.Projection {
writeLog(fmt.Sprintf("HAND #%d complete", event.HandNumber))
return nil
}
/**
* Projector: Output (OO Pattern)
*
* This uses the Projector base class with ANGZARR_PROJECTS macros
* for handler registration.
*
* Note: C++ Projector base class currently supports single domain,
* so this example focuses on player domain. Multi-domain support
* would require extending the base class.
*/
class OutputProjector : public angzarr::Projector {
public:
ANGZARR_PROJECTOR("output", "player")
ANGZARR_PROJECTS(PlayerRegistered)
(const examples::PlayerRegistered& event) {
write_log("PLAYER registered: " + event.display_name() + " (" + event.email() + ")");
return angzarr::Projection::upsert("log", "registered");
}
ANGZARR_PROJECTS(FundsDeposited)
(const examples::FundsDeposited& event) {
int64_t amount = event.has_amount() ? event.amount().amount() : 0;
int64_t balance = event.has_new_balance() ? event.new_balance().amount() : 0;
write_log("PLAYER deposited " + std::to_string(amount) +
", balance: " + std::to_string(balance));
return angzarr::Projection::upsert("log", "deposited");
}
ANGZARR_PROJECTS(FundsWithdrawn)
(const examples::FundsWithdrawn& event) {
int64_t amount = event.has_amount() ? event.amount().amount() : 0;
write_log("PLAYER withdrew " + std::to_string(amount));
return angzarr::Projection::upsert("log", "withdrawn");
}
};
Both patterns produce identical behavior—choose based on team preference. The functional StateRouter is more explicit; the OO approach integrates state and handlers in one class.
Multi-Domain Projectors
Projectors can subscribe to events from multiple domains, but should not unless absolutely required:
# Avoid this pattern when possible
router = StateRouter("prj-output")
.subscribes("player", ["PlayerRegistered", "FundsDeposited"])
.subscribes("table", ["PlayerJoined", "HandStarted"])
.subscribes("hand", ["CardsDealt", "ActionTaken"])
If you need multi-domain subscription, check your domain boundaries first. Needing to join events across domains often indicates the domains are incorrectly partitioned. Consider whether those events belong in the same domain.
When multi-domain is truly necessary (e.g., a cross-cutting analytics view), it's technically safe because projectors only observe—but prefer single-domain projectors where possible for simpler reasoning and deployment.
Synchronous vs Asynchronous
| Mode | Use Case | Behavior |
|---|---|---|
| Async (default) | Analytics, search indexing | Fire-and-forget |
| Sync | Read-after-write | Command waits for projector |
Synchronous projections enable CQRS patterns where commands must return updated read models.
Position Tracking
Projectors track their position in the event stream to enable:
- Catch-up: Resume from last processed event after restart
- Replay: Rebuild read models from scratch
The framework manages position tracking automatically.
Framework Projectors
Angzarr provides built-in projectors for common operational needs:
| Projector | Purpose | Output |
|---|---|---|
| LogService | Debug logging | Console |
| EventService | Event storage | Database |
| OutboundService | Real-time streaming | gRPC |
| CloudEvents | External publishing | HTTP/Kafka |
See Framework Projectors for details.
Next Steps
- CloudEvents — Publish events to external systems
- Framework Projectors — Built-in operational projectors
- Process Managers — Stateful multi-domain coordination
- Testing — Testing projectors with Gherkin