@@ -87,13 +87,9 @@ func (pb *pendingBase[T]) getPending(ctx context.Context) ([]T, error) {
8787 }
8888
8989 pb .inFlightMu .Lock ()
90- claims := slices .Clone (pb .inFlightClaims )
91- gaps := slices .Clone (pb .gaps )
90+ rangeStart , rangeEnd := findAvailableRange (pb .gaps , pb .inFlightClaims , lastSubmitted , storeHeight )
9291 pb .inFlightMu .Unlock ()
9392
94- // Determine the first contiguous unclaimed range.
95- // Priority: gaps (failed ranges below lastHeight), then items above lastHeight.
96- rangeStart , rangeEnd := findAvailableRange (gaps , claims , lastSubmitted , storeHeight )
9793 if rangeStart == 0 || rangeStart > rangeEnd {
9894 return nil , nil
9995 }
@@ -103,30 +99,18 @@ func (pb *pendingBase[T]) getPending(ctx context.Context) ([]T, error) {
10399 rangeEnd = rangeStart + uint64 (DefaultPendingCacheSize ) - 1
104100 }
105101
106- // Fetch items not already in cache
102+ pending := make ([] T , 0 , rangeEnd - rangeStart + 1 )
107103 for h := rangeStart ; h <= rangeEnd ; h ++ {
108- if _ , ok := pb .pendingCache .Peek (h ); ok {
104+ if item , ok := pb .pendingCache .Get (h ); ok {
105+ pending = append (pending , item )
109106 continue
110107 }
111108 item , err := pb .fetch (ctx , pb .store , h )
112109 if err != nil {
113- return nil , err
110+ return pending , err
114111 }
115112 pb .pendingCache .Add (h , item )
116- }
117-
118- pending := make ([]T , 0 , rangeEnd - rangeStart + 1 )
119- for h := rangeStart ; h <= rangeEnd ; h ++ {
120- if item , ok := pb .pendingCache .Get (h ); ok {
121- pending = append (pending , item )
122- } else {
123- item , err := pb .fetch (ctx , pb .store , h )
124- if err != nil {
125- return pending , err
126- }
127- pb .pendingCache .Add (h , item )
128- pending = append (pending , item )
129- }
113+ pending = append (pending , item )
130114 }
131115
132116 if len (pending ) > 0 {
@@ -146,24 +130,17 @@ func (pb *pendingBase[T]) numPending() uint64 {
146130 return 0
147131 }
148132
149- pb .inFlightMu .Lock ()
150- claims := slices .Clone (pb .inFlightClaims )
151- gaps := slices .Clone (pb .gaps )
152- pb .inFlightMu .Unlock ()
153-
154133 lastSubmitted := pb .lastHeight .Load ()
155134
135+ pb .inFlightMu .Lock ()
156136 var count uint64
157-
158- // Count gap items not covered by claims
159- for _ , gap := range gaps {
160- count += countUnclaimed (gap .start , gap .end , claims )
137+ for _ , gap := range pb .gaps {
138+ count += countUnclaimed (gap .start , gap .end , pb .inFlightClaims )
161139 }
162-
163- // Count items above lastHeight not covered by claims
164140 if height > lastSubmitted {
165- count += countUnclaimed (lastSubmitted + 1 , height , claims )
141+ count += countUnclaimed (lastSubmitted + 1 , height , pb . inFlightClaims )
166142 }
143+ pb .inFlightMu .Unlock ()
167144
168145 return count
169146}
@@ -199,16 +176,17 @@ func (pb *pendingBase[T]) resetInFlightRange(start, end uint64) {
199176 defer pb .inFlightMu .Unlock ()
200177
201178 var removedClaim * inFlightClaim
202- newClaims := make ([] inFlightClaim , 0 , len ( pb . inFlightClaims ))
179+ n := 0
203180 for _ , c := range pb .inFlightClaims {
204181 if c .end < start || c .start > end {
205- newClaims = append (newClaims , c )
182+ pb .inFlightClaims [n ] = c
183+ n ++
206184 } else {
207185 cc := c
208186 removedClaim = & cc
209187 }
210188 }
211- pb .inFlightClaims = newClaims
189+ pb .inFlightClaims = pb . inFlightClaims [: n ]
212190
213191 currentLast := pb .lastHeight .Load ()
214192
0 commit comments