Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 79 additions & 105 deletions internal/server/spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ import (
"context"
"fmt"
"log/slog"
"sync"
"sync/atomic"

"cloud.google.com/go/spanner"
pb "github.com/datacommonsorg/mixer/internal/proto"
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
v2 "github.com/datacommonsorg/mixer/internal/server/v2"
"github.com/datacommonsorg/mixer/internal/translator/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/datacommonsorg/mixer/internal/util"
"google.golang.org/grpc/metadata"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -58,73 +55,87 @@ type SpannerClient interface {
Close()
}

// spannerDatabaseClient encapsulates the Spanner client that directly interacts with the Spanner database.
// It also holds the state for the background goroutine launched by Start and
// shut down by Close, which calls updateTimestamp each time the ticker fires.
type spannerDatabaseClient struct {
// client is the underlying Spanner client; Close closes it when it is non-nil.
client *spanner.Client
// timestamp is presumably the cached staleness timestamp refreshed by
// updateTimestamp; the code that writes it is not visible here (TODO confirm).
timestamp atomic.Int64
// ticker provides the channel whose ticks trigger each periodic update.
ticker Ticker
// stopCh is closed by Close to tell the background goroutine to exit.
stopCh chan struct{}
// startOnce and stopOnce ensure the bodies of Start and Close run at most once.
startOnce sync.Once
stopOnce sync.Once
// wg tracks the background goroutine so that Close can wait for it to finish.
wg sync.WaitGroup

// For mocking in tests.
// The constructor sets this to fetchAndUpdateTimestamp.
updateTimestamp func(context.Context) error
}

// newSpannerDatabaseClient creates a new spannerDatabaseClient.
func newSpannerDatabaseClient(client *spanner.Client) (*spannerDatabaseClient, error) {
sc := &spannerDatabaseClient{
client: client,
}

// Set an initial timestamp synchronously before starting the background loop.
sc.ticker = NewTimestampTicker()
sc.stopCh = make(chan struct{})
sc.updateTimestamp = sc.fetchAndUpdateTimestamp
if err := sc.updateTimestamp(context.Background()); err != nil {
slog.Error("Error initializing Spanner staleness timestamp", "error", err.Error())
return nil, err
}
return sc, nil
// selectorClient dispatches calls to either default or normalized client based on request headers.
// It serves as the main entry point for the Spanner client, centralizing routing concerns.
//
// DESIGN NOTE: This client embeds the standard client (SpannerClient) to handle automatic
// fallback for methods that do not have a specialized normalized implementation.
// For methods that DO have a specialized implementation (like GetObservations), it explicitly
// checks the header and routes accordingly. It does NOT rely on the normalized client's
// internal fallback for general request routing, ensuring that the standard path remains
// the explicit default.
//
// GetSdmxObservations is the one override that does not check the header: it
// always delegates to the normalized client.
type selectorClient struct {
SpannerClient // Embedded standard client; serves every method that is not overridden.
// normalized receives the calls that are routed to the normalized schema.
normalized SpannerClient
}

// NewRawSpannerClient creates a new SpannerClient without the schema selector.
// This is intended for testing and internal use where a direct client is needed.
func NewRawSpannerClient(ctx context.Context, spannerConfigYaml, databaseOverride string) (SpannerClient, error) {
// NewSpannerClient creates a new SpannerClient from the config yaml string and an optional database override.
// It returns a wrapper client that handles request-time schema dispatching.
func NewSpannerClient(ctx context.Context, spannerConfigYaml, databaseOverride string) (SpannerClient, error) {
cfg, err := createSpannerConfig(spannerConfigYaml, databaseOverride)
if err != nil {
return nil, fmt.Errorf("failed to create spannerDatabaseClient: %w", err)
return nil, err
}
client, err := createSpannerClient(ctx, cfg)
exec, err := NewSpannerConnector(ctx, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create spannerDatabaseClient: %w", err)
return nil, err
}
return newSpannerDatabaseClient(client)

defaultClient := newStandardSpannerClient(exec)
normalizedClient := NewNormalizedClient(defaultClient)

return &selectorClient{
SpannerClient: defaultClient,
normalized: normalizedClient,
}, nil
}

// NewSpannerClient creates a new SpannerClient from the config yaml string and an optional database override.
// It returns a wrapper client that handles request-time schema dispatching.
func NewSpannerClient(ctx context.Context, spannerConfigYaml, databaseOverride string) (SpannerClient, error) {
rawClient, err := NewRawSpannerClient(ctx, spannerConfigYaml, databaseOverride)
if err != nil {
return nil, err


// GetObservations overrides the embedded client's GetObservations to dispatch based on schema selection.
//
// When useNormalizedSchema reports true for ctx, the call is logged (with the
// number of variables and entities) and forwarded to the normalized client.
// Otherwise it is forwarded to the embedded default client. Arguments and
// results are passed through unchanged in both cases.
func (s *selectorClient) GetObservations(ctx context.Context, variables []string, entities []string) ([]*Observation, error) {
	if useNormalizedSchema(ctx) {
		logNormalizedInvocation("GetObservations",
			"num_variables", len(variables),
			"num_entities", len(entities),
		)
		return s.normalized.GetObservations(ctx, variables, entities)
	}
	// Default path: the embedded standard client.
	return s.SpannerClient.GetObservations(ctx, variables, entities)
}

// createSpannerClient creates the database name string and initializes the Spanner client.
func createSpannerClient(ctx context.Context, cfg *SpannerConfig) (*spanner.Client, error) {
// Construct the database name string
databaseName := fmt.Sprintf("projects/%s/instances/%s/databases/%s", cfg.Project, cfg.Instance, cfg.Database)
// CheckVariableExistence shadows the embedded client's method and picks the
// backing client per request: the embedded default client unless
// useNormalizedSchema reports true for ctx, in which case the call is logged
// and sent to the normalized client.
func (s *selectorClient) CheckVariableExistence(ctx context.Context, variables []string, entities []string) ([][]string, error) {
	if !useNormalizedSchema(ctx) {
		return s.SpannerClient.CheckVariableExistence(ctx, variables, entities)
	}

	numVariables, numEntities := len(variables), len(entities)
	logNormalizedInvocation("CheckVariableExistence", "num_variables", numVariables, "num_entities", numEntities)
	return s.normalized.CheckVariableExistence(ctx, variables, entities)
}

// Create the Spanner client
client, err := spanner.NewClient(ctx, databaseName)
if err != nil {
return nil, fmt.Errorf("failed to create Spanner client: %w", err)
// GetObservationsContainedInPlace shadows the embedded client's method and
// picks the backing client per request: the embedded default client unless
// useNormalizedSchema reports true for ctx, in which case the call is logged
// (variable count, ancestor and child place type) and sent to the normalized
// client.
func (s *selectorClient) GetObservationsContainedInPlace(ctx context.Context, variables []string, containedInPlace *v2.ContainedInPlace) ([]*Observation, error) {
	if !useNormalizedSchema(ctx) {
		return s.SpannerClient.GetObservationsContainedInPlace(ctx, variables, containedInPlace)
	}

	// containedInPlace is only dereferenced on the normalized path, for logging.
	logNormalizedInvocation(
		"GetObservationsContainedInPlace",
		"num_variables", len(variables),
		"ancestor", containedInPlace.Ancestor,
		"child_place_type", containedInPlace.ChildPlaceType,
	)
	return s.normalized.GetObservationsContainedInPlace(ctx, variables, containedInPlace)
}

return client, nil
// GetSdmxObservations shadows the embedded client's method.
// SDMX is served by the normalized schema only, so no header check is made:
// every call is logged and handed to the normalized client.
func (s *selectorClient) GetSdmxObservations(ctx context.Context, req *pb.SdmxDataQuery) (*pb.SdmxDataResult, error) {
	logNormalizedInvocation("GetSdmxObservations", "query", req)
	result, err := s.normalized.GetSdmxObservations(ctx, req)
	return result, err
}

// createSpannerConfig creates the config from the specific yaml string and an optional database override.
Expand All @@ -145,56 +156,19 @@ func createSpannerConfig(spannerConfigYaml, databaseOverride string) (*SpannerCo
return &cfg, nil
}

func (sc *spannerDatabaseClient) Id() string {
return sc.client.DatabaseName()
// useNormalizedSchema reports whether the request asked for the normalized
// Spanner schema. It returns true only when the incoming metadata carries the
// util.XUseNormalizedSchema header and its first value is exactly "true"; a
// context without incoming metadata, or without that header, yields false.
func useNormalizedSchema(ctx context.Context) bool {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return false
	}

	values := md.Get(util.XUseNormalizedSchema)
	if len(values) == 0 {
		return false
	}
	// Only the first header value is consulted.
	return values[0] == "true"
}

// Start starts the background goroutine to periodically fetch the timestamp.
func (sc *spannerDatabaseClient) Start() {
sc.startOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())

sc.wg.Add(1)
go func() {
// Defer statements are processed in LIFO order.
// Mark the wait group as done.
defer sc.wg.Done()
// Cancel the context to clean up any in-flight operations.
defer cancel()
// Stop the ticker.
defer sc.ticker.Stop()

for {
select {
case <-sc.stopCh:
return
case <-sc.ticker.C():
// Ignore the error here to allow the process to continue running
// even if one fetch fails. The previous timestamp remains in cache.
err := sc.updateTimestamp(ctx)
if err != nil {
slog.Error("Error updating Spanner staleness timestamp", "error", err)
}
}
}
}()
})
// logNormalizedInvocation emits an info-level log entry recording that the
// normalized schema handled methodName. The entry carries a "method" attribute
// followed by the caller-supplied key/value pairs in args.
func logNormalizedInvocation(methodName string, args ...any) {
	attrs := make([]any, 0, len(args)+2)
	attrs = append(attrs, "method", methodName)
	attrs = append(attrs, args...)
	slog.Info("Invoking normalized Spanner schema", attrs...)
}

// Close shuts the client down. The first call closes stopCh to signal the
// background goroutine, waits for it to finish, and then closes the Spanner
// client if one is set. Later calls do nothing.
func (sc *spannerDatabaseClient) Close() {
	shutdown := func() {
		close(sc.stopCh)
		sc.wg.Wait()

		if sc.client == nil {
			return
		}
		sc.client.Close()
	}
	sc.stopOnce.Do(shutdown)
}

// GetSdmxObservations always fails on the default client: it returns a nil
// result and a gRPC Unimplemented status error, ignoring both arguments.
func (sc *spannerDatabaseClient) GetSdmxObservations(ctx context.Context, req *pb.SdmxDataQuery) (*pb.SdmxDataResult, error) {
	const msg = "SDMX queries are only supported on the normalized schema"
	return nil, status.Error(codes.Unimplemented, msg)
}
38 changes: 0 additions & 38 deletions internal/server/spanner/client_normalized.go

This file was deleted.

122 changes: 0 additions & 122 deletions internal/server/spanner/client_selector.go

This file was deleted.

Loading
Loading