ML Training Patterns
Your event store is already a dataset. Editions let you generate synthetic scenarios. PyTorch sees the same proto messages your production code handles.
The Opportunity
Every event in your system carries:
- The state before the decision
- The decision made (command/action)
- The outcome (resulting events)
For supervised learning, this is labeled data. For reinforcement learning, this is experience replay. For anomaly detection, this is the baseline of normal behavior.
And you already have it.
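As a rough illustration of those three readings, the sketch below models one decision as a plain record and derives a supervised sample and an RL transition from it. `DecisionRecord`, its fields, and the `chips_won` reward rule are hypothetical stand-ins, not Angzarr types; real code would read these values from proto messages.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DecisionRecord:
    """Hypothetical stand-in for one decision recovered from an event stream."""
    state_before: dict[str, float]  # state at the decision point
    action: str                     # command that was issued
    outcome: dict[str, float]       # values taken from the resulting events


def to_supervised_sample(rec: DecisionRecord) -> tuple[list[float], int]:
    """Features come from the prior state; the label comes from the outcome."""
    features = [rec.state_before[k] for k in sorted(rec.state_before)]
    label = 1 if rec.outcome["chips_won"] > 0 else 0
    return features, label


def to_transition(rec: DecisionRecord) -> tuple[dict[str, float], str, float]:
    """(state, action, reward) triple for an experience buffer."""
    return rec.state_before, rec.action, rec.outcome["chips_won"]


record = DecisionRecord(
    state_before={"pot_size": 120.0, "player_stack": 880.0},
    action="call",
    outcome={"chips_won": 240.0},
)
features, label = to_supervised_sample(record)
state, action, reward = to_transition(record)
```

The same record feeds both views; only the projection differs.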
Event Streams as Training Data
Reading Historical Events

    import torch
    from torch.utils.data import DataLoader, IterableDataset

    class EventDataset(IterableDataset):
        def __init__(self, domain: str, event_types: list[str]):
            self.domain = domain
            self.event_types = event_types

        def __iter__(self):
            for event_book in stream_events(self.domain, self.event_types):
                # Extract features from state + event
                features = extract_features(event_book)
                label = extract_label(event_book)
                yield torch.tensor(features), torch.tensor(label)

    # Use with DataLoader
    dataset = EventDataset("hand", ["PlayerActed", "HandComplete"])
    loader = DataLoader(dataset, batch_size=32)

Same Proto, Different Context
The proto messages your aggregate handles are the same messages your training pipeline consumes:

    import numpy as np

    # In aggregate
    def handle_player_action(cmd: PlayerAction, state: HandState):
        # Business logic
        ...

    # In training pipeline
    def extract_features(event_book: EventBook) -> np.ndarray:
        state = rebuild_state(event_book)  # Same HandState, same fields
        return np.array([
            state.pot_size,
            state.player_stack,
            state.position,
            ...
        ])

No translation layer. No ETL. The types are the contract.
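One way to lean on that contract in a training pipeline is to name the features once and check them against the message type before training starts. The sketch below uses a dataclass as a hypothetical stand-in for the generated `HandState` class; `FEATURE_FIELDS`, `check_features`, and `state_to_vector` are illustrative names, not part of Angzarr. With real generated classes, the field names could likely be read from the message descriptor instead (for example `DESCRIPTOR.fields_by_name` in the Python protobuf runtime).

```python
from dataclasses import dataclass, fields


@dataclass
class HandState:
    """Hypothetical stand-in for the generated proto class."""
    pot_size: int
    player_stack: int
    position: int


# Feature order is fixed here so every training run sees the same layout.
FEATURE_FIELDS = ("pot_size", "player_stack", "position")


def check_features(message_type: type, names: tuple[str, ...]) -> None:
    """Fail fast if a feature names a field the message type does not have."""
    known = {f.name for f in fields(message_type)}
    missing = [n for n in names if n not in known]
    if missing:
        raise ValueError(f"unknown fields on {message_type.__name__}: {missing}")


def state_to_vector(state: HandState) -> list[float]:
    """Read the named fields in a fixed order."""
    return [float(getattr(state, name)) for name in FEATURE_FIELDS]


check_features(HandState, FEATURE_FIELDS)
vector = state_to_vector(HandState(pot_size=120, player_stack=880, position=3))
```

If a field is renamed in the schema, the check fails at startup rather than producing silently misaligned features.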
Generating Counterfactuals with Editions
Real history gives you one path. Editions let you explore alternatives.
Example: Poker Bot Training

    def generate_training_data(hand_ids: list[str], n_alternates: int = 10):
        samples = []

        for hand_id in hand_ids:
            # Get the actual outcome
            actual = get_event_book("hand", hand_id)
            samples.append(extract_sample(actual, is_actual=True))

            # Generate counterfactuals
            for i in range(n_alternates):
                action = sample_alternate_action(actual)
                edition = Edition(name=f"train-{hand_id}-{i}")

                # What would have happened?
                result = speculate(
                    command=PlayerAction(hand_id=hand_id, action=action),
                    edition=edition,
                )

                samples.append(extract_sample(result, is_actual=False))

                # Cleanup
                delete_edition_events(domain="hand", edition=edition.name)

        return samples

Each counterfactual is a complete simulation through your actual aggregate logic, not a simplified model.
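The `sample_alternate_action` helper is not defined on this page. A minimal sketch, assuming the legal actions at the decision point are known and that uniform sampling over the untaken ones is acceptable, might look like the following. The signature and the action names are hypothetical and differ from the call in the example, which passes the whole event book.

```python
import random


def sample_alternate_action(actual_action: str,
                            legal_actions: list[str],
                            rng: random.Random) -> str:
    """Pick uniformly among the legal actions that were not actually taken."""
    alternates = [a for a in legal_actions if a != actual_action]
    if not alternates:
        raise ValueError("no alternate action available at this decision point")
    return rng.choice(alternates)


rng = random.Random(0)  # seeded so a generated dataset can be reproduced
legal = ["fold", "call", "raise"]  # hypothetical action set
picks = [sample_alternate_action("call", legal, rng) for _ in range(5)]
```

Uniform sampling is the simplest choice; weighting alternates by a policy's action probabilities would be a natural variation.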
Training Patterns
Supervised Learning
Predict outcomes from state:
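
    # Labels: did the player win?
    # Features: hand state at decision point

    model = HandOutcomePredictor()
    # criterion and optimizer are assumed to be defined for this model
    for features, label in loader:
        optimizer.zero_grad()  # clear gradients left over from the previous step
        pred = model(features)
        loss = criterion(pred, label)
        loss.backward()
        optimizer.step()

Reinforcement Learning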
Event history as experience buffer:

    class PokerExperienceBuffer:
        def __init__(self, domain: str):
            self.domain = domain

        def sample(self, batch_size: int):
            hands = sample_hands(self.domain, batch_size)
            experiences = []
            for hand in hands:
                state = extract_state(hand)
                action = extract_action(hand)
                reward = extract_reward(hand)
                next_state = extract_next_state(hand)
                experiences.append((state, action, reward, next_state))
            return experiences

    # Use with any RL algorithm
    buffer = PokerExperienceBuffer("hand")
    for epoch in range(1000):
        batch = buffer.sample(32)
        train_step(agent, batch)

Anomaly Detection
Learn normal patterns, flag deviations:

    # Train on historical events
    normal_events = stream_events("transactions", ["TransactionCompleted"])
    autoencoder = train_autoencoder(normal_events)

    # Flag anomalies in real-time via projector
    @handles("TransactionCompleted")
    def detect_anomaly(event: TransactionCompleted, state):
        features = extract_features(event)
        reconstruction_error = autoencoder.reconstruction_error(features)
        if reconstruction_error > threshold:
            alert(event, reconstruction_error)

Integration Architecture
flowchart TB
subgraph store["Event Store"]
events[(Events)]
end
subgraph consumers["Consumers"]
prod["Production<br/>Aggregates"]
train["Training<br/>Pipeline"]
serve["Serving<br/>Projector"]
end
subgraph ml["ML"]
model["PyTorch<br/>Model"]
end
events --> prod
events --> train
events --> serve
train --> model
model -.->|"Same proto messages"| prod
model -.->|"Same proto messages"| serve
Why Not a Separate ML Pipeline?
| Traditional Approach | Angzarr Approach |
|---|---|
| ETL from production DB | Read events directly |
| Schema drift issues | Proto is the contract |
| Stale training data | Real-time event streams |
| Simulation != production | Same aggregate logic |
The event store is the single source of truth for both production and training.
When You Want Isolation
Direct event access works well for many workloads. But if your training jobs are heavy and you need to isolate load from production, ETL to an external store is straightforward:
- Export events to a data lake (S3, GCS, HDFS)
- Replicate to a read replica dedicated to ML workloads
- Stream to a separate analytical store (ClickHouse, BigQuery)
The proto schema travels with the data. Your training code still sees the same messages—just from a different backing store.
Start by training directly against the event store. When load becomes a concern, migrate to isolated infrastructure. No upfront complexity tax.
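As a small-scale illustration of the export path, the sketch below writes events to a JSON Lines file and streams them back. The file layout, the event dictionaries, and the function names are assumptions made for illustration; a real export would more likely write serialized proto messages to object storage.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterable, Iterator


def export_events(events: Iterable[dict], path: Path) -> int:
    """Write one JSON object per line; returns the number of events written."""
    count = 0
    with path.open("w", encoding="utf-8") as f:
        for event in events:
            f.write(json.dumps(event, sort_keys=True) + "\n")
            count += 1
    return count


def read_events(path: Path) -> Iterator[dict]:
    """Stream events back lazily, in the order they were written."""
    with path.open("r", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


# Hypothetical events; field names are illustrative only.
events = [
    {"type": "PlayerActed", "hand_id": "h1", "action": "call"},
    {"type": "HandComplete", "hand_id": "h1", "winner": "p2"},
]
export_path = Path(tempfile.mkdtemp()) / "hand-events.jsonl"
written = export_events(events, export_path)
restored = list(read_events(export_path))
```

Because `read_events` is a plain iterator, a dataset class can consume it the same way it would consume a stream from the event store.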
Replay from External Sources
Events stored in your data warehouse can flow back through Angzarr. Replay them through the same domains to rebuild state, or route them through entirely different domains to test new aggregate logic, run backtests, or generate derived datasets.
The event store doesn’t have to be the origin. Any source that produces valid proto messages can feed the system.
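Replay can be pictured as a fold over an ordered event sequence. The sketch below runs the same events through two handlers: one standing in for current aggregate logic and one for a candidate being backtested. The `BetPlaced` event, the handlers, and the 5% rake rule are hypothetical, and nothing here is the Angzarr replay API.

```python
from functools import reduce
from typing import Callable, Iterable

Event = dict
State = dict
Handler = Callable[[State, Event], State]


def replay(events: Iterable[Event], apply: Handler, initial: State) -> State:
    """Rebuild state by applying each event in order."""
    return reduce(apply, events, initial)


def current_logic(state: State, event: Event) -> State:
    # Stand-in for existing logic: the pot grows by the full bet.
    if event["type"] == "BetPlaced":
        return {**state, "pot": state["pot"] + event["amount"]}
    return state


def candidate_logic(state: State, event: Event) -> State:
    # Stand-in for new logic under test: a 5% rake is withheld from each bet.
    if event["type"] == "BetPlaced":
        return {**state, "pot": state["pot"] + event["amount"] * 95 // 100}
    return state


# Events could come from the event store, a warehouse export, or a file.
history = [
    {"type": "BetPlaced", "amount": 100},
    {"type": "BetPlaced", "amount": 200},
    {"type": "HandComplete"},
]
rebuilt = replay(history, current_logic, {"pot": 0})
backtest = replay(history, candidate_logic, {"pot": 0})
```

Comparing `rebuilt` and `backtest` shows how the candidate logic would have changed the outcome on real history.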
See Also
- Editions — Temporal branching for counterfactuals
- Projections — Building ML feature stores
- Polyglot — Python for ML, Rust for production