Skip to content

Commit 919d6cc

Browse files
worstell and ampcode-com committed
feat(scheduler): add priority queue support
Add a priority-queues config option to the scheduler that accepts a list of queue name prefixes. Jobs whose queue matches a priority prefix are dequeued before non-priority jobs, while maintaining FIFO order within each tier.

This allows operators to ensure that known important repositories (e.g., monorepos) are never starved by a flood of cold clone jobs for less critical repos. The queue name is the upstream URL, so configuration is straightforward:

scheduler {
  priority-queues = ["https://github.com/org/monorepo"]
}

Amp-Thread-ID: https://ampcode.com/threads/T-019d404e-21ec-723a-b211-c619925dd12e

Co-authored-by: Amp <amp@ampcode.com>
1 parent 4d2963e commit 919d6cc

2 files changed

Lines changed: 108 additions & 13 deletions

File tree

internal/jobscheduler/jobs.go

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ import (
1414
)
1515

1616
type Config struct {
17-
Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
18-
MaxCloneConcurrency int `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"`
19-
SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
17+
Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
18+
MaxCloneConcurrency int `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"`
19+
SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
20+
PriorityQueues []string `hcl:"priority-queues,optional" help:"Queue name prefixes that should be dequeued before other jobs. Matches if the queue starts with any prefix."`
2021
}
2122

2223
type queueJob struct {
@@ -78,6 +79,7 @@ type RootScheduler struct {
7879
active map[string]string // queue -> job id
7980
activeClones int
8081
maxCloneConcurrency int
82+
priorityQueues []string
8183
cancel context.CancelFunc
8284
store ScheduleStore
8385
}
@@ -115,6 +117,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
115117
workAvailable: make(chan bool, 1024),
116118
active: make(map[string]string),
117119
maxCloneConcurrency: maxClones,
120+
priorityQueues: config.PriorityQueues,
118121
store: store,
119122
}
120123
ctx, cancel := context.WithCancel(ctx)
@@ -229,24 +232,57 @@ func isCloneJob(id string) bool {
229232
return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore")
230233
}
231234

232-
// Take the next job for any queue that is not already running a job.
235+
// takeNextJob selects the next eligible job. Priority queue jobs are preferred over non-priority jobs;
236+
// within the same priority tier, jobs are dequeued in submission order (FIFO).
233237
func (q *RootScheduler) takeNextJob() (queueJob, bool) {
234238
q.lock.Lock()
235239
defer q.lock.Unlock()
240+
idx := -1
236241
for i, job := range q.queue {
237-
if _, active := q.active[job.queue]; active {
242+
if !q.isEligibleLocked(job) {
238243
continue
239244
}
240-
if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency {
245+
if idx == -1 {
246+
idx = i
247+
if q.isPriority(job.queue) {
248+
break // first eligible priority job wins immediately
249+
}
241250
continue
242251
}
243-
q.queue = append(q.queue[:i], q.queue[i+1:]...)
244-
q.workAvailable <- true
245-
q.active[job.queue] = job.id
246-
if isCloneJob(job.id) {
247-
q.activeClones++
252+
// idx holds a non-priority candidate; upgrade if this job is priority
253+
if q.isPriority(job.queue) {
254+
idx = i
255+
break
256+
}
257+
}
258+
if idx == -1 {
259+
return queueJob{}, false
260+
}
261+
job := q.queue[idx]
262+
q.queue = append(q.queue[:idx], q.queue[idx+1:]...)
263+
q.workAvailable <- true
264+
q.active[job.queue] = job.id
265+
if isCloneJob(job.id) {
266+
q.activeClones++
267+
}
268+
return job, true
269+
}
270+
271+
func (q *RootScheduler) isEligibleLocked(job queueJob) bool {
272+
if _, active := q.active[job.queue]; active {
273+
return false
274+
}
275+
if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency {
276+
return false
277+
}
278+
return true
279+
}
280+
281+
func (q *RootScheduler) isPriority(queue string) bool {
282+
for _, prefix := range q.priorityQueues {
283+
if strings.HasPrefix(queue, prefix) {
284+
return true
248285
}
249-
return job, true
250286
}
251-
return queueJob{}, false
287+
return false
252288
}

internal/jobscheduler/jobs_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,6 +460,65 @@ func FuzzJobScheduler(f *testing.F) {
460460
})
461461
}
462462

463+
func TestJobSchedulerPriorityQueues(t *testing.T) {
464+
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
465+
ctx, cancel := context.WithCancel(ctx)
466+
defer cancel()
467+
468+
// Single worker so jobs execute sequentially, making order deterministic.
469+
scheduler := newTestScheduler(ctx, t, jobscheduler.Config{
470+
Concurrency: 1,
471+
PriorityQueues: []string{"https://github.com/important/"},
472+
})
473+
474+
var (
475+
mu sync.Mutex
476+
order []string
477+
)
478+
479+
// Block the single worker so all subsequent submits queue up.
480+
blocker := make(chan struct{})
481+
scheduler.Submit("blocker", "block", func(_ context.Context) error {
482+
<-blocker
483+
return nil
484+
})
485+
// Give the worker time to pick up the blocking job.
486+
time.Sleep(50 * time.Millisecond)
487+
488+
// Submit non-priority first, then priority. Priority should execute first after the blocker.
489+
scheduler.Submit("https://github.com/random/repo", "job", func(_ context.Context) error {
490+
mu.Lock()
491+
order = append(order, "random")
492+
mu.Unlock()
493+
return nil
494+
})
495+
scheduler.Submit("https://github.com/important/monorepo", "job", func(_ context.Context) error {
496+
mu.Lock()
497+
order = append(order, "important")
498+
mu.Unlock()
499+
return nil
500+
})
501+
scheduler.Submit("https://github.com/other/thing", "job", func(_ context.Context) error {
502+
mu.Lock()
503+
order = append(order, "other")
504+
mu.Unlock()
505+
return nil
506+
})
507+
508+
// Release the blocker.
509+
close(blocker)
510+
511+
eventually(t, 2*time.Second, func() bool {
512+
mu.Lock()
513+
defer mu.Unlock()
514+
return len(order) == 3
515+
}, "all jobs should complete")
516+
517+
mu.Lock()
518+
defer mu.Unlock()
519+
assert.Equal(t, "important", order[0], "priority job should execute first, got: %v", order)
520+
}
521+
463522
func TestJobSchedulerCloneConcurrencyLimit(t *testing.T) {
464523
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
465524
ctx, cancel := context.WithCancel(ctx)

0 commit comments

Comments (0)