diff --git a/pkg/main.go b/pkg/main.go index a76e590..823e159 100644 --- a/pkg/main.go +++ b/pkg/main.go @@ -194,7 +194,8 @@ func applyRelabelConfigs(labels model.LabelSet) model.LabelSet { builder.Add(string(name), string(value)) } - // Sort labels as required by Process + // Sort labels as required by Process (binary search) + builder.Sort() promLabels := builder.Labels() // Apply relabeling @@ -219,10 +220,6 @@ func applyLabels(labels model.LabelSet) model.LabelSet { delete(finalLabels, dropLabel) } - // Apply relabeling after merging extra labels and dropping labels - finalLabels = applyRelabelConfigs(finalLabels) - - // Skip entries with no labels after relabeling if len(finalLabels) == 0 { return nil } diff --git a/pkg/promtail.go b/pkg/promtail.go index 122ff87..52ec948 100644 --- a/pkg/promtail.go +++ b/pkg/promtail.go @@ -29,6 +29,7 @@ const ( maxRetries = 10 reservedLabelTenantID = "__tenant_id__" + reservedLabelLogLine = "__log_line__" userAgent = "lambda-promtail" ) @@ -62,6 +63,26 @@ func newBatch(ctx context.Context, pClient Client, processingPipeline *LokiStage } func (b *batch) add(ctx context.Context, e entry) error { + // Apply relabel configs with log line available for content-based filtering. + // The log line is injected as a temporary __log_line__ label so relabel rules + // can match against it (e.g. to drop entries via "source_labels": ["__log_line__"]). + // This runs before pipeline stages to preserve the original processing order. + if len(relabelConfigs) > 0 && e.labels != nil { + labelsWithLine := make(model.LabelSet, len(e.labels)+1) + for k, v := range e.labels { + labelsWithLine[k] = v + } + labelsWithLine[model.LabelName(reservedLabelLogLine)] = model.LabelValue(e.entry.Line) + + relabeled := applyRelabelConfigs(labelsWithLine) + delete(relabeled, model.LabelName(reservedLabelLogLine)) + + if len(relabeled) == 0 { + return nil + } + e.labels = relabeled + } + if b.processor.Size() > 0 { // Apply pipeline stages to entry stageEntry := stages.Entry{ diff --git a/pkg/relabel_test.go b/pkg/relabel_test.go index 95e13e3..f9275b9 100644 --- a/pkg/relabel_test.go +++ b/pkg/relabel_test.go @@ -1,11 +1,15 @@ package main import ( + "context" "testing" + "time" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/regexp" ) @@ -127,3 +131,240 @@ func TestParseRelabelConfigs(t *testing.T) { }) } } + +func setupBatchTestGlobals(t *testing.T) { + t.Helper() + origBatchSize := batchSize + origRelabelConfigs := relabelConfigs + t.Cleanup(func() { + batchSize = origBatchSize + relabelConfigs = origRelabelConfigs + }) + batchSize = 131072 +} + +func TestBatchAddDropsLogLinesViaRelabelConfig(t *testing.T) { + setupBatchTestGlobals(t) + + configs, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"END RequestId:.*","action":"drop"}]`) + require.NoError(t, err) + relabelConfigs = configs + + b := &batch{ + streams: map[string]*logproto.Stream{}, + processor: &LokiStages{}, + } + ctx := context.Background() + labels := model.LabelSet{ + model.LabelName("job"): model.LabelValue("test"), + } + + // Entry matching the drop pattern should be dropped + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "END RequestId: abc-123", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + // Entry not matching the drop pattern should be kept + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "Hello World", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + require.Len(t, b.streams, 1) + for _, stream := range b.streams { + require.Len(t, stream.Entries, 1) + require.Equal(t, "Hello World", stream.Entries[0].Line) + } +} + +func TestBatchAddKeepsEntriesWithNoRelabelConfig(t *testing.T) { + setupBatchTestGlobals(t) + relabelConfigs = nil + + b := &batch{ + streams: map[string]*logproto.Stream{}, + processor: &LokiStages{}, + } + ctx := context.Background() + labels := model.LabelSet{ + model.LabelName("job"): model.LabelValue("test"), + } + + err := b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "END RequestId: abc-123", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "Hello World", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + require.Len(t, b.streams, 1) + for _, stream := range b.streams { + require.Len(t, stream.Entries, 2) + } +} + +func TestBatchAddKeepActionFiltersLogLines(t *testing.T) { + setupBatchTestGlobals(t) + + // Keep only lines starting with "REPORT" + configs, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"REPORT.*","action":"keep"}]`) + require.NoError(t, err) + relabelConfigs = configs + + b := &batch{ + streams: map[string]*logproto.Stream{}, + processor: &LokiStages{}, + } + ctx := context.Background() + labels := model.LabelSet{ + model.LabelName("job"): model.LabelValue("test"), + } + + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "START RequestId: abc-123", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "REPORT RequestId: abc-123 Duration: 100ms", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "END RequestId: abc-123", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + require.Len(t, b.streams, 1) + for _, stream := range b.streams { + require.Len(t, stream.Entries, 1) + require.Equal(t, "REPORT RequestId: abc-123 Duration: 100ms", stream.Entries[0].Line) + } +} + +func TestBatchAddLogLineLabelNotInOutput(t *testing.T) { + setupBatchTestGlobals(t) + + // Use a replace action so entries are kept, not dropped + configs, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"(.*)","target_label":"extracted","action":"replace"}]`) + require.NoError(t, err) + relabelConfigs = configs + + b := &batch{ + streams: map[string]*logproto.Stream{}, + processor: &LokiStages{}, + } + ctx := context.Background() + labels := model.LabelSet{ + model.LabelName("job"): model.LabelValue("test"), + } + + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: "test log line", + Timestamp: time.Now(), + }}) + require.NoError(t, err) + + require.Len(t, b.streams, 1) + for streamLabels := range b.streams { + require.NotContains(t, streamLabels, reservedLabelLogLine) + } +} + +func TestBatchAddLabelOnlyRelabelStillWorks(t *testing.T) { + setupBatchTestGlobals(t) + + // Drop entries where label "env" equals "debug" + configs, err := parseRelabelConfigs(`[{"source_labels":["env"],"regex":"debug","action":"drop"}]`) + require.NoError(t, err) + relabelConfigs = configs + + b := &batch{ + streams: map[string]*logproto.Stream{}, + processor: &LokiStages{}, + } + ctx := context.Background() + + // Entry with env=debug should be dropped + err = b.add(ctx, entry{ + labels: model.LabelSet{ + model.LabelName("job"): model.LabelValue("test"), + model.LabelName("env"): model.LabelValue("debug"), + }, + entry: logproto.Entry{ + Line: "debug log", + Timestamp: time.Now(), + }, + }) + require.NoError(t, err) + + // Entry with env=production should be kept + err = b.add(ctx, entry{ + labels: model.LabelSet{ + model.LabelName("job"): model.LabelValue("test"), + model.LabelName("env"): model.LabelValue("production"), + }, + entry: logproto.Entry{ + Line: "production log", + Timestamp: time.Now(), + }, + }) + require.NoError(t, err) + + require.Len(t, b.streams, 1) + for _, stream := range b.streams { + require.Len(t, stream.Entries, 1) + require.Equal(t, "production log", stream.Entries[0].Line) + } +} + +func TestBatchAddDropMultiplePatterns(t *testing.T) { + setupBatchTestGlobals(t) + + // Drop lines matching "END RequestId:" OR "START RequestId:" + configs, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"(END|START) RequestId:.*","action":"drop"}]`) + require.NoError(t, err) + relabelConfigs = configs + + b := &batch{ + streams: map[string]*logproto.Stream{}, + processor: &LokiStages{}, + } + ctx := context.Background() + labels := model.LabelSet{ + model.LabelName("job"): model.LabelValue("test"), + } + + lines := []string{ + "START RequestId: abc-123", + "Processing request...", + "END RequestId: abc-123", + "REPORT RequestId: abc-123 Duration: 50ms", + } + + for _, line := range lines { + err = b.add(ctx, entry{labels: labels.Clone(), entry: logproto.Entry{ + Line: line, + Timestamp: time.Now(), + }}) + require.NoError(t, err) + } + + require.Len(t, b.streams, 1) + for _, stream := range b.streams { + require.Len(t, stream.Entries, 2) + require.Equal(t, "Processing request...", stream.Entries[0].Line) + require.Equal(t, "REPORT RequestId: abc-123 Duration: 50ms", stream.Entries[1].Line) + } +}