Projector Examples
Read-side projectors from the poker domain. All code is from the actual examples/ directory.
Projectors subscribe to events and produce read-optimized views: text logs, database tables, search indexes, or external API calls.
Implementation Styles
Angzarr supports two projector implementation styles:
| Style | Description | Best For |
|---|---|---|
| OO (Object-Oriented) | Projector class with @projects/[Projects] decorators | Rich state, encapsulation |
| Functional | StateRouter or ProjectorHandler with function handlers | Simple projections, stateless transforms |
| Language | OO | Functional |
|---|---|---|
| Python | ✓ | ✓ |
| C# | ✓ | ✓ |
| Rust | ✓ | ✓ |
| Java | ✓ | ✓ |
| Go | ✓ | ✓ |
| C++ | — | ✓ |
Output Projector
The Output Projector subscribes to events from multiple domains (player, table, hand) and writes formatted game logs to a file. This demonstrates a multi-domain projector.
- Python OO
- Python Functional
- Go OO
- Go Functional
- Rust OO
- Rust Functional
- Java OO
- Java Functional
- C# OO
- C++
examples/python/prj-output-oo/main.py
class OutputProjector(Projector):
"""Output projector using OO-style decorators."""
name = "output"
input_domains = ["player", "table", "hand"]
@projects(player.PlayerRegistered)
def project_registered(self, event: player.PlayerRegistered) -> types.Projection:
write_log(f"PLAYER registered: {event.display_name} ({event.email})")
return types.Projection(projector=self.name)
@projects(player.FundsDeposited)
def project_deposited(self, event: player.FundsDeposited) -> types.Projection:
amount = event.amount.amount if event.HasField("amount") else 0
new_balance = event.new_balance.amount if event.HasField("new_balance") else 0
write_log(f"PLAYER deposited {amount}, balance: {new_balance}")
return types.Projection(projector=self.name)
@projects(table.TableCreated)
def project_table_created(self, event: table.TableCreated) -> types.Projection:
write_log(f"TABLE created: {event.table_name} ({event.game_variant})")
return types.Projection(projector=self.name)
@projects(table.PlayerJoined)
def project_player_joined(self, event: table.PlayerJoined) -> types.Projection:
player_id = truncate_id(event.player_root)
write_log(f"TABLE player {player_id} joined with {event.stack} chips")
return types.Projection(projector=self.name)
@projects(table.HandStarted)
def project_hand_started(self, event: table.HandStarted) -> types.Projection:
write_log(
f"TABLE hand #{event.hand_number} started, "
f"{len(event.active_players)} players, dealer at position {event.dealer_position}"
)
return types.Projection(projector=self.name)
@projects(hand.CardsDealt)
def project_cards_dealt(self, event: hand.CardsDealt) -> types.Projection:
write_log(f"HAND cards dealt to {len(event.player_cards)} players")
return types.Projection(projector=self.name)
@projects(hand.BlindPosted)
def project_blind_posted(self, event: hand.BlindPosted) -> types.Projection:
player_id = truncate_id(event.player_root)
write_log(f"HAND player {player_id} posted {event.blind_type} blind: {event.amount}")
return types.Projection(projector=self.name)
@projects(hand.ActionTaken)
def project_action_taken(self, event: hand.ActionTaken) -> types.Projection:
player_id = truncate_id(event.player_root)
write_log(f"HAND player {player_id}: {event.action} {event.amount}")
return types.Projection(projector=self.name)
@projects(hand.PotAwarded)
def project_pot_awarded(self, event: hand.PotAwarded) -> types.Projection:
winners = [
f"{truncate_id(w.player_root)} wins {w.amount}" for w in event.winners
]
write_log(f"HAND pot awarded: {', '.join(winners)}")
return types.Projection(projector=self.name)
@projects(hand.HandComplete)
def project_hand_complete(self, event: hand.HandComplete) -> types.Projection:
write_log(f"HAND #{event.hand_number} complete")
return types.Projection(projector=self.name)
examples/python/prj-output/projector.py
class OutputProjector:
"""
Projector that subscribes to events from all domains and outputs text.
This is a read-side component that:
1. Receives events via saga routing
2. Unpacks them from Any wrappers
3. Renders them as human-readable text
4. Outputs to configured destination (console, file, etc.)
"""
def __init__(
self,
output_fn: Callable[[str], None] = print,
show_timestamps: bool = False,
):
self.renderer = TextRenderer()
self.output_fn = output_fn
self.show_timestamps = show_timestamps
def set_player_name(self, player_root: bytes, name: str):
"""Set display name for a player."""
self.renderer.set_player_name(player_root, name)
def handle_event(self, event_page: types.EventPage) -> None:
"""Handle a single event page from any domain."""
event_any = event_page.event
type_url = event_any.type_url
# Extract event type from type_url
# Format: "type.poker/examples.EventName"
event_type = type_url.split(".")[-1] if "." in type_url else type_url
if event_type not in EVENT_TYPES:
self.output_fn(f"[Unknown event type: {type_url}]")
return
# Unpack the event
event_class = EVENT_TYPES[event_type]
event = event_class()
event_any.Unpack(event)
# Render and output
text = self.renderer.render(event_type, event)
if text:
if self.show_timestamps and event_page.created_at:
from datetime import datetime, timezone
ts = datetime.fromtimestamp(event_page.created_at.seconds, tz=timezone.utc)
text = f"[{ts.strftime('%H:%M:%S')}] {text}"
self.output_fn(text)
def handle_event_book(self, event_book: types.EventBook) -> None:
"""Handle all events in an event book."""
for page in event_book.pages:
self.handle_event(page)
def project_from_stream(self, event_stream) -> None:
"""
Project events from a stream (generator or async iterator).
This is the main entry point for saga-routed events.
"""
for event_book in event_stream:
self.handle_event_book(event_book)
examples/go/prj-output-oo/main.go
// OutputProjector writes game events to a log file.
type OutputProjector struct {
angzarr.ProjectorBase
}
// NewOutputProjector creates a new OutputProjector with registered handlers.
func NewOutputProjector() *OutputProjector {
p := &OutputProjector{}
p.Init("output", []string{"player", "table", "hand"})
// Register projection handlers
p.Projects("PlayerRegistered", p.projectRegistered)
p.Projects("FundsDeposited", p.projectDeposited)
p.Projects("TableCreated", p.projectTableCreated)
p.Projects("PlayerJoined", p.projectPlayerJoined)
p.Projects("HandStarted", p.projectHandStarted)
p.Projects("CardsDealt", p.projectCardsDealt)
p.Projects("BlindPosted", p.projectBlindPosted)
p.Projects("ActionTaken", p.projectActionTaken)
p.Projects("PotAwarded", p.projectPotAwarded)
p.Projects("HandComplete", p.projectHandComplete)
return p
}
func (p *OutputProjector) projectRegistered(event *examples.PlayerRegistered) *pb.Projection {
writeLog(fmt.Sprintf("PLAYER registered: %s (%s)", event.DisplayName, event.Email))
return nil
}
func (p *OutputProjector) projectDeposited(event *examples.FundsDeposited) *pb.Projection {
amount := int64(0)
newBalance := int64(0)
if event.Amount != nil {
amount = event.Amount.Amount
}
if event.NewBalance != nil {
newBalance = event.NewBalance.Amount
}
writeLog(fmt.Sprintf("PLAYER deposited %d, balance: %d", amount, newBalance))
return nil
}
func (p *OutputProjector) projectTableCreated(event *examples.TableCreated) *pb.Projection {
writeLog(fmt.Sprintf("TABLE created: %s (%s)", event.TableName, event.GameVariant.String()))
return nil
}
func (p *OutputProjector) projectPlayerJoined(event *examples.PlayerJoined) *pb.Projection {
playerID := angzarr.BytesToUUIDText(event.PlayerRoot)
writeLog(fmt.Sprintf("TABLE player %s joined with %d chips", playerID, event.Stack))
return nil
}
func (p *OutputProjector) projectHandStarted(event *examples.HandStarted) *pb.Projection {
writeLog(fmt.Sprintf("TABLE hand #%d started, %d players, dealer at position %d",
event.HandNumber, len(event.ActivePlayers), event.DealerPosition))
return nil
}
func (p *OutputProjector) projectCardsDealt(event *examples.CardsDealt) *pb.Projection {
writeLog(fmt.Sprintf("HAND cards dealt to %d players", len(event.PlayerCards)))
return nil
}
func (p *OutputProjector) projectBlindPosted(event *examples.BlindPosted) *pb.Projection {
playerID := angzarr.BytesToUUIDText(event.PlayerRoot)
writeLog(fmt.Sprintf("HAND player %s posted %s blind: %d", playerID, event.BlindType, event.Amount))
return nil
}
func (p *OutputProjector) projectActionTaken(event *examples.ActionTaken) *pb.Projection {
playerID := angzarr.BytesToUUIDText(event.PlayerRoot)
writeLog(fmt.Sprintf("HAND player %s: %s %d", playerID, event.Action.String(), event.Amount))
return nil
}
func (p *OutputProjector) projectPotAwarded(event *examples.PotAwarded) *pb.Projection {
winners := make([]string, len(event.Winners))
for i, w := range event.Winners {
winners[i] = fmt.Sprintf("%s wins %d", angzarr.BytesToUUIDText(w.PlayerRoot), w.Amount)
}
writeLog(fmt.Sprintf("HAND pot awarded: %v", winners))
return nil
}
func (p *OutputProjector) projectHandComplete(event *examples.HandComplete) *pb.Projection {
writeLog(fmt.Sprintf("HAND #%d complete", event.HandNumber))
return nil
}
examples/go/prj-output/main.go
func handleEvents(events *pb.EventBook) (*pb.Projection, error) {
if events == nil || events.Cover == nil {
return &pb.Projection{}, nil
}
domain := events.Cover.Domain
rootID := angzarr.RootIDText(events)
var seq uint32
for _, page := range events.Pages {
if page.Event == nil {
continue
}
seq = getSequence(page)
typeURL := page.Event.TypeUrl
typeName := typeURL[strings.LastIndex(typeURL, ".")+1:]
msg := formatEvent(domain, rootID, typeName, page.Event.Value)
writeLog(msg)
}
return &pb.Projection{
Cover: events.Cover,
Projector: "output",
Sequence: seq,
}, nil
}
examples/rust/prj-output-oo/src/main.rs
/// Output projector using OO-style annotations.
pub struct OutputProjector;
#[projector(name = "output")]
impl OutputProjector {
#[projects(PlayerRegistered)]
fn project_registered(&self, event: PlayerRegistered) -> Projection {
write_log(&format!(
"PLAYER registered: {} ({})",
event.display_name, event.email
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(FundsDeposited)]
fn project_deposited(&self, event: FundsDeposited) -> Projection {
let amount = event.amount.as_ref().map(|a| a.amount).unwrap_or(0);
let new_balance = event.new_balance.as_ref().map(|b| b.amount).unwrap_or(0);
write_log(&format!(
"PLAYER deposited {}, balance: {}",
amount, new_balance
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(TableCreated)]
fn project_table_created(&self, event: TableCreated) -> Projection {
write_log(&format!(
"TABLE created: {} ({:?})",
event.table_name, event.game_variant
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(PlayerJoined)]
fn project_player_joined(&self, event: PlayerJoined) -> Projection {
let player_id = truncate_id(&event.player_root);
write_log(&format!(
"TABLE player {} joined with {} chips",
player_id, event.stack
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(HandStarted)]
fn project_hand_started(&self, event: HandStarted) -> Projection {
write_log(&format!(
"TABLE hand #{} started, {} players, dealer at position {}",
event.hand_number,
event.active_players.len(),
event.dealer_position
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(CardsDealt)]
fn project_cards_dealt(&self, event: CardsDealt) -> Projection {
write_log(&format!(
"HAND cards dealt to {} players",
event.player_cards.len()
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(BlindPosted)]
fn project_blind_posted(&self, event: BlindPosted) -> Projection {
let player_id = truncate_id(&event.player_root);
write_log(&format!(
"HAND player {} posted {} blind: {}",
player_id, event.blind_type, event.amount
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(ActionTaken)]
fn project_action_taken(&self, event: ActionTaken) -> Projection {
let player_id = truncate_id(&event.player_root);
write_log(&format!(
"HAND player {}: {:?} {}",
player_id, event.action, event.amount
));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(PotAwarded)]
fn project_pot_awarded(&self, event: PotAwarded) -> Projection {
let winners: Vec<String> = event
.winners
.iter()
.map(|w| format!("{} wins {}", truncate_id(&w.player_root), w.amount))
.collect();
write_log(&format!("HAND pot awarded: {}", winners.join(", ")));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
#[projects(HandComplete)]
fn project_hand_complete(&self, event: HandComplete) -> Projection {
write_log(&format!("HAND #{} complete", event.hand_number));
Projection {
projector: "output".to_string(),
..Default::default()
}
}
}
examples/rust/prj-output/src/main.rs
fn handle_events(events: &EventBook) -> Result<Projection, Status> {
let cover = events.cover.as_ref();
let domain = cover.map(|c| c.domain.as_str()).unwrap_or("");
let root_id = cover
.and_then(|c| c.root.as_ref())
.map(|u| {
if u.value.len() >= 4 {
hex::encode(&u.value[..4])
} else {
hex::encode(&u.value)
}
})
.unwrap_or_default();
let mut seq = 0u32;
for page in &events.pages {
let event_any = match &page.event {
Some(e) => e,
None => continue,
};
seq = get_sequence(page);
let type_url = &event_any.type_url;
let type_name = type_url
.rsplit('.')
.next()
.unwrap_or(type_url);
let msg = format_event(domain, &root_id, type_name, &event_any.value);
write_log(&msg);
}
Ok(Projection {
cover: cover.cloned(),
projector: "output".to_string(),
sequence: seq,
projection: None,
})
}
examples/java/prj-output-oo/src/main/java/dev/angzarr/examples/prjoutputoo/OutputProjector.java
public class OutputProjector extends Projector {
private static final String LOG_FILE = System.getenv().getOrDefault(
"HAND_LOG_FILE", "hand_log_oo.txt"
);
private static PrintWriter logWriter;
public OutputProjector() {
super("output", List.of("player", "table", "hand"));
}
private static synchronized PrintWriter getLogWriter() {
if (logWriter == null) {
try {
logWriter = new PrintWriter(
new BufferedWriter(new FileWriter(LOG_FILE, true))
);
} catch (IOException e) {
System.err.println("Failed to open log file: " + e.getMessage());
}
}
return logWriter;
}
private void writeLog(String msg) {
PrintWriter writer = getLogWriter();
if (writer != null) {
String timestamp = DateTimeFormatter.ISO_INSTANT.format(Instant.now());
writer.println("[" + timestamp + "] " + msg);
writer.flush();
}
}
private String truncateId(com.google.protobuf.ByteString playerRoot) {
byte[] bytes = playerRoot.toByteArray();
if (bytes.length >= 4) {
return String.format("%02x%02x%02x%02x",
bytes[0] & 0xFF, bytes[1] & 0xFF, bytes[2] & 0xFF, bytes[3] & 0xFF);
}
StringBuilder sb = new StringBuilder();
for (byte b : bytes) {
sb.append(String.format("%02x", b & 0xFF));
}
return sb.toString();
}
@Projects(PlayerRegistered.class)
public Projection projectRegistered(PlayerRegistered event) {
writeLog(String.format("PLAYER registered: %s (%s)",
event.getDisplayName(), event.getEmail()));
return Projection.upsert("log", "registered");
}
@Projects(FundsDeposited.class)
public Projection projectDeposited(FundsDeposited event) {
long amount = event.hasAmount() ? event.getAmount().getAmount() : 0;
long newBalance = event.hasNewBalance() ? event.getNewBalance().getAmount() : 0;
writeLog(String.format("PLAYER deposited %d, balance: %d", amount, newBalance));
return Projection.upsert("log", "deposited");
}
@Projects(TableCreated.class)
public Projection projectTableCreated(TableCreated event) {
writeLog(String.format("TABLE created: %s (%s)",
event.getTableName(), event.getGameVariant()));
return Projection.upsert("log", "table_created");
}
@Projects(PlayerJoined.class)
public Projection projectPlayerJoined(PlayerJoined event) {
String playerId = truncateId(event.getPlayerRoot());
writeLog(String.format("TABLE player %s joined with %d chips",
playerId, event.getStack()));
return Projection.upsert("log", "player_joined");
}
@Projects(HandStarted.class)
public Projection projectHandStarted(HandStarted event) {
writeLog(String.format("TABLE hand #%d started, %d players, dealer at position %d",
event.getHandNumber(), event.getActivePlayersCount(), event.getDealerPosition()));
return Projection.upsert("log", "hand_started");
}
@Projects(CardsDealt.class)
public Projection projectCardsDealt(CardsDealt event) {
writeLog(String.format("HAND cards dealt to %d players",
event.getPlayerCardsCount()));
return Projection.upsert("log", "cards_dealt");
}
@Projects(BlindPosted.class)
public Projection projectBlindPosted(BlindPosted event) {
String playerId = truncateId(event.getPlayerRoot());
writeLog(String.format("HAND player %s posted %s blind: %d",
playerId, event.getBlindType(), event.getAmount()));
return Projection.upsert("log", "blind_posted");
}
@Projects(ActionTaken.class)
public Projection projectActionTaken(ActionTaken event) {
String playerId = truncateId(event.getPlayerRoot());
writeLog(String.format("HAND player %s: %s %d",
playerId, event.getAction(), event.getAmount()));
return Projection.upsert("log", "action_taken");
}
@Projects(PotAwarded.class)
public Projection projectPotAwarded(PotAwarded event) {
String winners = event.getWinnersList().stream()
.map(w -> truncateId(w.getPlayerRoot()) + " wins " + w.getAmount())
.collect(Collectors.joining(", "));
writeLog(String.format("HAND pot awarded: %s", winners));
return Projection.upsert("log", "pot_awarded");
}
@Projects(HandComplete.class)
public Projection projectHandComplete(HandComplete event) {
writeLog(String.format("HAND #%d complete", event.getHandNumber()));
return Projection.upsert("log", "hand_complete");
}
}
examples/java/prj-output/src/main/java/dev/angzarr/examples/prjoutput/OutputProjector.java
public class OutputProjector {
private static final Map<String, Class<?>> EVENT_TYPES = new HashMap<>();
static {
// Player events
EVENT_TYPES.put("PlayerRegistered", PlayerRegistered.class);
EVENT_TYPES.put("FundsDeposited", FundsDeposited.class);
EVENT_TYPES.put("FundsWithdrawn", FundsWithdrawn.class);
EVENT_TYPES.put("FundsReserved", FundsReserved.class);
EVENT_TYPES.put("FundsReleased", FundsReleased.class);
// Table events
EVENT_TYPES.put("TableCreated", TableCreated.class);
EVENT_TYPES.put("PlayerJoined", PlayerJoined.class);
EVENT_TYPES.put("PlayerLeft", PlayerLeft.class);
EVENT_TYPES.put("HandStarted", HandStarted.class);
EVENT_TYPES.put("HandEnded", HandEnded.class);
// Hand events
EVENT_TYPES.put("CardsDealt", CardsDealt.class);
EVENT_TYPES.put("BlindPosted", BlindPosted.class);
EVENT_TYPES.put("ActionTaken", ActionTaken.class);
EVENT_TYPES.put("CommunityCardsDealt", CommunityCardsDealt.class);
EVENT_TYPES.put("PotAwarded", PotAwarded.class);
EVENT_TYPES.put("HandComplete", HandComplete.class);
}
private final TextRenderer renderer;
private final Consumer<String> outputFn;
private final boolean showTimestamps;
private final DateTimeFormatter timeFormatter;
public OutputProjector(Consumer<String> outputFn, boolean showTimestamps) {
this.renderer = new TextRenderer();
this.outputFn = outputFn;
this.showTimestamps = showTimestamps;
this.timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneOffset.UTC);
}
/**
* Get list of domains this projector subscribes to.
*/
public List<String> getInputDomains() {
return List.of("player", "table", "hand");
}
/**
* Set display name for a player root.
*/
public void setPlayerName(byte[] playerRoot, String name) {
renderer.setPlayerName(playerRoot, name);
}
/**
* Handle an event book and return a projection.
*/
public Projection handle(EventBook eventBook) {
handleEventBook(eventBook);
// Return a projection with the sequence number
int seq = 0;
if (eventBook.getPagesCount() > 0) {
EventPage lastPage = eventBook.getPages(eventBook.getPagesCount() - 1);
seq = lastPage.getSequence();
}
return Projection.newBuilder()
.setCover(eventBook.getCover())
.setProjector("output")
.setSequence(seq)
.build();
}
/**
* Handle all events in an event book.
*/
public void handleEventBook(EventBook eventBook) {
for (EventPage page : eventBook.getPagesList()) {
handleEvent(page);
}
}
/**
* Handle a single event page from any domain.
*/
public void handleEvent(EventPage eventPage) {
Any eventAny = eventPage.getEvent();
String typeUrl = eventAny.getTypeUrl();
// Extract event type from type_url
String eventType = extractEventType(typeUrl);
if (!EVENT_TYPES.containsKey(eventType)) {
outputFn.accept("[Unknown event type: " + typeUrl + "]");
return;
}
try {
// Unpack the event
@SuppressWarnings("unchecked")
Class<? extends com.google.protobuf.Message> eventClass =
(Class<? extends com.google.protobuf.Message>) EVENT_TYPES.get(eventType);
Object event = eventAny.unpack(eventClass);
// Render and output
String text = renderer.render(eventType, event);
if (text != null && !text.isEmpty()) {
if (showTimestamps && eventPage.hasCreatedAt()) {
Instant ts = Instant.ofEpochSecond(
eventPage.getCreatedAt().getSeconds(),
eventPage.getCreatedAt().getNanos()
);
text = "[" + timeFormatter.format(ts) + "] " + text;
}
outputFn.accept(text);
}
} catch (InvalidProtocolBufferException e) {
outputFn.accept("[Failed to unpack: " + typeUrl + "]");
}
}
private String extractEventType(String typeUrl) {
int dotIndex = typeUrl.lastIndexOf('.');
if (dotIndex >= 0) {
return typeUrl.substring(dotIndex + 1);
}
return typeUrl;
}
/**
* Create a file-based projector.
*/
public static OutputProjector forFile(String path, boolean showTimestamps) throws IOException {
PrintWriter writer = new PrintWriter(new FileWriter(path, true));
return new OutputProjector(line -> {
writer.println(line);
writer.flush();
}, showTimestamps);
}
/**
* Create a console-based projector.
*/
public static OutputProjector forConsole(boolean showTimestamps) {
return new OutputProjector(System.out::println, showTimestamps);
}
}
examples/csharp/Prj/Output/OutputProjector.cs
public class OutputProjector
{
private readonly Dictionary<string, string> _playerNames = new();
[Projects(typeof(PlayerRegistered))]
public void HandlePlayerRegistered(PlayerRegistered @event)
{
_playerNames[@event.PlayerId] = @event.DisplayName;
Console.WriteLine($"[Player] {@event.DisplayName} registered");
}
[Projects(typeof(FundsDeposited))]
public void HandleFundsDeposited(FundsDeposited @event)
{
var name = _playerNames.GetValueOrDefault(@event.PlayerId, @event.PlayerId);
var amount = @event.Amount?.Amount ?? 0;
Console.WriteLine($"[Player] {name} deposited ${amount / 100.0:F2}");
}
[Projects(typeof(CardsDealt))]
public void HandleCardsDealt(CardsDealt @event)
{
foreach (var player in @event.PlayerCards)
{
var name = _playerNames.GetValueOrDefault(player.PlayerId, player.PlayerId);
var cards = FormatCards(player.HoleCards);
Console.WriteLine($"[Hand] {name} dealt {cards}");
}
}
private static string FormatCards(IEnumerable<Card> cards) =>
string.Join(" ", cards.Select(c => $"{c.Rank}{c.Suit}"));
}
examples/cpp/prj-output/src/main.cpp
/// gRPC service implementation for output projector.
class OutputProjectorService final : public angzarr::ProjectorService::Service {
public:
explicit OutputProjectorService(const std::string& log_path, bool show_timestamps = true)
: log_path_(log_path)
, log_file_(log_path, std::ios::app)
, show_timestamps_(show_timestamps) {}
~OutputProjectorService() {
if (log_file_.is_open()) {
log_file_.close();
}
}
grpc::Status Handle(
grpc::ServerContext* context,
const angzarr::EventBook* request,
angzarr::Projection* response) override {
return process_event_book(*request, response);
}
grpc::Status HandleSpeculative(
grpc::ServerContext* context,
const angzarr::EventBook* request,
angzarr::Projection* response) override {
// Speculative mode - don't write to file
uint32_t seq = 0;
for (const auto& page : request->pages()) {
seq = page.sequence();
}
response->mutable_cover()->CopyFrom(request->cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);
return grpc::Status::OK;
}
private:
grpc::Status process_event_book(const angzarr::EventBook& event_book,
angzarr::Projection* response) {
uint32_t seq = 0;
for (const auto& page : event_book.pages()) {
const auto& event_any = page.event();
seq = page.sequence();
// Format and write event
std::string formatted = format_event(event_any, event_book.cover().domain());
if (!formatted.empty()) {
write_line(formatted);
}
}
response->mutable_cover()->CopyFrom(event_book.cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);
return grpc::Status::OK;
}
std::string format_event(const google::protobuf::Any& event_any, const std::string& domain) {
std::string prefix;
if (show_timestamps_) {
auto now = std::chrono::system_clock::now();
auto time = std::chrono::system_clock::to_time_t(now);
std::ostringstream ss;
ss << std::put_time(std::localtime(&time), "%H:%M:%S");
prefix = "[" + ss.str() + "] ";
}
const std::string& type_url = event_any.type_url();
// Player events
if (type_url.find("PlayerRegistered") != std::string::npos) {
examples::PlayerRegistered event;
event_any.UnpackTo(&event);
return prefix + "Player registered: " + event.display_name();
}
if (type_url.find("FundsDeposited") != std::string::npos) {
examples::FundsDeposited event;
event_any.UnpackTo(&event);
return prefix + "Funds deposited: " + std::to_string(event.new_balance().amount());
}
// Table events
if (type_url.find("TableCreated") != std::string::npos) {
examples::TableCreated event;
event_any.UnpackTo(&event);
return prefix + "Table created: " + event.table_name();
}
if (type_url.find("PlayerJoined") != std::string::npos) {
examples::PlayerJoined event;
event_any.UnpackTo(&event);
return prefix + "Player joined at position " + std::to_string(event.seat_position());
}
if (type_url.find("HandStarted") != std::string::npos) {
examples::HandStarted event;
event_any.UnpackTo(&event);
return prefix + "Hand started: dealer position " + std::to_string(event.dealer_position());
}
// Hand events
if (type_url.find("CardsDealt") != std::string::npos) {
examples::CardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Cards dealt to " + std::to_string(event.players_size()) + " players";
}
if (type_url.find("BlindPosted") != std::string::npos) {
examples::BlindPosted event;
event_any.UnpackTo(&event);
return prefix + "Blind posted: " + std::to_string(event.amount());
}
if (type_url.find("ActionTaken") != std::string::npos) {
examples::ActionTaken event;
event_any.UnpackTo(&event);
return prefix + "Action: " + examples::ActionType_Name(event.action());
}
if (type_url.find("CommunityCardsDealt") != std::string::npos) {
examples::CommunityCardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Community cards dealt: " + std::to_string(event.cards_size()) + " cards";
}
if (type_url.find("PotAwarded") != std::string::npos) {
examples::PotAwarded event;
event_any.UnpackTo(&event);
int64_t total = 0;
for (const auto& winner : event.winners()) {
total += winner.amount();
}
return prefix + "Pot awarded: " + std::to_string(total);
}
if (type_url.find("HandComplete") != std::string::npos) {
examples::HandComplete event;
event_any.UnpackTo(&event);
return prefix + "Hand complete";
}
// Return empty for unknown event types
return "";
}
void write_line(const std::string& text) {
if (log_file_.is_open()) {
log_file_ << text << std::endl;
log_file_.flush();
}
std::cout << text << std::endl;
}
std::string log_path_;
std::ofstream log_file_;
bool show_timestamps_;
};
StateRouter Pattern
The StateRouter pattern provides fluent event handler registration with explicit state management. It's the functional alternative to OO projectors:
- Python
- Go
- Rust
- Java
- C#
- C++
examples/python/prj-output/output_projector_doc.py
player_names: Dict[str, str] = {}
def handle_player_registered(event: player.PlayerRegistered):
player_names[event.player_id] = event.display_name
print(f"[Player] {event.display_name} registered")
def handle_funds_deposited(event: player.FundsDeposited):
name = player_names.get(event.player_id, event.player_id)
print(f"[Player] {name} deposited ${event.amount.amount / 100:.2f}")
def handle_cards_dealt(event: hand.CardsDealt):
for pc in event.player_cards:
name = player_names.get(pc.player_id, pc.player_id)
print(f"[Hand] {name} dealt cards")
router = (
StateRouter("prj-output")
.subscribes("player", ["PlayerRegistered", "FundsDeposited"])
.subscribes("hand", ["CardsDealt", "ActionTaken", "PotAwarded"])
.on("PlayerRegistered", handle_player_registered)
.on("FundsDeposited", handle_funds_deposited)
.on("CardsDealt", handle_cards_dealt)
)
examples/go/prj-output/output_projector_doc.go
var playerNames = make(map[string]string)
func handlePlayerRegistered(event *examples.PlayerRegistered) {
playerNames[event.PlayerId] = event.DisplayName
fmt.Printf("[Player] %s registered\n", event.DisplayName)
}
func handleFundsDeposited(event *examples.FundsDeposited) {
name := playerNames[event.PlayerId]
if name == "" {
name = event.PlayerId
}
fmt.Printf("[Player] %s deposited $%.2f\n", name, float64(event.Amount.Amount)/100)
}
func handleCardsDealt(event *examples.CardsDealt) {
for _, player := range event.PlayerCards {
name := playerNames[player.PlayerId]
fmt.Printf("[Hand] %s dealt cards\n", name)
}
}
var stateRouter = angzarr.NewStateRouter("prj-output").
Subscribes("player", []string{"PlayerRegistered", "FundsDeposited"}).
Subscribes("hand", []string{"CardsDealt", "ActionTaken", "PotAwarded"}).
On("PlayerRegistered", handlePlayerRegistered).
On("FundsDeposited", handleFundsDeposited).
On("CardsDealt", handleCardsDealt)
examples/rust/prj-output/src/output_projector_doc.rs
fn build_router() -> StateRouter {
StateRouter::new("prj-output")
.subscribes("player", &["PlayerRegistered", "FundsDeposited"])
.subscribes("hand", &["CardsDealt", "ActionTaken", "PotAwarded"])
.on::<PlayerRegistered>(handle_player_registered)
.on::<FundsDeposited>(handle_funds_deposited)
.on::<CardsDealt>(handle_cards_dealt)
}
fn handle_player_registered(event: &PlayerRegistered, state: &mut ProjectorState) {
state.player_names.insert(event.player_id.clone(), event.display_name.clone());
println!("[Player] {} registered", event.display_name);
}
fn handle_funds_deposited(event: &FundsDeposited, state: &mut ProjectorState) {
let name = state.player_names.get(&event.player_id)
.map(|s| s.as_str())
.unwrap_or(&event.player_id);
println!("[Player] {} deposited", name);
}
fn handle_cards_dealt(event: &CardsDealt, state: &mut ProjectorState) {
for player in &event.player_cards {
let name = state.player_names.get(&player.player_id)
.map(|s| s.as_str())
.unwrap_or(&player.player_id);
println!("[Hand] {} dealt cards", name);
}
}
struct ProjectorState {
player_names: HashMap<String, String>,
}
examples/java/prj-output/src/main/java/dev/angzarr/examples/prjoutput/OutputStateRouter.java
class OutputStateRouterExample {
private static final Map<String, String> playerNames = new HashMap<>();
static void handlePlayerRegistered(PlayerRegistered event) {
playerNames.put(event.getPlayerId(), event.getDisplayName());
System.out.printf("[Player] %s registered%n", event.getDisplayName());
}
static void handleFundsDeposited(FundsDeposited event) {
String name = playerNames.getOrDefault(event.getPlayerId(), event.getPlayerId());
System.out.printf("[Player] %s deposited $%.2f%n", name, event.getAmount().getAmount() / 100.0);
}
static void handleCardsDealt(CardsDealt event) {
for (var player : event.getPlayerCardsList()) {
String name = playerNames.getOrDefault(player.getPlayerId(), player.getPlayerId());
System.out.printf("[Hand] %s dealt cards%n", name);
}
}
static StateRouter buildRouter() {
return new StateRouter("prj-output")
.subscribes("player", new String[]{"PlayerRegistered", "FundsDeposited"})
.subscribes("hand", new String[]{"CardsDealt", "ActionTaken", "PotAwarded"})
.on(PlayerRegistered.class, OutputStateRouterExample::handlePlayerRegistered)
.on(FundsDeposited.class, OutputStateRouterExample::handleFundsDeposited)
.on(CardsDealt.class, OutputStateRouterExample::handleCardsDealt);
}
}
examples/csharp/Prj/Output/OutputProjector.cs
public static class OutputProjectorRouter
{
public static StateRouter BuildRouter()
{
var playerNames = new Dictionary<string, string>();
return new StateRouter("prj-output")
.Subscribes("player", new[] { "PlayerRegistered", "FundsDeposited" })
.Subscribes("hand", new[] { "CardsDealt", "ActionTaken", "PotAwarded" })
.On<PlayerRegistered>(evt => {
playerNames[evt.PlayerId] = evt.DisplayName;
Console.WriteLine($"[Player] {evt.DisplayName} registered");
})
.On<FundsDeposited>(evt => {
var name = playerNames.GetValueOrDefault(evt.PlayerId, evt.PlayerId);
Console.WriteLine($"[Player] {name} deposited ${evt.Amount?.Amount / 100.0:F2}");
})
.On<CardsDealt>(evt => {
foreach (var player in evt.PlayerCards)
{
var name = playerNames.GetValueOrDefault(player.PlayerId, player.PlayerId);
Console.WriteLine($"[Hand] {name} dealt cards");
}
});
}
}
examples/cpp/prj-output/src/output_projector_doc.cpp
std::unordered_map<std::string, std::string> player_names;
void handle_player_registered(const PlayerRegistered& event) {
player_names[event.player_id()] = event.display_name();
std::cout << "[Player] " << event.display_name() << " registered\n";
}
void handle_funds_deposited(const FundsDeposited& event) {
auto it = player_names.find(event.player_id());
std::string name = (it != player_names.end()) ? it->second : event.player_id();
std::cout << "[Player] " << name << " deposited\n";
}
void handle_cards_dealt(const CardsDealt& event) {
for (const auto& player : event.player_cards()) {
auto it = player_names.find(player.player_id());
std::string name = (it != player_names.end()) ? it->second : player.player_id();
std::cout << "[Hand] " << name << " dealt cards\n";
}
}
StateRouter build_router() {
return StateRouter("prj-output")
.subscribes("player", {"PlayerRegistered", "FundsDeposited"})
.subscribes("hand", {"CardsDealt", "ActionTaken", "PotAwarded"})
.on<PlayerRegistered>(handle_player_registered)
.on<FundsDeposited>(handle_funds_deposited)
.on<CardsDealt>(handle_cards_dealt);
}
Multi-Domain Subscription
Projectors can subscribe to events from multiple domains. The Output projector subscribes to player, table, and hand domains:
player domain ──┐
├──→ [Output Projector] ──→ hand_log.txt
table domain ──┤
│
hand domain ──┘
Each language's ProjectorHandler/ProjectorBase accepts multiple domain names:
- Python
- Go
- Rust
- Java
handler = ProjectorHandler("output", "player", "table", "hand")
handler := angzarr.NewProjectorHandler("output", "player", "table", "hand")
let handler = ProjectorHandler::new("output", vec!["player", "table", "hand"]);
List<String> domains = List.of("player", "table", "hand");
ProjectorHandler handler = new ProjectorHandler("output", domains);
Projector Principles
- Read-only — Projectors never modify domain state, only create read views
- Idempotent — Same events always produce same projections
- Catchup safe — Can replay full event history to rebuild state
- Domain aware — Subscribe to specific domains, filter unwanted events
- Stateful OK — Can maintain local state (caches, maps) for rendering
Running Projectors
With ProjectorHandler (All Languages)
# Python
cd examples/python && python -m prj-output.main
# Go
cd examples/go && go run ./prj-output
# Rust
cd examples/rust && cargo run --bin prj-output
# Java
cd examples/java && ./gradlew prj-output:run
# C#
cd examples/csharp && dotnet run --project Prj/Output
# C++
cd examples/cpp && ./build/prj-output
Next Steps
- Aggregates — Command handling examples
- Sagas — Cross-domain coordination
- CloudEvents — External event publishing