Skip to content

Commit 7a352ae

Browse files
committed
fix(stream): correct XPENDING end parsing and inclusive PEL range scan
1 parent 2815e08 commit 7a352ae

3 files changed

Lines changed: 61 additions & 3 deletions

File tree

src/commands/cmd_stream.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ class CommandXPending : public Commander {
864864
}
865865

866866
if (end_id != "+") {
867-
auto s = ParseStreamEntryID(start_id, &options_.end_id);
867+
auto s = ParseStreamEntryID(end_id, &options_.end_id);
868868
if (!s.IsOK()) {
869869
return s;
870870
}

src/types/redis_stream.cc

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1758,10 +1758,26 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt
17581758
}
17591759

17601760
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
1761-
std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);
1761+
// XPENDING extended form follows XRANGE-style ranges (closed interval by default).
1762+
// RocksDB iterate_upper_bound is exclusive; use a key strictly after end_id for the scan bound.
1763+
std::string end_key_exclusive;
1764+
if (options.end_id == StreamEntryID::Maximum()) {
1765+
// "+" means no upper ID limit; place the bound after all keys in this stream metadata version
1766+
// (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID.
1767+
end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
1768+
} else {
1769+
StreamEntryID end_next = options.end_id;
1770+
Status inc_st = IncrementStreamEntryID(&end_next);
1771+
if (!inc_st.IsOK()) {
1772+
end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
1773+
} else {
1774+
// Next ID after end_id in PEL key space — exclusive iterator upper bound (XRANGE closed end).
1775+
end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, end_next);
1776+
}
1777+
}
17621778

17631779
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
1764-
rocksdb::Slice upper_bound(end_key);
1780+
rocksdb::Slice upper_bound(end_key_exclusive);
17651781
read_options.iterate_upper_bound = &upper_bound;
17661782
rocksdb::Slice lower_bound(prefix_key);
17671783
read_options.iterate_lower_bound = &lower_bound;

tests/gocase/unit/type/stream/stream_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2480,6 +2480,48 @@ func TestStreamOffset(t *testing.T) {
24802480
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream").Err(), "wrong number of arguments")
24812481
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream", "+").Err(), "wrong number of arguments")
24822482
})
2483+
2484+
t.Run("XPENDING with specific end ID should filter correctly", func(t *testing.T) {
2485+
streamKey := "xpending-endid-test"
2486+
group := "grp"
2487+
consumer := "con"
2488+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2489+
2490+
id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "1"}}).Result()
2491+
require.NoError(t, err)
2492+
id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "2"}}).Result()
2493+
require.NoError(t, err)
2494+
id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "3"}}).Result()
2495+
require.NoError(t, err)
2496+
2497+
require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
2498+
// Read all entries so they become pending
2499+
_, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
2500+
require.NoError(t, err)
2501+
2502+
// XPENDING extended form: same ID range rules as XRANGE (see Redis docs). Use XPENDING with end_id = id1.
2503+
result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1, id1, "10").Result()
2504+
require.NoError(t, err)
2505+
entries, ok := result.([]interface{})
2506+
require.True(t, ok)
2507+
require.Len(t, entries, 1, "XPENDING with end_id=id1 should return only 1 entry")
2508+
2509+
// Use XPENDING with range [id1, id2] (should return 2 entries).
2510+
result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id2, "10").Result()
2511+
require.NoError(t, err)
2512+
entries, ok = result.([]interface{})
2513+
require.True(t, ok)
2514+
require.Len(t, entries, 2, "XPENDING with range [id1,id2] should return 2 entries")
2515+
2516+
// Use XPENDING with range [id1, id3] (should return all 3 entries).
2517+
result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id3, "10").Result()
2518+
require.NoError(t, err)
2519+
entries, ok = result.([]interface{})
2520+
require.True(t, ok)
2521+
require.Len(t, entries, 3, "XPENDING with range [id1,id3] should return 3 entries")
2522+
2523+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2524+
})
24832525
}
24842526

24852527
func parseStreamEntryID(id string) (ts int64, seqNum int64) {

0 commit comments

Comments
 (0)