Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, CommitOf
}

<T> ConsumerRecords<String, T> doPoll() throws Exception {
topicMetrics.recordTimeBetweenPolls();
ConsumerRecords<String, T> records =
consumer.poll(Duration.ofMillis(topicConfig.getThreadWaitingTime().toMillis()/2));
return records;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.kafka.util;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
Expand All @@ -17,6 +18,7 @@
import java.util.Objects;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

public class KafkaTopicConsumerMetrics {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConsumerMetrics.class);
Expand All @@ -30,6 +32,7 @@ public class KafkaTopicConsumerMetrics {
static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed";
static final String NUMBER_OF_BYTES_CONSUMED = "numberOfBytesConsumed";
static final String ACTUAL_POLL_INTERVAL = "actualPollInterval";

private final String topicName;
private long updateTime;
Expand All @@ -46,6 +49,8 @@ public class KafkaTopicConsumerMetrics {
private final Counter numberOfRecordsCommitted;
private final Counter numberOfRecordsConsumed;
private final Counter numberOfBytesConsumed;
private final Timer timeBetweenPollCalls;
private Instant lastPollTime;

public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics pluginMetrics,
final boolean topicNameInMetrics) {
Expand All @@ -64,6 +69,8 @@ public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics plu
this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS, topicNameInMetrics));
this.numberOfPositiveAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS, topicNameInMetrics));
this.numberOfNegativeAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS, topicNameInMetrics));
this.timeBetweenPollCalls = pluginMetrics.timer(getTopicMetricName(ACTUAL_POLL_INTERVAL, topicNameInMetrics));
lastPollTime = Instant.now();
}

private void initializeMetricNamesMap(final boolean topicNameInMetrics) {
Expand Down Expand Up @@ -168,6 +175,12 @@ public Counter getNumberOfPositiveAcknowledgements() {
return numberOfPositiveAcknowledgements;
}

public void recordTimeBetweenPolls() {
final long timeBetweenPolls = Instant.now().toEpochMilli() - lastPollTime.toEpochMilli();
timeBetweenPollCalls.record(timeBetweenPolls, TimeUnit.MILLISECONDS);
lastPollTime = Instant.now();
}

private String getTopicMetricName(final String metricName, final boolean topicNameInMetrics) {
if (topicNameInMetrics) {
return "topic." + topicName + "." + metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
Expand Down Expand Up @@ -327,6 +328,8 @@ public void testPlainTextConsumeRecords() throws InterruptedException {
Assertions.assertNotNull(event.getMetadata().getExternalOriginationTime());
Assertions.assertNotNull(event.getEventHandle().getExternalOriginationTime());
}

verify(topicMetrics).recordTimeBetweenPolls();
}

@Test
Expand Down Expand Up @@ -377,6 +380,8 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted
});
// This counter should not be incremented with acknowledgements
Assertions.assertEquals(consumer.getNumRecordsCommitted(), 0L);

verify(topicMetrics).recordTimeBetweenPolls();
}

@Test
Expand Down Expand Up @@ -420,6 +425,8 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int
consumer.processAcknowledgedOffsets();
offsetsToCommit = consumer.getOffsetsToCommit();
Assertions.assertEquals(offsetsToCommit.size(), 0);

verify(topicMetrics).recordTimeBetweenPolls();
}

@Test
Expand Down Expand Up @@ -458,6 +465,8 @@ public void testJsonConsumeRecords() throws InterruptedException, Exception {
Assertions.assertNotNull(event.getMetadata().getExternalOriginationTime());
Assertions.assertNotNull(event.getEventHandle().getExternalOriginationTime());
}

verify(topicMetrics).recordTimeBetweenPolls();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,40 @@

package org.opensearch.dataprepper.plugins.kafka.util;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.doAnswer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.commons.lang3.RandomStringUtils;

import io.micrometer.core.instrument.Counter;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.equalTo;

import java.util.Map;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.ToDoubleFunction;
import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicConsumerMetrics.ACTUAL_POLL_INTERVAL;

@ExtendWith(MockitoExtension.class)
public class KafkaTopicConsumerMetricsTests {
Expand Down Expand Up @@ -79,6 +87,9 @@ public Object metricValue() {
@Mock
private Counter bytesConsumedCounter;

@Mock
private Timer timeBetweenPolls;

@Mock
private Counter recordsConsumedCounter;
private double bytesConsumedCount;
Expand Down Expand Up @@ -122,14 +133,7 @@ void setUp() {
return recordsConsumedCounter;
}
}).when(pluginMetrics).counter(any(String.class));
doAnswer((i) -> {
bytesConsumedCount += (double)i.getArgument(0);
return null;
}).when(bytesConsumedCounter).increment(any(Double.class));
doAnswer((i) -> {
recordsConsumedCount += (double)i.getArgument(0);
return null;
}).when(recordsConsumedCounter).increment(any(Double.class));
when(pluginMetrics.timer("topic." + topicName + "." + ACTUAL_POLL_INTERVAL)).thenReturn(timeBetweenPolls);
}

public KafkaTopicConsumerMetrics createObjectUnderTest() {
Expand Down Expand Up @@ -205,6 +209,15 @@ private void populateKafkaMetrics(Map<MetricName, KafkaTestMetric> metrics, doub
@ValueSource(ints = {1, 5, 10})
//@ValueSource(ints = {2})
public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {
doAnswer((i) -> {
bytesConsumedCount += (double)i.getArgument(0);
return null;
}).when(bytesConsumedCounter).increment(any(Double.class));
doAnswer((i) -> {
recordsConsumedCount += (double)i.getArgument(0);
return null;
}).when(recordsConsumedCounter).increment(any(Double.class));

topicMetrics = createObjectUnderTest();
for (int i = 0; i < numConsumers; i++) {
KafkaConsumer kafkaConsumer = mock(KafkaConsumer.class);
Expand Down Expand Up @@ -246,4 +259,22 @@ public void KafkaTopicMetricTest_checkMetricUpdates(int numConsumers) {

}


@Test
void recordTimeBetweenPolls_records_metric_correctly() throws InterruptedException {
topicMetrics = createObjectUnderTest();

final ArgumentCaptor<Long> recordedTimeCaptor = ArgumentCaptor.forClass(Long.class);

doNothing().when(timeBetweenPolls).record(recordedTimeCaptor.capture(), eq(TimeUnit.MILLISECONDS));

Thread.sleep(100);
topicMetrics.recordTimeBetweenPolls();

final long recordedTime = recordedTimeCaptor.getValue();

assertThat(recordedTime, greaterThan(0L));
assertThat(recordedTime, lessThan(1000L));
}

}
Loading