Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ public void update(final KafkaConsumer consumer) {
continue;
}
double newValue = (Double)value.metricValue();
if (Double.isNaN(newValue) || Double.isInfinite(newValue)) {
LOG.debug("Skipping non-finite metric value {} for {}", newValue, metricName);
continue;
}
if (metricName.equals("records-consumed-total")) {
synchronized(consumerMetrics) {
double prevValue = consumerMetrics.get(metricName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,121 @@ public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {

}

@Test
public void update_withNaNMetricValue_preservesPreviousValue() {
doAnswer((i) -> {
bytesConsumedCount += (double)i.getArgument(0);
return null;
}).when(bytesConsumedCounter).increment(any(Double.class));
doAnswer((i) -> {
recordsConsumedCount += (double)i.getArgument(0);
return null;
}).when(recordsConsumedCounter).increment(any(Double.class));

topicMetrics = createObjectUnderTest();
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
topicMetrics.register(kafkaConsumer);

// First update with real values
Map<MetricName, KafkaTestMetric> metrics = new HashMap<>();
when(kafkaConsumer.metrics()).thenReturn(metrics);
double expectedLag = 500.0;
double expectedLead = 100.0;
Map<String, String> emptyTags = new HashMap<>();
Map<String, String> topicTags = new HashMap<>();
topicTags.put("topic", topicName);
metrics.put(getMetric("records-lag-max", expectedLag, emptyTags).metricName(),
getMetric("records-lag-max", expectedLag, emptyTags));
metrics.put(getMetric("records-lead-min", expectedLead, emptyTags).metricName(),
getMetric("records-lead-min", expectedLead, emptyTags));
metrics.put(getMetric("commit-rate", 5.0, emptyTags).metricName(),
getMetric("commit-rate", 5.0, emptyTags));
metrics.put(getMetric("bytes-consumed-total", 100.0, topicTags).metricName(),
getMetric("bytes-consumed-total", 100.0, topicTags));
metrics.put(getMetric("records-consumed-total", 10.0, topicTags).metricName(),
getMetric("records-consumed-total", 10.0, topicTags));
topicMetrics.update(kafkaConsumer);

// Second update with NaN values (simulating Kafka 3.x idle consumer)
Map<MetricName, KafkaTestMetric> nanMetrics = new HashMap<>();
when(kafkaConsumer.metrics()).thenReturn(nanMetrics);
nanMetrics.put(getMetric("records-lag-max", Double.NaN, emptyTags).metricName(),
getMetric("records-lag-max", Double.NaN, emptyTags));
nanMetrics.put(getMetric("records-lead-min", Double.NaN, emptyTags).metricName(),
getMetric("records-lead-min", Double.NaN, emptyTags));
nanMetrics.put(getMetric("commit-rate", Double.NaN, emptyTags).metricName(),
getMetric("commit-rate", Double.NaN, emptyTags));
nanMetrics.put(getMetric("bytes-consumed-total", Double.NaN, topicTags).metricName(),
getMetric("bytes-consumed-total", Double.NaN, topicTags));
nanMetrics.put(getMetric("records-consumed-total", Double.NaN, topicTags).metricName(),
getMetric("records-consumed-total", Double.NaN, topicTags));
topicMetrics.update(kafkaConsumer);

// Verify gauges still return valid previous values, not NaN
Map<KafkaConsumer, Map<String, Double>> metricValuesMap = topicMetrics.getMetricValues();
Map<String, Double> consumerMetrics = metricValuesMap.get(kafkaConsumer);
assertEquals(expectedLag, consumerMetrics.get("records-lag-max"), 0.01d);
assertEquals(expectedLead, consumerMetrics.get("records-lead-min"), 0.01d);
assertEquals(5.0, consumerMetrics.get("commit-rate"), 0.01d);

// Verify gauge callbacks produce valid numbers
ToDoubleFunction lagGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLagMax");
double lagResult = lagGauge.applyAsDouble(metricValuesMap);
assertEquals(expectedLag, lagResult, 0.01d);

ToDoubleFunction leadGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLeadMin");
double leadResult = leadGauge.applyAsDouble(metricValuesMap);
assertEquals(expectedLead, leadResult, 0.01d);
}

@Test
public void update_withNegativeInfinityMetricValue_preservesPreviousValue() {
topicMetrics = createObjectUnderTest();
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
topicMetrics.register(kafkaConsumer);

// First update with real values
Map<MetricName, KafkaTestMetric> metrics = new HashMap<>();
when(kafkaConsumer.metrics()).thenReturn(metrics);
double expectedLag = 250.0;
Map<String, String> emptyTags = new HashMap<>();
metrics.put(getMetric("records-lag-max", expectedLag, emptyTags).metricName(),
getMetric("records-lag-max", expectedLag, emptyTags));
topicMetrics.update(kafkaConsumer);

// Second update with -Infinity (Kafka Max SampledStat empty window)
Map<MetricName, KafkaTestMetric> infMetrics = new HashMap<>();
when(kafkaConsumer.metrics()).thenReturn(infMetrics);
infMetrics.put(getMetric("records-lag-max", Double.NEGATIVE_INFINITY, emptyTags).metricName(),
getMetric("records-lag-max", Double.NEGATIVE_INFINITY, emptyTags));
topicMetrics.update(kafkaConsumer);

Map<String, Double> consumerMetrics = topicMetrics.getMetricValues().get(kafkaConsumer);
assertEquals(expectedLag, consumerMetrics.get("records-lag-max"), 0.01d);
}

@Test
public void update_withNaNMetricValue_andNopriorUpdate_preservesInitialZero() {
topicMetrics = createObjectUnderTest();
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
topicMetrics.register(kafkaConsumer);

// First update is NaN (consumer never received any records)
Map<MetricName, KafkaTestMetric> nanMetrics = new HashMap<>();
when(kafkaConsumer.metrics()).thenReturn(nanMetrics);
Map<String, String> emptyTags = new HashMap<>();
nanMetrics.put(getMetric("records-lag-max", Double.NaN, emptyTags).metricName(),
getMetric("records-lag-max", Double.NaN, emptyTags));
topicMetrics.update(kafkaConsumer);

// Should preserve the initial 0.0 from register()
Map<String, Double> consumerMetrics = topicMetrics.getMetricValues().get(kafkaConsumer);
assertEquals(0.0, consumerMetrics.get("records-lag-max"), 0.01d);

ToDoubleFunction lagGauge = pluginMetricsMap.get("topic." + topicName + ".recordsLagMax");
double lagResult = lagGauge.applyAsDouble(topicMetrics.getMetricValues());
assertEquals(0.0, lagResult, 0.01d);
}

@Test
void recordTimeBetweenPolls_records_metric_correctly() throws InterruptedException {
Expand Down
Loading