Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions pkg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func applyRelabelConfigs(labels model.LabelSet) model.LabelSet {
builder.Add(string(name), string(value))
}

// Sort labels as required by Process
// Sort labels as required by Process (binary search)
builder.Sort()
promLabels := builder.Labels()

// Apply relabeling
Expand All @@ -219,10 +220,6 @@ func applyLabels(labels model.LabelSet) model.LabelSet {
delete(finalLabels, dropLabel)
}

// Apply relabeling after merging extra labels and dropping labels
finalLabels = applyRelabelConfigs(finalLabels)

// Skip entries with no labels after relabeling
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we leave the documentation?

if len(finalLabels) == 0 {
return nil
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
maxRetries = 10

reservedLabelTenantID = "__tenant_id__"
reservedLabelLogLine = "__log_line__"

userAgent = "lambda-promtail"
)
Expand Down Expand Up @@ -62,6 +63,26 @@ func newBatch(ctx context.Context, pClient Client, processingPipeline *LokiStage
}

func (b *batch) add(ctx context.Context, e entry) error {
// Apply relabel configs with log line available for content-based filtering.
// The log line is injected as a temporary __log_line__ label so relabel rules
// can match against it (e.g. to drop entries via "source_labels": ["__log_line__"]).
// This runs before pipeline stages to preserve the original processing order.
if len(relabelConfigs) > 0 && e.labels != nil {
labelsWithLine := make(model.LabelSet, len(e.labels)+1)
for k, v := range e.labels {
labelsWithLine[k] = v
}
labelsWithLine[model.LabelName(reservedLabelLogLine)] = model.LabelValue(e.entry.Line)

relabeled := applyRelabelConfigs(labelsWithLine)
delete(relabeled, model.LabelName(reservedLabelLogLine))

if len(relabeled) == 0 {
return nil
}
e.labels = relabeled
}

if b.processor.Size() > 0 {
// Apply pipeline stages to entry
stageEntry := stages.Entry{
Expand Down
241 changes: 241 additions & 0 deletions pkg/relabel_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package main

import (
"context"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/regexp"
)

Expand Down Expand Up @@ -127,3 +131,240 @@ func TestParseRelabelConfigs(t *testing.T) {
})
}
}

// setupBatchTestGlobals pins the package-level batchSize to 131072 for the
// duration of the calling test and registers a cleanup that restores both
// batchSize and relabelConfigs to the values they held on entry.
//
// relabelConfigs itself is not modified here; it is only snapshotted so that
// whatever the test assigns to it is undone afterwards.
func setupBatchTestGlobals(t *testing.T) {
	t.Helper()

	// Snapshot the globals before touching anything.
	savedBatchSize, savedRelabelConfigs := batchSize, relabelConfigs
	batchSize = 131072

	t.Cleanup(func() {
		relabelConfigs = savedRelabelConfigs
		batchSize = savedBatchSize
	})
}

// TestBatchAddDropsLogLinesViaRelabelConfig installs a "drop" relabel rule whose
// source label is __log_line__ and verifies that batch.add discards the entry
// whose line matches the rule's regex while keeping the one that does not.
func TestBatchAddDropsLogLinesViaRelabelConfig(t *testing.T) {
	setupBatchTestGlobals(t)

	parsed, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"END RequestId:.*","action":"drop"}]`)
	require.NoError(t, err)
	relabelConfigs = parsed

	bt := &batch{
		processor: &LokiStages{},
		streams:   map[string]*logproto.Stream{},
	}
	ctx := context.Background()
	baseLabels := model.LabelSet{"job": "test"}

	// The first line matches the drop pattern; the second must survive.
	for _, line := range []string{"END RequestId: abc-123", "Hello World"} {
		e := entry{
			labels: baseLabels.Clone(),
			entry:  logproto.Entry{Line: line, Timestamp: time.Now()},
		}
		require.NoError(t, bt.add(ctx, e))
	}

	require.Len(t, bt.streams, 1)
	for _, s := range bt.streams {
		require.Len(t, s.Entries, 1)
		require.Equal(t, "Hello World", s.Entries[0].Line)
	}
}

// TestBatchAddKeepsEntriesWithNoRelabelConfig verifies that, with relabelConfigs
// set to nil, batch.add keeps every entry: two lines added under the same label
// set end up as two entries in a single stream.
func TestBatchAddKeepsEntriesWithNoRelabelConfig(t *testing.T) {
	setupBatchTestGlobals(t)
	relabelConfigs = nil

	bt := &batch{
		processor: &LokiStages{},
		streams:   map[string]*logproto.Stream{},
	}
	ctx := context.Background()
	baseLabels := model.LabelSet{"job": "test"}

	for _, line := range []string{"END RequestId: abc-123", "Hello World"} {
		e := entry{
			labels: baseLabels.Clone(),
			entry:  logproto.Entry{Line: line, Timestamp: time.Now()},
		}
		require.NoError(t, bt.add(ctx, e))
	}

	require.Len(t, bt.streams, 1)
	for _, s := range bt.streams {
		require.Len(t, s.Entries, 2)
	}
}

// TestBatchAddKeepActionFiltersLogLines installs a "keep" relabel rule on the
// __log_line__ source label with regex "REPORT.*" and verifies that, of three
// added lines, only the one starting with "REPORT" remains in the batch.
func TestBatchAddKeepActionFiltersLogLines(t *testing.T) {
	setupBatchTestGlobals(t)

	parsed, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"REPORT.*","action":"keep"}]`)
	require.NoError(t, err)
	relabelConfigs = parsed

	bt := &batch{
		processor: &LokiStages{},
		streams:   map[string]*logproto.Stream{},
	}
	ctx := context.Background()
	baseLabels := model.LabelSet{"job": "test"}

	// Only the REPORT line satisfies the keep rule.
	input := []string{
		"START RequestId: abc-123",
		"REPORT RequestId: abc-123 Duration: 100ms",
		"END RequestId: abc-123",
	}
	for _, line := range input {
		e := entry{
			labels: baseLabels.Clone(),
			entry:  logproto.Entry{Line: line, Timestamp: time.Now()},
		}
		require.NoError(t, bt.add(ctx, e))
	}

	require.Len(t, bt.streams, 1)
	for _, s := range bt.streams {
		require.Len(t, s.Entries, 1)
		require.Equal(t, "REPORT RequestId: abc-123 Duration: 100ms", s.Entries[0].Line)
	}
}

// TestBatchAddLogLineLabelNotInOutput verifies that the reserved __log_line__
// label name does not appear in any key of the batch's streams map after an
// entry has been added under a relabel rule that reads from __log_line__.
func TestBatchAddLogLineLabelNotInOutput(t *testing.T) {
	setupBatchTestGlobals(t)

	// A replace action is used so the entry is retained rather than dropped.
	parsed, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"(.*)","target_label":"extracted","action":"replace"}]`)
	require.NoError(t, err)
	relabelConfigs = parsed

	bt := &batch{
		processor: &LokiStages{},
		streams:   map[string]*logproto.Stream{},
	}
	baseLabels := model.LabelSet{"job": "test"}

	e := entry{
		labels: baseLabels.Clone(),
		entry:  logproto.Entry{Line: "test log line", Timestamp: time.Now()},
	}
	require.NoError(t, bt.add(context.Background(), e))

	require.Len(t, bt.streams, 1)
	for key := range bt.streams {
		require.NotContains(t, key, reservedLabelLogLine)
	}
}

// TestBatchAddLabelOnlyRelabelStillWorks verifies that a relabel rule keyed on
// an ordinary label (not __log_line__) is still applied by batch.add: an entry
// labelled env=debug is dropped and an entry labelled env=production is kept.
func TestBatchAddLabelOnlyRelabelStillWorks(t *testing.T) {
	setupBatchTestGlobals(t)

	parsed, err := parseRelabelConfigs(`[{"source_labels":["env"],"regex":"debug","action":"drop"}]`)
	require.NoError(t, err)
	relabelConfigs = parsed

	bt := &batch{
		processor: &LokiStages{},
		streams:   map[string]*logproto.Stream{},
	}
	ctx := context.Background()

	// Added in order: the debug entry (expected to be dropped) first, then
	// the production entry (expected to be kept).
	inputs := []struct {
		env  string
		line string
	}{
		{env: "debug", line: "debug log"},
		{env: "production", line: "production log"},
	}
	for _, in := range inputs {
		e := entry{
			labels: model.LabelSet{
				"job": "test",
				"env": model.LabelValue(in.env),
			},
			entry: logproto.Entry{Line: in.line, Timestamp: time.Now()},
		}
		require.NoError(t, bt.add(ctx, e))
	}

	require.Len(t, bt.streams, 1)
	for _, s := range bt.streams {
		require.Len(t, s.Entries, 1)
		require.Equal(t, "production log", s.Entries[0].Line)
	}
}

// TestBatchAddDropMultiplePatterns installs a single "drop" rule whose regex is
// an alternation ("END" or "START" followed by " RequestId:") and verifies that
// both matching lines are removed while the remaining two lines are kept in the
// order they were added.
func TestBatchAddDropMultiplePatterns(t *testing.T) {
	setupBatchTestGlobals(t)

	parsed, err := parseRelabelConfigs(`[{"source_labels":["__log_line__"],"regex":"(END|START) RequestId:.*","action":"drop"}]`)
	require.NoError(t, err)
	relabelConfigs = parsed

	bt := &batch{
		processor: &LokiStages{},
		streams:   map[string]*logproto.Stream{},
	}
	ctx := context.Background()
	baseLabels := model.LabelSet{"job": "test"}

	input := []string{
		"START RequestId: abc-123",
		"Processing request...",
		"END RequestId: abc-123",
		"REPORT RequestId: abc-123 Duration: 50ms",
	}
	// Lines expected to survive the drop rule, in insertion order.
	want := []string{
		"Processing request...",
		"REPORT RequestId: abc-123 Duration: 50ms",
	}

	for i := range input {
		e := entry{
			labels: baseLabels.Clone(),
			entry:  logproto.Entry{Line: input[i], Timestamp: time.Now()},
		}
		require.NoError(t, bt.add(ctx, e))
	}

	require.Len(t, bt.streams, 1)
	for _, s := range bt.streams {
		require.Len(t, s.Entries, len(want))
		for i := range want {
			require.Equal(t, want[i], s.Entries[i].Line)
		}
	}
}