Skip to content

Commit f714075

Browse files
authored
Max event size metric label (#713)
* Add source_name label to the max_event_size_exceeded metric
* Rename the config field
* Fix
1 parent 8dd8c14 commit f714075

8 files changed

Lines changed: 132 additions & 42 deletions

File tree

fd/util.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import (
1818
func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
1919
capacity := pipeline.DefaultCapacity
2020
antispamThreshold := pipeline.DefaultAntispamThreshold
21-
antispamField := pipeline.DefaultAntispamField
2221
var antispamExceptions antispam.Exceptions
22+
sourceNameMetaField := pipeline.DefaultSourceNameMetaField
2323
avgInputEventSize := pipeline.DefaultAvgInputEventSize
2424
maxInputEventSize := pipeline.DefaultMaxInputEventSize
2525
cutOffEventByLimit := pipeline.DefaultCutOffEventByLimit
@@ -98,15 +98,14 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
9898
antispamThreshold = 0
9999
}
100100

101-
antispamField = settings.Get("antispam_field").MustString()
102-
103101
var err error
104102
antispamExceptions, err = extractAntispamExceptions(settings)
105103
if err != nil {
106104
logger.Fatalf("extract exceptions: %s", err)
107105
}
108106
antispamExceptions.Prepare()
109107

108+
sourceNameMetaField = settings.Get("source_name_meta_field").MustString()
110109
isStrict = settings.Get("is_strict").MustBool()
111110

112111
str = settings.Get("metric_hold_duration").MustString()
@@ -129,8 +128,8 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
129128
CutOffEventByLimit: cutOffEventByLimit,
130129
CutOffEventByLimitMsg: cutOffEventByLimitMsg,
131130
AntispamThreshold: antispamThreshold,
132-
AntispamField: antispamField,
133131
AntispamExceptions: antispamExceptions,
132+
SourceNameMetaField: sourceNameMetaField,
134133
MaintenanceInterval: maintenanceInterval,
135134
EventTimeout: eventTimeout,
136135
StreamField: streamField,

pipeline/antispam/antispammer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type source struct {
4242
type Options struct {
4343
MaintenanceInterval time.Duration
4444
Threshold int
45-
Field string
4645
UnbanIterations int
4746
Exceptions Exceptions
4847

pipeline/pipeline.go

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525

2626
const (
2727
DefaultAntispamThreshold = 0
28-
DefaultAntispamField = ""
28+
DefaultSourceNameMetaField = ""
2929
DefaultDecoder = "auto"
3030
DefaultIsStrict = false
3131
DefaultStreamField = "stream"
@@ -51,17 +51,17 @@ type finalizeFn = func(event *Event, notifyInput bool, backEvent bool)
5151

5252
type InputPluginController interface {
5353
In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64
54-
UseSpread() // don't use stream field and spread all events across all processors
55-
DisableStreams() // don't use stream field
56-
SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder
57-
IncReadOps() // inc read ops for metric
58-
IncMaxEventSizeExceeded() // inc max event size exceeded counter
54+
UseSpread() // don't use stream field and spread all events across all processors
55+
DisableStreams() // don't use stream field
56+
SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder
57+
IncReadOps() // inc read ops for metric
58+
IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter
5959
}
6060

6161
type ActionPluginController interface {
6262
Propagate(event *Event) // throw held event back to pipeline
6363
Spawn(parent *Event, nodes []*insaneJSON.Node)
64-
IncMaxEventSizeExceeded() // inc max event size exceeded counter
64+
IncMaxEventSizeExceeded(lvs ...string) // inc max event size exceeded counter
6565
}
6666

6767
type OutputPluginController interface {
@@ -130,7 +130,7 @@ type Pipeline struct {
130130
outputEventSizeMetric prometheus.Counter
131131
readOpsEventsSizeMetric prometheus.Counter
132132
wrongEventCRIFormatMetric prometheus.Counter
133-
maxEventSizeExceededMetric prometheus.Counter
133+
maxEventSizeExceededMetric *prometheus.CounterVec
134134
eventPoolLatency prometheus.Observer
135135

136136
countEventPanicsRecoveredMetric prometheus.Counter
@@ -144,8 +144,8 @@ type Settings struct {
144144
MaintenanceInterval time.Duration
145145
EventTimeout time.Duration
146146
AntispamThreshold int
147-
AntispamField string
148147
AntispamExceptions antispam.Exceptions
148+
SourceNameMetaField string
149149
AvgEventSize int
150150
MaxEventSize int
151151
CutOffEventByLimit bool
@@ -183,7 +183,6 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
183183
antispamer: antispam.NewAntispammer(&antispam.Options{
184184
MaintenanceInterval: settings.MaintenanceInterval,
185185
Threshold: settings.AntispamThreshold,
186-
Field: settings.AntispamField,
187186
UnbanIterations: antispamUnbanIterations,
188187
Logger: lg.Named("antispam"),
189188
MetricsController: metricCtl,
@@ -242,8 +241,8 @@ func (p *Pipeline) IncReadOps() {
242241
p.readOps.Inc()
243242
}
244243

245-
func (p *Pipeline) IncMaxEventSizeExceeded() {
246-
p.maxEventSizeExceededMetric.Inc()
244+
func (p *Pipeline) IncMaxEventSizeExceeded(lvs ...string) {
245+
p.maxEventSizeExceededMetric.WithLabelValues(lvs...).Inc()
247246
}
248247

249248
func (p *Pipeline) IncCountEventPanicsRecovered() {
@@ -260,7 +259,7 @@ func (p *Pipeline) registerMetrics() {
260259
p.outputEventSizeMetric = m.RegisterCounter("output_events_size", "Size of events on pipeline output")
261260
p.readOpsEventsSizeMetric = m.RegisterCounter("read_ops_count", "Read OPS count")
262261
p.wrongEventCRIFormatMetric = m.RegisterCounter("wrong_event_cri_format", "Wrong event CRI format counter")
263-
p.maxEventSizeExceededMetric = m.RegisterCounter("max_event_size_exceeded", "Max event size exceeded counter")
262+
p.maxEventSizeExceededMetric = m.RegisterCounterVec("max_event_size_exceeded", "Max event size exceeded counter", "source_name")
264263
p.countEventPanicsRecoveredMetric = m.RegisterCounter("count_event_panics_recovered", "Count of processor.countEvent panics recovered")
265264
p.eventPoolLatency = m.RegisterHistogram("event_pool_latency_seconds",
266265
"How long we are wait an event from the pool", metric.SecondsBucketsDetailedNano)
@@ -389,7 +388,7 @@ func (p *Pipeline) GetOutput() OutputPlugin {
389388
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
390389
// don't process mud.
391390
var ok bool
392-
bytes, ok = p.checkInputBytes(bytes)
391+
bytes, ok = p.checkInputBytes(bytes, sourceName, meta)
393392
if !ok {
394393
return EventSeqIDError
395394
}
@@ -432,16 +431,16 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
432431
if !row.IsPartial && p.settings.AntispamThreshold > 0 {
433432
var checkSourceID any
434433
var checkSourceName string
435-
if p.settings.AntispamField == "" {
434+
if p.settings.SourceNameMetaField == "" {
436435
checkSourceID = uint64(sourceID)
437436
checkSourceName = sourceName
438437
} else {
439-
if val, ok := meta[p.settings.AntispamField]; ok {
438+
if val, ok := meta[p.settings.SourceNameMetaField]; ok {
440439
checkSourceID = val
441440
checkSourceName = val
442441
isNewSource = false
443442
} else {
444-
p.Error(fmt.Sprintf("antispam_field %s does not exists in meta", p.settings.AntispamField))
443+
p.Error(fmt.Sprintf("source_name_meta_field %q does not exists in meta", p.settings.SourceNameMetaField))
445444
checkSourceID = uint64(sourceID)
446445
checkSourceName = sourceName
447446
}
@@ -534,15 +533,20 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
534533
return p.streamEvent(event)
535534
}
536535

537-
func (p *Pipeline) checkInputBytes(bytes []byte) ([]byte, bool) {
536+
func (p *Pipeline) checkInputBytes(bytes []byte, sourceName string, meta metadata.MetaData) ([]byte, bool) {
538537
length := len(bytes)
539538

540539
if length == 0 || (bytes[0] == '\n' && length == 1) {
541540
return bytes, false
542541
}
543542

544543
if p.settings.MaxEventSize != 0 && length > p.settings.MaxEventSize {
545-
p.IncMaxEventSizeExceeded()
544+
source := sourceName
545+
if val, ok := meta[p.settings.SourceNameMetaField]; ok {
546+
source = val
547+
}
548+
p.IncMaxEventSizeExceeded(source)
549+
546550
if !p.settings.CutOffEventByLimit {
547551
return bytes, false
548552
}

pipeline/pipeline_whitebox_test.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package pipeline
33
import (
44
"testing"
55

6+
"github.com/ozontech/file.d/pipeline/metadata"
67
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/prometheus/client_golang/prometheus/testutil"
79
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
811
"go.uber.org/atomic"
912
)
1013

@@ -145,7 +148,7 @@ func TestCheckInputBytes(t *testing.T) {
145148
t.Run(tCase.name, func(t *testing.T) {
146149
pipe := New("test_pipeline", tCase.pipelineSettings, prometheus.NewRegistry())
147150

148-
data, ok := pipe.checkInputBytes(tCase.input)
151+
data, ok := pipe.checkInputBytes(tCase.input, "test", nil)
149152

150153
assert.Equal(t, tCase.wantOk, ok)
151154
if !tCase.wantOk {
@@ -155,3 +158,79 @@ func TestCheckInputBytes(t *testing.T) {
155158
})
156159
}
157160
}
161+
162+
func TestCheckInputBytesMetric(t *testing.T) {
163+
cases := []struct {
164+
name string
165+
pipelineSettings *Settings
166+
sourceName string
167+
meta metadata.MetaData
168+
want map[string]float64
169+
}{
170+
{
171+
name: "from_source1",
172+
pipelineSettings: &Settings{
173+
Capacity: 5,
174+
Decoder: "raw",
175+
MetricHoldDuration: DefaultMetricHoldDuration,
176+
MaxEventSize: 1,
177+
},
178+
sourceName: "test-source",
179+
meta: metadata.MetaData{
180+
"some-key": "some-value",
181+
},
182+
want: map[string]float64{
183+
"test-source": 1,
184+
"some-value": 0,
185+
},
186+
},
187+
{
188+
name: "from_source2",
189+
pipelineSettings: &Settings{
190+
Capacity: 5,
191+
Decoder: "raw",
192+
MetricHoldDuration: DefaultMetricHoldDuration,
193+
MaxEventSize: 1,
194+
SourceNameMetaField: "test",
195+
},
196+
sourceName: "test-source",
197+
meta: metadata.MetaData{
198+
"some-key": "some-value",
199+
},
200+
want: map[string]float64{
201+
"test-source": 1,
202+
"some-value": 0,
203+
},
204+
},
205+
{
206+
name: "from_meta",
207+
pipelineSettings: &Settings{
208+
Capacity: 5,
209+
Decoder: "raw",
210+
MetricHoldDuration: DefaultMetricHoldDuration,
211+
MaxEventSize: 1,
212+
SourceNameMetaField: "test",
213+
},
214+
sourceName: "test-source",
215+
meta: metadata.MetaData{
216+
"test": "some-value",
217+
},
218+
want: map[string]float64{
219+
"test-source": 0,
220+
"some-value": 1,
221+
},
222+
},
223+
}
224+
225+
for _, tCase := range cases {
226+
t.Run(tCase.name, func(t *testing.T) {
227+
pipe := New("test_pipeline", tCase.pipelineSettings, prometheus.NewRegistry())
228+
229+
pipe.checkInputBytes([]byte("some log"), tCase.sourceName, tCase.meta)
230+
231+
for k, v := range tCase.want {
232+
require.Equal(t, v, testutil.ToFloat64(pipe.maxEventSizeExceededMetric.WithLabelValues(k)))
233+
}
234+
})
235+
}
236+
}

pipeline/processor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ type processor struct {
7070

7171
metricsValues []string
7272

73-
incMaxEventSizeExceeded func()
73+
incMaxEventSizeExceeded func(lvs ...string)
7474
incCountEventPanicsRecovered func()
7575
}
7676

@@ -81,7 +81,7 @@ func newProcessor(
8181
output OutputPlugin,
8282
streamer *streamer,
8383
finalizeFn finalizeFn,
84-
incMaxEventSizeExceededFn func(),
84+
incMaxEventSizeExceededFn func(lvs ...string),
8585
incCountEventPanicsRecoveredFn func(),
8686
) *processor {
8787
processor := &processor{
@@ -421,8 +421,8 @@ func (p *processor) Propagate(event *Event) {
421421
p.processSequence(event)
422422
}
423423

424-
func (p *processor) IncMaxEventSizeExceeded() {
425-
p.incMaxEventSizeExceeded()
424+
func (p *processor) IncMaxEventSizeExceeded(lvs ...string) {
425+
p.incMaxEventSizeExceeded(lvs...)
426426
}
427427

428428
// Spawn the children of the parent and process in the actions.

plugin/input/file/worker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type inputer interface {
2525
// In processes event and returns it seq number.
2626
In(sourceID pipeline.SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64
2727
IncReadOps()
28-
IncMaxEventSizeExceeded()
28+
IncMaxEventSizeExceeded(lvs ...string)
2929
}
3030

3131
func (w *worker) start(inputController inputer, jobProvider *jobProvider, readBufferSize int, logger *zap.SugaredLogger) {
@@ -106,7 +106,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
106106

107107
// check if the event fits into the max size, otherwise skip the event
108108
if shouldCheckMax && !w.cutOffEventByLimit && len(accumBuf)+len(line) > w.maxEventSize {
109-
controller.IncMaxEventSizeExceeded()
109+
controller.IncMaxEventSizeExceeded(sourceName)
110110
skipLine = true
111111
}
112112

plugin/input/file/worker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type inputerMock struct {
2626

2727
func (i *inputerMock) IncReadOps() {}
2828

29-
func (i *inputerMock) IncMaxEventSizeExceeded() {}
29+
func (i *inputerMock) IncMaxEventSizeExceeded(lvs ...string) {}
3030

3131
func (i *inputerMock) In(_ pipeline.SourceID, _ string, _ int64, data []byte, _ bool, _ metadata.MetaData) uint64 {
3232
i.gotData = append(i.gotData, string(data))

plugin/input/k8s/multiline_action.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ type MultilineAction struct {
1212
allowedPodLabels map[string]bool
1313
allowedNodeLabels map[string]bool
1414

15-
logger *zap.SugaredLogger
16-
controller pipeline.ActionPluginController
17-
maxEventSize int
15+
logger *zap.SugaredLogger
16+
controller pipeline.ActionPluginController
17+
maxEventSize int
18+
sourceNameMetaField string
19+
1820
eventBuf []byte
1921
eventSize int
2022
skipNextEvent bool
@@ -29,6 +31,7 @@ func (p *MultilineAction) Start(config pipeline.AnyConfig, params *pipeline.Acti
2931
p.logger = params.Logger
3032
p.controller = params.Controller
3133
p.maxEventSize = params.PipelineSettings.MaxEventSize
34+
p.sourceNameMetaField = params.PipelineSettings.SourceNameMetaField
3235
p.config = config.(*Config)
3336

3437
p.allowedPodLabels = cfg.ListToMap(p.config.AllowedPodLabels)
@@ -92,16 +95,22 @@ func (p *MultilineAction) Do(event *pipeline.Event) pipeline.ActionResult {
9295
// check buffer size before append
9396
if p.maxEventSize == 0 || sizeAfterAppend < p.maxEventSize {
9497
p.eventBuf = append(p.eventBuf, logFragment[1:logFragmentLen-1]...)
95-
} else {
98+
} else if !p.skipNextEvent {
9699
if p.controller != nil {
97-
p.controller.IncMaxEventSizeExceeded()
98-
}
100+
source := event.SourceName
101+
if p.sourceNameMetaField != "" {
102+
// at the moment, all metadata fields have been added to log
103+
if val := event.Root.Dig(p.sourceNameMetaField).AsString(); val != "" {
104+
source = val
105+
}
106+
}
99107

100-
if !p.skipNextEvent {
101-
// skip event if max_event_size is exceeded
102-
p.skipNextEvent = true
103-
p.logger.Errorf("event chunk will be discarded due to max_event_size, source_name=%s, namespace=%s, pod=%s", event.SourceName, ns, pod)
108+
p.controller.IncMaxEventSizeExceeded(source)
104109
}
110+
111+
// skip event if max_event_size is exceeded
112+
p.skipNextEvent = true
113+
p.logger.Errorf("event chunk will be discarded due to max_event_size, source_name=%s, namespace=%s, pod=%s", event.SourceName, ns, pod)
105114
}
106115
return pipeline.ActionCollapse
107116
}

0 commit comments

Comments (0)