Skip to content

Builders

Fluent builders simplify command and query construction. Instead of manually building protobuf messages, use method chaining for readable, type-safe construction.


Constructs commands with fluent method chaining. Handles correlation ID generation, sequence management, and protobuf serialization.

from angzarr_client import DomainClient
client = DomainClient.connect("localhost:1310")
# Build and execute a command
response = client.command("orders", root_id) \
.with_correlation_id("order-123") \
.with_sequence(5) \
.with_command("type.googleapis.com/examples.UpdateOrder", update_cmd) \
.execute()

When creating a new aggregate, omit the root UUID. The coordinator generates one and returns it in the response:

# Create a new aggregate
response = client.command_new("orders") \
.with_correlation_id("order-123") \
.with_command("type.googleapis.com/examples.CreateOrder", create_cmd) \
.execute()
# Extract the generated root ID
new_root_id = helpers.root_uuid(response.events)

If you don’t specify a correlation ID, the builder generates a UUID automatically:

illustrative - auto-generated correlation ID
// These are equivalent:
.withCorrelationId(UUID.randomUUID().toString())
// vs just omitting it - builder generates one

Constructs event queries with support for temporal queries, sequence ranges, and filtering.

from angzarr_client import DomainClient
client = DomainClient.connect("localhost:1310")
# Query all events for an aggregate
events = client.query("orders", root_id).get_event_book()

Reconstruct state as it existed at a specific point in time:

# As of a specific sequence number
events = client.query("orders", root_id) \
.as_of_sequence(10) \
.get_event_book()
# As of a specific timestamp (RFC3339)
events = client.query("orders", root_id) \
.as_of_time("2024-01-15T10:30:00Z") \
.get_event_book()

Fetch a subset of events for pagination or incremental sync:

# Events from sequence 5 onwards
events = client.query("orders", root_id) \
.range(5) \
.get_event_book()
# Events in range [5, 15]
events = client.query("orders", root_id) \
.range_to(5, 15) \
.get_event_book()

Fetch events across aggregates that share a correlation ID (workflow tracking):

# Query by correlation ID
events = client.query_domain("orders") \
.by_correlation_id("corr-456") \
.get_event_book()

MethodDescription
with_correlation_id(id)Set correlation ID for distributed tracing
with_sequence(seq)Set expected sequence for optimistic locking
with_command(type_url, msg)Set the command payload
build()Build the CommandBook without executing
execute()Build and execute the command
MethodDescription
by_correlation_id(id)Query by correlation ID across aggregates
with_edition(edition)Filter by schema edition
range(lower)Events from sequence lower onwards
range_to(lower, upper)Events in sequence range
as_of_sequence(seq)Temporal query at sequence
as_of_time(rfc3339)Temporal query at timestamp
build()Build the Query without executing
get_event_book()Execute and return EventBook
get_pages()Execute and return just the pages