Skip to content

Commit f53a12c

Browse files
author
hqbfzwang
committed
[improve][client] Extract OTel metrics into ProducerMetrics/ConsumerMetrics classes
1 parent 8798a46 commit f53a12c

6 files changed

Lines changed: 523 additions & 85 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 13 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import io.netty.util.ReferenceCountUtil;
3535
import io.netty.util.Timeout;
3636
import io.netty.util.concurrent.FastThreadLocal;
37-
import io.opentelemetry.api.common.Attributes;
37+
3838
import java.io.IOException;
3939
import java.net.URI;
4040
import java.net.URISyntaxException;
@@ -98,10 +98,8 @@
9898
import org.apache.pulsar.client.api.transaction.TxnID;
9999
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
100100
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
101-
import org.apache.pulsar.client.impl.metrics.Counter;
101+
import org.apache.pulsar.client.impl.metrics.ConsumerMetrics;
102102
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
103-
import org.apache.pulsar.client.impl.metrics.Unit;
104-
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
105103
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
106104
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
107105
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -228,16 +226,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
228226
private final boolean createTopicIfDoesNotExist;
229227
private final boolean poolMessages;
230228

231-
private final Counter messagesReceivedCounter;
232-
private final Counter bytesReceivedCounter;
233-
private final UpDownCounter messagesPrefetchedGauge;
234-
private final UpDownCounter bytesPrefetchedGauge;
235-
private final Counter consumersOpenedCounter;
236-
private final Counter consumersClosedCounter;
237-
private final Counter consumerAcksCounter;
238-
private final Counter consumerNacksCounter;
239-
240-
private final Counter consumerDlqMessagesCounter;
229+
private final ConsumerMetrics consumerMetrics;
241230

242231
private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
243232
private final AtomicInteger previousExceptionCount = new AtomicInteger();
@@ -421,29 +410,10 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
421410
topicNameWithoutPartition = topicName.getPartitionedTopicName();
422411

423412
InstrumentProvider ip = client.instrumentProvider();
424-
Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build();
425-
consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
426-
"The number of consumer sessions opened", topic, attrs);
427-
consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
428-
"The number of consumer sessions closed", topic, attrs);
429-
messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages,
430-
"The number of messages explicitly received by the consumer application", topic, attrs);
431-
bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes,
432-
"The number of bytes explicitly received by the consumer application", topic, attrs);
433-
messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages,
434-
"The number of messages currently sitting in the consumer receive queue", topic, attrs);
435-
bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes,
436-
"The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs);
437-
438-
consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
439-
"The number of acknowledged messages", topic, attrs);
440-
consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
441-
"The number of negatively acknowledged messages", topic, attrs);
442-
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
443-
"The number of messages sent to DLQ", topic, attrs);
413+
consumerMetrics = new ConsumerMetrics(ip, topic, subscription);
444414
grabCnx();
445415

446-
consumersOpenedCounter.increment();
416+
consumerMetrics.recordConsumerOpened();
447417
}
448418

449419
public ConnectionHandler getConnectionHandler() {
@@ -607,7 +577,7 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
607577
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
608578
Map<String, Long> properties,
609579
TransactionImpl txn) {
610-
consumerAcksCounter.increment();
580+
consumerMetrics.recordAck();
611581

612582
if (getState() != State.Ready && getState() != State.Connecting) {
613583
stats.incrementNumAcksFailed();
@@ -630,7 +600,7 @@ protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ack
630600
@Override
631601
protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
632602
Map<String, Long> properties, TransactionImpl txn) {
633-
consumerAcksCounter.increment();
603+
consumerMetrics.recordAck();
634604

635605
if (getState() != State.Ready && getState() != State.Connecting) {
636606
stats.incrementNumAcksFailed();
@@ -721,7 +691,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
721691
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
722692
copyMessageEventTime(message, typedMessageBuilderNew);
723693
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
724-
consumerDlqMessagesCounter.increment();
694+
consumerMetrics.recordDlq();
725695

726696
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
727697
result.complete(null);
@@ -835,7 +805,7 @@ private static void copyMessageEventTime(Message<?> message,
835805

836806
@Override
837807
public void negativeAcknowledge(MessageId messageId) {
838-
consumerNacksCounter.increment();
808+
consumerMetrics.recordNack();
839809
negativeAcksTracker.add(messageId);
840810

841811
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
@@ -844,7 +814,7 @@ public void negativeAcknowledge(MessageId messageId) {
844814

845815
@Override
846816
public void negativeAcknowledge(Message<?> message) {
847-
consumerNacksCounter.increment();
817+
consumerMetrics.recordNack();
848818
negativeAcksTracker.add(message);
849819

850820
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
@@ -1185,7 +1155,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
11851155
return compositeCloseFuture;
11861156
}
11871157

1188-
consumersClosedCounter.increment();
1158+
consumerMetrics.recordConsumerClosed();
11891159

11901160
if (!isConnected()) {
11911161
log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription);
@@ -1369,8 +1339,7 @@ protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
13691339
}
13701340

13711341
private void executeNotifyCallback(final MessageImpl<T> message) {
1372-
messagesPrefetchedGauge.increment();
1373-
bytesPrefetchedGauge.add(message.size());
1342+
consumerMetrics.recordMessagePrefetched(message.size());
13741343

13751344
// Enqueue the message so that it can be retrieved when application calls receive()
13761345
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
@@ -1869,11 +1838,7 @@ protected synchronized void messageProcessed(Message<?> msg) {
18691838
ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
18701839
lastDequeuedMessageId = msg.getMessageId();
18711840

1872-
messagesPrefetchedGauge.decrement();
1873-
messagesReceivedCounter.increment();
1874-
1875-
bytesPrefetchedGauge.subtract(msg.size());
1876-
bytesReceivedCounter.add(msg.size());
1841+
consumerMetrics.recordMessageReceived(msg.size());
18771842

18781843
if (msgCnx != currentCnx) {
18791844
// The processed message did belong to the old queue that was cleared after reconnection.

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 10 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,9 @@
8181
import org.apache.pulsar.client.api.transaction.Transaction;
8282
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
8383
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
84-
import org.apache.pulsar.client.impl.metrics.Counter;
8584
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
8685
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
87-
import org.apache.pulsar.client.impl.metrics.Unit;
88-
import org.apache.pulsar.client.impl.metrics.UpDownCounter;
86+
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
8987
import org.apache.pulsar.client.impl.schema.JSONSchema;
9088
import org.apache.pulsar.client.impl.schema.SchemaUtils;
9189
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
@@ -183,14 +181,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
183181

184182
private boolean errorState;
185183

186-
private final LatencyHistogram latencyHistogram;
184+
private final ProducerMetrics producerMetrics;
185+
// rpcLatencyHistogram 需要传递给 OpSendMsg,保留包级别访问
187186
final LatencyHistogram rpcLatencyHistogram;
188-
private final Counter publishedBytesCounter;
189-
private final UpDownCounter pendingMessagesUpDownCounter;
190-
private final UpDownCounter pendingBytesUpDownCounter;
191-
192-
private final Counter producersOpenedCounter;
193-
private final Counter producersClosedCounter;
194187
private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure;
195188
// This variable can be exposed as a metrics in the future, a PIP is needed.
196189
private final AtomicInteger pendingQueueFullCounter;
@@ -294,30 +287,14 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
294287
}
295288

296289
InstrumentProvider ip = client.instrumentProvider();
297-
latencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.message.send.duration",
298-
"Publish latency experienced by the application, includes client batching time", topic,
299-
Attributes.empty());
300-
rpcLatencyHistogram = ip.newLatencyHistogram("pulsar.client.producer.rpc.send.duration",
301-
"Publish RPC latency experienced internally by the client when sending data to receiving an ack", topic,
302-
Attributes.empty());
303-
publishedBytesCounter = ip.newCounter("pulsar.client.producer.message.send.size",
304-
Unit.Bytes, "The number of bytes published", topic, Attributes.empty());
305-
pendingMessagesUpDownCounter =
306-
ip.newUpDownCounter("pulsar.client.producer.message.pending.count", Unit.Messages,
307-
"The number of messages in the producer internal send queue, waiting to be sent", topic,
308-
Attributes.empty());
309-
pendingBytesUpDownCounter = ip.newUpDownCounter("pulsar.client.producer.message.pending.size", Unit.Bytes,
310-
"The size of the messages in the producer internal queue, waiting to sent", topic, Attributes.empty());
311-
producersOpenedCounter = ip.newCounter("pulsar.client.producer.opened", Unit.Sessions,
312-
"The number of producer sessions opened", topic, Attributes.empty());
313-
producersClosedCounter = ip.newCounter("pulsar.client.producer.closed", Unit.Sessions,
314-
"The number of producer sessions closed", topic, Attributes.empty());
290+
producerMetrics = new ProducerMetrics(ip, topic);
291+
rpcLatencyHistogram = producerMetrics.rpcLatencyHistogram;
315292
pendingQueueFullCounter = new AtomicInteger();
316293

317294
this.connectionHandler = initConnectionHandler();
318295
setChunkMaxMessageSize();
319296
grabCnx();
320-
producersOpenedCounter.increment();
297+
producerMetrics.recordProducerOpened();
321298
}
322299

323300
@VisibleForTesting
@@ -398,8 +375,7 @@ CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
398375
}
399376

400377
int msgSize = interceptorMessage.getDataBuffer().readableBytes();
401-
pendingMessagesUpDownCounter.increment();
402-
pendingBytesUpDownCounter.add(msgSize);
378+
producerMetrics.recordPendingMessage(msgSize);
403379

404380
sendAsync(interceptorMessage, new DefaultSendMessageCallback(future, interceptorMessage, msgSize));
405381
return future;
@@ -457,22 +433,19 @@ private void onSendComplete(Throwable e, SendCallback sendCallback, MessageImpl<
457433
long createdAt = (sendCallback instanceof ProducerImpl.DefaultSendMessageCallback)
458434
? ((DefaultSendMessageCallback) sendCallback).createdAt : this.createdAt;
459435
long latencyNanos = System.nanoTime() - createdAt;
460-
pendingMessagesUpDownCounter.decrement();
461-
pendingBytesUpDownCounter.subtract(msgSize);
462436
ByteBuf payload = msg.getDataBuffer();
463437
if (payload == null) {
464438
log.error("[{}] [{}] Payload is null when calling onSendComplete, which is not expected.",
465439
topic, producerName);
466440
}
467441
try {
468442
if (e != null) {
469-
latencyHistogram.recordFailure(latencyNanos);
443+
producerMetrics.recordSendFailed(latencyNanos, msgSize);
470444
stats.incrementSendFailed();
471445
onSendAcknowledgement(msg, null, e);
472446
sendCallback.getFuture().completeExceptionally(e);
473447
} else {
474-
latencyHistogram.recordSuccess(latencyNanos);
475-
publishedBytesCounter.add(msgSize);
448+
producerMetrics.recordSendSuccess(latencyNanos, msgSize);
476449
stats.incrementNumAcksReceived(latencyNanos);
477450
onSendAcknowledgement(msg, msg.getMessageId(), null);
478451
sendCallback.getFuture().complete(msg.getMessageId());
@@ -1219,7 +1192,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
12191192
return CompletableFuture.completedFuture(null);
12201193
}
12211194

1222-
producersClosedCounter.increment();
1195+
producerMetrics.recordProducerClosed();
12231196
closeProducerTasks();
12241197

12251198
ClientCnx cnx = cnx();
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.impl.metrics;
20+
21+
import io.opentelemetry.api.common.Attributes;
22+
23+
public class ConsumerMetrics {
24+
25+
private final Counter messagesReceivedCounter;
26+
private final Counter bytesReceivedCounter;
27+
private final UpDownCounter messagesPrefetchedGauge;
28+
private final UpDownCounter bytesPrefetchedGauge;
29+
private final Counter consumersOpenedCounter;
30+
private final Counter consumersClosedCounter;
31+
private final Counter consumerAcksCounter;
32+
private final Counter consumerNacksCounter;
33+
private final Counter consumerDlqMessagesCounter;
34+
35+
public ConsumerMetrics(InstrumentProvider ip, String topic, String subscription) {
36+
Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build();
37+
38+
consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
39+
"The number of consumer sessions opened", topic, attrs);
40+
consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
41+
"The number of consumer sessions closed", topic, attrs);
42+
messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages,
43+
"The number of messages explicitly received by the consumer application", topic, attrs);
44+
bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes,
45+
"The number of bytes explicitly received by the consumer application", topic, attrs);
46+
messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages,
47+
"The number of messages currently sitting in the consumer receive queue", topic, attrs);
48+
bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes,
49+
"The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs);
50+
consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
51+
"The number of acknowledged messages", topic, attrs);
52+
consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
53+
"The number of negatively acknowledged messages", topic, attrs);
54+
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
55+
"The number of messages sent to DLQ", topic, attrs);
56+
}
57+
58+
public void recordMessagePrefetched(int msgSize) {
59+
messagesPrefetchedGauge.increment();
60+
bytesPrefetchedGauge.add(msgSize);
61+
}
62+
63+
public void recordMessageReceived(int msgSize) {
64+
messagesPrefetchedGauge.decrement();
65+
bytesPrefetchedGauge.subtract(msgSize);
66+
messagesReceivedCounter.increment();
67+
bytesReceivedCounter.add(msgSize);
68+
}
69+
70+
public void recordAck() {
71+
consumerAcksCounter.increment();
72+
}
73+
74+
public void recordNack() {
75+
consumerNacksCounter.increment();
76+
}
77+
78+
public void recordDlq() {
79+
consumerDlqMessagesCounter.increment();
80+
}
81+
82+
public void recordConsumerOpened() {
83+
consumersOpenedCounter.increment();
84+
}
85+
86+
public void recordConsumerClosed() {
87+
consumersClosedCounter.increment();
88+
}
89+
}

0 commit comments

Comments
 (0)