Projectors
A projector subscribes to events and performs side effects—typically building read models, writing to databases, or streaming to external systems. Projectors transform the event stream into query-optimized views.
Unlike aggregates and sagas, projectors are read-only from the event sourcing perspective. They observe events but never emit commands back to aggregates.
Common Use Cases
| Projector | Events | Output |
|---|---|---|
| Search indexer | Product events | Elasticsearch updates |
| Dashboard stream | All domain events | WebSocket push |
| Analytics ETL | Transaction events | Data warehouse |
| Cache warmer | Player events | Redis cache |
| Output renderer | Poker events | Console text |
Example: Output Projector
Transforms poker events into human-readable text:
- Python
- Go
- Rust
- Java
- C#
- C++
class OutputProjector(Projector):
def __init__(self):
self.player_names: Dict[str, str] = {}
def handle_player_registered(self, event: player.PlayerRegistered):
self.player_names[event.player_id] = event.display_name
print(f"[Player] {event.display_name} registered")
def handle_funds_deposited(self, event: player.FundsDeposited):
name = self.player_names.get(event.player_id, event.player_id)
amount = event.amount.amount if event.amount else 0
print(f"[Player] {name} deposited ${amount / 100:.2f}")
def handle_cards_dealt(self, event: hand.CardsDealt):
for player_cards in event.player_cards:
name = self.player_names.get(player_cards.player_id, player_cards.player_id)
cards = format_cards(player_cards.hole_cards)
print(f"[Hand] {name} dealt {cards}")
type OutputProjector struct {
playerNames map[string]string
}
func (p *OutputProjector) HandlePlayerRegistered(event *examples.PlayerRegistered) {
p.playerNames[event.PlayerId] = event.DisplayName
fmt.Printf("[Player] %s registered\n", event.DisplayName)
}
func (p *OutputProjector) HandleFundsDeposited(event *examples.FundsDeposited) {
name := p.playerNames[event.PlayerId]
if name == "" {
name = event.PlayerId
}
fmt.Printf("[Player] %s deposited $%.2f\n", name, float64(event.Amount.Amount)/100)
}
func (p *OutputProjector) HandleCardsDealt(event *examples.CardsDealt) {
for _, player := range event.PlayerCards {
name := p.playerNames[player.PlayerId]
cards := formatCards(player.HoleCards)
fmt.Printf("[Hand] %s dealt %s\n", name, cards)
}
}
pub struct OutputProjector {
player_names: HashMap<String, String>,
}
impl OutputProjector {
pub fn handle_player_registered(&mut self, event: &PlayerRegistered) {
self.player_names.insert(event.player_id.clone(), event.display_name.clone());
println!("[Player] {} registered", event.display_name);
}
pub fn handle_funds_deposited(&mut self, event: &FundsDeposited) {
let name = self.player_names.get(&event.player_id)
.map(|s| s.as_str())
.unwrap_or(&event.player_id);
let amount = event.amount.as_ref().map(|a| a.amount).unwrap_or(0);
println!("[Player] {} deposited ${:.2}", name, amount as f64 / 100.0);
}
pub fn handle_cards_dealt(&mut self, event: &CardsDealt) {
for player in &event.player_cards {
let name = self.player_names.get(&player.player_id)
.map(|s| s.as_str())
.unwrap_or(&player.player_id);
let cards = format_cards(&player.hole_cards);
println!("[Hand] {} dealt {}", name, cards);
}
}
}
public class OutputProjectorDoc {
private final Map<String, String> playerNames = new HashMap<>();
@Projects(PlayerRegistered.class)
public void handlePlayerRegistered(PlayerRegistered event) {
playerNames.put(event.getPlayerId(), event.getDisplayName());
System.out.printf("[Player] %s registered%n", event.getDisplayName());
}
@Projects(FundsDeposited.class)
public void handleFundsDeposited(FundsDeposited event) {
String name = playerNames.getOrDefault(event.getPlayerId(), event.getPlayerId());
System.out.printf("[Player] %s deposited $%.2f%n", name, event.getAmount().getAmount() / 100.0);
}
@Projects(CardsDealt.class)
public void handleCardsDealt(CardsDealt event) {
for (var player : event.getPlayerCardsList()) {
String name = playerNames.getOrDefault(player.getPlayerId(), player.getPlayerId());
String cards = formatCards(player.getHoleCardsList());
System.out.printf("[Hand] %s dealt %s%n", name, cards);
}
}
private String formatCards(java.util.List<?> cards) {
return "cards"; // Simplified for documentation
}
}
public class OutputProjector
{
private readonly Dictionary<string, string> _playerNames = new();
[Projects(typeof(PlayerRegistered))]
public void HandlePlayerRegistered(PlayerRegistered @event)
{
_playerNames[@event.PlayerId] = @event.DisplayName;
Console.WriteLine($"[Player] {@event.DisplayName} registered");
}
[Projects(typeof(FundsDeposited))]
public void HandleFundsDeposited(FundsDeposited @event)
{
var name = _playerNames.GetValueOrDefault(@event.PlayerId, @event.PlayerId);
var amount = @event.Amount?.Amount ?? 0;
Console.WriteLine($"[Player] {name} deposited ${amount / 100.0:F2}");
}
[Projects(typeof(CardsDealt))]
public void HandleCardsDealt(CardsDealt @event)
{
foreach (var player in @event.PlayerCards)
{
var name = _playerNames.GetValueOrDefault(player.PlayerId, player.PlayerId);
var cards = FormatCards(player.HoleCards);
Console.WriteLine($"[Hand] {name} dealt {cards}");
}
}
private static string FormatCards(IEnumerable<Card> cards) =>
string.Join(" ", cards.Select(c => $"{c.Rank}{c.Suit}"));
}
class OutputProjector {
public:
void handle_player_registered(const PlayerRegistered& event) {
player_names_[event.player_id()] = event.display_name();
std::cout << "[Player] " << event.display_name() << " registered\n";
}
void handle_funds_deposited(const FundsDeposited& event) {
auto it = player_names_.find(event.player_id());
std::string name = (it != player_names_.end()) ? it->second : event.player_id();
std::cout << "[Player] " << name << " deposited $"
<< std::fixed << std::setprecision(2)
<< (event.amount().amount() / 100.0) << "\n";
}
void handle_cards_dealt(const CardsDealt& event) {
for (const auto& player : event.player_cards()) {
auto it = player_names_.find(player.player_id());
std::string name = (it != player_names_.end()) ? it->second : player.player_id();
std::string cards = format_cards(player.hole_cards());
std::cout << "[Hand] " << name << " dealt " << cards << "\n";
}
}
private:
std::unordered_map<std::string, std::string> player_names_;
std::string format_cards(const auto& cards) {
return "cards"; // Simplified for documentation
}
};
StateRouter
Use the StateRouter to register event handlers:
- Python
- Go
- Rust
- Java
- C#
- C++
player_names: Dict[str, str] = {}
def handle_player_registered(event: player.PlayerRegistered):
player_names[event.player_id] = event.display_name
print(f"[Player] {event.display_name} registered")
def handle_funds_deposited(event: player.FundsDeposited):
name = player_names.get(event.player_id, event.player_id)
print(f"[Player] {name} deposited ${event.amount.amount / 100:.2f}")
def handle_cards_dealt(event: hand.CardsDealt):
for pc in event.player_cards:
name = player_names.get(pc.player_id, pc.player_id)
print(f"[Hand] {name} dealt cards")
router = (
StateRouter("prj-output")
.subscribes("player", ["PlayerRegistered", "FundsDeposited"])
.subscribes("hand", ["CardsDealt", "ActionTaken", "PotAwarded"])
.on("PlayerRegistered", handle_player_registered)
.on("FundsDeposited", handle_funds_deposited)
.on("CardsDealt", handle_cards_dealt)
)
var playerNames = make(map[string]string)
func handlePlayerRegistered(event *examples.PlayerRegistered) {
playerNames[event.PlayerId] = event.DisplayName
fmt.Printf("[Player] %s registered\n", event.DisplayName)
}
func handleFundsDeposited(event *examples.FundsDeposited) {
name := playerNames[event.PlayerId]
if name == "" {
name = event.PlayerId
}
fmt.Printf("[Player] %s deposited $%.2f\n", name, float64(event.Amount.Amount)/100)
}
func handleCardsDealt(event *examples.CardsDealt) {
for _, player := range event.PlayerCards {
name := playerNames[player.PlayerId]
fmt.Printf("[Hand] %s dealt cards\n", name)
}
}
var stateRouter = angzarr.NewStateRouter("prj-output").
Subscribes("player", []string{"PlayerRegistered", "FundsDeposited"}).
Subscribes("hand", []string{"CardsDealt", "ActionTaken", "PotAwarded"}).
On("PlayerRegistered", handlePlayerRegistered).
On("FundsDeposited", handleFundsDeposited).
On("CardsDealt", handleCardsDealt)
fn build_router() -> StateRouter {
StateRouter::new("prj-output")
.subscribes("player", &["PlayerRegistered", "FundsDeposited"])
.subscribes("hand", &["CardsDealt", "ActionTaken", "PotAwarded"])
.on::<PlayerRegistered>(handle_player_registered)
.on::<FundsDeposited>(handle_funds_deposited)
.on::<CardsDealt>(handle_cards_dealt)
}
fn handle_player_registered(event: &PlayerRegistered, state: &mut ProjectorState) {
state.player_names.insert(event.player_id.clone(), event.display_name.clone());
println!("[Player] {} registered", event.display_name);
}
fn handle_funds_deposited(event: &FundsDeposited, state: &mut ProjectorState) {
let name = state.player_names.get(&event.player_id)
.map(|s| s.as_str())
.unwrap_or(&event.player_id);
println!("[Player] {} deposited", name);
}
fn handle_cards_dealt(event: &CardsDealt, state: &mut ProjectorState) {
for player in &event.player_cards {
let name = state.player_names.get(&player.player_id)
.map(|s| s.as_str())
.unwrap_or(&player.player_id);
println!("[Hand] {} dealt cards", name);
}
}
struct ProjectorState {
player_names: HashMap<String, String>,
}
class OutputStateRouterExample {
private static final Map<String, String> playerNames = new HashMap<>();
static void handlePlayerRegistered(PlayerRegistered event) {
playerNames.put(event.getPlayerId(), event.getDisplayName());
System.out.printf("[Player] %s registered%n", event.getDisplayName());
}
static void handleFundsDeposited(FundsDeposited event) {
String name = playerNames.getOrDefault(event.getPlayerId(), event.getPlayerId());
System.out.printf("[Player] %s deposited $%.2f%n", name, event.getAmount().getAmount() / 100.0);
}
static void handleCardsDealt(CardsDealt event) {
for (var player : event.getPlayerCardsList()) {
String name = playerNames.getOrDefault(player.getPlayerId(), player.getPlayerId());
System.out.printf("[Hand] %s dealt cards%n", name);
}
}
static StateRouter buildRouter() {
return new StateRouter("prj-output")
.subscribes("player", new String[]{"PlayerRegistered", "FundsDeposited"})
.subscribes("hand", new String[]{"CardsDealt", "ActionTaken", "PotAwarded"})
.on(PlayerRegistered.class, OutputStateRouterExample::handlePlayerRegistered)
.on(FundsDeposited.class, OutputStateRouterExample::handleFundsDeposited)
.on(CardsDealt.class, OutputStateRouterExample::handleCardsDealt);
}
}
public static class OutputProjectorRouter
{
public static StateRouter BuildRouter()
{
var playerNames = new Dictionary<string, string>();
return new StateRouter("prj-output")
.Subscribes("player", new[] { "PlayerRegistered", "FundsDeposited" })
.Subscribes("hand", new[] { "CardsDealt", "ActionTaken", "PotAwarded" })
.On<PlayerRegistered>(evt => {
playerNames[evt.PlayerId] = evt.DisplayName;
Console.WriteLine($"[Player] {evt.DisplayName} registered");
})
.On<FundsDeposited>(evt => {
var name = playerNames.GetValueOrDefault(evt.PlayerId, evt.PlayerId);
Console.WriteLine($"[Player] {name} deposited ${evt.Amount?.Amount / 100.0:F2}");
})
.On<CardsDealt>(evt => {
foreach (var player in evt.PlayerCards)
{
var name = playerNames.GetValueOrDefault(player.PlayerId, player.PlayerId);
Console.WriteLine($"[Hand] {name} dealt cards");
}
});
}
}
std::unordered_map<std::string, std::string> player_names;
void handle_player_registered(const PlayerRegistered& event) {
player_names[event.player_id()] = event.display_name();
std::cout << "[Player] " << event.display_name() << " registered\n";
}
void handle_funds_deposited(const FundsDeposited& event) {
auto it = player_names.find(event.player_id());
std::string name = (it != player_names.end()) ? it->second : event.player_id();
std::cout << "[Player] " << name << " deposited\n";
}
void handle_cards_dealt(const CardsDealt& event) {
for (const auto& player : event.player_cards()) {
auto it = player_names.find(player.player_id());
std::string name = (it != player_names.end()) ? it->second : player.player_id();
std::cout << "[Hand] " << name << " dealt cards\n";
}
}
StateRouter build_router() {
return StateRouter("prj-output")
.subscribes("player", {"PlayerRegistered", "FundsDeposited"})
.subscribes("hand", {"CardsDealt", "ActionTaken", "PotAwarded"})
.on<PlayerRegistered>(handle_player_registered)
.on<FundsDeposited>(handle_funds_deposited)
.on<CardsDealt>(handle_cards_dealt);
}
Multi-Domain Projectors
Projectors can subscribe to events from multiple domains, but should not unless absolutely required:
# Avoid this pattern when possible
router = StateRouter("prj-output")
.subscribes("player", ["PlayerRegistered", "FundsDeposited"])
.subscribes("table", ["PlayerJoined", "HandStarted"])
.subscribes("hand", ["CardsDealt", "ActionTaken"])
If you need multi-domain subscription, check your domain boundaries first. Needing to join events across domains often indicates the domains are incorrectly partitioned. Consider whether those events belong in the same domain.
When multi-domain is truly necessary (e.g., a cross-cutting analytics view), it's technically safe because projectors only observe—but prefer single-domain projectors where possible for simpler reasoning and deployment.
Synchronous vs Asynchronous
| Mode | Use Case | Behavior |
|---|---|---|
| Async (default) | Analytics, search indexing | Fire-and-forget |
| Sync | Read-after-write | Command waits for projector |
Synchronous projections enable CQRS patterns where commands must return updated read models.
Position Tracking
Projectors track their position in the event stream to enable:
- Catch-up: Resume from last processed event after restart
- Replay: Rebuild read models from scratch
The framework manages position tracking automatically.
Framework Projectors
Angzarr provides built-in projectors for common operational needs:
| Projector | Purpose | Output |
|---|---|---|
| LogService | Debug logging | Console |
| EventService | Event storage | Database |
| OutboundService | Real-time streaming | gRPC |
| TopologyProjector | Component graph | REST/Grafana |
| CloudEvents | External publishing | HTTP/Kafka |
See Framework Projectors for details.
Next Steps
- CloudEvents — Publish events to external systems
- Framework Projectors — Built-in operational projectors
- Process Managers — Stateful multi-domain coordination
- Testing — Testing projectors with Gherkin