diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java
index 4a2be03513..aba517177e 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java
@@ -221,6 +221,12 @@ public void update(final KafkaConsumer consumer) {
                     continue;
                 }
                 double newValue = (Double)value.metricValue();
+                // Non-finite readings (NaN, +/-Infinity) carry no usable sample, so skip them
+                // and leave the previously stored value for this metric untouched.
+                if (!Double.isFinite(newValue)) {
+                    LOG.debug("Skipping non-finite metric value {} for {}", newValue, metricName);
+                    continue;
+                }
                 if (metricName.equals("records-consumed-total")) {
                     synchronized(consumerMetrics) {
                         double prevValue = consumerMetrics.get(metricName);
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetricsTests.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetricsTests.java
index 025df07532..9bf2f3e130 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetricsTests.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetricsTests.java
@@ -259,6 +259,133 @@ public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {
 
     }
 
+    /**
+     * Feeds finite readings and then NaN readings for the same metrics, and checks that the
+     * stored values and the lag/lead gauges still report the finite readings.
+     */
+    @Test
+    public void update_withNaNMetricValue_preservesPreviousValue() {
+        doAnswer((i) -> {
+            bytesConsumedCount += (double)i.getArgument(0);
+            return null;
+        }).when(bytesConsumedCounter).increment(any(Double.class));
+        doAnswer((i) -> {
+            recordsConsumedCount += (double)i.getArgument(0);
+            return null;
+        }).when(recordsConsumedCounter).increment(any(Double.class));
+
+        topicMetrics = createObjectUnderTest();
+        KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
+        topicMetrics.register(kafkaConsumer);
+
+        // First update with real values
+        Map metrics = new HashMap<>();
+        when(kafkaConsumer.metrics()).thenReturn(metrics);
+        double expectedLag = 500.0;
+        double expectedLead = 100.0;
+        Map emptyTags = new HashMap<>();
+        Map topicTags = new HashMap<>();
+        topicTags.put("topic", topicName);
+        metrics.put(getMetric("records-lag-max", expectedLag, emptyTags).metricName(),
+                getMetric("records-lag-max", expectedLag, emptyTags));
+        metrics.put(getMetric("records-lead-min", expectedLead, emptyTags).metricName(),
+                getMetric("records-lead-min", expectedLead, emptyTags));
+        metrics.put(getMetric("commit-rate", 5.0, emptyTags).metricName(),
+                getMetric("commit-rate", 5.0, emptyTags));
+        metrics.put(getMetric("bytes-consumed-total", 100.0, topicTags).metricName(),
+                getMetric("bytes-consumed-total", 100.0, topicTags));
+        metrics.put(getMetric("records-consumed-total", 10.0, topicTags).metricName(),
+                getMetric("records-consumed-total", 10.0, topicTags));
+        topicMetrics.update(kafkaConsumer);
+
+        // Second update with NaN values (simulating Kafka 3.x idle consumer)
+        Map nanMetrics = new HashMap<>();
+        when(kafkaConsumer.metrics()).thenReturn(nanMetrics);
+        nanMetrics.put(getMetric("records-lag-max", Double.NaN, emptyTags).metricName(),
+                getMetric("records-lag-max", Double.NaN, emptyTags));
+        nanMetrics.put(getMetric("records-lead-min", Double.NaN, emptyTags).metricName(),
+                getMetric("records-lead-min", Double.NaN, emptyTags));
+        nanMetrics.put(getMetric("commit-rate", Double.NaN, emptyTags).metricName(),
+                getMetric("commit-rate", Double.NaN, emptyTags));
+        nanMetrics.put(getMetric("bytes-consumed-total", Double.NaN, topicTags).metricName(),
+                getMetric("bytes-consumed-total", Double.NaN, topicTags));
+        nanMetrics.put(getMetric("records-consumed-total", Double.NaN, topicTags).metricName(),
+                getMetric("records-consumed-total", Double.NaN, topicTags));
+        topicMetrics.update(kafkaConsumer);
+
+        // Verify gauges still return valid previous values, not NaN
+        Map<KafkaConsumer, Map<String, Double>> metricValuesMap = topicMetrics.getMetricValues();
+        Map<String, Double> consumerMetrics = metricValuesMap.get(kafkaConsumer);
+        assertEquals(expectedLag, consumerMetrics.get("records-lag-max"), 0.01d);
+        assertEquals(expectedLead, consumerMetrics.get("records-lead-min"), 0.01d);
+        assertEquals(5.0, consumerMetrics.get("commit-rate"), 0.01d);
+
+        // Verify gauge callbacks produce valid numbers
+        ToDoubleFunction lagGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLagMax");
+        double lagResult = lagGauge.applyAsDouble(metricValuesMap);
+        assertEquals(expectedLag, lagResult, 0.01d);
+
+        ToDoubleFunction leadGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLeadMin");
+        double leadResult = leadGauge.applyAsDouble(metricValuesMap);
+        assertEquals(expectedLead, leadResult, 0.01d);
+    }
+
+    /**
+     * Feeds a finite records-lag-max reading followed by negative infinity, and checks that
+     * the stored value is still the finite reading.
+     */
+    @Test
+    public void update_withNegativeInfinityMetricValue_preservesPreviousValue() {
+        topicMetrics = createObjectUnderTest();
+        KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
+        topicMetrics.register(kafkaConsumer);
+
+        // First update with real values
+        Map metrics = new HashMap<>();
+        when(kafkaConsumer.metrics()).thenReturn(metrics);
+        double expectedLag = 250.0;
+        Map emptyTags = new HashMap<>();
+        metrics.put(getMetric("records-lag-max", expectedLag, emptyTags).metricName(),
+                getMetric("records-lag-max", expectedLag, emptyTags));
+        topicMetrics.update(kafkaConsumer);
+
+        // Second update with -Infinity (Kafka Max SampledStat empty window)
+        Map infMetrics = new HashMap<>();
+        when(kafkaConsumer.metrics()).thenReturn(infMetrics);
+        infMetrics.put(getMetric("records-lag-max", Double.NEGATIVE_INFINITY, emptyTags).metricName(),
+                getMetric("records-lag-max", Double.NEGATIVE_INFINITY, emptyTags));
+        topicMetrics.update(kafkaConsumer);
+
+        Map<String, Double> consumerMetrics = topicMetrics.getMetricValues().get(kafkaConsumer);
+        assertEquals(expectedLag, consumerMetrics.get("records-lag-max"), 0.01d);
+    }
+
+    /**
+     * Feeds NaN as the very first records-lag-max reading, and checks that both the stored
+     * value and the gauge report 0.0.
+     */
+    @Test
+    public void update_withNaNMetricValue_andNoPriorUpdate_preservesInitialZero() {
+        topicMetrics = createObjectUnderTest();
+        KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
+        topicMetrics.register(kafkaConsumer);
+
+        // First update is NaN (consumer never received any records)
+        Map nanMetrics = new HashMap<>();
+        when(kafkaConsumer.metrics()).thenReturn(nanMetrics);
+        Map emptyTags = new HashMap<>();
+        nanMetrics.put(getMetric("records-lag-max", Double.NaN, emptyTags).metricName(),
+                getMetric("records-lag-max", Double.NaN, emptyTags));
+        topicMetrics.update(kafkaConsumer);
+
+        // Should preserve the initial 0.0 from register()
+        Map<String, Double> consumerMetrics = topicMetrics.getMetricValues().get(kafkaConsumer);
+        assertEquals(0.0, consumerMetrics.get("records-lag-max"), 0.01d);
+
+        ToDoubleFunction lagGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLagMax");
+        double lagResult = lagGauge.applyAsDouble(topicMetrics.getMetricValues());
+        assertEquals(0.0, lagResult, 0.01d);
+    }
 
     @Test
     void recordTimeBetweenPolls_records_metric_correctly() throws InterruptedException {