# ML Training Patterns
Your event store is already a dataset. Editions let you generate synthetic scenarios. PyTorch sees the same proto messages your production code handles.
## The Opportunity
Every event in your system carries:
- The state before the decision
- The decision made (command/action)
- The outcome (resulting events)
For supervised learning, this is labeled data. For reinforcement learning, this is experience replay. For anomaly detection, it defines the distribution of normal behavior.
And you already have it.
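To make the shape of that data concrete, here is a minimal sketch of how one event record maps to a labeled sample. The `EventRecord` fields and the `HandWon` event name are illustrative assumptions, not part of the Angzarr API:

```python
from dataclasses import dataclass

# Hypothetical record shape for illustration only.
@dataclass
class EventRecord:
    prior_state: dict   # state before the decision
    command: str        # the decision made
    events: list[str]   # resulting events (the outcome)

def to_supervised_sample(record: EventRecord) -> tuple[dict, int]:
    """The state at decision time is the feature set; the outcome is the label."""
    label = 1 if "HandWon" in record.events else 0
    return record.prior_state, label

record = EventRecord(
    prior_state={"pot_size": 120, "position": 2},
    command="Raise",
    events=["PlayerActed", "HandWon"],
)
features, label = to_supervised_sample(record)
```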
## Event Streams as Training Data

### Reading Historical Events
```python
import torch
from torch.utils.data import DataLoader, IterableDataset

class EventDataset(IterableDataset):
    def __init__(self, domain: str, event_types: list[str]):
        self.domain = domain
        self.event_types = event_types

    def __iter__(self):
        for event_book in stream_events(self.domain, self.event_types):
            # Extract features from state + event
            features = extract_features(event_book)
            label = extract_label(event_book)
            yield torch.tensor(features), torch.tensor(label)

# Use with DataLoader
dataset = EventDataset("hand", ["PlayerActed", "HandComplete"])
loader = DataLoader(dataset, batch_size=32)
```
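`stream_events`, `extract_features`, and `extract_label` above are helpers you supply. A self-contained sketch of the same streaming pattern, with a stand-in generator in place of the event store, shows how `DataLoader` batches an `IterableDataset`:

```python
import torch
from torch.utils.data import DataLoader, IterableDataset

# Stand-in for the event-store reader; in practice this would
# decode proto messages from the stream.
def stream_events(domain, event_types):
    for i in range(100):
        yield {"pot_size": float(i), "won": i % 2}

class EventDataset(IterableDataset):
    def __init__(self, domain, event_types):
        self.domain, self.event_types = domain, event_types

    def __iter__(self):
        for book in stream_events(self.domain, self.event_types):
            yield torch.tensor([book["pot_size"]]), torch.tensor(book["won"])

loader = DataLoader(EventDataset("hand", ["PlayerActed"]), batch_size=32)
batches = list(loader)
# 100 samples at batch_size=32 -> three full batches plus a final batch of 4
```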
### Same Proto, Different Context
The proto messages your aggregate handles are the same messages your training pipeline consumes:
```python
import numpy as np

# In aggregate
def handle_player_action(cmd: PlayerAction, state: HandState):
    # Business logic
    ...

# In training pipeline
def extract_features(event_book: EventBook) -> np.ndarray:
    state = rebuild_state(event_book)
    # Same HandState, same fields
    return np.array([
        state.pot_size,
        state.player_stack,
        state.position,
        ...
    ])
```
No translation layer. No ETL. The types are the contract.
## Generating Counterfactuals with Editions
Real history gives you one path. Editions let you explore alternatives.
### Example: Poker Bot Training
```python
def generate_training_data(hand_ids: list[str], n_alternates: int = 10):
    samples = []
    for hand_id in hand_ids:
        # Get the actual outcome
        actual = get_event_book("hand", hand_id)
        samples.append(extract_sample(actual, is_actual=True))

        # Generate counterfactuals
        for i in range(n_alternates):
            action = sample_alternate_action(actual)
            edition = Edition(name=f"train-{hand_id}-{i}")

            # What would have happened?
            result = speculate(
                command=PlayerAction(hand_id=hand_id, action=action),
                edition=edition,
            )
            samples.append(extract_sample(result, is_actual=False))

            # Cleanup
            delete_edition_events(domain="hand", edition=edition.name)
    return samples
```
Each counterfactual is a complete simulation through your actual aggregate logic—not a simplified model.
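`sample_alternate_action` is left undefined above. One plausible sketch, assuming a small discrete action space and an `action` field on the event book (both names are illustrative):

```python
import random

# Hypothetical action space; real hands would constrain this to legal moves.
ACTIONS = ["fold", "call", "raise"]

def sample_alternate_action(actual_book, rng=random):
    """Pick an action other than the one actually taken."""
    taken = actual_book["action"]  # assumed field name
    alternatives = [a for a in ACTIONS if a != taken]
    return rng.choice(alternatives)

alt = sample_alternate_action({"action": "call"})
# alt is "fold" or "raise", never "call"
```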
## Training Patterns

### Supervised Learning
Predict outcomes from state:
```python
# Labels: did the player win?
# Features: hand state at decision point
model = HandOutcomePredictor()

for features, label in loader:
    optimizer.zero_grad()
    pred = model(features)
    loss = criterion(pred, label)
    loss.backward()
    optimizer.step()
```
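`HandOutcomePredictor` is not defined in this guide; one plausible shape is a small MLP over the hand-state feature vector, paired with `BCEWithLogitsLoss` for the win/lose label. Layer sizes and the feature count are illustrative assumptions:

```python
import torch
from torch import nn

class HandOutcomePredictor(nn.Module):
    """Small MLP over hand-state features; sizes are illustrative."""
    def __init__(self, n_features: int = 8):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_features, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
        )

    def forward(self, x):
        # Returns logits; pair with BCEWithLogitsLoss.
        return self.net(x).squeeze(-1)

model = HandOutcomePredictor(n_features=8)
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# One training step on a dummy batch
features = torch.randn(32, 8)
labels = torch.randint(0, 2, (32,)).float()
optimizer.zero_grad()
loss = criterion(model(features), labels)
loss.backward()
optimizer.step()
```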
### Reinforcement Learning
Event history as experience buffer:
```python
class PokerExperienceBuffer:
    def __init__(self, domain: str):
        self.domain = domain

    def sample(self, batch_size: int):
        hands = sample_hands(self.domain, batch_size)
        experiences = []
        for hand in hands:
            state = extract_state(hand)
            action = extract_action(hand)
            reward = extract_reward(hand)
            next_state = extract_next_state(hand)
            experiences.append((state, action, reward, next_state))
        return experiences

# Use with any RL algorithm
buffer = PokerExperienceBuffer("hand")
for epoch in range(1000):
    batch = buffer.sample(32)
    train_step(agent, batch)
```
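As a concrete stand-in for `train_step`, here is a minimal tabular Q-learning update over the `(state, action, reward, next_state)` tuples the buffer returns. The hyperparameters and action set are illustrative; any off-policy algorithm consuming the same tuples would slot in the same way:

```python
from collections import defaultdict

def train_step(q, batch, alpha=0.1, gamma=0.99, actions=("fold", "call", "raise")):
    """One TD update per experience tuple; q maps (state, action) -> value."""
    for state, action, reward, next_state in batch:
        best_next = max(q[(next_state, a)] for a in actions)
        td_target = reward + gamma * best_next
        q[(state, action)] += alpha * (td_target - q[(state, action)])

q = defaultdict(float)
batch = [
    ("preflop", "call", 0.0, "flop"),
    ("flop", "raise", 1.0, "showdown"),
]
train_step(q, batch)
```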
### Anomaly Detection
Learn normal patterns, flag deviations:
```python
# Train on historical events
normal_events = stream_events("transactions", ["TransactionCompleted"])
autoencoder = train_autoencoder(normal_events)

# Flag anomalies in real-time via projector
@handles("TransactionCompleted")
def detect_anomaly(event: TransactionCompleted, state):
    features = extract_features(event)
    reconstruction_error = autoencoder.reconstruction_error(features)
    if reconstruction_error > threshold:
        alert(event, reconstruction_error)
```
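The `threshold` above has to come from somewhere. A common approach is to calibrate it from reconstruction errors on known-normal traffic; the percentile here is a tunable assumption, not a fixed rule:

```python
import numpy as np

def calibrate_threshold(errors: np.ndarray, percentile: float = 99.5) -> float:
    """Set the alert threshold so ~0.5% of normal traffic would trip it."""
    return float(np.percentile(errors, percentile))

# Stand-in for errors measured on historical normal events.
rng = np.random.default_rng(0)
normal_errors = rng.exponential(scale=0.1, size=10_000)
threshold = calibrate_threshold(normal_errors)

def is_anomalous(reconstruction_error: float) -> bool:
    return reconstruction_error > threshold
```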
## Integration Architecture

### Why Not a Separate ML Pipeline?
| Traditional Approach | Angzarr Approach |
|---|---|
| ETL from production DB | Read events directly |
| Schema drift issues | Proto is the contract |
| Stale training data | Real-time event streams |
| Simulation != production | Same aggregate logic |
The event store is the single source of truth for both production and training.
### When You Want Isolation
Direct event access works well for many workloads. But if your training jobs are heavy and you need to isolate load from production, ETL to an external store is straightforward:
- Export events to a data lake (S3, GCS, HDFS)
- Replicate to a read replica dedicated to ML workloads
- Stream to a separate analytical store (ClickHouse, BigQuery)
The proto schema travels with the data. Your training code still sees the same messages—just from a different backing store.
Start by training directly against the event store. When load becomes a concern, migrate to isolated infrastructure. No upfront complexity tax.
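A minimal sketch of such an export step, writing events as JSON-lines files partitioned by domain (in practice you would ship the serialized proto bytes, and the function name is illustrative):

```python
import json
import tempfile
from pathlib import Path

def export_events(events, out_dir: str):
    """Append each event to a per-domain .jsonl file; returns counts per domain."""
    root = Path(out_dir)
    root.mkdir(parents=True, exist_ok=True)
    counts = {}
    for event in events:
        path = root / f"{event['domain']}.jsonl"
        with path.open("a") as f:
            f.write(json.dumps(event) + "\n")
        counts[event["domain"]] = counts.get(event["domain"], 0) + 1
    return counts

events = [
    {"domain": "hand", "type": "PlayerActed"},
    {"domain": "hand", "type": "HandComplete"},
]
with tempfile.TemporaryDirectory() as tmp:
    counts = export_events(events, tmp)
```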
### Replay from External Sources
Events stored in your data warehouse can flow back through Angzarr. Replay them through the same domains to rebuild state, or route them through entirely different domains to test new aggregate logic, run backtests, or generate derived datasets.
The event store doesn't have to be the origin. Any source that produces valid proto messages can feed the system.
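The replay loop itself can be as simple as routing each decoded message to a handler registry. The `handlers` dict and event shape here are illustrative, standing in for Angzarr's actual dispatch:

```python
def replay(events, handlers):
    """Route each external event to its registered handler; returns count applied."""
    applied = 0
    for event in events:
        handler = handlers.get(event["type"])
        if handler is not None:
            handler(event)
            applied += 1
    return applied

seen = []
handlers = {"PlayerActed": seen.append}
external = [
    {"type": "PlayerActed"},
    {"type": "UnknownEvent"},   # no handler registered; skipped
    {"type": "PlayerActed"},
]
applied = replay(external, handlers)
```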
## See Also
- Editions — Temporal branching for counterfactuals
- Projections — Building ML feature stores
- Polyglot — Python for ML, Rust for production