Skip to content

Commit 41cab00

Browse files
committed
fix comment
1 parent 53f54db commit 41cab00

4 files changed

Lines changed: 128 additions & 10 deletions

File tree

src/commands/cmd_stream.cc

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -853,21 +853,48 @@ class CommandXPending : public Commander {
853853
}
854854

855855
if (parser.Good()) {
856-
std::string start_id, end_id;
857-
start_id = GET_OR_RET(parser.TakeStr());
858-
end_id = GET_OR_RET(parser.TakeStr());
859-
if (start_id != "-") {
860-
auto s = ParseStreamEntryID(start_id, &options_.start_id);
856+
const std::string start_str = GET_OR_RET(parser.TakeStr());
857+
const std::string end_str = GET_OR_RET(parser.TakeStr());
858+
859+
// Extended XPENDING uses the same ID range rules as XRANGE (see Redis documentation).
860+
if (start_str == "-") {
861+
options_.start_id = StreamEntryID::Minimum();
862+
options_.exclude_start = false;
863+
} else if (!start_str.empty() && start_str[0] == '(') {
864+
options_.exclude_start = true;
865+
auto s = ParseRangeStart(start_str.substr(1), &options_.start_id);
861866
if (!s.IsOK()) {
862867
return s;
863868
}
869+
} else if (start_str == "+") {
870+
options_.start_id = StreamEntryID::Maximum();
871+
options_.exclude_start = false;
872+
} else {
873+
auto s = ParseRangeStart(start_str, &options_.start_id);
874+
if (!s.IsOK()) {
875+
return s;
876+
}
877+
options_.exclude_start = false;
864878
}
865879

866-
if (end_id != "+") {
867-
auto s = ParseStreamEntryID(end_id, &options_.end_id);
880+
if (end_str == "+") {
881+
options_.end_id = StreamEntryID::Maximum();
882+
options_.exclude_end = false;
883+
} else if (!end_str.empty() && end_str[0] == '(') {
884+
options_.exclude_end = true;
885+
auto s = ParseRangeEnd(end_str.substr(1), &options_.end_id);
886+
if (!s.IsOK()) {
887+
return s;
888+
}
889+
} else if (end_str == "-") {
890+
options_.end_id = StreamEntryID::Minimum();
891+
options_.exclude_end = false;
892+
} else {
893+
auto s = ParseRangeEnd(end_str, &options_.end_id);
868894
if (!s.IsOK()) {
869895
return s;
870896
}
897+
options_.exclude_end = false;
871898
}
872899

873900
options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());

src/types/redis_stream.cc

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,14 +1757,27 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt
17571757
return s.IsNotFound() ? rocksdb::Status::OK() : s;
17581758
}
17591759

1760-
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
1761-
// XPENDING extended form follows XRANGE-style ranges (closed interval by default).
1762-
// RocksDB iterate_upper_bound is exclusive; use a key strictly after end_id for the scan bound.
1760+
StreamEntryID scan_start_id = options.start_id;
1761+
if (options.exclude_start) {
1762+
Status inc_st = IncrementStreamEntryID(&scan_start_id);
1763+
if (!inc_st.IsOK()) {
1764+
pending_infos.pending_number = 0;
1765+
pending_infos.first_entry_id = StreamEntryID::Maximum();
1766+
pending_infos.last_entry_id = StreamEntryID::Minimum();
1767+
return rocksdb::Status::OK();
1768+
}
1769+
}
1770+
std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, scan_start_id);
1771+
// XPENDING extended form follows XRANGE-style ranges (closed interval by default; '(' excludes an endpoint).
1772+
// RocksDB iterate_upper_bound is exclusive.
17631773
std::string end_key_exclusive;
17641774
if (options.end_id == StreamEntryID::Maximum()) {
17651775
// "+" means no upper ID limit; place the bound after all keys in this stream metadata version
17661776
// (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID.
17671777
end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
1778+
} else if (options.exclude_end) {
1779+
// Exclusive end: iterator keys must be strictly before end_id.
1780+
end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);
17681781
} else {
17691782
StreamEntryID end_next = options.end_id;
17701783
Status inc_st = IncrementStreamEntryID(&end_next);

src/types/redis_stream_base.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ struct StreamPendingOptions {
245245

246246
StreamEntryID start_id{StreamEntryID::Minimum()};
247247
StreamEntryID end_id{StreamEntryID::Maximum()};
248+
bool exclude_start = false;
249+
bool exclude_end = false;
248250

249251
uint64_t count;
250252
bool with_count = false;

tests/gocase/unit/type/stream/stream_test.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2544,6 +2544,82 @@ func TestStreamOffset(t *testing.T) {
25442544

25452545
require.NoError(t, rdb.Del(ctx, streamKey).Err())
25462546
})
2547+
2548+
t.Run("XPENDING with incomplete end ID should include the whole millisecond", func(t *testing.T) {
2549+
streamKey := "xpending-incomplete-end-test"
2550+
group := "grp"
2551+
consumer := "con"
2552+
ids := []string{"1-0", "1-1", "1-2"}
2553+
2554+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2555+
for i, id := range ids {
2556+
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err())
2557+
}
2558+
2559+
require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
2560+
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
2561+
require.NoError(t, err)
2562+
2563+
result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "1", "1", "10").Result()
2564+
require.NoError(t, err)
2565+
entries, ok := result.([]interface{})
2566+
require.True(t, ok)
2567+
require.Len(t, entries, 3, "XPENDING 1 1 should include every pending entry in millisecond 1, matching Redis")
2568+
2569+
for i, entry := range entries {
2570+
fields, ok := entry.([]interface{})
2571+
require.True(t, ok)
2572+
require.Len(t, fields, 4)
2573+
gotID, ok := fields[0].(string)
2574+
require.True(t, ok)
2575+
require.Equal(t, ids[i], gotID)
2576+
gotConsumer, ok := fields[1].(string)
2577+
require.True(t, ok)
2578+
require.Equal(t, consumer, gotConsumer)
2579+
require.GreaterOrEqual(t, fields[2], int64(0))
2580+
require.EqualValues(t, 1, fields[3])
2581+
}
2582+
2583+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2584+
})
2585+
2586+
t.Run("XPENDING with exclusive start should match Redis", func(t *testing.T) {
2587+
streamKey := "xpending-exclusive-start-test"
2588+
group := "grp"
2589+
consumer := "con"
2590+
ids := []string{"1-0", "1-1", "1-2"}
2591+
2592+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2593+
for i, id := range ids {
2594+
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", strconv.Itoa(i)}}).Err())
2595+
}
2596+
2597+
require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, group, "0").Err())
2598+
_, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 10}).Result()
2599+
require.NoError(t, err)
2600+
2601+
result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "(1-0", "+", "10").Result()
2602+
require.NoError(t, err)
2603+
entries, ok := result.([]interface{})
2604+
require.True(t, ok)
2605+
require.Len(t, entries, 2, "XPENDING (1-0 + 10 should exclude the first pending entry, matching Redis")
2606+
2607+
for i, entry := range entries {
2608+
fields, ok := entry.([]interface{})
2609+
require.True(t, ok)
2610+
require.Len(t, fields, 4)
2611+
gotID, ok := fields[0].(string)
2612+
require.True(t, ok)
2613+
require.Equal(t, ids[i+1], gotID)
2614+
gotConsumer, ok := fields[1].(string)
2615+
require.True(t, ok)
2616+
require.Equal(t, consumer, gotConsumer)
2617+
require.GreaterOrEqual(t, fields[2], int64(0))
2618+
require.EqualValues(t, 1, fields[3])
2619+
}
2620+
2621+
require.NoError(t, rdb.Del(ctx, streamKey).Err())
2622+
})
25472623
}
25482624

25492625
func parseStreamEntryID(id string) (ts int64, seqNum int64) {

0 commit comments

Comments
 (0)