feat(stream): support XDELEX command#3502
Conversation
b464d40 to
ccc15fd
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds a new stream deletion command XDELEX with configurable deletion/PEL-cleanup semantics, and fixes XINFO CONSUMERS so consumer metadata from other groups doesn’t leak into the scan results.
Changes:
- Implement
XDELEXwithKEEPREF(default),DELREF, andACKEDstrategies, including replica replay support viaWriteBatchLogData. - Fix
XINFO CONSUMERSiteration to filter consumers by group name, preventing cross-group leakage. - Add Go integration tests and a C++ unit test for new deletion behaviors and parsing edge cases.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/gocase/unit/type/stream/stream_test.go | Adds Go integration coverage for XDELEX behaviors and error handling. |
| tests/cppunit/types/stream_test.cc | Adds a C++ unit test ensuring empty ID list handling for the new deletion API. |
| src/types/redis_stream.h | Exposes DeleteEntriesWithOption and helper declarations for XDELEX implementation. |
| src/types/redis_stream.cc | Implements DeleteEntriesWithOption plus PEL cleanup and ACKED semantics; fixes GetConsumerInfo group isolation. |
| src/types/redis_stream_base.h | Introduces enums for delete options and per-ID deletion result codes. |
| src/storage/batch_extractor.cc | Ensures replication replays XDELEX semantics and filters internal stream/PEL/meta events. |
| src/commands/cmd_stream.cc | Adds the xdelex command parser/executor and registers the command. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (first_deleted) { | ||
| iter->SeekToFirst(); | ||
| while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) { | ||
| iter->Next(); | ||
| } | ||
| if (iter->Valid()) { | ||
| metadata.first_entry_id = entryIDFromInternalKey(iter->key()); | ||
| metadata.recorded_first_entry_id = metadata.first_entry_id; | ||
| } else { | ||
| metadata.first_entry_id.Clear(); | ||
| metadata.recorded_first_entry_id.Clear(); | ||
| } | ||
| } | ||
| if (last_deleted) { | ||
| iter->SeekToLast(); | ||
| while (iter->Valid() && deleted_entry_keys.count(iter->key().ToString()) > 0) { | ||
| iter->Prev(); | ||
| } | ||
| if (iter->Valid()) { | ||
| metadata.last_entry_id = entryIDFromInternalKey(iter->key()); | ||
| } else { | ||
| metadata.last_entry_id.Clear(); | ||
| } | ||
| } |
| #include <map> | ||
| #include <optional> | ||
| #include <string> | ||
| #include <vector> | ||
|
|
||
| #include "common/db_util.h" | ||
| #include "storage/redis_db.h" | ||
| #include "storage/redis_metadata.h" |
| for (int64_t i = 0; i < numids; i++) { | ||
| auto id_str = GET_OR_RET(parser.TakeStr()); | ||
| redis::StreamEntryID id; | ||
| auto s = ParseStreamEntryID(id_str, &id); | ||
| if (!s.IsOK()) return s; | ||
| entry_ids_.emplace_back(id); | ||
| } | ||
|
|
||
| return Status::OK(); |
|
Hi @kirito632. Thanks for your PR. I reviewed your PR once using my AI tools (Codex / GPT-5.5 xhigh), and it found quite a few issues (I’m not going to reveal these things. I want you to try and see whether you can find them on your own.). I hope you can also try using an AI tool locally to review your PR once before submitting it. Below is the prompt I use. It is written in Chinese, but you can translate it into the language you use and run it there: If you have any other questions, feel free to @ me directly. Hope this helps you. |
|
Thanks a lot for the detailed feedback and for sharing your workflow. I’m going through the current PR again to identify and fix the remaining issues on my own first.Really appreciate the guidance and the time you spent reviewing this PR. |
d2839d9 to
649043e
Compare
|
Sorry for another force-push. I've added two missing checks: |
649043e to
0ddf4de
Compare
|
Hi @jihuayu ,I took a look at the CI failures and they don't appear to be directly related to the changes in this PR.These might be flaky failures. Could you please help rerun those failed tests?Thanks. |
Done. When you need a review, please @ me. |
|
@jihuayu , I investigated the coverage report after adding the new XDELEX integration tests. The Go tests now cover:
However, cmd_stream.cc and batch_extractor.cc still appear as 0% covered This makes me wonder whether the issue is related to coverage collection Do you think additional C++ unit tests are still expected here, or should |
|
Thanks for waiting! This is a bit weird. Since you only added the instructions, go test should theoretically hit most of the branches, so the coverage shouldn't be this low. That said, Sonar is sometimes inaccurate with coverage stats because it pulls in useless files. Let me check it out. |
|
I’ve confirmed that the code coverage issue is related to CI. I’ve been busy working on the CI these days. Sorry, the review may still need to wait a few more days. PS: I asked Copilot to review the code. It is not 100% correct, so feel free to ignore its comments if you disagree. |
| void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) { | ||
| // Currently, we only have two kinds of log data | ||
| if (ServerLogData::IsServerLogData(blob.data())) { | ||
| ServerLogData server_log; | ||
| if (server_log.Decode(blob).IsOK()) { | ||
| // We don't handle server log currently | ||
| } | ||
| } else { | ||
| // Redis type log data | ||
| if (auto s = log_data_.Decode(blob); !s.IsOK()) { | ||
| WARN("Failed to decode Redis type log: {}", s.Msg()); | ||
| } else { | ||
| seen_xdelex_entry_keys_.clear(); | ||
| } | ||
| } | ||
| } |
| auto group_meta = decodeStreamConsumerGroupMetadataValue(group_value); | ||
| group_meta.pending_number -= decrement; | ||
| s = batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(group_meta)); |
| auto consumer_meta = decodeStreamConsumerMetadataValue(consumer_value); | ||
| consumer_meta.pending_number -= decrement; | ||
| s = batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_meta)); |
|
@jihuayu ,While reviewing the stream code for XDELEX, I found two unrelated stream issues:
These appear independent of XDELEX, so I plan to handle them separately to keep the current PR focused.Do you think this approach is acceptable, or should I address them within the current PR? |
Yes, splitting it into a separate PR would be better. You can simply mark the Copilot review comments as resolved. |
…vent cross-group leakage
Add XDELEX to delete stream entries with KEEPREF/DELREF options. Includes refactoring for unified entry/PEL deletion. Also fix a consumer pending count bug when destroying a consumer.
ecc4365 to
b952fd6
Compare
|
Hi @jihuayu , I’ve been trying to improve my review process while working on this PR. After your previous feedback, I spent some time comparing behavior against Redis 8.2 and found several compatibility issues myself before updating the implementation. I’m still learning, so I’m curious about one thing: when you review a PR like this, what are the main categories you usually check beyond basic functionality? For example, do you focus more on Redis compatibility, invariants, replication behavior, edge cases, maintainability, or something else? I’m mainly asking because I’d like to improve my own review workflow and understand what experienced maintainers tend to look for. Thanks! |
|
@kirito632 Wow, I’m glad you’re interested in this. First, what I care about most is correctness. This includes consistency with Redis semantics, although we may intentionally use different semantics when there is a specific reason to do so. It also includes whether the implementation follows Kvrocks conventions, whether reads and writes follow the intended design, whether locking is handled correctly, and whether there are any performance issues. Beyond that, there are some style-related concerns, such as whether the code is clear and whether the naming is appropriate. This part can be somewhat subjective and depends on each reviewer’s preferences. In general, as long as it is not too far off, it should be fine. As for test cases, the ideal case would be to cover all edge cases. In practice, that is quite difficult, so covering the main flow and important error paths is usually enough. Among these, the first point—correctness—is the main focus of the review. For especially complex or very long code, code style and readability will also be treated as being just as important as correctness. I hope to see in the review that contributors understand their own code and have put their own thought into it. |
|
Thanks, this is very helpful. For this PR, I’ll keep correctness as the main focus and continue checking it from those angles. One thing I noticed while testing against Redis 8.2 is that some XDELEX behavior is a bit more permissive than what I first expected from the command description, so I’ve been using Redis 8.2 behavior as the source of truth where possible and adding focused tests for those cases. |
|
@jihuayu ,Could you please help rerun the failed tests?Thanks. |
|
OK, done! Don't worry, if you confirm the test is not your issue, I will make sure they all pass before merging. |
|
@jihuayu , The CI is green now, but SonarCloud is still failing because Coverage on New Code is 40.5% (required >= 50%). After checking the report, most uncovered lines are in:
The stream command behavior itself is already covered by the newly added Go tests, but Sonar seems to be counting several WriteBatchExtractor paths that are difficult to exercise through command-level tests. I'm considering adding a few focused C++ tests around WriteBatchExtractor to increase coverage. Before doing that, I'd like to confirm whether this is the direction you'd prefer, or if there are other areas you think would benefit more from additional coverage. Thanks! |
|
There are still some issues with the code coverage, but we can ignore them for now. |
|
Understood, thanks! |
|
Hi @kirito632, are your two PRs ready for review? |
|
Hi @jihuayu ,both PRs are ready for review.Thanks. |
|
jihuayu
left a comment
There was a problem hiding this comment.
Hi @kirito632 After reading through it, I think there are a few performance issues with the current implementation. Here are my suggestions:
- Return the group metadata together with the group names in a single scan, so ACKED doesn’t need an extra RocksDB Get for each group.
- Skip groups whose pending_number is zero, since they cannot contain any PEL references for the target entries.
- Precompute the minimum last_delivered_id across all groups for ACKED. If an entry ID is greater than this value, it cannot have been acknowledged by every group, so it can be skipped without any PEL lookups.
- Use bounded, chunked MultiGet for the remaining PEL keys instead of issuing one RocksDB Get for every (entry ID, group) pair.
- For DELREF, organize the lookups by group. Batch-check the requested entry IDs for each active group, then delete only the matched PEL entries and aggregate the corresponding pending-count decrements.
Also, I think this PR is a bit too large. Once a PR contains more than about 300 lines of non-test logic, it becomes significantly harder to review. Reviewers often don’t have enough uninterrupted time to understand a large change in one sitting, which slows down the review process considerably. Since the different subcommands are largely independent, I would recommend splitting them into separate PRs next times.
| bool fields_seen = false; | ||
|
|
||
| for (size_t i = start; i < args.size();) { | ||
| if (util::EqualICase(args[i], "FIELDS")) { | ||
| if (fields_seen) { | ||
| return {Status::RedisParseErr, errInvalidSyntax}; | ||
| } | ||
| fields_seen = true; | ||
| if (i + 1 >= args.size()) { | ||
| return {Status::RedisParseErr, errWrongNumOfArguments}; | ||
| } | ||
|
|
||
| auto num_fields = ParseInt<int64_t>(args[i + 1], 10); | ||
| if (!num_fields || *num_fields < 1) { | ||
| return {Status::RedisParseErr, errValueNotInteger}; | ||
| } | ||
|
|
||
| size_t first_field = i + 2; | ||
| auto field_count = static_cast<size_t>(*num_fields); | ||
| if (field_count > args.size() - first_field) { | ||
| return {Status::RedisParseErr, errWrongNumOfArguments}; | ||
| } | ||
| size_t i = start; | ||
| if (i < args.size()) { | ||
| auto condition = ParseHashExpireCondition(args[i]); | ||
| if (condition) { | ||
| *condition_out = *condition; | ||
| i++; | ||
|
|
||
| fields->clear(); | ||
| fields->reserve(field_count); | ||
| for (size_t j = 0; j < field_count; j++) { | ||
| fields->emplace_back(args[first_field + j]); | ||
| if (i < args.size() && ParseHashExpireCondition(args[i])) { | ||
| return {Status::RedisParseErr, errInvalidSyntax}; | ||
| } | ||
| i = first_field + field_count; | ||
| continue; | ||
| } | ||
|
|
||
| auto condition = ParseHashExpireCondition(args[i]); | ||
| if (!condition) { | ||
| return {Status::RedisParseErr, errInvalidSyntax}; | ||
| } | ||
| if (*condition_out != HashFieldExpireCondition::kNone && *condition_out != *condition) { | ||
| return {Status::RedisParseErr, errInvalidSyntax}; | ||
| } | ||
| *condition_out = *condition; | ||
| i++; | ||
| } | ||
|
|
||
| if (!fields_seen) { | ||
| CommandParser parser(args, i); | ||
| if (!parser.EatEqICase("FIELDS")) { | ||
| return {Status::RedisParseErr, errInvalidSyntax}; | ||
| } | ||
| return Status::OK(); | ||
| return ParseHashFieldListTail(parser, fields); |
There was a problem hiding this comment.
I originally refactored the HFE parser while trying to align the parsing style with the new XDELEX implementation, but it unnecessarily broadened the scope of this PR.I will fix this in the next push.Thanks for you review.
| Status ParseStreamEntryID(const std::string &input, StreamEntryID *id) { | ||
| auto pos = input.find('-'); | ||
| if (pos != std::string::npos) { | ||
| auto ms_str = input.substr(0, pos); | ||
| auto seq_str = input.substr(pos + 1); | ||
| auto parse_ms = ParseInt<uint64_t>(ms_str, 10); | ||
| auto parse_seq = ParseInt<uint64_t>(seq_str, 10); | ||
| auto parse_ms = ParseStreamEntryIDComponent(ms_str, false); | ||
| auto parse_seq = ParseStreamEntryIDComponent(seq_str, true); | ||
| if (!parse_ms || !parse_seq) { | ||
| return {Status::RedisParseErr, kErrInvalidEntryIdSpecified}; | ||
| } | ||
|
|
||
| id->ms = *parse_ms; | ||
| id->seq = *parse_seq; | ||
| } else { | ||
| auto parse_input = ParseInt<uint64_t>(input, 10); | ||
| auto parse_input = ParseStreamEntryIDComponent(input, false); |
There was a problem hiding this comment.
Why change it?
This changes the shared stream ID parser to accept signed stream ID components such as +1-0, 1-+0, and 1--0. The change is broader than XDELEX because other stream commands also use ParseStreamEntryID. Redis rejects these forms, so this is a protocol compatibility regression. Please keep the shared parser strict and update the XDELEX tests to expect errors for signed components.
There was a problem hiding this comment.
My apologies, I didn't consider the wider impact. I'll keep the shared parser strict and fix this in the next push.
There was a problem hiding this comment.
I verified this against Redis 8.2 locally and found that these signed zero-equivalent IDs are accepted by XDELEX (and some other stream commands). I'll revert the shared parser changes and keep the compatibility handling local to XDELEX instead.Also, regarding the size of this PR, would you prefer me to split it into smaller PRs at this point, or would you rather keep this PR as-is.
There was a problem hiding this comment.
@kirito632 For this PR, let’s leave it as is. You can split your other PR.


What
This PR introduces the XDELEX stream command with configurable
PEL cleanup behaviors.
It also fixes an existing issue in XINFO CONSUMERS where consumer metadata
from different groups could leak into the same scan result.
XINFO CONSUMERS Fix
Root cause:
GetConsumerInfo() used iterator bounds based only on the stream version range
([version, version+1)).
Since the InternalKey encoding places the version field before the sub-key,
consumer records from other groups could still fall within the same iteration
range.
Fix:
Extract and validate the group name from each internal key during iteration,
filtering out unrelated consumers.
XDELEX
Supported deletion strategies:
KEEPREF (default):
Delete the stream entry only.
DELREF:
Delete the stream entry and remove it from all groups' PELs.
ACKED:
Delete the stream entry only if it has been acknowledged by all consumer groups.
Per-ID return codes:
1: entry deleted
2: entry retained (e.g. ACKED condition not satisfied)
-1: entry not found or duplicate ID within the same request
Implementation Notes
Pending-number decrements are aggregated in memory and flushed through
a single WriteBatch to avoid repeated metadata updates during DELREF operations.
Duplicate IDs are deduplicated to preserve idempotency and prevent
stream metadata corruption.
The originating command name is propagated into WriteBatchExtractor
so replicas replay the exact same deletion semantics.
Internal stream metadata and PEL delete events are filtered out in
WriteBatchExtractor to avoid generating invalid replicated XDEL commands.
Testing
Added Go integration tests covering:
multi-group DELREF cleanup
ACKED blocking semantics
duplicate-ID idempotency
invalid syntax handling
XINFO CONSUMERS group isolation
AI-Assisted Contribution Disclosure
This contribution complies with the ASF AI-assisted contribution guidelines.
AI tools were used to assist with English phrasing and some test scaffolding.
The core C++ implementation, debugging, correctness analysis, and final verification
were independently completed by me.