Skip to content

Commit 38c3355

Browse files
author
hqbfzwang
committed
opti test and add javadoc
1 parent 46bd0c0 commit 38c3355

4 files changed

Lines changed: 22 additions & 4 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020

2121
import io.opentelemetry.api.common.Attributes;
2222

23+
/**
24+
* Encapsulates OpenTelemetry metrics for a Pulsar consumer, tracking message receives,
25+
* prefetch queue size, acknowledgements, negative acknowledgements, dead-letter queue
26+
* messages, and consumer session open/close events.
27+
*/
2328
public class ConsumerMetrics {
2429

2530
private final Counter messagesReceivedCounter;

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020

2121
import io.opentelemetry.api.common.Attributes;
2222

23+
/**
24+
* Encapsulates OpenTelemetry metrics for a Pulsar producer, tracking message send latency,
25+
* RPC latency, published bytes, pending queue message count and size, and producer session
26+
* open/close events.
27+
*/
2328
public class ProducerMetrics {
2429

2530
private final LatencyHistogram sendLatencyHistogram;

pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.common.collect.Lists;
2525
import java.util.Arrays;
2626
import java.util.Iterator;
27+
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
2728
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
2829
import org.testng.annotations.BeforeClass;
2930
import org.testng.annotations.Test;
@@ -32,6 +33,9 @@
3233
* Contains unit tests for ProducerImpl.OpSendMsgQueue inner class.
3334
*/
3435
public class OpSendMsgQueueTest {
36+
private static final ProducerMetrics NOOP_PRODUCER_METRICS =
37+
new ProducerMetrics(InstrumentProvider.NOOP, "test-topic");
38+
3539
MessageImpl<?> message;
3640

3741
@BeforeClass
@@ -41,7 +45,7 @@ public void createMockMessage() {
4145
}
4246

4347
private ProducerImpl.OpSendMsg createDummyOpSendMsg() {
44-
return ProducerImpl.OpSendMsg.create((ProducerMetrics) null, message, null, 0L, null);
48+
return ProducerImpl.OpSendMsg.create(NOOP_PRODUCER_METRICS, message, null, 0L, null);
4549
}
4650

4751
@Test

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,17 @@
2929
import org.apache.commons.lang3.reflect.FieldUtils;
3030
import org.apache.pulsar.client.api.PulsarClientException;
3131
import org.apache.pulsar.client.api.Schema;
32+
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
3233
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
3334
import org.apache.pulsar.common.api.proto.MessageMetadata;
3435
import org.apache.pulsar.common.protocol.ByteBufPair;
3536
import org.mockito.Mockito;
3637
import org.testng.annotations.Test;
3738

3839
public class ProducerImplTest {
40+
private static final ProducerMetrics NOOP_PRODUCER_METRICS =
41+
new ProducerMetrics(InstrumentProvider.NOOP, "test-topic");
42+
3943
@Test
4044
public void testChunkedMessageCtxDeallocate() {
4145
int totalChunks = 3;
@@ -46,7 +50,7 @@ public void testChunkedMessageCtxDeallocate() {
4650
for (int i = 0; i < totalChunks; i++) {
4751
ProducerImpl.OpSendMsg opSendMsg =
4852
ProducerImpl.OpSendMsg.create(
49-
(ProducerMetrics) null,
53+
NOOP_PRODUCER_METRICS,
5054
MessageImpl.create(new MessageMetadata(), ByteBuffer.allocate(0), Schema.STRING, null),
5155
null, 0, null);
5256
opSendMsg.chunkedMessageCtx = ctx;
@@ -100,7 +104,7 @@ public void testFailPendingMessagesSyncRetry()
100104
MessageImpl<?> msg = Mockito.mock(MessageImpl.class);
101105
Mockito.when(msg.getUncompressedSize()).thenReturn(10);
102106
ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(
103-
(ProducerMetrics) null,
107+
NOOP_PRODUCER_METRICS,
104108
msg,
105109
Mockito.mock(ByteBufPair.class),
106110
1L,
@@ -118,7 +122,7 @@ public void testFailPendingMessagesSyncRetry()
118122
Mockito.doAnswer(invocation -> {
119123
// Reentrant retry during callback
120124
ProducerImpl.OpSendMsg retryOp = ProducerImpl.OpSendMsg.create(
121-
(ProducerMetrics) null,
125+
NOOP_PRODUCER_METRICS,
122126
retryMsg,
123127
Mockito.mock(ByteBufPair.class),
124128
2L,

0 commit comments

Comments
 (0)