From 9d2f9d3a5c5ad689b042595768ecf6d906c90c21 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Mon, 18 Aug 2025 22:18:46 +0200 Subject: [PATCH 1/2] beholder: config option to enable streaming logs to OTel --- pkg/loop/config.go | 7 +++++++ pkg/loop/config_test.go | 3 +++ pkg/loop/server.go | 3 ++- 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/loop/config.go b/pkg/loop/config.go index a3bcef405..629809dd3 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -61,6 +61,7 @@ const ( envTelemetryEmitterExportInterval = "CL_TELEMETRY_EMITTER_EXPORT_INTERVAL" envTelemetryEmitterExportMaxBatchSize = "CL_TELEMETRY_EMITTER_EXPORT_MAX_BATCH_SIZE" envTelemetryEmitterMaxQueueSize = "CL_TELEMETRY_EMITTER_MAX_QUEUE_SIZE" + envTelemetryLogStreamingEnabled = "CL_TELEMETRY_LOG_STREAMING_ENABLED" envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" ) @@ -115,6 +116,7 @@ type EnvConfig struct { TelemetryEmitterExportInterval time.Duration TelemetryEmitterExportMaxBatchSize int TelemetryEmitterMaxQueueSize int + TelemetryLogStreamingEnabled bool ChipIngressEndpoint string } @@ -182,6 +184,7 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envTelemetryEmitterExportInterval, e.TelemetryEmitterExportInterval.String()) add(envTelemetryEmitterExportMaxBatchSize, strconv.Itoa(e.TelemetryEmitterExportMaxBatchSize)) add(envTelemetryEmitterMaxQueueSize, strconv.Itoa(e.TelemetryEmitterMaxQueueSize)) + add(envTelemetryLogStreamingEnabled, strconv.FormatBool(e.TelemetryLogStreamingEnabled)) add(envChipIngressEndpoint, e.ChipIngressEndpoint) @@ -341,6 +344,10 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envTelemetryEmitterMaxQueueSize, err) } + e.TelemetryLogStreamingEnabled, err = getBool(envTelemetryLogStreamingEnabled) + if err != nil { + return fmt.Errorf("failed to parse %s: %w", envTelemetryLogStreamingEnabled, err) + } // Optional e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint) } diff --git a/pkg/loop/config_test.go b/pkg/loop/config_test.go index 6092f55f3..5f3b24464 100644 --- a/pkg/loop/config_test.go +++ b/pkg/loop/config_test.go @@ -74,6 +74,7 @@ func TestEnvConfig_parse(t *testing.T) { envTelemetryEmitterExportInterval: "2s", envTelemetryEmitterExportMaxBatchSize: "100", envTelemetryEmitterMaxQueueSize: "1000", + envTelemetryLogStreamingEnabled: "false", envChipIngressEndpoint: "http://chip-ingress.example.com", }, @@ -171,6 +172,7 @@ var envCfgFull = EnvConfig{ TelemetryEmitterExportInterval: 2 * time.Second, TelemetryEmitterExportMaxBatchSize: 100, TelemetryEmitterMaxQueueSize: 1000, + TelemetryLogStreamingEnabled: false, ChipIngressEndpoint: "http://chip-ingress.example.com", } @@ -219,6 +221,7 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) { assert.Equal(t, "2s", got[envTelemetryEmitterExportInterval]) assert.Equal(t, "100", got[envTelemetryEmitterExportMaxBatchSize]) assert.Equal(t, "1000", got[envTelemetryEmitterMaxQueueSize]) + assert.Equal(t, "false", got[envTelemetryLogStreamingEnabled]) // Assert ChipIngress environment variables assert.Equal(t, "http://chip-ingress.example.com", got[envChipIngressEndpoint]) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index f4667fb1a..ff78fa777 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -123,6 +123,7 @@ func (s *Server) start() error { EmitterExportInterval: s.EnvConfig.TelemetryEmitterExportInterval, EmitterExportMaxBatchSize: s.EnvConfig.TelemetryEmitterExportMaxBatchSize, EmitterMaxQueueSize: s.EnvConfig.TelemetryEmitterMaxQueueSize, + LogStreamingEnabled: s.EnvConfig.TelemetryLogStreamingEnabled, ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "", ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint, ChipIngressInsecureConnection: s.EnvConfig.TelemetryInsecureConnection, @@ -166,7 +167,7 @@ func (s *Server) start() error { LockTimeout: s.EnvConfig.DatabaseLockTimeout, MaxOpenConns: s.EnvConfig.DatabaseMaxOpenConns, MaxIdleConns: s.EnvConfig.DatabaseMaxIdleConns, - EnableTracing: s.EnvConfig.DatabaseTracingEnabled, + EnableTracing: s.EnvConfig.DatabaseTracingEnabled, }.New(ctx, dbURL, pg.DriverPostgres) if err != nil { return fmt.Errorf("error connecting to DataBase: %w", err) From b42a0a872a9ebf8c0c75b499c358403d24aad7bb Mon Sep 17 00:00:00 2001 From: Pavel <177363085+pkcll@users.noreply.github.com> Date: Mon, 18 Aug 2025 22:47:56 -0400 Subject: [PATCH 2/2] loop/server: use otelzap logger --- pkg/logger/logger.go | 10 ++++++++++ pkg/loop/logger.go | 13 +++++++++++++ pkg/loop/server.go | 8 ++++++++ 3 files changed, 31 insertions(+) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 6ac6031e7..e2ed670af 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -96,6 +96,16 @@ func NewWith(cfgFn func(*zap.Config)) (Logger, error) { return &logger{core.Sugar()}, nil } +func NewWithCore(cfgFn func(*zap.Config), core zapcore.Core) (Logger, error) { + cfg := zap.NewProductionConfig() + cfgFn(&cfg) + c, err := cfg.Build() + if err != nil { + return nil, err + } + return &logger{zap.New(zapcore.NewTee(c.Core(), core)).Sugar()}, nil +} + // NewWithSync returns a new Logger with a given SyncWriter. func NewWithSync(w io.Writer) Logger { core := zapcore.NewCore(zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), zapcore.AddSync(w), zapcore.InfoLevel) diff --git a/pkg/loop/logger.go b/pkg/loop/logger.go index 76b6bd11a..1d37ecf60 100644 --- a/pkg/loop/logger.go +++ b/pkg/loop/logger.go @@ -14,6 +14,8 @@ import ( "golang.org/x/exp/slices" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap" + otellog "go.opentelemetry.io/otel/log" ) // HCLogLogger returns an [hclog.Logger] backed by the given [logger.Logger]. @@ -171,6 +173,17 @@ func NewLogger() (logger.Logger, error) { }) } +func NewOtelLogger(otelLogger otellog.Logger) (logger.Logger, error) { + cfgFn := func(cfg *zap.Config) { + cfg.Level.SetLevel(zap.DebugLevel) + cfg.EncoderConfig.LevelKey = "@level" + cfg.EncoderConfig.MessageKey = "@message" + cfg.EncoderConfig.TimeKey = "@timestamp" + cfg.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02T15:04:05.000000Z07:00") + } + return logger.NewWithCore(cfgFn, otelzap.NewCore(otelLogger)) +} + // onceValue returns a function that invokes f only once and returns the value // returned by f. The returned function may be called concurrently. // diff --git a/pkg/loop/server.go b/pkg/loop/server.go index ff78fa777..12577ee9d 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -146,6 +146,14 @@ func (s *Server) start() error { } beholder.SetClient(beholderClient) beholder.SetGlobalOtelProviders() + + if beholderCfg.LogStreamingEnabled { + otelLogger, err := NewOtelLogger(beholder.GetLogger()) + if err != nil { + return fmt.Errorf("failed to enable log streaming: %w", err) + } + s.Logger = logger.Sugared(logger.Named(otelLogger, s.Logger.Name())) + } } s.promServer = NewPromServer(s.EnvConfig.PrometheusPort, s.Logger)