Skip to content

Commit ab562b4

Browse files
authored
INFOPLAT 2963 beholder rotating auth headers loop impl (#1609)
* INFOPLAT-2963 Adds TTL to loop config * INFOPLAT-2963 Wires up `Keystore` as `Signer` impl for beholder headers * Deferred signer * Makes lazy signer an interface - removes `Keystore` field * Removes keystore mentions from loop server * Uses Initial provided headers to the lazy signer * Makes configuration more clear * Simplify server beholder auth config * Adjust `Signer` interface to use `keyID string` this conforms ot the `Keystore` interface in `core` * Example of setting signer * `fmt` * Sort `Auth` fields * Reduces stuttering in interface * Address comments * Sets config mechanism dependent on authheadertll * Wire up beholder client in relayer * Simply setting signer GetClient() will never return nil since its always initialized in `from pkg/beholder/global.go` `init` * Updates `SetSigner` comment mentions that it is thread-safe * Removes deferred beholder signer wire up Removes wire up Removes code refactor * Fixes merge conflict
1 parent d5b6b6c commit ab562b4

11 files changed

Lines changed: 181 additions & 38 deletions

File tree

pkg/beholder/auth.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ type Auth interface {
3838
}
3939

4040
type Signer interface {
41-
Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error)
41+
Sign(ctx context.Context, keyID string, data []byte) ([]byte, error)
4242
}
4343

4444
type staticAuth struct {
@@ -82,7 +82,10 @@ type rotatingAuth struct {
8282
mu sync.Mutex
8383
}
8484

85-
func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool) Auth {
85+
// NewRotatingAuth creates a rotating auth mechanism that automatically refreshes headers.
86+
// If initialHeaders are provided, they will be used immediately until TTL expires.
87+
// After TTL expiration, the signer is called to generate new headers.
88+
func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Duration, requireTransportSecurity bool, initialHeaders map[string]string) Auth {
8689
r := &rotatingAuth{
8790
csaPubKey: csaPubKey,
8891
signer: signer,
@@ -91,7 +94,18 @@ func NewRotatingAuth(csaPubKey ed25519.PublicKey, signer Signer, ttl time.Durati
9194
lastUpdatedNanos: atomic.Int64{},
9295
requireTransportSecurity: requireTransportSecurity,
9396
}
94-
r.headers.Store(make(map[string]string))
97+
98+
headers := make(map[string]string)
99+
// If initial headers are provided, use them and set timestamp to now
100+
// Otherwise, leave timestamp at 0 so headers are generated on first call
101+
if len(initialHeaders) > 0 {
102+
headers = initialHeaders
103+
// We assume the time between the initial headers being generated is very small
104+
r.lastUpdatedNanos.Store(time.Now().UnixNano())
105+
}
106+
107+
r.headers.Store(headers)
108+
95109
return r
96110
}
97111

@@ -125,7 +139,7 @@ func (r *rotatingAuth) Headers(ctx context.Context) (map[string]string, error) {
125139
defer cancel()
126140

127141
// Sign(public key bytes + timestamp bytes)
128-
signature, err := r.signer.Sign(ctxWithTimeout, r.csaPubKey, msgBytes)
142+
signature, err := r.signer.Sign(ctxWithTimeout, fmt.Sprintf("%x", r.csaPubKey), msgBytes)
129143
if err != nil {
130144
return nil, fmt.Errorf("beholder: failed to sign auth header: %w", err)
131145
}

pkg/beholder/auth_test.go

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/ed25519"
66
"encoding/hex"
7+
"fmt"
78
"strings"
89
"testing"
910
"time"
@@ -54,7 +55,7 @@ type MockSigner struct {
5455
mock.Mock
5556
}
5657

57-
func (m *MockSigner) Sign(ctx context.Context, keyID []byte, data []byte) ([]byte, error) {
58+
func (m *MockSigner) Sign(ctx context.Context, keyID string, data []byte) ([]byte, error) {
5859
args := m.Called(ctx, keyID, data)
5960
return args.Get(0).([]byte), args.Error(1)
6061
}
@@ -71,13 +72,13 @@ func TestRotatingAuth(t *testing.T) {
7172
dummySignature := ed25519.Sign(privKey, []byte("test data"))
7273

7374
mockSigner.
74-
On("Sign", mock.Anything, mock.MatchedBy(func(keyID []byte) bool {
75-
return string(keyID) == string(pubKey) // Verify correct public key is passed
75+
On("Sign", mock.Anything, mock.MatchedBy(func(keyID string) bool {
76+
return keyID == hex.EncodeToString(pubKey) // Verify correct public key hex is passed
7677
}), mock.Anything).
7778
Return(dummySignature, nil)
7879

7980
ttl := 5 * time.Minute
80-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
81+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
8182

8283
headers, err := auth.Headers(t.Context())
8384
require.NoError(t, err)
@@ -112,7 +113,7 @@ func TestRotatingAuth(t *testing.T) {
112113
Maybe()
113114

114115
ttl := 5 * time.Minute
115-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
116+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
116117

117118
headers1, err := auth.Headers(t.Context())
118119
require.NoError(t, err)
@@ -135,7 +136,7 @@ func TestRotatingAuth(t *testing.T) {
135136
Return([]byte{}, expectedErr)
136137

137138
ttl := 5 * time.Minute
138-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
139+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
139140

140141
headers, err := auth.Headers(t.Context())
141142
require.Error(t, err)
@@ -157,7 +158,7 @@ func TestRotatingAuth(t *testing.T) {
157158
Maybe()
158159

159160
ttl := 5 * time.Minute
160-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
161+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
161162

162163
creds := auth.Credentials()
163164
require.NotNil(t, creds)
@@ -183,16 +184,52 @@ func TestRotatingAuth(t *testing.T) {
183184

184185
ttl := 5 * time.Minute
185186
// transport security required
186-
authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true)
187+
authSecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, true, nil)
187188
credsSecure := authSecure.Credentials()
188189
assert.True(t, credsSecure.RequireTransportSecurity())
189190
// transport security not required
190-
authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
191+
authInsecure := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
191192
credsInsecure := authInsecure.Credentials()
192193
assert.False(t, credsInsecure.RequireTransportSecurity())
193194

194195
mockSigner.AssertExpectations(t)
195196
})
197+
198+
t.Run("uses initial headers until TTL expires", func(t *testing.T) {
199+
mockSigner := &MockSigner{}
200+
201+
// Create initial headers with v2 format
202+
ts := time.Now()
203+
signature := ed25519.Sign(privKey, []byte("initial"))
204+
initialHeaders := map[string]string{
205+
"X-Beholder-Node-Auth-Token": "2:" + hex.EncodeToString(pubKey) + ":" + fmt.Sprintf("%d", ts.UnixNano()) + ":" + hex.EncodeToString(signature),
206+
}
207+
208+
// Use a very short TTL so it expires quickly
209+
ttl := 1 * time.Millisecond
210+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, initialHeaders)
211+
212+
// First call should return the initial headers without calling Sign
213+
headers1, err := auth.Headers(t.Context())
214+
require.NoError(t, err)
215+
assert.Equal(t, initialHeaders, headers1)
216+
217+
// Wait for TTL to expire
218+
time.Sleep(5 * time.Millisecond)
219+
220+
// Now the signer should be called to generate new headers
221+
newSignature := ed25519.Sign(privKey, []byte("new"))
222+
mockSigner.
223+
On("Sign", mock.Anything, mock.Anything, mock.Anything).
224+
Return(newSignature, nil).
225+
Once()
226+
227+
headers2, err := auth.Headers(t.Context())
228+
require.NoError(t, err)
229+
assert.NotEqual(t, initialHeaders, headers2, "Should generate new headers after TTL expires")
230+
231+
mockSigner.AssertExpectations(t)
232+
})
196233
}
197234

198235
// BenchmarkRotatingAuth_Headers_CachedPath benchmarks the fast path where headers are cached and within TTL.
@@ -212,7 +249,7 @@ func BenchmarkRotatingAuth_Headers_CachedPath(b *testing.B) {
212249

213250
// Use a long TTL so headers don't expire during the benchmark
214251
ttl := 1 * time.Hour
215-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
252+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
216253

217254
// Prime the cache by calling Headers once
218255
ctx := b.Context()
@@ -249,7 +286,7 @@ func BenchmarkRotatingAuth_Headers_ExpiredPath(b *testing.B) {
249286

250287
// Use a TTL of 0 to force regeneration on every call
251288
ttl := 0 * time.Second
252-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
289+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
253290

254291
ctx := b.Context()
255292

@@ -283,7 +320,7 @@ func BenchmarkRotatingAuth_Headers_ParallelCached(b *testing.B) {
283320

284321
// Use a long TTL so headers don't expire during the benchmark
285322
ttl := 1 * time.Hour
286-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
323+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
287324

288325
// Prime the cache
289326
ctx := b.Context()
@@ -323,7 +360,7 @@ func BenchmarkRotatingAuth_Headers_ParallelExpired(b *testing.B) {
323360

324361
// Use a short TTL to cause periodic regeneration
325362
ttl := 10 * time.Millisecond
326-
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false)
363+
auth := beholder.NewRotatingAuth(pubKey, mockSigner, ttl, false, nil)
327364

328365
ctx := b.Context()
329366

pkg/beholder/client.go

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ type Client struct {
5050
MeterProvider otelmetric.MeterProvider
5151
MessageLoggerProvider otellog.LoggerProvider
5252

53+
// lazySigner allows updating the keystore after client initialization.
54+
lazySigner *lazySigner
55+
5356
// OnClose
5457
OnClose func() error
5558
}
@@ -98,12 +101,18 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
98101
}
99102

100103
// Initialize auth here for reuse with log, trace, and metric exporters
104+
// Two modes are supported:
105+
// 1. Static auth: If AuthHeadersTTL == 0, use AuthHeaders as-is and never change
106+
// 2. Rotating auth: If AuthHeadersTTL > 0, create lazySigner for deferred keystore injection
107+
var signer *lazySigner
101108
var auth Auth
102-
if cfg.AuthKeySigner != nil {
109+
110+
if cfg.AuthHeadersTTL > 0 {
103111

104112
if cfg.AuthPublicKeyHex == "" {
105-
return nil, fmt.Errorf("auth: public key hex required when signer is set")
113+
return nil, fmt.Errorf("auth: public key hex required for rotating auth (TTL > 0)")
106114
}
115+
107116
// Clamp lowest possible value to 10mins
108117
if cfg.AuthHeadersTTL < 10*time.Minute {
109118
return nil, fmt.Errorf("auth: headers TTL must be at least 10 minutes")
@@ -114,8 +123,16 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
114123
return nil, fmt.Errorf("auth: failed to decode public key hex: %w", err)
115124
}
116125

117-
auth = NewRotatingAuth(key, cfg.AuthKeySigner, cfg.AuthHeadersTTL, !cfg.InsecureConnection)
126+
// Optionally wrap the signer in a lazySigner if AuthKeySigner was provided
127+
// This allows the signer to be set both before and after client initialization
128+
signer = &lazySigner{}
129+
if cfg.AuthKeySigner != nil {
130+
signer.Set(cfg.AuthKeySigner)
131+
}
132+
133+
auth = NewRotatingAuth(key, signer, cfg.AuthHeadersTTL, !cfg.InsecureConnection, cfg.AuthHeaders)
118134
}
135+
119136
// Tracer
120137
tracerProvider, err := newTracerProvider(cfg, baseResource, auth, creds)
121138
if err != nil {
@@ -223,7 +240,7 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro
223240
}
224241
return
225242
}
226-
return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
243+
return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil
227244
}
228245

229246
// Closes all providers, flushes all data and stops all background processes
@@ -262,6 +279,21 @@ func (c Client) ForName(name string) Client {
262279
return newClient
263280
}
264281

282+
// SetSigner updates the signer in the lazy signer.
283+
// This method enables setting the signer after the beholder client has been created, which is useful
284+
// when the signer is not available at client initialization time but the client needs to be configured
285+
// with rotating auth. The underlying lazy signer is thread-safe.
286+
func (c *Client) SetSigner(signer Signer) {
287+
if c.lazySigner != nil {
288+
c.lazySigner.Set(signer)
289+
}
290+
}
291+
292+
// IsSignerSet returns true if a signer has been set in the lazy signer.
293+
func (c *Client) IsSignerSet() bool {
294+
return c.lazySigner != nil && c.lazySigner.IsSet()
295+
}
296+
265297
func newOtelResource(cfg Config) (resource *sdkresource.Resource, err error) {
266298
extraResources, err := sdkresource.New(
267299
context.Background(),

pkg/beholder/client_test.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -559,15 +559,12 @@ func TestNewGRPCClientRotatingAuth(t *testing.T) {
559559
require.NotNil(t, client)
560560
})
561561

562-
t.Run("error when public key hex is empty but signer is set", func(t *testing.T) {
563-
564-
mockSigner := &MockSigner{}
562+
t.Run("error when public key hex is empty but TTL is set", func(t *testing.T) {
565563

566564
cfg := beholder.Config{
567565
OtelExporterGRPCEndpoint: "localhost:4317",
568-
AuthPublicKeyHex: "", // Empty public key hex
569-
AuthKeySigner: mockSigner,
570-
AuthHeadersTTL: 10 * time.Minute,
566+
AuthPublicKeyHex: "", // Empty public key hex
567+
AuthHeadersTTL: 10 * time.Minute, // TTL > 0 requires public key
571568
InsecureConnection: true,
572569
}
573570

@@ -578,7 +575,7 @@ func TestNewGRPCClientRotatingAuth(t *testing.T) {
578575
client, err := beholder.NewGRPCClient(cfg, otlploggrpcNew)
579576
require.Error(t, err)
580577
assert.Nil(t, client)
581-
assert.Contains(t, err.Error(), "auth: public key hex required when signer is set")
578+
assert.Contains(t, err.Error(), "auth: public key hex required for rotating auth")
582579
})
583580

584581
t.Run("error when TTL is too short", func(t *testing.T) {

pkg/beholder/config.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,14 @@ type Config struct {
5252
LogLevel zapcore.Level // Log level for telemetry streaming
5353

5454
// Auth
55-
AuthPublicKeyHex string
55+
// AuthHeaders serves two purposes:
56+
// 1. Static mode: When AuthKeySigner is nil, these headers are used as-is and never change
57+
// 2. Rotating mode: When AuthKeySigner is set, these headers are used as initial headers
58+
// until TTL expires, then the lazy signer generates new ones
5659
AuthHeaders map[string]string
57-
AuthKeySigner Signer
5860
AuthHeadersTTL time.Duration
61+
AuthKeySigner Signer
62+
AuthPublicKeyHex string
5963
}
6064

6165
type RetryConfig struct {
@@ -120,8 +124,8 @@ func DefaultConfig() Config {
120124
LogBatchProcessor: true,
121125
LogStreamingEnabled: true, // Enable logs streaming by default
122126
LogLevel: zapcore.InfoLevel,
123-
// Auth
124-
AuthHeadersTTL: 10 * time.Minute,
127+
// Auth (defaults to static auth mode with TTL=0)
128+
AuthHeadersTTL: 0,
125129
}
126130
}
127131

@@ -134,6 +138,8 @@ func TestDefaultConfig() Config {
134138
config.LogRetryConfig.MaxElapsedTime = 0 // Retry is disabled
135139
config.TraceRetryConfig.MaxElapsedTime = 0 // Retry is disabled
136140
config.MetricRetryConfig.MaxElapsedTime = 0 // Retry is disabled
141+
// Auth disabled for testing (TTL=0 means static auth mode)
142+
config.AuthHeadersTTL = 0
137143
return config
138144
}
139145

@@ -144,6 +150,8 @@ func TestDefaultConfigHTTPClient() Config {
144150
config.LogBatchProcessor = false
145151
config.OtelExporterGRPCEndpoint = ""
146152
config.OtelExporterHTTPEndpoint = "localhost:4318"
153+
// Auth disabled for testing (TTL=0 means static auth mode)
154+
config.AuthHeadersTTL = 0
147155
return config
148156
}
149157

pkg/beholder/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,6 @@ func ExampleConfig() {
6464
}
6565
fmt.Printf("%+v\n", *config.LogRetryConfig)
6666
// Output:
67-
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info AuthPublicKeyHex: AuthHeaders:map[] AuthKeySigner:<nil> AuthHeadersTTL:0s}
67+
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner:<nil> AuthPublicKeyHex:}
6868
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
6969
}

pkg/beholder/httpclient.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ func NewHTTPClient(cfg Config, otlploghttpNew otlploghttpFactory) (*Client, erro
186186
}
187187
return
188188
}
189-
return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, onClose}, nil
189+
// HTTP client doesn't currently support rotating auth, so lazySigner is always nil
190+
return &Client{cfg, logger, tracer, meter, emitter, nil, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, nil, onClose}, nil
190191
}
191192

192193
func newHTTPTracerProvider(config Config, resource *sdkresource.Resource, tlsConfig *tls.Config) (*sdktrace.TracerProvider, error) {

0 commit comments

Comments
 (0)