Projector Examples
Read-side projectors from the poker domain. All code is from the actual examples/ directory.
Projectors subscribe to events and produce read-optimized views: text logs, database tables, search indexes, or external API calls.
Implementation Styles
Angzarr supports two projector implementation styles:
| Style | Description | Best For |
|---|---|---|
| OO (Object-Oriented) | Projector class with @projects/[Projects] decorators | Rich state, encapsulation |
| Functional | StateRouter or ProjectorHandler with function handlers | Simple projections, stateless transforms |
| Language | OO | Functional |
|---|---|---|
| Python | ✓ | ✓ |
| Java | ✓ | ✓ |
| C# | ✓ | ✓ |
| Rust | — | ✓ |
| Go | ✓ | ✓ |
| C++ | ✓ | ✓ |
Output Projector
The Output Projector subscribes to events from multiple domains (player, table, hand) and writes formatted game logs to a file. This demonstrates a multi-domain projector.
examples/cpp/prj-output/src/main.cpp
/// gRPC service implementation for output projector.
class OutputProjectorService final : public angzarr::ProjectorService::Service {
public:
explicit OutputProjectorService(const std::string& log_path, bool show_timestamps = true)
: log_path_(log_path),
log_file_(log_path, std::ios::app),
show_timestamps_(show_timestamps) {}
~OutputProjectorService() {
if (log_file_.is_open()) {
log_file_.close();
}
}
grpc::Status Handle(grpc::ServerContext* context, const angzarr::EventBook* request,
angzarr::Projection* response) override {
return process_event_book(*request, response);
}
grpc::Status HandleSpeculative(grpc::ServerContext* context, const angzarr::EventBook* request,
angzarr::Projection* response) override {
(void)context;
// Speculative mode - don't write to file
uint32_t seq = 0;
for (const auto& page : request->pages()) {
if (page.has_header() && page.header().has_sequence()) {
seq = page.header().sequence();
}
}
response->mutable_cover()->CopyFrom(request->cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);
return grpc::Status::OK;
}
private:
grpc::Status process_event_book(const angzarr::EventBook& event_book,
angzarr::Projection* response) {
uint32_t seq = 0;
for (const auto& page : event_book.pages()) {
const auto& event_any = page.event();
if (page.has_header() && page.header().has_sequence()) {
seq = page.header().sequence();
}
// Format and write event
std::string formatted = format_event(event_any, event_book.cover().domain());
if (!formatted.empty()) {
write_line(formatted);
}
}
response->mutable_cover()->CopyFrom(event_book.cover());
response->set_projector(PROJECTOR_NAME);
response->set_sequence(seq);
return grpc::Status::OK;
}
std::string format_event(const google::protobuf::Any& event_any, const std::string& domain) {
std::string prefix;
if (show_timestamps_) {
auto now = std::chrono::system_clock::now();
auto time = std::chrono::system_clock::to_time_t(now);
std::ostringstream ss;
ss << std::put_time(std::localtime(&time), "%H:%M:%S");
prefix = "[" + ss.str() + "] ";
}
const std::string& type_url = event_any.type_url();
// Player events
if (type_url.find("PlayerRegistered") != std::string::npos) {
examples::PlayerRegistered event;
event_any.UnpackTo(&event);
return prefix + "Player registered: " + event.display_name();
}
if (type_url.find("FundsDeposited") != std::string::npos) {
examples::FundsDeposited event;
event_any.UnpackTo(&event);
return prefix + "Funds deposited: " + std::to_string(event.new_balance().amount());
}
// Table events
if (type_url.find("TableCreated") != std::string::npos) {
examples::TableCreated event;
event_any.UnpackTo(&event);
return prefix + "Table created: " + event.table_name();
}
if (type_url.find("PlayerJoined") != std::string::npos) {
examples::PlayerJoined event;
event_any.UnpackTo(&event);
return prefix + "Player joined at position " + std::to_string(event.seat_position());
}
if (type_url.find("HandStarted") != std::string::npos) {
examples::HandStarted event;
event_any.UnpackTo(&event);
return prefix + "Hand started: dealer position " +
std::to_string(event.dealer_position());
}
// Hand events
if (type_url.find("CardsDealt") != std::string::npos) {
examples::CardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Cards dealt to " + std::to_string(event.players_size()) + " players";
}
if (type_url.find("BlindPosted") != std::string::npos) {
examples::BlindPosted event;
event_any.UnpackTo(&event);
return prefix + "Blind posted: " + std::to_string(event.amount());
}
if (type_url.find("ActionTaken") != std::string::npos) {
examples::ActionTaken event;
event_any.UnpackTo(&event);
return prefix + "Action: " + examples::ActionType_Name(event.action());
}
if (type_url.find("CommunityCardsDealt") != std::string::npos) {
examples::CommunityCardsDealt event;
event_any.UnpackTo(&event);
return prefix + "Community cards dealt: " + std::to_string(event.cards_size()) +
" cards";
}
if (type_url.find("PotAwarded") != std::string::npos) {
examples::PotAwarded event;
event_any.UnpackTo(&event);
int64_t total = 0;
for (const auto& winner : event.winners()) {
total += winner.amount();
}
return prefix + "Pot awarded: " + std::to_string(total);
}
if (type_url.find("HandComplete") != std::string::npos) {
examples::HandComplete event;
event_any.UnpackTo(&event);
return prefix + "Hand complete";
}
// Return empty for unknown event types
return "";
}
void write_line(const std::string& text) {
if (log_file_.is_open()) {
log_file_ << text << std::endl;
log_file_.flush();
}
std::cout << text << std::endl;
}
std::string log_path_;
std::ofstream log_file_;
bool show_timestamps_;
};
StateRouter Pattern
The StateRouter pattern provides fluent event handler registration with explicit state management. It's the functional alternative to OO projectors:
examples/cpp/prj-output/src/output_projector_doc.cpp
// Projector-local state: display names keyed by player_id. Written by
// handle_player_registered and read by the other handlers to render names.
std::unordered_map<std::string, std::string> player_names;
/// Caches the registered player's display name under its player_id and
/// prints a "[Player] <name> registered" line.
void handle_player_registered(const PlayerRegistered& event) {
    const auto& display_name = event.display_name();
    player_names[event.player_id()] = display_name;
    std::cout << "[Player] " << display_name << " registered\n";
}
/// Prints a "[Player] <name> deposited" line, using the cached display name
/// when the player is known and the raw player_id otherwise.
void handle_funds_deposited(const FundsDeposited& event) {
    const auto& id = event.player_id();
    const auto cached = player_names.find(id);
    const bool known = cached != player_names.end();
    std::cout << "[Player] " << (known ? cached->second : id) << " deposited\n";
}
/// Prints one "[Hand] <name> dealt cards" line per entry in the event's
/// player_cards, falling back to the raw player_id for unknown players.
void handle_cards_dealt(const CardsDealt& event) {
    for (const auto& dealt : event.player_cards()) {
        const auto& id = dealt.player_id();
        const auto cached = player_names.find(id);
        const bool known = cached != player_names.end();
        std::cout << "[Hand] " << (known ? cached->second : id) << " dealt cards\n";
    }
}
/// Builds the "prj-output" router: declares the subscribed events per domain
/// and binds the typed handlers defined above.
StateRouter build_router() {
return StateRouter("prj-output")
// Events requested from the player domain.
.subscribes("player", {"PlayerRegistered", "FundsDeposited"})
// Events requested from the hand domain. ActionTaken and PotAwarded are
// subscribed here but have no handler registered in this snippet.
.subscribes("hand", {"CardsDealt", "ActionTaken", "PotAwarded"})
// Typed handler bindings, one per event type.
.on<PlayerRegistered>(handle_player_registered)
.on<FundsDeposited>(handle_funds_deposited)
.on<CardsDealt>(handle_cards_dealt);
}
Multi-Domain Subscription
Projectors can subscribe to events from multiple domains. The Output projector subscribes to player, table, and hand domains:
player domain ──┐
├──→ [Output Projector] ──→ hand_log.txt
table domain ──┤
│
hand domain ──┘
Each language's ProjectorHandler/ProjectorBase accepts multiple domain names:
- Python
- Go
- Rust
- Java
handler = ProjectorHandler("output", "player", "table", "hand")
handler := angzarr.NewProjectorHandler("output", "player", "table", "hand")
let handler = ProjectorHandler::new("output", vec!["player", "table", "hand"]);
List<String> domains = List.of("player", "table", "hand");
ProjectorHandler handler = new ProjectorHandler("output", domains);
Projector Principles
- Read-only — Projectors never modify domain state, only create read views
- Idempotent — The same events always produce the same projections
- Catchup safe — Can replay full event history to rebuild state
- Domain aware — Subscribe to specific domains, filter unwanted events
- Stateful OK — Can maintain local state (caches, maps) for rendering
Running Projectors
With ProjectorHandler (All Languages)
# Python
cd examples/python && python -m prj-output.main
# Go
cd examples/go && go run ./prj-output
# Rust
cd examples/rust && cargo run --bin prj-output
# Java
cd examples/java && ./gradlew prj-output:run
# C#
cd examples/csharp && dotnet run --project Prj/Output
# C++
cd examples/cpp && ./build/prj-output
Next Steps
- Aggregates — Command handling examples
- Sagas — Cross-domain coordination
- CloudEvents — External event publishing