Skip to content

Commit a440b3a

Browse files
committed
Implement RunPipelineQueue function to manage worker execution and pipeline processing
1 parent 50bd551 commit a440b3a

1 file changed

Lines changed: 23 additions & 0 deletions

File tree

internal/process/worker.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,26 @@ func (w *Worker) Start(pipelineChan <-chan *Pipeline, conn *pgxpool.Pool, wg *sy
2828
}
2929
}
3030
}
31+
32+
// RunPipelineQueue drains a PipelineQueue into a channel and runs it with workers.
33+
func RunPipelineQueue(queue *PipelineQueue, conn *pgxpool.Pool, cfg *config.Config) error {
34+
if queue.IsEmpty() {
35+
return nil
36+
}
37+
38+
pipChan := make(chan *Pipeline, queue.Len())
39+
for !queue.IsEmpty() {
40+
if p := queue.Dequeue(); p != nil {
41+
pipChan <- p
42+
}
43+
}
44+
close(pipChan)
45+
46+
var wg sync.WaitGroup
47+
for i := 1; i <= cfg.Batch.Threads; i++ {
48+
wg.Add(1)
49+
go NewWorker(i).Start(pipChan, conn, &wg, cfg)
50+
}
51+
wg.Wait()
52+
return nil
53+
}

0 commit comments

Comments
 (0)