@@ -771,15 +771,17 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (
771771
772772 readTxOpt := sqldb .ReadTxOpt ()
773773 err := i .db .ExecTx (ctx , readTxOpt , func (db SQLInvoiceQueries ) error {
774- return queryWithLimit (func (offset int ) (int , error ) {
774+ var cursor int64
775+ limit := int32 (i .opts .paginationLimit )
776+ for {
775777 params := sqlc.FetchPendingInvoicesParams {
776- NumOffset : int32 ( offset ) ,
777- NumLimit : int32 ( i . opts . paginationLimit ) ,
778+ IDCursor : cursor ,
779+ NumLimit : limit ,
778780 }
779781
780782 rows , err := db .FetchPendingInvoices (ctx , params )
781783 if err != nil && ! errors .Is (err , sql .ErrNoRows ) {
782- return 0 , fmt .Errorf ("unable to get invoices " +
784+ return fmt .Errorf ("unable to get invoices " +
783785 "from db: %w" , err )
784786 }
785787
@@ -789,14 +791,17 @@ func (i *SQLStore) FetchPendingInvoices(ctx context.Context) (
789791 ctx , db , row , nil , true ,
790792 )
791793 if err != nil {
792- return 0 , err
794+ return err
793795 }
794796
795797 invoices [* hash ] = * invoice
798+ cursor = row .ID
796799 }
797800
798- return len (rows ), nil
799- }, i .opts .paginationLimit )
801+ if int32 (len (rows )) < limit {
802+ return nil
803+ }
804+ }
800805 }, func () {
801806 invoices = make (map [lntypes.Hash ]Invoice )
802807 })
@@ -830,18 +835,20 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
830835
831836 readTxOpt := sqldb .ReadTxOpt ()
832837 err := i .db .ExecTx (ctx , readTxOpt , func (db SQLInvoiceQueries ) error {
833- err := queryWithLimit (func (offset int ) (int , error ) {
838+ var cursor int64
839+ limit := int32 (i .opts .paginationLimit )
840+ for {
834841 // settle_index is always provided here so the
835842 // invoices_settle_index_idx index can be used.
836843 params := sqlc.FilterInvoicesBySettleIndexParams {
837844 SettleIndexGet : sqldb .SQLInt64 (idx + 1 ),
838- NumOffset : int32 ( offset ) ,
839- NumLimit : int32 ( i . opts . paginationLimit ) ,
845+ IDCursor : cursor ,
846+ NumLimit : limit ,
840847 }
841848
842849 rows , err := db .FilterInvoicesBySettleIndex (ctx , params )
843850 if err != nil && ! errors .Is (err , sql .ErrNoRows ) {
844- return 0 , fmt .Errorf ("unable to get invoices " +
851+ return fmt .Errorf ("unable to get invoices " +
845852 "from db: %w" , err )
846853 }
847854
@@ -851,12 +858,13 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
851858 ctx , db , row , nil , true ,
852859 )
853860 if err != nil {
854- return 0 , fmt .Errorf ("unable to fetch " +
861+ return fmt .Errorf ("unable to fetch " +
855862 "invoice(id=%d) from db: %w" ,
856863 row .ID , err )
857864 }
858865
859866 invoices = append (invoices , * invoice )
867+ cursor = row .ID
860868
861869 processedCount ++
862870 if time .Since (lastLogTime ) >=
@@ -871,10 +879,9 @@ func (i *SQLStore) InvoicesSettledSince(ctx context.Context, idx uint64) (
871879 }
872880 }
873881
874- return len (rows ), nil
875- }, i .opts .paginationLimit )
876- if err != nil {
877- return err
882+ if int32 (len (rows )) < limit {
883+ break
884+ }
878885 }
879886
880887 // Now fetch all the AMP sub invoices that were settled since
@@ -977,18 +984,21 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
977984
978985 readTxOpt := sqldb .ReadTxOpt ()
979986 err := i .db .ExecTx (ctx , readTxOpt , func (db SQLInvoiceQueries ) error {
980- return queryWithLimit (func (offset int ) (int , error ) {
981- // id is always provided here so the primary-key
982- // index is used for this range scan.
987+ // id is always provided here so the primary-key index is used
988+ // for this range scan. The cursor starts at idx+1 so the first
989+ // page fetches invoices with id >= idx+1. After each page the
990+ // cursor advances to last_id + 1.
991+ cursor := int64 (idx + 1 )
992+ limit := int32 (i .opts .paginationLimit )
993+ for {
983994 params := sqlc.FilterInvoicesByAddIndexParams {
984- AddIndexGet : int64 (idx + 1 ),
985- NumOffset : int32 (offset ),
986- NumLimit : int32 (i .opts .paginationLimit ),
995+ AddIndexGet : cursor ,
996+ NumLimit : limit ,
987997 }
988998
989999 rows , err := db .FilterInvoicesByAddIndex (ctx , params )
9901000 if err != nil && ! errors .Is (err , sql .ErrNoRows ) {
991- return 0 , fmt .Errorf ("unable to get invoices " +
1001+ return fmt .Errorf ("unable to get invoices " +
9921002 "from db: %w" , err )
9931003 }
9941004
@@ -998,10 +1008,11 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
9981008 ctx , db , row , nil , true ,
9991009 )
10001010 if err != nil {
1001- return 0 , err
1011+ return err
10021012 }
10031013
10041014 result = append (result , * invoice )
1015+ cursor = row .ID + 1
10051016
10061017 processedCount ++
10071018 if time .Since (lastLogTime ) >=
@@ -1015,8 +1026,10 @@ func (i *SQLStore) InvoicesAddedSince(ctx context.Context, idx uint64) (
10151026 }
10161027 }
10171028
1018- return len (rows ), nil
1019- }, i .opts .paginationLimit )
1029+ if int32 (len (rows )) < limit {
1030+ return nil
1031+ }
1032+ }
10201033 }, func () {
10211034 result = nil
10221035 })
@@ -1064,46 +1077,51 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,
10641077
10651078 readTxOpt := sqldb .ReadTxOpt ()
10661079 err := i .db .ExecTx (ctx , readTxOpt , func (db SQLInvoiceQueries ) error {
1067- return queryWithLimit (func (offset int ) (int , error ) {
1080+ limit := int32 (i .opts .paginationLimit )
1081+
1082+ // For reverse queries the cursor is an inclusive upper bound on
1083+ // id (id <= cursor); after each page it advances to
1084+ // last_returned_id - 1. Start at IndexOffset, or MaxInt64 to
1085+ // begin from the most recent invoice.
1086+ // For forward queries the cursor is an inclusive lower bound
1087+ // (id >= cursor); after each page it advances to
1088+ // last_returned_id + 1. Start at IndexOffset + 1 so the invoice
1089+ // at IndexOffset itself is excluded (matching the old
1090+ // behaviour).
1091+ var cursor int64
1092+ if q .Reversed {
1093+ cursor = int64 (math .MaxInt64 )
1094+ if q .IndexOffset != 0 {
1095+ cursor = int64 (q .IndexOffset ) - 1
1096+ }
1097+ } else {
1098+ cursor = int64 (q .IndexOffset ) + 1
1099+ }
1100+
1101+ for {
10681102 var (
1069- rows []sqlc.Invoice
1070- err error
1071- limit = int32 (i .opts .paginationLimit )
1103+ rows []sqlc.Invoice
1104+ err error
10721105 )
10731106
10741107 if q .Reversed {
1075- // For reverse queries the upper id bound is
1076- // always provided. When no offset is given we
1077- // start from the most recently added invoice.
1078- addIndexLet := int64 (math .MaxInt64 )
1079- if q .IndexOffset != 0 {
1080- // The invoice at IndexOffset must not
1081- // appear in the results.
1082- addIndexLet = int64 (q .IndexOffset ) - 1
1083- }
1084-
10851108 params := sqlc.FilterInvoicesReverseParams {
1086- AddIndexLet : addIndexLet ,
1109+ AddIndexLet : cursor ,
10871110 PendingOnly : q .PendingOnly ,
10881111 CreatedAfter : createdAfter ,
10891112 CreatedBefore : createdBefore ,
1090- NumOffset : int32 (offset ),
10911113 NumLimit : limit ,
10921114 }
10931115
10941116 rows , err = db .FilterInvoicesReverse (
10951117 ctx , params ,
10961118 )
10971119 } else {
1098- // For forward queries the lower id bound is
1099- // always provided. IndexOffset 0 means "start
1100- // from the very first invoice" (id >= 1).
11011120 params := sqlc.FilterInvoicesForwardParams {
1102- AddIndexGet : int64 ( q . IndexOffset ) + 1 ,
1121+ AddIndexGet : cursor ,
11031122 PendingOnly : q .PendingOnly ,
11041123 CreatedAfter : createdAfter ,
11051124 CreatedBefore : createdBefore ,
1106- NumOffset : int32 (offset ),
11071125 NumLimit : limit ,
11081126 }
11091127
@@ -1113,7 +1131,7 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,
11131131 }
11141132
11151133 if err != nil && ! errors .Is (err , sql .ErrNoRows ) {
1116- return 0 , fmt .Errorf ("unable to get invoices " +
1134+ return fmt .Errorf ("unable to get invoices " +
11171135 "from db: %w" , err )
11181136 }
11191137
@@ -1123,18 +1141,25 @@ func (i *SQLStore) QueryInvoices(ctx context.Context,
11231141 ctx , db , row , nil , true ,
11241142 )
11251143 if err != nil {
1126- return 0 , err
1144+ return err
11271145 }
11281146
11291147 invoices = append (invoices , * invoice )
1148+ if q .Reversed {
1149+ cursor = row .ID - 1
1150+ } else {
1151+ cursor = row .ID + 1
1152+ }
11301153
11311154 if len (invoices ) == int (q .NumMaxInvoices ) {
1132- return 0 , nil
1155+ return nil
11331156 }
11341157 }
11351158
1136- return len (rows ), nil
1137- }, i .opts .paginationLimit )
1159+ if int32 (len (rows )) < limit {
1160+ return nil
1161+ }
1162+ }
11381163 }, func () {
11391164 invoices = nil
11401165 })
@@ -1891,22 +1916,3 @@ func unmarshalInvoiceHTLC(row sqlc.InvoiceHtlc) (CircuitKey,
18911916
18921917 return circuitKey , htlc , nil
18931918}
1894-
1895- // queryWithLimit is a helper method that can be used to query the database
1896- // using a limit and offset. The passed query function should return the number
1897- // of rows returned and an error if any.
1898- func queryWithLimit (query func (int ) (int , error ), limit int ) error {
1899- offset := 0
1900- for {
1901- rows , err := query (offset )
1902- if err != nil {
1903- return err
1904- }
1905-
1906- if rows < limit {
1907- return nil
1908- }
1909-
1910- offset += limit
1911- }
1912- }
0 commit comments