
ML Training Patterns

Your event store is already a dataset. Editions let you generate synthetic scenarios. PyTorch sees the same proto messages your production code handles.


Every event in your system carries:

  • The state before the decision
  • The decision made (command/action)
  • The outcome (resulting events)

For supervised learning, this is labeled data. For reinforcement learning, this is experience replay. For anomaly detection, this is the baseline of normal behavior.

And you already have it.


illustrative - PyTorch dataset
import torch
from torch.utils.data import DataLoader, IterableDataset


class EventDataset(IterableDataset):
    def __init__(self, domain: str, event_types: list[str]):
        self.domain = domain
        self.event_types = event_types

    def __iter__(self):
        for event_book in stream_events(self.domain, self.event_types):
            # Extract features from state + event
            features = extract_features(event_book)
            label = extract_label(event_book)
            yield torch.tensor(features), torch.tensor(label)


# Use with DataLoader
dataset = EventDataset("hand", ["PlayerActed", "HandComplete"])
loader = DataLoader(dataset, batch_size=32)
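
One caveat if you give the loader above `num_workers > 0`: each worker process gets its own copy of an `IterableDataset`, so without sharding every event is yielded once per worker. A minimal sketch of round-robin sharding, in plain Python so it runs without PyTorch. The `shard` helper and the in-memory `events` list are hypothetical stand-ins; inside a real `__iter__`, the worker id and worker count would come from `torch.utils.data.get_worker_info()`.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def shard(stream: Iterable[T], worker_id: int, num_workers: int) -> Iterator[T]:
    """Yield every num_workers-th item, starting at offset worker_id."""
    return islice(stream, worker_id, None, num_workers)


# Stand-in for stream_events(...): ten fake event ids.
events = list(range(10))

# Simulate three workers reading the same stream.
shards = [list(shard(events, w, 3)) for w in range(3)]
```

Each event lands in exactly one shard, so the union of all workers covers the stream once.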

The proto messages your aggregate handles are the same messages your training pipeline consumes:

illustrative - shared proto messages
# In aggregate
def handle_player_action(cmd: PlayerAction, state: HandState):
    # Business logic
    ...


# In training pipeline
import numpy as np

def extract_features(event_book: EventBook) -> np.ndarray:
    state = rebuild_state(event_book)
    # Same HandState, same fields
    return np.array([
        state.pot_size,
        state.player_stack,
        state.position,
        ...
    ])

No translation layer. No ETL. The types are the contract.
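
One way to lean on that contract is to have the feature list check itself against the state type, so a renamed or removed field fails at import time instead of silently shifting columns. A minimal sketch, assuming a dataclass as a stand-in for the proto-generated `HandState`; `FEATURE_FIELDS` and `to_feature_vector` are hypothetical names, and the field values are made up.

```python
from dataclasses import dataclass, fields


@dataclass
class HandState:
    # Stand-in for the proto-generated HandState; fields follow the example above.
    pot_size: int
    player_stack: int
    position: int


# Explicit, fixed feature order (hypothetical name).
FEATURE_FIELDS = ("pot_size", "player_stack", "position")

# Fail fast if the state type no longer has a field the features rely on.
_known = {f.name for f in fields(HandState)}
_missing = [name for name in FEATURE_FIELDS if name not in _known]
if _missing:
    raise AttributeError(f"HandState is missing feature fields: {_missing}")


def to_feature_vector(state: HandState) -> list[float]:
    """Read features straight off the shared state type, in a fixed order."""
    return [float(getattr(state, name)) for name in FEATURE_FIELDS]


vector = to_feature_vector(HandState(pot_size=120, player_stack=880, position=3))
```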


Real history gives you one path. Editions let you explore alternatives.

illustrative - counterfactual generation
def generate_training_data(hand_ids: list[str], n_alternates: int = 10):
    samples = []
    for hand_id in hand_ids:
        # Get the actual outcome
        actual = get_event_book("hand", hand_id)
        samples.append(extract_sample(actual, is_actual=True))
        # Generate counterfactuals
        for i in range(n_alternates):
            action = sample_alternate_action(actual)
            edition = Edition(name=f"train-{hand_id}-{i}")
            # What would have happened?
            result = speculate(
                command=PlayerAction(hand_id=hand_id, action=action),
                edition=edition,
            )
            samples.append(extract_sample(result, is_actual=False))
            # Cleanup
            delete_edition_events(domain="hand", edition=edition.name)
    return samples

Each counterfactual is a complete simulation through your actual aggregate logic—not a simplified model.
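
The example above leaves `sample_alternate_action` undefined. One possible version, sketched under assumptions: it samples uniformly from the legal actions other than the one actually taken. The action names are hypothetical, and the signature here takes the action string and a random generator rather than the event book, to keep the sketch self-contained.

```python
import random

# Hypothetical action set; the real one would come from your command proto.
LEGAL_ACTIONS = ("fold", "call", "raise")


def sample_alternate_action(actual_action: str, rng: random.Random) -> str:
    """Pick uniformly among legal actions that differ from the one actually taken."""
    alternates = [a for a in LEGAL_ACTIONS if a != actual_action]
    if not alternates:
        raise ValueError("no alternate action available")
    return rng.choice(alternates)


rng = random.Random(0)  # seeded so a data-generation run is reproducible
samples = [sample_alternate_action("call", rng) for _ in range(20)]
```

Uniform sampling is the simplest choice. Weighting alternates by a policy or by how often they occur in real history is an equally valid design, depending on what the model should learn.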


Predict outcomes from state:

illustrative - supervised learning loop
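# Labels: did the player win?
# Features: hand state at decision point
model = HandOutcomePredictor()
# criterion and optimizer (built from model.parameters()) are set up elsewhere
for features, label in loader:
    optimizer.zero_grad()  # clear gradients left over from the previous batch
    pred = model(features)
    loss = criterion(pred, label)
    loss.backward()
    optimizer.step()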
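
The labels in the loop above come from the end of the hand, while the features come from each decision point. A minimal sketch of that pairing, assuming dataclass stand-ins for the `PlayerActed` and `HandComplete` events; the fields (`player`, `pot_size`, `player_stack`, `winner`) and the `labeled_samples` helper are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class PlayerActed:
    player: str
    pot_size: int
    player_stack: int


@dataclass
class HandComplete:
    winner: str


def labeled_samples(events: list) -> list[tuple[list[float], int]]:
    """Pair each decision-point feature vector with the hand's final outcome."""
    winner = next((e.winner for e in events if isinstance(e, HandComplete)), None)
    if winner is None:
        return []  # hand not finished yet: no label available
    samples = []
    for e in events:
        if isinstance(e, PlayerActed):
            features = [float(e.pot_size), float(e.player_stack)]
            label = 1 if e.player == winner else 0
            samples.append((features, label))
    return samples


hand = [
    PlayerActed("alice", 30, 970),
    PlayerActed("bob", 60, 940),
    HandComplete(winner="alice"),
]
samples = labeled_samples(hand)
```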

Event history as experience buffer:

illustrative - RL experience buffer
class PokerExperienceBuffer:
    def __init__(self, domain: str):
        self.domain = domain

    def sample(self, batch_size: int):
        hands = sample_hands(self.domain, batch_size)
        experiences = []
        for hand in hands:
            state = extract_state(hand)
            action = extract_action(hand)
            reward = extract_reward(hand)
            next_state = extract_next_state(hand)
            experiences.append((state, action, reward, next_state))
        return experiences


# Use with any RL algorithm
buffer = PokerExperienceBuffer("hand")
for epoch in range(1000):
    batch = buffer.sample(32)
    train_step(agent, batch)
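
The buffer above treats each hand as one experience. If you want one transition per decision instead, the ordered event history already gives you the sequence. A sketch of one possible reading, assuming the reward is only known when the hand completes; `Transition` and `to_transitions` are hypothetical names and the numbers are made up.

```python
from typing import NamedTuple, Optional


class Transition(NamedTuple):
    state: tuple
    action: str
    reward: float
    next_state: Optional[tuple]  # None marks the end of the hand


def to_transitions(steps: list[tuple[tuple, str]], final_reward: float) -> list[Transition]:
    """Turn an ordered list of (state, action) steps into RL transitions.

    Reward is assigned only on the last step; earlier steps get 0.0.
    """
    transitions = []
    for i, (state, action) in enumerate(steps):
        is_last = i == len(steps) - 1
        next_state = None if is_last else steps[i + 1][0]
        reward = final_reward if is_last else 0.0
        transitions.append(Transition(state, action, reward, next_state))
    return transitions


# States are (pot_size, player_stack) pairs.
steps = [((30, 970), "call"), ((60, 940), "raise"), ((180, 820), "call")]
transitions = to_transitions(steps, final_reward=180.0)
```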

Learn normal patterns, flag deviations:

illustrative - anomaly detection
# Train on historical events
normal_events = stream_events("transactions", ["TransactionCompleted"])
autoencoder = train_autoencoder(normal_events)


# Flag anomalies in real-time via projector
@handles("TransactionCompleted")
def detect_anomaly(event: TransactionCompleted, state):
    features = extract_features(event)
    reconstruction_error = autoencoder.reconstruction_error(features)
    if reconstruction_error > threshold:
        alert(event, reconstruction_error)
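
The `threshold` in the projector above has to come from somewhere. One common option, sketched here as an assumption rather than a prescribed method, is a high percentile of the reconstruction errors on held-out normal events. `percentile_threshold` is a hypothetical helper using the nearest-rank method, and the error values are made up.

```python
def percentile_threshold(errors: list[float], percentile: int = 99) -> float:
    """Return the nearest-rank percentile of the observed reconstruction errors."""
    if not errors:
        raise ValueError("need at least one reconstruction error")
    if not 0 < percentile <= 100:
        raise ValueError("percentile must be in 1..100")
    ordered = sorted(errors)
    # Nearest rank: ceil(percentile * n / 100), computed in integer arithmetic.
    rank = (percentile * len(ordered) + 99) // 100
    return ordered[rank - 1]


# Made-up reconstruction errors from a held-out slice of normal events.
errors = [float(i) for i in range(1, 101)]  # 1.0 .. 100.0
threshold = percentile_threshold(errors, percentile=95)
flagged = [e for e in errors if e > threshold]
```

A lower percentile raises more alerts, a higher one misses more anomalies; the right trade-off depends on what an alert costs you.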

flowchart TB
    subgraph store["Event Store"]
        events[(Events)]
    end

    subgraph consumers["Consumers"]
        prod["Production<br/>Aggregates"]
        train["Training<br/>Pipeline"]
        serve["Serving<br/>Projector"]
    end

    subgraph ml["ML"]
        model["PyTorch<br/>Model"]
    end

    events --> prod
    events --> train
    events --> serve
    train --> model
    model -.->|"Same proto messages"| prod
    model -.->|"Same proto messages"| serve

| Traditional Approach | Angzarr Approach |
|---|---|
| ETL from production DB | Read events directly |
| Schema drift issues | Proto is the contract |
| Stale training data | Real-time event streams |
| Simulation != production | Same aggregate logic |

The event store is the single source of truth for both production and training.

Direct event access works well for many workloads. But if your training jobs are heavy and you need to isolate load from production, ETL to an external store is straightforward:

  • Export events to a data lake (S3, GCS, HDFS)
  • Replicate to a read replica dedicated to ML workloads
  • Stream to a separate analytical store (ClickHouse, BigQuery)

The proto schema travels with the data. Your training code still sees the same messages—just from a different backing store.
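
A sketch of what such an export can look like at the byte level: each serialized event is written with a length prefix so the reader can split the stream back into messages. This framing is an assumption for illustration, not Angzarr's export format. The byte strings stand in for serialized proto messages (for example the output of `SerializeToString()` in the Python protobuf library), and an in-memory buffer stands in for a file in the data lake.

```python
import io
import struct
from typing import BinaryIO, Iterable, Iterator


def write_records(out: BinaryIO, payloads: Iterable[bytes]) -> None:
    """Write each payload with a 4-byte big-endian length prefix."""
    for payload in payloads:
        out.write(struct.pack(">I", len(payload)))
        out.write(payload)


def read_records(src: BinaryIO) -> Iterator[bytes]:
    """Yield payloads back in the order they were written."""
    while True:
        header = src.read(4)
        if not header:
            return  # clean end of stream
        if len(header) < 4:
            raise ValueError("truncated length prefix")
        (length,) = struct.unpack(">I", header)
        payload = src.read(length)
        if len(payload) < length:
            raise ValueError("truncated record")
        yield payload


# Stand-ins for serialized events.
payloads = [b"PlayerActed:1", b"", b"HandComplete:1"]
buffer = io.BytesIO()
write_records(buffer, payloads)
buffer.seek(0)
restored = list(read_records(buffer))
```

On the training side, each restored payload would be parsed back into the same message type the aggregate uses.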

Start by training directly against the event store. When load becomes a concern, migrate to isolated infrastructure. No upfront complexity tax.

Events stored in your data warehouse can flow back through Angzarr. Replay them through the same domains to rebuild state, or route them through entirely different domains to test new aggregate logic, run backtests, or generate derived datasets.

The event store doesn’t have to be the origin. Any source that produces valid proto messages can feed the system.
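
Replay is a fold over events, which is why the source of those events does not matter. A minimal sketch, with plain dicts standing in for proto messages; `replay`, `current_logic`, and `candidate_logic` are hypothetical names, and the rake rule is invented purely to show a backtest of changed logic.

```python
from functools import reduce
from typing import Callable

Event = dict  # stand-in for a proto message
State = dict


def replay(events: list[Event], apply: Callable[[State, Event], State], initial: State) -> State:
    """Fold events through a handler to rebuild state."""
    return reduce(apply, events, initial)


def current_logic(state: State, event: Event) -> State:
    # Existing behavior: every bet goes into the pot in full.
    return {"pot": state["pot"] + event["amount"]}


def candidate_logic(state: State, event: Event) -> State:
    # Hypothetical new rule under test: a flat rake of 1 per bet.
    return {"pot": state["pot"] + event["amount"] - 1}


# Events could come from the event store, a warehouse export, or any other source.
events = [{"amount": 10}, {"amount": 20}, {"amount": 30}]
baseline = replay(events, current_logic, {"pot": 0})
backtest = replay(events, candidate_logic, {"pot": 0})
```

The same events produce two states, one per handler, which is the comparison a backtest needs.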


  • Editions — Temporal branching for counterfactuals
  • Projections — Building ML feature stores
  • Polyglot — Python for ML, Rust for production