[ISSUE #10489] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes #10519

Open
qianye1001 wants to merge 15 commits into apache:develop from qianye1001:codex/batch-change-invisible-time

@qianye1001

Contributor

Fixes #10489

Summary

  • Add remoting/client/proxy support for BatchChangeInvisibleTime with a batch header scoped to one topic and consumer group for broker-side auth.
  • Add broker batch handling for PopConsumer KV change invisible time, including buffer-aware cache handling and delete-before-put RocksDB batch semantics.
  • Wire proxy batch call sites for receipt handle renew/cleanup, receive write-back failure nack, and filtered TO_RETURN messages, gated by enableBatchChangeInvisibleTime.

Tests

  • mvn -pl common,remoting,client,broker,proxy -DskipTests compile test-compile
  • mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=BatchChangeInvisibleTimeTest,MQClientAPIImplTest#testBatchChangeInvisibleTimeAsyncSendsRequestHeader+testProcessBatchChangeInvisibleTimeResponse,ChangeInvisibleTimeProcessorTest#testProcessBatchRequestRejectsMismatchedTopicOrGroup+testProcessBatchKvRequestDoesNotBuildSingleHeader+testProcessBatchRequestConvertsBadEntryToPerEntryFailure+testProcessBatchRequestRejectsOversizedBody,PopConsumerCacheTest,PopConsumerRocksdbStoreTest,ConsumerProcessorTest#testBatchChangeInvisibleTimeSplitByRealTopic+testBatchChangeInvisibleTime+testBatchChangeInvisibleTimeSplitOversizedBrokerGroup,ReceiveMessageResponseStreamWriterTest#testWriteBackFailureShouldBatchChangeInvisibleTime,ReceiptHandleProcessorTest#testBatchRenewMessage+testBatchClearGroup test
  • mvn -pl common,remoting,client,broker,proxy -DfailIfNoTests=false -Dtest=ReceiveMessageResponseStreamWriterTest,ReceiptHandleProcessorTest test

@codecov-commenter

codecov-commenter commented Jun 16, 2026

Codecov Report

❌ Patch coverage is 62.66830% with 305 lines in your changes missing coverage. Please review.
✅ Project coverage is 48.30%. Comparing base (b4d92c0) to head (dae2063).
⚠️ Report is 16 commits behind head on develop.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...broker/processor/ChangeInvisibleTimeProcessor.java | 49.38% | 66 Missing and 16 partials ⚠️ |
| ...tmq/proxy/service/message/LocalMessageService.java | 0.00% | 60 Missing ⚠️ |
| ...cketmq/proxy/processor/ReceiptHandleProcessor.java | 58.62% | 19 Missing and 5 partials ⚠️ |
| ...he/rocketmq/proxy/processor/ConsumerProcessor.java | 85.29% | 11 Missing and 9 partials ⚠️ |
| .../apache/rocketmq/test/client/rmq/RMQPopClient.java | 0.00% | 20 Missing ⚠️ |
| ...g/apache/rocketmq/client/impl/MQClientAPIImpl.java | 64.00% | 12 Missing and 6 partials ⚠️ |
| ...q/proxy/service/message/ClusterMessageService.java | 0.00% | 17 Missing ⚠️ |
| ...apache/rocketmq/broker/pop/PopConsumerService.java | 63.15% | 7 Missing and 7 partials ⚠️ |
| ...2/consumer/ReceiveMessageResponseStreamWriter.java | 71.73% | 11 Missing and 2 partials ⚠️ |
| .../header/BatchChangeInvisibleTimeRequestHeader.java | 0.00% | 13 Missing ⚠️ |

... and 8 more
Additional details and impacted files
@@              Coverage Diff              @@
##             develop   #10519      +/-   ##
=============================================
+ Coverage      48.12%   48.30%   +0.17%     
- Complexity     13352    13565     +213     
=============================================
  Files           1377     1384       +7     
  Lines         100720   101614     +894     
  Branches       13012    13133     +121     
=============================================
+ Hits           48476    49080     +604     
- Misses         46309    46519     +210     
- Partials        5935     6015      +80     

@RockteMQ-AI RockteMQ-AI left a comment

Contributor

Review by github-manager-bot

Summary

This PR adds batch support for ChangeInvisibleTime across the full stack (remoting protocol → client → broker → proxy), reducing network round-trips and RocksDB flushes when multiple pop consumer records need invisible-time updates in the same batch. The feature is gated by enableBatchChangeInvisibleTime and falls back to per-entry processing when disabled.

Findings

  • [Critical] PopConsumerCache.writeAndDeleteRecords: Busy-wait spin lock. The loop while (!consumerLockService.tryLock(...)) { Thread.yield(); } is a CPU-consuming spin loop. Under high contention (many concurrent batch operations on the same group/topic), this can waste significant CPU. Consider using a timed lock (e.g., tryLock with a timeout + retry with backoff) or a blocking lock abstraction.

  • [Critical] PopConsumerCache.writeAndDeleteRecords: Lock topic derived from first record only. The lock topic is parsed from lockRecord, which comes from validateAndGetLockRecord(writeRecordList, deleteRecordList). If the batch somehow contains records spanning different topics (despite the header-level validation), this could lead to incorrect locking. Please add an assertion or defensive check that all records in the batch share the same topic+group.

  • [Warning] ChangeInvisibleTimeProcessor.processBatchRequest: Batch size splitting. The proxy splits oversized batches (splitOversizedBrokerGroup), but the broker-side processBatchRequest also checks popBatchChangeInvisibleTimeMaxSize. If the proxy split threshold and broker max size are misconfigured, the broker will reject the batch and all entries fail. Consider documenting the expected relationship between these two config values, or having the broker auto-split instead of rejecting.

  • [Warning] DefaultReceiptHandleManager.scheduleRenewTask: No max batch size for renew. The renew path collects all expiring handles in a group into a single batch call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this could create a very large batch. Consider capping the batch size and splitting into multiple calls.

  • [Warning] ConsumerProcessor.filterPopResult: TO_RETURN batch not flushed on error path. The toReturnMessageList is accumulated during the message loop and sent after the loop via batchChangeInvisibleTime. If an exception occurs during the loop (e.g., in the REJECT case), the accumulated TO_RETURN handles may not be processed. Consider wrapping the post-loop batch send in a finally block or ensuring it runs regardless of earlier exceptions.

  • [Warning] PopConsumerCache.writeAndDeleteRecords: Buffer eviction race. When records are in the in-memory buffer and writeAndDeleteRecords is called, the method splits records into buffer vs. store paths. The buffer deletion path calls buffer.remove(record) while the store path adds to storeDeleteRecords. If a concurrent scanExpiredRecords promotes a record from buffer to store between the split and the actual store delete, the record could be missed. The lock should prevent this, but please verify the lock scope covers the full split+write+delete sequence.

  • [Info] PopConsumerRocksdbStore.writeAndDeleteRecords: Delete-before-put ordering. The WriteBatch processes all deletes before puts. This is correct for the case where the same key appears in both lists (old record deleted, new record written with different key due to changed popTime). The ordering is sound.

  • [Info] Good defensive handling of null entries in prepareBatchRequestEntries and processBatchChangeInvisibleTimeResponse. Each null entry gets a failure response without aborting the entire batch.

  • [Info] Test coverage is comprehensive — covers cache, RocksDB store, service layer, processor, client API, and proxy components. The split-by-broker and oversized-batch tests in the proxy are particularly valuable.
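The delete-before-put ordering mentioned in the findings can be illustrated without RocksDB. The following is a minimal sketch, assuming an in-memory map stands in for the KV store; `KvBatchSketch` and `applyBatch` are hypothetical names for illustration and are not the PR's `PopConsumerRocksdbStore` code.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a KV write batch; not RocketMQ code.
final class KvBatchSketch {
    private KvBatchSketch() { }

    // Applies every delete first, then every put. A key that appears in both
    // lists therefore ends up holding the newly written value instead of
    // being removed by a late delete.
    static void applyBatch(Map<String, String> store,
                           List<String> deleteKeys,
                           Map<String, String> puts) {
        for (String key : deleteKeys) {
            store.remove(key);
        }
        store.putAll(puts);
    }
}
```

If the order were reversed (puts first, deletes second), a key present in both lists would be lost, which is the case the finding calls out.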

Suggestions

  1. Replace the Thread.yield() spin loop with a timed wait or CompletableFuture-based lock to avoid CPU waste under contention.
  2. Add a defensive assertion that all records in a batch share the same topic+group before acquiring the lock.
  3. Consider aligning or documenting the relationship between proxy batchChangeInvisibleTimeMaxSize and broker popBatchChangeInvisibleTimeMaxSize.
  4. Add a max batch size cap in the renew path to handle edge cases with large numbers of expiring handles.
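Suggestion 1 could look roughly like the sketch below. It assumes a plain `java.util.concurrent.locks.ReentrantLock` in place of the PR's `consumerLockService` (whose API is not shown in this thread); `TimedLockSketch` and its parameters are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper; a stand-in for a yield-based spin loop.
final class TimedLockSketch {
    private TimedLockSketch() { }

    // Tries to acquire the lock up to maxAttempts times. Each attempt parks
    // the thread for at most the current wait, and the wait doubles (capped
    // at maxWaitMillis) after every failed attempt.
    // Returns false if the lock was not acquired or the thread was interrupted.
    static boolean lockWithBackoff(ReentrantLock lock, int maxAttempts,
                                   long initialWaitMillis, long maxWaitMillis) {
        long wait = initialWaitMillis;
        try {
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                if (lock.tryLock(wait, TimeUnit.MILLISECONDS)) {
                    return true;
                }
                wait = Math.min(wait * 2, maxWaitMillis);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        return false;
    }
}
```

Unlike `Thread.yield()` in a loop, a timed `tryLock` lets the waiting thread park, and the bounded attempt count gives the caller a failure path instead of waiting indefinitely.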

Cross-repo Note

This PR adds new request/response protocol types (BatchChangeInvisibleTimeRequestHeader, BatchChangeInvisibleTimeRequestBody, etc.) in rocketmq-remoting. The rocketmq-clients repos do not need changes since the batch protocol is internal (broker ↔ proxy), not client-facing.


Automated review by github-manager-bot

@RockteMQ-AI RockteMQ-AI left a comment

Contributor

Review by github-manager-bot (re-review)

Summary

Re-review after 2 new commits:

  1. 5df26c4 Fix single change invisible time KV write batch
  2. 76cc6b4 Serialize oversized batch change invisible time chunks

Changes Since Last Review

Commit 1 — Single-entry path fix: The changeInvisibilityDuration method now uses writeAndDeleteRecords for both the single-entry and batch paths, ensuring a single batched KV operation instead of separate write + delete calls. This is a clean improvement.

Commit 2 — Oversized batch chunking: The proxy now splits large handleMessageList into chunks of batchChangeInvisibleTimeMaxNum and processes them sequentially via thenCompose. This avoids overwhelming a single broker with many concurrent write-heavy batches. The fallback to per-item requests on failure is preserved.
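The chunk-then-serialize pattern described for commit 2 can be sketched as follows. This is an illustration under assumptions, not the proxy's code: `ChunkedBatchSketch`, `split`, `sendSequentially`, and the `sendChunk` callback are hypothetical, and `maxNum` plays the role of `batchChangeInvisibleTimeMaxNum`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper showing sequential chunk processing with thenCompose.
final class ChunkedBatchSketch {
    private ChunkedBatchSketch() { }

    // Splits items into consecutive chunks of at most maxNum elements.
    static <T> List<List<T>> split(List<T> items, int maxNum) {
        if (maxNum <= 0) {
            throw new IllegalArgumentException("maxNum must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += maxNum) {
            int to = Math.min(from + maxNum, items.size());
            chunks.add(new ArrayList<>(items.subList(from, to)));
        }
        return chunks;
    }

    // Sends chunks one after another: chunk N+1 is only submitted once the
    // future for chunk N has completed. Results are concatenated in order.
    static <T, R> CompletableFuture<List<R>> sendSequentially(
            List<T> items, int maxNum,
            Function<List<T>, CompletableFuture<List<R>>> sendChunk) {
        CompletableFuture<List<R>> chain =
            CompletableFuture.completedFuture(new ArrayList<R>());
        for (List<T> chunk : split(items, maxNum)) {
            chain = chain.thenCompose(acc ->
                sendChunk.apply(chunk).thenApply(part -> {
                    acc.addAll(part);
                    return acc;
                }));
        }
        return chain;
    }
}
```

Chaining with `thenCompose` keeps at most one chunk in flight per call, which matches the stated goal of not hitting a single broker with many concurrent write-heavy batches.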

Updated Findings

  • [Resolved] The single-entry path now correctly uses the batch API — good fix.
  • [Resolved] Oversized batches are now chunked and serialized — addresses the concern about large batch pressure.
  • [Info] PopConsumerCache.writeAndDeleteRecords — The spin-lock pattern (while (!tryLock) { Thread.yield(); }) is consistent with the existing deleteRecords method in the same class. While not ideal under high contention, this follows the established codebase convention. Acceptable for now; consider a timed backoff in a future cleanup.
  • [Positive] The validateAndGetLockRecord method correctly ensures all records share the same lock key, preventing cross-topic/group lock conflicts.
  • [Positive] Good test coverage for the new writeAndDeleteRecords method — tests cover empty deletes, buffered records, topic lock routing, and mixed lock key rejection.

Verdict

The two new commits meaningfully address the previous review findings. The implementation looks solid.


Automated re-review by github-manager-bot

@RockteMQ-AI RockteMQ-AI left a comment

Contributor

Review by github-manager-bot (Re-review after 4 new commits)

Summary

Four new commits since the previous review:

  1. Refine batch entries — Introduces ChangeInvisibleTimeRequestEntry as a first-class remoting type with proper serialization
  2. Remove redundant validation — Removes the batch-level topic/group validation in PopConsumerCache (now enforced at the header level)
  3. Simplify write/delete — Replaces the busy-wait spin lock with a clean split-buffer/store approach
  4. Fix serializable compat — Skips transient fields in the reflection-based compatibility test

Previous Findings — Status

  • [Critical] Busy-wait spin lock in writeAndDeleteRecords: Resolved. The spin lock is removed. The new implementation splits records into buffer vs. store paths and processes them independently. Much cleaner.

  • [Critical] Lock topic derived from first record only: Resolved. The lock mechanism is removed entirely. The batch header enforces topic+group consistency, and the split logic processes each record independently.

  • [Warning] Batch size splitting — Partially addressed. The broker now checks batchMaxNum and rejects oversized batches. The proxy splits oversized groups via splitOversizedBrokerGroup. However, see remaining concern below.

  • [Warning] TO_RETURN batch not flushed on error — Likely resolved. The loop catches per-message exceptions and continues, so the post-loop batch send should execute. But consider wrapping the post-loop batchChangeInvisibleTime call in a finally block for extra safety.

  • [Warning] Buffer eviction race — Resolved. The new writeAndDeleteRecords cleanly separates buffer and store paths, avoiding the race between buffer eviction and batch write.

Remaining Findings

  • [Warning] ConsumerProcessor.scheduleRenewTask: No max batch size for renew. The renew path still collects all expiring handles in a group into a single batchChangeInvisibleTime call. If a group has thousands of handles expiring simultaneously (e.g., after a long consumer pause), this creates a very large batch. Consider capping the batch size (e.g., proxyConfig.getBatchChangeInvisibleTimeMaxNum()) and splitting into multiple calls.

  • [Warning] ChangeInvisibleTimeProcessor.processBatchRequestAsync: Batch rejection vs. auto-split. When requestEntries.size() > batchMaxNum, the broker rejects the entire batch. If the proxy split threshold and broker max size are misconfigured, all entries fail. Consider auto-splitting at the broker level instead of rejecting, or at minimum documenting the expected relationship between batchChangeInvisibleTimeMaxNum (proxy) and popBatchChangeInvisibleTimeMaxSize (broker).

  • [Info] ChangeInvisibleTimeRequestEntry — Good addition of a first-class remoting type. The encode()/decode() methods handle the serialization correctly. The @JSONField annotations ensure proper JSON mapping.

  • [Info] RemotingSerializableCompatTest — Skipping transient fields in the compatibility check is correct. Transient fields are not part of the wire format.
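Skipping transient fields in a reflection-based check can be sketched with the standard `java.lang.reflect` API. The sketch below is not the code of `RemotingSerializableCompatTest`; `WireFieldSketch`, `SampleEntry`, and its field names are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper listing the fields that would take part in serialization.
final class WireFieldSketch {
    private WireFieldSketch() { }

    // Hypothetical sample type with one transient (non-wire) field.
    static final class SampleEntry {
        private String receiptHandle;
        private long invisibleTime;
        private transient String cachedTopic;
    }

    // Returns declared field names, skipping transient, static, and
    // compiler-generated (synthetic) fields.
    static List<String> wireFieldNames(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            if (Modifier.isTransient(modifiers)
                    || Modifier.isStatic(modifiers)
                    || field.isSynthetic()) {
                continue;
            }
            names.add(field.getName());
        }
        return names;
    }
}
```

Note that `getDeclaredFields()` does not guarantee any ordering, so a compatibility check built this way should compare field sets rather than positions.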

Cross-repo Note

This PR adds BATCH_CHANGE_MESSAGE_INVISIBLETIME as a new RequestCode. Ensure the RequestCode value does not conflict with any in-flight PRs in apache/rocketmq. The corresponding client-side support is included in this PR, so no cross-repo coordination is needed.


Automated review by github-manager-bot

@fuyou001

Contributor

[P1] Preserve input ordering when returning batch change-invisible-time results

batchChangeInvisibleTime does not preserve the order of handleMessageList: expired-handle results are appended first, while the remaining results are appended later in HashMap group iteration order. However, ReceiptHandleProcessor.batchChangeInvisibleTime consumes the returned list by index and completes the future for indexes.get(i) with results.get(i).

When a renewal batch contains handles from different brokers/real topics, or contains an expired handle, a result can therefore be assigned to a different message. This may update message A with message B's receipt handle and cause subsequent renew/ack operations to target the wrong offset.

Please preserve the original input order in the returned result list, or correlate each result with its ReceiptHandleMessage instead of consuming results positionally. A regression test should cover interleaved broker/topic groups and an expired handle.
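One way to keep results aligned with their inputs is to group input positions rather than messages, and write each result back into its original slot. The sketch below assumes string handles and string results for brevity; `OrderedBatchSketch`, `isExpired`, `brokerOf`, and `sendBatch` are hypothetical and do not mirror the `ConsumerProcessor` signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helper: results come back in the same order as the input list.
final class OrderedBatchSketch {
    private OrderedBatchSketch() { }

    static List<String> process(List<String> handles,
                                Predicate<String> isExpired,
                                Function<String, String> brokerOf,
                                Function<List<String>, List<String>> sendBatch) {
        String[] results = new String[handles.size()];
        // HashMap on purpose: group iteration order must not affect the output.
        Map<String, List<Integer>> indexesByBroker = new HashMap<>();
        for (int i = 0; i < handles.size(); i++) {
            String handle = handles.get(i);
            if (isExpired.test(handle)) {
                results[i] = "EXPIRED"; // resolved locally, stays at position i
                continue;
            }
            indexesByBroker
                .computeIfAbsent(brokerOf.apply(handle), k -> new ArrayList<>())
                .add(i);
        }
        for (List<Integer> indexes : indexesByBroker.values()) {
            List<String> group = new ArrayList<>();
            for (int index : indexes) {
                group.add(handles.get(index));
            }
            // Assumes sendBatch returns one result per entry, in group order.
            List<String> groupResults = sendBatch.apply(group);
            for (int j = 0; j < indexes.size(); j++) {
                results[indexes.get(j)] = groupResults.get(j);
            }
        }
        return Arrays.asList(results);
    }
}
```

With this layout a caller that consumes results by index stays correct even when expired handles and several broker groups are interleaved in the input.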

Comment thread proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java Outdated

@RockteMQ-AI RockteMQ-AI left a comment

Contributor

Review by github-manager-bot (Re-review after new commit)

Summary

One new commit since the previous review:

  • Fix test flake — Reorders mock setup before start() in ReceiptHandleProcessorTest.testStart(), properly mocks changeInvisibleTime to return a completed CompletableFuture<AckResult>, and adds try/finally cleanup.

New Commit Assessment

  • [Info] ReceiptHandleProcessorTest.testStart — Good fix. The previous ordering (start() then addReceiptHandle()) had a race where the background thread could attempt processing before the mock was configured. The new ordering (mock setup, addReceiptHandle(), then start()) eliminates the race. The try/finally cleanup (removeReceiptHandle + shutdown) is also a good practice to avoid test interference.

Previous Findings — No Change

All previous findings remain as stated in the last review. The remaining warnings about batch size capping in ConsumerProcessor.scheduleRenewTask and broker-level batch rejection vs. auto-split are still applicable.

No new issues found in this commit.


Automated review by github-manager-bot

@fuyou001

Contributor

[P1] Do not fall back to single requests after an ambiguous batch failure

processBrokerChangeInvisibleTime falls back to individual changeInvisibleTime calls for every exception from batchChangeInvisibleTime, including timeouts, connection resets, and lost responses. Those failures do not prove that the broker did not apply the batch. The broker may have already persisted the new CK records and only the response failed to reach the proxy.

Retrying each entry with the original receipt handle is not idempotent: each execution generates a new changedPopTime, so the fallback can create a second CK while the CK created by the successful batch remains. Both CKs may later revive the same message, causing duplicate delivery.

Please restrict fallback to an explicit response proving that the broker does not support the batch request code. Transport failures and timeouts should not be retried this way unless the operation is made idempotent, for example with a request identifier deduplicated by the broker. A regression test should cover the case where the broker applies the batch but the client future completes with a timeout.
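The idempotency option mentioned here (a request identifier deduplicated by the broker) could look roughly like the sketch below. It is purely illustrative: the PR does not add such a mechanism, and `DedupSketch`, `changeInvisibleTime`, and the `newPopTime` supplier are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

// Hypothetical broker-side deduplication keyed by a client-supplied request ID.
final class DedupSketch {
    // requestId -> changedPopTime chosen when the request was first applied.
    private final ConcurrentMap<String, Long> appliedByRequestId =
        new ConcurrentHashMap<>();

    // Applies the change at most once per request ID. A retry carrying the
    // same ID gets the originally chosen changedPopTime back, so no second
    // checkpoint with a new pop time is created.
    long changeInvisibleTime(String requestId, LongSupplier newPopTime) {
        return appliedByRequestId.computeIfAbsent(
            requestId, id -> newPopTime.getAsLong());
    }
}
```

A real implementation would also need to bound or expire the map and decide how long a request ID stays valid; the sketch omits both.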

@fuyou001

Contributor

[P1] Make cache ownership transfer atomic with concurrent cleanup

PopConsumerCache.writeAndDeleteRecords first classifies an old CK with consumerRecords.contains(record), then performs the RocksDB write/delete batch, and only afterward calls deleteRecords(bufferDeleteRecords). This check and removal are not atomic with cleanupRecords.

A cleanup thread can move the old CK from recordTreeMap to removeTreeMap, flush it to RocksDB, and clear it from the cache after contains returned true but before the final cache deletion. The renewal path then writes the new CK without including the old CK in storeDeleteRecords; its final cache deletion fails, and that failure result is ignored. RocksDB can therefore retain both the old and new CK, potentially reviving the same message more than once. If the keys coincide, a late cleanup write can also overwrite the new value with the old one.

Please coordinate cleanup and renewal through an atomic claim/removal or another explicit ownership state transition before deciding whether the old CK must be deleted from RocksDB. A concurrent regression test should pause between classification and deletion while cleanup stages and flushes the same record.
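The "atomic claim" idea can be sketched with a concurrent map, where removal itself decides ownership. This is a simplified illustration and not the structure of `PopConsumerCache` (which, per the comment, uses recordTreeMap and removeTreeMap); `ClaimSketch` and its methods are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical buffer where exactly one caller can take ownership of a record.
final class ClaimSketch {
    private final ConcurrentMap<String, String> buffer = new ConcurrentHashMap<>();

    void put(String key, String record) {
        buffer.put(key, record);
    }

    // Atomically removes the record. Of all concurrent callers (cleanup or
    // renewal), only the one that actually removed it gets true.
    boolean claim(String key) {
        return buffer.remove(key) != null;
    }

    // Renewal decides from the claim result instead of a separate contains()
    // check: losing the claim means cleanup owns the old record and may have
    // flushed it, so the old key must also be deleted from the store.
    boolean renewalMustDeleteFromStore(String oldKey) {
        return !claim(oldKey);
    }
}
```

The sketch only removes the gap between the check and the removal. Ordering the renewal's store delete after a cleanup flush that is still in flight would need additional coordination.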

@RockteMQ-AI RockteMQ-AI left a comment

Contributor

Review by github-manager-bot (Re-review after 2 new commits)

Summary

Two new commits since the previous review:

  1. 7a2e4bc Fix batch change invisible time result ordering — Refactors ConsumerProcessor.batchChangeInvisibleTime to preserve input order in results by using index-based arrays instead of appending to a list.
  2. 3c5cc1b Keep single invisible time change direct — Reverts the delegation of changeInvisibilityDuration to the batch path, restoring the original direct implementation.

New Commit Assessment

  • [Info] ConsumerProcessor.batchChangeInvisibleTime ordering fix — Good change. The previous implementation grouped messages by broker and appended results in broker-group order, losing the original input ordering. The new approach:

    • Uses BatchChangeInvisibleTimeResult[] array indexed by original position
    • Tracks indices via brokerHandleIndexMap instead of message lists
    • Converts array to list via buildBatchChangeInvisibleTimeResultList at the end
    • This ensures callers receive results in the same order as the input handleMessageList, which is important for correlating results with requests.
  • [Info] ConsumerProcessorTest.testBatchChangeInvisibleTimePreserveInputOrderWithExpiredAndInterleavedGroups — Comprehensive test that verifies ordering is preserved with expired handles, interleaved broker assignments, and multiple groups. Good coverage.

  • [Info] PopConsumerService.changeInvisibilityDuration revert — Correct decision. Delegating a single-item call through the batch path (batchChangeInvisibilityDuration(Collections.singletonList(...))) added unnecessary overhead (record conversion, batch assembly) for the common single-item case. The direct path with PopConsumerRecord is more efficient and avoids the extra allocation. The net diff for this file is zero.

Previous Findings — No Change

All previous findings remain as stated in the last review. The remaining warnings about batch size capping in ConsumerProcessor.scheduleRenewTask and broker-level batch rejection vs. auto-split are still applicable.

No new issues found in these commits.


Automated review by github-manager-bot

@RockteMQ-AI RockteMQ-AI left a comment

Contributor

Review by github-manager-bot (Re-review after 2 new commits)

Summary

Two new commits since the previous review:

  1. 7ed9d80c Fix receipt handle group async cleanup test — Minor test adjustment in ReceiptHandleGroupTest.java (+3/-3).
  2. 7b533ec6 Mark broker shutdown test medium in Bazel — Adds timeout/size annotation to a Bazel test target in broker/BUILD.bazel (+3/-0).

Diff Verification

✅ All previously reviewed changes remain intact:

  • Batch ChangeInvisibleTime remoting protocol
  • ConsumerProcessor.batchChangeInvisibleTime() with index-based result ordering
  • Single invisible time change kept as direct path (not delegated to batch)
  • Chunk serialization for oversized batches
  • Pop cache batch write/delete simplification
  • Remoting serializable compat for transient fields

✅ New commits are test-only changes:

  • Test fix is a minor assertion/ordering adjustment
  • Bazel annotation is a build configuration change

Verdict

Test fixes are clean and don't affect production code. The overall PR design (batch invisible time with chunking, backward-compatible single-item path) remains sound. LGTM.


Automated review by github-manager-bot

@qianye1001

Contributor Author

@fuyou001 Thanks, agreed. Fixed in d1d6e3e.

The proxy fallback is now restricted to the explicit unsupported case only: the real exception must be an MQBrokerException with ResponseCode.REQUEST_CODE_NOT_SUPPORTED. For ambiguous failures such as timeout/transport failure/lost response, processBrokerChangeInvisibleTime now completes the batch entries with INTERNAL_SERVER_ERROR and does not retry single changeInvisibleTime, so it will not create a second CK with a new changedPopTime after the broker may already have applied the batch.
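The decision described above can be sketched as a small predicate. The sketch does not use RocketMQ classes: `BrokerException` stands in for `MQBrokerException`, `NOT_SUPPORTED` is a placeholder value standing in for `ResponseCode.REQUEST_CODE_NOT_SUPPORTED`, and `unwrap` is a simplified stand-in for `ExceptionUtils.getRealException()`.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical fallback gate: only an explicit "not supported" reply
// allows retrying entries one by one.
final class FallbackGateSketch {
    private FallbackGateSketch() { }

    // Placeholder value for illustration only.
    static final int NOT_SUPPORTED = 3;

    // Hypothetical stand-in for a broker exception carrying a response code.
    static final class BrokerException extends Exception {
        private static final long serialVersionUID = 1L;
        final int responseCode;

        BrokerException(int responseCode) {
            super("broker response code " + responseCode);
            this.responseCode = responseCode;
        }
    }

    // Strips the wrappers that CompletableFuture and Future add around the cause.
    static Throwable unwrap(Throwable failure) {
        Throwable current = failure;
        while ((current instanceof CompletionException
                || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // True only when the broker explicitly reported that it does not know the
    // batch request code. Timeouts and transport errors return false because
    // the batch may already have been applied.
    static boolean shouldFallBackToSingle(Throwable failure) {
        Throwable real = unwrap(failure);
        return real instanceof BrokerException
            && ((BrokerException) real).responseCode == NOT_SUPPORTED;
    }
}
```

Everything that is not an explicit "not supported" response is treated as ambiguous and reported as a failure for the affected entries rather than retried.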

Added regression coverage in ConsumerProcessorTest for both paths:

  • testBatchChangeInvisibleTimeFallbackOnlyWhenRequestCodeNotSupported
  • testBatchChangeInvisibleTimeDoesNotFallbackOnAmbiguousFailure

Verified locally:

  • mvn -pl proxy -am -DfailIfNoTests=false -Dtest=ConsumerProcessorTest test
  • mvn -pl broker -am -DfailIfNoTests=false -Dtest=ChangeInvisibleTimeProcessorTest test

Comment addressed: #10519 (comment)

@qianye1001

Contributor Author

@fuyou001 Thanks for the detailed analysis. I considered this one and I do not plan to address it in this batch-change PR.

The reason is that this is not a new correctness model introduced by the batch path. The existing single changeInvisibilityDuration path already follows the same pattern: write the new CK, try to remove the old CK from the pop buffer, and then delete the old CK from RocksDB when needed. A concurrent cleanup can theoretically race with that path as well, so fixing this properly would be a broader PopConsumerCache ownership/cleanup change, not a batch-change-specific fix.

Making the ownership transfer atomic would likely require coordinating renewal/change and cleanup with an additional lock or explicit ownership state transition on the hot path. For the batch path that cost is relatively high, especially because the observed impact is a low-probability duplicate delivery scenario and POP consumption already has at-least-once semantics. I would prefer not to add that synchronization cost in this PR.

So for this PR I am keeping the batch behavior aligned with the existing single-change semantics and limiting the changes to reducing broker round trips and store writes/deletes. If we want to harden this race, I think it should be handled separately for both single and batch paths with a dedicated design and benchmark.

Comment addressed: #10519 (comment)

@RockteMQ-AI RockteMQ-AI left a comment

Contributor

Review by github-manager-bot (Re-review after 3 new commits)

Summary

Three new commits since the previous review:

  1. d1d6e3e Fix batch change invisible time fallback — Critical correctness fix for the fallback logic
  2. 3e563b2 ci: re-trigger CI — CI housekeeping
  3. dae2063 Stabilize producer backpressure test — Removes flaky timing-dependent test

Changes Since Last Review

1. Fallback Logic Fix (ConsumerProcessor.java)

  • [Info] Previously, any failure from batchChangeInvisibleTime triggered a fallback to single changeInvisibleTime requests. This was unsafe: if the batch partially succeeded on the broker (e.g., timeout after applying some changes), falling back would create duplicate checkpoints.
  • [Info] Now only falls back when the error is REQUEST_CODE_NOT_SUPPORTED (broker genuinely does not support the batch API). For all other failures (timeout, internal error), returns errors directly via buildBatchChangeInvisibleTimeExceptionResults.
  • [Info] The isBatchChangeInvisibleTimeNotSupported() helper correctly unwraps the real exception via ExceptionUtils.getRealException() before checking the type and response code.

2. Method Rename (ChangeInvisibleTimeProcessor.java)

  • [Info] validateBatchRequestEntries renamed to normalizeAndValidateBatchRequestEntries. The new name accurately reflects that the method both normalizes and validates entries.

3. Test Stabilization (DefaultMQProducerTest.java)

  • [Info] testRunningSetBackCompress simplified: removed multi-threaded CountDownLatch approach, replaced with direct semaphore manipulation. Eliminates timing-dependent flakiness.

4. New Tests (ConsumerProcessorTest.java)

  • [Info] testBatchChangeInvisibleTimeFallbackOnlyWhenRequestCodeNotSupported — Verifies fallback occurs only for REQUEST_CODE_NOT_SUPPORTED.
  • [Info] testBatchChangeInvisibleTimeDoesNotFallbackOnAmbiguousFailure — Verifies no fallback for ambiguous failures (e.g., TimeoutException), preventing duplicate checkpoints.

Findings

  • [Info] The fallback fix is a significant correctness improvement. The distinction between "broker does not support batch" (safe to fall back) and "batch may have partially succeeded" (unsafe to fall back) is well-reasoned.
  • [Info] Test coverage for the two fallback paths (supported vs. unsupported) is thorough.
  • [Info] The buildReceiptHandleMessages helper reduces test boilerplate.

Suggestions

  • [Info] Consider logging the actual exception type in the non-fallback path at INFO level (currently WARN) to help operators distinguish between "broker too old" and "transient failure" in production logs. Minor nit.

Overall the incremental changes look solid. The fallback correctness fix is important.


Automated review by github-manager-bot

import java.util.List;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;

public class BatchChangeInvisibleTimeRequestBody extends RemotingSerializable {

Member

need to check if the batch size exceeds the length limit of the frequency.

Development

Successfully merging this pull request may close these issues.

[Enhancement] Support BatchChangeInvisibleTime to reduce network round-trips and RocksDB flushes

7 participants