diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index c595178d193..134887f9ad4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -52,6 +52,24 @@ import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC; import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT; +/** + * Pop-mode long polling service that suspends Pop requests and wakes them up when new messages arrive. + *
+ * Core responsibilities: + *
Chained via {@link CompletableFuture#thenCompose} from + * {@link #getMessageFromTopicAsync}. When the batch is already full + * ({@code remain <= 0}), the pending count is added to the context and + * the chain stops. Otherwise, messages are fetched from the store and + * the result is merged into the context via {@link #handleGetMessageResult}. + * + *
Early termination can occur inside this method when: + *
Each queue is visited once. For each queue the + * {@link #getMessageAsync(CompletableFuture, String, String, String, int, int, MessageFilter, PopConsumerRecord.RetryType)} + * method is chained via {@link CompletableFuture#thenCompose}. The chain carries + * the accumulated result through all queues, stopping early when the batch is + * filled, the queue is blocked, or the inflight threshold is reached. + * + *
Queue iteration order respects {@code priorityOrderAsc} and uses
+ * {@code requestCount} as a round-robin offset for load balancing.
+ *
+ * @param future the accumulator future
+ * @param clientHost the client address
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param requestCount round-robin counter for queue selection
+ * @param batchSize max number of messages to return
+ * @param filter message filter expression
+ * @param retryType whether this is a retry topic V1/V2
+ * @return a future completing with the pop result context
+ */
protected CompletableFuture This method coordinates the full Pop lifecycle:
+ * Handles both single ({@link RequestCode#ACK_MESSAGE}) and batch
+ * ({@link RequestCode#BATCH_ACK_MESSAGE}) acks. Each ack is processed
+ * through one of two paths:
+ * Orderly ack is handled separately by {@link #ackOrderly} /
+ * {@link #ackOrderlyNew}, which update the consumer order info and advance
+ * the consumer offset while notifying any long-polling waiters.
+ *
+ * This class also owns and manages the {@link PopReviveService} instances
+ * for the file-based revive path.
+ */
public class AckMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -121,13 +143,36 @@ public boolean rejectRequest() {
return false;
}
+ /**
+ * Process an ack request (single or batch).
+ *
+ * Routes to one of two paths based on {@code popConsumerKVServiceEnable}:
+ * Orderly acks ({@code rqId == POP_ORDER_REVIVE_QUEUE}) are handled by
+ * {@link #ackOrderly} / {@link #ackOrderlyNew} instead.
+ *
+ * @param channel the Netty channel of the requesting client
+ * @param request the incoming request
+ * @param brokerAllowSuspend whether the broker may suspend the request
+ * @return the response to send back to the client
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
+ // init context params
AckMessageRequestHeader requestHeader;
BatchAckMessageRequestBody reqBody = null;
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
response.setOpaque(request.getOpaque());
+
if (request.getCode() == RequestCode.ACK_MESSAGE) {
+ // decode and validate request
requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
@@ -167,12 +212,15 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setRemark(errorInfo);
return response;
}
+
+ // append ack, default mode is file based merge, call appendAck
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
appendAckNew(requestHeader, null, response, channel, null);
} else {
appendAck(requestHeader, null, response, channel, null);
}
} else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) {
+ // decode and validate request
if (request.getBody() != null) {
reqBody = BatchAckMessageRequestBody.decode(request.getBody(), BatchAckMessageRequestBody.class);
}
@@ -180,7 +228,10 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
response.setCode(ResponseCode.NO_MESSAGE);
return response;
}
+
+ // process each ack
for (BatchAck bAck : reqBody.getAcks()) {
+ // default value of popConsumerKVServiceEnable is false
if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) {
appendAckNew(null, bAck, response, channel, reqBody.getBrokerName());
} else {
@@ -188,6 +239,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
}
}
} else {
+ // unsupported request, logging and return
POP_LOGGER.error("AckMessageProcessor failed to process RequestCode: {}, consumer: {} ", request.getCode(), RemotingHelper.parseChannelRemoteAddr(channel));
response.setCode(ResponseCode.MESSAGE_ILLEGAL);
response.setRemark(String.format("AckMessageProcessor failed to process RequestCode: %d", request.getCode()));
@@ -196,8 +248,31 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
return response;
}
+ /**
+ * Append an ack (single or batch) in the file-based path.
+ *
+ * For single ack: parses the extra info from the request header,
+ * routes orderly acks to {@link #ackOrderly}, or creates a single {@link AckMsg}.
+ *
+ * For batch ack: expands the {@link BitSet} from the
+ * {@link BatchAck} into individual offsets, routes orderly acks individually,
+ * and packs the remaining offsets into a {@link BatchAckMsg}.
+ *
+ * The ack is first offered to {@link PopBufferMergeService#addAk}.
+ * If the buffer merge is not available, the ack is serialized as JSON and
+ * written to the revive topic with tag {@link PopAckConstants#ACK_TAG}
+ * or {@link PopAckConstants#BATCH_ACK_TAG}.
+ *
+ * @param requestHeader the single-ack request header (null for batch)
+ * @param batchAck the batch ack body (null for single)
+ * @param response the response to modify on error
+ * @param channel the Netty channel
+ * @param brokerName the broker name
+ * @throws RemotingCommandException if offset validation fails
+ */
private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck,
final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException {
+ // init context params
String[] extraInfo;
String consumeGroup, topic;
int qId, rqId;
@@ -205,8 +280,11 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
long popTime, invisibleTime;
AckMsg ackMsg;
int ackCount = 0;
+
+ // ack orderly or set context params
if (batchAck == null) {
// single ack
+ // set context params
extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
consumeGroup = requestHeader.getConsumerGroup();
@@ -218,15 +296,18 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
popTime = ExtraInfoUtil.getPopTime(extraInfo);
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
+ // ack orderly if revive queue
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response);
return;
}
+ // set ackMsg and ackCount
ackMsg = new AckMsg();
ackCount = 1;
} else {
// batch ack
+ // set context params
consumeGroup = batchAck.getConsumerGroup();
topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry());
qId = batchAck.getQueueId();
@@ -236,6 +317,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
popTime = batchAck.getPopTime();
invisibleTime = batchAck.getInvisibleTime();
+ // offset check
long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId);
long maxOffset;
try {
@@ -248,6 +330,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
return;
}
+ // ack orderly or add offset to batchAckMsg
BatchAckMsg batchAckMsg = new BatchAckMsg();
BitSet bitSet = batchAck.getBitSet();
for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) {
@@ -264,10 +347,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
batchAckMsg.getAckOffsetList().add(offset);
}
}
+
+ // skip if empty or is revive queue
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}
+ // set ackMsg and ackCount
ackMsg = batchAckMsg;
ackCount = batchAckMsg.getAckOffsetList().size();
}
@@ -275,6 +361,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, topic, ackCount);
+ // set ackMsg
ackMsg.setConsumerGroup(consumeGroup);
ackMsg.setTopic(topic);
ackMsg.setQueueId(qId);
@@ -283,11 +370,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
ackMsg.setPopTime(popTime);
ackMsg.setBrokerName(brokerName);
+ // add ackMsg
if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) {
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount);
return;
}
+ // create revive message by ackMsg, if add ackMsg failed
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(StandardCharsets.UTF_8));
@@ -305,7 +394,9 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA
msgInner.setDeliverTimeMs(popTime + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+
+ // store revive message
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default is false
int finalAckCount = ackCount;
this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, finalAckCount);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 5ff132ca237..5de0c3eac78 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -27,6 +27,7 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.offset.MemoryConsumerOrderInfoManager;
import org.apache.rocketmq.broker.pop.PopConsumerLockService;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager;
import org.apache.rocketmq.common.PopAckConstants;
import org.apache.rocketmq.common.TopicConfig;
@@ -52,6 +53,23 @@
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+/**
+ * Processes the nack {@code ChangeInvisibleTime} request from consumers.
+ *
+ * When a consumer needs more time to process a message (or wants to
+ * suspend/nack it), this processor updates the message's visibility
+ * timeout. The implementation varies by the ack mode:
+ * For orderly consumption, the next visible time is updated directly in
+ * the {@link ConsumerOrderInfoManager} without writing to the revive topic.
+ */
public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
@@ -76,8 +94,12 @@ public boolean rejectRequest() {
private RemotingCommand processRequest(final Channel channel, RemotingCommand request,
boolean brokerAllowSuspend) throws RemotingCommandException {
+ // process request async
CompletableFuture Routes to the appropriate handler based on message type:
+ * Called after the new checkpoint has been written successfully. This method
+ * writes an {@link PopAckConstants#ACK_TAG} message that matches the
+ * original checkpoint's merge key. When {@link PopReviveService} processes this
+ * ack, it sets the corresponding bit in the old CK's bitMap, causing
+ * the old CK to be treated as fully acked and skipped during revive.
+ *
+ * If {@link PopBufferMergeService#addAk} accepts the ack (buffer
+ * merge enabled), it is merged in memory without writing to the store.
+ *
+ * @param requestHeader the original request header
+ * @param extraInfo the extra info from the original pop request
+ * @return a future that completes with {@code true} on success
+ */
private CompletableFuture This is the core of the file-based non-orderly ChangeInvisibleTime path:
+ * buffer checkpoint in memory then enqueue them into system revive queue then wait to be acked.
+ *
+ * Two in-memory data structures drive the merge logic:
+ * The background {@link #scan()} thread periodically evaluates each buffered CK:
+ * This service is enabled by {@code enablePopBufferMerge} and only runs on
+ * a master or a slave acting as master. When {@code enablePopBatchAck} is set,
+ * multiple ack offsets are packed into a single {@link BatchAckMsg}.
+ */
public class PopBufferMergeService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+ /**
+ * In-memory map of check points.
+ * Key: topic + group + queueId + startOffset + popTime + brokerName
+ * Value: check point wrapper
+ * use cases:
+ * - scan: iterate buffer
+ * - addAckMsg: get check point from buffer and mark ack state of Check Point
+ */
ConcurrentHashMap For each {@code topic@cid@queueId} queue, the method peeks the head (oldest)
+ * wrapper and checks whether it is ready to commit:
+ * If the head is ready, it is committed and removed. Processing continues
+ * to the next wrapper in the same queue. If the head is not ready, the loop
+ * breaks — this ensures strict FIFO order and prevents consumer offset
+ * regression.
+ *
+ * Called at the end of {@link #scan()} after the buffer has been processed.
+ *
+ * @return the total number of remaining wrappers across all queues (for logging)
+ */
private int scanCommitOffset() {
Iterator Three types of entries are removed:
+ * For each entry in {@link #buffer}:
+ * After processing the buffer, calls {@link #scanCommitOffset()} to commit offsets
+ * for finished checkpoints in FIFO order.
+ *
+ * If the scan duration exceeds {@code popCkStayBufferTimeOut - 1000ms}, the service
+ * temporarily stops accepting new CKs ({@link #serving} = false) to avoid backlog.
+ */
private void scan() {
long startTime = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
@@ -244,7 +352,6 @@ private void scan() {
continue;
}
-
// just process offset(already stored at pull thread), or buffer ck(not stored and ack finish)
if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper)
|| isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) {
@@ -259,6 +366,7 @@ private void scan() {
PopCheckPoint point = pointWrapper.getCk();
long now = System.currentTimeMillis();
+ // check whether check point is timeout
boolean removeCk = !this.serving;
// ck will be timeout
if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) {
@@ -275,17 +383,18 @@ private void scan() {
}
// double check
- if (isCkDone(pointWrapper)) {
+ if (isCkDone(pointWrapper)) { // all checkpoint are acked, do nothing
continue;
- } else if (pointWrapper.isJustOffset()) {
+ } else if (pointWrapper.isJustOffset()) { // store checkpoint
// just offset should be in store.
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
}
continue;
- } else if (removeCk) {
+ } else if (removeCk) { // store checkpoint if needed
// put buffer ak to store
+ // revive queue offset < 0 means checkpoint was not stored
if (pointWrapper.getReviveQueueOffset() < 0) {
putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync());
countCk++;
@@ -295,11 +404,13 @@ private void scan() {
continue;
}
- if (brokerController.getBrokerConfig().isEnablePopBatchAck()) {
+ // store checkpoint
+ if (brokerController.getBrokerConfig().isEnablePopBatchAck()) { // default is false
List Uses a CAS (compare-and-swap) loop to ensure thread safety without locking.
+ * If the bit is already set, this method returns immediately (no-op).
+ *
+ * @param setBits the atomic bitmask to update
+ * @param index the bit position (0-based)
+ */
private void markBitCAS(AtomicInteger setBits, int index) {
while (true) {
int bits = setBits.get();
@@ -384,6 +508,22 @@ private void markBitCAS(AtomicInteger setBits, int index) {
}
}
+ /**
+ * Commit the consumer offset for the checkpoint's {@code topic@cid@queueId}.
+ *
+ * Called from {@link #scanCommitOffset()} after the checkpoint is confirmed
+ * as finished (all acks received or CK stored). The offset is advanced to
+ * {@link PopCheckPointWrapper#nextBeginOffset}, which is the offset of the
+ * first message after this batch.
+ *
+ * The operation is guarded by {@link PopMessageProcessor.QueueLockManager}
+ * to prevent concurrent offset updates on the same queue.
+ *
+ * @param wrapper the finished checkpoint wrapper
+ * @return {@code true} if the offset was committed or no commit is needed
+ * ({@code nextBeginOffset < 0}); {@code false} if the lock could
+ * not be acquired (caller should retry later)
+ */
private boolean commitOffset(final PopCheckPointWrapper wrapper) {
if (wrapper.getNextBeginOffset() < 0) {
return true;
@@ -413,8 +553,25 @@ private boolean commitOffset(final PopCheckPointWrapper wrapper) {
return true;
}
+ /**
+ * Enqueue the checkpoint wrapper into the per-{@code topic@cid@queueId} offset queue
+ * for sequential offset committing.
+ *
+ * The queue is maintained in FIFO order. The {@link #scanCommitOffset()} method
+ * drains the queue from the head, ensuring that offsets are committed in the same
+ * order as the checkpoints were created, which prevents consumer offset regression.
+ *
+ * The {@link QueueWithTime#time} is also updated to the CK's pop time so that
+ * {@link #scanGarbage()} can identify and remove stale entries after 5 minutes of
+ * inactivity.
+ *
+ * @param pointWrapper the checkpoint wrapper to enqueue
+ * @return true if the element was added to the queue successfully
+ */
private boolean putOffsetQueue(PopCheckPointWrapper pointWrapper) {
QueueWithTime The ack is not written to the revive topic immediately. Instead, a flag is
+ * set in {@link PopCheckPointWrapper#bits} via {@link #markBitCAS}.
+ * The pending ack will later be flushed to storage by {@link #scan()} when the
+ * checkpoint is evicted (timeout / buffer full / service stopping).
+ *
+ * Rejection conditions (return false):
+ * Every sub-message has a corresponding bit in
+ * {@link PopCheckPointWrapper#bits}. This method returns {@code true} when
+ * all bits are set, meaning the CK can be removed from the buffer without
+ * writing any ack to the revive topic (clean completion).
+ *
+ * @param pointWrapper the checkpoint wrapper to check
+ * @return {@code true} if every sub-message has been acked
+ */
private boolean isCkDone(PopCheckPointWrapper pointWrapper) {
byte num = pointWrapper.getCk().getNum();
for (byte i = 0; i < num; i++) {
@@ -807,6 +1053,18 @@ private boolean isCkDone(PopCheckPointWrapper pointWrapper) {
return true;
}
+ /**
+ * Check whether all acked sub-messages have been fully persisted.
+ *
+ * Uses XOR: {@code bits ^ toStoreBits}. A bit is set in the result when
+ * the corresponding sub-message has been acked ({@code bits}) but not yet
+ * persisted ({@code toStoreBits}). Returns {@code true} only when every
+ * acked message has also been persisted, meaning the checkpoint is ready
+ * for final cleanup.
+ *
+ * @param pointWrapper the checkpoint wrapper to check
+ * @return {@code true} if no ack remains to be persisted
+ */
private boolean isCkDoneForFinish(PopCheckPointWrapper pointWrapper) {
byte num = pointWrapper.getCk().getNum();
int bits = pointWrapper.getBits().get() ^ pointWrapper.getToStoreBits().get();
@@ -842,17 +1100,46 @@ public LinkedBlockingDeque Three-state indicator:
+ * When {@code true}:
+ * This is the core processor for the Pop consumption mode. It handles:
+ * This class also owns the {@link PopLongPollingService},
+ * {@link PopBufferMergeService}, and {@link QueueLockManager} instances
+ * used by the file-based ack path.
+ */
public class PopMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
@@ -222,13 +241,36 @@ public void notifyMessageArriving(final String topic, final int queueId, final S
topic, queueId, cid, false, null, 0L, null, null);
}
+ /**
+ * Process a PopMessage request.
+ *
+ * This method handles the full Pop lifecycle:
+ * Queues are visited sequentially (respecting {@code priorityOrderAsc}).
+ * For each queue a {@link #popMsgFromQueue} call is chained via
+ * {@code CompletableFuture#thenCompose}. The chained future carries the
+ * remaining number of messages still needed ({@code restNum}).
+ *
+ * Early termination can occur inside {@link #popMsgFromQueue} when:
+ * This method is called as a step in a {@link CompletableFuture} chain
+ * (see {@link #popMsgFromTopic}). The {@code restNum} argument is the
+ * number of messages still needed — when it drops to {@code 0} or below,
+ * subsequent calls in the chain may short-circuit early.
+ *
+ * The method has several early-termination paths (all return
+ * immediately with the current {@code restNum}):
+ * Otherwise, it asynchronously fetches messages from the store, handles
+ * offset correction, updates order-consume tracking / checkpoint data, and
+ * merges the results into {@code getMessageResult}.
+ *
+ * @param topic topic name
+ * @param attemptId attempt id for idempotent consumption
+ * @param isRetry whether this is a retry topic
+ * @param getMessageResult accumulator for messages popped so far
+ * @param requestHeader pop request parameters
+ * @param queueId target queue id
+ * @param restNum number of messages still needed before the batch
+ * size is satisfied
+ * @param reviveQid revive queue id for checkpoint
+ * @param channel netty channel of the requesting client
+ * @param popTime pop invocation timestamp
+ * @param messageFilter expression filter applied to each message
+ * @param startOffsetInfo buffer for offset tracing info
+ * @param msgOffsetInfo buffer for per-message offset tracing info
+ * @param orderCountInfo buffer for order-consume count info
+ * @return a future completing with the remaining number of messages needed
+ */
private CompletableFuture There is only one public method for business: run Each revive queue has its own dedicated {@code PopReviveService} instance.
+ * The service periodically:
+ * This is the file-based revive path (CK + Ack messages are stored in
+ * the system revive topic). It is complemented by the KVStore-based path in
+ * {@code PopConsumerService} which handles the {@code PopConsumerKVStore} flow.
+ */
public class PopReviveService extends ServiceThread {
private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 };
@@ -74,6 +93,25 @@ public class PopReviveService extends ServiceThread {
private long currentReviveMessageTimestamp = -1;
private volatile boolean shouldRunPopRevive = false;
+ /**
+ * Tracks checkpoints that are currently being revived.
+ *
+ * Key — the checkpoint being processed.
+ * Value — a pair of (startTime, completed), where:
+ * The map is sorted by {@link PopCheckPoint#compareTo} (by startOffset).
+ * This ordering is used to drain completed entries from the head, ensuring
+ * the revive topic offset is committed strictly in sequence.
+ *
+ * Concurrency is limited to at most 3 entries at a time (see
+ * {@link #mergeAndRevive}). If an entry stays incomplete for over 30
+ * seconds, it is considered hung and is skipped via {@link #rePutCK}.
+ */
private final NavigableMap Constructs a new {@link MessageExtBrokerInner} from the original
+ * message, increments the reconsume count (unless suspended), sets the
+ * first-pop time and origin group properties, and writes it to the
+ * appropriate retry topic (V1 or V2 depending on configuration).
+ *
+ * If the retry topic does not exist, it is created automatically
+ * via {@link #addRetryTopicIfNotExist}.
+ *
+ * @param popCheckPoint the checkpoint that triggered the revive
+ * @param messageExt the original message to re-publish
+ * @return {@code true} if the message was written successfully
+ */
private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) {
+ // convert checkpoint to inner message
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId(), brokerController.getBrokerConfig().isEnableRetryTopicV2()));
@@ -133,9 +187,15 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
}
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+
+ // set topic and queueId
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt));
+
+ // store message
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+
+ // logging and metric
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
@@ -205,6 +265,17 @@ private int getRetryQueueId(String retryTopic, MessageExt messageExt) {
return oriQueueId;
}
+ /**
+ * Pull a batch of messages from the revive topic at the given offset.
+ *
+ * If the offset becomes illegal (e.g. the revive topic was truncated),
+ * the revive offset is corrected to {@code nextBeginOffset - 1} so that
+ * the next scan starts from a valid position.
+ *
+ * @param offset the queue offset to start reading from
+ * @param queueId the revive queue id
+ * @return a list of decoded messages, or {@code null} if at the tail
+ */
protected List This method reads messages from the revive topic starting from the
+ * current offset. Each message is classified by its tag:
+ * AckMsg that arrive after their checkpoint has already been processed
+ * ({@code enableSkipLongAwaitingAck}) are handled by creating a mock CK
+ * via {@link #mockCkForAck} so that the revive offset can still be
+ * committed correctly.
+ *
+ * The scan stops when any of:
+ * When an ack arrives long after its CK has been consumed (e.g. network
+ * delay), the CK is no longer in the scan map. If {@code enableSkipLongAwaitingAck}
+ * is enabled, this method creates a synthetic CK so that the revive offset
+ * can still be advanced correctly in {@link #mergeAndRevive}.
+ *
+ * @param messageExt the revive topic message that carried the ack
+ * @param ackMsg the decoded ack
+ * @param mergeKey the merge key for the CK lookup
+ * @param mockPointMap map to collect the mock CKs
+ * @return {@code true} if a mock CK was created
+ */
private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeKey, HashMap The mock CK has {@code num = 0} and empty bitMap, meaning no actual
+ * messages to revive. Its only purpose is to carry the {@code reviveOffset}
+ * so that the revive consumer offset can be committed past this ack.
+ *
+ * @param ackMsg the ack message
+ * @param reviveOffset the queue offset of the ack message in the revive topic
+ * @return a mock checkpoint with no sub-messages
+ */
private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
PopCheckPoint point = new PopCheckPoint();
point.setStartOffset(ackMsg.getStartOffset());
@@ -496,7 +634,26 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
return point;
}
+ /**
+ * Process collected checkpoints and revive all un-acked sub-messages.
+ *
+ * Checkpoints are sorted by revive offset. For each one:
+ * After processing, the revive topic offset is advanced past all
+ * processed checkpoints.
+ *
+ * @param consumeReviveObj the container with collected CKs and scan state
+ * @throws Throwable if any revive operation fails
+ */
protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
+ // sort checkpoints and init newOffset
ArrayList For each sub-message whose bit is not set in the bitMap, the original
+ * message is fetched via {@link #getBizMessage} and re-published to the
+ * retry topic via {@link #reviveRetry}. All revive attempts run
+ * concurrently via {@link CompletableFuture#allOf}.
+ *
+ * After all attempts complete:
+ * When a sub-message cannot be revived (e.g. the original message is
+ * temporarily unavailable), the CK is re-published with:
+ * If {@code rePutTimes} exceeds the backoff table length and
+ * {@code skipWhenCKRePutReachMaxTimes} is set, the CK is dropped.
+ *
+ * @param oldCK the original checkpoint that failed to revive
+ * @param pair the failed offset and result (object1 = offset, object2 = result)
+ */
private void rePutCK(PopCheckPoint oldCK, Pair Each iteration:
+ * An {@code int} bitmask is used to track the ack state of up to 32 sub-messages
+ * within a single Pop checkpoint (see {@code PopCheckPoint}).
+ */
public class DataConverter {
public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
+ /**
+ * Convert a {@code long} to an 8-byte array (big-endian).
+ */
public static byte[] Long2Byte(Long v) {
ByteBuffer tmp = ByteBuffer.allocate(8);
tmp.putLong(v);
return tmp.array();
}
+ /**
+ * Set or clear the bit at {@code index} in an int bitmask.
+ * Uses {@code 1L} (long literal) to avoid signed-int overflow when {@code index == 31}.
+ *
+ * @param value the original bitmask
+ * @param index the bit position (0-based, 0..31)
+ * @param flag {@code true} to set, {@code false} to clear
+ * @return the updated bitmask
+ */
public static int setBit(int value, int index, boolean flag) {
if (flag) {
return (int) (value | (1L << index));
@@ -36,6 +54,13 @@ public static int setBit(int value, int index, boolean flag) {
}
}
+ /**
+ * Test whether the bit at {@code index} is set in an int bitmask.
+ *
+ * @param value the bitmask
+ * @param index the bit position (0-based, 0..31)
+ * @return {@code true} if the bit is 1
+ */
public static boolean getBit(int value, int index) {
return (value & (1L << index)) != 0;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
index 1b38a19ae6a..94e4f4ffc8b 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java
@@ -55,6 +55,14 @@
public class ProxyStartup {
private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
+ /**
+ * proxy components container, manager components with method start/shutdown/...
+ * - gRPC thread pool executor
+ * - message processor (wrap broker controller)
+ * - grpc server
+ * - remoting protocol server
+ * - ...
+ */
private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown();
private static class ProxyStartAndShutdown extends AbstractStartAndShutdown {
@@ -73,8 +81,10 @@ public static void main(String[] args) {
// init thread pool monitor for proxy.
initThreadPoolMonitor();
+ // init business thread pool for grpc server
ThreadPoolExecutor executor = createServerExecutor();
+ // create message processor, wrap broker controller in local mode
MessagingProcessor messagingProcessor = createMessagingProcessor();
// tls cert update
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 5a1a5859305..7def5314972 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -262,7 +262,7 @@ public class ProxyConfig implements ConfigFile {
private String remotingAccessAddr = "";
private int remotingListenPort = 8080;
- // related to proxy's send strategy in cluster mode.
+ // related to proxy's sending strategy in cluster mode.
private boolean sendLatencyEnable = false;
private boolean startDetectorEnable = false;
private int detectTimeout = 200;
@@ -270,9 +270,38 @@ public class ProxyConfig implements ConfigFile {
private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER;
private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. send message(and send message v2)
+ * 2. send batch message
+ * 3. consume send message back
+ * 4. end transaction
+ * 5. recall message
+ */
private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. pull message
+ * 2. lite pull message
+ * 3. pop message
+ */
private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. update consumer offset
+ * 2. ack message
+ * 3. change message invisible time
+ * 4. get consumer connection list
+ */
private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER;
+ /**
+ * thread pool number for
+ * 1. unregister client
+ * 2. check client config
+ * 3. get consumer list by group
+ * 4. get min/max offset, query consume offset, search offset by timestamp
+ * 5. lock/unlock batch mq
+ */
private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER;
private int remotingHeartbeatThreadPoolQueueCapacity = 50000;
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
index 3429ad54e27..12508d32108 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java
@@ -73,6 +73,15 @@
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+/**
+ * RocketMQ gRPC protocol implementation
+ *
+ * When a batch of messages is popped, the queue offsets of the messages may not
+ * be contiguous (e.g. batch messages, ConsumeQueue compaction, filter mismatch gaps).
+ * This list records {@code actualQueueOffset - startOffset} for each message in the
+ * batch, so that the system can correctly map an ack offset back to its index within
+ * the checkpoint via {@link #indexOfAck}, and reconstruct the original offset via
+ * {@link #ackOffsetByIndex}.
+ *
+ * When this field is null or empty (old-version CK), offsets are assumed to be
+ * {@code startOffset + index}.
+ */
@JSONField(name = "d")
private List The index is used to look up the corresponding bit in the {@link #bitMap}
+ * (or in {@code PopCheckPointWrapper.bits}) and to retrieve the original
+ * queue offset via {@link #ackOffsetByIndex}.
+ *
+ * @param ackOffset the queue offset being acked
+ * @return the sub-message index (0-based), or -1 if the offset is not found
+ * in this checkpoint
+ */
public int indexOfAck(long ackOffset) {
if (ackOffset < startOffset) {
return -1;
}
- // old version of checkpoint
+ // old version of checkpoint, this will not happen in 5.*
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
if (ackOffset - startOffset < num) {
@@ -184,8 +233,16 @@ public int indexOfAck(long ackOffset) {
return queueOffsetDiff.indexOf((int) (ackOffset - startOffset));
}
+ /**
+ * get original queue offset by index.
+ * the method name is miss-leading, it should be getQueueOffsetByIndex.
+ * queueOffset = startOffset + queueOffsetDiff[index]
+ *
+ * @param index sub-message index within this checkpoint (0-based)
+ * @return the original queue offset in the consume queue
+ */
public long ackOffsetByIndex(byte index) {
- // old version of checkpoint
+ // old version of checkpoint, this will not happen in 5.*
if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) {
return startOffset + index;
}
+ *
+ *
+ * @param clientHost the client address
+ * @param popTime the pop invocation timestamp
+ * @param invisibleTime the message visibility timeout
+ * @param groupId consumer group id
+ * @param topicId topic name
+ * @param queueId queue id (-1 for all queues)
+ * @param batchSize max number of messages to return
+ * @param fifo whether this is a FIFO ordered consumption
+ * @param attemptId attempt id for idempotent consumption
+ * @param initMode consume init mode (min/max)
+ * @param filter message filter expression
+ * @return a future that completes with the pop result context
+ */
public CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param channel the Netty channel
+ * @param request the incoming request
+ * @param brokerAllowSuspend whether the broker may suspend
+ * @return a future that completes with the response
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
public CompletableFuture
+ *
+ *
+ * @param requestHeader the original request header
+ * @param reviveQid the revive queue to write to
+ * @param queueId the original queue id
+ * @param offset the message offset being extended
+ * @param popTime the new pop time (current time)
+ * @param extraInfo the extra info from the original pop request
+ * @return a future that completes with {@code true} on success
+ */
private CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ */
private void scanGarbage() {
Iterator
+ *
+ *
+ *
+ *
+ *
+ * @param reviveQid revive queue id (used only for logging)
+ * @param ackMsg the ack message from the consumer
+ * @return true if the ack was merged successfully
+ */
public boolean addAk(int reviveQid, AckMsg ackMsg) {
+ // validate env
if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) {
return false;
}
if (!serving) {
return false;
}
+
try {
+ // get and validate checkpoint
PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName());
if (pointWrapper == null) {
if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -568,7 +768,8 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
return false;
}
- if (ackMsg instanceof BatchAckMsg) {
+ // merge ackMsg with checkpoint
+ if (ackMsg instanceof BatchAckMsg) { // merge batch ackMsg
for (Long ackOffset : ((BatchAckMsg) ackMsg).getAckOffsetList()) {
int indexOfAck = point.indexOfAck(ackOffset);
if (indexOfAck > -1) {
@@ -577,7 +778,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point);
}
}
- } else {
+ } else { // merge ackMsg
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
if (indexOfAck > -1) {
markBitCAS(pointWrapper.getBits(), indexOfAck);
@@ -587,6 +788,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) {
}
}
+ // logging
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg);
}
@@ -608,6 +810,12 @@ public void clearOffsetQueue(String lockKey) {
this.commitOffsets.remove(lockKey);
}
+ /**
+ * write message(checkpoint) to revive topic, then update pointWrapper related info.
+ *
+ * @param pointWrapper checkpoint
+ * @param runInCurrent async or sync
+ */
private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean runInCurrent) {
if (pointWrapper.getReviveQueueOffset() >= 0) {
return;
@@ -617,6 +825,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean
// Indicates that ck message is storing
pointWrapper.setReviveQueueOffset(Long.MAX_VALUE);
+ // default value of isAppendCkAsync is false
if (brokerController.getBrokerConfig().isAppendCkAsync() && runInCurrent) {
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handleCkMessagePutResult(putMessageResult, pointWrapper);
@@ -655,7 +864,21 @@ private void handleCkMessagePutResult(PutMessageResult putMessageResult, final P
}
}
+ /**
+ * Persist message which created by checkpoint to the revive topic.
+ *
+ *
+ *
+ *
+ * @param pointWrapper the checkpoint wrapper containing the original CK
+ * @param msgIndex the sub-message index within the CK batch to ack
+ * @param count atomic counter incremented on successful persistence
+ */
private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgIndex, AtomicInteger count) {
+ // build ackMsg and Message by checkpoint
PopCheckPoint point = pointWrapper.getCk();
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
final AckMsg ackMsg = new AckMsg();
@@ -679,7 +902,8 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- if (brokerController.getBrokerConfig().isAppendAckAsync()) {
+ // store message then change store status of the checkpoint
+ if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default value is false
brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> {
handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex);
}).exceptionally(throwable -> {
@@ -687,11 +911,22 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde
return null;
});
} else {
+ // store message
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+ // change store status of the checkpoint
handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex);
}
}
+ /**
+ * update store status of checkpoint if revive message stored successfully.
+ *
+ * @param ackMsg the ack message that was persisted
+ * @param putMessageResult the result returned by the store
+ * @param pointWrapper the checkpoint wrapper being processed
+ * @param count atomic counter incremented on success
+ * @param msgIndex the sub-message index that was persisted
+ */
private void handleAckPutMessageResult(AckMsg ackMsg, PutMessageResult putMessageResult,
PopCheckPointWrapper pointWrapper, AtomicInteger count, byte msgIndex) {
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
@@ -797,6 +1032,17 @@ private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
return true;
}
+ /**
+ * Check whether all sub-messages in the checkpoint have been acked.
+ *
+ *
+ *
+ */
private volatile long reviveQueueOffset;
private final PopCheckPoint ck;
- // bit for concurrent
+ // store ack states of messages, one byte for each message
private final AtomicInteger bits;
- // bit for stored buffer ak
+ // bits for stored buffer ak, one byte for each message
private final AtomicInteger toStoreBits;
+ // nextOffset of original topic
private final long nextBeginOffset;
+ // topic@group@queueId
private final String lockKey;
+ // topic + group + queueId + startOffset + popTime + brokerName
private final String mergeKey;
+ /**
+ * Whether this checkpoint should be written to the revive topic directly.
+ *
+ *
+ *
+ *
+ * @see PopBufferMergeService#addCkJustOffset
+ * @see PopBufferMergeService#addCkMock
+ */
private final boolean justOffset;
+ // whether check point has stored in revive queue
private volatile boolean ckStored = false;
public PopCheckPointWrapper(int reviveQueueId, long reviveQueueOffset, PopCheckPoint point,
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index c32e1b5ae23..e863ae7ac79 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -33,6 +33,7 @@
import org.apache.rocketmq.broker.longpolling.PopRequest;
import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer;
import org.apache.rocketmq.broker.pop.PopConsumerContext;
+import org.apache.rocketmq.broker.pop.PopConsumerService;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
@@ -99,6 +100,24 @@
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT;
+/**
+ * Processes PopMessage requests from consumers.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param ctx the Netty channel handler context
+ * @param request the incoming PopMessage request
+ * @return the response, or {@code null} if the response is sent asynchronously
+ * (zero-copy path or long-polling suspension)
+ * @throws RemotingCommandException if the request cannot be decoded
+ */
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
+ // init request and response
final long beginTimeMills = this.brokerController.getMessageStore().now();
Channel channel = ctx.channel();
+
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
response.setOpaque(request.getOpaque());
@@ -240,6 +282,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
}
final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
+ // validation
// Pop mode only supports consumption in cluster load balancing mode
brokerController.getConsumerManager().compensateBasicConsumerInfo(
requestHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
@@ -319,6 +362,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
return response;
}
+ // init filter
BrokerConfig brokerConfig = brokerController.getBrokerConfig();
SubscriptionData subscriptionData = null;
ExpressionMessageFilter messageFilter = null;
@@ -382,6 +426,9 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
ExpressionMessageFilter finalMessageFilter = messageFilter;
SubscriptionData finalSubscriptionData = subscriptionData;
+ // There are two type of ack mode:
+ // 1. ack by KV service
+ // 2. ack by file merge service, default mode
if (brokerConfig.isPopConsumerKVServiceEnable()) {
CompletableFuture
+ *
+ *
+ * @param topicConfig topic configuration; {@code null} skips all queues
+ * @param isRetry whether the topic is a retry topic
+ * @param getMessageResult accumulator for the messages popped so far
+ * @param requestHeader pop request parameters
+ * @param reviveQid revive queue id
+ * @param channel netty channel of the requesting client
+ * @param popTime pop timestamp
+ * @param messageFilter expression filter applied to each message
+ * @param startOffsetInfo buffer for offset tracing info
+ * @param msgOffsetInfo buffer for per-message offset tracing info
+ * @param orderCountInfo buffer for order-consume count info
+ * @param randomQ random queue offset for round-robin load balancing
+ * @param getMessageFuture future that carries the remaining message count
+ * @return a future completing with the remaining number of messages needed
+ */
private CompletableFuture
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ * @param consumeReviveObj the mutable container that receives the collected
+ * CKs and the computed {@code endTime}
+ */
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
+ // init context parameters
HashMap
+ *
+ *
+ *
+ *
+ *
+ * @param popCheckPoint the checkpoint whose un-acked messages should be revived
+ */
private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
+ // env check and init
if (!shouldRunPopRevive) {
POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
return;
}
inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
List
+ *
+ *
+ *
+ *
+ */
@Override
public void run() {
int slow = 1;
while (!this.isStopped()) {
try {
+ // env check
if (System.currentTimeMillis() < brokerController.getShouldStartTime()) {
POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getShouldStartTime());
this.waitForRunning(1000);
@@ -676,6 +903,8 @@ public void run() {
}
POP_LOGGER.info("start revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+
+ // consume revive message
ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj();
consumeReviveMessage(consumeReviveObj);
@@ -684,8 +913,10 @@ public void run() {
continue;
}
+ // merge checkpoint and ackMsg then revive
mergeAndRevive(consumeReviveObj);
+ // wait and logging
ArrayList
+ *
+ */
public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -168,6 +177,16 @@ protected Status convertExceptionToStatus(Throwable t) {
return ResponseBuilder.getInstance().buildStatus(t);
}
+ /**
+ * submit grpc task to related thread pool.
+ *
+ * @param executor thread pool
+ * @param context context
+ * @param request grpc request
+ * @param runnable process task
+ * @param responseObserver grpc response observer
+ * @param statusResponseCreator error response creator
+ */
protected
+ *
+ */
@Override
public StreamObserver> sendMessage(ProxyContext ctx, QueueSelector queueSelector,
String producerGroup, int sysFlag, List
> future = new CompletableFuture<>();
@@ -96,6 +104,7 @@ public CompletableFuture
> sendMessage(ProxyContext ctx, QueueSe
SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId());
AddressableMessageQueue finalMessageQueue = messageQueue;
+ // call SendMessageProcessor of broker
future = this.serviceManager.getMessageService().sendMessage(
ctx,
messageQueue,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 1e828c36fd9..a92010d45b4 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -47,6 +47,9 @@
public interface MessageService {
+ /**
+ * call SendMessageProcessor of broker
+ */
CompletableFuture
> sendMessage(
ProxyContext ctx,
AddressableMessageQueue messageQueue,
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index aee767dae2f..cb8389111a0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -900,19 +900,19 @@ public GetMessageResult getMessage(final String group, final String topic, final
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
- if (maxOffset == 0) {
+ if (maxOffset == 0) { // empty queue
status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
- } else if (offset < minOffset) {
+ } else if (offset < minOffset) { // offset too small
status = GetMessageStatus.OFFSET_TOO_SMALL;
nextBeginOffset = nextOffsetCorrection(offset, minOffset);
- } else if (offset == maxOffset) {
+ } else if (offset == maxOffset) { // offset overflow one
status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
nextBeginOffset = nextOffsetCorrection(offset, offset);
- } else if (offset > maxOffset) {
+ } else if (offset > maxOffset) { // offset too big
status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
- } else {
+ } else { // offset is ok
final int maxFilterMessageSize = Math.max(this.messageStoreConfig.getMaxFilterMessageSize(), maxMsgNums * consumeQueue.getUnitSize());
final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
@@ -925,6 +925,14 @@ public GetMessageResult getMessage(final String group, final String topic, final
long maxPhyOffsetPulling = 0;
int cqFileNum = 0;
+ /*
+ * bufferTotalSize is the total message size
+ * bufferTotalSize less than 0 means
+ * the while loop will break after getting more than one messages
+ *
+ * travelCqFileNumWhenGetMessage limits the max file nums to travel when get message
+ * default is 1
+ */
while (getResult.getBufferTotalSize() <= 0
&& nextBeginOffset < maxOffset
&& cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
index 6f322a19e19..980f9a7bc89 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java
@@ -21,15 +21,23 @@
import java.util.Collections;
import java.util.List;
+/**
+ * result of get message
+ */
public class GetMessageResult {
-
+ // mappedFile info list
private final List