Skip to content

Commit d950ece

Browse files
authored
When a Kafka consumer metric is NaN, report the previous value instead of an invalid value (#6741)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent ce4e86a commit d950ece

2 files changed

Lines changed: 119 additions & 0 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,10 @@ public void update(final KafkaConsumer consumer) {
221221
continue;
222222
}
223223
double newValue = (Double)value.metricValue();
224+
if (Double.isNaN(newValue) || Double.isInfinite(newValue)) {
225+
LOG.debug("Skipping non-finite metric value {} for {}", newValue, metricName);
226+
continue;
227+
}
224228
if (metricName.equals("records-consumed-total")) {
225229
synchronized(consumerMetrics) {
226230
double prevValue = consumerMetrics.get(metricName);

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetricsTests.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,121 @@ public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {
259259

260260
}
261261

262+
@Test
263+
public void update_withNaNMetricValue_preservesPreviousValue() {
264+
doAnswer((i) -> {
265+
bytesConsumedCount += (double)i.getArgument(0);
266+
return null;
267+
}).when(bytesConsumedCounter).increment(any(Double.class));
268+
doAnswer((i) -> {
269+
recordsConsumedCount += (double)i.getArgument(0);
270+
return null;
271+
}).when(recordsConsumedCounter).increment(any(Double.class));
272+
273+
topicMetrics = createObjectUnderTest();
274+
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
275+
topicMetrics.register(kafkaConsumer);
276+
277+
// First update with real values
278+
Map<MetricName, KafkaTestMetric> metrics = new HashMap<>();
279+
when(kafkaConsumer.metrics()).thenReturn(metrics);
280+
double expectedLag = 500.0;
281+
double expectedLead = 100.0;
282+
Map<String, String> emptyTags = new HashMap<>();
283+
Map<String, String> topicTags = new HashMap<>();
284+
topicTags.put("topic", topicName);
285+
metrics.put(getMetric("records-lag-max", expectedLag, emptyTags).metricName(),
286+
getMetric("records-lag-max", expectedLag, emptyTags));
287+
metrics.put(getMetric("records-lead-min", expectedLead, emptyTags).metricName(),
288+
getMetric("records-lead-min", expectedLead, emptyTags));
289+
metrics.put(getMetric("commit-rate", 5.0, emptyTags).metricName(),
290+
getMetric("commit-rate", 5.0, emptyTags));
291+
metrics.put(getMetric("bytes-consumed-total", 100.0, topicTags).metricName(),
292+
getMetric("bytes-consumed-total", 100.0, topicTags));
293+
metrics.put(getMetric("records-consumed-total", 10.0, topicTags).metricName(),
294+
getMetric("records-consumed-total", 10.0, topicTags));
295+
topicMetrics.update(kafkaConsumer);
296+
297+
// Second update with NaN values (simulating Kafka 3.x idle consumer)
298+
Map<MetricName, KafkaTestMetric> nanMetrics = new HashMap<>();
299+
when(kafkaConsumer.metrics()).thenReturn(nanMetrics);
300+
nanMetrics.put(getMetric("records-lag-max", Double.NaN, emptyTags).metricName(),
301+
getMetric("records-lag-max", Double.NaN, emptyTags));
302+
nanMetrics.put(getMetric("records-lead-min", Double.NaN, emptyTags).metricName(),
303+
getMetric("records-lead-min", Double.NaN, emptyTags));
304+
nanMetrics.put(getMetric("commit-rate", Double.NaN, emptyTags).metricName(),
305+
getMetric("commit-rate", Double.NaN, emptyTags));
306+
nanMetrics.put(getMetric("bytes-consumed-total", Double.NaN, topicTags).metricName(),
307+
getMetric("bytes-consumed-total", Double.NaN, topicTags));
308+
nanMetrics.put(getMetric("records-consumed-total", Double.NaN, topicTags).metricName(),
309+
getMetric("records-consumed-total", Double.NaN, topicTags));
310+
topicMetrics.update(kafkaConsumer);
311+
312+
// Verify gauges still return valid previous values, not NaN
313+
Map<KafkaConsumer, Map<String, Double>> metricValuesMap = topicMetrics.getMetricValues();
314+
Map<String, Double> consumerMetrics = metricValuesMap.get(kafkaConsumer);
315+
assertEquals(expectedLag, consumerMetrics.get("records-lag-max"), 0.01d);
316+
assertEquals(expectedLead, consumerMetrics.get("records-lead-min"), 0.01d);
317+
assertEquals(5.0, consumerMetrics.get("commit-rate"), 0.01d);
318+
319+
// Verify gauge callbacks produce valid numbers
320+
ToDoubleFunction lagGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLagMax");
321+
double lagResult = lagGauge.applyAsDouble(metricValuesMap);
322+
assertEquals(expectedLag, lagResult, 0.01d);
323+
324+
ToDoubleFunction leadGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLeadMin");
325+
double leadResult = leadGauge.applyAsDouble(metricValuesMap);
326+
assertEquals(expectedLead, leadResult, 0.01d);
327+
}
328+
329+
@Test
330+
public void update_withNegativeInfinityMetricValue_preservesPreviousValue() {
331+
topicMetrics = createObjectUnderTest();
332+
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
333+
topicMetrics.register(kafkaConsumer);
334+
335+
// First update with real values
336+
Map<MetricName, KafkaTestMetric> metrics = new HashMap<>();
337+
when(kafkaConsumer.metrics()).thenReturn(metrics);
338+
double expectedLag = 250.0;
339+
Map<String, String> emptyTags = new HashMap<>();
340+
metrics.put(getMetric("records-lag-max", expectedLag, emptyTags).metricName(),
341+
getMetric("records-lag-max", expectedLag, emptyTags));
342+
topicMetrics.update(kafkaConsumer);
343+
344+
// Second update with -Infinity (Kafka Max SampledStat empty window)
345+
Map<MetricName, KafkaTestMetric> infMetrics = new HashMap<>();
346+
when(kafkaConsumer.metrics()).thenReturn(infMetrics);
347+
infMetrics.put(getMetric("records-lag-max", Double.NEGATIVE_INFINITY, emptyTags).metricName(),
348+
getMetric("records-lag-max", Double.NEGATIVE_INFINITY, emptyTags));
349+
topicMetrics.update(kafkaConsumer);
350+
351+
Map<String, Double> consumerMetrics = topicMetrics.getMetricValues().get(kafkaConsumer);
352+
assertEquals(expectedLag, consumerMetrics.get("records-lag-max"), 0.01d);
353+
}
354+
355+
@Test
356+
public void update_withNaNMetricValue_andNopriorUpdate_preservesInitialZero() {
357+
topicMetrics = createObjectUnderTest();
358+
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
359+
topicMetrics.register(kafkaConsumer);
360+
361+
// First update is NaN (consumer never received any records)
362+
Map<MetricName, KafkaTestMetric> nanMetrics = new HashMap<>();
363+
when(kafkaConsumer.metrics()).thenReturn(nanMetrics);
364+
Map<String, String> emptyTags = new HashMap<>();
365+
nanMetrics.put(getMetric("records-lag-max", Double.NaN, emptyTags).metricName(),
366+
getMetric("records-lag-max", Double.NaN, emptyTags));
367+
topicMetrics.update(kafkaConsumer);
368+
369+
// Should preserve the initial 0.0 from register()
370+
Map<String, Double> consumerMetrics = topicMetrics.getMetricValues().get(kafkaConsumer);
371+
assertEquals(0.0, consumerMetrics.get("records-lag-max"), 0.01d);
372+
373+
ToDoubleFunction lagGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLagMax");
374+
double lagResult = lagGauge.applyAsDouble(topicMetrics.getMetricValues());
375+
assertEquals(0.0, lagResult, 0.01d);
376+
}
262377

263378
@Test
264379
void recordTimeBetweenPolls_records_metric_correctly() throws InterruptedException {

0 commit comments

Comments
 (0)