@@ -2539,6 +2539,82 @@ func TestStreamOffset(t *testing.T) {
25392539
25402540 require .NoError (t , rdb .Del (ctx , streamKey ).Err ())
25412541 })
2542+
2543+ t .Run ("XPENDING with incomplete end ID should include the whole millisecond" , func (t * testing.T ) {
2544+ streamKey := "xpending-incomplete-end-test"
2545+ group := "grp"
2546+ consumer := "con"
2547+ ids := []string {"1-0" , "1-1" , "1-2" }
2548+
2549+ require .NoError (t , rdb .Del (ctx , streamKey ).Err ())
2550+ for i , id := range ids {
2551+ require .NoError (t , rdb .XAdd (ctx , & redis.XAddArgs {Stream : streamKey , ID : id , Values : []string {"f" , strconv .Itoa (i )}}).Err ())
2552+ }
2553+
2554+ require .NoError (t , rdb .XGroupCreateMkStream (ctx , streamKey , group , "0" ).Err ())
2555+ _ , err := rdb .XReadGroup (ctx , & redis.XReadGroupArgs {Group : group , Consumer : consumer , Streams : []string {streamKey , ">" }, Count : 10 }).Result ()
2556+ require .NoError (t , err )
2557+
2558+ result , err := rdb .Do (ctx , "XPENDING" , streamKey , group , "1" , "1" , "10" ).Result ()
2559+ require .NoError (t , err )
2560+ entries , ok := result .([]interface {})
2561+ require .True (t , ok )
2562+ require .Len (t , entries , 3 , "XPENDING 1 1 should include every pending entry in millisecond 1, matching Redis" )
2563+
2564+ for i , entry := range entries {
2565+ fields , ok := entry .([]interface {})
2566+ require .True (t , ok )
2567+ require .Len (t , fields , 4 )
2568+ gotID , ok := fields [0 ].(string )
2569+ require .True (t , ok )
2570+ require .Equal (t , ids [i ], gotID )
2571+ gotConsumer , ok := fields [1 ].(string )
2572+ require .True (t , ok )
2573+ require .Equal (t , consumer , gotConsumer )
2574+ require .GreaterOrEqual (t , fields [2 ], int64 (0 ))
2575+ require .EqualValues (t , 1 , fields [3 ])
2576+ }
2577+
2578+ require .NoError (t , rdb .Del (ctx , streamKey ).Err ())
2579+ })
2580+
2581+ t .Run ("XPENDING with exclusive start should match Redis" , func (t * testing.T ) {
2582+ streamKey := "xpending-exclusive-start-test"
2583+ group := "grp"
2584+ consumer := "con"
2585+ ids := []string {"1-0" , "1-1" , "1-2" }
2586+
2587+ require .NoError (t , rdb .Del (ctx , streamKey ).Err ())
2588+ for i , id := range ids {
2589+ require .NoError (t , rdb .XAdd (ctx , & redis.XAddArgs {Stream : streamKey , ID : id , Values : []string {"f" , strconv .Itoa (i )}}).Err ())
2590+ }
2591+
2592+ require .NoError (t , rdb .XGroupCreateMkStream (ctx , streamKey , group , "0" ).Err ())
2593+ _ , err := rdb .XReadGroup (ctx , & redis.XReadGroupArgs {Group : group , Consumer : consumer , Streams : []string {streamKey , ">" }, Count : 10 }).Result ()
2594+ require .NoError (t , err )
2595+
2596+ result , err := rdb .Do (ctx , "XPENDING" , streamKey , group , "(1-0" , "+" , "10" ).Result ()
2597+ require .NoError (t , err )
2598+ entries , ok := result .([]interface {})
2599+ require .True (t , ok )
2600+ require .Len (t , entries , 2 , "XPENDING (1-0 + 10 should exclude the first pending entry, matching Redis" )
2601+
2602+ for i , entry := range entries {
2603+ fields , ok := entry .([]interface {})
2604+ require .True (t , ok )
2605+ require .Len (t , fields , 4 )
2606+ gotID , ok := fields [0 ].(string )
2607+ require .True (t , ok )
2608+ require .Equal (t , ids [i + 1 ], gotID )
2609+ gotConsumer , ok := fields [1 ].(string )
2610+ require .True (t , ok )
2611+ require .Equal (t , consumer , gotConsumer )
2612+ require .GreaterOrEqual (t , fields [2 ], int64 (0 ))
2613+ require .EqualValues (t , 1 , fields [3 ])
2614+ }
2615+
2616+ require .NoError (t , rdb .Del (ctx , streamKey ).Err ())
2617+ })
25422618}
25432619
25442620func parseStreamEntryID (id string ) (ts int64 , seqNum int64 ) {
0 commit comments