Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
)

const defaultGRPCCompressor = "gzip"

type Emitter interface {
// Sends message with bytes and attributes to OTel Collector
Emit(ctx context.Context, body []byte, attrKVs ...any) error
Expand Down Expand Up @@ -362,6 +364,13 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, auth Auth,
// No auth
default:
}
switch compressor := config.TraceCompressor; compressor {
case "none":
case "":
exporterOpts = append(exporterOpts, otlptracegrpc.WithCompressor(defaultGRPCCompressor))
default:
exporterOpts = append(exporterOpts, otlptracegrpc.WithCompressor(compressor))
}
if config.TraceRetryConfig != nil {
// NOTE: By default, the retry is enabled in the OTel SDK
exporterOpts = append(exporterOpts, otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{
Expand Down Expand Up @@ -401,6 +410,13 @@ func newMeterProvider(cfg Config, resource *sdkresource.Resource, auth Auth, cre
otlpmetricgrpc.WithTLSCredentials(creds),
otlpmetricgrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
}
switch compressor := cfg.MetricCompressor; compressor {
case "none":
case "":
opts = append(opts, otlpmetricgrpc.WithCompressor(defaultGRPCCompressor))
default:
opts = append(opts, otlpmetricgrpc.WithCompressor(compressor))
}

switch {
// Rotating auth
Expand Down Expand Up @@ -454,6 +470,13 @@ func newLoggerOpts(cfg Config, auth Auth, creds credentials.TransportCredentials
otlploggrpc.WithTLSCredentials(creds),
otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint),
}
switch compressor := cfg.LogCompressor; compressor {
case "none":
case "":
opts = append(opts, otlploggrpc.WithCompressor(defaultGRPCCompressor))
default:
opts = append(opts, otlploggrpc.WithCompressor(compressor))
}
// Log exporter auth
switch {
// Rotating auth
Expand Down
9 changes: 9 additions & 0 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ type Config struct {
TraceBatchTimeout time.Duration
TraceSpanExporter trace.SpanExporter // optional additional exporter
TraceRetryConfig *RetryConfig
// TraceCompressor sets the gRPC compressor for traces. Valid values: "gzip" (default), "none".
TraceCompressor string

// OTel Metric
MetricReaderInterval time.Duration
MetricRetryConfig *RetryConfig
MetricViews []metric.View
// MetricCompressor sets the gRPC compressor for metrics. Valid values: "gzip" (default), "none".
MetricCompressor string

// Custom Events via Chip Ingress Emitter
ChipIngressEmitterEnabled bool
Expand All @@ -50,6 +54,8 @@ type Config struct {
LogRetryConfig *RetryConfig
LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter
LogLevel zapcore.Level // Log level for telemetry streaming
// LogCompressor sets the gRPC compressor for logs. Valid values: "gzip" (default), "none".
LogCompressor string

// Auth
// AuthHeaders serves two purposes:
Expand Down Expand Up @@ -110,10 +116,12 @@ func DefaultConfig() Config {
// Trace
TraceSampleRatio: 1,
TraceBatchTimeout: 1 * time.Second,
TraceCompressor: "gzip",
// OTel trace exporter retry config
TraceRetryConfig: defaultRetryConfig.Copy(),
// Metric
MetricReaderInterval: 1 * time.Second,
MetricCompressor: "gzip",
// OTel metric exporter retry config
MetricRetryConfig: defaultRetryConfig.Copy(),
// Log
Expand All @@ -124,6 +132,7 @@ func DefaultConfig() Config {
LogBatchProcessor: true,
LogStreamingEnabled: true, // Enable logs streaming by default
LogLevel: zapcore.InfoLevel,
LogCompressor: "gzip",
// Auth (defaults to static auth mode with TTL=0)
AuthHeadersTTL: 0,
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ func ExampleConfig() {
// Trace
TraceSampleRatio: 1,
TraceBatchTimeout: 1 * time.Second,
TraceCompressor: "gzip",
// OTel trace exporter retry config
TraceRetryConfig: nil,
// Metric
MetricReaderInterval: 1 * time.Second,
MetricCompressor: "gzip",
// OTel metric exporter retry config
MetricRetryConfig: nil,
// Log
Expand All @@ -50,6 +52,7 @@ func ExampleConfig() {
LogBatchProcessor: true,
LogStreamingEnabled: false, // Disable streaming logs by default
LogLevel: zapcore.InfoLevel, // Default log level
LogCompressor: "gzip",
// Auth
AuthPublicKeyHex: "",
AuthHeaders: map[string]string{},
Expand All @@ -64,6 +67,6 @@ func ExampleConfig() {
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner:<nil> AuthPublicKeyHex:}
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner:<nil> AuthPublicKeyHex:}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
}
13 changes: 12 additions & 1 deletion pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ const (
envTelemetryLogExportMaxBatchSize = "CL_TELEMETRY_LOG_EXPORT_MAX_BATCH_SIZE"
envTelemetryLogExportInterval = "CL_TELEMETRY_LOG_EXPORT_INTERVAL"
envTelemetryLogMaxQueueSize = "CL_TELEMETRY_LOG_MAX_QUEUE_SIZE"
envTelemetryTraceCompressor = "CL_TELEMETRY_TRACE_COMPRESSOR"
envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR"
envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR"

envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION"
Expand Down Expand Up @@ -133,6 +136,9 @@ type EnvConfig struct {
TelemetryLogExportMaxBatchSize int
TelemetryLogExportInterval time.Duration
TelemetryLogMaxQueueSize int
TelemetryTraceCompressor string
TelemetryMetricCompressor string
TelemetryLogCompressor string

ChipIngressEndpoint string
ChipIngressInsecureConnection bool
Expand Down Expand Up @@ -209,6 +215,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryLogExportMaxBatchSize, strconv.Itoa(e.TelemetryLogExportMaxBatchSize))
add(envTelemetryLogExportInterval, e.TelemetryLogExportInterval.String())
add(envTelemetryLogMaxQueueSize, strconv.Itoa(e.TelemetryLogMaxQueueSize))
add(envTelemetryTraceCompressor, e.TelemetryTraceCompressor)
add(envTelemetryMetricCompressor, e.TelemetryMetricCompressor)
add(envTelemetryLogCompressor, e.TelemetryLogCompressor)

add(envChipIngressEndpoint, e.ChipIngressEndpoint)
add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection))
Expand Down Expand Up @@ -406,7 +415,9 @@ func (e *EnvConfig) parse() error {
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryLogMaxQueueSize, err)
}

e.TelemetryTraceCompressor = os.Getenv(envTelemetryTraceCompressor)
e.TelemetryMetricCompressor = os.Getenv(envTelemetryMetricCompressor)
e.TelemetryLogCompressor = os.Getenv(envTelemetryLogCompressor)
// Optional
e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection)
Expand Down
3 changes: 3 additions & 0 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func (s *Server) start(opts ...ServerOpt) error {
OtelExporterGRPCEndpoint: s.EnvConfig.TelemetryEndpoint,
ResourceAttributes: append(attributes, s.EnvConfig.TelemetryAttributes.AsStringAttributes()...),
TraceSampleRatio: s.EnvConfig.TelemetryTraceSampleRatio,
TraceCompressor: s.EnvConfig.TelemetryTraceCompressor,
EmitterBatchProcessor: s.EnvConfig.TelemetryEmitterBatchProcessor,
EmitterExportTimeout: s.EnvConfig.TelemetryEmitterExportTimeout,
EmitterExportInterval: s.EnvConfig.TelemetryEmitterExportInterval,
Expand All @@ -167,9 +168,11 @@ func (s *Server) start(opts ...ServerOpt) error {
LogExportMaxBatchSize: s.EnvConfig.TelemetryLogExportMaxBatchSize,
LogExportInterval: s.EnvConfig.TelemetryLogExportInterval,
LogMaxQueueSize: s.EnvConfig.TelemetryLogMaxQueueSize,
LogCompressor: s.EnvConfig.TelemetryLogCompressor,
ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "",
ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint,
ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection,
MetricCompressor: s.EnvConfig.TelemetryMetricCompressor,
}

// Configure beholder auth - the client will determine rotating vs static mode
Expand Down
Loading