Skip to main content

Projector Examples

Read-side projectors from the poker domain. All code is from the actual examples/ directory.

Projectors subscribe to events and produce read-optimized views: text logs, database tables, search indexes, or external API calls.


Implementation Styles

Angzarr supports two projector implementation styles:

| Style | Description | Best For |
| --- | --- | --- |
| OO (Object-Oriented) | Projector class with @projects/[Projects] decorators | Rich state, encapsulation |
| Functional | StateRouter or ProjectorHandler with function handlers | Simple projections, stateless transforms |
LanguageOOFunctional
Python
Java
C#
Rust
Go
C++

Output Projector

The Output Projector subscribes to events from multiple domains (player, table, hand) and writes formatted game logs to a file. This demonstrates a multi-domain projector.

examples/cpp/prj-output/src/main.cpp

/// gRPC service implementation for output projector.
class OutputProjectorService final : public angzarr::ProjectorService::Service {
public:
explicit OutputProjectorService(const std::string& log_path, bool show_timestamps = true)
: log_path_(log_path),
log_file_(log_path, std::ios::app),
show_timestamps_(show_timestamps) {}

~OutputProjectorService() {
if (log_file_.is_open()) {
log_file_.close();
}
}

grpc::Status Handle(grpc::ServerContext* context, const angzarr::EventBook* request,
angzarr::Projection* response) override {
return process_event_book(*request, response);
}

grpc::Status HandleSpeculative(grpc::ServerContext* context, const angzarr::EventBook* request,
angzarr::Projection* response) override {
(void)context;
// Speculative mode - don't write to file
uint32_t seq = 0;
for (const auto& page : request->pages()) {
if (page.has_header() && page.header().has_sequence()) {
seq = page.header().sequence();
}
}

response->mutable_cover()->CopyFrom(request->cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);

return grpc::Status::OK;
}

private:
grpc::Status process_event_book(const angzarr::EventBook& event_book,
angzarr::Projection* response) {
uint32_t seq = 0;

for (const auto& page : event_book.pages()) {
const auto& event_any = page.event();
if (page.has_header() && page.header().has_sequence()) {
seq = page.header().sequence();
}

// Format and write event
std::string formatted = format_event(event_any, event_book.cover().domain());
if (!formatted.empty()) {
write_line(formatted);
}
}

response->mutable_cover()->CopyFrom(event_book.cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);

return grpc::Status::OK;
}

std::string format_event(const google::protobuf::Any& event_any, const std::string& domain) {
std::string prefix;
if (show_timestamps_) {
auto now = std::chrono::system_clock::now();
auto time = std::chrono::system_clock::to_time_t(now);
std::ostringstream ss;
ss << std::put_time(std::localtime(&time), "%H:%M:%S");
prefix = "[" + ss.str() + "] ";
}

const std::string& type_url = event_any.type_url();

// Player events
if (type_url.find("PlayerRegistered") != std::string::npos) {
examples::PlayerRegistered event;
event_any.UnpackTo(&event);
return prefix + "Player registered: " + event.display_name();
}

if (type_url.find("FundsDeposited") != std::string::npos) {
examples::FundsDeposited event;
event_any.UnpackTo(&event);
return prefix + "Funds deposited: " + std::to_string(event.new_balance().amount());
}

// Table events
if (type_url.find("TableCreated") != std::string::npos) {
examples::TableCreated event;
event_any.UnpackTo(&event);
return prefix + "Table created: " + event.table_name();
}

if (type_url.find("PlayerJoined") != std::string::npos) {
examples::PlayerJoined event;
event_any.UnpackTo(&event);
return prefix + "Player joined at position " + std::to_string(event.seat_position());
}

if (type_url.find("HandStarted") != std::string::npos) {
examples::HandStarted event;
event_any.UnpackTo(&event);
return prefix + "Hand started: dealer position " +
std::to_string(event.dealer_position());
}

// Hand events
if (type_url.find("CardsDealt") != std::string::npos) {
examples::CardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Cards dealt to " + std::to_string(event.players_size()) + " players";
}

if (type_url.find("BlindPosted") != std::string::npos) {
examples::BlindPosted event;
event_any.UnpackTo(&event);
return prefix + "Blind posted: " + std::to_string(event.amount());
}

if (type_url.find("ActionTaken") != std::string::npos) {
examples::ActionTaken event;
event_any.UnpackTo(&event);
return prefix + "Action: " + examples::ActionType_Name(event.action());
}

if (type_url.find("CommunityCardsDealt") != std::string::npos) {
examples::CommunityCardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Community cards dealt: " + std::to_string(event.cards_size()) +
" cards";
}

if (type_url.find("PotAwarded") != std::string::npos) {
examples::PotAwarded event;
event_any.UnpackTo(&event);
int64_t total = 0;
for (const auto& winner : event.winners()) {
total += winner.amount();
}
return prefix + "Pot awarded: " + std::to_string(total);
}

if (type_url.find("HandComplete") != std::string::npos) {
examples::HandComplete event;
event_any.UnpackTo(&event);
return prefix + "Hand complete";
}

// Return empty for unknown event types
return "";
}

void write_line(const std::string& text) {
if (log_file_.is_open()) {
log_file_ << text << std::endl;
log_file_.flush();
}
std::cout << text << std::endl;
}

std::string log_path_;
std::ofstream log_file_;
bool show_timestamps_;
};

StateRouter Pattern

The StateRouter pattern provides fluent event handler registration with explicit state management. It's the functional alternative to OO projectors:

examples/cpp/prj-output/src/output_projector_doc.cpp

// Display names keyed by player id. Written by handle_player_registered and
// read by the other handlers, which fall back to the raw id on a miss.
std::unordered_map<std::string, std::string> player_names;

/// Remembers the player's display name under their id and announces the
/// registration on stdout.
void handle_player_registered(const PlayerRegistered& event) {
    const auto& display_name = event.display_name();
    player_names[event.player_id()] = display_name;
    std::cout << "[Player] " << display_name << " registered\n";
}

/// Announces a deposit on stdout, using the cached display name when the
/// player is known and the raw player id otherwise.
void handle_funds_deposited(const FundsDeposited& event) {
    const auto entry = player_names.find(event.player_id());
    if (entry != player_names.end()) {
        std::cout << "[Player] " << entry->second << " deposited\n";
    } else {
        std::cout << "[Player] " << event.player_id() << " deposited\n";
    }
}

/// Prints one "[Hand] ... dealt cards" line per entry in the event's
/// player_cards, preferring the cached display name over the raw player id.
void handle_cards_dealt(const CardsDealt& event) {
    for (const auto& dealt : event.player_cards()) {
        const auto& id = dealt.player_id();
        const auto entry = player_names.find(id);
        if (entry != player_names.end()) {
            std::cout << "[Hand] " << entry->second << " dealt cards\n";
        } else {
            std::cout << "[Hand] " << id << " dealt cards\n";
        }
    }
}

/// Builds the router for the "prj-output" projector.
///
/// Declares subscriptions for the "player" and "hand" domains and registers
/// the three typed handlers defined above.
StateRouter build_router() {
return StateRouter("prj-output")
.subscribes("player", {"PlayerRegistered", "FundsDeposited"})
// ActionTaken and PotAwarded are subscribed but have no handler registered
// in this chain. NOTE(review): confirm how the router treats such events.
.subscribes("hand", {"CardsDealt", "ActionTaken", "PotAwarded"})
.on<PlayerRegistered>(handle_player_registered)
.on<FundsDeposited>(handle_funds_deposited)
.on<CardsDealt>(handle_cards_dealt);
}

Multi-Domain Subscription

Projectors can subscribe to events from multiple domains. The Output projector subscribes to player, table, and hand domains:

player domain ──┐
                ├──→ [Output Projector] ──→ hand_log.txt
table domain  ──┤
                │
hand domain   ──┘

Each language's ProjectorHandler/ProjectorBase accepts multiple domain names:

handler = ProjectorHandler("output", "player", "table", "hand")

Projector Principles

  1. Read-only — Projectors never modify domain state, only create read views
  2. Idempotent — Same events always produce same projections
  3. Catchup safe — Can replay full event history to rebuild state
  4. Domain aware — Subscribe to specific domains, filter unwanted events
  5. Stateful OK — Can maintain local state (caches, maps) for rendering

Running Projectors

With ProjectorHandler (All Languages)

# Python
cd examples/python && python -m prj-output.main

# Go
cd examples/go && go run ./prj-output

# Rust
cd examples/rust && cargo run --bin prj-output

# Java
cd examples/java && ./gradlew prj-output:run

# C#
cd examples/csharp && dotnet run --project Prj/Output

# C++
cd examples/cpp && ./build/prj-output

Next Steps