Skip to content

Commit 7a458cc

Browse files
committed
Add metric tracking time between poll calls for kafka consumer
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent 0ddc84b commit 7a458cc

4 files changed

Lines changed: 79 additions & 25 deletions

File tree

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, CommitOf
210210
}
211211

212212
<T> ConsumerRecords<String, T> doPoll() throws Exception {
213+
topicMetrics.recordTimeBetweenPolls();
213214
ConsumerRecords<String, T> records =
214215
consumer.poll(Duration.ofMillis(topicConfig.getThreadWaitingTime().toMillis()/2));
215216
return records;

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.opensearch.dataprepper.plugins.kafka.util;
77

88
import io.micrometer.core.instrument.Counter;
9+
import io.micrometer.core.instrument.Timer;
910
import org.opensearch.dataprepper.metrics.PluginMetrics;
1011
import org.apache.kafka.common.Metric;
1112
import org.apache.kafka.common.MetricName;
@@ -17,6 +18,7 @@
1718
import java.util.Objects;
1819
import java.util.Map;
1920
import java.util.HashMap;
21+
import java.util.concurrent.TimeUnit;
2022

2123
public class KafkaTopicConsumerMetrics {
2224
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConsumerMetrics.class);
@@ -30,6 +32,7 @@ public class KafkaTopicConsumerMetrics {
3032
static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
3133
static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed";
3234
static final String NUMBER_OF_BYTES_CONSUMED = "numberOfBytesConsumed";
35+
static final String TIME_BETWEEN_POLL_CALLS = "timeBetweenPollCalls";
3336

3437
private final String topicName;
3538
private long updateTime;
@@ -46,6 +49,8 @@ public class KafkaTopicConsumerMetrics {
4649
private final Counter numberOfRecordsCommitted;
4750
private final Counter numberOfRecordsConsumed;
4851
private final Counter numberOfBytesConsumed;
52+
private final Timer timeBetweenPollCalls;
53+
private Instant lastPollTime;
4954

5055
public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics pluginMetrics,
5156
final boolean topicNameInMetrics) {
@@ -64,6 +69,8 @@ public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics plu
6469
this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS, topicNameInMetrics));
6570
this.numberOfPositiveAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, topicNameInMetrics));
6671
this.numberOfNegativeAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS, topicNameInMetrics));
72+
this.timeBetweenPollCalls = pluginMetrics.timer(getTopicMetricName(TIME_BETWEEN_POLL_CALLS, topicNameInMetrics));
73+
lastPollTime = Instant.now();
6774
}
6875

6976
private void initializeMetricNamesMap(final boolean topicNameInMetrics) {
@@ -168,6 +175,12 @@ public Counter getNumberOfPositiveAcknowledgements() {
168175
return numberOfPositiveAcknowledgements;
169176
}
170177

178+
public void recordTimeBetweenPolls() {
179+
final long timeBetweenPolls = Instant.now().toEpochMilli() - lastPollTime.toEpochMilli();
180+
timeBetweenPollCalls.record(timeBetweenPolls, TimeUnit.MILLISECONDS);
181+
lastPollTime = Instant.now();
182+
}
183+
171184
private String getTopicMetricName(final String metricName, final boolean topicNameInMetrics) {
172185
if (topicNameInMetrics) {
173186
return "topic." + topicName + "." + metricName;

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import static org.mockito.Mockito.doAnswer;
7171
import static org.mockito.Mockito.doThrow;
7272
import static org.mockito.Mockito.mock;
73+
import static org.mockito.Mockito.verify;
7374
import static org.mockito.Mockito.when;
7475

7576
@ExtendWith(MockitoExtension.class)
@@ -327,6 +328,8 @@ public void testPlainTextConsumeRecords() throws InterruptedException {
327328
Assertions.assertNotNull(event.getMetadata().getExternalOriginationTime());
328329
Assertions.assertNotNull(event.getEventHandle().getExternalOriginationTime());
329330
}
331+
332+
verify(topicMetrics).recordTimeBetweenPolls();
330333
}
331334

332335
@Test
@@ -377,6 +380,8 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
377380
});
378381
// This counter should not be incremented with acknowledgements
379382
Assertions.assertEquals(consumer.getNumRecordsCommitted(), 0L);
383+
384+
verify(topicMetrics).recordTimeBetweenPolls();
380385
}
381386

382387
@Test
@@ -420,6 +425,8 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int
420425
consumer.processAcknowledgedOffsets();
421426
offsetsToCommit = consumer.getOffsetsToCommit();
422427
Assertions.assertEquals(offsetsToCommit.size(), 0);
428+
429+
verify(topicMetrics).recordTimeBetweenPolls();
423430
}
424431

425432
@Test
@@ -458,6 +465,8 @@ public void testJsonConsumeRecords() throws InterruptedException, Exception {
458465
Assertions.assertNotNull(event.getMetadata().getExternalOriginationTime());
459466
Assertions.assertNotNull(event.getEventHandle().getExternalOriginationTime());
460467
}
468+
469+
verify(topicMetrics).recordTimeBetweenPolls();
461470
}
462471

463472
@Test

data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetricsTests.java

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,40 @@
55

66
package org.opensearch.dataprepper.plugins.kafka.util;
77

8+
import io.micrometer.core.instrument.Counter;
9+
import io.micrometer.core.instrument.Timer;
10+
import org.apache.commons.lang3.RandomStringUtils;
11+
import org.apache.kafka.clients.consumer.KafkaConsumer;
12+
import org.apache.kafka.common.Metric;
13+
import org.apache.kafka.common.MetricName;
814
import org.junit.jupiter.api.BeforeEach;
15+
import org.junit.jupiter.api.Test;
16+
import org.junit.jupiter.api.extension.ExtendWith;
917
import org.junit.jupiter.params.ParameterizedTest;
1018
import org.junit.jupiter.params.provider.ValueSource;
11-
import org.junit.jupiter.api.extension.ExtendWith;
12-
import static org.junit.jupiter.api.Assertions.assertEquals;
13-
19+
import org.mockito.ArgumentCaptor;
1420
import org.mockito.Mock;
1521
import org.mockito.junit.jupiter.MockitoExtension;
16-
import static org.mockito.ArgumentMatchers.any;
17-
import static org.mockito.Mockito.mock;
18-
import static org.mockito.Mockito.when;
19-
import static org.mockito.Mockito.doAnswer;
2022
import org.opensearch.dataprepper.metrics.PluginMetrics;
21-
import org.apache.kafka.common.Metric;
22-
import org.apache.kafka.common.MetricName;
23-
import org.apache.kafka.clients.consumer.KafkaConsumer;
24-
import org.apache.commons.lang3.RandomStringUtils;
25-
26-
import io.micrometer.core.instrument.Counter;
27-
import static org.hamcrest.MatcherAssert.assertThat;
28-
import static org.hamcrest.CoreMatchers.equalTo;
2923

30-
import java.util.Map;
3124
import java.util.HashMap;
25+
import java.util.Map;
3226
import java.util.Random;
33-
import java.util.function.ToDoubleFunction;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.function.ToDoubleFunction;
29+
30+
import static org.hamcrest.CoreMatchers.equalTo;
31+
import static org.hamcrest.MatcherAssert.assertThat;
32+
import static org.hamcrest.Matchers.greaterThan;
33+
import static org.hamcrest.Matchers.lessThan;
34+
import static org.junit.jupiter.api.Assertions.assertEquals;
35+
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.ArgumentMatchers.eq;
37+
import static org.mockito.Mockito.doAnswer;
38+
import static org.mockito.Mockito.doNothing;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.when;
41+
import static org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics.TIME_BETWEEN_POLL_CALLS;
3442

3543
@ExtendWith(MockitoExtension.class)
3644
public class KafkaTopicConsumerMetricsTests {
@@ -79,6 +87,9 @@ public Object metricValue() {
7987
@Mock
8088
private Counter bytesConsumedCounter;
8189

90+
@Mock
91+
private Timer timeBetweenPolls;
92+
8293
@Mock
8394
private Counter recordsConsumedCounter;
8495
private double bytesConsumedCount;
@@ -122,14 +133,7 @@ void setUp() {
122133
return recordsConsumedCounter;
123134
}
124135
}).when(pluginMetrics).counter(any(String.class));
125-
doAnswer((i) -> {
126-
bytesConsumedCount += (double)i.getArgument(0);
127-
return null;
128-
}).when(bytesConsumedCounter).increment(any(Double.class));
129-
doAnswer((i) -> {
130-
recordsConsumedCount += (double)i.getArgument(0);
131-
return null;
132-
}).when(recordsConsumedCounter).increment(any(Double.class));
136+
when(pluginMetrics.timer("topic." + topicName + "." + TIME_BETWEEN_POLL_CALLS)).thenReturn(timeBetweenPolls);
133137
}
134138

135139
public KafkaTopicConsumerMetrics createObjectUnderTest() {
@@ -205,6 +209,15 @@ private void populateKafkaMetrics(Map<MetricName, KafkaTestMetric> metrics, doub
205209
@ValueSource(ints = {1, 5, 10})
206210
//@ValueSource(ints = {2})
207211
public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {
212+
doAnswer((i) -> {
213+
bytesConsumedCount += (double)i.getArgument(0);
214+
return null;
215+
}).when(bytesConsumedCounter).increment(any(Double.class));
216+
doAnswer((i) -> {
217+
recordsConsumedCount += (double)i.getArgument(0);
218+
return null;
219+
}).when(recordsConsumedCounter).increment(any(Double.class));
220+
208221
topicMetrics = createObjectUnderTest();
209222
for (int i = 0; i < numConsumers; i++) {
210223
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
@@ -246,4 +259,22 @@ public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {
246259

247260
}
248261

262+
263+
@Test
264+
void recordTimeBetweenPolls_records_metric_correctly() throws InterruptedException {
265+
topicMetrics = createObjectUnderTest();
266+
267+
final ArgumentCaptor<Long> recordedTimeCaptor = ArgumentCaptor.forClass(Long.class);
268+
269+
doNothing().when(timeBetweenPolls).record(recordedTimeCaptor.capture(), eq(TimeUnit.MILLISECONDS));
270+
271+
Thread.sleep(100);
272+
topicMetrics.recordTimeBetweenPolls();
273+
274+
final long recordedTime = recordedTimeCaptor.getValue();
275+
276+
assertThat(recordedTime, greaterThan(0L));
277+
assertThat(recordedTime, lessThan(1000L));
278+
}
279+
249280
}

0 commit comments

Comments
 (0)