Skip to content

Commit 7f6c206

Browse files
author
hqbfzwang
committed
opti method name and log print
1 parent 9d28083 commit 7f6c206

5 files changed

Lines changed: 29 additions & 26 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
278278
// Because when invoke `ProducerImpl.processOpSendMsg` on flush,
279279
// if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush
280280
// messageContainers before publishing this one-batch message.
281-
op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
281+
op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(),
282282
firstCallback, batchAllocatedSizeBytes);
283283

284284
// NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the
@@ -332,7 +332,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
332332
messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
333333
}
334334

335-
OpSendMsg op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
335+
OpSendMsg op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(),
336336
messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes);
337337

338338
op.setNumMessagesInBatch(numMessagesInBatch);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
690690
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
691691
copyMessageEventTime(message, typedMessageBuilderNew);
692692
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
693-
consumerMetrics.recordDlq();
693+
consumerMetrics.recordDlqMessageSent();
694694

695695
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
696696
result.complete(null);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
8282
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
8383
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
84-
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
8584
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
8685
import org.apache.pulsar.client.impl.schema.JSONSchema;
8786
import org.apache.pulsar.client.impl.schema.SchemaUtils;
@@ -180,8 +179,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
180179

181180
private boolean errorState;
182181

183-
private final ProducerMetrics producerMetrics;
184-
final LatencyHistogram rpcLatencyHistogram;
182+
final ProducerMetrics producerMetrics;
185183
private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure;
186184
// This variable can be exposed as a metrics in the future, a PIP is needed.
187185
private final AtomicInteger pendingQueueFullCounter;
@@ -286,7 +284,6 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
286284

287285
InstrumentProvider ip = client.instrumentProvider();
288286
producerMetrics = new ProducerMetrics(ip, topic);
289-
rpcLatencyHistogram = producerMetrics.getRpcLatencyHistogram();
290287
pendingQueueFullCounter = new AtomicInteger();
291288

292289
this.connectionHandler = initConnectionHandler();
@@ -794,9 +791,11 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
794791
if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
795792
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata,
796793
encryptedPayload);
797-
op = OpSendMsg.create(rpcLatencyHistogram, msg, cmd, sequenceId, callback);
794+
op = OpSendMsg.create(producerMetrics, msg, cmd, sequenceId, callback);
795+
798796
} else {
799-
op = OpSendMsg.create(rpcLatencyHistogram, msg, null, sequenceId, callback);
797+
op = OpSendMsg.create(producerMetrics, msg, null, sequenceId, callback);
798+
800799
final MessageMetadata finalMsgMetadata = msgMetadata;
801800
op.rePopulate = () -> {
802801
if (msgMetadata.hasChunkId()) {
@@ -1527,7 +1526,7 @@ public ReferenceCounted touch(Object hint) {
15271526
}
15281527

15291528
protected static final class OpSendMsg {
1530-
LatencyHistogram rpcLatencyHistogram;
1529+
ProducerMetrics producerMetrics;
15311530
MessageImpl<?> msg;
15321531
List<MessageImpl<?>> msgs;
15331532
ByteBufPair cmd;
@@ -1547,7 +1546,7 @@ protected static final class OpSendMsg {
15471546
int chunkId = -1;
15481547

15491548
void initialize() {
1550-
rpcLatencyHistogram = null;
1549+
producerMetrics = null;
15511550
msg = null;
15521551
msgs = null;
15531552
cmd = null;
@@ -1567,11 +1566,11 @@ void initialize() {
15671566
chunkedMessageCtx = null;
15681567
}
15691568

1570-
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl<?> msg, ByteBufPair cmd,
1569+
static OpSendMsg create(ProducerMetrics producerMetrics, MessageImpl<?> msg, ByteBufPair cmd,
15711570
long sequenceId, SendCallback callback) {
15721571
OpSendMsg op = RECYCLER.get();
15731572
op.initialize();
1574-
op.rpcLatencyHistogram = rpcLatencyHistogram;
1573+
op.producerMetrics = producerMetrics;
15751574
op.msg = msg;
15761575
op.cmd = cmd;
15771576
op.callback = callback;
@@ -1581,11 +1580,11 @@ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl<?> msg
15811580
return op;
15821581
}
15831582

1584-
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?>> msgs, ByteBufPair cmd,
1583+
static OpSendMsg create(ProducerMetrics producerMetrics, List<MessageImpl<?>> msgs, ByteBufPair cmd,
15851584
long sequenceId, SendCallback callback, int batchAllocatedSize) {
15861585
OpSendMsg op = RECYCLER.get();
15871586
op.initialize();
1588-
op.rpcLatencyHistogram = rpcLatencyHistogram;
1587+
op.producerMetrics = producerMetrics;
15891588
op.msgs = msgs;
15901589
op.cmd = cmd;
15911590
op.callback = callback;
@@ -1599,12 +1598,12 @@ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?
15991598
return op;
16001599
}
16011600

1602-
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?>> msgs, ByteBufPair cmd,
1601+
static OpSendMsg create(ProducerMetrics producerMetrics, List<MessageImpl<?>> msgs, ByteBufPair cmd,
16031602
long lowestSequenceId,
16041603
long highestSequenceId, SendCallback callback, int batchAllocatedSize) {
16051604
OpSendMsg op = RECYCLER.get();
16061605
op.initialize();
1607-
op.rpcLatencyHistogram = rpcLatencyHistogram;
1606+
op.producerMetrics = producerMetrics;
16081607
op.msgs = msgs;
16091608
op.cmd = cmd;
16101609
op.callback = callback;
@@ -1656,9 +1655,9 @@ void sendComplete(final Exception e) {
16561655
}
16571656

16581657
if (e == null) {
1659-
rpcLatencyHistogram.recordSuccess(now - this.lastSentAt);
1658+
producerMetrics.recordRpcLatencySuccess(now - this.lastSentAt);
16601659
} else {
1661-
rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
1660+
producerMetrics.recordRpcLatencyFailure(now - this.lastSentAt);
16621661
}
16631662

16641663
OpSendMsgStats opSendMsgStats = OpSendMsgStatsImpl.builder()

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void recordNack() {
7575
consumerNacksCounter.increment();
7676
}
7777

78-
public void recordDlq() {
78+
public void recordDlqMessageSent() {
7979
consumerDlqMessagesCounter.increment();
8080
}
8181

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public ProducerMetrics(InstrumentProvider ip, String topic) {
3838

3939
rpcLatencyHistogram = ip.newLatencyHistogram(
4040
"pulsar.client.producer.rpc.send.duration",
41-
"Publish RPC latency experienced internally by the client when sending data to receiving an ack",
41+
"Publish RPC latency experienced internally by the client when sending data and receiving an ack",
4242
topic, Attributes.empty());
4343

4444
publishedBytesCounter = ip.newCounter(
@@ -53,7 +53,7 @@ public ProducerMetrics(InstrumentProvider ip, String topic) {
5353

5454
pendingBytesUpDownCounter = ip.newUpDownCounter(
5555
"pulsar.client.producer.message.pending.size", Unit.Bytes,
56-
"The size of the messages in the producer internal queue, waiting to sent",
56+
"The size of the messages in the producer internal queue, waiting to be sent",
5757
topic, Attributes.empty());
5858

5959
producersOpenedCounter = ip.newCounter(
@@ -85,15 +85,19 @@ public void recordSendFailed(long latencyNanos, int msgSize) {
8585
sendLatencyHistogram.recordFailure(latencyNanos);
8686
}
8787

88+
public void recordRpcLatencySuccess(long latencyNanos) {
89+
rpcLatencyHistogram.recordSuccess(latencyNanos);
90+
}
91+
92+
public void recordRpcLatencyFailure(long latencyNanos) {
93+
rpcLatencyHistogram.recordFailure(latencyNanos);
94+
}
95+
8896
public void recordProducerOpened() {
8997
producersOpenedCounter.increment();
9098
}
9199

92100
public void recordProducerClosed() {
93101
producersClosedCounter.increment();
94102
}
95-
96-
public LatencyHistogram getRpcLatencyHistogram() {
97-
return rpcLatencyHistogram;
98-
}
99103
}

0 commit comments

Comments
 (0)