@@ -55,11 +55,10 @@ type Driver struct {
5555 log * slog.Logger
5656 pq jobs.Queue
5757 pipeline atomic.Pointer [jobs.Pipeline ]
58- cond * sync.Cond
5958
6059 listeners atomic.Uint32
61- active * uint64
62- delayed * uint64
60+ active atomic. Uint64
61+ delayed atomic. Uint64
6362
6463 stopCh chan struct {}
6564}
@@ -106,10 +105,6 @@ func FromConfig(_ context.Context, tracer *sdktrace.TracerProvider, configKey st
106105 return new (bytes.Buffer )
107106 },
108107 },
109- cond : sync .NewCond (& sync.Mutex {}),
110-
111- delayed : new (uint64 ),
112- active : new (uint64 ),
113108
114109 db : db ,
115110 log : log ,
@@ -170,10 +165,6 @@ func FromPipeline(_ context.Context, tracer *sdktrace.TracerProvider, pipeline j
170165 bPool : sync.Pool {New : func () any {
171166 return new (bytes.Buffer )
172167 }},
173- cond : sync .NewCond (& sync.Mutex {}),
174-
175- delayed : new (uint64 ),
176- active : new (uint64 ),
177168
178169 db : db ,
179170 log : log ,
@@ -216,7 +207,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
216207 return errors .E (op , err )
217208 }
218209
219- atomic . AddUint64 ( d .delayed , 1 )
210+ d .delayed . Add ( 1 )
220211
221212 return nil
222213 }
@@ -227,7 +218,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
227218 return errors .E (op , err )
228219 }
229220
230- atomic . AddUint64 ( d .active , 1 )
221+ d .active . Add ( 1 )
231222
232223 return nil
233224 })
@@ -297,7 +288,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error {
297288 d .stopCh <- struct {}{}
298289 d .stopCh <- struct {}{}
299290
300- d .listeners .Add ( ^ uint32 ( 0 ) )
291+ d .listeners .Store ( 0 )
301292
302293 d .log .Debug ("pipeline was paused" , "driver" , pipe .Driver (), "pipeline" , pipe .Name (), "start" , start , "elapsed" , time .Since (start ))
303294
@@ -340,9 +331,9 @@ func (d *Driver) State(ctx context.Context) (*jobs.State, error) {
340331 Pipeline : pipe .Name (),
341332 Driver : pipe .Driver (),
342333 Queue : PushBucket ,
343- Priority : uint64 (pipe .Priority ()), //nolint:gosec
344- Active : int64 (atomic . LoadUint64 ( d .active )), //nolint:gosec
345- Delayed : int64 (atomic . LoadUint64 ( d .delayed )), //nolint:gosec
334+ Priority : uint64 (pipe .Priority ()), //nolint:gosec
335+ Active : int64 (d .active . Load ( )), //nolint:gosec
336+ Delayed : int64 (d .delayed . Load ( )), //nolint:gosec
346337 Ready : d .listeners .Load () > 0 ,
347338 }, nil
348339}
@@ -368,12 +359,26 @@ func create(db *bolt.DB) error {
368359 }
369360
370361 inQb := tx .Bucket (strToBytes (InQueueBucket ))
371- cursor := inQb .Cursor ()
372-
373362 pushB := tx .Bucket (strToBytes (PushBucket ))
374363
375- for k , v := cursor .First (); k != nil ; k , v = cursor .Next () {
376- err = pushB .Put (k , v )
364+ // Collect all in-queue entries before any deletes: deleting the
365+ // current key mid-iteration shifts the underlying inode slice and
366+ // causes cursor.Next() to skip the next entry.
367+ type kv struct { k , v []byte }
368+ var entries []kv
369+ cur := inQb .Cursor ()
370+ for k , v := cur .First (); k != nil ; k , v = cur .Next () {
371+ entries = append (entries , kv {
372+ k : append ([]byte (nil ), k ... ),
373+ v : append ([]byte (nil ), v ... ),
374+ })
375+ }
376+ for _ , e := range entries {
377+ err = pushB .Put (e .k , e .v )
378+ if err != nil {
379+ return errors .E (upOp , err )
380+ }
381+ err = inQb .Delete (e .k )
377382 if err != nil {
378383 return errors .E (upOp , err )
379384 }
0 commit comments