Skip to content

Commit edb033e

Browse files
songqingjihuayu
andauthored
fix(stream): align XPENDING ID parsing and PEL range with XRANGE (incomplete IDs, exclusive bounds) (#3437)
XPENDING extended form must use the same start/end ID rules as XRANGE: parse the end with ParseRangeEnd so a bare millisecond includes all sequence numbers in that ms; accept '(' for exclusive start/end and honor them when scanning the pending entry list. Refs: https://redis.io/docs/latest/commands/xpending/ ("in a similar way we do it with XRANGE") Refs: https://redis.io/docs/latest/commands/xrange/ ("closed interval" / inclusive unless '(' prefix) --------- Co-authored-by: 纪华裕 <jihuayu123@gmail.com>
1 parent 855bad3 commit edb033e

4 files changed

Lines changed: 208 additions & 10 deletions

File tree

src/commands/cmd_stream.cc

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -853,21 +853,48 @@ class CommandXPending : public Commander {
853853
}
854854

855855
if (parser.Good()) {
856-
std::string start_id, end_id;
857-
start_id = GET_OR_RET(parser.TakeStr());
858-
end_id = GET_OR_RET(parser.TakeStr());
859-
if (start_id != "-") {
860-
auto s = ParseStreamEntryID(start_id, &options_.start_id);
856+
const std::string start_str = GET_OR_RET(parser.TakeStr());
857+
const std::string end_str = GET_OR_RET(parser.TakeStr());
858+
859+
// Extended XPENDING uses the same ID range rules as XRANGE (see Redis documentation).
860+
if (start_str == "-") {
861+
options_.start_id = StreamEntryID::Minimum();
862+
options_.exclude_start = false;
863+
} else if (!start_str.empty() && start_str[0] == '(') {
864+
options_.exclude_start = true;
865+
auto s = ParseRangeStart(start_str.substr(1), &options_.start_id);
861866
if (!s.IsOK()) {
862867
return s;
863868
}
869+
} else if (start_str == "+") {
870+
options_.start_id = StreamEntryID::Maximum();
871+
options_.exclude_start = false;
872+
} else {
873+
auto s = ParseRangeStart(start_str, &options_.start_id);
874+
if (!s.IsOK()) {
875+
return s;
876+
}
877+
options_.exclude_start = false;
864878
}
865879

866-
if (end_id != "+") {
867-
auto s = ParseStreamEntryID(start_id, &options_.end_id);
880+
if (end_str == "+") {
881+
options_.end_id = StreamEntryID::Maximum();
882+
options_.exclude_end = false;
883+
} else if (!end_str.empty() && end_str[0] == '(') {
884+
options_.exclude_end = true;
885+
auto s = ParseRangeEnd(end_str.substr(1), &options_.end_id);
886+
if (!s.IsOK()) {
887+
return s;
888+
}
889+
} else if (end_str == "-") {
890+
options_.end_id = StreamEntryID::Minimum();
891+
options_.exclude_end = false;
892+
} else {
893+
auto s = ParseRangeEnd(end_str, &options_.end_id);
868894
if (!s.IsOK()) {
869895
return s;
870896
}
897+
options_.exclude_end = false;
871898
}
872899

873900
options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());

src/types/redis_stream.cc

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1751,11 +1751,40 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt
17511751
return s.IsNotFound() ? rocksdb::Status::OK() : s;
17521752
}
17531753

1754-
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
1755-
std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);
1754+
StreamEntryID scan_start_id = options.start_id;
1755+
if (options.exclude_start) {
1756+
Status inc_st = IncrementStreamEntryID(&scan_start_id);
1757+
if (!inc_st.IsOK()) {
1758+
pending_infos.pending_number = 0;
1759+
pending_infos.first_entry_id = StreamEntryID::Maximum();
1760+
pending_infos.last_entry_id = StreamEntryID::Minimum();
1761+
return rocksdb::Status::OK();
1762+
}
1763+
}
1764+
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, scan_start_id);
1765+
// XPENDING extended form follows XRANGE-style ranges (closed interval by default; '(' excludes an endpoint).
1766+
// RocksDB iterate_upper_bound is exclusive.
1767+
std::string end_key_exclusive;
1768+
if (options.end_id == StreamEntryID::Maximum()) {
1769+
// "+" means no upper ID limit; place the bound after all keys in this stream metadata version
1770+
// (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID.
1771+
end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
1772+
} else if (options.exclude_end) {
1773+
// Exclusive end: iterator keys must be strictly before end_id.
1774+
end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);
1775+
} else {
1776+
StreamEntryID end_next = options.end_id;
1777+
Status inc_st = IncrementStreamEntryID(&end_next);
1778+
if (!inc_st.IsOK()) {
1779+
end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
1780+
} else {
1781+
// Next ID after end_id in PEL key space — exclusive iterator upper bound (XRANGE closed end).
1782+
end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, end_next);
1783+
}
1784+
}
17561785

17571786
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
1758-
rocksdb::Slice upper_bound(end_key);
1787+
rocksdb::Slice upper_bound(end_key_exclusive);
17591788
read_options.iterate_upper_bound = &upper_bound;
17601789
rocksdb::Slice lower_bound(prefix_key);
17611790
read_options.iterate_lower_bound = &lower_bound;

src/types/redis_stream_base.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ struct StreamPendingOptions {
245245

246246
StreamEntryID start_id{StreamEntryID::Minimum()};
247247
StreamEntryID end_id{StreamEntryID::Maximum()};
248+
bool exclude_start = false;
249+
bool exclude_end = false;
248250

249251
uint64_t count;
250252
bool with_count = false;

tests/gocase/unit/type/stream/stream_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2503,6 +2503,146 @@ func TestStreamOffset(t *testing.T) {
25032503
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream").Err(), "wrong number of arguments")
25042504
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream", "+").Err(), "wrong number of arguments")
25052505
})
2506+
2507+
t.Run("XPENDING with specific end ID should filter correctly", func(t *testing.T) {
2508+
streamKey := "xpending-endid-test"
2509+
group := "grp"
2510+
consumer := "con"
2511+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2512+
2513+
id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "1"}}).Result()
2514+
require.NoError(t, err)
2515+
id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "2"}}).Result()
2516+
require.NoError(t, err)
2517+
id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "3"}}).Result()
2518+
require.NoError(t, err)
2519+
2520+
require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
2521+
// Read all entries so they become pending
2522+
_, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
2523+
require.NoError(t, err)
2524+
2525+
// XPENDING extended form: each row is [id, consumer, idle_ms, delivery_count].
2526+
assertXPendingExtRow := func(t *testing.T, row interface{}, wantID string) {
2527+
t.Helper()
2528+
fields, ok := row.([]interface{})
2529+
require.True(t, ok)
2530+
require.Len(t, fields, 4)
2531+
gotID, ok := fields[0].(string)
2532+
require.True(t, ok)
2533+
require.Equal(t, wantID, gotID)
2534+
gotConsumer, ok := fields[1].(string)
2535+
require.True(t, ok)
2536+
require.Equal(t, consumer, gotConsumer)
2537+
require.GreaterOrEqual(t, fields[2], int64(0))
2538+
require.EqualValues(t, 1, fields[3])
2539+
}
2540+
2541+
// XPENDING extended form: same ID range rules as XRANGE (see Redis docs). Use XPENDING with end_id = id1.
2542+
result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1, id1, "10").Result()
2543+
require.NoError(t, err)
2544+
entries, ok := result.([]interface{})
2545+
require.True(t, ok)
2546+
require.Len(t, entries, 1, "XPENDING with end_id=id1 should return only 1 entry")
2547+
assertXPendingExtRow(t, entries[0], id1)
2548+
2549+
// Use XPENDING with range [id1, id2] (should return 2 entries).
2550+
result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id2, "10").Result()
2551+
require.NoError(t, err)
2552+
entries, ok = result.([]interface{})
2553+
require.True(t, ok)
2554+
require.Len(t, entries, 2, "XPENDING with range [id1,id2] should return 2 entries")
2555+
assertXPendingExtRow(t, entries[0], id1)
2556+
assertXPendingExtRow(t, entries[1], id2)
2557+
2558+
// Use XPENDING with range [id1, id3] (should return all 3 entries).
2559+
result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id3, "10").Result()
2560+
require.NoError(t, err)
2561+
entries, ok = result.([]interface{})
2562+
require.True(t, ok)
2563+
require.Len(t, entries, 3, "XPENDING with range [id1,id3] should return 3 entries")
2564+
assertXPendingExtRow(t, entries[0], id1)
2565+
assertXPendingExtRow(t, entries[1], id2)
2566+
assertXPendingExtRow(t, entries[2], id3)
2567+
2568+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2569+
})
2570+
2571+
t.Run("XPENDING with incomplete end ID should include the whole millisecond", func(t *testing.T) {
2572+
streamKey := "xpending-incomplete-end-test"
2573+
group := "grp"
2574+
consumer := "con"
2575+
ids := []string{"1-0", "1-1", "1-2"}
2576+
2577+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2578+
for i, id := range ids {
2579+
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err())
2580+
}
2581+
2582+
require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
2583+
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
2584+
require.NoError(t, err)
2585+
2586+
result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "1", "1", "10").Result()
2587+
require.NoError(t, err)
2588+
entries, ok := result.([]interface{})
2589+
require.True(t, ok)
2590+
require.Len(t, entries, 3, "XPENDING 1 1 should include every pending entry in millisecond 1, matching Redis")
2591+
2592+
for i, entry := range entries {
2593+
fields, ok := entry.([]interface{})
2594+
require.True(t, ok)
2595+
require.Len(t, fields, 4)
2596+
gotID, ok := fields[0].(string)
2597+
require.True(t, ok)
2598+
require.Equal(t, ids[i], gotID)
2599+
gotConsumer, ok := fields[1].(string)
2600+
require.True(t, ok)
2601+
require.Equal(t, consumer, gotConsumer)
2602+
require.GreaterOrEqual(t, fields[2], int64(0))
2603+
require.EqualValues(t, 1, fields[3])
2604+
}
2605+
2606+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2607+
})
2608+
2609+
t.Run("XPENDING with exclusive start should match Redis", func(t *testing.T) {
2610+
streamKey := "xpending-exclusive-start-test"
2611+
group := "grp"
2612+
consumer := "con"
2613+
ids := []string{"1-0", "1-1", "1-2"}
2614+
2615+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2616+
for i, id := range ids {
2617+
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err())
2618+
}
2619+
2620+
require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
2621+
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
2622+
require.NoError(t, err)
2623+
2624+
result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "(1-0", "+", "10").Result()
2625+
require.NoError(t, err)
2626+
entries, ok := result.([]interface{})
2627+
require.True(t, ok)
2628+
require.Len(t, entries, 2, "XPENDING (1-0 + 10 should exclude the first pending entry, matching Redis")
2629+
2630+
for i, entry := range entries {
2631+
fields, ok := entry.([]interface{})
2632+
require.True(t, ok)
2633+
require.Len(t, fields, 4)
2634+
gotID, ok := fields[0].(string)
2635+
require.True(t, ok)
2636+
require.Equal(t, ids[i+1], gotID)
2637+
gotConsumer, ok := fields[1].(string)
2638+
require.True(t, ok)
2639+
require.Equal(t, consumer, gotConsumer)
2640+
require.GreaterOrEqual(t, fields[2], int64(0))
2641+
require.EqualValues(t, 1, fields[3])
2642+
}
2643+
2644+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2645+
})
25062646
}
25072647

25082648
func parseStreamEntryID(id string) (ts int64, seqNum int64) {

0 commit comments

Comments
 (0)