Skip to content

Commit 893ee57

Browse files
author
hqbfzwang
committed
opti method name and log print
1 parent d2f115c commit 893ee57

5 files changed

Lines changed: 29 additions & 26 deletions

File tree

pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
278278
// Because when invoke `ProducerImpl.processOpSendMsg` on flush,
279279
// if `op.msg != null && isBatchMessagingEnabled()` checks true, it will call `batchMessageAndSend` to flush
280280
// messageContainers before publishing this one-batch message.
281-
op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
281+
op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(),
282282
firstCallback, batchAllocatedSizeBytes);
283283

284284
// NumMessagesInBatch and BatchSizeByte will not be serialized to the binary cmd. It's just useful for the
@@ -332,7 +332,7 @@ public OpSendMsg createOpSendMsg() throws IOException {
332332
messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
333333
}
334334

335-
OpSendMsg op = OpSendMsg.create(producer.rpcLatencyHistogram, messages, cmd, messageMetadata.getSequenceId(),
335+
OpSendMsg op = OpSendMsg.create(producer.producerMetrics, messages, cmd, messageMetadata.getSequenceId(),
336336
messageMetadata.getHighestSequenceId(), firstCallback, batchAllocatedSizeBytes);
337337

338338
op.setNumMessagesInBatch(numMessagesInBatch);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
690690
copyMessageKeysIfNeeded(message, typedMessageBuilderNew);
691691
copyMessageEventTime(message, typedMessageBuilderNew);
692692
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
693-
consumerMetrics.recordDlq();
693+
consumerMetrics.recordDlqMessageSent();
694694

695695
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
696696
result.complete(null);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
8282
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
8383
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
84-
import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
8584
import org.apache.pulsar.client.impl.metrics.ProducerMetrics;
8685
import org.apache.pulsar.client.impl.schema.JSONSchema;
8786
import org.apache.pulsar.client.impl.schema.SchemaUtils;
@@ -182,8 +181,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
182181

183182
private boolean errorState;
184183

185-
private final ProducerMetrics producerMetrics;
186-
final LatencyHistogram rpcLatencyHistogram;
184+
final ProducerMetrics producerMetrics;
187185
private final boolean pauseSendingToPreservePublishOrderOnSchemaRegFailure;
188186
// This variable can be exposed as a metrics in the future, a PIP is needed.
189187
private final AtomicInteger pendingQueueFullCounter;
@@ -288,7 +286,6 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
288286

289287
InstrumentProvider ip = client.instrumentProvider();
290288
producerMetrics = new ProducerMetrics(ip, topic);
291-
rpcLatencyHistogram = producerMetrics.getRpcLatencyHistogram();
292289
pendingQueueFullCounter = new AtomicInteger();
293290

294291
this.connectionHandler = initConnectionHandler();
@@ -797,9 +794,11 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
797794
if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
798795
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata,
799796
encryptedPayload);
800-
op = OpSendMsg.create(rpcLatencyHistogram, msg, cmd, sequenceId, callback);
797+
op = OpSendMsg.create(producerMetrics, msg, cmd, sequenceId, callback);
798+
801799
} else {
802-
op = OpSendMsg.create(rpcLatencyHistogram, msg, null, sequenceId, callback);
800+
op = OpSendMsg.create(producerMetrics, msg, null, sequenceId, callback);
801+
803802
final MessageMetadata finalMsgMetadata = msgMetadata;
804803
op.rePopulate = () -> {
805804
if (msgMetadata.hasChunkId()) {
@@ -1549,7 +1548,7 @@ public ReferenceCounted touch(Object hint) {
15491548
}
15501549

15511550
protected static final class OpSendMsg {
1552-
LatencyHistogram rpcLatencyHistogram;
1551+
ProducerMetrics producerMetrics;
15531552
MessageImpl<?> msg;
15541553
List<MessageImpl<?>> msgs;
15551554
ByteBufPair cmd;
@@ -1569,7 +1568,7 @@ protected static final class OpSendMsg {
15691568
int chunkId = -1;
15701569

15711570
void initialize() {
1572-
rpcLatencyHistogram = null;
1571+
producerMetrics = null;
15731572
msg = null;
15741573
msgs = null;
15751574
cmd = null;
@@ -1589,11 +1588,11 @@ void initialize() {
15891588
chunkedMessageCtx = null;
15901589
}
15911590

1592-
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl<?> msg, ByteBufPair cmd,
1591+
static OpSendMsg create(ProducerMetrics producerMetrics, MessageImpl<?> msg, ByteBufPair cmd,
15931592
long sequenceId, SendCallback callback) {
15941593
OpSendMsg op = RECYCLER.get();
15951594
op.initialize();
1596-
op.rpcLatencyHistogram = rpcLatencyHistogram;
1595+
op.producerMetrics = producerMetrics;
15971596
op.msg = msg;
15981597
op.cmd = cmd;
15991598
op.callback = callback;
@@ -1603,11 +1602,11 @@ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, MessageImpl<?> msg
16031602
return op;
16041603
}
16051604

1606-
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?>> msgs, ByteBufPair cmd,
1605+
static OpSendMsg create(ProducerMetrics producerMetrics, List<MessageImpl<?>> msgs, ByteBufPair cmd,
16071606
long sequenceId, SendCallback callback, int batchAllocatedSize) {
16081607
OpSendMsg op = RECYCLER.get();
16091608
op.initialize();
1610-
op.rpcLatencyHistogram = rpcLatencyHistogram;
1609+
op.producerMetrics = producerMetrics;
16111610
op.msgs = msgs;
16121611
op.cmd = cmd;
16131612
op.callback = callback;
@@ -1621,12 +1620,12 @@ static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?
16211620
return op;
16221621
}
16231622

1624-
static OpSendMsg create(LatencyHistogram rpcLatencyHistogram, List<MessageImpl<?>> msgs, ByteBufPair cmd,
1623+
static OpSendMsg create(ProducerMetrics producerMetrics, List<MessageImpl<?>> msgs, ByteBufPair cmd,
16251624
long lowestSequenceId,
16261625
long highestSequenceId, SendCallback callback, int batchAllocatedSize) {
16271626
OpSendMsg op = RECYCLER.get();
16281627
op.initialize();
1629-
op.rpcLatencyHistogram = rpcLatencyHistogram;
1628+
op.producerMetrics = producerMetrics;
16301629
op.msgs = msgs;
16311630
op.cmd = cmd;
16321631
op.callback = callback;
@@ -1678,9 +1677,9 @@ void sendComplete(final Exception e) {
16781677
}
16791678

16801679
if (e == null) {
1681-
rpcLatencyHistogram.recordSuccess(now - this.lastSentAt);
1680+
producerMetrics.recordRpcLatencySuccess(now - this.lastSentAt);
16821681
} else {
1683-
rpcLatencyHistogram.recordFailure(now - this.lastSentAt);
1682+
producerMetrics.recordRpcLatencyFailure(now - this.lastSentAt);
16841683
}
16851684

16861685
OpSendMsgStats opSendMsgStats = OpSendMsgStatsImpl.builder()

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ConsumerMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void recordNack() {
7575
consumerNacksCounter.increment();
7676
}
7777

78-
public void recordDlq() {
78+
public void recordDlqMessageSent() {
7979
consumerDlqMessagesCounter.increment();
8080
}
8181

pulsar-client/src/main/java/org/apache/pulsar/client/impl/metrics/ProducerMetrics.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public ProducerMetrics(InstrumentProvider ip, String topic) {
3838

3939
rpcLatencyHistogram = ip.newLatencyHistogram(
4040
"pulsar.client.producer.rpc.send.duration",
41-
"Publish RPC latency experienced internally by the client when sending data to receiving an ack",
41+
"Publish RPC latency experienced internally by the client when sending data and receiving an ack",
4242
topic, Attributes.empty());
4343

4444
publishedBytesCounter = ip.newCounter(
@@ -53,7 +53,7 @@ public ProducerMetrics(InstrumentProvider ip, String topic) {
5353

5454
pendingBytesUpDownCounter = ip.newUpDownCounter(
5555
"pulsar.client.producer.message.pending.size", Unit.Bytes,
56-
"The size of the messages in the producer internal queue, waiting to sent",
56+
"The size of the messages in the producer internal queue, waiting to be sent",
5757
topic, Attributes.empty());
5858

5959
producersOpenedCounter = ip.newCounter(
@@ -85,15 +85,19 @@ public void recordSendFailed(long latencyNanos, int msgSize) {
8585
sendLatencyHistogram.recordFailure(latencyNanos);
8686
}
8787

88+
public void recordRpcLatencySuccess(long latencyNanos) {
89+
rpcLatencyHistogram.recordSuccess(latencyNanos);
90+
}
91+
92+
public void recordRpcLatencyFailure(long latencyNanos) {
93+
rpcLatencyHistogram.recordFailure(latencyNanos);
94+
}
95+
8896
public void recordProducerOpened() {
8997
producersOpenedCounter.increment();
9098
}
9199

92100
public void recordProducerClosed() {
93101
producersClosedCounter.increment();
94102
}
95-
96-
public LatencyHistogram getRpcLatencyHistogram() {
97-
return rpcLatencyHistogram;
98-
}
99103
}

0 commit comments

Comments
 (0)