Skip to main content

Projectors

A projector subscribes to events and performs side effects—typically building read models, writing to databases, or streaming to external systems. Projectors transform the event stream into query-optimized views.

Unlike aggregates and sagas, projectors are read-only from the event sourcing perspective. They observe events but never emit commands back to aggregates.


Common Use Cases

ProjectorEventsOutput
Search indexerProduct eventsElasticsearch updates
Dashboard streamAll domain eventsWebSocket push
Analytics ETLTransaction eventsData warehouse
Cache warmerPlayer eventsRedis cache
Output rendererPoker eventsConsole text

Example: Output Projector

Transforms poker events into human-readable text:

class OutputProjector(Projector):
def __init__(self):
self.player_names: Dict[str, str] = {}

def handle_player_registered(self, event: player.PlayerRegistered):
self.player_names[event.player_id] = event.display_name
print(f"[Player] {event.display_name} registered")

def handle_funds_deposited(self, event: player.FundsDeposited):
name = self.player_names.get(event.player_id, event.player_id)
amount = event.amount.amount if event.amount else 0
print(f"[Player] {name} deposited ${amount / 100:.2f}")

def handle_cards_dealt(self, event: hand.CardsDealt):
for player_cards in event.player_cards:
name = self.player_names.get(player_cards.player_id, player_cards.player_id)
cards = format_cards(player_cards.hole_cards)
print(f"[Hand] {name} dealt {cards}")

StateRouter

Use the StateRouter to register event handlers:

player_names: Dict[str, str] = {}


def handle_player_registered(event: player.PlayerRegistered):
player_names[event.player_id] = event.display_name
print(f"[Player] {event.display_name} registered")


def handle_funds_deposited(event: player.FundsDeposited):
name = player_names.get(event.player_id, event.player_id)
print(f"[Player] {name} deposited ${event.amount.amount / 100:.2f}")


def handle_cards_dealt(event: hand.CardsDealt):
for pc in event.player_cards:
name = player_names.get(pc.player_id, pc.player_id)
print(f"[Hand] {name} dealt cards")


router = (
StateRouter("prj-output")
.subscribes("player", ["PlayerRegistered", "FundsDeposited"])
.subscribes("hand", ["CardsDealt", "ActionTaken", "PotAwarded"])
.on("PlayerRegistered", handle_player_registered)
.on("FundsDeposited", handle_funds_deposited)
.on("CardsDealt", handle_cards_dealt)
)

Multi-Domain Projectors

Projectors can subscribe to events from multiple domains, but should not unless absolutely required:

# Avoid this pattern when possible
router = StateRouter("prj-output")
.subscribes("player", ["PlayerRegistered", "FundsDeposited"])
.subscribes("table", ["PlayerJoined", "HandStarted"])
.subscribes("hand", ["CardsDealt", "ActionTaken"])

If you need multi-domain subscription, check your domain boundaries first. Needing to join events across domains often indicates the domains are incorrectly partitioned. Consider whether those events belong in the same domain.

When multi-domain is truly necessary (e.g., a cross-cutting analytics view), it's technically safe because projectors only observe—but prefer single-domain projectors where possible for simpler reasoning and deployment.


Synchronous vs Asynchronous

ModeUse CaseBehavior
Async (default)Analytics, search indexingFire-and-forget
SyncRead-after-writeCommand waits for projector

Synchronous projections enable CQRS patterns where commands must return updated read models.


Position Tracking

Projectors track their position in the event stream to enable:

  • Catch-up: Resume from last processed event after restart
  • Replay: Rebuild read models from scratch

The framework manages position tracking automatically.


Framework Projectors

Angzarr provides built-in projectors for common operational needs:

ProjectorPurposeOutput
LogServiceDebug loggingConsole
EventServiceEvent storageDatabase
OutboundServiceReal-time streaminggRPC
TopologyProjectorComponent graphREST/Grafana
CloudEventsExternal publishingHTTP/Kafka

See Framework Projectors for details.


Next Steps