From 7a352aefe2243c5bb047286f40613efe57bbe70a Mon Sep 17 00:00:00 2001 From: songqing Date: Fri, 10 Apr 2026 09:01:34 +0800 Subject: [PATCH 1/3] fix(stream): correct XPENDING end parsing and inclusive PEL range scan --- src/commands/cmd_stream.cc | 2 +- src/types/redis_stream.cc | 20 +++++++++- tests/gocase/unit/type/stream/stream_test.go | 42 ++++++++++++++++++++ 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index ae84e4aca28..d9f91c6e503 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -864,7 +864,7 @@ class CommandXPending : public Commander { } if (end_id != "+") { - auto s = ParseStreamEntryID(start_id, &options_.end_id); + auto s = ParseStreamEntryID(end_id, &options_.end_id); if (!s.IsOK()) { return s; } diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 60606df08a4..d6af6ad64f9 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1758,10 +1758,26 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt } std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id); - std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id); + // XPENDING extended form follows XRANGE-style ranges (closed interval by default). + // RocksDB iterate_upper_bound is exclusive; use a key strictly after end_id for the scan bound. + std::string end_key_exclusive; + if (options.end_id == StreamEntryID::Maximum()) { + // "+" means no upper ID limit; place the bound after all keys in this stream metadata version + // (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID. + end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + } else { + StreamEntryID end_next = options.end_id; + Status inc_st = IncrementStreamEntryID(&end_next); + if (!inc_st.IsOK()) { + end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + } else { + // Next ID after end_id in PEL key space — exclusive iterator upper bound (XRANGE closed end). + end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, end_next); + } + } rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); - rocksdb::Slice upper_bound(end_key); + rocksdb::Slice upper_bound(end_key_exclusive); read_options.iterate_upper_bound = &upper_bound; rocksdb::Slice lower_bound(prefix_key); read_options.iterate_lower_bound = &lower_bound; diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 14d3d361d62..59a54f1e92d 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2480,6 +2480,48 @@ func TestStreamOffset(t *testing.T) { require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream").Err(), "wrong number of arguments") require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream", "+").Err(), "wrong number of arguments") }) + + t.Run("XPENDING with specific end ID should filter correctly", func(t *testing.T) { + streamKey := "xpending-endid-test" + group := "grp" + consumer := "con" + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + + id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "1"}}).Result() + require.NoError(t, err) + id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "2"}}).Result() + require.NoError(t, err) + id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "3"}}).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err()) + // Read all entries so they become pending + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result() + require.NoError(t, err) + + // XPENDING extended form: same ID range rules as XRANGE (see Redis docs). Use XPENDING with end_id = id1. + result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1, id1, "10").Result() + require.NoError(t, err) + entries, ok := result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 1, "XPENDING with end_id=id1 should return only 1 entry") + + // Use XPENDING with range [id1, id2] (should return 2 entries). + result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id2, "10").Result() + require.NoError(t, err) + entries, ok = result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 2, "XPENDING with range [id1,id2] should return 2 entries") + + // Use XPENDING with range [id1, id3] (should return all 3 entries). + result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id3, "10").Result() + require.NoError(t, err) + entries, ok = result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 3, "XPENDING with range [id1,id3] should return 3 entries") + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) { From 53f54dbd0eba35f6f29cae1ee7327b9cb05f04b6 Mon Sep 17 00:00:00 2001 From: songqing Date: Fri, 10 Apr 2026 10:25:57 +0800 Subject: [PATCH 2/3] fix comment --- tests/gocase/unit/type/stream/stream_test.go | 22 ++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 59a54f1e92d..161e6dfb9d5 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2499,12 +2499,29 @@ func TestStreamOffset(t *testing.T) { _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result() require.NoError(t, err) + // XPENDING extended form: each row is [id, consumer, idle_ms, delivery_count]. + assertXPendingExtRow := func(t *testing.T, row interface{}, wantID string) { + t.Helper() + fields, ok := row.([]interface{}) + require.True(t, ok) + require.Len(t, fields, 4) + gotID, ok := fields[0].(string) + require.True(t, ok) + require.Equal(t, wantID, gotID) + gotConsumer, ok := fields[1].(string) + require.True(t, ok) + require.Equal(t, consumer, gotConsumer) + require.GreaterOrEqual(t, fields[2], int64(0)) + require.EqualValues(t, 1, fields[3]) + } + // XPENDING extended form: same ID range rules as XRANGE (see Redis docs). Use XPENDING with end_id = id1. result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1, id1, "10").Result() require.NoError(t, err) entries, ok := result.([]interface{}) require.True(t, ok) require.Len(t, entries, 1, "XPENDING with end_id=id1 should return only 1 entry") + assertXPendingExtRow(t, entries[0], id1) // Use XPENDING with range [id1, id2] (should return 2 entries). result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id2, "10").Result() @@ -2512,6 +2529,8 @@ func TestStreamOffset(t *testing.T) { entries, ok = result.([]interface{}) require.True(t, ok) require.Len(t, entries, 2, "XPENDING with range [id1,id2] should return 2 entries") + assertXPendingExtRow(t, entries[0], id1) + assertXPendingExtRow(t, entries[1], id2) // Use XPENDING with range [id1, id3] (should return all 3 entries). result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id3, "10").Result() @@ -2519,6 +2538,9 @@ func TestStreamOffset(t *testing.T) { entries, ok = result.([]interface{}) require.True(t, ok) require.Len(t, entries, 3, "XPENDING with range [id1,id3] should return 3 entries") + assertXPendingExtRow(t, entries[0], id1) + assertXPendingExtRow(t, entries[1], id2) + assertXPendingExtRow(t, entries[2], id3) require.NoError(t, rdb.Del(ctx, streamKey).Err()) }) From 41cab002c82003eb047e0d7993fba62d8a8d6025 Mon Sep 17 00:00:00 2001 From: songqing Date: Fri, 10 Apr 2026 11:16:13 +0800 Subject: [PATCH 3/3] fix comment --- src/commands/cmd_stream.cc | 41 +++++++++-- src/types/redis_stream.cc | 19 ++++- src/types/redis_stream_base.h | 2 + tests/gocase/unit/type/stream/stream_test.go | 76 ++++++++++++++++++++ 4 files changed, 128 insertions(+), 10 deletions(-) diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index d9f91c6e503..73a5279c8eb 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -853,21 +853,48 @@ class CommandXPending : public Commander { } if (parser.Good()) { - std::string start_id, end_id; - start_id = GET_OR_RET(parser.TakeStr()); - end_id = GET_OR_RET(parser.TakeStr()); - if (start_id != "-") { - auto s = ParseStreamEntryID(start_id, &options_.start_id); + const std::string start_str = GET_OR_RET(parser.TakeStr()); + const std::string end_str = GET_OR_RET(parser.TakeStr()); + + // Extended XPENDING uses the same ID range rules as XRANGE (see Redis documentation). + if (start_str == "-") { + options_.start_id = StreamEntryID::Minimum(); + options_.exclude_start = false; + } else if (!start_str.empty() && start_str[0] == '(') { + options_.exclude_start = true; + auto s = ParseRangeStart(start_str.substr(1), &options_.start_id); if (!s.IsOK()) { return s; } + } else if (start_str == "+") { + options_.start_id = StreamEntryID::Maximum(); + options_.exclude_start = false; + } else { + auto s = ParseRangeStart(start_str, &options_.start_id); + if (!s.IsOK()) { + return s; + } + options_.exclude_start = false; } - if (end_id != "+") { - auto s = ParseStreamEntryID(end_id, &options_.end_id); + if (end_str == "+") { + options_.end_id = StreamEntryID::Maximum(); + options_.exclude_end = false; + } else if (!end_str.empty() && end_str[0] == '(') { + options_.exclude_end = true; + auto s = ParseRangeEnd(end_str.substr(1), &options_.end_id); + if (!s.IsOK()) { + return s; + } + } else if (end_str == "-") { + options_.end_id = StreamEntryID::Minimum(); + options_.exclude_end = false; + } else { + auto s = ParseRangeEnd(end_str, &options_.end_id); if (!s.IsOK()) { return s; } + options_.exclude_end = false; } options_.count = GET_OR_RET(parser.TakeInt()); diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index d6af6ad64f9..0deed335e02 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1757,14 +1757,27 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt return s.IsNotFound() ? rocksdb::Status::OK() : s; } - std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id); - // XPENDING extended form follows XRANGE-style ranges (closed interval by default). - // RocksDB iterate_upper_bound is exclusive; use a key strictly after end_id for the scan bound. + StreamEntryID scan_start_id = options.start_id; + if (options.exclude_start) { + Status inc_st = IncrementStreamEntryID(&scan_start_id); + if (!inc_st.IsOK()) { + pending_infos.pending_number = 0; + pending_infos.first_entry_id = StreamEntryID::Maximum(); + pending_infos.last_entry_id = StreamEntryID::Minimum(); + return rocksdb::Status::OK(); + } + } + std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, scan_start_id); + // XPENDING extended form follows XRANGE-style ranges (closed interval by default; '(' excludes an endpoint). + // RocksDB iterate_upper_bound is exclusive. std::string end_key_exclusive; if (options.end_id == StreamEntryID::Maximum()) { // "+" means no upper ID limit; place the bound after all keys in this stream metadata version // (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID. end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + } else if (options.exclude_end) { + // Exclusive end: iterator keys must be strictly before end_id. + end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id); } else { StreamEntryID end_next = options.end_id; Status inc_st = IncrementStreamEntryID(&end_next); diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h index e2d1998dcbb..f7991f573e3 100644 --- a/src/types/redis_stream_base.h +++ b/src/types/redis_stream_base.h @@ -245,6 +245,8 @@ struct StreamPendingOptions { StreamEntryID start_id{StreamEntryID::Minimum()}; StreamEntryID end_id{StreamEntryID::Maximum()}; + bool exclude_start = false; + bool exclude_end = false; uint64_t count; bool with_count = false; diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 161e6dfb9d5..941495f9610 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2544,6 +2544,82 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, rdb.Del(ctx, streamKey).Err()) }) + + t.Run("XPENDING with incomplete end ID should include the whole millisecond", func(t *testing.T) { + streamKey := "xpending-incomplete-end-test" + group := "grp" + consumer := "con" + ids := []string{"1-0", "1-1", "1-2"} + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + for i, id := range ids { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err()) + } + + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result() + require.NoError(t, err) + + result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "1", "1", "10").Result() + require.NoError(t, err) + entries, ok := result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 3, "XPENDING 1 1 should include every pending entry in millisecond 1, matching Redis") + + for i, entry := range entries { + fields, ok := entry.([]interface{}) + require.True(t, ok) + require.Len(t, fields, 4) + gotID, ok := fields[0].(string) + require.True(t, ok) + require.Equal(t, ids[i], gotID) + gotConsumer, ok := fields[1].(string) + require.True(t, ok) + require.Equal(t, consumer, gotConsumer) + require.GreaterOrEqual(t, fields[2], int64(0)) + require.EqualValues(t, 1, fields[3]) + } + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + }) + + t.Run("XPENDING with exclusive start should match Redis", func(t *testing.T) { + streamKey := "xpending-exclusive-start-test" + group := "grp" + consumer := "con" + ids := []string{"1-0", "1-1", "1-2"} + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + for i, id := range ids { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err()) + } + + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result() + require.NoError(t, err) + + result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "(1-0", "+", "10").Result() + require.NoError(t, err) + entries, ok := result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 2, "XPENDING (1-0 + 10 should exclude the first pending entry, matching Redis") + + for i, entry := range entries { + fields, ok := entry.([]interface{}) + require.True(t, ok) + require.Len(t, fields, 4) + gotID, ok := fields[0].(string) + require.True(t, ok) + require.Equal(t, ids[i+1], gotID) + gotConsumer, ok := fields[1].(string) + require.True(t, ok) + require.Equal(t, consumer, gotConsumer) + require.GreaterOrEqual(t, fields[2], int64(0)) + require.EqualValues(t, 1, fields[3]) + } + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {