diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index c595178d193..134887f9ad4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -52,6 +52,24 @@ import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_SUC; import static org.apache.rocketmq.broker.longpolling.PollingResult.POLLING_TIMEOUT; +/** + * Pop-mode long polling service that suspends Pop requests and wakes them up when new messages arrive. + *

+ * Core responsibilities: + *

+ */ public class PopLongPollingService extends ServiceThread { private static final Logger POP_LOGGER = diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java index 9ab5eb651be..e3f4f1e21c6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java @@ -298,6 +298,31 @@ public boolean isFifoBlocked(PopConsumerContext context, String groupId, String context.getAttemptId(), topicId, groupId, queueId, context.getInvisibleTime()); } + /** + * Fetch messages from a single queue and append them to the pop context. + * + *

Chained via {@link CompletableFuture#thenCompose} from + * {@link #getMessageFromTopicAsync}. When the batch is already full + * ({@code remain <= 0}), the pending count is added to the context and + * the chain stops. Otherwise, messages are fetched from the store and + * the result is merged into the context via {@link #handleGetMessageResult}. + * + *

Early termination can occur inside this method when: + *

+ * + * @param future the accumulator future carrying the pop context + * @param clientHost the client address + * @param groupId consumer group id + * @param topicId topic name + * @param queueId queue id + * @param batchSize max number of messages still needed + * @param filter message filter + * @param retryType whether this is a retry topic V1/V2 + * @return a future completing with the pop context updated with results + */ protected CompletableFuture getMessageAsync(CompletableFuture future, String clientHost, String groupId, String topicId, int queueId, int batchSize, MessageFilter filter, PopConsumerRecord.RetryType retryType) { @@ -335,13 +360,38 @@ protected CompletableFuture getMessageAsync(CompletableFutur }); } + /** + * Fetch messages from every read queue of a topic via a CompletableFuture chain. + * + *

Each queue is visited once. For each queue the + * {@link #getMessageAsync(CompletableFuture, String, String, String, int, int, MessageFilter, PopConsumerRecord.RetryType)} + * method is chained via {@link CompletableFuture#thenCompose}. The chain carries + * the accumulated result through all queues, stopping early when the batch is + * filled, the queue is blocked, or the inflight threshold is reached. + * + *

Queue iteration order respects {@code priorityOrderAsc} and uses + * {@code requestCount} as a round-robin offset for load balancing. + * + * @param future the accumulator future + * @param clientHost the client address + * @param groupId consumer group id + * @param topicId topic name + * @param requestCount round-robin counter for queue selection + * @param batchSize max number of messages to return + * @param filter message filter expression + * @param retryType whether this is a retry topic V1/V2 + * @return a future completing with the pop result context + */ protected CompletableFuture getMessageFromTopicAsync(CompletableFuture future, String clientHost, String groupId, String topicId, long requestCount, int batchSize, MessageFilter filter, PopConsumerRecord.RetryType retryType) { + // get topic config TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicId); if (null == topicConfig) { return future; } + + // iterate all queues of the topic for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { long index = (brokerController.getBrokerConfig().isPriorityOrderAsc() ? topicConfig.getReadQueueNums() - 1 - i : i) + requestCount; @@ -352,10 +402,38 @@ protected CompletableFuture getMessageFromTopicAsync(Complet return future; } + /** + * Asynchronously pop messages for the KVStore-based ack path. + * + *

This method coordinates the full Pop lifecycle: + *

    + *
  1. Validates topic, group, and acquires the consumer lock
  2. + *
  3. Determines whether to pull from retry topic first + * (based on {@code popFromRetryProbability})
  4. + *
  5. Pulls messages from normal topic (and retry topic V1/V2 if configured)
  6. + *
  7. Writes checkpoints to {@link PopConsumerCache} (buffer merge) or + * {@link PopConsumerKVStore} (RocksDB)
  8. + *
  9. Re-encodes retry messages if needed
  10. + *
+ * + * @param clientHost the client address + * @param popTime the pop invocation timestamp + * @param invisibleTime the message visibility timeout + * @param groupId consumer group id + * @param topicId topic name + * @param queueId queue id (-1 for all queues) + * @param batchSize max number of messages to return + * @param fifo whether this is a FIFO ordered consumption + * @param attemptId attempt id for idempotent consumption + * @param initMode consume init mode (min/max) + * @param filter message filter expression + * @return a future that completes with the pop result context + */ public CompletableFuture popAsync(String clientHost, long popTime, long invisibleTime, String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode, MessageFilter filter) { + // init context params PopConsumerContext popConsumerContext = new PopConsumerContext(clientHost, popTime, invisibleTime, groupId, fifo, initMode, attemptId); @@ -391,18 +469,22 @@ public CompletableFuture popAsync(String clientHost, long po CompletableFuture.completedFuture(popConsumerContext); try { + // get message from retry topic, if (!fifo && preferRetry) { + // default config of retrieveMessageFromPopRetryTopicV1 is true, if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, retryTopicV1, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1); } + // default config of enableRetryTopicV2 is false if (brokerConfig.isEnableRetryTopicV2()) { getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, retryTopicV2, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2); } } + // get message from normal topic if (queueId != -1) { getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId, topicId, queueId, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC); @@ -410,6 +492,7 @@ public CompletableFuture popAsync(String clientHost, long po getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, topicId, requestCount, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC); + // get message from retry topic if (!fifo && !preferRetry) { if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) { getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId, @@ -425,6 +508,8 @@ public CompletableFuture popAsync(String clientHost, long po return getMessageFuture.thenCompose(result -> { if (result.isFound() && !result.isFifo()) { + // write checkpoint to cache or store + // default config of enablePopBufferMerge is false if (brokerConfig.isEnablePopBufferMerge() && popConsumerCache != null && !popConsumerCache.isCacheFull()) { this.popConsumerCache.writeRecords(result.getPopConsumerRecordList()); @@ -432,6 +517,7 @@ public CompletableFuture popAsync(String clientHost, long po this.popConsumerStore.writeRecords(result.getPopConsumerRecordList()); } + // format result for (int i = 0; i < result.getGetMessageResultList().size(); i++) { GetMessageResult getMessageResult = result.getGetMessageResultList().get(i); PopConsumerRecord popConsumerRecord = result.getPopConsumerRecordList().get(i); @@ -449,6 +535,7 @@ public CompletableFuture popAsync(String clientHost, long po } return CompletableFuture.completedFuture(result); }).whenComplete((result, throwable) -> { + // unlock by consumerLockService try { if (throwable != null) { log.error("PopConsumerService popAsync get message error", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 34a790efca7..7e044c38db3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.broker.lite.LiteMetadataUtil; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.pop.PopConsumerLockService; +import org.apache.rocketmq.broker.pop.PopConsumerService; import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.PopAckConstants; @@ -54,6 +55,27 @@ import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.BatchAckMsg; +/** + * Processes consumer ack messages in Pop consumption mode. + * + *

Handles both single ({@link RequestCode#ACK_MESSAGE}) and batch + * ({@link RequestCode#BATCH_ACK_MESSAGE}) acks. Each ack is processed + * through one of two paths: + *

    + *
  • KVStore path ({@code popConsumerKVServiceEnable=true}) — + * delegates to {@link PopConsumerService#ackAsync}
  • + *
  • File-based path — tries {@link PopBufferMergeService#addAk} + * first; if the buffer merge is not available, writes the ack as a + * message to the system revive topic
  • + *
+ * + *

Orderly ack is handled separately by {@link #ackOrderly} / + * {@link #ackOrderlyNew}, which update the consumer order info and advance + * the consumer offset while notifying any long-polling waiters. + * + *

This class also owns and manages the {@link PopReviveService} instances + * for the file-based revive path. + */ public class AckMessageProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); @@ -121,13 +143,36 @@ public boolean rejectRequest() { return false; } + /** + * Process an ack request (single or batch). + * + *

Routes to one of two paths based on {@code popConsumerKVServiceEnable}: + *

    + *
  • {@code true} — {@link #appendAckNew} (KVStore path, delegates to + * {@link PopConsumerService#ackAsync})
  • + *
  • {@code false} — {@link #appendAck} (file-based path, tries + * {@link PopBufferMergeService#addAk} first, then writes to revive topic)
  • + *
+ * + *

Orderly acks ({@code rqId == POP_ORDER_REVIVE_QUEUE}) are handled by + * {@link #ackOrderly} / {@link #ackOrderlyNew} instead. + * + * @param channel the Netty channel of the requesting client + * @param request the incoming request + * @param brokerAllowSuspend whether the broker may suspend the request + * @return the response to send back to the client + * @throws RemotingCommandException if the request cannot be decoded + */ private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { + // init context params AckMessageRequestHeader requestHeader; BatchAckMessageRequestBody reqBody = null; final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null); response.setOpaque(request.getOpaque()); + if (request.getCode() == RequestCode.ACK_MESSAGE) { + // decode and validate request requestHeader = (AckMessageRequestHeader) request.decodeCommandCustomHeader(AckMessageRequestHeader.class); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); @@ -167,12 +212,15 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re response.setRemark(errorInfo); return response; } + + // append ack, default mode is file based merge, call appendAck if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { appendAckNew(requestHeader, null, response, channel, null); } else { appendAck(requestHeader, null, response, channel, null); } } else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) { + // decode and validate request if (request.getBody() != null) { reqBody = BatchAckMessageRequestBody.decode(request.getBody(), BatchAckMessageRequestBody.class); } @@ -180,7 +228,10 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re response.setCode(ResponseCode.NO_MESSAGE); return response; } + + // process each ack for (BatchAck bAck : reqBody.getAcks()) { + // default value of popConsumerKVServiceEnable is false if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { appendAckNew(null, bAck, response, channel, reqBody.getBrokerName()); } else { @@ -188,6 +239,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re } } } else { + // unsupported request, logging and return POP_LOGGER.error("AckMessageProcessor failed to process RequestCode: {}, consumer: {} ", request.getCode(), RemotingHelper.parseChannelRemoteAddr(channel)); response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark(String.format("AckMessageProcessor failed to process RequestCode: %d", request.getCode())); @@ -196,8 +248,31 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re return response; } + /** + * Append an ack (single or batch) in the file-based path. + * + *

For single ack: parses the extra info from the request header, + * routes orderly acks to {@link #ackOrderly}, or creates a single {@link AckMsg}. + * + *

For batch ack: expands the {@link BitSet} from the + * {@link BatchAck} into individual offsets, routes orderly acks individually, + * and packs the remaining offsets into a {@link BatchAckMsg}. + * + *

The ack is first offered to {@link PopBufferMergeService#addAk}. + * If the buffer merge is not available, the ack is serialized as JSON and + * written to the revive topic with tag {@link PopAckConstants#ACK_TAG} + * or {@link PopAckConstants#BATCH_ACK_TAG}. + * + * @param requestHeader the single-ack request header (null for batch) + * @param batchAck the batch ack body (null for single) + * @param response the response to modify on error + * @param channel the Netty channel + * @param brokerName the broker name + * @throws RemotingCommandException if offset validation fails + */ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchAck batchAck, final RemotingCommand response, final Channel channel, String brokerName) throws RemotingCommandException { + // init context params String[] extraInfo; String consumeGroup, topic; int qId, rqId; @@ -205,8 +280,11 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA long popTime, invisibleTime; AckMsg ackMsg; int ackCount = 0; + + // ack orderly or set context params if (batchAck == null) { // single ack + // set context params extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo()); brokerName = ExtraInfoUtil.getBrokerName(extraInfo); consumeGroup = requestHeader.getConsumerGroup(); @@ -218,15 +296,18 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA popTime = ExtraInfoUtil.getPopTime(extraInfo); invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo); + // ack orderly if revive queue if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) { ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, invisibleTime, channel, response); return; } + // set ackMsg and ackCount ackMsg = new AckMsg(); ackCount = 1; } else { // batch ack + // set context params consumeGroup = batchAck.getConsumerGroup(); topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), batchAck.getConsumerGroup(), batchAck.getRetry()); qId = batchAck.getQueueId(); @@ -236,6 +317,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA popTime = batchAck.getPopTime(); invisibleTime = batchAck.getInvisibleTime(); + // offset check long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId); long maxOffset; try { @@ -248,6 +330,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA return; } + // ack orderly or add offset to batchAckMsg BatchAckMsg batchAckMsg = new BatchAckMsg(); BitSet bitSet = batchAck.getBitSet(); for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i + 1)) { @@ -264,10 +347,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA batchAckMsg.getAckOffsetList().add(offset); } } + + // skip if empty or is revive queue if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || batchAckMsg.getAckOffsetList().isEmpty()) { return; } + // set ackMsg and ackCount ackMsg = batchAckMsg; ackCount = batchAckMsg.getAckOffsetList().size(); } @@ -275,6 +361,7 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount); this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, topic, ackCount); + // set ackMsg ackMsg.setConsumerGroup(consumeGroup); ackMsg.setTopic(topic); ackMsg.setQueueId(qId); @@ -283,11 +370,13 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA ackMsg.setPopTime(popTime); ackMsg.setBrokerName(brokerName); + // add ackMsg if (this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) { brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic, consumeGroup, popTime, qId, ackCount); return; } + // create revive message by ackMsg, if add ackMsg failed MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(reviveTopic); msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(StandardCharsets.UTF_8)); @@ -305,7 +394,9 @@ private void appendAck(final AckMessageRequestHeader requestHeader, final BatchA msgInner.setDeliverTimeMs(popTime + invisibleTime); msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - if (brokerController.getBrokerConfig().isAppendAckAsync()) { + + // store revive message + if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default is false int finalAckCount = ackCount; this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> { handlePutMessageResult(putMessageResult, ackMsg, topic, consumeGroup, popTime, qId, finalAckCount); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java index 5ff132ca237..5de0c3eac78 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java @@ -27,6 +27,7 @@ import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.offset.MemoryConsumerOrderInfoManager; import org.apache.rocketmq.broker.pop.PopConsumerLockService; +import org.apache.rocketmq.broker.pop.PopConsumerService; import org.apache.rocketmq.broker.pop.orderly.ConsumerOrderInfoManager; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.TopicConfig; @@ -52,6 +53,23 @@ import org.apache.rocketmq.store.pop.AckMsg; import org.apache.rocketmq.store.pop.PopCheckPoint; +/** + * Processes the nack {@code ChangeInvisibleTime} request from consumers. + * + *

When a consumer needs more time to process a message (or wants to + * suspend/nack it), this processor updates the message's visibility + * timeout. The implementation varies by the ack mode: + *

    + *
  • KVStore path — delegates to + * {@link PopConsumerService#changeInvisibilityDuration}
  • + *
  • File-based path — writes a new CK to the revive topic with + * the updated invisible time, then acks the original CK so that + * the message will not be revived until the new timeout expires
  • + *
+ * + *

For orderly consumption, the next visible time is updated directly in + * the {@link ConsumerOrderInfoManager} without writing to the revive topic. + */ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; @@ -76,8 +94,12 @@ public boolean rejectRequest() { private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { + // process request async CompletableFuture responseFuture = processRequestAsync(channel, request, brokerAllowSuspend); + // process response sync or a sync + // default value of appendCkAsync is false + // default value of appendAckAsync is false if (brokerController.getBrokerConfig().isAppendCkAsync() && brokerController.getBrokerConfig().isAppendAckAsync()) { responseFuture.thenAccept(response -> doResponse(channel, request, response)).exceptionally(throwable -> { RemotingCommand response = RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class); @@ -102,8 +124,27 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re return null; } + /** + * Asynchronously process a ChangeInvisibleTime request. + * + *

Routes to the appropriate handler based on message type: + *

    + *
  • Lite message — {@link #processChangeInvisibleTimeForLite}
  • + *
  • KVStore path + orderly — {@link #processChangeInvisibleTimeForOrderNew}
  • + *
  • KVStore path + non-orderly — {@link PopConsumerService#changeInvisibilityDuration}
  • + *
  • File-based path + orderly — {@link #processChangeInvisibleTimeForOrder}
  • + *
  • File-based path + non-orderly — {@link #appendCheckPointThenAckOrigin}
  • + *
+ * + * @param channel the Netty channel + * @param request the incoming request + * @param brokerAllowSuspend whether the broker may suspend + * @return a future that completes with the response + * @throws RemotingCommandException if the request cannot be decoded + */ public CompletableFuture processRequestAsync(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) throws RemotingCommandException { + // decode and validate request final ChangeInvisibleTimeRequestHeader requestHeader = (ChangeInvisibleTimeRequestHeader) request.decodeCommandCustomHeader(ChangeInvisibleTimeRequestHeader.class); RemotingCommand response = RemotingCommand.createResponseCommand(ChangeInvisibleTimeResponseHeader.class); response.setCode(ResponseCode.SUCCESS); @@ -126,11 +167,13 @@ public CompletableFuture processRequestAsync(final Channel chan return CompletableFuture.completedFuture(response); } + // lite topic process CompletableFuture future = processChangeInvisibleTimeForLite(requestHeader, response, responseHeader); if (future != null) { return future; } + // offset check long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); long maxOffset; try { @@ -144,6 +187,9 @@ public CompletableFuture processRequestAsync(final Channel chan } String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo()); + + // default value of popConsumerKVServiceEnable is false + // kv based ack service if (brokerController.getBrokerConfig().isPopConsumerKVServiceEnable()) { if (ExtraInfoUtil.isOrder(extraInfo)) { return this.processChangeInvisibleTimeForOrderNew( @@ -164,16 +210,20 @@ public CompletableFuture processRequestAsync(final Channel chan return CompletableFuture.completedFuture(response); } + // file merge based ack service + + // orderly topic if (ExtraInfoUtil.isOrder(extraInfo)) { return CompletableFuture.completedFuture( processChangeInvisibleTimeForOrder(requestHeader, extraInfo, response, responseHeader)); } - // add new ck + // add new checkpoint then ack origin checkpoint long now = System.currentTimeMillis(); CompletableFuture futureResult = appendCheckPointThenAckOrigin(requestHeader, ExtraInfoUtil.getReviveQid(extraInfo), requestHeader.getQueueId(), requestHeader.getOffset(), now, extraInfo); + // format response return futureResult.thenCompose(result -> { if (result) { responseHeader.setInvisibleTime(requestHeader.getInvisibleTime()); @@ -260,8 +310,25 @@ protected RemotingCommand processChangeInvisibleTimeForOrder(ChangeInvisibleTime return response; } + /** + * Ack the original checkpoint after created a new checkpoint successfully. + * + *

Called after the new checkpoint has been written successfully. This method + * writes an {@link PopAckConstants#ACK_TAG} message that matches the + * original checkpoint's merge key. When {@link PopReviveService} processes this + * ack, it sets the corresponding bit in the old CK's bitMap, causing + * the old CK to be treated as fully acked and skipped during revive. + * + *

If {@link PopBufferMergeService#addAk} accepts the ack (buffer + * merge enabled), it is merged in memory without writing to the store. + * + * @param requestHeader the original request header + * @param extraInfo the extra info from the original pop request + * @return a future that completes with {@code true} on success + */ private CompletableFuture ackOrigin(final ChangeInvisibleTimeRequestHeader requestHeader, String[] extraInfo) { + // create ackMsg and related message MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); AckMsg ackMsg = new AckMsg(); @@ -278,10 +345,12 @@ private CompletableFuture ackOrigin(final ChangeInvisibleTimeRequestHea this.brokerController.getBrokerStatsManager().incBrokerAckNums(1); this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); + // add ackMsg if (brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId, ackMsg)) { return CompletableFuture.completedFuture(true); } + // init message msgInner.setTopic(reviveTopic); msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(StandardCharsets.UTF_8)); msgInner.setQueueId(rqId); @@ -292,6 +361,8 @@ private CompletableFuture ackOrigin(final ChangeInvisibleTimeRequestHea msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo)); msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + // store message return this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenCompose(putMessageResult -> { if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT @@ -307,6 +378,27 @@ private CompletableFuture ackOrigin(final ChangeInvisibleTimeRequestHea }); } + /** + * Extend the visibility timeout by writing a new checkpoint and ack the old one. + * + *

This is the core of the file-based non-orderly ChangeInvisibleTime path: + *

    + *
  1. Writes a new CK ({@link PopAckConstants#CK_TAG}) to the revive + * topic with the updated {@code invisibleTime}. This CK will trigger a + * revive at the new timeout if not acked.
  2. + *
  3. If the CK is stored successfully, calls {@link #ackOrigin} to write + * an Ack ({@link PopAckConstants#ACK_TAG}) for the original CK, + * preventing the old CK from triggering a premature revive.
  4. + *
+ * + * @param requestHeader the original request header + * @param reviveQid the revive queue to write to + * @param queueId the original queue id + * @param offset the message offset being extended + * @param popTime the new pop time (current time) + * @param extraInfo the extra info from the original pop request + * @return a future that completes with {@code true} on success + */ private CompletableFuture appendCheckPointThenAckOrigin( final ChangeInvisibleTimeRequestHeader requestHeader, int reviveQid, @@ -314,6 +406,8 @@ private CompletableFuture appendCheckPointThenAckOrigin( // add check point msg to revive log MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(reviveTopic); + + // create checkpoint PopCheckPoint ck = new PopCheckPoint(); ck.setBitMap(0); ck.setNum((byte) 1); @@ -327,6 +421,7 @@ private CompletableFuture appendCheckPointThenAckOrigin( ck.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo)); ck.setSuspend(requestHeader.isSuspend()); + // init message with checkpoint msgInner.setBody(JSON.toJSONString(ck).getBytes(StandardCharsets.UTF_8)); msgInner.setQueueId(reviveQid); msgInner.setTags(PopAckConstants.CK_TAG); @@ -336,6 +431,9 @@ private CompletableFuture appendCheckPointThenAckOrigin( msgInner.setDeliverTimeMs(ck.getReviveTime() - PopAckConstants.ackTimeInterval); msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genCkUniqueId(ck)); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + // store new checkpoint to extend invisible time + // then ack origin checkpoint return this.brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenCompose(putMessageResult -> { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("change Invisible, appendCheckPoint, topic {}, queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}", requestHeader.getTopic(), queueId, reviveQid, requestHeader.getConsumerGroup(), offset, @@ -349,6 +447,8 @@ private CompletableFuture appendCheckPointThenAckOrigin( this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1); } } + + // if success, ack origin checkpoint if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 5373eaea333..b17c1c7e410 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -44,10 +44,56 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicInteger; +/** + * File based Ack buffer merge service. + * + *

buffer checkpoint in memory then enqueue them into system revive queue then wait to be acked. + * + *

Two in-memory data structures drive the merge logic: + *

    + *
  • {@link #buffer} — maps {@code mergeKey} to {@link PopCheckPointWrapper}, + * tracking which sub-messages within a CK batch have been acked + * (via {@code bits} bitmask) and which have been persisted + * (via {@code toStoreBits} bitmask)
  • + *
  • {@link #commitOffsets} — maps {@code topic@cid@queueId} to an ordered + * queue of {@link PopCheckPointWrapper}s for sequential offset committing
  • + *
+ * + *

The background {@link #scan()} thread periodically evaluates each buffered CK: + *

    + *
  • All acks received — removes the CK from the buffer without writing + * anything to storage (clean completion)
  • + *
  • About to expire ({@code reviveTime - now < popCkStayBufferTimeOut}) + * or stayed too long — writes the CK and all un-persisted acks + * (or batch acks) to the revive topic
  • + *
+ * + *

This service is enabled by {@code enablePopBufferMerge} and only runs on + * a master or a slave acting as master. When {@code enablePopBatchAck} is set, + * multiple ack offsets are packed into a single {@link BatchAckMsg}. + */ public class PopBufferMergeService extends ServiceThread { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); + /** + * In-memory map of check points. + * Key: topic + group + queueId + startOffset + popTime + brokerName + * Value: check point wrapper + * use cases: + * - scan: iterate buffer + * - addAckMsg: get check point from buffer and mark ack state of Check Point + */ ConcurrentHashMap buffer = new ConcurrentHashMap<>(1024 * 16); + /** + * manage check point of given consumer and given queue + * Key: topic@cid@queueId + * Value: check point queue of specific consumer and queue + * use cases: + * - getLatestOffset: get consumer next start offset of given queue + * - scanGarbage + * - getOffsetTotalSize: get total popping num + * - isQueueFull + */ ConcurrentHashMap> commitOffsets = new ConcurrentHashMap<>(); private volatile boolean serving = true; @@ -92,6 +138,7 @@ public void run() { // scan while (!this.isStopped()) { try { + // env check if (!isShouldRunning()) { // slave this.waitForRunning(interval * 200 * 5); @@ -104,11 +151,12 @@ public void run() { scan(); if (scanTimes % countOfSecond30 == 0) { + // remove checkpoint which are timeout scanGarbage(); } + // waiting this.waitForRunning(interval); - if (!this.serving && this.buffer.size() == 0 && getOffsetTotalSize() == 0) { this.serving = true; } @@ -118,6 +166,7 @@ public void run() { } } + // scan until buffer is empty this.serving = false; try { Thread.sleep(2000); @@ -133,6 +182,27 @@ public void run() { } } + /** + * Drain the {@link #commitOffsets} queues and commit consumer offsets in FIFO order. + * scanAndCommitOffset may be a better name + * + *

For each {@code topic@cid@queueId} queue, the method peeks the head (oldest) + * wrapper and checks whether it is ready to commit: + *

    + *
  • Just-offset entry with CK stored
  • + *
  • All sub-messages acked ({@link #isCkDone})
  • + *
  • All acks persisted and CK stored ({@link #isCkDoneForFinish})
  • + *
+ * + *

If the head is ready, it is committed and removed. Processing continues + * to the next wrapper in the same queue. If the head is not ready, the loop + * breaks — this ensures strict FIFO order and prevents consumer offset + * regression. + * + *

Called at the end of {@link #scan()} after the buffer has been processed. + * + * @return the total number of remaining wrappers across all queues (for logging) + */ private int scanCommitOffset() { Iterator>> iterator = this.commitOffsets.entrySet().iterator(); int count = 0; @@ -185,9 +255,20 @@ public long getLatestOffset(String topic, String group, int queueId) { return getLatestOffset(KeyBuilder.buildPollingKey(topic, group, queueId)); } + /** + * Remove stale entries from {@link #commitOffsets}. + * + *

Three types of entries are removed: + *

    + *
  • Topic no longer exists (deleted)
  • + *
  • Consumer group no longer exists (unsubscribed)
  • + *
  • No activity for more than 5 minutes (idle)
  • + *
+ */ private void scanGarbage() { Iterator>> iterator = commitOffsets.entrySet().iterator(); while (iterator.hasNext()) { + // validate checkpoint Map.Entry> entry = iterator.next(); if (entry.getKey() == null) { continue; @@ -198,16 +279,23 @@ private void scanGarbage() { } String topic = keyArray[0]; String cid = keyArray[1]; + + // remove if topic no longer exists if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in buffer!", topic); iterator.remove(); continue; } + + // remove if subscription group no longer exists if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { POP_LOGGER.info("[PopBuffer]remove nonexistent subscription group {} of topic {} in buffer!", cid, topic); iterator.remove(); continue; } + + // remove if idle + // entry.getValue().getTime() = popTime of last checkpoint enqueued in the queue if (System.currentTimeMillis() - entry.getValue().getTime() > minute5) { POP_LOGGER.info("[PopBuffer]remove long time not used sub {} of topic {} in buffer!", cid, topic); iterator.remove(); @@ -223,6 +311,26 @@ private boolean isSubscriptionGroupNotExist(PopCheckPointWrapper pointWrapper) { } + /** + * Scan and process all buffered checkpoints, then drain the offset commit queue. + * + *

For each entry in {@link #buffer}: + *

    + *
  • Consumer group not found — removes the entry silently
  • + *
  • CK done (all sub-messages acked) — removes from buffer, no store write needed
  • + *
  • Just-offset entry — writes the CK to the revive topic if not yet stored
  • + *
  • Needs eviction (service stopped, revive timeout, or stay timeout) — + * writes the CK and all un-persisted acks (batch or individual) to the revive topic, + * then removes the entry when all persisted
  • + *
  • Otherwise — leaves the entry in the buffer for the next scan cycle
  • + *
+ * + *

After processing the buffer, calls {@link #scanCommitOffset()} to commit offsets + * for finished checkpoints in FIFO order. + * + *

If the scan duration exceeds {@code popCkStayBufferTimeOut - 1000ms}, the service + * temporarily stops accepting new CKs ({@link #serving} = false) to avoid backlog. + */ private void scan() { long startTime = System.currentTimeMillis(); AtomicInteger count = new AtomicInteger(0); @@ -244,7 +352,6 @@ private void scan() { continue; } - // just process offset(already stored at pull thread), or buffer ck(not stored and ack finish) if (pointWrapper.isJustOffset() && pointWrapper.isCkStored() || isCkDone(pointWrapper) || isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) { @@ -259,6 +366,7 @@ private void scan() { PopCheckPoint point = pointWrapper.getCk(); long now = System.currentTimeMillis(); + // check whether check point is timeout boolean removeCk = !this.serving; // ck will be timeout if (point.getReviveTime() - now < brokerController.getBrokerConfig().getPopCkStayBufferTimeOut()) { @@ -275,17 +383,18 @@ private void scan() { } // double check - if (isCkDone(pointWrapper)) { + if (isCkDone(pointWrapper)) { // all checkpoint are acked, do nothing continue; - } else if (pointWrapper.isJustOffset()) { + } else if (pointWrapper.isJustOffset()) { // store checkpoint // just offset should be in store. if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync()); countCk++; } continue; - } else if (removeCk) { + } else if (removeCk) { // store checkpoint if needed // put buffer ak to store + // revive queue offset < 0 means checkpoint was not stored if (pointWrapper.getReviveQueueOffset() < 0) { putCkToStore(pointWrapper, this.brokerController.getBrokerConfig().isAppendCkAsync()); countCk++; @@ -295,11 +404,13 @@ private void scan() { continue; } - if (brokerController.getBrokerConfig().isEnablePopBatchAck()) { + // store checkpoint + if (brokerController.getBrokerConfig().isEnablePopBatchAck()) { // default is false List indexList = this.batchAckIndexList; try { for (byte i = 0; i < point.getNum(); i++) { // reput buffer ak to store + // if checkpoint is acked and not stored, add to indexList if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) { indexList.add(i); @@ -314,6 +425,7 @@ private void scan() { } else { for (byte i = 0; i < point.getNum(); i++) { // reput buffer ak to store + // if checkpoint is acked and not stored, call putAckToStore if (DataConverter.getBit(pointWrapper.getBits().get(), i) && !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) { putAckToStore(pointWrapper, i, count); @@ -321,6 +433,7 @@ private void scan() { } } + // remove checkpoint from buffer if (isCkDoneForFinish(pointWrapper) && pointWrapper.isCkStored()) { if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]ck finish, {}", pointWrapper); @@ -331,8 +444,10 @@ private void scan() { } } + // scan commitOffsets and commit offset which is needed. int offsetBufferSize = scanCommitOffset(); + // calculate scan times long eclipse = System.currentTimeMillis() - startTime; if (eclipse > brokerController.getBrokerConfig().getPopCkStayBufferTimeOut() - 1000) { POP_LOGGER.warn("[PopBuffer]scan stop, because eclipse too long, PopBufferEclipse={}, " + @@ -370,6 +485,15 @@ public int getBufferedCKSize() { return this.counter.get(); } + /** + * Atomically set the bit at {@code index} in an {@link AtomicInteger} bitmask. + * + *

Uses a CAS (compare-and-swap) loop to ensure thread safety without locking. + * If the bit is already set, this method returns immediately (no-op). + * + * @param setBits the atomic bitmask to update + * @param index the bit position (0-based) + */ private void markBitCAS(AtomicInteger setBits, int index) { while (true) { int bits = setBits.get(); @@ -384,6 +508,22 @@ private void markBitCAS(AtomicInteger setBits, int index) { } } + /** + * Commit the consumer offset for the checkpoint's {@code topic@cid@queueId}. + * + *

Called from {@link #scanCommitOffset()} after the checkpoint is confirmed + * as finished (all acks received or CK stored). The offset is advanced to + * {@link PopCheckPointWrapper#nextBeginOffset}, which is the offset of the + * first message after this batch. + * + *

The operation is guarded by {@link PopMessageProcessor.QueueLockManager} + * to prevent concurrent offset updates on the same queue. + * + * @param wrapper the finished checkpoint wrapper + * @return {@code true} if the offset was committed or no commit is needed + * ({@code nextBeginOffset < 0}); {@code false} if the lock could + * not be acquired (caller should retry later) + */ private boolean commitOffset(final PopCheckPointWrapper wrapper) { if (wrapper.getNextBeginOffset() < 0) { return true; @@ -413,8 +553,25 @@ private boolean commitOffset(final PopCheckPointWrapper wrapper) { return true; } + /** + * Enqueue the checkpoint wrapper into the per-{@code topic@cid@queueId} offset queue + * for sequential offset committing. + * + *

The queue is maintained in FIFO order. The {@link #scanCommitOffset()} method + * drains the queue from the head, ensuring that offsets are committed in the same + * order as the checkpoints were created, which prevents consumer offset regression. + * + *

The {@link QueueWithTime#time} is also updated to the CK's pop time so that + * {@link #scanGarbage()} can identify and remove stale entries after 5 minutes of + * inactivity. + * + * @param pointWrapper the checkpoint wrapper to enqueue + * @return true if the element was added to the queue successfully + */ private boolean putOffsetQueue(PopCheckPointWrapper pointWrapper) { QueueWithTime queue = this.commitOffsets.get(pointWrapper.getLockKey()); + + // init with empty queue if (queue == null) { queue = new QueueWithTime<>(); QueueWithTime old = this.commitOffsets.putIfAbsent(pointWrapper.getLockKey(), queue); @@ -422,6 +579,7 @@ private boolean putOffsetQueue(PopCheckPointWrapper pointWrapper) { queue = old; } } + queue.setTime(pointWrapper.getCk().getPopTime()); return queue.get().offer(pointWrapper); } @@ -436,12 +594,13 @@ private boolean checkQueueOk(PopCheckPointWrapper pointWrapper) { /** * put to store && add to buffer. + * addAndStoreCheckpoint maybe a better name. * - * @param point - * @param reviveQueueId - * @param reviveQueueOffset - * @param nextBeginOffset - * @return + * @param point check point + * @param reviveQueueId revive queueId + * @param reviveQueueOffset revive queueOffset + * @param nextBeginOffset next offset + * @return true if success */ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) { @@ -454,6 +613,8 @@ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long revi return false; } + // called before buffer operation + // because store operation will update attributes of pointWrapper this.putCkToStore(pointWrapper, checkQueueOk(pointWrapper)); putOffsetQueue(pointWrapper); @@ -465,8 +626,17 @@ public boolean addCkJustOffset(PopCheckPoint point, int reviveQueueId, long revi return true; } + /** + * mock checkpoint then add it to offset queue. + * this method is called when popped message is: + * - NO_MATCHED_MESSAGE + * - OFFSET_FOUND_NULL + * - MESSAGE_WAS_REMOVING + * - NO_MATCHED_LOGIC_QUEUE + */ public void addCkMock(String group, String topic, int queueId, long startOffset, long invisibleTime, long popTime, int reviveQueueId, long nextBeginOffset, String brokerName) { + // create checkpoint final PopCheckPoint ck = new PopCheckPoint(); ck.setBitMap(0); ck.setNum((byte) 0); @@ -482,12 +652,17 @@ public void addCkMock(String group, String topic, int queueId, long startOffset, pointWrapper.setCkStored(true); putOffsetQueue(pointWrapper); + if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]add ck just offset, mocked, {}", pointWrapper); } } + /** + * add checkpoint to buffer. + */ public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOffset, long nextBeginOffset) { + // validate env and checkpoint // key: point.getT() + point.getC() + point.getQ() + point.getSo() + point.getPt() if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) { return false; @@ -531,14 +706,39 @@ public boolean addCk(PopCheckPoint point, int reviveQueueId, long reviveQueueOff return true; } + /** + * Merge a consumer ack into the buffered checkpoint. + * + *

The ack is not written to the revive topic immediately. Instead, a flag is + * set in {@link PopCheckPointWrapper#bits} via {@link #markBitCAS}. + * The pending ack will later be flushed to storage by {@link #scan()} when the + * checkpoint is evicted (timeout / buffer full / service stopping). + * + *

Rejection conditions (return false): + *

    + *
  • {@code enablePopBufferMerge} is disabled
  • + *
  • The service is not serving (too busy)
  • + *
  • No matching checkpoint found in {@link #buffer}
  • + *
  • The checkpoint is a {@code justOffset} entry (no messages to ack)
  • + *
  • The checkpoint is too close to its revive deadline
  • + *
  • The checkpoint has been buffered for too long
  • + *
+ * + * @param reviveQid revive queue id (used only for logging) + * @param ackMsg the ack message from the consumer + * @return true if the ack was merged successfully + */ public boolean addAk(int reviveQid, AckMsg ackMsg) { + // validate env if (!brokerController.getBrokerConfig().isEnablePopBufferMerge()) { return false; } if (!serving) { return false; } + try { + // get and validate checkpoint PopCheckPointWrapper pointWrapper = this.buffer.get(ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName()); if (pointWrapper == null) { if (brokerController.getBrokerConfig().isEnablePopLog()) { @@ -568,7 +768,8 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) { return false; } - if (ackMsg instanceof BatchAckMsg) { + // merge ackMsg with checkpoint + if (ackMsg instanceof BatchAckMsg) { // merge batch ackMsg for (Long ackOffset : ((BatchAckMsg) ackMsg).getAckOffsetList()) { int indexOfAck = point.indexOfAck(ackOffset); if (indexOfAck > -1) { @@ -577,7 +778,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) { POP_LOGGER.error("[PopBuffer]Invalid index of ack, reviveQid={}, {}, {}", reviveQid, ackMsg, point); } } - } else { + } else { // merge ackMsg int indexOfAck = point.indexOfAck(ackMsg.getAckOffset()); if (indexOfAck > -1) { markBitCAS(pointWrapper.getBits(), indexOfAck); @@ -587,6 +788,7 @@ public boolean addAk(int reviveQid, AckMsg ackMsg) { } } + // logging if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("[PopBuffer]add ack, rqId={}, {}, {}", reviveQid, pointWrapper, ackMsg); } @@ -608,6 +810,12 @@ public void clearOffsetQueue(String lockKey) { this.commitOffsets.remove(lockKey); } + /** + * write message(checkpoint) to revive topic, then update pointWrapper related info. + * + * @param pointWrapper checkpoint + * @param runInCurrent async or sync + */ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean runInCurrent) { if (pointWrapper.getReviveQueueOffset() >= 0) { return; @@ -617,6 +825,7 @@ private void putCkToStore(final PopCheckPointWrapper pointWrapper, final boolean // Indicates that ck message is storing pointWrapper.setReviveQueueOffset(Long.MAX_VALUE); + // default value of isAppendCkAsync is false if (brokerController.getBrokerConfig().isAppendCkAsync() && runInCurrent) { brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> { handleCkMessagePutResult(putMessageResult, pointWrapper); @@ -655,7 +864,21 @@ private void handleCkMessagePutResult(PutMessageResult putMessageResult, final P } } + /** + * Persist message which created by checkpoint to the revive topic. + * + *
    + *
  • create message by checkpoint
  • + *
  • write message to revive topic
  • + *
  • update pointWrapper related info
  • + *
+ * + * @param pointWrapper the checkpoint wrapper containing the original CK + * @param msgIndex the sub-message index within the CK batch to ack + * @param count atomic counter incremented on successful persistence + */ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgIndex, AtomicInteger count) { + // build ackMsg and Message by checkpoint PopCheckPoint point = pointWrapper.getCk(); MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); final AckMsg ackMsg = new AckMsg(); @@ -679,7 +902,8 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); - if (brokerController.getBrokerConfig().isAppendAckAsync()) { + // store message then change store status of the checkpoint + if (brokerController.getBrokerConfig().isAppendAckAsync()) { // default value is false brokerController.getEscapeBridge().asyncPutMessageToSpecificQueue(msgInner).thenAccept(putMessageResult -> { handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex); }).exceptionally(throwable -> { @@ -687,11 +911,22 @@ private void putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgInde return null; }); } else { + // store message PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); + // change store status of the checkpoint handleAckPutMessageResult(ackMsg, putMessageResult, pointWrapper, count, msgIndex); } } + /** + * update store status of checkpoint if revive message stored successfully. + * + * @param ackMsg the ack message that was persisted + * @param putMessageResult the result returned by the store + * @param pointWrapper the checkpoint wrapper being processed + * @param count atomic counter incremented on success + * @param msgIndex the sub-message index that was persisted + */ private void handleAckPutMessageResult(AckMsg ackMsg, PutMessageResult putMessageResult, PopCheckPointWrapper pointWrapper, AtomicInteger count, byte msgIndex) { brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus()); @@ -797,6 +1032,17 @@ private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) { return true; } + /** + * Check whether all sub-messages in the checkpoint have been acked. + * + *

Every sub-message has a corresponding bit in + * {@link PopCheckPointWrapper#bits}. This method returns {@code true} when + * all bits are set, meaning the CK can be removed from the buffer without + * writing any ack to the revive topic (clean completion). + * + * @param pointWrapper the checkpoint wrapper to check + * @return {@code true} if every sub-message has been acked + */ private boolean isCkDone(PopCheckPointWrapper pointWrapper) { byte num = pointWrapper.getCk().getNum(); for (byte i = 0; i < num; i++) { @@ -807,6 +1053,18 @@ private boolean isCkDone(PopCheckPointWrapper pointWrapper) { return true; } + /** + * Check whether all acked sub-messages have been fully persisted. + * + *

Uses XOR: {@code bits ^ toStoreBits}. A bit is set in the result when + * the corresponding sub-message has been acked ({@code bits}) but not yet + * persisted ({@code toStoreBits}). Returns {@code true} only when every + * acked message has also been persisted, meaning the checkpoint is ready + * for final cleanup. + * + * @param pointWrapper the checkpoint wrapper to check + * @return {@code true} if no ack remains to be persisted + */ private boolean isCkDoneForFinish(PopCheckPointWrapper pointWrapper) { byte num = pointWrapper.getCk().getNum(); int bits = pointWrapper.getBits().get() ^ pointWrapper.getToStoreBits().get(); @@ -842,17 +1100,46 @@ public LinkedBlockingDeque get() { public class PopCheckPointWrapper { private final int reviveQueueId; - // -1: not stored, >=0: stored, Long.MAX: storing. + /** + * The consume queue offset of the CK message in the revive topic. + * + *

Three-state indicator: + *

    + *
  • {@code -1} — not yet stored; {@link #putCkToStore} will write it
  • + *
  • {@code >= 0} — successfully stored; the value is the offset in the + * revive topic's consume queue
  • + *
  • {@link Long#MAX_VALUE} — a write is in progress (prevents duplicate + * writes from concurrent scans)
  • + *
+ */ private volatile long reviveQueueOffset; private final PopCheckPoint ck; - // bit for concurrent + // store ack states of messages, one byte for each message private final AtomicInteger bits; - // bit for stored buffer ak + // bits for stored buffer ak, one byte for each message private final AtomicInteger toStoreBits; + // nextOffset of original topic private final long nextBeginOffset; + // topic@group@queueId private final String lockKey; + // topic + group + queueId + startOffset + popTime + brokerName private final String mergeKey; + /** + * Whether this checkpoint should be written to the revive topic directly. + * + *

When {@code true}: + *

    + *
  • The CK has already been or will be written to the revive topic directly
  • + *
  • No Ack merging is needed — {@link #addAk} rejects these entries
  • + *
  • The wrapper exists solely to maintain FIFO offset commit order in + * {@link #commitOffsets}
  • + *
+ * + * @see PopBufferMergeService#addCkJustOffset + * @see PopBufferMergeService#addCkMock + */ private final boolean justOffset; + // whether check point has stored in revive queue private volatile boolean ckStored = false; public PopCheckPointWrapper(int reviveQueueId, long reviveQueueOffset, PopCheckPoint point, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index c32e1b5ae23..e863ae7ac79 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.broker.longpolling.PopRequest; import org.apache.rocketmq.broker.pagecache.ManyMessageTransfer; import org.apache.rocketmq.broker.pop.PopConsumerContext; +import org.apache.rocketmq.broker.pop.PopConsumerService; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; import org.apache.rocketmq.common.MixAll; @@ -99,6 +100,24 @@ import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESPONSE_CODE; import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_RESULT; +/** + * Processes PopMessage requests from consumers. + * + *

This is the core processor for the Pop consumption mode. It handles: + *

    + *
  • Validating the request (topic, group, queue, subscription, permissions)
  • + *
  • Routing to the {@link PopConsumerService} (KVStore path) or the + * inline file-based path
  • + *
  • Popping messages from normal and retry topics (V1/V2)
  • + *
  • Creating checkpoints and writing them to the revive topic
  • + *
  • Long-polling suspension via {@link PopLongPollingService}
  • + *
  • Transferring messages to the client (heap copy or zero-copy)
  • + *
+ * + *

This class also owns the {@link PopLongPollingService}, + * {@link PopBufferMergeService}, and {@link QueueLockManager} instances + * used by the file-based ack path. + */ public class PopMessageProcessor implements NettyRequestProcessor { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); @@ -222,13 +241,36 @@ public void notifyMessageArriving(final String topic, final int queueId, final S topic, queueId, cid, false, null, 0L, null, null); } + /** + * Process a PopMessage request. + * + *

This method handles the full Pop lifecycle: + *

    + *
  1. Validates the request (topic, group, permissions, subscription)
  2. + *
  3. Routes to the KVStore path (via {@link PopConsumerService#popAsync}) + * or the file-based path (inline CompletableFuture chain)
  4. + *
  5. Pops messages from normal and retry topics (V1/V2)
  6. + *
  7. Creates checkpoints and appends them to the revive topic
  8. + *
  9. Suspends the request via {@link PopLongPollingService#polling} if + * no messages are available
  10. + *
  11. Transfers messages via heap copy or zero-copy ({@code FileRegion})
  12. + *
+ * + * @param ctx the Netty channel handler context + * @param request the incoming PopMessage request + * @return the response, or {@code null} if the response is sent asynchronously + * (zero-copy path or long-polling suspension) + * @throws RemotingCommandException if the request cannot be decoded + */ @Override public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { + // init request and response final long beginTimeMills = this.brokerController.getMessageStore().now(); Channel channel = ctx.channel(); + RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class); response.setOpaque(request.getOpaque()); @@ -240,6 +282,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC } final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader(); + // validation // Pop mode only supports consumption in cluster load balancing mode brokerController.getConsumerManager().compensateBasicConsumerInfo( requestHeader.getConsumerGroup(), ConsumeType.CONSUME_POP, MessageModel.CLUSTERING); @@ -319,6 +362,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC return response; } + // init filter BrokerConfig brokerConfig = brokerController.getBrokerConfig(); SubscriptionData subscriptionData = null; ExpressionMessageFilter messageFilter = null; @@ -382,6 +426,9 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC ExpressionMessageFilter finalMessageFilter = messageFilter; SubscriptionData finalSubscriptionData = subscriptionData; + // There are two type of ack mode: + // 1. ack by KV service + // 2. ack by file merge service, default mode if (brokerConfig.isPopConsumerKVServiceEnable()) { CompletableFuture popAsyncFuture = brokerController.getPopConsumerService().popAsync( @@ -391,6 +438,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC requestHeader.getAttemptId(), requestHeader.getInitMode(), messageFilter); popAsyncFuture.thenApply(result -> { + // callback try { if (request.getCallbackList() != null) { request.getCallbackList().forEach(CommandCallback::accept); @@ -400,6 +448,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC POP_LOGGER.error("PopProcessor execute callback error", t); } + // long polling process, useless in rocketmq 5.* if (result.isFound()) { response.setCode(ResponseCode.SUCCESS); getMessageResult.setStatus(GetMessageStatus.FOUND); @@ -432,6 +481,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); } + // format response responseHeader.setPopTime(result.getPopTime()); responseHeader.setInvisibleTime(result.getInvisibleTime()); responseHeader.setReviveQid( @@ -492,8 +542,10 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC return response; }).thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result, null, brokerController.getBrokerMetricsManager().getRemotingMetricsManager())); return null; - } + } // end of ack by kv service + // start of ack by file merge service mode + // init pop parameters int randomQ = random.nextInt(100); int reviveQid; if (requestHeader.isOrder()) { @@ -503,8 +555,11 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC this.brokerController.getBrokerConfig().getReviveQueueNum()); } + // properties of rocketmq 4.x, useless in 5.x StringBuilder startOffsetInfo = new StringBuilder(64); + // properties of rocketmq 4.x, useless in 5.x StringBuilder msgOffsetInfo = new StringBuilder(64); + // properties of rocketmq 4.x, useless in 5.x StringBuilder orderCountInfo = requestHeader.isOrder() ? new StringBuilder(64) : null; // Due to the design of the fields startOffsetInfo, msgOffsetInfo, and orderCountInfo, @@ -523,6 +578,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC } randomQ = usePriorityMode ? 0 : randomQ; // reset randomQ long popTime = System.currentTimeMillis(); + + // pop message CompletableFuture getMessageFuture = CompletableFuture.completedFuture(0L); if (needRetry && !requestHeader.isOrder()) { if (needRetryV1) { @@ -535,6 +592,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } } + if (requestHeader.getQueueId() < 0) { // read all queue getMessageFuture = popMsgFromTopic(topicConfig, false, getMessageResult, requestHeader, reviveQid, channel, @@ -546,6 +604,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo)); } + // if not full , fetch retry again if (!needRetry && getMessageResult.getMessageMapedList().size() < requestHeader.getMaxMsgNums() && !requestHeader.isOrder()) { if (needRetryV1) { @@ -559,6 +618,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC } } + // async result handle final RemotingCommand finalResponse = response; getMessageFuture.thenApply(restNum -> { try { @@ -570,6 +630,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC POP_LOGGER.error("PopProcessor execute callback error", t); } + // long polling used in version 4.*, useless in 5.* if (!getMessageResult.getMessageBufferList().isEmpty()) { finalResponse.setCode(ResponseCode.SUCCESS); getMessageResult.setStatus(GetMessageStatus.FOUND); @@ -596,6 +657,8 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC } getMessageResult.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); } + + // format response responseHeader.setInvisibleTime(requestHeader.getInvisibleTime()); responseHeader.setPopTime(popTime); responseHeader.setReviveQid(reviveQid); @@ -606,6 +669,9 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC responseHeader.setOrderCountInfo(orderCountInfo.toString()); } finalResponse.setRemark(getMessageResult.getStatus().name()); + + // transfer msg by heap or zero copy, + // zero copy used in 4.*, useless in 5.* switch (finalResponse.getCode()) { case ResponseCode.SUCCESS: if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) { @@ -615,7 +681,7 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC requestHeader.getTopic(), requestHeader.getQueueId(), (int) (this.brokerController.getMessageStore().now() - beginTimeMills)); finalResponse.setBody(r); - } else { + } else { // zero copy final GetMessageResult tmpGetMessageResult = getMessageResult; try { FileRegion fileRegion = @@ -652,6 +718,37 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC return null; } + /** + * Pop messages from every read queue of the given topic. + * + *

Queues are visited sequentially (respecting {@code priorityOrderAsc}). + * For each queue a {@link #popMsgFromQueue} call is chained via + * {@code CompletableFuture#thenCompose}. The chained future carries the + * remaining number of messages still needed ({@code restNum}). + * + *

Early termination can occur inside {@link #popMsgFromQueue} when: + *

    + *
  • the queue lock cannot be acquired
  • + *
  • too many in-flight (un-acked) messages exist
  • + *
  • an order queue is blocked
  • + *
  • the accumulated message count already reaches {@code maxMsgNums}
  • + *
+ * + * @param topicConfig topic configuration; {@code null} skips all queues + * @param isRetry whether the topic is a retry topic + * @param getMessageResult accumulator for the messages popped so far + * @param requestHeader pop request parameters + * @param reviveQid revive queue id + * @param channel netty channel of the requesting client + * @param popTime pop timestamp + * @param messageFilter expression filter applied to each message + * @param startOffsetInfo buffer for offset tracing info + * @param msgOffsetInfo buffer for per-message offset tracing info + * @param orderCountInfo buffer for order-consume count info + * @param randomQ random queue offset for round-robin load balancing + * @param getMessageFuture future that carries the remaining message count + * @return a future completing with the remaining number of messages needed + */ private CompletableFuture popMsgFromTopic(TopicConfig topicConfig, boolean isRetry, GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, @@ -679,12 +776,52 @@ private CompletableFuture popMsgFromTopic(String topic, boolean isRetry, G messageFilter, startOffsetInfo, msgOffsetInfo, orderCountInfo, randomQ, getMessageFuture); } + /** + * Pop messages from a specific queue of a topic. + * + *

This method is called as a step in a {@link CompletableFuture} chain + * (see {@link #popMsgFromTopic}). The {@code restNum} argument is the + * number of messages still needed — when it drops to {@code 0} or below, + * subsequent calls in the chain may short-circuit early. + * + *

The method has several early-termination paths (all return + * immediately with the current {@code restNum}): + *

    + *
  • Queue lock cannot be acquired — skips this queue
  • + *
  • Too many in-flight (un-acked) messages for this + * {@code topic@group@queueId}
  • + *
  • Order queue is blocked by a previous un-acked message
  • + *
  • Already accumulated {@code >= maxMsgNums} messages
  • + *
+ * + *

Otherwise, it asynchronously fetches messages from the store, handles + * offset correction, updates order-consume tracking / checkpoint data, and + * merges the results into {@code getMessageResult}. + * + * @param topic topic name + * @param attemptId attempt id for idempotent consumption + * @param isRetry whether this is a retry topic + * @param getMessageResult accumulator for messages popped so far + * @param requestHeader pop request parameters + * @param queueId target queue id + * @param restNum number of messages still needed before the batch + * size is satisfied + * @param reviveQid revive queue id for checkpoint + * @param channel netty channel of the requesting client + * @param popTime pop invocation timestamp + * @param messageFilter expression filter applied to each message + * @param startOffsetInfo buffer for offset tracing info + * @param msgOffsetInfo buffer for per-message offset tracing info + * @param orderCountInfo buffer for order-consume count info + * @return a future completing with the remaining number of messages needed + */ private CompletableFuture popMsgFromQueue(String topic, String attemptId, boolean isRetry, GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) { + // get pop offset String lockKey = topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + queueId; boolean isOrder = requestHeader.isOrder(); @@ -698,6 +835,7 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, return failure; } + // try lock CompletableFuture future = new CompletableFuture<>(); if (!queueLockManager.tryLock(lockKey)) { try { @@ -710,8 +848,9 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, } return future; } - future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); + + // check inflight message number if (isPopShouldStop(topic, requestHeader.getConsumerGroup(), queueId)) { POP_LOGGER.warn("Too much msgs unacked, then stop popping. topic={}, group={}, queueId={}", topic, requestHeader.getConsumerGroup(), queueId); @@ -724,6 +863,7 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, return future; } + // check orderly lock and max message number try { offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), true, lockKey, true); @@ -764,6 +904,7 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, return this.brokerController.getMessageStore() .getMessageAsync(requestHeader.getConsumerGroup(), topic, queueId, offset, requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter) + // result check and retry if offset is not correct .thenCompose(result -> { if (result == null) { return CompletableFuture.completedFuture(null); @@ -784,7 +925,9 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, requestHeader.getMaxMsgNums() - getMessageResult.getMessageMapedList().size(), messageFilter); } return CompletableFuture.completedFuture(result); - }).thenApply(result -> { + }) + // update order info or append checkpoint then format result + .thenApply(result -> { if (result == null) { try { atomicRestNum.set(brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - atomicOffset.get() + atomicRestNum.get()); @@ -876,7 +1019,9 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, result.getMessageCount() ); return atomicRestNum.get(); - }).whenComplete((result, throwable) -> { + }) + // unlock queueLock + .whenComplete((result, throwable) -> { if (throwable != null) { POP_LOGGER.error("Pop message error, {}", lockKey, throwable); } @@ -889,14 +1034,37 @@ private boolean isPopShouldStop(String topic, String group, int queueId) { brokerController.getPopInflightMessageCounter().getGroupPopInFlightMessageNum(topic, group, queueId) > brokerController.getBrokerConfig().getPopInflightMessageThreshold(); } + /** + * get consume offset for pop mode + * called by: + * - this.popMsgFromQueue() + * functionality: + * - return resetOffset if exists + * - get offset if exists + * - init offset if not exists + * - get offset from popBufferMergeService + * + * @param topic topic + * @param group group + * @param queueId queueId + * @param initMode initMode ConsumeInitMode.MAX for pop mode + * @param init flag of whether commit offset the first time pop message + * @param lockKey lockKey + * @param checkResetOffset flag of whether resetPopOffset + * @return offset + */ private long getPopOffset(String topic, String group, int queueId, int initMode, boolean init, String lockKey, boolean checkResetOffset) throws ConsumeQueueException { long offset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, queueId); if (offset < 0) { + //the first time consume, init offset by initMode offset = this.getInitOffset(topic, group, queueId, initMode, init); } + // before lock checkResetOffset is false + // after lock checkResetOffset is true + // This is an admin related feature if (checkResetOffset) { Long resetOffset = resetPopOffset(topic, group, queueId); if (resetOffset != null) { @@ -912,6 +1080,14 @@ private long getPopOffset(String topic, String group, int queueId, int initMode, } } + /** + * get offset from consume queue + * If consume from min offset: + * - return min offset. + * If consume from max offset: + * - get max offset + * - commit max offset if init is true. + */ public long getInitOffset(String topic, String group, int queueId, int initMode, boolean init) throws ConsumeQueueException { long offset; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 07f16e98965..a6048c39ed7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -64,6 +64,25 @@ import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM; import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC; +/** + * Per-queue service that reads the revive topic, matches checkpoints with AckMsgs, and + * revives timed-out messages by re-publishing them to the retry topic. + * + *

There is only one public method for business: run

+ * + *

Each revive queue has its own dedicated {@code PopReviveService} instance. + * The service periodically: + *

    + *
  1. Scans the revive topic ({@link #consumeReviveMessage}) to collect CK + * (checkpoint) and Ack messages, merging Acks into CK's bitMap
  2. + *
  3. Processes expired checkpoints ({@link #mergeAndRevive}) by re-publishing any + * un-acked sub-messages back to the retry topic via {@link #reviveRetry}
  4. + *
+ * + *

This is the file-based revive path (CK + Ack messages are stored in + * the system revive topic). It is complemented by the KVStore-based path in + * {@code PopConsumerService} which handles the {@code PopConsumerKVStore} flow. + */ public class PopReviveService extends ServiceThread { private static final Logger POP_LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final int[] ckRewriteIntervalsInSeconds = new int[] { 10, 20, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200 }; @@ -74,6 +93,25 @@ public class PopReviveService extends ServiceThread { private long currentReviveMessageTimestamp = -1; private volatile boolean shouldRunPopRevive = false; + /** + * Tracks checkpoints that are currently being revived. + * + *

Key — the checkpoint being processed. + * Value — a pair of (startTime, completed), where: + *

    + *
  • {@code startTime} is the timestamp when revival began
  • + *
  • {@code completed} is {@code true} once all sub-messages have been + * processed (success or failure)
  • + *
+ * + *

The map is sorted by {@link PopCheckPoint#compareTo} (by startOffset). + * This ordering is used to drain completed entries from the head, ensuring + * the revive topic offset is committed strictly in sequence. + * + *

Concurrency is limited to at most 3 entries at a time (see + * {@link #mergeAndRevive}). If an entry stays incomplete for over 30 + * seconds, it is considered hung and is skipped via {@link #rePutCK}. + */ private final NavigableMap> inflightReviveRequestMap = Collections.synchronizedNavigableMap(new TreeMap<>()); private long reviveOffset; @@ -104,7 +142,23 @@ public boolean isShouldRunPopRevive() { return shouldRunPopRevive; } + /** + * Re-publish a timed-out message to the retry topic. + * + *

Constructs a new {@link MessageExtBrokerInner} from the original + * message, increments the reconsume count (unless suspended), sets the + * first-pop time and origin group properties, and writes it to the + * appropriate retry topic (V1 or V2 depending on configuration). + * + *

If the retry topic does not exist, it is created automatically + * via {@link #addRetryTopicIfNotExist}. + * + * @param popCheckPoint the checkpoint that triggered the revive + * @param messageExt the original message to re-publish + * @return {@code true} if the message was written successfully + */ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) { + // convert checkpoint to inner message MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId(), brokerController.getBrokerConfig().isEnableRetryTopicV2())); @@ -133,9 +187,15 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) } msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); + + // set topic and queueId addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId()); msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt)); + + // store message PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); + + // logging and metric brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus()); if (brokerController.getBrokerConfig().isEnablePopLog()) { POP_LOGGER.info("reviveQueueId={},retry msg, ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ", @@ -205,6 +265,17 @@ private int getRetryQueueId(String retryTopic, MessageExt messageExt) { return oriQueueId; } + /** + * Pull a batch of messages from the revive topic at the given offset. + * + *

If the offset becomes illegal (e.g. the revive topic was truncated), + * the revive offset is corrected to {@code nextBeginOffset - 1} so that + * the next scan starts from a valid position. + * + * @param offset the queue offset to start reading from + * @param queueId the revive queue id + * @return a list of decoded messages, or {@code null} if at the tail + */ protected List getReviveMessage(long offset, int queueId) { PullResult pullResult = getMessage(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, offset, 32, true); if (pullResult == null) { @@ -333,7 +404,39 @@ private List decodeMsgList(GetMessageResult getMessageResult, boolea return foundList; } + /** + * Pull Message from revive topic then transfer to checkpoint and ack messages. + * + *

This method reads messages from the revive topic starting from the + * current offset. Each message is classified by its tag: + *

    + *
  • {@link PopAckConstants#CK_TAG} — a checkpoint, deserialized from + * JSON and stored in the map by its merge key
  • + *
  • {@link PopAckConstants#ACK_TAG} or + * {@link PopAckConstants#BATCH_ACK_TAG} — an ack, matched to its + * corresponding checkpoint via the merge key. The ack offset is translated + * to a sub-message index ({@link PopCheckPoint#indexOfAck}) and + * the checkpoint's bitMap is updated via {@link DataConverter#setBit}
  • + *
+ * + *

AckMsg that arrive after their checkpoint has already been processed + * ({@code enableSkipLongAwaitingAck}) are handled by creating a mock CK + * via {@link #mockCkForAck} so that the revive offset can still be + * committed correctly. + * + *

The scan stops when any of: + *

    + *
  • No more messages in the revive topic (tail reached)
  • + *
  • Scan time exceeds {@code reviveScanTime}
  • + *
  • The elapsed time since the first CK's revive time exceeds + * {@code ackTimeInterval + 1s}
  • + *
+ * + * @param consumeReviveObj the mutable container that receives the collected + * CKs and the computed {@code endTime} + */ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { + // init context parameters HashMap map = consumeReviveObj.map; HashMap mockPointMap = new HashMap<>(); long startScanTime = System.currentTimeMillis(); @@ -346,11 +449,14 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { int noMsgCount = 0; long firstRt = 0; // offset self amend + while (true) { if (!shouldRunPopRevive) { POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId); break; } + + // pull revive messages List messageExts = getReviveMessage(offset, queueId); if (messageExts == null || messageExts.isEmpty()) { long old = endTime; @@ -379,10 +485,13 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { } else { noMsgCount = 0; } + if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) { POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId); break; } + + // convert message to PopCheckPoint and AckMsg for (MessageExt messageExt : messageExts) { if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) { String raw = new String(messageExt.getBody(), DataConverter.CHARSET_UTF8); @@ -411,6 +520,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + brokerName; PopCheckPoint point = map.get(mergeKey); if (point == null) { + // default value of enableSkipLongAwaitingAck is false if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) { continue; } @@ -418,6 +528,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { firstRt = mockPointMap.get(mergeKey).getReviveTime(); } } else { + // merge ackMsg into checkpoint int indexOfAck = point.indexOfAck(ackMsg.getAckOffset()); if (indexOfAck > -1) { point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true)); @@ -438,6 +549,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + brokerName; PopCheckPoint point = map.get(mergeKey); if (point == null) { + // default value of enableSkipLongAwaitingAck is false if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) { continue; } @@ -445,6 +557,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { firstRt = mockPointMap.get(mergeKey).getReviveTime(); } } else { + // merge ackMsgs into checkpoint List ackOffsetList = bAckMsg.getAckOffsetList(); for (Long ackOffset : ackOffsetList) { int indexOfAck = point.indexOfAck(ackOffset); @@ -467,6 +580,20 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { consumeReviveObj.endTime = endTime; } + /** + * Create a mock CK for an ack whose original CK has already been processed. + * + *

When an ack arrives long after its CK has been consumed (e.g. network + * delay), the CK is no longer in the scan map. If {@code enableSkipLongAwaitingAck} + * is enabled, this method creates a synthetic CK so that the revive offset + * can still be advanced correctly in {@link #mergeAndRevive}. + * + * @param messageExt the revive topic message that carried the ack + * @param ackMsg the decoded ack + * @param mergeKey the merge key for the CK lookup + * @param mockPointMap map to collect the mock CKs + * @return {@code true} if a mock CK was created + */ private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeKey, HashMap mockPointMap) { long ackWaitTime = System.currentTimeMillis() - messageExt.getDeliverTimeMs(); long reviveAckWaitMs = brokerController.getBrokerConfig().getReviveAckWaitMs(); @@ -482,6 +609,17 @@ private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeK return false; } + /** + * Build a synthetic checkpoint from an ack message. + * + *

The mock CK has {@code num = 0} and empty bitMap, meaning no actual + * messages to revive. Its only purpose is to carry the {@code reviveOffset} + * so that the revive consumer offset can be committed past this ack. + * + * @param ackMsg the ack message + * @param reviveOffset the queue offset of the ack message in the revive topic + * @return a mock checkpoint with no sub-messages + */ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) { PopCheckPoint point = new PopCheckPoint(); point.setStartOffset(ackMsg.getStartOffset()); @@ -496,7 +634,26 @@ private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) { return point; } + /** + * Process collected checkpoints and revive all un-acked sub-messages. + * + *

Checkpoints are sorted by revive offset. For each one: + *

    + *
  • Skip if the revive time has not yet elapsed (within + * {@code ackTimeInterval + 1s} of {@code endTime})
  • + *
  • Skip if the normal topic or consumer group no longer exists
  • + *
  • Wait if too many revives are already in-flight (max 3)
  • + *
  • Call {@link #reviveMsgFromCk} to re-publish un-acked messages
  • + *
+ * + *

After processing, the revive topic offset is advanced past all + * processed checkpoints. + * + * @param consumeReviveObj the container with collected CKs and scan state + * @throws Throwable if any revive operation fails + */ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable { + // sort checkpoints and init newOffset ArrayList sortList = consumeReviveObj.genSortList(); POP_LOGGER.info("reviveQueueId={}, ck listSize={}", queueId, sortList.size()); if (sortList.size() != 0) { @@ -504,6 +661,7 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl sortList.get(0).getReviveOffset(), sortList.get(sortList.size() - 1).getStartOffset(), sortList.get(sortList.size() - 1).getReviveOffset()); } long newOffset = consumeReviveObj.oldOffset; + for (PopCheckPoint popCheckPoint : sortList) { if (!shouldRunPopRevive) { POP_LOGGER.info("slave skip ck process, revive topic={}, reviveQueueId={}", reviveTopic, queueId); @@ -526,11 +684,14 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl continue; } + // Concurrency control for revive: skip first long-running revive task. while (inflightReviveRequestMap.size() > 3) { waitForRunning(100); Pair pair = inflightReviveRequestMap.firstEntry().getValue(); + // if first revive task is timeout, reput it to revive topic, then skip if (!pair.getObject2() && System.currentTimeMillis() - pair.getObject1() > 1000 * 30) { PopCheckPoint oldCK = inflightReviveRequestMap.firstKey(); + // reput checkpoint to revive topic rePutCK(oldCK, pair); inflightReviveRequestMap.remove(oldCK); POP_LOGGER.warn("stay too long, remove from reviveRequestMap, {}, {}, {}, {}", popCheckPoint.getTopic(), @@ -538,10 +699,12 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl } } + // revive message reviveMsgFromCk(popCheckPoint); - newOffset = popCheckPoint.getReviveOffset(); } + + // commit offset if (newOffset > consumeReviveObj.oldOffset) { if (!shouldRunPopRevive) { POP_LOGGER.info("slave skip commit, revive topic={}, reviveQueueId={}", reviveTopic, queueId); @@ -553,22 +716,46 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl consumeReviveObj.newOffset = newOffset; } + /** + * Revive all un-acked sub-messages in a checkpoint: + * - reput message to revive topic + * - put message to retry topic + * + *

For each sub-message whose bit is not set in the bitMap, the original + * message is fetched via {@link #getBizMessage} and re-published to the + * retry topic via {@link #reviveRetry}. All revive attempts run + * concurrently via {@link CompletableFuture#allOf}. + * + *

After all attempts complete: + *

    + *
  • Failed offsets are re-queued via {@link #rePutCK}
  • + *
  • The {@link #inflightReviveRequestMap} is updated and completed + * entries are removed in order, advancing the revive offset
  • + *
+ * + * @param popCheckPoint the checkpoint whose un-acked messages should be revived + */ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) { + // env check and init if (!shouldRunPopRevive) { POP_LOGGER.info("slave skip retry, revive topic={}, reviveQueueId={}", reviveTopic, queueId); return; } inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false)); List>> futureList = new ArrayList<>(popCheckPoint.getNum()); + + // put message to retry topic if checkpoint was not acked for (int j = 0; j < popCheckPoint.getNum(); j++) { + // if checkpoint was acked, skip if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) { continue; } - // retry msg + // get message by checkpoint, then put message to retry topic long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j); CompletableFuture> future = getBizMessage(popCheckPoint, msgOffset) .thenApply(rst -> { + // validate message MessageExt message = rst.getLeft(); if (message == null) { POP_LOGGER.info("reviveQueueId={}, can not get biz msg, topic:{}, qid:{}, offset:{}, brokerName:{}, info:{}, retry:{}, then continue", @@ -580,8 +767,11 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) { }); futureList.add(future); } + + // reput checkpoint to revive topic if retry failed CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])) .whenComplete((v, e) -> { + // reput checkpoint for (CompletableFuture> future : futureList) { Pair pair = future.getNow(new Pair<>(0L, false)); if (!pair.getObject2()) { @@ -589,9 +779,12 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) { } } + // update ack status of inflight checkpoint if (inflightReviveRequestMap.containsKey(popCheckPoint)) { inflightReviveRequestMap.get(popCheckPoint).setObject2(true); } + + // commit offset and remove inflight checkpoint for (Map.Entry> entry : inflightReviveRequestMap.entrySet()) { PopCheckPoint oldCK = entry.getKey(); Pair pair = entry.getValue(); @@ -605,6 +798,24 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) { }); } + /** + * Re-write a checkpoint to the revive topic after a failed revive attempt. + * + *

When a sub-message cannot be revived (e.g. the original message is + * temporarily unavailable), the CK is re-published with: + *

    + *
  • A single sub-message targeting the failed offset
  • + *
  • An increased {@code rePutTimes} and an extended invisible time + * based on the backoff interval
  • + *
  • A cleared bitMap, so the next revive cycle will retry it
  • + *
+ * + *

If {@code rePutTimes} exceeds the backoff table length and + * {@code skipWhenCKRePutReachMaxTimes} is set, the CK is dropped. + * + * @param oldCK the original checkpoint that failed to revive + * @param pair the failed offset and result (object1 = offset, object2 = result) + */ private void rePutCK(PopCheckPoint oldCK, Pair pair) { int rePutTimes = oldCK.parseRePutTimes(); if (rePutTimes >= ckRewriteIntervalsInSeconds.length && brokerController.getBrokerConfig().isSkipWhenCKRePutReachMaxTimes()) { @@ -654,11 +865,27 @@ public long getReviveBehindMessages() throws ConsumeQueueException { return Math.max(0, diff); } + /** + * Main loop: periodically consume revive messages and revive timed-out CKs. + * + *

Each iteration: + *

    + *
  1. Waits for {@code reviveInterval} (configurable)
  2. + *
  3. Calls {@link #consumeReviveMessage} to scan the revive topic and + * merge checkpoints with their corresponding AckMsg
  4. + *
  5. Calls {@link #mergeAndRevive} to re-publish all un-acked + * sub-messages whose revive time has elapsed
  6. + *
  7. If no checkpoints were processed, increases a {@code slow} counter and + * sleeps longer — the idle interval ramps up to + * {@code reviveMaxSlow * reviveInterval}
  8. + *
+ */ @Override public void run() { int slow = 1; while (!this.isStopped()) { try { + // env check if (System.currentTimeMillis() < brokerController.getShouldStartTime()) { POP_LOGGER.info("PopReviveService Ready to run after {}", brokerController.getShouldStartTime()); this.waitForRunning(1000); @@ -676,6 +903,8 @@ public void run() { } POP_LOGGER.info("start revive topic={}, reviveQueueId={}", reviveTopic, queueId); + + // consume revive message ConsumeReviveObj consumeReviveObj = new ConsumeReviveObj(); consumeReviveMessage(consumeReviveObj); @@ -684,8 +913,10 @@ public void run() { continue; } + // merge checkpoint and ackMsg then revive mergeAndRevive(consumeReviveObj); + // wait and logging ArrayList sortList = consumeReviveObj.sortList; long delay = 0; if (sortList != null && !sortList.isEmpty()) { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index c97ff2fc297..ca4bada8567 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -243,6 +243,11 @@ public class BrokerConfig extends BrokerIdentity { private int popFromRetryProbabilityForPriority = 0; // 0 as the lowest priority if true private boolean priorityOrderAsc = true; + /** + * There are two types of ack mode: + * 1. ack by file system service, which is the default mode. + * 2. ack by key-value service, when popConsumerKVServiceEnable and popConsumerKVServiceInit are both true. + */ private boolean popConsumerFSServiceInit = true; private boolean popConsumerKVServiceLog = false; private boolean popConsumerKVServiceInit = false; @@ -463,7 +468,7 @@ public class BrokerConfig extends BrokerIdentity { private boolean usePIDColdCtrStrategy = true; private long cgColdReadThreshold = 3 * 1024 * 1024; private long globalColdReadThreshold = 100 * 1024 * 1024; - + /** * The interval to fetch namesrv addr, default value is 10 second */ @@ -2108,11 +2113,11 @@ public boolean isUseStaticSubscription() { public void setUseStaticSubscription(boolean useStaticSubscription) { this.useStaticSubscription = useStaticSubscription; } - + public long getFetchNamesrvAddrInterval() { return fetchNamesrvAddrInterval; } - + public void setFetchNamesrvAddrInterval(final long fetchNamesrvAddrInterval) { this.fetchNamesrvAddrInterval = fetchNamesrvAddrInterval; } diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java index cc96770b22a..474179d52f8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/DataConverter.java @@ -19,15 +19,33 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; +/** + * Bit-level utility methods, primarily used by Pop-mode ack tracking. + * + *

An {@code int} bitmask is used to track the ack state of up to 32 sub-messages + * within a single Pop checkpoint (see {@code PopCheckPoint}). + */ public class DataConverter { public static final Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + /** + * Convert a {@code long} to an 8-byte array (big-endian). + */ public static byte[] Long2Byte(Long v) { ByteBuffer tmp = ByteBuffer.allocate(8); tmp.putLong(v); return tmp.array(); } + /** + * Set or clear the bit at {@code index} in an int bitmask. + *

Uses {@code 1L} (long literal) to avoid signed-int overflow when {@code index == 31}. + * + * @param value the original bitmask + * @param index the bit position (0-based, 0..31) + * @param flag {@code true} to set, {@code false} to clear + * @return the updated bitmask + */ public static int setBit(int value, int index, boolean flag) { if (flag) { return (int) (value | (1L << index)); @@ -36,6 +54,13 @@ public static int setBit(int value, int index, boolean flag) { } } + /** + * Test whether the bit at {@code index} is set in an int bitmask. + * + * @param value the bitmask + * @param index the bit position (0-based, 0..31) + * @return {@code true} if the bit is 1 + */ public static boolean getBit(int value, int index) { return (value & (1L << index)) != 0; } diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java index 1b38a19ae6a..94e4f4ffc8b 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/ProxyStartup.java @@ -55,6 +55,14 @@ public class ProxyStartup { private static final Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); + /** + * proxy components container, manager components with method start/shutdown/... + * - gRPC thread pool executor + * - message processor (wrap broker controller) + * - grpc server + * - remoting protocol server + * - ... + */ private static final ProxyStartAndShutdown PROXY_START_AND_SHUTDOWN = new ProxyStartAndShutdown(); private static class ProxyStartAndShutdown extends AbstractStartAndShutdown { @@ -73,8 +81,10 @@ public static void main(String[] args) { // init thread pool monitor for proxy. initThreadPoolMonitor(); + // init business thread pool for grpc server ThreadPoolExecutor executor = createServerExecutor(); + // create message processor, wrap broker controller in local mode MessagingProcessor messagingProcessor = createMessagingProcessor(); // tls cert update diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java index 5a1a5859305..7def5314972 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java @@ -262,7 +262,7 @@ public class ProxyConfig implements ConfigFile { private String remotingAccessAddr = ""; private int remotingListenPort = 8080; - // related to proxy's send strategy in cluster mode. + // related to proxy's sending strategy in cluster mode. private boolean sendLatencyEnable = false; private boolean startDetectorEnable = false; private int detectTimeout = 200; @@ -270,9 +270,38 @@ public class ProxyConfig implements ConfigFile { private int remotingHeartbeatThreadPoolNums = 2 * PROCESSOR_NUMBER; private int remotingTopicRouteThreadPoolNums = 2 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. send message(and send message v2) + * 2. send batch message + * 3. consume send message back + * 4. end transaction + * 5. recall message + */ private int remotingSendMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. pull message + * 2. lite pull message + * 3. pop message + */ private int remotingPullMessageThreadPoolNums = 4 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. update consumer offset + * 2. ack message + * 3. change message invisible time + * 4. get consumer connection list + */ private int remotingUpdateOffsetThreadPoolNums = 4 * PROCESSOR_NUMBER; + /** + * thread pool number for + * 1. unregister client + * 2. check client config + * 3. get consumer list by group + * 4. get min/max offset, query consume offset, search offset by timestamp + * 5. lock/unlock batch mq + */ private int remotingDefaultThreadPoolNums = 4 * PROCESSOR_NUMBER; private int remotingHeartbeatThreadPoolQueueCapacity = 50000; diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java index 3429ad54e27..12508d32108 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/GrpcMessagingApplication.java @@ -73,6 +73,15 @@ import org.apache.rocketmq.proxy.grpc.v2.common.ResponseWriter; import org.apache.rocketmq.proxy.processor.MessagingProcessor; +/** + * RocketMQ gRPC protocol implementation + * + *

    + *
  • implements gRPC protocol
  • + *
  • execute request in independent thread pool
  • + *
  • execute pipeline, ...
  • + *
+ */ public class GrpcMessagingApplication extends MessagingServiceGrpc.MessagingServiceImplBase implements StartAndShutdown { private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME); @@ -168,6 +177,16 @@ protected Status convertExceptionToStatus(Throwable t) { return ResponseBuilder.getInstance().buildStatus(t); } + /** + * submit grpc task to related thread pool. + * + * @param executor thread pool + * @param context context + * @param request grpc request + * @param runnable process task + * @param responseObserver grpc response observer + * @param statusResponseCreator error response creator + */ protected void addExecutor(ExecutorService executor, ProxyContext context, V request, Runnable runnable, StreamObserver responseObserver, Function statusResponseCreator) { if (request instanceof GeneratedMessageV3) { @@ -201,6 +220,12 @@ protected void validateContext(ProxyContext context) { } } + /** + * route query api, producer/consumer will call this api while starting. + * + * @param request request + * @param responseObserver gRPC response observer + */ @Override public void queryRoute(QueryRouteRequest request, StreamObserver responseObserver) { Function statusResponseCreator = status -> QueryRouteResponse.newBuilder().setStatus(status).build(); @@ -218,6 +243,12 @@ public void queryRoute(QueryRouteRequest request, StreamObserver responseObserver) { Function statusResponseCreator = status -> HeartbeatResponse.newBuilder().setStatus(status).build(); @@ -252,6 +283,12 @@ public void sendMessage(SendMessageRequest request, StreamObserver responseObserver) { @@ -420,6 +457,15 @@ public void syncLiteSubscription(SyncLiteSubscriptionRequest request, } } + /** + * telemetry API + * + *
    + *
  • register producer/consumer
  • + *
  • process trace
  • + *
  • verify message result
  • + *
+ */ @Override public StreamObserver telemetry(StreamObserver responseObserver) { Function statusResponseCreator = status -> TelemetryCommand.newBuilder().setStatus(status).build(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java index f5e1c7b76f3..ed32e3d5e61 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java @@ -59,11 +59,22 @@ public ReceiveMessageActivity(MessagingProcessor messagingProcessor, super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); } + /** + * + * @param ctx ctx + * @param request + * request.invisible_duration => + * Required if client type is simple consumer. + * useless for PushConsumer + * + * @param responseObserver responseObserver + */ public void receiveMessage(ProxyContext ctx, ReceiveMessageRequest request, StreamObserver responseObserver) { ReceiveMessageResponseStreamWriter writer = createWriter(ctx, responseObserver); try { + // Settings were registered when client connected Settings settings = this.grpcClientSettingsManager.getClientSettings(ctx); final boolean isLite = ClientType.LITE_PUSH_CONSUMER.equals(settings.getClientType()); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java index c0138cae7fa..b9925d98909 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java @@ -63,6 +63,20 @@ public SendMessageActivity(MessagingProcessor messagingProcessor, super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); } + /** + * send message, execute in producer thread pool + * request flow: + * producer -> grpcRequest -> GrpcMessagingApplication -> ProducerThreadPoolForGrpc(...) + * functionality: + * 1. validate topic + * 2. create queue selector + * 3. build and validate message + * 4. convert response + * + * @param ctx proxy context + * @param request send message request + * @return send message response future + */ public CompletableFuture sendMessage(ProxyContext ctx, SendMessageRequest request) { CompletableFuture future = new CompletableFuture<>(); diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java index 8c4907c588a..e84045e0646 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java @@ -66,6 +66,14 @@ public ProducerProcessor(MessagingProcessor messagingProcessor, this.topicMessageTypeValidator = new DefaultTopicMessageTypeValidator(); } + /** + * send message + * 1. validate message type + * 2. select queue + * 3. set message id if not set + * 4. call message service + * 5. fill transaction data if send succeed and is transaction message + */ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSelector queueSelector, String producerGroup, int sysFlag, List messageList, long timeoutMillis) { CompletableFuture> future = new CompletableFuture<>(); @@ -96,6 +104,7 @@ public CompletableFuture> sendMessage(ProxyContext ctx, QueueSe SendMessageRequestHeader requestHeader = buildSendMessageRequestHeader(messageList, producerGroup, sysFlag, messageQueue.getQueueId()); AddressableMessageQueue finalMessageQueue = messageQueue; + // call SendMessageProcessor of broker future = this.serviceManager.getMessageService().sendMessage( ctx, messageQueue, diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java index 1e828c36fd9..a92010d45b4 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java @@ -47,6 +47,9 @@ public interface MessageService { + /** + * call SendMessageProcessor of broker + */ CompletableFuture> sendMessage( ProxyContext ctx, AddressableMessageQueue messageQueue, diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index aee767dae2f..cb8389111a0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -900,19 +900,19 @@ public GetMessageResult getMessage(final String group, final String topic, final minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); - if (maxOffset == 0) { + if (maxOffset == 0) { // empty queue status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); - } else if (offset < minOffset) { + } else if (offset < minOffset) { // offset too small status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); - } else if (offset == maxOffset) { + } else if (offset == maxOffset) { // offset overflow one status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); - } else if (offset > maxOffset) { + } else if (offset > maxOffset) { // offset too big status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; nextBeginOffset = nextOffsetCorrection(offset, maxOffset); - } else { + } else { // offset is ok final int maxFilterMessageSize = Math.max(this.messageStoreConfig.getMaxFilterMessageSize(), maxMsgNums * consumeQueue.getUnitSize()); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); @@ -925,6 +925,14 @@ public GetMessageResult getMessage(final String group, final String topic, final long maxPhyOffsetPulling = 0; int cqFileNum = 0; + /* + * bufferTotalSize is the total message size + * bufferTotalSize less than 0 means + * the while loop will break after getting more than one messages + * + * travelCqFileNumWhenGetMessage limits the max file nums to travel when get message + * default is 1 + */ while (getResult.getBufferTotalSize() <= 0 && nextBeginOffset < maxOffset && cqFileNum++ < this.messageStoreConfig.getTravelCqFileNumWhenGetMessage()) { diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java index 6f322a19e19..980f9a7bc89 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java @@ -21,15 +21,23 @@ import java.util.Collections; import java.util.List; +/** + * result of get message + */ public class GetMessageResult { - + // mappedFile info list private final List messageMapedList; + // message info list in form of ByteBuffer, used by zero copy in version 4.* private final List messageBufferList; + // consume queue offset list private final List messageQueueOffset; private GetMessageStatus status; + // next offset of queue(Consume Queue) private long nextBeginOffset; + // min offset of queue(Consume Queue) private long minOffset; + // max offset of queue(Consume Queue) private long maxOffset; private int bufferTotalSize = 0; diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java index 5c38cfe92a9..b96dfd98882 100644 --- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java @@ -19,6 +19,12 @@ import java.nio.ByteBuffer; import org.apache.rocketmq.store.logfile.MappedFile; +/** + * result while select mapped file + * - mapped file + * - offset and size + * - whether it is in memory + */ public class SelectMappedBufferResult { private final long startOffset; diff --git a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java index 803ebc68957..e4ed5c085e8 100644 --- a/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java +++ b/store/src/main/java/org/apache/rocketmq/store/pop/PopCheckPoint.java @@ -21,24 +21,62 @@ import java.util.ArrayList; import java.util.List; +/** + * state check info for multi-messages pop from consume queue + */ public class PopCheckPoint implements Comparable { @JSONField(name = "so") private long startOffset; + /** + * pop time, which is the time when message is popped + * reviveTime = popTime + invisibleTime + */ @JSONField(name = "pt") private long popTime; + /** + * the invisible time of messages + * default is 60s, it can be changed by MQ client + */ @JSONField(name = "it") private long invisibleTime; + /** + * store ack states of messages + * one byte for each message + */ @JSONField(name = "bm") private int bitMap; + /** + * total number of messages + */ @JSONField(name = "n") private byte num; @JSONField(name = "q") private int queueId; @JSONField(name = "t") private String topic; + /** + * consumer group + */ private String cid; + /** + * revive offset, which is the consume queue offset of messageExt + */ @JSONField(name = "ro") private long reviveOffset; + /** + * Per-message offset differences from {@link #startOffset}. + * queueOffsetDiff will not be null or empty in 5.* + * + *

When a batch of messages is popped, the queue offsets of the messages may not + * be contiguous (e.g. batch messages, ConsumeQueue compaction, filter mismatch gaps). + * This list records {@code actualQueueOffset - startOffset} for each message in the + * batch, so that the system can correctly map an ack offset back to its index within + * the checkpoint via {@link #indexOfAck}, and reconstruct the original offset via + * {@link #ackOffsetByIndex}. + * + *

When this field is null or empty (old-version CK), offsets are assumed to be + * {@code startOffset + index}. + */ @JSONField(name = "d") private List queueOffsetDiff; @JSONField(name = "bn") @@ -165,12 +203,23 @@ public void addDiff(int diff) { this.queueOffsetDiff.add(diff); } + /** + * Map an ack offset to its index within the checkpoint batch. + * + *

The index is used to look up the corresponding bit in the {@link #bitMap} + * (or in {@code PopCheckPointWrapper.bits}) and to retrieve the original + * queue offset via {@link #ackOffsetByIndex}. + * + * @param ackOffset the queue offset being acked + * @return the sub-message index (0-based), or -1 if the offset is not found + * in this checkpoint + */ public int indexOfAck(long ackOffset) { if (ackOffset < startOffset) { return -1; } - // old version of checkpoint + // old version of checkpoint, this will not happen in 5.* if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) { if (ackOffset - startOffset < num) { @@ -184,8 +233,16 @@ public int indexOfAck(long ackOffset) { return queueOffsetDiff.indexOf((int) (ackOffset - startOffset)); } + /** + * get original queue offset by index. + * the method name is miss-leading, it should be getQueueOffsetByIndex. + * queueOffset = startOffset + queueOffsetDiff[index] + * + * @param index sub-message index within this checkpoint (0-based) + * @return the original queue offset in the consume queue + */ public long ackOffsetByIndex(byte index) { - // old version of checkpoint + // old version of checkpoint, this will not happen in 5.* if (queueOffsetDiff == null || queueOffsetDiff.isEmpty()) { return startOffset + index; }