diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index ae84e4aca28..73a5279c8eb 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -853,21 +853,48 @@ class CommandXPending : public Commander { } if (parser.Good()) { - std::string start_id, end_id; - start_id = GET_OR_RET(parser.TakeStr()); - end_id = GET_OR_RET(parser.TakeStr()); - if (start_id != "-") { - auto s = ParseStreamEntryID(start_id, &options_.start_id); + const std::string start_str = GET_OR_RET(parser.TakeStr()); + const std::string end_str = GET_OR_RET(parser.TakeStr()); + + // Extended XPENDING uses the same ID range rules as XRANGE (see Redis documentation). + if (start_str == "-") { + options_.start_id = StreamEntryID::Minimum(); + options_.exclude_start = false; + } else if (!start_str.empty() && start_str[0] == '(') { + options_.exclude_start = true; + auto s = ParseRangeStart(start_str.substr(1), &options_.start_id); if (!s.IsOK()) { return s; } + } else if (start_str == "+") { + options_.start_id = StreamEntryID::Maximum(); + options_.exclude_start = false; + } else { + auto s = ParseRangeStart(start_str, &options_.start_id); + if (!s.IsOK()) { + return s; + } + options_.exclude_start = false; } - if (end_id != "+") { - auto s = ParseStreamEntryID(start_id, &options_.end_id); + if (end_str == "+") { + options_.end_id = StreamEntryID::Maximum(); + options_.exclude_end = false; + } else if (!end_str.empty() && end_str[0] == '(') { + options_.exclude_end = true; + auto s = ParseRangeEnd(end_str.substr(1), &options_.end_id); + if (!s.IsOK()) { + return s; + } + } else if (end_str == "-") { + options_.end_id = StreamEntryID::Minimum(); + options_.exclude_end = false; + } else { + auto s = ParseRangeEnd(end_str, &options_.end_id); if (!s.IsOK()) { return s; } + options_.exclude_end = false; } options_.count = GET_OR_RET(parser.TakeInt()); diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 6dc47fba243..580629c9fb1 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1751,11 +1751,40 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt return s.IsNotFound() ? rocksdb::Status::OK() : s; } - std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id); - std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id); + StreamEntryID scan_start_id = options.start_id; + if (options.exclude_start) { + Status inc_st = IncrementStreamEntryID(&scan_start_id); + if (!inc_st.IsOK()) { + pending_infos.pending_number = 0; + pending_infos.first_entry_id = StreamEntryID::Maximum(); + pending_infos.last_entry_id = StreamEntryID::Minimum(); + return rocksdb::Status::OK(); + } + } + std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, scan_start_id); + // XPENDING extended form follows XRANGE-style ranges (closed interval by default; '(' excludes an endpoint). + // RocksDB iterate_upper_bound is exclusive. + std::string end_key_exclusive; + if (options.end_id == StreamEntryID::Maximum()) { + // "+" means no upper ID limit; place the bound after all keys in this stream metadata version + // (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID. + end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + } else if (options.exclude_end) { + // Exclusive end: iterator keys must be strictly before end_id. + end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id); + } else { + StreamEntryID end_next = options.end_id; + Status inc_st = IncrementStreamEntryID(&end_next); + if (!inc_st.IsOK()) { + end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + } else { + // Next ID after end_id in PEL key space — exclusive iterator upper bound (XRANGE closed end). + end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, end_next); + } + } rocksdb::ReadOptions read_options = ctx.DefaultScanOptions(); - rocksdb::Slice upper_bound(end_key); + rocksdb::Slice upper_bound(end_key_exclusive); read_options.iterate_upper_bound = &upper_bound; rocksdb::Slice lower_bound(prefix_key); read_options.iterate_lower_bound = &lower_bound; diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h index e2d1998dcbb..f7991f573e3 100644 --- a/src/types/redis_stream_base.h +++ b/src/types/redis_stream_base.h @@ -245,6 +245,8 @@ struct StreamPendingOptions { StreamEntryID start_id{StreamEntryID::Minimum()}; StreamEntryID end_id{StreamEntryID::Maximum()}; + bool exclude_start = false; + bool exclude_end = false; uint64_t count; bool with_count = false; diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index 1590847afde..00354848ca9 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -2503,6 +2503,146 @@ func TestStreamOffset(t *testing.T) { require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream").Err(), "wrong number of arguments") require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream", "+").Err(), "wrong number of arguments") }) + + t.Run("XPENDING with specific end ID should filter correctly", func(t *testing.T) { + streamKey := "xpending-endid-test" + group := "grp" + consumer := "con" + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + + id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "1"}}).Result() + require.NoError(t, err) + id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "2"}}).Result() + require.NoError(t, err) + id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, Values: []string{"f", "3"}}).Result() + require.NoError(t, err) + + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err()) + // Read all entries so they become pending + _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result() + require.NoError(t, err) + + // XPENDING extended form: each row is [id, consumer, idle_ms, delivery_count]. + assertXPendingExtRow := func(t *testing.T, row interface{}, wantID string) { + t.Helper() + fields, ok := row.([]interface{}) + require.True(t, ok) + require.Len(t, fields, 4) + gotID, ok := fields[0].(string) + require.True(t, ok) + require.Equal(t, wantID, gotID) + gotConsumer, ok := fields[1].(string) + require.True(t, ok) + require.Equal(t, consumer, gotConsumer) + require.GreaterOrEqual(t, fields[2], int64(0)) + require.EqualValues(t, 1, fields[3]) + } + + // XPENDING extended form: same ID range rules as XRANGE (see Redis docs). Use XPENDING with end_id = id1. + result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1, id1, "10").Result() + require.NoError(t, err) + entries, ok := result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 1, "XPENDING with end_id=id1 should return only 1 entry") + assertXPendingExtRow(t, entries[0], id1) + + // Use XPENDING with range [id1, id2] (should return 2 entries). + result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id2, "10").Result() + require.NoError(t, err) + entries, ok = result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 2, "XPENDING with range [id1,id2] should return 2 entries") + assertXPendingExtRow(t, entries[0], id1) + assertXPendingExtRow(t, entries[1], id2) + + // Use XPENDING with range [id1, id3] (should return all 3 entries). + result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, id3, "10").Result() + require.NoError(t, err) + entries, ok = result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 3, "XPENDING with range [id1,id3] should return 3 entries") + assertXPendingExtRow(t, entries[0], id1) + assertXPendingExtRow(t, entries[1], id2) + assertXPendingExtRow(t, entries[2], id3) + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + }) + + t.Run("XPENDING with incomplete end ID should include the whole millisecond", func(t *testing.T) { + streamKey := "xpending-incomplete-end-test" + group := "grp" + consumer := "con" + ids := []string{"1-0", "1-1", "1-2"} + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + for i, id := range ids { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err()) + } + + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result() + require.NoError(t, err) + + result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "1", "1", "10").Result() + require.NoError(t, err) + entries, ok := result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 3, "XPENDING 1 1 should include every pending entry in millisecond 1, matching Redis") + + for i, entry := range entries { + fields, ok := entry.([]interface{}) + require.True(t, ok) + require.Len(t, fields, 4) + gotID, ok := fields[0].(string) + require.True(t, ok) + require.Equal(t, ids[i], gotID) + gotConsumer, ok := fields[1].(string) + require.True(t, ok) + require.Equal(t, consumer, gotConsumer) + require.GreaterOrEqual(t, fields[2], int64(0)) + require.EqualValues(t, 1, fields[3]) + } + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + }) + + t.Run("XPENDING with exclusive start should match Redis", func(t *testing.T) { + streamKey := "xpending-exclusive-start-test" + group := "grp" + consumer := "con" + ids := []string{"1-0", "1-1", "1-2"} + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + for i, id := range ids { + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err()) + } + + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err()) + _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result() + require.NoError(t, err) + + result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "(1-0", "+", "10").Result() + require.NoError(t, err) + entries, ok := result.([]interface{}) + require.True(t, ok) + require.Len(t, entries, 2, "XPENDING (1-0 + 10 should exclude the first pending entry, matching Redis") + + for i, entry := range entries { + fields, ok := entry.([]interface{}) + require.True(t, ok) + require.Len(t, fields, 4) + gotID, ok := fields[0].(string) + require.True(t, ok) + require.Equal(t, ids[i+1], gotID) + gotConsumer, ok := fields[1].(string) + require.True(t, ok) + require.Equal(t, consumer, gotConsumer) + require.GreaterOrEqual(t, fields[2], int64(0)) + require.EqualValues(t, 1, fields[3]) + } + + require.NoError(t, rdb.Del(ctx, streamKey).Err()) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {