angzarr-client-go

Go client library for Angzarr event-sourcing services.

Unified Documentation

For cross-language API reference with side-by-side comparisons, see the SDK Documentation.

Installation

go get github.com/benjaminabbitt/angzarr/client/go

Usage

Sending Commands

Contract

Feature: AggregateClient - Command Execution
The AggregateClient sends commands to aggregates for processing.
Commands are validated, processed, and result in events being persisted.
Supports async (fire-and-forget), sync, and speculative modes.

Without command execution, the system cannot accept user actions or
change aggregate state.

Source: aggregate_client.feature

package main

import (
	"context"
	"log"

	angzarr "github.com/benjaminabbitt/angzarr/client/go"
)

func main() {
	// Connect to the aggregate coordinator
	client, err := angzarr.NewAggregateClient("localhost:1310")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Send a command to create a new aggregate.
	// CreateOrderCommand is assumed to be a protobuf message generated for your domain.
	resp, err := client.CommandNew("orders").
		WithCorrelationID("order-123").
		WithCommand("type.googleapis.com/examples.CreateOrder", &CreateOrderCommand{
			CustomerId: "customer-456",
		}).
		Execute(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// Get the new aggregate root ID
	rootID := angzarr.RootUUID(resp.Events)
	log.Printf("Created order: %s", rootID)
}

Querying Events

Contract

Feature: QueryClient - Event Retrieval
The QueryClient provides read access to aggregate event histories.
It supports various query modes: full history, range queries, temporal
queries, and correlation-based queries across aggregates.

Without query access, clients cannot reconstruct aggregate state,
catch up projectors, or debug event flows.

Source: query_client.feature

// Connect to query service
queryClient, err := angzarr.NewQueryClient("localhost:1310")
if err != nil {
	log.Fatal(err)
}
defer queryClient.Close()

// Query events for an aggregate
rootID := uuid.MustParse("...")
events, err := queryClient.Query("orders", rootID).
	GetEventBook(context.Background())
if err != nil {
	log.Fatal(err)
}

// Iterate over events
for _, page := range events.Pages {
	log.Printf("Event %d: %s", angzarr.SequenceNum(page), angzarr.TypeNameFromURL(page.Event.TypeUrl))
}
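The contract above mentions reconstructing aggregate state from an event history. A minimal sketch of that replay step follows. It does not use the angzarr types: `Event`, `OrderState`, `Apply`, and `Rebuild` are hypothetical names invented for illustration, and the event type names are made up.

```go
package main

import "fmt"

// Event is a hypothetical, simplified stand-in for one page of an event book.
type Event struct {
	Sequence uint32
	Type     string // short type name, e.g. "OrderCreated" (illustrative)
	Amount   int    // illustrative payload field
}

// OrderState is a hypothetical aggregate state rebuilt from events.
type OrderState struct {
	Created bool
	Total   int
	LastSeq uint32
}

// Apply folds a single event into the state and returns the new state.
// Unknown event types only advance LastSeq.
func (s OrderState) Apply(e Event) OrderState {
	switch e.Type {
	case "OrderCreated":
		s.Created = true
	case "ItemAdded":
		s.Total += e.Amount
	case "ItemRemoved":
		s.Total -= e.Amount
	}
	s.LastSeq = e.Sequence
	return s
}

// Rebuild replays events in order, starting from the zero state.
func Rebuild(events []Event) OrderState {
	var s OrderState
	for _, e := range events {
		s = s.Apply(e)
	}
	return s
}

func main() {
	history := []Event{
		{Sequence: 0, Type: "OrderCreated"},
		{Sequence: 1, Type: "ItemAdded", Amount: 30},
		{Sequence: 2, Type: "ItemAdded", Amount: 12},
		{Sequence: 3, Type: "ItemRemoved", Amount: 30},
	}
	fmt.Printf("%+v\n", Rebuild(history))
}
```

In a real client, the loop body would decode each page's payload before applying it; that decoding step is omitted here.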

Using Environment Variables

// Connect using environment variable with fallback
client, err := angzarr.AggregateClientFromEnv("ANGZARR_ENDPOINT", "localhost:1310")

Temporal Queries

// Query state as of a specific sequence
events, err := queryClient.Query("orders", rootID).
	AsOfSequence(10).
	GetEventBook(ctx)

// Query state as of a specific time
events, err := queryClient.Query("orders", rootID).
	AsOfTime("2024-01-15T10:30:00Z").
	GetEventBook(ctx)

// Query a range of sequences
events, err := queryClient.Query("orders", rootID).
	RangeTo(5, 15).
	GetEventBook(ctx)

Error Handling

Contract

Feature: Error Handling - Client Error Introspection
Client errors provide structured information for retry logic,
user feedback, and debugging. Errors are categorized by type
(connection, validation, business rule) with introspection methods.

Proper error handling enables:
- Automatic retry on transient failures
- User-friendly error messages
- Optimistic concurrency conflict resolution
- Debugging and logging

Source: error_handling.feature

resp, err := client.Command("orders", rootID).
	WithSequence(5).
	WithCommand(typeURL, cmd).
	Execute(ctx)

if err != nil {
	if clientErr := angzarr.AsClientError(err); clientErr != nil {
		if clientErr.IsNotFound() {
			// Aggregate doesn't exist
		} else if clientErr.IsPreconditionFailed() {
			// Sequence mismatch (optimistic locking failure)
		} else if clientErr.IsConnectionError() {
			// Network/transport error
		}
	}
}

Speculative Execution

Test commands without persisting to the event store:

// Connect to speculative client
specClient, err := angzarr.NewSpeculativeClient("localhost:1310")
if err != nil {
	log.Fatal(err)
}
defer specClient.Close()

// Build speculative request with temporal state
request := &pb.SpeculateAggregateRequest{
	Command: commandBook,
	Events:  priorEvents,
}

// Execute without persistence
response, err := specClient.Aggregate(ctx, request)
if err != nil {
	log.Fatal(err)
}

// Inspect projected events
for _, page := range response.Events.Pages {
	log.Printf("Would produce: %s", page.Event.TypeUrl)
}

Client Types

| Client | Purpose |
|---|---|
| QueryClient | Query events from aggregates |
| AggregateClient | Send commands to aggregates |
| SpeculativeClient | Dry-run commands, test projectors/sagas |
| DomainClient | Combined query + aggregate for a domain |
| Client | Full client with all capabilities |

Helper Functions

// UUID conversion (id is an existing uuid.UUID; naming it "uuid" would shadow the package)
protoUUID := angzarr.UUIDToProto(id)
parsed, err := angzarr.ProtoToUUID(protoUUID)

// Type URL helpers
typeURL := angzarr.TypeURL("examples.CreateOrder") // "type.googleapis.com/examples.CreateOrder"
typeName := angzarr.TypeNameFromURL(typeURL)       // "CreateOrder"

// Cover accessors
domain := angzarr.Domain(eventBook)
correlationID := angzarr.CorrelationID(eventBook)
rootUUID := angzarr.RootUUID(eventBook)

// Sequence helpers
nextSeq := angzarr.NextSequence(eventBook)
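To make the type URL mapping concrete, here is a standalone sketch that reproduces the behavior described in the comments above. It is not the library's implementation: `typeURL` and `typeNameFromURL` are hypothetical local functions, and stripping both the URL prefix and the package qualifier is inferred only from the sample values shown.

```go
package main

import (
	"fmt"
	"strings"
)

// typeURLPrefix is the conventional prefix for protobuf Any type URLs.
const typeURLPrefix = "type.googleapis.com/"

// typeURL builds a type URL from a fully qualified message name.
func typeURL(fullName string) string {
	return typeURLPrefix + fullName
}

// typeNameFromURL returns the short message name: everything after the
// last '/' and then after the last '.'. Inputs without either separator
// are returned unchanged.
func typeNameFromURL(url string) string {
	name := url[strings.LastIndex(url, "/")+1:]
	return name[strings.LastIndex(name, ".")+1:]
}

func main() {
	u := typeURL("examples.CreateOrder")
	fmt.Println(u)
	fmt.Println(typeNameFromURL(u))
}
```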

License

AGPL-3.0-only