Builders
Fluent builders simplify command and query construction. Instead of manually building protobuf messages, use method chaining for readable, type-safe construction.
CommandBuilder
Constructs commands with fluent method chaining. Handles correlation ID generation, sequence management, and protobuf serialization.
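To make the chaining pattern concrete, here is a minimal, self-contained sketch of how a fluent command builder could work. It is not the library's implementation: `SketchCommandBuilder`, the dict it returns, and the default sequence of 0 are all assumptions made for illustration.

```python
import uuid


class SketchCommandBuilder:
    """Hypothetical stand-in that mimics the fluent style; not angzarr's code."""

    def __init__(self, domain, root_id=None):
        self._domain = domain
        self._root_id = root_id
        self._correlation_id = None
        self._sequence = 0  # assumed default for this sketch
        self._type_url = None
        self._payload = None

    def with_correlation_id(self, correlation_id):
        self._correlation_id = correlation_id
        return self  # returning self is what enables chaining

    def with_sequence(self, sequence):
        self._sequence = sequence
        return self

    def with_command(self, type_url, payload):
        self._type_url = type_url
        self._payload = payload
        return self

    def build(self):
        if self._type_url is None:
            raise ValueError("with_command() must be called before build()")
        return {
            "domain": self._domain,
            "root_id": self._root_id,
            # Fall back to a random UUID when no correlation ID was set.
            "correlation_id": self._correlation_id or str(uuid.uuid4()),
            "sequence": self._sequence,
            "type_url": self._type_url,
            "payload": self._payload,
        }


book = (
    SketchCommandBuilder("orders", "root-1")
    .with_sequence(5)
    .with_command("type.googleapis.com/examples.UpdateOrder", b"...")
    .build()
)
print(book["sequence"], book["correlation_id"])
```

Each `with_*` method only records a value and returns the builder, so the order of calls does not matter until `build()` assembles the result.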
Basic Usage
```python
from angzarr_client import DomainClient

client = DomainClient.connect("localhost:1310")

# Build and execute a command
response = client.command("orders", root_id) \
    .with_correlation_id("order-123") \
    .with_sequence(5) \
    .with_command("type.googleapis.com/examples.UpdateOrder", update_cmd) \
    .execute()
```

Creating New Aggregates
When creating a new aggregate, omit the root UUID. The coordinator generates one and returns it in the response:
```python
# Create a new aggregate
response = client.command_new("orders") \
    .with_correlation_id("order-123") \
    .with_command("type.googleapis.com/examples.CreateOrder", create_cmd) \
    .execute()

# Extract the generated root ID
new_root_id = helpers.root_uuid(response.events)
```

Auto-Generated Correlation IDs
If you don’t specify a correlation ID, the builder generates a UUID automatically:
```java
// These are equivalent:
.withCorrelationId(UUID.randomUUID().toString())
// vs just omitting it - builder generates one
```

QueryBuilder
Constructs event queries with support for temporal queries, sequence ranges, and filtering.
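As a rough illustration of how a query builder can collect criteria before anything is sent, the sketch below accumulates the selection into a plain dict. `SketchQueryBuilder` and the dict layout are hypothetical and stand in for the real builder and Query message. Treating the range and as-of selections as mutually exclusive (the last call wins) is also an assumption of this sketch.

```python
class SketchQueryBuilder:
    """Hypothetical stand-in; the dict layout is illustrative, not the real Query."""

    def __init__(self, domain, root_id=None):
        self._query = {"domain": domain, "root_id": root_id, "selection": None}

    def range_to(self, lower, upper):
        self._query["selection"] = {"kind": "range", "lower": lower, "upper": upper}
        return self

    def as_of_sequence(self, sequence):
        self._query["selection"] = {"kind": "as_of_sequence", "sequence": sequence}
        return self

    def by_correlation_id(self, correlation_id):
        self._query["correlation_id"] = correlation_id
        return self

    def build(self):
        # Return a copy so later builder calls cannot change an issued query.
        return dict(self._query)


query = SketchQueryBuilder("orders", "root-1").range_to(5, 15).build()
print(query["selection"])
```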
Basic Query
```python
from angzarr_client import DomainClient

client = DomainClient.connect("localhost:1310")

# Query all events for an aggregate
events = client.query("orders", root_id).get_event_book()
```

Temporal Queries
Reconstruct state as it existed at a specific point in time:
```python
# As of a specific sequence number
events = client.query("orders", root_id) \
    .as_of_sequence(10) \
    .get_event_book()

# As of a specific timestamp (RFC3339)
events = client.query("orders", root_id) \
    .as_of_time("2024-01-15T10:30:00Z") \
    .get_event_book()
```

Sequence Ranges
Fetch a subset of events for pagination or incremental sync:
```python
# Events from sequence 5 onwards
events = client.query("orders", root_id) \
    .range(5) \
    .get_event_book()

# Events in range [5, 15]
events = client.query("orders", root_id) \
    .range_to(5, 15) \
    .get_event_book()
```

Query by Correlation ID
Fetch events across aggregates that share a correlation ID (workflow tracking):
```python
# Query by correlation ID
events = client.query_domain("orders") \
    .by_correlation_id("corr-456") \
    .get_event_book()
```

Builder Methods Reference
CommandBuilder

| Method | Description |
|---|---|
| with_correlation_id(id) | Set correlation ID for distributed tracing |
| with_sequence(seq) | Set expected sequence for optimistic locking |
| with_command(type_url, msg) | Set the command payload |
| build() | Build the CommandBook without executing |
| execute() | Build and execute the command |
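The expected sequence passed to `with_sequence` is what makes optimistic locking possible. The sketch below shows the general idea with an in-memory store. `InMemoryAggregate` and `SequenceConflict` are hypothetical names, and the rule that the expected sequence must equal the index of the next event is an assumption, since the coordinator's actual check is not described here.

```python
class SequenceConflict(Exception):
    """Raised when the expected sequence does not match the stored history."""


class InMemoryAggregate:
    """Hypothetical event store for one aggregate; not the angzarr coordinator."""

    def __init__(self):
        self.events = []

    def append(self, expected_sequence, event):
        # Assumption: the expected sequence is the index the new event will get.
        next_sequence = len(self.events)
        if expected_sequence != next_sequence:
            raise SequenceConflict(
                f"expected {expected_sequence}, store is at {next_sequence}"
            )
        self.events.append(event)
        return next_sequence


aggregate = InMemoryAggregate()
aggregate.append(0, "OrderCreated")
aggregate.append(1, "OrderUpdated")

try:
    # A writer that still believes the aggregate is at sequence 1 is rejected.
    aggregate.append(1, "OrderCancelled")
except SequenceConflict as err:
    print("rejected:", err)
```

A rejected writer would typically re-read the aggregate, rebuild its command against the new state, and retry with the updated sequence.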
QueryBuilder

| Method | Description |
|---|---|
| by_correlation_id(id) | Query by correlation ID across aggregates |
| with_edition(edition) | Filter by schema edition |
| range(lower) | Events from sequence lower onwards |
| range_to(lower, upper) | Events in sequence range |
| as_of_sequence(seq) | Temporal query at sequence |
| as_of_time(rfc3339) | Temporal query at timestamp |
| build() | Build the Query without executing |
| get_event_book() | Execute and return EventBook |
| get_pages() | Execute and return just the pages |
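To show what the range and temporal selections are expected to return, the following sketch applies them to an in-memory list of pages. The tuple layout, the zero-based sequences, and the inclusive cut-offs for the as-of queries are assumptions for illustration. Only the inclusive `[5, 15]` range comes from the examples above.

```python
from datetime import datetime


def parse_rfc3339(text):
    # fromisoformat() rejects a trailing "Z" before Python 3.11, so normalize it.
    return datetime.fromisoformat(text.replace("Z", "+00:00"))


# Hypothetical event pages: (sequence, created_at, type name), one per minute.
PAGES = [
    (seq, parse_rfc3339(f"2024-01-15T10:{seq:02d}:00Z"), f"Event{seq}")
    for seq in range(20)
]


def select_range(pages, lower, upper=None):
    """Pages with sequence >= lower, and <= upper when upper is given."""
    return [p for p in pages if p[0] >= lower and (upper is None or p[0] <= upper)]


def select_as_of_sequence(pages, sequence):
    """Pages up to and including the given sequence (assumed inclusive)."""
    return [p for p in pages if p[0] <= sequence]


def select_as_of_time(pages, rfc3339):
    """Pages created at or before the given RFC3339 timestamp."""
    cutoff = parse_rfc3339(rfc3339)
    return [p for p in pages if p[1] <= cutoff]


print([p[0] for p in select_range(PAGES, 15)])
print([p[0] for p in select_range(PAGES, 5, 8)])
print([p[0] for p in select_as_of_sequence(PAGES, 3)])
print([p[0] for p in select_as_of_time(PAGES, "2024-01-15T10:02:30Z")])
```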
Next Steps
- Error Handling: Error types and introspection methods
- Speculative Execution: What-if scenarios without persistence