Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -853,21 +853,48 @@ class CommandXPending : public Commander {
}

if (parser.Good()) {
std::string start_id, end_id;
start_id = GET_OR_RET(parser.TakeStr());
end_id = GET_OR_RET(parser.TakeStr());
if (start_id != "-") {
auto s = ParseStreamEntryID(start_id, &options_.start_id);
const std::string start_str = GET_OR_RET(parser.TakeStr());
const std::string end_str = GET_OR_RET(parser.TakeStr());

// Extended XPENDING uses the same ID range rules as XRANGE (see Redis documentation).
if (start_str == "-") {
options_.start_id = StreamEntryID::Minimum();
options_.exclude_start = false;
} else if (!start_str.empty() && start_str[0] == '(') {
options_.exclude_start = true;
auto s = ParseRangeStart(start_str.substr(1), &options_.start_id);
if (!s.IsOK()) {
return s;
}
} else if (start_str == "+") {
options_.start_id = StreamEntryID::Maximum();
options_.exclude_start = false;
} else {
auto s = ParseRangeStart(start_str, &options_.start_id);
if (!s.IsOK()) {
return s;
}
options_.exclude_start = false;
}

if (end_id != "+") {
auto s = ParseStreamEntryID(start_id, &options_.end_id);
if (end_str == "+") {
options_.end_id = StreamEntryID::Maximum();
options_.exclude_end = false;
} else if (!end_str.empty() && end_str[0] == '(') {
options_.exclude_end = true;
auto s = ParseRangeEnd(end_str.substr(1), &options_.end_id);
if (!s.IsOK()) {
return s;
}
} else if (end_str == "-") {
options_.end_id = StreamEntryID::Minimum();
options_.exclude_end = false;
} else {
auto s = ParseRangeEnd(end_str, &options_.end_id);
if (!s.IsOK()) {
return s;
}
options_.exclude_end = false;
}

options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());
Expand Down
35 changes: 32 additions & 3 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1751,11 +1751,40 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}

std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);
StreamEntryID scan_start_id = options.start_id;
if (options.exclude_start) {
Status inc_st = IncrementStreamEntryID(&scan_start_id);
if (!inc_st.IsOK()) {
pending_infos.pending_number = 0;
pending_infos.first_entry_id = StreamEntryID::Maximum();
pending_infos.last_entry_id = StreamEntryID::Minimum();
return rocksdb::Status::OK();
}
}
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, scan_start_id);
// XPENDING extended form follows XRANGE-style ranges (closed interval by default; '(' excludes an endpoint).
// RocksDB iterate_upper_bound is exclusive.
std::string end_key_exclusive;
if (options.end_id == StreamEntryID::Maximum()) {
// "+" means no upper ID limit; place the bound after all keys in this stream metadata version
// (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID.
end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
} else if (options.exclude_end) {
// Exclusive end: iterator keys must be strictly before end_id.
end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);
} else {
StreamEntryID end_next = options.end_id;
Status inc_st = IncrementStreamEntryID(&end_next);
if (!inc_st.IsOK()) {
end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
} else {
// Next ID after end_id in PEL key space — exclusive iterator upper bound (XRANGE closed end).
end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, end_next);
}
}

rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(end_key);
rocksdb::Slice upper_bound(end_key_exclusive);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options.iterate_lower_bound = &lower_bound;
Expand Down
2 changes: 2 additions & 0 deletions src/types/redis_stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ struct StreamPendingOptions {

StreamEntryID start_id{StreamEntryID::Minimum()};
StreamEntryID end_id{StreamEntryID::Maximum()};
bool exclude_start = false;
bool exclude_end = false;

uint64_t count;
bool with_count = false;
Expand Down
140 changes: 140 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2503,6 +2503,146 @@ func TestStreamOffset(t *testing.T) {
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream").Err(), "wrong number of arguments")
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream", "+").Err(), "wrong number of arguments")
})

t.Run("XPENDING with specific end ID should filter correctly", func(t *testing.T) {
streamKey := "xpending-endid-test"
group := "grp"
consumer := "con"
require.NoError(t, rdb.Del(ctx, streamKey).Err())

id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "1"}}).Result()
require.NoError(t, err)
id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "2"}}).Result()
require.NoError(t, err)
id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "3"}}).Result()
require.NoError(t, err)

require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
// Read all entries so they become pending
_, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
require.NoError(t, err)

// XPENDING extended form: each row is [id, consumer, idle_ms, delivery_count].
assertXPendingExtRow := func(t *testing.T, row interface{}, wantID string) {
t.Helper()
fields, ok := row.([]interface{})
require.True(t, ok)
require.Len(t, fields, 4)
gotID, ok := fields[0].(string)
require.True(t, ok)
require.Equal(t, wantID, gotID)
gotConsumer, ok := fields[1].(string)
require.True(t, ok)
require.Equal(t, consumer, gotConsumer)
require.GreaterOrEqual(t, fields[2], int64(0))
require.EqualValues(t, 1, fields[3])
}

// XPENDING extended form: same ID range rules as XRANGE (see Redis docs). Use XPENDING with end_id = id1.
result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1, id1, "10").Result()
require.NoError(t, err)
entries, ok := result.([]interface{})
require.True(t, ok)
require.Len(t, entries, 1, "XPENDING with end_id=id1 should return only 1 entry")
Comment thread
git-hulk marked this conversation as resolved.
assertXPendingExtRow(t, entries[0], id1)

// Use XPENDING with range [id1, id2] (should return 2 entries).
result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id2, "10").Result()
require.NoError(t, err)
entries, ok = result.([]interface{})
require.True(t, ok)
require.Len(t, entries, 2, "XPENDING with range [id1,id2] should return 2 entries")
assertXPendingExtRow(t, entries[0], id1)
assertXPendingExtRow(t, entries[1], id2)

// Use XPENDING with range [id1, id3] (should return all 3 entries).
result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id3, "10").Result()
require.NoError(t, err)
entries, ok = result.([]interface{})
require.True(t, ok)
require.Len(t, entries, 3, "XPENDING with range [id1,id3] should return 3 entries")
assertXPendingExtRow(t, entries[0], id1)
assertXPendingExtRow(t, entries[1], id2)
assertXPendingExtRow(t, entries[2], id3)

require.NoError(t, rdb.Del(ctx, streamKey).Err())
})

t.Run("XPENDING with incomplete end ID should include the whole millisecond", func(t *testing.T) {
streamKey := "xpending-incomplete-end-test"
group := "grp"
consumer := "con"
ids := []string{"1-0", "1-1", "1-2"}

require.NoError(t, rdb.Del(ctx, streamKey).Err())
for i, id := range ids {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err())
}

require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
require.NoError(t, err)

result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "1", "1", "10").Result()
require.NoError(t, err)
entries, ok := result.([]interface{})
require.True(t, ok)
require.Len(t, entries, 3, "XPENDING 1 1 should include every pending entry in millisecond 1, matching Redis")

for i, entry := range entries {
fields, ok := entry.([]interface{})
require.True(t, ok)
require.Len(t, fields, 4)
gotID, ok := fields[0].(string)
require.True(t, ok)
require.Equal(t, ids[i], gotID)
gotConsumer, ok := fields[1].(string)
require.True(t, ok)
require.Equal(t, consumer, gotConsumer)
require.GreaterOrEqual(t, fields[2], int64(0))
require.EqualValues(t, 1, fields[3])
}

require.NoError(t, rdb.Del(ctx, streamKey).Err())
})

t.Run("XPENDING with exclusive start should match Redis", func(t *testing.T) {
streamKey := "xpending-exclusive-start-test"
group := "grp"
consumer := "con"
ids := []string{"1-0", "1-1", "1-2"}

require.NoError(t, rdb.Del(ctx, streamKey).Err())
for i, id := range ids {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err())
}

require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
require.NoError(t, err)

result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "(1-0", "+", "10").Result()
require.NoError(t, err)
entries, ok := result.([]interface{})
require.True(t, ok)
require.Len(t, entries, 2, "XPENDING (1-0 + 10 should exclude the first pending entry, matching Redis")

for i, entry := range entries {
fields, ok := entry.([]interface{})
require.True(t, ok)
require.Len(t, fields, 4)
gotID, ok := fields[0].(string)
require.True(t, ok)
require.Equal(t, ids[i+1], gotID)
gotConsumer, ok := fields[1].(string)
require.True(t, ok)
require.Equal(t, consumer, gotConsumer)
require.GreaterOrEqual(t, fields[2], int64(0))
require.EqualValues(t, 1, fields[3])
}

require.NoError(t, rdb.Del(ctx, streamKey).Err())
})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down