diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 277eaeb40..7d43a30df 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -28,6 +28,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chipingress" ) +const defaultGRPCCompressor = "gzip" + type Emitter interface { // Sends message with bytes and attributes to OTel Collector Emit(ctx context.Context, body []byte, attrKVs ...any) error @@ -362,6 +364,13 @@ func newTracerProvider(config Config, resource *sdkresource.Resource, auth Auth, // No auth default: } + switch compressor := config.TraceCompressor; compressor { + case "none": + case "": + exporterOpts = append(exporterOpts, otlptracegrpc.WithCompressor(defaultGRPCCompressor)) + default: + exporterOpts = append(exporterOpts, otlptracegrpc.WithCompressor(compressor)) + } if config.TraceRetryConfig != nil { // NOTE: By default, the retry is enabled in the OTel SDK exporterOpts = append(exporterOpts, otlptracegrpc.WithRetry(otlptracegrpc.RetryConfig{ @@ -401,6 +410,13 @@ func newMeterProvider(cfg Config, resource *sdkresource.Resource, auth Auth, cre otlpmetricgrpc.WithTLSCredentials(creds), otlpmetricgrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint), } + switch compressor := cfg.MetricCompressor; compressor { + case "none": + case "": + opts = append(opts, otlpmetricgrpc.WithCompressor(defaultGRPCCompressor)) + default: + opts = append(opts, otlpmetricgrpc.WithCompressor(compressor)) + } switch { // Rotating auth @@ -454,6 +470,13 @@ func newLoggerOpts(cfg Config, auth Auth, creds credentials.TransportCredentials otlploggrpc.WithTLSCredentials(creds), otlploggrpc.WithEndpoint(cfg.OtelExporterGRPCEndpoint), } + switch compressor := cfg.LogCompressor; compressor { + case "none": + case "": + opts = append(opts, otlploggrpc.WithCompressor(defaultGRPCCompressor)) + default: + opts = append(opts, otlploggrpc.WithCompressor(compressor)) + } // Log exporter auth switch { // Rotating auth diff --git a/pkg/beholder/config.go b/pkg/beholder/config.go index a6d501f81..f3e4561b6 100644 --- a/pkg/beholder/config.go +++ b/pkg/beholder/config.go @@ -29,11 +29,15 @@ type Config struct { TraceBatchTimeout time.Duration TraceSpanExporter trace.SpanExporter // optional additional exporter TraceRetryConfig *RetryConfig + // TraceCompressor sets the gRPC compressor for traces. Valid values: "gzip" (default), "none". + TraceCompressor string // OTel Metric MetricReaderInterval time.Duration MetricRetryConfig *RetryConfig MetricViews []metric.View + // MetricCompressor sets the gRPC compressor for metrics. Valid values: "gzip" (default), "none". + MetricCompressor string // Custom Events via Chip Ingress Emitter ChipIngressEmitterEnabled bool @@ -50,6 +54,8 @@ type Config struct { LogRetryConfig *RetryConfig LogStreamingEnabled bool // Enable logs streaming to the OTel log exporter LogLevel zapcore.Level // Log level for telemetry streaming + // LogCompressor sets the gRPC compressor for logs. Valid values: "gzip" (default), "none". + LogCompressor string // Auth // AuthHeaders serves two purposes: @@ -110,10 +116,12 @@ func DefaultConfig() Config { // Trace TraceSampleRatio: 1, TraceBatchTimeout: 1 * time.Second, + TraceCompressor: "gzip", // OTel trace exporter retry config TraceRetryConfig: defaultRetryConfig.Copy(), // Metric MetricReaderInterval: 1 * time.Second, + MetricCompressor: "gzip", // OTel metric exporter retry config MetricRetryConfig: defaultRetryConfig.Copy(), // Log @@ -124,6 +132,7 @@ func DefaultConfig() Config { LogBatchProcessor: true, LogStreamingEnabled: true, // Enable logs streaming by default LogLevel: zapcore.InfoLevel, + LogCompressor: "gzip", // Auth (defaults to static auth mode with TTL=0) AuthHeadersTTL: 0, } diff --git a/pkg/beholder/config_test.go b/pkg/beholder/config_test.go index 4e9d7ab3f..ae156db9c 100644 --- a/pkg/beholder/config_test.go +++ b/pkg/beholder/config_test.go @@ -36,10 +36,12 @@ func ExampleConfig() { // Trace TraceSampleRatio: 1, TraceBatchTimeout: 1 * time.Second, + TraceCompressor: "gzip", // OTel trace exporter retry config TraceRetryConfig: nil, // Metric MetricReaderInterval: 1 * time.Second, + MetricCompressor: "gzip", // OTel metric exporter retry config MetricRetryConfig: nil, // Log @@ -50,6 +52,7 @@ func ExampleConfig() { LogBatchProcessor: true, LogStreamingEnabled: false, // Disable streaming logs by default LogLevel: zapcore.InfoLevel, // Default log level + LogCompressor: "gzip", // Auth AuthPublicKeyHex: "", AuthHeaders: map[string]string{}, @@ -64,6 +67,6 @@ func ExampleConfig() { } fmt.Printf("%+v\n", *config.LogRetryConfig) // Output: - // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} + // {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter: TraceRetryConfig: TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig: MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig: LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner: AuthPublicKeyHex:} // {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s} } diff --git a/pkg/loop/config.go b/pkg/loop/config.go index 442a91bed..7f77f155b 100644 --- a/pkg/loop/config.go +++ b/pkg/loop/config.go @@ -70,6 +70,9 @@ const ( envTelemetryLogExportMaxBatchSize = "CL_TELEMETRY_LOG_EXPORT_MAX_BATCH_SIZE" envTelemetryLogExportInterval = "CL_TELEMETRY_LOG_EXPORT_INTERVAL" envTelemetryLogMaxQueueSize = "CL_TELEMETRY_LOG_MAX_QUEUE_SIZE" + envTelemetryTraceCompressor = "CL_TELEMETRY_TRACE_COMPRESSOR" + envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR" + envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR" envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT" envChipIngressInsecureConnection = "CL_CHIP_INGRESS_INSECURE_CONNECTION" @@ -133,6 +136,9 @@ type EnvConfig struct { TelemetryLogExportMaxBatchSize int TelemetryLogExportInterval time.Duration TelemetryLogMaxQueueSize int + TelemetryTraceCompressor string + TelemetryMetricCompressor string + TelemetryLogCompressor string ChipIngressEndpoint string ChipIngressInsecureConnection bool @@ -209,6 +215,9 @@ func (e *EnvConfig) AsCmdEnv() (env []string) { add(envTelemetryLogExportMaxBatchSize, strconv.Itoa(e.TelemetryLogExportMaxBatchSize)) add(envTelemetryLogExportInterval, e.TelemetryLogExportInterval.String()) add(envTelemetryLogMaxQueueSize, strconv.Itoa(e.TelemetryLogMaxQueueSize)) + add(envTelemetryTraceCompressor, e.TelemetryTraceCompressor) + add(envTelemetryMetricCompressor, e.TelemetryMetricCompressor) + add(envTelemetryLogCompressor, e.TelemetryLogCompressor) add(envChipIngressEndpoint, e.ChipIngressEndpoint) add(envChipIngressInsecureConnection, strconv.FormatBool(e.ChipIngressInsecureConnection)) @@ -406,7 +415,9 @@ func (e *EnvConfig) parse() error { if err != nil { return fmt.Errorf("failed to parse %s: %w", envTelemetryLogMaxQueueSize, err) } - + e.TelemetryTraceCompressor = os.Getenv(envTelemetryTraceCompressor) + e.TelemetryMetricCompressor = os.Getenv(envTelemetryMetricCompressor) + e.TelemetryLogCompressor = os.Getenv(envTelemetryLogCompressor) // Optional e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint) e.ChipIngressInsecureConnection, err = getBool(envChipIngressInsecureConnection) diff --git a/pkg/loop/server.go b/pkg/loop/server.go index 61cb25a8f..3af79cefe 100644 --- a/pkg/loop/server.go +++ b/pkg/loop/server.go @@ -155,6 +155,7 @@ func (s *Server) start(opts ...ServerOpt) error { OtelExporterGRPCEndpoint: s.EnvConfig.TelemetryEndpoint, ResourceAttributes: append(attributes, s.EnvConfig.TelemetryAttributes.AsStringAttributes()...), TraceSampleRatio: s.EnvConfig.TelemetryTraceSampleRatio, + TraceCompressor: s.EnvConfig.TelemetryTraceCompressor, EmitterBatchProcessor: s.EnvConfig.TelemetryEmitterBatchProcessor, EmitterExportTimeout: s.EnvConfig.TelemetryEmitterExportTimeout, EmitterExportInterval: s.EnvConfig.TelemetryEmitterExportInterval, @@ -167,9 +168,11 @@ func (s *Server) start(opts ...ServerOpt) error { LogExportMaxBatchSize: s.EnvConfig.TelemetryLogExportMaxBatchSize, LogExportInterval: s.EnvConfig.TelemetryLogExportInterval, LogMaxQueueSize: s.EnvConfig.TelemetryLogMaxQueueSize, + LogCompressor: s.EnvConfig.TelemetryLogCompressor, ChipIngressEmitterEnabled: s.EnvConfig.ChipIngressEndpoint != "", ChipIngressEmitterGRPCEndpoint: s.EnvConfig.ChipIngressEndpoint, ChipIngressInsecureConnection: s.EnvConfig.ChipIngressInsecureConnection, + MetricCompressor: s.EnvConfig.TelemetryMetricCompressor, } // Configure beholder auth - the client will determine rotating vs static mode