Skip to content

Commit c2016c5

Browse files
author
hqbfzwang
committed
opti test and add javadoc
1 parent dc6a48b commit c2016c5

4 files changed

Lines changed: 22 additions & 4 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020

2121
import io.opentelemetry.api.common.Attributes;
2222

23+
/**
24+
* Encapsulates OpenTelemetry metrics for a Pulsar consumer, tracking message receives,
25+
* prefetch queue size, acknowledgements, negative acknowledgements, dead-letter queue
26+
* messages, and consumer session open/close events.
27+
*/
2328
public class ConsumerMetrics {
2429

2530
private final Counter messagesReceivedCounter;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020

2121
import io.opentelemetry.api.common.Attributes;
2222

23+
/**
24+
* Encapsulates OpenTelemetry metrics for a Pulsar producer, tracking message send latency,
25+
* RPC latency, published bytes, pending queue message count and size, and producer session
26+
* open/close events.
27+
*/
2328
public class ProducerMetrics {
2429

2530
private final LatencyHistogram sendLatencyHistogram;

pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.collect.Lists;
2525
import java.util.Arrays;
2626
import java.util.Iterator;
27+
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
2728
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
2829
import org.testng.annotations.BeforeClass;
2930
import org.testng.annotations.Test;
@@ -32,6 +33,9 @@
3233
* Contains unit tests for ProducerImpl.OpSendMsgQueue inner class.
3334
*/
3435
public class OpSendMsgQueueTest {
36+
private static final ProducerMetrics NOOP_PRODUCER_METRICS =
37+
new ProducerMetrics(InstrumentProvider.NOOP, "test-topic");
38+
3539
MessageImpl<?> message;
3640

3741
@BeforeClass
@@ -41,7 +45,7 @@ public void createMockMessage() {
4145
}
4246

4347
private ProducerImpl.OpSendMsg createDummyOpSendMsg() {
44-
return ProducerImpl.OpSendMsg.create((ProducerMetrics) null, message, null, 0L, null);
48+
return ProducerImpl.OpSendMsg.create(NOOP_PRODUCER_METRICS, message, null, 0L, null);
4549
}
4650

4751
@Test

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,17 @@
3030
import org.apache.commons.lang3.reflect.FieldUtils;
3131
import org.apache.pulsar.client.api.PulsarClientException;
3232
import org.apache.pulsar.client.api.Schema;
33+
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
3334
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
3435
import org.apache.pulsar.common.api.proto.MessageMetadata;
3536
import org.apache.pulsar.common.protocol.ByteBufPair;
3637
import org.mockito.Mockito;
3738
import org.testng.annotations.Test;
3839

3940
public class ProducerImplTest {
41+
private static final ProducerMetrics NOOP_PRODUCER_METRICS =
42+
new ProducerMetrics(InstrumentProvider.NOOP, "test-topic");
43+
4044
@Test
4145
public void testChunkedMessageCtxDeallocate() {
4246
int totalChunks = 3;
@@ -47,7 +51,7 @@ public void testChunkedMessageCtxDeallocate() {
4751
for (int i = 0; i < totalChunks; i++) {
4852
ProducerImpl.OpSendMsg opSendMsg =
4953
ProducerImpl.OpSendMsg.create(
50-
(ProducerMetrics) null,
54+
NOOP_PRODUCER_METRICS,
5155
MessageImpl.create(new MessageMetadata(), ByteBuffer.allocate(0), Schema.STRING, null),
5256
null, 0, null);
5357
opSendMsg.chunkedMessageCtx = ctx;
@@ -103,7 +107,7 @@ public void testFailPendingMessagesSyncRetry()
103107
MessageImpl<?> msg = Mockito.mock(MessageImpl.class);
104108
Mockito.when(msg.getUncompressedSize()).thenReturn(10);
105109
ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(
106-
(ProducerMetrics) null,
110+
NOOP_PRODUCER_METRICS,
107111
msg,
108112
Mockito.mock(ByteBufPair.class),
109113
1L,
@@ -121,7 +125,7 @@ public void testFailPendingMessagesSyncRetry()
121125
Mockito.doAnswer(invocation -> {
122126
// Reentrant retry during callback
123127
ProducerImpl.OpSendMsg retryOp = ProducerImpl.OpSendMsg.create(
124-
(ProducerMetrics) null,
128+
NOOP_PRODUCER_METRICS,
125129
retryMsg,
126130
Mockito.mock(ByteBufPair.class),
127131
2L,

0 commit comments

Comments
 (0)