Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/otelcollector/config/common/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func defaultService() ServiceConfig {
},
},
},
Level: "detailed",
},
Logs: Logs{
Level: "info",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,53 @@ import (
// OTLP EXPORTER CONFIG BUILDER
// =============================================================================

// SendingQueue constructors and options

type SendingQueueOption func(*SendingQueue)

func WithSizer(sizer Sizer) SendingQueueOption {
return func(sq *SendingQueue) {
sq.Sizer = sizer
}
}

func WithBatch(batch Batch) SendingQueueOption {
return func(sq *SendingQueue) {
sq.Batch = batch
}
}

func NewSendingQueue(queueSize int, opts ...SendingQueueOption) SendingQueue {
if queueSize == 0 {
return SendingQueue{Enabled: false}
}

sq := SendingQueue{
Enabled: true,
QueueSize: queueSize,
}
for _, opt := range opts {
opt(&sq)
}

return sq
}

type OTLPExporterConfigBuilder struct {
reader client.Reader
otlpOutput *telemetryv1beta1.OTLPOutput
pipelineRef pipelines.PipelineRef
queueSize int
reader client.Reader
otlpOutput *telemetryv1beta1.OTLPOutput
pipelineRef pipelines.PipelineRef
sendingQueue SendingQueue
}

type EnvVars map[string][]byte

func NewOTLPExporterConfigBuilder(reader client.Reader, otlpOutput *telemetryv1beta1.OTLPOutput, pipelineRef pipelines.PipelineRef, queueSize int) *OTLPExporterConfigBuilder {
func NewOTLPExporterConfigBuilder(reader client.Reader, otlpOutput *telemetryv1beta1.OTLPOutput, pipelineRef pipelines.PipelineRef, sendingQueue SendingQueue) *OTLPExporterConfigBuilder {
return &OTLPExporterConfigBuilder{
reader: reader,
otlpOutput: otlpOutput,
pipelineRef: pipelineRef,
queueSize: queueSize,
reader: reader,
otlpOutput: otlpOutput,
pipelineRef: pipelineRef,
sendingQueue: sendingQueue,
}
}

Expand All @@ -40,23 +72,15 @@ func (cb *OTLPExporterConfigBuilder) OTLPExporter(ctx context.Context) (*OTLPExp
return nil, nil, fmt.Errorf("failed to make env vars: %w", err)
}

exporter := otlpExporter(cb.otlpOutput, cb.pipelineRef, envVars, cb.queueSize)
exporter := otlpExporter(cb.otlpOutput, cb.pipelineRef, envVars, cb.sendingQueue)

return exporter, envVars, nil
}

func otlpExporter(otlpOutput *telemetryv1beta1.OTLPOutput, pipelineRef pipelines.PipelineRef, envVars map[string][]byte, queueSize int) *OTLPExporterConfig {
func otlpExporter(otlpOutput *telemetryv1beta1.OTLPOutput, pipelineRef pipelines.PipelineRef, envVars map[string][]byte, sendingQueue SendingQueue) *OTLPExporterConfig {
otlpEndpointVariable := formatEnvVarKey(otlpEndpointVariablePrefix, pipelineRef)
otlpEndpointValue := string(envVars[otlpEndpointVariable])

sendingQueue := SendingQueue{
Enabled: false,
}
if queueSize != 0 {
sendingQueue.QueueSize = queueSize
sendingQueue.Enabled = true
}

compression := string(otlpOutput.Compression)
if compression == "" {
compression = string(telemetryv1beta1.OTLPCompressionGzip)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestMakeExporterConfig(t *testing.T) {
Endpoint: telemetryv1beta1.ValueType{Value: "otlp-endpoint"},
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand All @@ -62,7 +62,7 @@ func TestMakeExporterConfigTraceWithPath(t *testing.T) {
Protocol: "http",
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand All @@ -81,7 +81,7 @@ func TestMakeExporterConfigMetricWithPath(t *testing.T) {
Protocol: "http",
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, metricRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, metricRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand All @@ -104,7 +104,7 @@ func TestMakeExporterConfigWithBasicAuth(t *testing.T) {
},
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand All @@ -130,7 +130,7 @@ func TestMakeExporterConfigWithOAuth2(t *testing.T) {
},
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand All @@ -153,7 +153,7 @@ func TestMakeExporterConfigWithCustomHeaders(t *testing.T) {
Headers: headers,
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand All @@ -171,7 +171,7 @@ func TestMakeExporterConfigWithTLSInsecure(t *testing.T) {
TLS: tls,
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand All @@ -189,7 +189,7 @@ func TestMakeExporterConfigWithTLSInsecureSkipVerify(t *testing.T) {
TLS: tls,
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestMakeExporterConfigWithmTLS(t *testing.T) {
TLS: tls,
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestMakeExporterConfigCompression(t *testing.T) {
Compression: tt.compression,
}

cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), 512)
cb := NewOTLPExporterConfigBuilder(fake.NewClientBuilder().Build(), output, traceRefTest(), NewSendingQueue(512))
otlpExporterConfig, envVars, err := cb.OTLPExporter(t.Context())
require.NoError(t, err)
require.NotNil(t, envVars)
Expand Down
22 changes: 20 additions & 2 deletions internal/otelcollector/config/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ import (
telemetryv1beta1 "github.com/kyma-project/telemetry-manager/apis/telemetry/v1beta1"
)

type Sizer string

const (
SizerBytes Sizer = "bytes"
SizerItems Sizer = "items"
SizerRequests Sizer = "requests"
)

// =============================================================================
// BASE CONFIGURATION TYPES
// =============================================================================
Expand Down Expand Up @@ -76,6 +84,7 @@ type Telemetry struct {

type TelemetryMetrics struct {
Readers []TelemetryMetricReader `yaml:"readers"`
Level string `yaml:"level,omitempty"`
}

type TelemetryMetricReader struct {
Expand Down Expand Up @@ -152,8 +161,17 @@ type TLS struct {
}

type SendingQueue struct {
Enabled bool `yaml:"enabled"`
QueueSize int `yaml:"queue_size"`
Enabled bool `yaml:"enabled"`
QueueSize int `yaml:"queue_size"`
Sizer Sizer `yaml:"sizer,omitempty"`
Batch Batch `yaml:"batch,omitempty"`
}

type Batch struct {
MinSize int `yaml:"min_size,omitempty"`
MaxSize int `yaml:"max_size,omitempty"`
FlushTimeout time.Duration `yaml:"flush_timeout,omitempty"`
Comment thread
jeffreylimnardy marked this conversation as resolved.
Sizer Sizer `yaml:"sizer,omitempty"`
}

type RetryOnFailure struct {
Expand Down
24 changes: 23 additions & 1 deletion internal/otelcollector/config/logagent/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"path/filepath"
"slices"
"strings"
"time"

"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -19,6 +20,20 @@ import (

const checkpointVolumePathSubdir = "telemetry-log-agent/file-log-receiver"

// Exporter sending queue configuration constants, see
// https://github.com/kyma-project/telemetry-manager/blob/main/docs/contributor/pocs/consistent-batching-across-components/02-log-agent-batching.md.
const (
// maximum queue size in bytes (200 MB)
exporterQueueSize = 200_000_000
// minimum batch size in bytes (2 MB)
exporterBatchMinSize = 2_000_000
// maximum batch size in bytes (4 MB)
exporterBatchMaxSize = 4_000_000
// maximum time before flusing a batch
// maximum time before flushing a batch
exporterBatchFlushTimeout = 10 * time.Second
Comment thread
jeffreylimnardy marked this conversation as resolved.
)

type buildComponentFunc = common.BuildComponentFunc[*telemetryv1beta1.LogPipeline]

type Builder struct {
Expand Down Expand Up @@ -227,7 +242,14 @@ func (b *Builder) addOTLPExporter() buildComponentFunc {
b.Reader,
lp.Spec.Output.OTLP,
pipelines.LogPipelineRef(lp),
0, // queue size is set to 0 for now, as the queue is disabled
common.NewSendingQueue(exporterQueueSize,
common.WithSizer(common.SizerBytes),
common.WithBatch(common.Batch{
MinSize: exporterBatchMinSize,
MaxSize: exporterBatchMaxSize,
FlushTimeout: exporterBatchFlushTimeout,
}),
),
)

return otlpExporterBuilder.OTLPExporter(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ service:
without_scope_info: false
without_type_suffix: false
without_units: false
level: detailed
logs:
level: info
encoding: json
Expand Down Expand Up @@ -239,6 +240,14 @@ exporters:
otlp_http/logpipeline-test:
logs_endpoint: ${OTLP_ENDPOINT_LOGPIPELINE_TEST}
compression: gzip
sending_queue:
enabled: true
queue_size: 200000000
sizer: bytes
batch:
min_size: 2000000
max_size: 4000000
flush_timeout: 10s
retry_on_failure:
enabled: true
initial_interval: 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ service:
without_scope_info: false
without_type_suffix: false
without_units: false
level: detailed
logs:
level: info
encoding: json
Expand Down Expand Up @@ -239,6 +240,14 @@ exporters:
otlp_http/logpipeline-test:
endpoint: ${OTLP_ENDPOINT_LOGPIPELINE_TEST}
compression: gzip
sending_queue:
enabled: true
queue_size: 200000000
sizer: bytes
batch:
min_size: 2000000
max_size: 4000000
flush_timeout: 10s
retry_on_failure:
enabled: true
initial_interval: 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ service:
without_scope_info: false
without_type_suffix: false
without_units: false
level: detailed
logs:
level: info
encoding: json
Expand Down Expand Up @@ -246,6 +247,14 @@ exporters:
otlp_http/logpipeline-test:
endpoint: ${OTLP_ENDPOINT_LOGPIPELINE_TEST}
compression: gzip
sending_queue:
enabled: true
queue_size: 200000000
sizer: bytes
batch:
min_size: 2000000
max_size: 4000000
flush_timeout: 10s
retry_on_failure:
enabled: true
initial_interval: 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ service:
without_scope_info: false
without_type_suffix: false
without_units: false
level: detailed
logs:
level: info
encoding: json
Expand Down Expand Up @@ -259,6 +260,14 @@ exporters:
tls:
insecure: true
compression: gzip
sending_queue:
enabled: true
queue_size: 200000000
sizer: bytes
batch:
min_size: 2000000
max_size: 4000000
flush_timeout: 10s
retry_on_failure:
enabled: true
initial_interval: 5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ service:
without_scope_info: false
without_type_suffix: false
without_units: false
level: detailed
logs:
level: info
encoding: json
Expand Down Expand Up @@ -238,6 +239,14 @@ exporters:
otlp_grpc/logpipeline-test:
endpoint: ${OTLP_ENDPOINT_LOGPIPELINE_TEST}
compression: gzip
sending_queue:
enabled: true
queue_size: 200000000
sizer: bytes
batch:
min_size: 2000000
max_size: 4000000
flush_timeout: 10s
retry_on_failure:
enabled: true
initial_interval: 5s
Expand Down
Loading
Loading