Skip to content

Commit 46e144f

Browse files
committed
fix buffered telemetry sampling
1 parent 06e7e87 commit 46e144f

2 files changed

Lines changed: 407 additions & 12 deletions

File tree

core/services/llo/telem/telemetry.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,10 @@ func (t *telemeter) sendBufferedTelemetry(digest types.ConfigDigest, seqNr uint6
302302
go func() {
303303
for _, msgs := range messages {
304304
for _, msg := range msgs {
305+
// Sampling is applied at flush time (not enqueue time) for buffered telemetry
306+
if !t.sampler.Sample(msg.telemType, msg.msg) {
307+
continue
308+
}
305309
bytes, err := proto.Marshal(msg.msg)
306310
if err != nil {
307311
t.eng.Warnf("protobuf marshal failed %v", err.Error())
@@ -337,12 +341,12 @@ func (t *telemeter) enqueueTelemetry(digest string, seqNr uint64, typ synchroniz
337341
if _, ok := t.telemetryBuffer[digest]; !ok {
338342
t.telemetryBuffer[digest] = make(map[uint64][]telemetryEntry)
339343
}
340-
if t.sampler.Sample(typ, msg) {
341-
t.telemetryBuffer[digest][seqNr] = []telemetryEntry{{
342-
telemType: typ,
343-
msg: msg,
344-
}}
345-
}
344+
345+
// Sampling is applied at flush time for buffered telemetry
346+
t.telemetryBuffer[digest][seqNr] = []telemetryEntry{{
347+
telemType: typ,
348+
msg: msg,
349+
}}
346350
default: // synchronization.LLOReport and other buffered types
347351
// Report telemetry: append, since multiple reports per seqNr is
348352
// expected (one per reportable channel).
@@ -352,12 +356,11 @@ func (t *telemeter) enqueueTelemetry(digest string, seqNr uint64, typ synchroniz
352356
if _, ok := t.telemetryBuffer[digest]; !ok {
353357
t.telemetryBuffer[digest] = make(map[uint64][]telemetryEntry)
354358
}
355-
if t.sampler.Sample(typ, msg) {
356-
t.telemetryBuffer[digest][seqNr] = append(t.telemetryBuffer[digest][seqNr], telemetryEntry{
357-
telemType: typ,
358-
msg: msg,
359-
})
360-
}
359+
// Sampling is applied at flush time for buffered telemetry
360+
t.telemetryBuffer[digest][seqNr] = append(t.telemetryBuffer[digest][seqNr], telemetryEntry{
361+
telemType: typ,
362+
msg: msg,
363+
})
361364
}
362365
}
363366

0 commit comments

Comments
 (0)