Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
124 commits
Select commit Hold shift + click to select a range
7d2c4a3
docs: add comments to class GrpcMessagingApplication
winglechen Oct 14, 2025
2da77dc
Merge branch 'apache:develop' into comment
winglechen Oct 20, 2025
906a88b
Merge branch 'apache:develop' into comment
winglechen Oct 27, 2025
11597bb
Merge branch 'apache:develop' into comment
winglechen Nov 5, 2025
034db71
Merge branch 'apache:develop' into comment
winglechen Nov 12, 2025
512bc80
comment: add comments to ProxyStartup
winglechen Nov 14, 2025
a0cac85
comment: add comments to remoting related thread pool config of Proxy…
winglechen Nov 16, 2025
1ecc3a0
Merge branch 'apache:develop' into comment
winglechen Nov 16, 2025
156ec22
comment: add comments to method sendMessage of SendMessageActivity of…
winglechen Nov 16, 2025
7517165
comment: add comments to method sendMessage of ProducerProcessor
winglechen Nov 16, 2025
d567caf
comment: add comments to method sendMessage of MessageService
winglechen Nov 16, 2025
2098c73
Merge branch 'apache:develop' into comment
winglechen Nov 21, 2025
89243b1
Merge branch 'apache:develop' into comment
winglechen Nov 25, 2025
b4311c4
Merge branch 'apache:develop' into comment
winglechen Dec 3, 2025
f700590
Merge branch 'apache:develop' into comment
winglechen Dec 11, 2025
d758885
Merge branch 'apache:develop' into comment
winglechen Dec 17, 2025
076b6de
Merge branch 'apache:develop' into comment
winglechen Feb 2, 2026
c5cec8d
Merge branch 'apache:develop' into comment
winglechen Feb 7, 2026
6babbfa
Merge branch 'apache:develop' into comment
winglechen Feb 28, 2026
d96f2fa
Merge branch 'apache:develop' into comment
winglechen Apr 1, 2026
dd9c8f8
Merge branch 'apache:develop' into comment
winglechen May 4, 2026
30aa8b5
comment: add comments of ack mode to BrokerConfig.popConsumerKVServic…
winglechen May 6, 2026
9f55d70
Merge branch 'apache:develop' into comment
winglechen May 11, 2026
a75c7ba
comment: add class comments to SelectMappedBufferResult
winglechen May 14, 2026
eadaec6
Merge branch 'comment' of github.com:wolforest/rocketmq-comment into …
winglechen May 14, 2026
477cec2
comment: add class comments to GetMessageResult
winglechen May 14, 2026
3e81bb1
comment: add comments to method getMessage
winglechen May 16, 2026
c869be8
comment: add comments to method receiveMessage of ReceiveMessageActivity
winglechen May 16, 2026
ae075b3
comment: add comments to the fields startOffsetInfo, msgOffsetInfo, a…
winglechen May 17, 2026
031348e
comment: add comments of ack mode to method processRequest of PopMess…
winglechen May 17, 2026
7e490e9
comment: add process flow comments to method processRequest of PopMes…
winglechen May 18, 2026
f491213
comment: add class comments to PopLongPollingService
winglechen May 18, 2026
c8ddf65
Merge branch 'apache:develop' into comment
winglechen May 18, 2026
80f3e62
comment: add version related comments to method processRequest of Pop…
winglechen May 18, 2026
92bffea
comment: add method comments to method popMsgFromTopic of PopMessageP…
winglechen May 18, 2026
c43e83e
comment: add process flow comments to method popMsgFromQueue of PopMe…
winglechen May 18, 2026
af61749
comment: add method comments to method popMsgFromQueue of PopMessageP…
winglechen May 18, 2026
314879f
comment: add comments to method getPopOffset of PopMessageProcessor
winglechen May 18, 2026
174e0ff
comment: add comments to method getInitOffset of PopMessageProcessor
winglechen May 18, 2026
c8cb465
comment: add process comments to method getInitOffset of PopMessagePr…
winglechen May 18, 2026
0cab192
comment: add class comments to PopBufferMergeService
winglechen May 19, 2026
30d5e43
Merge branch 'apache:develop' into comment
winglechen May 20, 2026
4f612a9
comment: add class comments and attribute comments to PopCheckPoint
winglechen May 21, 2026
baef876
comment: add attribute comments to PopCheckPointWrapper
winglechen May 23, 2026
dac6f4e
comment: add method comments to addCkMock of PopBufferMergeService
winglechen May 23, 2026
ffea2f4
comment: add method comments to addCk of PopBufferMergeService
winglechen May 23, 2026
31a456d
comment: add method comments to putCkToStore of PopBufferMergeService
winglechen May 23, 2026
81a3a14
comment: add attribute comments to buffer and commitOffsets of PopBuf…
winglechen May 23, 2026
e6a279f
comment: add method comments to putOffsetQueue of PopBufferMergeService
winglechen May 23, 2026
6a247b5
comment: add process comments to method run of PopBufferMergeService
winglechen May 23, 2026
f5a0fc6
comment: add method comments to scanGarbage of PopBufferMergeService
winglechen May 23, 2026
a3ea07b
comment: add method comments to scan of PopBufferMergeService
winglechen May 23, 2026
ed37ae3
Merge branch 'apache:develop' into comment
winglechen May 23, 2026
848f23f
comment: add process comments to method addAk of PopBufferMergeService
winglechen May 26, 2026
00252be
comment: add method comments to addAk of PopBufferMergeService
winglechen May 26, 2026
801a041
comment: add attribute comments to queueOffsetDiff of PopCheckPoint
winglechen May 26, 2026
cd92c92
comment: add notes about queueOffsetDiff null scenario of PopCheckPoint
winglechen May 26, 2026
4576158
Merge branch 'apache:develop' into comment
winglechen May 26, 2026
e3287d4
comment: update attribute comments for bits and toStoreBits of PopChe…
winglechen May 26, 2026
107d6fb
comment: add class and method comments to DataConverter
winglechen May 26, 2026
404019b
comment: add method comments to markBitCAS of PopBufferMergeService
winglechen May 26, 2026
f49cbe4
comment: add method comments to indexOfAck of PopCheckPoint
winglechen May 26, 2026
b98be60
comment: add method comments to ackOffsetByIndex of PopCheckPoint
winglechen May 26, 2026
1049c19
comment: add inline comments to putAckToStore and scan of PopBufferMe…
winglechen May 26, 2026
6c561f5
comment: add method comments to putAckToStore of PopBufferMergeService
winglechen May 26, 2026
ca47354
comment: add method comments to handleAckPutMessageResult of PopBuffe…
winglechen May 26, 2026
2eb0e90
comment: add attribute comments to justOffset of PopCheckPointWrapper
winglechen May 26, 2026
6c554ae
comment: update attribute comments to reviveQueueOffset of PopCheckPo…
winglechen May 26, 2026
d3d9587
comment: add inline comment for reviveQueueOffset check in scan
winglechen May 26, 2026
a56679b
comment: fix inline comments in scan
winglechen May 26, 2026
fdfc788
comment: update inline comment for isCkDone check in scan
winglechen May 26, 2026
d138c0c
comment: add method comments to isCkDone of PopBufferMergeService
winglechen May 26, 2026
1246674
comment: add method comments to isCkDoneForFinish of PopBufferMergeSe…
winglechen May 26, 2026
0f4a4cb
comment: add method comments to scanCommitOffset of PopBufferMergeSer…
winglechen May 26, 2026
fbd9a5d
comment: add naming note to scanCommitOffset
winglechen May 26, 2026
57b5089
comment: add method comments to commitOffset of PopBufferMergeService
winglechen May 26, 2026
5278b4b
comment: add inline comment for scanCommitOffset call in scan
winglechen May 26, 2026
224815b
comment: add inline comments for store and remove in scan
winglechen May 27, 2026
26c908d
comment: add in-line comments to method scanGarbage of PopBufferMerge…
winglechen May 27, 2026
05e2f4d
comment: add class comments to PopReviveService
winglechen May 27, 2026
352e569
comment: add note about public methods to PopReviveService
winglechen May 27, 2026
3ebb734
comment: add in-line comments to method run of PopReviveService
winglechen May 27, 2026
cd64fa1
comment: add method comments to run of PopReviveService
winglechen May 27, 2026
72f7c36
comment: add method comments to consumeReviveMessage of PopReviveService
winglechen May 27, 2026
e85b23a
comment: add in-line comments to method consumeReviveMessage of PopRe…
winglechen May 27, 2026
6947ab3
comment: add method comments to getReviveMessage of PopReviveService
winglechen May 27, 2026
4874a13
comment: add in-line comments to method consumeReviveMessage of PopRe…
winglechen May 27, 2026
a024659
comment: add method comments to mockCkForAck and createMockCkForAck o…
winglechen May 27, 2026
0e682f3
comment: add method comments to mergeAndRevive of PopReviveService
winglechen May 27, 2026
8fbe422
comment: add in-line comments to method mergeAndRevive of PopReviveSe…
winglechen May 27, 2026
f163d0d
comment: add method comments to rePutCK of PopReviveService
winglechen May 27, 2026
fe4ff18
comment: add method comments to reviveMsgFromCk of PopReviveService
winglechen May 27, 2026
feb456b
Merge branch 'apache:develop' into comment
winglechen May 27, 2026
4891cff
comment: add in-line comments to method reviveMsgFromCk of PopReviveS…
winglechen May 27, 2026
10a7fe8
comment: add concurrency control comments to method mergeAndRevive of…
winglechen May 27, 2026
5d121c1
Merge branch 'comment' of github.com:wolforest/rocketmq-comment into …
winglechen May 27, 2026
6145444
comment: add attribute comments to inflightReviveRequestMap of PopRev…
winglechen May 27, 2026
f24cae0
comment: add in-line comments to method reviveRetry of PopReviveService
winglechen May 27, 2026
241951d
comment: add method comments to reviveRetry of PopReviveService
winglechen May 27, 2026
d51909c
comment: add class comments to AckMessageProcessor
winglechen May 27, 2026
7a35bb4
comment: add class comments to ChangeInvisibleTimeProcessor
winglechen May 27, 2026
3efe3ba
comment: add note about default mode in PopMessageProcessor
winglechen May 28, 2026
76495cf
comment: add in-line comments to method processRequest of AckMessageP…
winglechen May 28, 2026
0485831
comment: add method comments to processRequest of AckMessageProcessor
winglechen May 28, 2026
7927a88
comment: add in-line comments to method processRequest of ChangeInvis…
winglechen May 28, 2026
0d5ebc9
comment: add in-line comments to method processRequestAsync of Change…
winglechen May 28, 2026
7b762e0
comment: add in-line comments to method appendCheckPointThenAckOrigin…
winglechen May 28, 2026
328195d
comment: add in-line comments to method ackOrigin of ChangeInvisibleT…
winglechen May 28, 2026
aa04f66
comment: update inline comment for appendCheckPointThenAckOrigin
winglechen May 28, 2026
b72eb4e
comment: add method comments to processRequestAsync of ChangeInvisibl…
winglechen May 28, 2026
37c4ae2
comment: add method comments to appendCheckPointThenAckOrigin of Chan…
winglechen May 28, 2026
92cf992
comment: add method comments to ackOrigin of ChangeInvisibleTimeProce…
winglechen May 28, 2026
97d68c8
comment: update inline comment for ack mode in AckMessageProcessor
winglechen May 28, 2026
fe753cb
comment: add in-line comments to method appendAck of AckMessageProcessor
winglechen May 28, 2026
f92c4e8
comment: add method comments to appendAck of AckMessageProcessor
winglechen May 28, 2026
d5f4348
comment: add class comments to PopMessageProcessor
winglechen May 28, 2026
72698c1
comment: add method comments to processRequest of PopMessageProcessor
winglechen May 28, 2026
24d2877
comment: add inline comments for kv path in PopMessageProcessor
winglechen May 28, 2026
318426e
comment: update inline comment for long polling in PopMessageProcessor
winglechen May 28, 2026
ed4aa41
comment: add in-line comments to method popAsync of PopConsumerService
winglechen May 28, 2026
de70be6
comment: add method comments to popAsync of PopConsumerService
winglechen May 28, 2026
31b092b
comment: add method comments to getMessageFromTopicAsync of PopConsum…
winglechen May 28, 2026
353f71b
comment: add in-line comments to method getMessageFromTopicAsync of P…
winglechen May 28, 2026
d2dc764
comment: add method comments to getMessageAsync of PopConsumerService
winglechen May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@
import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC;
import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT;

/**
* Pop-mode long polling service that suspends Pop requests and wakes them up when new messages arrive.
* <p>
* Core responsibilities:
* <ul>
* <li>Suspend Pop requests — when the broker has no messages to return immediately, registers requests
* into the {@code pollingMap} keyed by {@code topic@cid@queueId} and waits</li>
* <li>Wake up on new message arrival — {@link #notifyMessageArriving} is triggered by the message arriving
* listener; it fetches matching Pop requests from the pollingMap, applies Tag filtering, and re-submits
* them to the PopMessageProcessor to return results to the client</li>
* <li>Timeout scanning — the background thread periodically scans the waiting queues and wakes up
* timed-out requests with an empty result</li>
* <li>Retry topic bridging — {@link #notifyMessageArrivingFromRetry} translates a new message on the retry
* topic into a wake-up notification on the original topic</li>
* <li>Resource cleanup — periodically removes stale polling entries for deleted topics or
* offline consumer groups</li>
* </ul>
*/
public class PopLongPollingService extends ServiceThread {

private static final Logger POP_LOGGER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,31 @@ public boolean isFifoBlocked(PopConsumerContext context, String groupId, String
context.getAttemptId(), topicId, groupId, queueId, context.getInvisibleTime());
}

/**
* Fetch messages from a single queue and append them to the pop context.
*
* <p>Chained via {@link CompletableFuture#thenCompose} from
* {@link #getMessageFromTopicAsync}. When the batch is already full
* ({@code remain <= 0}), the pending count is added to the context and
* the chain stops. Otherwise, messages are fetched from the store and
* the result is merged into the context via {@link #handleGetMessageResult}.
*
* <p>Early termination can occur inside this method when:
* <ul>
* <li>Too many inflight (un-acked) messages exist</li>
* <li>A FIFO queue is blocked</li>
* </ul>
*
* @param future the accumulator future carrying the pop context
* @param clientHost the client address
* @param groupId consumer group id
* @param topicId topic name
* @param queueId queue id
* @param batchSize max number of messages still needed
* @param filter message filter
* @param retryType whether this is a retry topic V1/V2
* @return a future completing with the pop context updated with results
*/
protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFuture<PopConsumerContext> future,
String clientHost, String groupId, String topicId, int queueId, int batchSize, MessageFilter filter,
PopConsumerRecord.RetryType retryType) {
Expand Down Expand Up @@ -335,13 +360,38 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
});
}

/**
* Fetch messages from every read queue of a topic via a CompletableFuture chain.
*
* <p>Each queue is visited once. For each queue the
* {@link #getMessageAsync(CompletableFuture, String, String, String, int, int, MessageFilter, PopConsumerRecord.RetryType)}
* method is chained via {@link CompletableFuture#thenCompose}. The chain carries
* the accumulated result through all queues, stopping early when the batch is
* filled, the queue is blocked, or the inflight threshold is reached.
*
* <p>Queue iteration order respects {@code priorityOrderAsc} and uses
* {@code requestCount} as a round-robin offset for load balancing.
*
* @param future the accumulator future
* @param clientHost the client address
* @param groupId consumer group id
* @param topicId topic name
* @param requestCount round-robin counter for queue selection
* @param batchSize max number of messages to return
* @param filter message filter expression
* @param retryType whether this is a retry topic V1/V2
* @return a future completing with the pop result context
*/
protected CompletableFuture<PopConsumerContext> getMessageFromTopicAsync(CompletableFuture<PopConsumerContext> future,
String clientHost, String groupId, String topicId, long requestCount, int batchSize, MessageFilter filter,
PopConsumerRecord.RetryType retryType) {
// get topic config
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicId);
if (null == topicConfig) {
return future;
}

// iterate all queues of the topic
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
long index = (brokerController.getBrokerConfig().isPriorityOrderAsc() ?
topicConfig.getReadQueueNums() - 1 - i : i) + requestCount;
Expand All @@ -352,10 +402,38 @@ protected CompletableFuture<PopConsumerContext> getMessageFromTopicAsync(Complet
return future;
}

/**
* Asynchronously pop messages for the KVStore-based ack path.
*
* <p>This method coordinates the full Pop lifecycle:
* <ol>
* <li>Validates topic, group, and acquires the consumer lock</li>
* <li>Determines whether to pull from retry topic first
* (based on {@code popFromRetryProbability})</li>
* <li>Pulls messages from normal topic (and retry topic V1/V2 if configured)</li>
* <li>Writes checkpoints to {@link PopConsumerCache} (buffer merge) or
* {@link PopConsumerKVStore} (RocksDB)</li>
* <li>Re-encodes retry messages if needed</li>
* </ol>
*
* @param clientHost the client address
* @param popTime the pop invocation timestamp
* @param invisibleTime the message visibility timeout
* @param groupId consumer group id
* @param topicId topic name
* @param queueId queue id (-1 for all queues)
* @param batchSize max number of messages to return
* @param fifo whether this is a FIFO ordered consumption
* @param attemptId attempt id for idempotent consumption
* @param initMode consume init mode (min/max)
* @param filter message filter expression
* @return a future that completes with the pop result context
*/
public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long popTime, long invisibleTime,
String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode,
MessageFilter filter) {

// init context params
PopConsumerContext popConsumerContext =
new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, initMode, attemptId);

Expand Down Expand Up @@ -391,25 +469,30 @@ public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long po
CompletableFuture.completedFuture(popConsumerContext);

try {
// get message from retry topic,
if (!fifo && preferRetry) {
// default config of retrieveMessageFromPopRetryTopicV1 is true,
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
retryTopicV1, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
}

// default config of enableRetryTopicV2 is false
if (brokerConfig.isEnableRetryTopicV2()) {
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
retryTopicV2, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
}
}

// get message from normal topic
if (queueId != -1) {
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
topicId, queueId, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
} else {
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
topicId, requestCount, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC);

// get message from retry topic
if (!fifo && !preferRetry) {
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
Expand All @@ -425,13 +508,16 @@ public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long po

return getMessageFuture.thenCompose(result -> {
if (result.isFound() && !result.isFifo()) {
// write checkpoint to cache or store
// default config of enablePopBufferMerge is false
if (brokerConfig.isEnablePopBufferMerge() &&
popConsumerCache != null && !popConsumerCache.isCacheFull()) {
this.popConsumerCache.writeRecords(result.getPopConsumerRecordList());
} else {
this.popConsumerStore.writeRecords(result.getPopConsumerRecordList());
}

// format result
for (int i = 0; i < result.getGetMessageResultList().size(); i++) {
GetMessageResult getMessageResult = result.getGetMessageResultList().get(i);
PopConsumerRecord popConsumerRecord = result.getPopConsumerRecordList().get(i);
Expand All @@ -449,6 +535,7 @@ public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long po
}
return CompletableFuture.completedFuture(result);
}).whenComplete((result, throwable) -> {
// unlock by consumerLockService
try {
if (throwable != null) {
log.error("PopConsumerService popAsync get message error",
Expand Down
Loading