diff --git a/.changeset/weak-pandas-draw.md b/.changeset/weak-pandas-draw.md new file mode 100644 index 00000000000..2aa71d4e0ab --- /dev/null +++ b/.changeset/weak-pandas-draw.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#bugfix fix llo buffered telemetry sampling diff --git a/core/services/llo/telem/telemetry.go b/core/services/llo/telem/telemetry.go index 04959e1a0e0..9092266cfd1 100644 --- a/core/services/llo/telem/telemetry.go +++ b/core/services/llo/telem/telemetry.go @@ -302,6 +302,10 @@ func (t *telemeter) sendBufferedTelemetry(digest types.ConfigDigest, seqNr uint6 go func() { for _, msgs := range messages { for _, msg := range msgs { + // Sampling is applied at flush time (not enqueue time) for buffered telemetry + if !t.sampler.Sample(msg.telemType, msg.msg) { + continue + } bytes, err := proto.Marshal(msg.msg) if err != nil { t.eng.Warnf("protobuf marshal failed %v", err.Error()) @@ -337,12 +341,12 @@ func (t *telemeter) enqueueTelemetry(digest string, seqNr uint64, typ synchroniz if _, ok := t.telemetryBuffer[digest]; !ok { t.telemetryBuffer[digest] = make(map[uint64][]telemetryEntry) } - if t.sampler.Sample(typ, msg) { - t.telemetryBuffer[digest][seqNr] = []telemetryEntry{{ - telemType: typ, - msg: msg, - }} - } + + // Sampling is applied at flush time for buffered telemetry + t.telemetryBuffer[digest][seqNr] = []telemetryEntry{{ + telemType: typ, + msg: msg, + }} default: // synchronization.LLOReport and other buffered types // Report telemetry: append, since multiple reports per seqNr is // expected (one per reportable channel). @@ -352,12 +356,11 @@ func (t *telemeter) enqueueTelemetry(digest string, seqNr uint64, typ synchroniz if _, ok := t.telemetryBuffer[digest]; !ok { t.telemetryBuffer[digest] = make(map[uint64][]telemetryEntry) } - if t.sampler.Sample(typ, msg) { - t.telemetryBuffer[digest][seqNr] = append(t.telemetryBuffer[digest][seqNr], telemetryEntry{ - telemType: typ, - msg: msg, - }) - } + // Sampling is applied at flush time for buffered telemetry + t.telemetryBuffer[digest][seqNr] = append(t.telemetryBuffer[digest][seqNr], telemetryEntry{ + telemType: typ, + msg: msg, + }) } } diff --git a/core/services/llo/telem/telemetry_test.go b/core/services/llo/telem/telemetry_test.go index 910e5fe9d5b..2c1a4bf8e9f 100644 --- a/core/services/llo/telem/telemetry_test.go +++ b/core/services/llo/telem/telemetry_test.go @@ -795,4 +795,396 @@ func Test_Telemeter_reportTelemetry(t *testing.T) { }) } +// Test_Telemeter_outcomeTelemetry_samplingAtFlushTime is a regression test for +// the bug where enabling telemetry sampling caused LLO outcome telemetry to +// drop to ~0.2/s/node instead of the expected ~1/s/node. +// +// Root cause: sampling was applied at enqueue time. For LLOOutcome, the +// fingerprint is (donId, configDigest) bucketed per second, so only the first +// Outcome of each wall-clock second was buffered. The buffer is keyed by +// seqNr and flushed only on Transmit(seqNr=X). When the OCR3 round cadence +// (DeltaRound) is much faster than the report-emission cadence +// (DefaultMinReportIntervalNanoseconds), most seqNrs do not Transmit — so the +// sampled entry was almost always evicted by a later Transmit before being +// sent. +// +// Fix: apply sampling at flush time inside sendBufferedTelemetry, so the +// buffer always sees every Outcome and the sampler decides admission on the +// (already transmit-filtered) survivor. +func Test_Telemeter_outcomeTelemetry_samplingAtFlushTime(t *testing.T) { + t.Parallel() + + lggr := logger.TestLogger(t) + donID := uint32(1) + + // Build outcomes spanning multiple wall-clock seconds. Per second, we emit + // several Outcomes at different seqNrs but only the LAST seqNr of each + // second transmits — mimicking a DON where DeltaRound << report interval. + const ( + secondsCovered = 3 + outcomesPerSecond = 5 + baseObservationUnix = int64(1737936858) + baseSeqNr = uint64(1000) + ) + + cd := (&mockOpts{}).ConfigDigest() + + t.Run("with sampling enabled emits ~1 outcome per second despite seqNr/transmit mismatch", func(t *testing.T) { + m := &mockMonitoringEndpoint{chTypedLogs: make(chan typedLog, 100)} + tm := newTelemeter(TelemeterParams{ + Logger: lggr, + MonitoringEndpoint: m, + DonID: donID, + CaptureOutcomeTelemetry: true, + SampleTelemetry: true, + }) + servicetest.Run(t, tm) + ch := tm.GetOutcomeTelemetryCh() + require.NotNil(t, ch) + + transmittingSeqNrs := make([]uint64, 0, secondsCovered) + for s := 0; s < secondsCovered; s++ { + secStart := time.Unix(baseObservationUnix+int64(s), 0).UnixNano() + for i := 0; i < outcomesPerSecond; i++ { + seqNr := baseSeqNr + uint64(s*outcomesPerSecond+i) + // Spread observation timestamps within the same wall-clock + // second (different nanos, same second bucket). + obsTs := uint64(secStart + int64(i)*int64(10*time.Millisecond)) + ch <- &datastreamsllo.LLOOutcomeTelemetry{ + LifeCycleStage: "production", + ObservationTimestampNanoseconds: obsTs, + SeqNr: seqNr, + ConfigDigest: cd[:], + DonId: donID, + } + } + // Only the last seqNr of each second "transmits". + transmittingSeqNrs = append(transmittingSeqNrs, baseSeqNr+uint64(s*outcomesPerSecond+outcomesPerSecond-1)) + } + + // Wait until every outcome is buffered. + testutils.RequireEventually(t, func() bool { + tm.telemetryBufferMu.Lock() + defer tm.telemetryBufferMu.Unlock() + return len(tm.telemetryBuffer[cd.Hex()]) == secondsCovered*outcomesPerSecond + }) + + for _, seqNr := range transmittingSeqNrs { + tm.TrackSeqNr(cd, seqNr) + } + + // We expect one outcome telemetry per wall-clock second (sampler + // admits the first survivor per second bucket). + received := make([]uint64, 0, secondsCovered) + for i := 0; i < secondsCovered; i++ { + select { + case tLog := <-m.chTypedLogs: + assert.Equal(t, synchronization.LLOOutcome, tLog.telemType) + decoded := &datastreamsllo.LLOOutcomeTelemetry{} + require.NoError(t, proto.Unmarshal(tLog.log, decoded)) + received = append(received, decoded.ObservationTimestampNanoseconds) + case <-time.After(testutils.WaitTimeout(t)): + t.Fatalf("timed out waiting for outcome telemetry #%d (got %d so far)", i+1, len(received)) + } + } + assert.Len(t, received, secondsCovered) + + // Verify no additional messages — sampler should have dropped the + // remaining survivors that fell in already-seen second buckets. + select { + case extra := <-m.chTypedLogs: + decoded := &datastreamsllo.LLOOutcomeTelemetry{} + require.NoError(t, proto.Unmarshal(extra.log, decoded)) + t.Fatalf("expected no more outcome messages, got one with ts=%d", decoded.ObservationTimestampNanoseconds) + case <-time.After(100 * time.Millisecond): + // good + } + }) + + t.Run("with sampling disabled emits one outcome per transmitting seqNr", func(t *testing.T) { + m := &mockMonitoringEndpoint{chTypedLogs: make(chan typedLog, 100)} + tm := newTelemeter(TelemeterParams{ + Logger: lggr, + MonitoringEndpoint: m, + DonID: donID, + CaptureOutcomeTelemetry: true, + SampleTelemetry: false, + }) + servicetest.Run(t, tm) + ch := tm.GetOutcomeTelemetryCh() + require.NotNil(t, ch) + + transmittingSeqNrs := make([]uint64, 0, secondsCovered) + for s := 0; s < secondsCovered; s++ { + secStart := time.Unix(baseObservationUnix+int64(s), 0).UnixNano() + for i := 0; i < outcomesPerSecond; i++ { + seqNr := baseSeqNr + uint64(s*outcomesPerSecond+i) + obsTs := uint64(secStart + int64(i)*int64(10*time.Millisecond)) + ch <- &datastreamsllo.LLOOutcomeTelemetry{ + LifeCycleStage: "production", + ObservationTimestampNanoseconds: obsTs, + SeqNr: seqNr, + ConfigDigest: cd[:], + DonId: donID, + } + } + transmittingSeqNrs = append(transmittingSeqNrs, baseSeqNr+uint64(s*outcomesPerSecond+outcomesPerSecond-1)) + } + + testutils.RequireEventually(t, func() bool { + tm.telemetryBufferMu.Lock() + defer tm.telemetryBufferMu.Unlock() + return len(tm.telemetryBuffer[cd.Hex()]) == secondsCovered*outcomesPerSecond + }) + + for _, seqNr := range transmittingSeqNrs { + tm.TrackSeqNr(cd, seqNr) + } + + // Without sampling, every transmit produces telemetry — one per second + // (since only one seqNr per second transmits). + for i := 0; i < secondsCovered; i++ { + select { + case tLog := <-m.chTypedLogs: + assert.Equal(t, synchronization.LLOOutcome, tLog.telemType) + case <-time.After(testutils.WaitTimeout(t)): + t.Fatalf("timed out waiting for outcome telemetry #%d", i+1) + } + } + + select { + case <-m.chTypedLogs: + t.Fatal("expected no more outcome messages") + case <-time.After(100 * time.Millisecond): + } + }) +} + +// Test_Telemeter_reportTelemetry_samplingAtFlushTime is the analogous +// regression test to Test_Telemeter_outcomeTelemetry_samplingAtFlushTime, but +// for LLOReport. The same root cause applies: sampling was applied at enqueue +// time, but the buffer is seqNr-keyed and only the transmitting seqNr's entry +// survives flush. When most seqNrs do not transmit, sampled report entries +// were evicted before they could be sent. +// +// LLOReport's sampler fingerprint additionally keys on channelId, so each +// channel has its own per-second bucket. +func Test_Telemeter_reportTelemetry_samplingAtFlushTime(t *testing.T) { + t.Parallel() + + lggr := logger.TestLogger(t) + donID := uint32(1) + + const ( + secondsCovered = 3 + seqNrsPerSecond = 5 + baseObservationUnix = int64(1737936858) + baseSeqNr = uint64(2000) + ) + + channels := []uint32{1, 2} + cd := (&mockOpts{}).ConfigDigest() + + t.Run("with sampling enabled emits ~1 report per channel per second despite seqNr/transmit mismatch", func(t *testing.T) { + m := &mockMonitoringEndpoint{chTypedLogs: make(chan typedLog, 100)} + tm := newTelemeter(TelemeterParams{ + Logger: lggr, + MonitoringEndpoint: m, + DonID: donID, + CaptureReportTelemetry: true, + SampleTelemetry: true, + }) + servicetest.Run(t, tm) + ch := tm.GetReportTelemetryCh() + require.NotNil(t, ch) + + transmittingSeqNrs := make([]uint64, 0, secondsCovered) + expectedBufferEntries := 0 + for s := 0; s < secondsCovered; s++ { + secStart := time.Unix(baseObservationUnix+int64(s), 0).UnixNano() + for i := 0; i < seqNrsPerSecond; i++ { + seqNr := baseSeqNr + uint64(s*seqNrsPerSecond+i) + obsTs := uint64(secStart + int64(i)*int64(10*time.Millisecond)) + // Each seqNr emits a report per channel — mimics the + // Reports() call shape in LLO (one report per channel). + for _, channelID := range channels { + ch <- &datastreamsllo.LLOReportTelemetry{ + ChannelId: channelID, + ObservationTimestampNanoseconds: obsTs, + SeqNr: seqNr, + ConfigDigest: cd[:], + } + expectedBufferEntries++ + } + } + // Only the last seqNr of each second transmits. + transmittingSeqNrs = append(transmittingSeqNrs, baseSeqNr+uint64(s*seqNrsPerSecond+seqNrsPerSecond-1)) + } + + // Wait until every report is buffered (no enqueue-time sampling). + testutils.RequireEventually(t, func() bool { + tm.telemetryBufferMu.Lock() + defer tm.telemetryBufferMu.Unlock() + total := 0 + for _, m := range tm.telemetryBuffer[cd.Hex()] { + total += len(m) + } + return total == expectedBufferEntries + }) + + for _, seqNr := range transmittingSeqNrs { + tm.TrackSeqNr(cd, seqNr) + } + + // Expect one report per channel per second bucket (sampler admits the + // first survivor per (channelId, second) fingerprint). + expected := secondsCovered * len(channels) + seen := make(map[uint32]int) + for i := 0; i < expected; i++ { + select { + case tLog := <-m.chTypedLogs: + assert.Equal(t, synchronization.LLOReport, tLog.telemType) + decoded := &datastreamsllo.LLOReportTelemetry{} + require.NoError(t, proto.Unmarshal(tLog.log, decoded)) + seen[decoded.ChannelId]++ + case <-time.After(testutils.WaitTimeout(t)): + t.Fatalf("timed out waiting for report telemetry #%d (got %d so far)", i+1, len(seen)) + } + } + for _, channelID := range channels { + assert.Equal(t, secondsCovered, seen[channelID], + "expected one report per second for channel %d", channelID) + } + + select { + case extra := <-m.chTypedLogs: + decoded := &datastreamsllo.LLOReportTelemetry{} + require.NoError(t, proto.Unmarshal(extra.log, decoded)) + t.Fatalf("expected no more report messages, got one with channel=%d ts=%d", + decoded.ChannelId, decoded.ObservationTimestampNanoseconds) + case <-time.After(100 * time.Millisecond): + } + }) + + t.Run("with sampling disabled emits all reports for transmitting seqNrs", func(t *testing.T) { + m := &mockMonitoringEndpoint{chTypedLogs: make(chan typedLog, 100)} + tm := newTelemeter(TelemeterParams{ + Logger: lggr, + MonitoringEndpoint: m, + DonID: donID, + CaptureReportTelemetry: true, + SampleTelemetry: false, + }) + servicetest.Run(t, tm) + ch := tm.GetReportTelemetryCh() + require.NotNil(t, ch) + + transmittingSeqNrs := make([]uint64, 0, secondsCovered) + expectedBufferEntries := 0 + for s := 0; s < secondsCovered; s++ { + secStart := time.Unix(baseObservationUnix+int64(s), 0).UnixNano() + for i := 0; i < seqNrsPerSecond; i++ { + seqNr := baseSeqNr + uint64(s*seqNrsPerSecond+i) + obsTs := uint64(secStart + int64(i)*int64(10*time.Millisecond)) + for _, channelID := range channels { + ch <- &datastreamsllo.LLOReportTelemetry{ + ChannelId: channelID, + ObservationTimestampNanoseconds: obsTs, + SeqNr: seqNr, + ConfigDigest: cd[:], + } + expectedBufferEntries++ + } + } + transmittingSeqNrs = append(transmittingSeqNrs, baseSeqNr+uint64(s*seqNrsPerSecond+seqNrsPerSecond-1)) + } + + testutils.RequireEventually(t, func() bool { + tm.telemetryBufferMu.Lock() + defer tm.telemetryBufferMu.Unlock() + total := 0 + for _, m := range tm.telemetryBuffer[cd.Hex()] { + total += len(m) + } + return total == expectedBufferEntries + }) + + for _, seqNr := range transmittingSeqNrs { + tm.TrackSeqNr(cd, seqNr) + } + + // Without sampling, every report at a transmitting seqNr flushes. + // One transmitting seqNr per second × len(channels) reports per seqNr. + expected := secondsCovered * len(channels) + for i := 0; i < expected; i++ { + select { + case tLog := <-m.chTypedLogs: + assert.Equal(t, synchronization.LLOReport, tLog.telemType) + case <-time.After(testutils.WaitTimeout(t)): + t.Fatalf("timed out waiting for report telemetry #%d", i+1) + } + } + + select { + case <-m.chTypedLogs: + t.Fatal("expected no more report messages") + case <-time.After(100 * time.Millisecond): + } + }) + + t.Run("with sampling enabled multiple reports per channel at same transmitting seqNr keep one per second", func(t *testing.T) { + // Reports() emits one report per channel for a given seqNr. When + // that seqNr does transmit, the per-channel reports should be + // admitted (different fingerprints). This test verifies the + // append-semantics interaction with flush-time sampling. + m := &mockMonitoringEndpoint{chTypedLogs: make(chan typedLog, 100)} + tm := newTelemeter(TelemeterParams{ + Logger: lggr, + MonitoringEndpoint: m, + DonID: donID, + CaptureReportTelemetry: true, + SampleTelemetry: true, + }) + servicetest.Run(t, tm) + ch := tm.GetReportTelemetryCh() + require.NotNil(t, ch) + + opts := &mockOpts{} + cd := opts.ConfigDigest() + obsTs := uint64(time.Unix(baseObservationUnix, 0).UnixNano()) + + // Append 3 reports at the same seqNr for distinct channels. + for _, channelID := range []uint32{10, 20, 30} { + ch <- &datastreamsllo.LLOReportTelemetry{ + ChannelId: channelID, + ObservationTimestampNanoseconds: obsTs, + SeqNr: opts.SeqNr(), + ConfigDigest: cd[:], + } + } + + testutils.RequireEventually(t, func() bool { + tm.telemetryBufferMu.Lock() + defer tm.telemetryBufferMu.Unlock() + return len(tm.telemetryBuffer[cd.Hex()][opts.SeqNr()]) == 3 + }) + + tm.TrackSeqNr(cd, opts.SeqNr()) + + received := make(map[uint32]struct{}) + for i := 0; i < 3; i++ { + select { + case tLog := <-m.chTypedLogs: + decoded := &datastreamsllo.LLOReportTelemetry{} + require.NoError(t, proto.Unmarshal(tLog.log, decoded)) + received[decoded.ChannelId] = struct{}{} + case <-time.After(testutils.WaitTimeout(t)): + t.Fatalf("timed out waiting for report #%d", i+1) + } + } + assert.Equal(t, map[uint32]struct{}{10: {}, 20: {}, 30: {}}, received, + "each per-channel report should be admitted (distinct sampler fingerprints)") + }) +} + func ptr[T any](t T) *T { return &t }