[ISSUE #10489] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes #10519
qianye1001 wants to merge 15 commits
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##            develop   #10519     +/-   ##
=============================================
+ Coverage     48.12%    48.30%   +0.17%
- Complexity    13352     13565     +213
=============================================
  Files          1377      1384       +7
  Lines        100720    101614     +894
  Branches      13012     13133     +121
=============================================
+ Hits          48476     49080     +604
- Misses        46309     46519     +210
- Partials       5935      6015      +80
RockteMQ-AI left a comment
Review by github-manager-bot
Summary
This PR adds batch support for ChangeInvisibleTime across the full stack (remoting protocol → client → broker → proxy), reducing network round-trips and RocksDB flushes when multiple pop consumer records need invisible-time updates in the same batch. The feature is gated by enableBatchChangeInvisibleTime and falls back to per-entry processing when disabled.
Findings
- [Critical] `PopConsumerCache.writeAndDeleteRecords`: Busy-wait spin lock. `while (!consumerLockService.tryLock(...)) { Thread.yield(); }` is a CPU-consuming spin loop. Under high contention (many concurrent batch operations on the same group/topic), this can waste significant CPU. Consider using a timed lock (e.g., `tryLock` with a timeout plus retry with backoff) or a blocking lock abstraction.
- [Critical] `PopConsumerCache.writeAndDeleteRecords`: Lock topic derived from first record only. The lock topic is parsed from `lockRecord`, which comes from `validateAndGetLockRecord(writeRecordList, deleteRecordList)`. If the batch somehow contains records spanning different topics (despite the header-level validation), this could lead to incorrect locking. Please add an assertion or defensive check that all records in the batch share the same topic+group.
- [Warning] `ChangeInvisibleTimeProcessor.processBatchRequest`: Batch size splitting. The proxy splits oversized batches (`splitOversizedBrokerGroup`), but the broker-side `processBatchRequest` also checks `popBatchChangeInvisibleTimeMaxSize`. If the proxy split threshold and broker max size are misconfigured, the broker will reject the batch and all entries fail. Consider documenting the expected relationship between these two config values, or having the broker auto-split instead of rejecting.
- [Warning] `DefaultReceiptHandleManager.scheduleRenewTask`: No max batch size for renew. The renew path collects all expiring handles in a group into a single batch call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this could create a very large batch. Consider capping the batch size and splitting into multiple calls.
- [Warning] `ConsumerProcessor.filterPopResult`: TO_RETURN batch not flushed on error path. The `toReturnMessageList` is accumulated during the message loop and sent after the loop via `batchChangeInvisibleTime`. If an exception occurs during the loop (e.g., in the `REJECT` case), the accumulated TO_RETURN handles may not be processed. Consider wrapping the post-loop batch send in a `finally` block or ensuring it runs regardless of earlier exceptions.
- [Warning] `PopConsumerCache.writeAndDeleteRecords`: Buffer eviction race. When records are in the in-memory buffer and `writeAndDeleteRecords` is called, the method splits records into buffer vs. store paths. The buffer deletion path calls `buffer.remove(record)` while the store path adds to `storeDeleteRecords`. If a concurrent `scanExpiredRecords` promotes a record from buffer to store between the split and the actual store delete, the record could be missed. The lock should prevent this, but please verify the lock scope covers the full split+write+delete sequence.
- [Info] `PopConsumerRocksdbStore.writeAndDeleteRecords`: Delete-before-put ordering. The `WriteBatch` processes all deletes before puts. This is correct for the case where the same key appears in both lists (old record deleted, new record written with different key due to changed popTime). The ordering is sound.
- [Info] Good defensive handling of null entries in `prepareBatchRequestEntries` and `processBatchChangeInvisibleTimeResponse`. Each null entry gets a failure response without aborting the entire batch.
- [Info] Test coverage is comprehensive: it covers cache, RocksDB store, service layer, processor, client API, and proxy components. The split-by-broker and oversized-batch tests in the proxy are particularly valuable.
Suggestions
- Replace the `Thread.yield()` spin loop with a timed wait or `CompletableFuture`-based lock to avoid CPU waste under contention.
- Add a defensive assertion that all records in a batch share the same topic+group before acquiring the lock.
- Consider aligning or documenting the relationship between proxy `batchChangeInvisibleTimeMaxSize` and broker `popBatchChangeInvisibleTimeMaxSize`.
- Add a max batch size cap in the renew path to handle edge cases with large numbers of expiring handles.
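The timed-retry suggestion for the spin loop could look roughly like the sketch below. It is not code from this PR: `BackoffLockSketch` and `acquireWithBackoff` are hypothetical names, and the `BooleanSupplier` stands in for a non-blocking call such as `consumerLockService.tryLock(...)`, whose real signature is not shown in this conversation.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of RocketMQ.
final class BackoffLockSketch {
    private BackoffLockSketch() { }

    /**
     * Retries a non-blocking lock attempt until it succeeds or the timeout elapses.
     * Parks the thread between attempts (doubling the pause up to a cap) instead of
     * spinning with Thread.yield(). Returns true if acquired, false on timeout.
     */
    static boolean acquireWithBackoff(BooleanSupplier tryLock, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        long parkNanos = TimeUnit.MICROSECONDS.toNanos(50);
        long maxParkNanos = TimeUnit.MILLISECONDS.toNanos(5);
        while (!tryLock.getAsBoolean()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false; // caller decides how to fail the batch
            }
            LockSupport.parkNanos(Math.min(parkNanos, remaining));
            parkNanos = Math.min(parkNanos * 2, maxParkNanos);
        }
        return true;
    }
}
```

A caller would release the lock in a `finally` block after a `true` return and report a per-batch failure after a `false` return; the initial pause and cap used here are arbitrary illustration values.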
Cross-repo Note
This PR adds new request/response protocol types (BatchChangeInvisibleTimeRequestHeader, BatchChangeInvisibleTimeRequestBody, etc.) in rocketmq-remoting. The rocketmq-clients repos do not need changes since the batch protocol is internal (broker ↔ proxy), not client-facing.
Automated review by github-manager-bot
RockteMQ-AI left a comment
Review by github-manager-bot (re-review)
Summary
Re-review after 2 new commits:
- 5df26c4 Fix single change invisible time KV write batch
- 76cc6b4 Serialize oversized batch change invisible time chunks
Changes Since Last Review
Commit 1 — Single-entry path fix: The changeInvisibilityDuration method now uses writeAndDeleteRecords for both the single-entry and batch paths, ensuring a single batched KV operation instead of separate write + delete calls. This is a clean improvement.
Commit 2 — Oversized batch chunking: The proxy now splits large handleMessageList into chunks of batchChangeInvisibleTimeMaxNum and processes them sequentially via thenCompose. This avoids overwhelming a single broker with many concurrent write-heavy batches. The fallback to per-item requests on failure is preserved.
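The chunk-then-serialize behaviour described for Commit 2 can be sketched as follows. This is an illustration under assumptions rather than the PR's code: `SequentialChunkSketch`, `split`, and `processSequentially` are hypothetical names, and `sendBatch` stands in for whatever call sends one chunk to the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper, not part of RocketMQ.
final class SequentialChunkSketch {
    private SequentialChunkSketch() { }

    /** Splits items into consecutive chunks of at most maxNum elements. */
    static <T> List<List<T>> split(List<T> items, int maxNum) {
        if (maxNum <= 0) {
            throw new IllegalArgumentException("maxNum must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += maxNum) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(i + maxNum, items.size()))));
        }
        return chunks;
    }

    /**
     * Sends chunks one after another: the next chunk is only submitted once the
     * previous chunk's future has completed, so at most one batch is in flight.
     */
    static <T, R> CompletableFuture<List<R>> processSequentially(
            List<T> items, int maxNum, Function<List<T>, CompletableFuture<List<R>>> sendBatch) {
        CompletableFuture<List<R>> chain = CompletableFuture.completedFuture(new ArrayList<>());
        for (List<T> chunk : split(items, maxNum)) {
            chain = chain.thenCompose(acc -> sendBatch.apply(chunk).thenApply(results -> {
                acc.addAll(results);
                return acc;
            }));
        }
        return chain;
    }
}
```

Because each chunk is chained with `thenCompose`, a failed chunk completes the whole chain exceptionally and later chunks are never sent; a real implementation would need to decide how to report the unsent entries.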
Updated Findings
- [Resolved] The single-entry path now correctly uses the batch API — good fix.
- [Resolved] Oversized batches are now chunked and serialized — addresses the concern about large batch pressure.
- [Info] `PopConsumerCache.writeAndDeleteRecords`: The spin-lock pattern (`while (!tryLock) { Thread.yield(); }`) is consistent with the existing `deleteRecords` method in the same class. While not ideal under high contention, this follows the established codebase convention. Acceptable for now; consider a timed backoff in a future cleanup.
- [Positive] The `validateAndGetLockRecord` method correctly ensures all records share the same lock key, preventing cross-topic/group lock conflicts.
- [Positive] Good test coverage for the new `writeAndDeleteRecords` method: tests cover empty deletes, buffered records, topic lock routing, and mixed lock key rejection.
Verdict
The two new commits meaningfully address the previous review findings. The implementation looks solid.
Automated re-review by github-manager-bot
RockteMQ-AI left a comment
Review by github-manager-bot (Re-review after 4 new commits)
Summary
Four new commits since the previous review:
- Refine batch entries: Introduces `ChangeInvisibleTimeRequestEntry` as a first-class remoting type with proper serialization
- Remove redundant validation: Removes the batch-level topic/group validation in `PopConsumerCache` (now enforced at the header level)
- Simplify write/delete: Replaces the busy-wait spin lock with a clean split-buffer/store approach
- Fix serializable compat: Skips `transient` fields in the reflection-based compatibility test
Previous Findings — Status
- [Critical] Busy-wait spin lock in `writeAndDeleteRecords`: Resolved. The spin lock is removed. The new implementation splits records into buffer vs. store paths and processes them independently. Much cleaner.
- [Critical] Lock topic derived from first record only: Resolved. The lock mechanism is removed entirely. The batch header enforces topic+group consistency, and the split logic processes each record independently.
- [Warning] Batch size splitting: Partially addressed. The broker now checks `batchMaxNum` and rejects oversized batches. The proxy splits oversized groups via `splitOversizedBrokerGroup`. However, see remaining concern below.
- [Warning] TO_RETURN batch not flushed on error: Likely resolved. The loop catches per-message exceptions and continues, so the post-loop batch send should execute. But consider wrapping the post-loop `batchChangeInvisibleTime` call in a `finally` block for extra safety.
- [Warning] Buffer eviction race: Resolved. The new `writeAndDeleteRecords` cleanly separates buffer and store paths, avoiding the race between buffer eviction and batch write.
Remaining Findings
- [Warning] `ConsumerProcessor.scheduleRenewTask`: No max batch size for renew. The renew path still collects all expiring handles in a group into a single `batchChangeInvisibleTime` call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this creates a very large batch. Consider capping the batch size (e.g., `proxyConfig.getBatchChangeInvisibleTimeMaxNum()`) and splitting into multiple calls.
- [Warning] `ChangeInvisibleTimeProcessor.processBatchRequestAsync`: Batch rejection vs. auto-split. When `requestEntries.size() > batchMaxNum`, the broker rejects the entire batch. If the proxy split threshold and broker max size are misconfigured, all entries fail. Consider auto-splitting at the broker level instead of rejecting, or at minimum documenting the expected relationship between `popBatchChangeInvisibleTimeMaxSize` (proxy) and `batchChangeInvisibleTimeMaxNum` (broker).
- [Info] `ChangeInvisibleTimeRequestEntry`: Good addition of a first-class remoting type. The `encode()`/`decode()` methods handle the serialization correctly. The `@JSONField` annotations ensure proper JSON mapping.
- [Info] `RemotingSerializableCompatTest`: Skipping `transient` fields in the compatibility check is correct. Transient fields are not part of the wire format.
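The relationship between the two limits raised in the warnings above can be stated as a small invariant: the proxy should never send more entries per batch than the broker accepts. The sketch below assumes both values are available in one place, which may not hold in a real deployment; `BatchLimitSketch`, `effectiveChunkSize`, and `chunkCount` are hypothetical names, and the parameters are plain integers rather than the actual config fields.

```java
// Hypothetical helper, not part of RocketMQ.
final class BatchLimitSketch {
    private BatchLimitSketch() { }

    /** The proxy-side chunk size is clamped so it never exceeds the broker-side limit. */
    static int effectiveChunkSize(int proxyMaxNum, int brokerMaxNum) {
        if (proxyMaxNum <= 0 || brokerMaxNum <= 0) {
            throw new IllegalArgumentException("batch limits must be positive");
        }
        return Math.min(proxyMaxNum, brokerMaxNum);
    }

    /** Number of batch calls needed to renew handleCount handles (ceiling division). */
    static int chunkCount(int handleCount, int chunkSize) {
        return (handleCount + chunkSize - 1) / chunkSize;
    }
}
```

With a proxy limit of 64 and a broker limit of 32, a renew pass over 1000 expiring handles would be sent as 32 calls of at most 32 entries each instead of one 1000-entry batch that the broker rejects.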
Cross-repo Note
This PR adds BATCH_CHANGE_MESSAGE_INVISIBLETIME as a new RequestCode. Ensure the RequestCode value does not conflict with any in-flight PRs in apache/rocketmq. The corresponding client-side support is included in this PR, so no cross-repo coordination is needed.
Automated review by github-manager-bot
[P1] Preserve input ordering when returning batch change-invisible-time results
When a renewal batch contains handles from different brokers/real topics, or contains an expired handle, a result can therefore be assigned to a different message. This may update message A with message B's receipt handle and cause subsequent renew/ack operations to target the wrong offset. Please preserve the original input order in the returned result list, or correlate each result with its
RockteMQ-AI left a comment
Review by github-manager-bot (Re-review after new commit)
Summary
One new commit since the previous review:
- Fix test flake: Reorders mock setup before `start()` in `ReceiptHandleProcessorTest.testStart()`, properly mocks `changeInvisibleTime` to return a completed `CompletableFuture<AckResult>`, and adds try/finally cleanup.
New Commit Assessment
- [Info] `ReceiptHandleProcessorTest.testStart`: Good fix. The previous ordering (`start()` then `addReceiptHandle()`) had a race where the background thread could attempt processing before the mock was configured. The new ordering (mock setup, `addReceiptHandle()`, then `start()`) eliminates the race. The try/finally cleanup (`removeReceiptHandle` + `shutdown`) is also a good practice to avoid test interference.
Previous Findings — No Change
All previous findings remain as stated in the last review. The remaining warnings about batch size capping in ConsumerProcessor.scheduleRenewTask and broker-level batch rejection vs. auto-split are still applicable.
No new issues found in this commit.
Automated review by github-manager-bot
[P1] Do not fall back to single requests after an ambiguous batch failure
Retrying each entry with the original receipt handle is not idempotent: each execution generates a new […] Please restrict fallback to an explicit response proving that the broker does not support the batch request code. Transport failures and timeouts should not be retried this way unless the operation is made idempotent, for example with a request identifier deduplicated by the broker. A regression test should cover the case where the broker applies the batch but the client future completes with a timeout.
[P1] Make cache ownership transfer atomic with concurrent cleanup
A cleanup thread can move the old CK from […] Please coordinate cleanup and renewal through an atomic claim/removal or another explicit ownership state transition before deciding whether the old CK must be deleted from RocksDB. A concurrent regression test should pause between classification and deletion while cleanup stages and flushes the same record.
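One way to read "atomic claim/removal" is that both the renewal path and the cleanup path take a record out of the in-memory buffer with a single atomic remove, so exactly one of them owns it. The sketch below is only an illustration of that idea: `AtomicClaimSketch` is a hypothetical class, the buffer is modelled as a plain `ConcurrentHashMap`, and nothing here reflects how `PopConsumerCache` is actually structured.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of an in-memory checkpoint buffer, not RocketMQ code.
final class AtomicClaimSketch<K, V> {
    private final ConcurrentMap<K, V> buffer = new ConcurrentHashMap<>();

    void put(K key, V record) {
        buffer.put(key, record);
    }

    /**
     * Atomically removes and returns the record for key.
     * A non-null result means this caller now owns the record and it never left
     * the buffer; null means another thread (for example cleanup) claimed it
     * first, so the caller should assume it may already be in the store.
     */
    V claim(K key) {
        return buffer.remove(key);
    }
}
```

Under this model a renewal that gets `null` back from `claim` would add the old checkpoint to its store-delete list, while a non-null result means no store delete is needed. Whether that cost is acceptable on the hot path is exactly the trade-off discussed in this thread.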
RockteMQ-AI left a comment
Review by github-manager-bot (Re-review after 2 new commits)
Summary
Two new commits since the previous review:
- 7a2e4bc Fix batch change invisible time result ordering: Refactors `ConsumerProcessor.batchChangeInvisibleTime` to preserve input order in results by using index-based arrays instead of appending to a list.
- 3c5cc1b Keep single invisible time change direct: Reverts the delegation of `changeInvisibilityDuration` to the batch path, restoring the original direct implementation.
New Commit Assessment
- [Info] `ConsumerProcessor.batchChangeInvisibleTime` ordering fix: Good change. The previous implementation grouped messages by broker and appended results in broker-group order, losing the original input ordering. The new approach:
  - Uses a `BatchChangeInvisibleTimeResult[]` array indexed by original position
  - Tracks indices via `brokerHandleIndexMap` instead of message lists
  - Converts the array to a list via `buildBatchChangeInvisibleTimeResultList` at the end
  This ensures callers receive results in the same order as the input `handleMessageList`, which is important for correlating results with requests.
- [Info] `ConsumerProcessorTest.testBatchChangeInvisibleTimePreserveInputOrderWithExpiredAndInterleavedGroups`: Comprehensive test that verifies ordering is preserved with expired handles, interleaved broker assignments, and multiple groups. Good coverage.
- [Info] `PopConsumerService.changeInvisibilityDuration` revert: Correct decision. Delegating a single-item call through the batch path (`batchChangeInvisibilityDuration(Collections.singletonList(...))`) added unnecessary overhead (record conversion, batch assembly) for the common single-item case. The direct path with `PopConsumerRecord` is more efficient and avoids the extra allocation. The net diff for this file is zero.
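The index-based ordering described in the assessment above can be sketched generically. This is not the PR's implementation: `OrderedBatchResultSketch` and `processGrouped` are hypothetical names, a string key stands in for the broker grouping, and `sendGroup` is assumed to return exactly one result per input, in the order it was given.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of RocketMQ.
final class OrderedBatchResultSketch {
    private OrderedBatchResultSketch() { }

    /**
     * Groups inputs by key, sends each group as one batch, and writes every
     * result back into the slot of its original input position.
     */
    static <T, R> List<R> processGrouped(List<T> inputs,
                                         Function<T, String> groupKey,
                                         Function<List<T>, List<R>> sendGroup) {
        // Remember the original positions of each group's members.
        Map<String, List<Integer>> indicesByGroup = new LinkedHashMap<>();
        for (int i = 0; i < inputs.size(); i++) {
            indicesByGroup.computeIfAbsent(groupKey.apply(inputs.get(i)), k -> new ArrayList<>()).add(i);
        }
        // Pre-size the output so results can be placed by index, not appended.
        List<R> ordered = new ArrayList<>(Collections.<R>nCopies(inputs.size(), null));
        for (List<Integer> indices : indicesByGroup.values()) {
            List<T> group = new ArrayList<>();
            for (int index : indices) {
                group.add(inputs.get(index));
            }
            List<R> results = sendGroup.apply(group);
            for (int j = 0; j < indices.size(); j++) {
                ordered.set(indices.get(j), results.get(j));
            }
        }
        return ordered;
    }
}
```

With interleaved inputs such as a1, b1, a2, b2 grouped by their first letter, the batches are sent as (a1, a2) and (b1, b2), but the returned list still lines up position by position with the input.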
Previous Findings — No Change
All previous findings remain as stated in the last review. The remaining warnings about batch size capping in ConsumerProcessor.scheduleRenewTask and broker-level batch rejection vs. auto-split are still applicable.
No new issues found in these commits.
Automated review by github-manager-bot
RockteMQ-AI left a comment
Review by github-manager-bot (Re-review after 2 new commits)
Summary
Two new commits since the previous review:
- 7ed9d80c Fix receipt handle group async cleanup test: Minor test adjustment in `ReceiptHandleGroupTest.java` (+3/-3).
- 7b533ec6 Mark broker shutdown test medium in Bazel: Adds timeout/size annotation to a Bazel test target in `broker/BUILD.bazel` (+3/-0).
Diff Verification
✅ All previously reviewed changes remain intact:
- Batch `ChangeInvisibleTime` remoting protocol
- `ConsumerProcessor.batchChangeInvisibleTime()` with index-based result ordering
- Single invisible time change kept as direct path (not delegated to batch)
- Chunk serialization for oversized batches
- Pop cache batch write/delete simplification
- Remoting serializable compat for transient fields
✅ New commits are test-only changes:
- Test fix is a minor assertion/ordering adjustment
- Bazel annotation is a build configuration change
Verdict
Test fixes are clean and don't affect production code. The overall PR design (batch invisible time with chunking, backward-compatible single-item path) remains sound. LGTM.
Automated review by github-manager-bot
@fuyou001 Thanks, agreed. Fixed in d1d6e3e. The proxy fallback is now restricted to the explicit unsupported case only: the real exception must be an […] Added regression coverage in […]
Verified locally:
Comment addressed: #10519 (comment)
@fuyou001 Thanks for the detailed analysis. I considered this one and I do not plan to address it in this batch-change PR.
The reason is that this is not a new correctness model introduced by the batch path. The existing single […]
Making the ownership transfer atomic would likely require coordinating renewal/change and cleanup with an additional lock or explicit ownership state transition on the hot path. For the batch path that cost is relatively high, especially because the observed impact is a low-probability duplicate delivery scenario and POP consumption already has at-least-once semantics. I would prefer not to add that synchronization cost in this PR.
So for this PR I am keeping the batch behavior aligned with the existing single-change semantics and limiting the changes to reducing broker round trips and store writes/deletes. If we want to harden this race, I think it should be handled separately for both single and batch paths with a dedicated design and benchmark.
Comment addressed: #10519 (comment)
RockteMQ-AI left a comment
Review by github-manager-bot (Re-review after 3 new commits)
Summary
Three new commits since the previous review:
- d1d6e3e Fix batch change invisible time fallback: Critical correctness fix for the fallback logic
- 3e563b2 ci: re-trigger CI: CI housekeeping
- dae2063 Stabilize producer backpressure test: Removes flaky timing-dependent test
Changes Since Last Review
1. Fallback Logic Fix (ConsumerProcessor.java)
- [Info] Previously, any failure from `batchChangeInvisibleTime` triggered a fallback to single `changeInvisibleTime` requests. This was unsafe: if the batch partially succeeded on the broker (e.g., timeout after applying some changes), falling back would create duplicate checkpoints.
- [Info] Now only falls back when the error is `REQUEST_CODE_NOT_SUPPORTED` (broker genuinely does not support the batch API). For all other failures (timeout, internal error), returns errors directly via `buildBatchChangeInvisibleTimeExceptionResults`.
- [Info] The `isBatchChangeInvisibleTimeNotSupported()` helper correctly unwraps the real exception via `ExceptionUtils.getRealException()` before checking the type and response code.
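The fallback rule described in this section amounts to: unwrap the failure, then fall back only when it proves the broker does not know the batch request code. The sketch below illustrates that decision with stand-ins; `BatchFallbackSketch`, `UnsupportedRequestCodeException`, `unwrap`, and `shouldFallBackToSingleRequests` are hypothetical and do not reproduce `isBatchChangeInvisibleTimeNotSupported()` or `ExceptionUtils.getRealException()`.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch, not the PR's implementation.
final class BatchFallbackSketch {
    private BatchFallbackSketch() { }

    /** Stand-in for a broker response meaning "request code not supported". */
    static final class UnsupportedRequestCodeException extends RuntimeException {
        UnsupportedRequestCodeException(String message) {
            super(message);
        }
    }

    /** Strips the wrappers that CompletableFuture and Future add around the real cause. */
    static Throwable unwrap(Throwable failure) {
        Throwable current = failure;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /**
     * Falls back to per-entry requests only for the explicit unsupported case.
     * Timeouts and other ambiguous failures return false because the broker may
     * already have applied the batch.
     */
    static boolean shouldFallBackToSingleRequests(Throwable failure) {
        return unwrap(failure) instanceof UnsupportedRequestCodeException;
    }
}
```

Any failure that does not pass this check would be surfaced as a per-entry error result rather than retried, which matches the behaviour described for `buildBatchChangeInvisibleTimeExceptionResults`.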
2. Method Rename (ChangeInvisibleTimeProcessor.java)
- [Info] `validateBatchRequestEntries` → `normalizeAndValidateBatchRequestEntries`. The new name accurately reflects that the method both normalizes and validates entries.
3. Test Stabilization (DefaultMQProducerTest.java)
- [Info] `testRunningSetBackCompress` simplified: removed multi-threaded `CountDownLatch` approach, replaced with direct semaphore manipulation. Eliminates timing-dependent flakiness.
4. New Tests (ConsumerProcessorTest.java)
- [Info] `testBatchChangeInvisibleTimeFallbackOnlyWhenRequestCodeNotSupported`: Verifies fallback occurs only for `REQUEST_CODE_NOT_SUPPORTED`.
- [Info] `testBatchChangeInvisibleTimeDoesNotFallbackOnAmbiguousFailure`: Verifies no fallback for ambiguous failures (e.g., `TimeoutException`), preventing duplicate checkpoints.
Findings
- [Info] The fallback fix is a significant correctness improvement. The distinction between "broker does not support batch" (safe to fall back) and "batch may have partially succeeded" (unsafe to fall back) is well-reasoned.
- [Info] Test coverage for the two fallback paths (supported vs. unsupported) is thorough.
- [Info] The `buildReceiptHandleMessages` helper reduces test boilerplate.
Suggestions
- [Info] Consider logging the actual exception type in the non-fallback path at `INFO` level (currently `WARN`) to help operators distinguish between "broker too old" and "transient failure" in production logs. Minor nit.
Overall the incremental changes look solid. The fallback correctness fix is important.
Automated review by github-manager-bot
import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class BatchChangeInvisibleTimeRequestBody extends RemotingSerializable {
Need to check if the batch size exceeds the length limit of the frequency.
Fixes #10489
Summary
enableBatchChangeInvisibleTime.
Tests
- mvn -pl common,remoting,client,broker,proxy -DskipTests compile test-compile
- mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=BatchChangeInvisibleTimeTest,MQClientAPIImplTest#testBatchChangeInvisibleTimeAsyncSendsRequestHeader+testProcessBatchChangeInvisibleTimeResponse,ChangeInvisibleTimeProcessorTest#testProcessBatchRequestRejectsMismatchedTopicOrGroup+testProcessBatchKvRequestDoesNotBuildSingleHeader+testProcessBatchRequestConvertsBadEntryToPerEntryFailure+testProcessBatchRequestRejectsOversizedBody,PopConsumerCacheTest,PopConsumerRocksdbStoreTest,ConsumerProcessorTest#testBatchChangeInvisibleTimeSplitByRealTopic+testBatchChangeInvisibleTime+testBatchChangeInvisibleTimeSplitOversizedBrokerGroup,ReceiveMessageResponseStreamWriterTest#testWriteBackFailureShouldBatchChangeInvisibleTime,ReceiptHandleProcessorTest#testBatchRenewMessage+testBatchClearGroup test
- mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=ReceiveMessageResponseStreamWriterTest,ReceiptHandleProcessorTest test