diff --git a/go.mod b/go.mod
index b99f76110e..faf624de2b 100644
--- a/go.mod
+++ b/go.mod
@@ -37,7 +37,7 @@ require (
 	github.com/scylladb/go-reflectx v1.0.1
 	github.com/shopspring/decimal v1.4.0
 	github.com/smartcontractkit/chain-selectors v1.0.67
-	github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4
+	github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6
 	github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976
 	github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2
 	github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b
diff --git a/go.sum b/go.sum
index 26c037d067..783baab6da 100644
--- a/go.sum
+++ b/go.sum
@@ -326,8 +326,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB
 github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/smartcontractkit/chain-selectors v1.0.67 h1:gxTqP/JC40KDe3DE1SIsIKSTKTZEPyEU1YufO1admnw=
 github.com/smartcontractkit/chain-selectors v1.0.67/go.mod h1:xsKM0aN3YGcQKTPRPDDtPx2l4mlTN1Djmg0VVXV40b8=
-github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4 h1:hvqATtrZ0iMRTI80cpBot/3JFbjz2j+2tvpfooVhRHw=
-github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.4/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc=
+github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6 h1:INTd6uKc/QO11B0Vx7Ze17xgW3bqYbWuQcBQa9ixicQ=
+github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.6/go.mod h1:eKGyfTKzr0/PeR7qKN4l2FcW9p+HzyKUwAfGhm/5YZc=
 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976 h1:mF3FiDUoV0QbJcks9R2y7ydqntNL1Z0VCPBJgx/Ms+0=
 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20250722225531-876fd6b94976/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA=
 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20250911124514-5874cc6d62b2 h1:1/KdO5AbUr3CmpLjMPuJXPo2wHMbfB8mldKLsg7D4M8=
diff --git a/pkg/beholder/auth.go b/pkg/beholder/auth.go
index f2a0012622..640d0fbe25 100644
--- a/pkg/beholder/auth.go
+++ b/pkg/beholder/auth.go
@@ -1,12 +1,20 @@
 package beholder
 
 import (
+	"context"
 	"crypto"
 	"crypto/ed25519"
 	"crypto/rand"
+	"encoding/binary"
 	"fmt"
+	"maps"
+	"sync"
+	"sync/atomic"
+	"time"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 )
 
 // authHeaderKey is the name of the header that the node authenticator will use to send the auth token
@@ -14,17 +22,136 @@ var authHeaderKey = "X-Beholder-Node-Auth-Token"
 
 // authHeaderVersion is the version of the auth header format
 var authHeaderVersion = "1"
+var authHeaderV2 = "2"
 
-type staticAuthHeaderProvider struct {
-	headers map[string]string
+type HeaderProvider interface {
+	Headers(ctx context.Context) (map[string]string, error)
 }
 
-func (p *staticAuthHeaderProvider) GetHeaders() map[string]string {
-	return p.headers
+type PerRPCCredentialsProvider interface {
+	Credentials() credentials.PerRPCCredentials
 }
 
+type Auth interface {
+	PerRPCCredentialsProvider
+	HeaderProvider
+}
+
+type Signer interface {
+	Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error)
+}
+
+type staticAuth struct {
+	headers                  map[string]string
+	requireTransportSecurity bool
+}
+
+func (p *staticAuth) Headers(_ context.Context) (map[string]string, error) {
+	return p.headers, nil
+}
+
+func (p *staticAuth) Credentials() credentials.PerRPCCredentials {
+	return p
+}
+
+func (p *staticAuth) GetRequestMetadata(ctx context.Context, _ ...string) (map[string]string, error) {
+	return p.Headers(ctx)
+}
+
+func (p *staticAuth) RequireTransportSecurity() bool {
+	return p.requireTransportSecurity
+}
+
+func NewStaticAuth(headers map[string]string, requireTransportSecurity bool) Auth {
+	return &staticAuth{headers, requireTransportSecurity}
+}
+
+// Deprecated: use NewStaticAuth instead
 func NewStaticAuthHeaderProvider(headers map[string]string) chipingress.HeaderProvider {
-	return &staticAuthHeaderProvider{headers: headers}
+	return &staticAuth{headers: headers}
+}
+
+type rotatingAuth struct {
+	csaPubKey                ed25519.PublicKey
+	signer                   Signer
+	signerTimeout            time.Duration
+	headers                  atomic.Value // stores map[string]string
+	ttl                      time.Duration
+	lastUpdatedNanos         atomic.Int64
+	requireTransportSecurity bool
+	mu                       sync.Mutex
+}
+
+func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool) Auth {
+	r := &rotatingAuth{
+		csaPubKey:                csaPubKey,
+		signer:                   signer,
+		signerTimeout:            time.Second * 5,
+		ttl:                      ttl,
+		lastUpdatedNanos:         atomic.Int64{},
+		requireTransportSecurity: requireTransportSecurity,
+	}
+	r.headers.Store(make(map[string]string))
+	return r
+}
+
+// Headers returns a copy of the cached auth headers, re-signing them first when they are older than ttl.
+func (r *rotatingAuth) Headers(ctx context.Context) (map[string]string, error) {
+	// Return a copy of the headers to avoid concurrent read/write to the map by callers
+	returnHeader := make(map[string]string)
+	lastUpdated := time.Unix(0, r.lastUpdatedNanos.Load())
+
+	if time.Since(lastUpdated) > r.ttl {
+
+		r.mu.Lock()
+		defer r.mu.Unlock()
+
+		// Multiple concurrent calls (after the first) will block waiting for the lock.
+		// First will get the lock and update headers + lastUpdated, double check since potentially another goroutine has already
+		// updated the headers and lastUpdated while waiting for the lock.
+		lastUpdated = time.Unix(0, r.lastUpdatedNanos.Load())
+		if time.Since(lastUpdated) < r.ttl {
+			maps.Copy(returnHeader, r.headers.Load().(map[string]string))
+			return returnHeader, nil
+		}
+
+		// Sign pubkey||timestamp, built in a fresh buffer so append never writes into the caller's key slice
+		ts := time.Now()
+		msgBytes := make([]byte, 0, len(r.csaPubKey)+8)
+		msgBytes = append(msgBytes, r.csaPubKey...)
+		msgBytes = binary.BigEndian.AppendUint64(msgBytes, uint64(ts.UnixNano()))
+
+		ctxWithTimeout, cancel := context.WithTimeout(ctx, r.signerTimeout)
+		defer cancel()
+
+		// Sign(public key bytes + timestamp bytes)
+		signature, err := r.signer.Sign(ctxWithTimeout, r.csaPubKey, msgBytes)
+		if err != nil {
+			return nil, fmt.Errorf("beholder: failed to sign auth header: %w", err)
+		}
+
+		newHeaders := make(map[string]string)
+		newHeaders[authHeaderKey] = fmt.Sprintf("%s:%x:%d:%x", authHeaderV2, r.csaPubKey, ts.UnixNano(), signature)
+
+		r.headers.Store(newHeaders)
+		r.lastUpdatedNanos.Store(ts.UnixNano())
+	}
+
+	maps.Copy(returnHeader, r.headers.Load().(map[string]string))
+
+	return returnHeader, nil
+}
+
+func (r *rotatingAuth) Credentials() credentials.PerRPCCredentials {
+	return r
+}
+
+func (r *rotatingAuth) GetRequestMetadata(ctx context.Context, _ ...string) (map[string]string, error) {
+	return r.Headers(ctx)
+}
+
+func (r *rotatingAuth) RequireTransportSecurity() bool {
+	return r.requireTransportSecurity
 }
 
 // BuildAuthHeaders creates the auth header value to be included on requests.
@@ -58,3 +185,7 @@ func NewAuthHeaders(ed25519Signer crypto.Signer) (map[string]string, error) {
 
 	return map[string]string{authHeaderKey: headerValue}, nil
 }
+
+func authDialOpt(auth PerRPCCredentialsProvider) grpc.DialOption {
+	return grpc.WithPerRPCCredentials(auth.Credentials())
+}
diff --git a/pkg/beholder/auth_test.go b/pkg/beholder/auth_test.go
index 3de3d4628b..a357a716c4 100644
--- a/pkg/beholder/auth_test.go
+++ b/pkg/beholder/auth_test.go
@@ -1,11 +1,15 @@
 package beholder_test
 
 import (
+	"context"
 	"crypto/ed25519"
 	"encoding/hex"
+	"strings"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/beholder"
@@ -37,9 +41,304 @@ func TestStaticAuthHeaderProvider(t *testing.T) {
 	}
 
 	// Create new header provider
-	provider := beholder.NewStaticAuthHeaderProvider(testHeaders)
+	provider := beholder.NewStaticAuth(testHeaders, false)
 
 	// Get headers and verify they match
-	headers := provider.GetHeaders()
+	headers, err := provider.Headers(t.Context())
+	require.NoError(t, err)
 	assert.Equal(t, testHeaders, headers)
 }
+
+// MockSigner implements the beholder.Signer interface for testing rotating auth
+type MockSigner struct {
+	mock.Mock
+}
+
+func (m *MockSigner) Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error) {
+	args := m.Called(ctx, keyID, data)
+	return args.Get(0).([]byte), args.Error(1)
+}
+
+func TestRotatingAuth(t *testing.T) {
+	// Generate test key pair
+	pubKey, privKey, err := ed25519.GenerateKey(nil)
+	require.NoError(t, err)
+
+	t.Run("creates valid rotating auth headers", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+
+		dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+		mockSigner.
+			On("Sign", mock.Anything, mock.MatchedBy(func(keyID []byte) bool {
+				return string(keyID) == string(pubKey) // Verify correct public key is passed
+			}), mock.Anything).
+			Return(dummySignature, nil)
+
+		ttl := 5 * time.Minute
+		auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+		headers, err := auth.Headers(t.Context())
+		require.NoError(t, err)
+		require.NotEmpty(t, headers)
+
+		authHeader := headers["X-Beholder-Node-Auth-Token"]
+		require.NotEmpty(t, authHeader)
+
+		parts := strings.Split(authHeader, ":")
+		require.Len(t, parts, 4, "Auth header should have format version:pubkey_hex:timestamp:signature_hex")
+
+		assert.Equal(t, "2", parts[0], "Version should be 2")
+		assert.Equal(t, hex.EncodeToString(pubKey), parts[1], "Public key should match")
+		assert.NotEmpty(t, parts[2], "Timestamp should not be empty")
+
+		// Verify signature is hex encoded
+		_, err = hex.DecodeString(parts[3])
+		assert.NoError(t, err, "Signature should be valid hex")
+
+		mockSigner.AssertExpectations(t)
+	})
+
+	t.Run("reuses headers within TTL", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+
+		dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+		mockSigner.
+			On("Sign", mock.Anything, mock.Anything, mock.Anything).
+			Return(dummySignature, nil).
+			Once() // a second Sign call within the TTL fails the test
+
+		ttl := 5 * time.Minute
+		auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+		headers1, err := auth.Headers(t.Context())
+		require.NoError(t, err)
+
+		headers2, err := auth.Headers(t.Context())
+		require.NoError(t, err)
+
+		assert.Equal(t, headers1, headers2, "Headers should be reused within TTL")
+
+		mockSigner.AssertExpectations(t)
+	})
+
+	t.Run("handles signer errors", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+		expectedErr := assert.AnError
+
+		mockSigner.
+			On("Sign", mock.Anything, mock.Anything, mock.Anything).
+			Return([]byte{}, expectedErr)
+
+		ttl := 5 * time.Minute
+		auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+		headers, err := auth.Headers(t.Context())
+		require.Error(t, err)
+		assert.Nil(t, headers)
+		assert.Contains(t, err.Error(), "beholder: failed to sign auth header")
+		assert.Contains(t, err.Error(), expectedErr.Error())
+
+		mockSigner.AssertExpectations(t)
+	})
+
+	t.Run("implements PerRPCCredentialsProvider interface", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+		dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+		mockSigner.
+			On("Sign", mock.Anything, mock.Anything, mock.Anything).
+			Return(dummySignature, nil).
+			Maybe()
+
+		ttl := 5 * time.Minute
+		auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+		creds := auth.Credentials()
+		require.NotNil(t, creds)
+
+		assert.False(t, creds.RequireTransportSecurity())
+
+		metadata, err := creds.GetRequestMetadata(t.Context())
+		require.NoError(t, err)
+		assert.NotEmpty(t, metadata)
+
+		mockSigner.AssertExpectations(t)
+	})
+
+	t.Run("respects transport security requirement", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+		dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+		mockSigner.
+			On("Sign", mock.Anything, mock.Anything, mock.Anything).
+			Return(dummySignature, nil).
+			Maybe()
+
+		ttl := 5 * time.Minute
+		// transport security required
+		authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true)
+		credsSecure := authSecure.Credentials()
+		assert.True(t, credsSecure.RequireTransportSecurity())
+		// transport security not required
+		authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+		credsInsecure := authInsecure.Credentials()
+		assert.False(t, credsInsecure.RequireTransportSecurity())
+
+		mockSigner.AssertExpectations(t)
+	})
+}
+
+// BenchmarkRotatingAuth_Headers_CachedPath benchmarks the fast path where headers are cached and within TTL.
+// This is the most common case in production.
+func BenchmarkRotatingAuth_Headers_CachedPath(b *testing.B) {
+
+	pubKey, privKey, err := ed25519.GenerateKey(nil)
+	require.NoError(b, err)
+
+	mockSigner := &MockSigner{}
+	dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+	mockSigner.
+		On("Sign", mock.Anything, mock.Anything, mock.Anything).
+		Return(dummySignature, nil).
+		Maybe()
+
+	// Use a long TTL so the headers signed below stay valid for the whole benchmark
+	ttl := 1 * time.Hour
+	auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+	// Prime the cache with one Headers call so the timed loop only measures cached reads
+	ctx := b.Context()
+	_, err = auth.Headers(ctx)
+	require.NoError(b, err)
+
+	b.ReportAllocs()
+
+	for b.Loop() {
+		headers, err := auth.Headers(ctx)
+		if err != nil {
+			b.Fatal(err)
+		}
+		if len(headers) == 0 {
+			b.Fatal("expected non-empty headers")
+		}
+	}
+}
+
+// BenchmarkRotatingAuth_Headers_ExpiredPath benchmarks the slow path where headers need to be regenerated.
+// This is the path taken once the cached headers are older than the TTL.
+func BenchmarkRotatingAuth_Headers_ExpiredPath(b *testing.B) {
+
+	pubKey, privKey, err := ed25519.GenerateKey(nil)
+	require.NoError(b, err)
+
+	mockSigner := &MockSigner{}
+	dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+	mockSigner.
+		On("Sign", mock.Anything, mock.Anything, mock.Anything).
+		Return(dummySignature, nil).
+		Maybe()
+
+	// Use a TTL of 0 so cached headers are treated as expired and each call re-signs
+	ttl := 0 * time.Second
+	auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+	ctx := b.Context()
+
+	b.ReportAllocs()
+
+	for b.Loop() {
+		headers, err := auth.Headers(ctx)
+		if err != nil {
+			b.Fatal(err)
+		}
+		if len(headers) == 0 {
+			b.Fatal("expected non-empty headers")
+		}
+	}
+}
+
+// BenchmarkRotatingAuth_Headers_ParallelCached benchmarks concurrent access when headers are cached.
+// This simulates multiple goroutines making concurrent requests with valid cached headers.
+func BenchmarkRotatingAuth_Headers_ParallelCached(b *testing.B) {
+
+	pubKey, privKey, err := ed25519.GenerateKey(nil)
+	require.NoError(b, err)
+
+	mockSigner := &MockSigner{}
+	dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+	mockSigner.
+		On("Sign", mock.Anything, mock.Anything, mock.Anything).
+		Return(dummySignature, nil).
+		Maybe()
+
+	// Use a long TTL so headers don't expire during the benchmark
+	ttl := 1 * time.Hour
+	auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+	// Prime the cache
+	ctx := b.Context()
+	_, err = auth.Headers(ctx)
+	require.NoError(b, err)
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			headers, err := auth.Headers(ctx)
+			// b.Fatal may only be called from the benchmark's own goroutine, and RunParallel
+			// runs this body on worker goroutines, so report with Errorf and stop this worker.
+			if err != nil || len(headers) == 0 {
+				b.Errorf("Headers() returned %d headers, err = %v; want non-empty headers and nil error", len(headers), err)
+				return
+			}
+		}
+	})
+}
+
+// BenchmarkRotatingAuth_Headers_ParallelExpired benchmarks concurrent access when headers expire.
+// This tests contention on the mutex when multiple goroutines race to regenerate headers.
+func BenchmarkRotatingAuth_Headers_ParallelExpired(b *testing.B) {
+
+	pubKey, privKey, err := ed25519.GenerateKey(nil)
+	require.NoError(b, err)
+
+	mockSigner := &MockSigner{}
+	dummySignature := ed25519.Sign(privKey, []byte("test data"))
+
+	mockSigner.
+		On("Sign", mock.Anything, mock.Anything, mock.Anything).
+		Return(dummySignature, nil).
+		Maybe()
+
+	// Use a short TTL to cause periodic regeneration
+	ttl := 10 * time.Millisecond
+	auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
+
+	ctx := b.Context()
+
+	b.ResetTimer()
+	b.ReportAllocs()
+
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			headers, err := auth.Headers(ctx)
+			// b.Fatal may only be called from the benchmark's own goroutine, and RunParallel
+			// runs this body on worker goroutines, so report with Errorf and stop this worker.
+			if err != nil || len(headers) == 0 {
+				b.Errorf("Headers() returned %d headers, err = %v; want non-empty headers and nil error", len(headers), err)
+				return
+			}
+		}
+	})
+}
diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go
index f986e787cc..76ae28d52f 100644
--- a/pkg/beholder/client.go
+++ b/pkg/beholder/client.go
@@ -2,8 +2,11 @@ package beholder
 
 import (
 	"context"
+	"encoding/hex"
 	"errors"
 	"fmt"
+	"time"
+
 	"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
@@ -91,11 +94,42 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
 			return nil, err
 		}
 	}
+
 	opts := []otlploggrpc.Option{
 		otlploggrpc.WithTLSCredentials(creds),
 		otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
-		otlploggrpc.WithHeaders(cfg.AuthHeaders),
 	}
+	// Initialize auth here for reuse with log, trace, and metric exporters
+	var auth Auth
+	if cfg.AuthKeySigner != nil {
+
+		if cfg.AuthPublicKeyHex == "" {
+			return nil, fmt.Errorf("auth: public key hex required when signer is set")
+		}
+		// Reject TTLs below the 10 minute minimum rather than silently clamping them
+		if cfg.AuthHeadersTTL < 10*time.Minute {
+			return nil, fmt.Errorf("auth: headers TTL must be at least 10 minutes")
+		}
+
+		key, err := hex.DecodeString(cfg.AuthPublicKeyHex)
+		if err != nil {
+			return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err)
+		}
+
+		auth = NewRotatingAuth(key, cfg.AuthKeySigner, cfg.AuthHeadersTTL, !cfg.InsecureConnection)
+	}
+	// Log exporter auth
+	switch {
+	// Rotating auth
+	case auth != nil:
+		opts = append(opts, otlploggrpc.WithDialOption(authDialOpt(auth)))
+	// Static auth
+	case len(cfg.AuthHeaders) > 0:
+		opts = append(opts, otlploggrpc.WithHeaders(cfg.AuthHeaders))
+	// No auth
+	default:
+	}
+
 	if cfg.LogRetryConfig != nil {
 		// NOTE: By default, the retry is enabled in the OTel SDK
 		opts = append(opts, otlploggrpc.WithRetry(otlploggrpc.RetryConfig{
@@ -157,14 +191,14 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
 	logger := loggerProvider.Logger(defaultPackageName)
 
 	// Tracer
-	tracerProvider, err := newTracerProvider(cfg, baseResource, creds)
+	tracerProvider, err := newTracerProvider(cfg, baseResource, auth, creds)
 	if err != nil {
 		return nil, err
 	}
 	tracer := tracerProvider.Tracer(defaultPackageName)
 
 	// Meter
-	meterProvider, err := newMeterProvider(cfg, baseResource, creds)
+	meterProvider, err := newMeterProvider(cfg, baseResource, auth, creds)
 	if err != nil {
 		return nil, err
 	}
@@ -220,25 +254,29 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
 	// if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress
 	// eventually we will remove the dual source emitter and just use chip ingress
 	if cfg.ChipIngressEmitterEnabled || cfg.ChipIngressEmitterGRPCEndpoint != "" {
-		chipIngressOpts := make([]chipingress.Opt, 0, 2)
+
+		var opts []chipingress.Opt
 		if cfg.ChipIngressInsecureConnection {
-			// Use insecure credentials when TLS is not required
-			chipIngressOpts = append(chipIngressOpts, chipingress.WithInsecureConnection())
+			opts = append(opts, chipingress.WithInsecureConnection())
 		} else {
-			chipIngressOpts = append(chipIngressOpts, chipingress.WithTLS())
+			opts = append(opts, chipingress.WithTLS())
 		}
 
-		// Only add headers if they exist
-		if len(cfg.AuthHeaders) > 0 {
-			headerProvider := NewStaticAuthHeaderProvider(cfg.AuthHeaders)
-			chipIngressOpts = append(chipIngressOpts, chipingress.WithTokenAuth(headerProvider))
+		// Chip ingress auth
+		switch {
+		// Rotating auth
+		case auth != nil:
+			opts = append(opts, chipingress.WithTokenAuth(auth))
+		// Static auth
+		case len(cfg.AuthHeaders) > 0:
+			opts = append(opts, chipingress.WithTokenAuth(
+				NewStaticAuth(cfg.AuthHeaders, !cfg.ChipIngressInsecureConnection),
+			))
+		// No auth
+		default:
 		}
 
-		chipIngressClient, err = chipingress.NewClient(
-			cfg.ChipIngressEmitterGRPCEndpoint,
-			chipIngressOpts...,
-		)
-
+		chipIngressClient, err = chipingress.NewClient(cfg.ChipIngressEmitterGRPCEndpoint, opts...)
 		if err != nil {
 			return nil, err
 		}
@@ -355,13 +393,22 @@ type shutdowner interface {
 	Shutdown(ctx context.Context) error
 }
 
-func newTracerProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdktrace.TracerProvider, error) {
+func newTracerProvider(config Config, resource *sdkresource.Resource, auth Auth, creds credentials.TransportCredentials) (*sdktrace.TracerProvider, error) {
 	ctx := context.Background()
 
 	exporterOpts := []otlptracegrpc.Option{
 		otlptracegrpc.WithTLSCredentials(creds),
 		otlptracegrpc.WithEndpoint(config.OtelExporterGRPCEndpoint),
-		otlptracegrpc.WithHeaders(config.AuthHeaders),
+	}
+	switch {
+	// Rotating auth
+	case auth != nil:
+		exporterOpts = append(exporterOpts, otlptracegrpc.WithDialOption(authDialOpt(auth)))
+	// Static auth
+	case len(config.AuthHeaders) > 0:
+		exporterOpts = append(exporterOpts, otlptracegrpc.WithHeaders(config.AuthHeaders))
+	// No auth
+	default:
 	}
 	if config.TraceRetryConfig != nil {
 		// NOTE: By default, the retry is enabled in the OTel SDK
@@ -396,20 +443,31 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, creds cred
 	return sdktrace.NewTracerProvider(opts...), nil
 }
 
-func newMeterProvider(config Config, resource *sdkresource.Resource, creds credentials.TransportCredentials) (*sdkmetric.MeterProvider, error) {
+func newMeterProvider(cfg Config, resource *sdkresource.Resource, auth Auth, creds credentials.TransportCredentials) (*sdkmetric.MeterProvider, error) {
 	ctx := context.Background()
 	opts := []otlpmetricgrpc.Option{
 		otlpmetricgrpc.WithTLSCredentials(creds),
-		otlpmetricgrpc.WithEndpoint(config.OtelExporterGRPCEndpoint),
-		otlpmetricgrpc.WithHeaders(config.AuthHeaders),
+		otlpmetricgrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
+	}
+
+	switch {
+	// Rotating auth
+	case auth != nil:
+		opts = append(opts, otlpmetricgrpc.WithDialOption(authDialOpt(auth)))
+	// Static auth
+	case len(cfg.AuthHeaders) > 0:
+		opts = append(opts, otlpmetricgrpc.WithHeaders(cfg.AuthHeaders))
+	// No auth
+	default:
 	}
-	if config.MetricRetryConfig != nil {
+
+	if cfg.MetricRetryConfig != nil {
 		// NOTE: By default, the retry is enabled in the OTel SDK
 		opts = append(opts, otlpmetricgrpc.WithRetry(otlpmetricgrpc.RetryConfig{
-			Enabled:         config.MetricRetryConfig.Enabled(),
-			InitialInterval: config.MetricRetryConfig.GetInitialInterval(),
-			MaxInterval:     config.MetricRetryConfig.GetMaxInterval(),
-			MaxElapsedTime:  config.MetricRetryConfig.GetMaxElapsedTime(),
+			Enabled:         cfg.MetricRetryConfig.Enabled(),
+			InitialInterval: cfg.MetricRetryConfig.GetInitialInterval(),
+			MaxInterval:     cfg.MetricRetryConfig.GetMaxInterval(),
+			MaxElapsedTime:  cfg.MetricRetryConfig.GetMaxElapsedTime(),
 		}))
 	}
 	// note: context is unused internally
@@ -421,10 +479,10 @@ func newMeterProvider(config Config, resource *sdkresource.Resource, creds crede
 		sdkmetric.WithReader(
 			sdkmetric.NewPeriodicReader(
 				exporter,
-				sdkmetric.WithInterval(config.MetricReaderInterval), // Default is 10s
+				sdkmetric.WithInterval(cfg.MetricReaderInterval), // Default is 10s
 			)),
 		sdkmetric.WithResource(resource),
-		sdkmetric.WithView(config.MetricViews...),
+		sdkmetric.WithView(cfg.MetricViews...),
 	)
 	return mp, nil
 }
diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go
index 969117a41f..62095d3284 100644
--- a/pkg/beholder/client_test.go
+++ b/pkg/beholder/client_test.go
@@ -2,10 +2,13 @@ package beholder_test
 
 import (
 	"context"
+	"crypto/ed25519"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -13,7 +16,6 @@ import (
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"
 	"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
-	"go.opentelemetry.io/otel/log"
 	otellog "go.opentelemetry.io/otel/log"
 	sdklog "go.opentelemetry.io/otel/sdk/log"
 
@@ -341,7 +343,7 @@ func TestNewClientWithChipIngressConfig(t *testing.T) {
 				t.Errorf("Logger panicked when LogStreamingEnabled is false: %v", r)
 			}
 		}()
-		client.Logger.Emit(t.Context(), log.Record{})
+		client.Logger.Emit(t.Context(), otellog.Record{})
 	})
 
 	t.Run("creates client with ChipIngress insecure endpoint", func(t *testing.T) {
@@ -451,7 +453,6 @@ func TestNewGRPCClient_ChipIngressEmitter(t *testing.T) {
 		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
 		require.NoError(t, err)
 		require.NotNil(t, client)
-		defer client.Close()
 		assert.NotNil(t, client.Emitter)
 
 		// Check that the emitter is a dualSourceEmitter
@@ -475,7 +476,6 @@ func TestNewGRPCClient_ChipIngressEmitter(t *testing.T) {
 		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
 		require.NoError(t, err)
 		require.NotNil(t, client)
-		defer client.Close()
 		assert.NotNil(t, client.Emitter)
 	})
 }
@@ -530,3 +530,215 @@ func (m *mockLogExporter) Shutdown(ctx context.Context) error {
 func (m *mockLogExporter) ForceFlush(ctx context.Context) error {
 	return nil
 }
+
+func TestNewGRPCClientRotatingAuth(t *testing.T) {
+
+	pubKey, _, err := ed25519.GenerateKey(nil)
+	require.NoError(t, err)
+	pubKeyHex := hex.EncodeToString(pubKey)
+
+	t.Run("successful rotating auth setup", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint: "localhost:4317",
+			AuthPublicKeyHex:         pubKeyHex,
+			AuthKeySigner:            mockSigner,
+			AuthHeadersTTL:           10 * time.Minute,
+			InsecureConnection:       true,
+		}
+
+		// Mock the otlploggrpc.New function
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.NoError(t, err)
+		require.NotNil(t, client)
+	})
+
+	t.Run("error when public key hex is empty but signer is set", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint: "localhost:4317",
+			AuthPublicKeyHex:         "", // Empty public key hex
+			AuthKeySigner:            mockSigner,
+			AuthHeadersTTL:           10 * time.Minute,
+			InsecureConnection:       true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.Error(t, err)
+		assert.Nil(t, client)
+		assert.Contains(t, err.Error(), "auth: public key hex required when signer is set")
+	})
+
+	t.Run("error when TTL is too short", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint: "localhost:4317",
+			AuthPublicKeyHex:         pubKeyHex,
+			AuthKeySigner:            mockSigner,
+			AuthHeadersTTL:           5 * time.Minute,
+			InsecureConnection:       true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.Error(t, err)
+		assert.Nil(t, client)
+		assert.Contains(t, err.Error(), "auth: headers TTL must be at least 10 minutes")
+	})
+
+	t.Run("error when public key hex is invalid", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint: "localhost:4317",
+			AuthPublicKeyHex:         "invalid-hex", // Invalid hex
+			AuthKeySigner:            mockSigner,
+			AuthHeadersTTL:           10 * time.Minute,
+			InsecureConnection:       true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.Error(t, err)
+		assert.Nil(t, client)
+		assert.Contains(t, err.Error(), "auth: failed to decode public key hex")
+	})
+}
+
+func TestNewGRPCClientStaticAuthFallback(t *testing.T) {
+	t.Run("uses static auth when no rotating auth is configured", func(t *testing.T) {
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint: "localhost:4317",
+			AuthHeaders: map[string]string{
+				"Authorization": "Bearer test-token",
+			},
+			InsecureConnection: true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.NoError(t, err)
+		require.NotNil(t, client)
+	})
+
+	t.Run("no auth when neither rotating nor static auth is configured", func(t *testing.T) {
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint: "localhost:4317",
+			InsecureConnection:       true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.NoError(t, err)
+		require.NotNil(t, client)
+	})
+}
+
+func TestNewGRPCClientChipIngressAuth(t *testing.T) {
+
+	pubKey, _, err := ed25519.GenerateKey(nil)
+	require.NoError(t, err)
+	pubKeyHex := hex.EncodeToString(pubKey)
+
+	t.Run("chip ingress with rotating auth", func(t *testing.T) {
+
+		mockSigner := &MockSigner{}
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint:       "localhost:4317",
+			ChipIngressEmitterEnabled:      true,
+			ChipIngressEmitterGRPCEndpoint: "localhost:8080",
+			ChipIngressInsecureConnection:  true,
+			AuthPublicKeyHex:               pubKeyHex,
+			AuthKeySigner:                  mockSigner,
+			AuthHeadersTTL:                 10 * time.Minute,
+			InsecureConnection:             true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.NoError(t, err)
+		require.NotNil(t, client)
+
+		_, ok := client.Emitter.(*beholder.DualSourceEmitter)
+		assert.True(t, ok, "Expected Emitter to be a DualSourceEmitter")
+	})
+
+	t.Run("chip ingress with static auth", func(t *testing.T) {
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint:       "localhost:4317",
+			ChipIngressEmitterEnabled:      true,
+			ChipIngressEmitterGRPCEndpoint: "localhost:8080",
+			ChipIngressInsecureConnection:  true,
+			AuthHeaders: map[string]string{
+				"Authorization": "Bearer test-token",
+			},
+			InsecureConnection: true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.NoError(t, err)
+		require.NotNil(t, client)
+
+		_, ok := client.Emitter.(*beholder.DualSourceEmitter)
+		assert.True(t, ok, "Expected Emitter to be a DualSourceEmitter")
+	})
+
+	t.Run("chip ingress with no auth", func(t *testing.T) {
+
+		cfg := beholder.Config{
+			OtelExporterGRPCEndpoint:       "localhost:4317",
+			ChipIngressEmitterEnabled:      true,
+			ChipIngressEmitterGRPCEndpoint: "localhost:8080",
+			ChipIngressInsecureConnection:  true,
+			InsecureConnection:             true,
+		}
+
+		otlploggrpcNew := func(options ...otlploggrpc.Option) (sdklog.Exporter, error) {
+			return &mockLogExporter{}, nil
+		}
+
+		client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
+		require.NoError(t, err)
+		require.NotNil(t, client)
+
+		_, ok := client.Emitter.(*beholder.DualSourceEmitter)
+		assert.True(t, ok, "Expected Emitter to be a DualSourceEmitter")
+	})
+}
diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go
index d08226961d..3b66279ced 100644
--- a/pkg/beholder/config.go
+++ b/pkg/beholder/config.go
@@ -52,6 +52,8 @@ type Config struct {
 	// Auth
 	AuthPublicKeyHex string
 	AuthHeaders      map[string]string
+	AuthKeySigner    Signer
+	AuthHeadersTTL   time.Duration
 }
 
 type RetryConfig struct {
@@ -115,6 +117,8 @@ func DefaultConfig() Config {
 		LogMaxQueueSize:     2048,
 		LogBatchProcessor:   true,
 		LogStreamingEnabled: true, // Enable logs streaming by default
+		// Auth
+		AuthHeadersTTL: 10 * time.Minute,
 	}
 }
 
diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go
index 2739580993..5be5fa087d 100644
--- a/pkg/beholder/config_test.go
+++ b/pkg/beholder/config_test.go
@@ -48,6 +48,11 @@ func ExampleConfig() {
 		LogMaxQueueSize:     2048,
 		LogBatchProcessor:   true,
 		LogStreamingEnabled: false, // Disable streaming logs by default
+		// Auth
+		AuthPublicKeyHex: "",
+		AuthHeaders:      map[string]string{},
+		AuthKeySigner:    nil,
+		AuthHeadersTTL:   0,
 	}
 	fmt.Printf("%+v\n", config)
 	config.LogRetryConfig = &beholder.RetryConfig{
@@ -57,6 +62,6 @@ func ExampleConfig() {
 	}
 	fmt.Printf("%+v\n", *config.LogRetryConfig)
 	// Output:
-	// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[]}
+	// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false AuthPublicKeyHex: AuthHeaders:map[] AuthKeySigner:<nil> AuthHeadersTTL:0s}
 	// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
 }