88 "fmt"
99 "log"
1010 "slices"
11+ "strconv"
1112 "sync"
1213 "sync/atomic"
1314 "time"
@@ -187,19 +188,25 @@ func (c *Controller) discoverReplicationJobs(continueToken []byte) ([]*replicati
187188 }
188189 defer func () { ignoreError (tx .Rollback ()) }()
189190
191+ // discover blobs to replicate in batches (instead of buffering unbounded # of jobs in RAM)
190192 batchLimit := 500
193+ // if all volumes are offline for this replication target's queue we'd end up hammering 100 % CPU with
194+ // this discovery process unless we have some throttling in place.
195+ notAccessibleLimit := 25_000
191196
192197 jobs := []* replicationJob {}
193198
199+ notAccessiblesEncountered := 0
200+
194201 nextContinueToken := stodb .StartFromFirst
195202
196203 err = stodb .BlobsPendingReplicationByVolumeIndex .Query (volIDToBytesForIndex (c .toVolumeID ), continueToken , func (id []byte ) error {
197- if len (jobs ) == batchLimit {
204+ batchLimitHit := len (jobs ) == batchLimit
205+ notAccessibleLimitHit := notAccessiblesEncountered == notAccessibleLimit
206+ if batchLimitHit || notAccessibleLimitHit {
198207 nextContinueToken = id
199208
200- c .logl .Info .Printf (
201- "operating @ batchLimit (%d)" ,
202- batchLimit )
209+ c .logl .Info .Printf ("batchLimitHit=%v (%d) notAccessibleLimitHit=%v (%d)" , batchLimitHit , batchLimit , notAccessibleLimitHit , notAccessibleLimit )
203210 return stodb .StopIteration
204211 }
205212
@@ -221,8 +228,9 @@ func (c *Controller) discoverReplicationJobs(continueToken []byte) ([]*replicati
221228 if err != nil {
222229 if err == stotypes .ErrBlobNotAccessibleOnThisNode {
223230 c .stats .blobVolumeNotAccessible ++
231+ notAccessiblesEncountered ++
224232 return nil
225- } else {
233+ } else { // not expected (above func shouldn't return any other error)
226234 c .stats .otherErrors ++
227235 return err
228236 }
@@ -257,7 +265,7 @@ func HasQueuedWriteIOsForVolume(volID int, tx *bbolt.Tx) (bool, error) {
257265}
258266
259267func volIDToBytesForIndex (volID int ) []byte {
260- return []byte (fmt . Sprintf ( "%d" , volID ))
268+ return []byte (strconv . Itoa ( volID ))
261269}
262270
263271type atomicInt32 struct {
0 commit comments