Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cmd/decree/config_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ func (f *fakeConfigTransport) SetFields(_ context.Context, req *configclient.Set
func buildClients(t *testing.T, fields ...adminclient.Field) (*adminclient.Client, *configclient.Client, *fakeConfigTransport) {
t.Helper()
admin := adminclient.New(
&fakeSchemaTransport{
adminclient.WithSchemaTransport(&fakeSchemaTransport{
tenant: &adminclient.Tenant{ID: "t1", SchemaID: "s1", SchemaVersion: 1},
schema: &adminclient.Schema{ID: "s1", Version: 1, Fields: fields},
},
nil, nil, nil,
}),
)
tr := &fakeConfigTransport{}
cfg := configclient.New(tr)
Expand Down
18 changes: 7 additions & 11 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,17 +244,13 @@ func run() int {
logger.InfoContext(ctx, "schema service enabled")
}
if srv.IsServiceEnabled("config") {
configSvc := config.NewService(config.ServiceConfig{
Store: configStore,
Cache: configCache,
Publisher: publisher,
Subscriber: subscriber,
Logger: logger,
CacheMetrics: cacheMetrics,
Metrics: configMetrics,
Validators: validatorFactory,
Recorder: recorder,
})
configSvc := config.NewService(configStore, configCache, publisher, subscriber,
config.WithLogger(logger),
config.WithCacheMetrics(cacheMetrics),
config.WithMetrics(configMetrics),
config.WithValidators(validatorFactory),
config.WithRecorder(recorder),
)
pb.RegisterConfigServiceServer(srv.GRPCServer(), configSvc)
srv.SetServiceHealthy("centralconfig.v1.ConfigService")
logger.InfoContext(ctx, "config service enabled")
Expand Down
74 changes: 52 additions & 22 deletions internal/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,41 @@ func (e *dependentRequiredError) Unwrap() error { return e.err }

const defaultCacheTTL = 5 * time.Minute

// ServiceConfig holds dependencies for the ConfigService.
type ServiceConfig struct {
Store Store
Cache cache.ConfigCache
Publisher pubsub.Publisher
Subscriber pubsub.Subscriber
Logger *slog.Logger
CacheMetrics *telemetry.CacheMetrics
Metrics *telemetry.ConfigMetrics
Validators *validation.ValidatorFactory
Recorder *audit.UsageRecorder
// Option configures a Service.
type Option func(*serviceOptions)

type serviceOptions struct {
logger *slog.Logger
cacheMetrics *telemetry.CacheMetrics
metrics *telemetry.ConfigMetrics
validators *validation.ValidatorFactory
recorder *audit.UsageRecorder
}

// WithLogger sets the service logger. Defaults to slog.Default() when unset.
func WithLogger(l *slog.Logger) Option {
return func(o *serviceOptions) { o.logger = l }
}

// WithCacheMetrics wires cache hit/miss metrics. Nil disables them.
func WithCacheMetrics(m *telemetry.CacheMetrics) Option {
return func(o *serviceOptions) { o.cacheMetrics = m }
}

// WithMetrics wires write/version metrics. Nil disables them.
func WithMetrics(m *telemetry.ConfigMetrics) Option {
return func(o *serviceOptions) { o.metrics = m }
}

// WithValidators wires the schema validator factory. Nil disables per-field
// validation and dependentRequired checks.
func WithValidators(v *validation.ValidatorFactory) Option {
return func(o *serviceOptions) { o.validators = v }
}

// WithRecorder wires an audit usage recorder. Nil disables read tracking.
func WithRecorder(r *audit.UsageRecorder) Option {
return func(o *serviceOptions) { o.recorder = r }
}

// Service implements the ConfigService gRPC server.
Expand All @@ -62,18 +86,24 @@ type Service struct {
recorder *audit.UsageRecorder
}

// NewService creates a new ConfigService.
func NewService(cfg ServiceConfig) *Service {
// NewService creates a new ConfigService. The four required dependencies
// (store, cache, publisher, subscriber) are positional; everything else is
// optional and may be passed via With...() options.
func NewService(store Store, cache cache.ConfigCache, publisher pubsub.Publisher, subscriber pubsub.Subscriber, opts ...Option) *Service {
o := serviceOptions{logger: slog.Default()}
for _, opt := range opts {
opt(&o)
}
return &Service{
store: cfg.Store,
cache: cfg.Cache,
publisher: cfg.Publisher,
subscriber: cfg.Subscriber,
logger: cfg.Logger,
cacheMetrics: cfg.CacheMetrics,
metrics: cfg.Metrics,
validators: cfg.Validators,
recorder: cfg.Recorder,
store: store,
cache: cache,
publisher: publisher,
subscriber: subscriber,
logger: o.logger,
cacheMetrics: o.cacheMetrics,
metrics: o.metrics,
validators: o.validators,
recorder: o.recorder,
}
}

Expand Down
62 changes: 23 additions & 39 deletions internal/config/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,9 @@ func newTestService() (*Service, *mockStore, *mockCache, *mockPublisher) {
c := &mockCache{}
pub := &mockPublisher{}
sub := &mockSubscriber{}
svc := NewService(ServiceConfig{
Store: store,
Cache: c,
Publisher: pub,
Subscriber: sub,
Logger: testLogger,
})
svc := NewService(store, c, pub, sub,
WithLogger(testLogger),
)
return svc, store, c, pub
}

Expand All @@ -52,14 +48,10 @@ func newTestServiceWithValidation() (*Service, *mockStore) {
pub := &mockPublisher{}
sub := &mockSubscriber{}
vf := validation.NewValidatorFactory(store)
svc := NewService(ServiceConfig{
Store: store,
Cache: c,
Publisher: pub,
Subscriber: sub,
Logger: testLogger,
Validators: vf,
})
svc := NewService(store, c, pub, sub,
WithLogger(testLogger),
WithValidators(vf),
)
return svc, store
}

Expand Down Expand Up @@ -552,12 +544,10 @@ func TestGetField_RecordsUsage(t *testing.T) {
audit.WithFlushInterval(time.Hour),
audit.WithLogger(testLogger),
)
svc := NewService(ServiceConfig{
Store: store,
Cache: c,
Logger: testLogger,
Recorder: recorder,
})
svc := NewService(store, c, nil, nil,
WithLogger(testLogger),
WithRecorder(recorder),
)
ctx := context.Background()

store.On("GetLatestConfigVersion", ctx, tenantID1).
Expand Down Expand Up @@ -587,12 +577,10 @@ func TestGetConfig_RecordsUsage(t *testing.T) {
audit.WithFlushInterval(time.Hour),
audit.WithLogger(testLogger),
)
svc := NewService(ServiceConfig{
Store: store,
Cache: c,
Logger: testLogger,
Recorder: recorder,
})
svc := NewService(store, c, nil, nil,
WithLogger(testLogger),
WithRecorder(recorder),
)
ctx := context.Background()

store.On("GetLatestConfigVersion", ctx, tenantID1).
Expand Down Expand Up @@ -630,12 +618,10 @@ func TestGetFields_RecordsUsage(t *testing.T) {
audit.WithFlushInterval(time.Hour),
audit.WithLogger(testLogger),
)
svc := NewService(ServiceConfig{
Store: store,
Cache: c,
Logger: testLogger,
Recorder: recorder,
})
svc := NewService(store, c, nil, nil,
WithLogger(testLogger),
WithRecorder(recorder),
)
ctx := context.Background()

store.On("GetLatestConfigVersion", ctx, tenantID1).
Expand Down Expand Up @@ -669,12 +655,10 @@ func TestGetConfig_CacheHit_RecordsUsage(t *testing.T) {
audit.WithFlushInterval(time.Hour),
audit.WithLogger(testLogger),
)
svc := NewService(ServiceConfig{
Store: store,
Cache: c,
Logger: testLogger,
Recorder: recorder,
})
svc := NewService(store, c, nil, nil,
WithLogger(testLogger),
WithRecorder(recorder),
)
ctx := context.Background()

store.On("GetLatestConfigVersion", ctx, tenantID1).
Expand Down
16 changes: 6 additions & 10 deletions internal/server/memory_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,12 @@ func TestMemoryBackend_Integration(t *testing.T) {
schemaSvc := schema.NewService(memSchema, slog.Default(), telemetry.NewSchemaMetrics(telemetry.Config{}), validatorFactory)
pb.RegisterSchemaServiceServer(srv.GRPCServer(), schemaSvc)

configSvc := config.NewService(config.ServiceConfig{
Store: memConfig,
Cache: cache.NewMemoryCache(0),
Publisher: memPubSub,
Subscriber: memPubSub,
Logger: slog.Default(),
CacheMetrics: telemetry.NewCacheMetrics(telemetry.Config{}),
Metrics: telemetry.NewConfigMetrics(telemetry.Config{}),
Validators: validatorFactory,
})
configSvc := config.NewService(memConfig, cache.NewMemoryCache(0), memPubSub, memPubSub,
config.WithLogger(slog.Default()),
config.WithCacheMetrics(telemetry.NewCacheMetrics(telemetry.Config{})),
config.WithMetrics(telemetry.NewConfigMetrics(telemetry.Config{})),
config.WithValidators(validatorFactory),
)
pb.RegisterConfigServiceServer(srv.GRPCServer(), configSvc)

auditSvc := audit.NewService(audit.NewMemoryStore(), slog.Default(), nil)
Expand Down
22 changes: 16 additions & 6 deletions sdk/adminclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,27 @@ type Client struct {
server ServerTransport
}

// New creates a new admin client using the given transport implementations.
// Any of the transports may be nil if that service is not needed;
// methods for a nil service will return [ErrServiceNotConfigured].
// New creates a new admin client. Pass [WithSchemaTransport],
// [WithConfigTransport], [WithAuditTransport], and [WithServerTransport] for
// the services you need. Methods on services without a wired transport return
// [ErrServiceNotConfigured].
//
// Example (with grpctransport):
//
// client := grpctransport.NewAdminClient(conn, grpctransport.WithSubject("admin"))
//
// Example (with explicit transports):
//
// client := adminclient.New(schemaTransport, configTransport, auditTransport, serverTransport)
func New(schema SchemaTransport, config ConfigTransport, audit AuditTransport, server ServerTransport) *Client {
return &Client{schema: schema, config: config, audit: audit, server: server}
// client := adminclient.New(
// adminclient.WithSchemaTransport(schemaTransport),
// adminclient.WithConfigTransport(configTransport),
// adminclient.WithAuditTransport(auditTransport),
// adminclient.WithServerTransport(serverTransport),
// )
func New(opts ...Option) *Client {
o := clientOptions{}
for _, opt := range opts {
opt(&o)
}
return &Client{schema: o.schema, config: o.config, audit: o.audit, server: o.server}
}
Loading
Loading