Skip to content

Commit 7c3bdae

Browse files
worstellampcode-com
andcommitted
feat(scheduler): replace clone concurrency limit with cost-based admission
Replace the binary isCloneJob/MaxCloneConcurrency mechanism with a generic cost model. Strategies now declare the cost of each job at submit time, and the scheduler tracks total active cost against a configurable budget (max-cost). Cost constants defined in the git strategy: clone=4, snapshot=3, repack=2, fetch=1 Default max-cost is Concurrency * 4. A job is always admitted when nothing else is running, even if its cost exceeds max-cost, to prevent permanent starvation from misconfiguration. This removes isCloneJob and the scheduler's knowledge of git-specific job types. The scheduler now sees only costs and the strategy decides what each job is worth. Amp-Thread-ID: https://ampcode.com/threads/T-019d404e-21ec-723a-b211-c619925dd12e Co-authored-by: Amp <amp@ampcode.com>
1 parent 4b29ada commit 7c3bdae

File tree

7 files changed

+111
-106
lines changed

7 files changed

+111
-106
lines changed

internal/jobscheduler/jobs.go

Lines changed: 45 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ import (
1616
)
1717

1818
type Config struct {
19-
Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
20-
MaxCloneConcurrency int `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"`
21-
SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
19+
Concurrency int `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
20+
MaxCost int `hcl:"max-cost" help:"Maximum total cost of concurrently running jobs. Each job declares its own cost at submission. 0 means Concurrency * 4." default:"0"`
21+
SchedulerDB string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
2222
}
2323

2424
type queueJob struct {
2525
id string
2626
queue string
27+
cost int
2728
run func(ctx context.Context) error
2829
}
2930

@@ -43,27 +44,31 @@ type Scheduler interface {
4344
//
4445
// This is useful to avoid collisions across strategies.
4546
WithQueuePrefix(prefix string) Scheduler
46-
// Submit a job to the queue.
47+
// Submit a job to the queue with a given cost.
4748
//
4849
// Jobs run concurrently across queues, but never within a queue.
49-
Submit(queue, id string, run func(ctx context.Context) error)
50+
// The cost is used for admission control: a job is only started when
51+
// activeCost + cost <= maxCost.
52+
Submit(queue, id string, cost int, run func(ctx context.Context) error)
5053
// SubmitPeriodicJob submits a job to the queue that runs immediately, and then periodically after the interval.
5154
//
5255
// Jobs run concurrently across queues, but never within a queue.
53-
SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error)
56+
// The cost is used for admission control: a job is only started when
57+
// activeCost + cost <= maxCost.
58+
SubmitPeriodicJob(queue, id string, cost int, interval time.Duration, run func(ctx context.Context) error)
5459
}
5560

5661
type prefixedScheduler struct {
5762
prefix string
5863
scheduler Scheduler
5964
}
6065

61-
func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
62-
p.scheduler.Submit(queue, p.prefix+id, run)
66+
func (p *prefixedScheduler) Submit(queue, id string, cost int, run func(ctx context.Context) error) {
67+
p.scheduler.Submit(queue, p.prefix+id, cost, run)
6368
}
6469

65-
func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
66-
p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run)
70+
func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, cost int, interval time.Duration, run func(ctx context.Context) error) {
71+
p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, cost, interval, run)
6772
}
6873

6974
func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
@@ -74,15 +79,16 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
7479
}
7580

7681
type RootScheduler struct {
77-
workAvailable chan bool
78-
lock sync.Mutex
79-
queue []queueJob
80-
active map[string]string // queue -> job id
81-
activeClones int
82-
maxCloneConcurrency int
83-
cancel context.CancelFunc
84-
store ScheduleStore
85-
metrics *schedulerMetrics
82+
workAvailable chan bool
83+
lock sync.Mutex
84+
queue []queueJob
85+
active map[string]string // queue -> job id
86+
activeCost int
87+
activeCosts map[string]int // queue -> cost of running job
88+
maxCost int
89+
cancel context.CancelFunc
90+
store ScheduleStore
91+
metrics *schedulerMetrics
8692
}
8793

8894
var _ Scheduler = &RootScheduler{}
@@ -109,21 +115,21 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
109115
return nil, errors.Wrap(err, "create schedule store")
110116
}
111117
}
112-
maxClones := config.MaxCloneConcurrency
113-
if maxClones == 0 && config.Concurrency > 1 {
114-
// Default: reserve at least half the workers for non-clone jobs.
115-
maxClones = max(1, config.Concurrency/2)
118+
maxCost := config.MaxCost
119+
if maxCost == 0 {
120+
maxCost = config.Concurrency * 4
116121
}
117122
m, err := newSchedulerMetrics()
118123
if err != nil {
119124
return nil, errors.Wrap(err, "create scheduler metrics")
120125
}
121126
q := &RootScheduler{
122-
workAvailable: make(chan bool, 1024),
123-
active: make(map[string]string),
124-
maxCloneConcurrency: maxClones,
125-
store: store,
126-
metrics: m,
127+
workAvailable: make(chan bool, 1024),
128+
active: make(map[string]string),
129+
activeCosts: make(map[string]int),
130+
maxCost: maxCost,
131+
store: store,
132+
metrics: m,
127133
}
128134
ctx, cancel := context.WithCancel(ctx)
129135
q.cancel = cancel
@@ -147,19 +153,19 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {
147153
}
148154
}
149155

150-
func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
156+
func (q *RootScheduler) Submit(queue, id string, cost int, run func(ctx context.Context) error) {
151157
q.lock.Lock()
152-
q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
158+
q.queue = append(q.queue, queueJob{queue: queue, id: id, cost: cost, run: run})
153159
q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue)))
154160
q.lock.Unlock()
155161
q.workAvailable <- true
156162
}
157163

158-
func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
164+
func (q *RootScheduler) SubmitPeriodicJob(queue, id string, cost int, interval time.Duration, run func(ctx context.Context) error) {
159165
key := jobKey(queue, id)
160166
delay := q.periodicDelay(key, interval)
161167
submit := func() {
162-
q.Submit(queue, id, func(ctx context.Context) error {
168+
q.Submit(queue, id, cost, func(ctx context.Context) error {
163169
err := run(ctx)
164170
if q.store != nil {
165171
if storeErr := q.store.SetLastRun(key, time.Now()); storeErr != nil {
@@ -168,7 +174,7 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
168174
}
169175
go func() {
170176
time.Sleep(interval)
171-
q.SubmitPeriodicJob(queue, id, interval, run)
177+
q.SubmitPeriodicJob(queue, id, cost, interval, run)
172178
}()
173179
return errors.WithStack(err)
174180
})
@@ -252,19 +258,12 @@ func jobType(id string) string {
252258
func (q *RootScheduler) markQueueInactive(queue string) {
253259
q.lock.Lock()
254260
defer q.lock.Unlock()
255-
if isCloneJob(q.active[queue]) {
256-
q.activeClones--
257-
}
261+
q.activeCost -= q.activeCosts[queue]
262+
delete(q.activeCosts, queue)
258263
delete(q.active, queue)
259264
q.recordGaugesLocked()
260265
}
261266

262-
// isCloneJob returns true for job IDs that represent long-running clone operations
263-
// which should be subject to concurrency limits.
264-
func isCloneJob(id string) bool {
265-
return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore")
266-
}
267-
268267
// Take the next job for any queue that is not already running a job.
269268
func (q *RootScheduler) takeNextJob() (queueJob, bool) {
270269
q.lock.Lock()
@@ -273,15 +272,14 @@ func (q *RootScheduler) takeNextJob() (queueJob, bool) {
273272
if _, active := q.active[job.queue]; active {
274273
continue
275274
}
276-
if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency {
275+
if q.activeCost > 0 && q.activeCost+job.cost > q.maxCost {
277276
continue
278277
}
279278
q.queue = append(q.queue[:i], q.queue[i+1:]...)
280279
q.workAvailable <- true
281280
q.active[job.queue] = job.id
282-
if isCloneJob(job.id) {
283-
q.activeClones++
284-
}
281+
q.activeCost += job.cost
282+
q.activeCosts[job.queue] = job.cost
285283
q.recordGaugesLocked()
286284
return job, true
287285
}
@@ -293,5 +291,5 @@ func (q *RootScheduler) recordGaugesLocked() {
293291
ctx := context.Background()
294292
q.metrics.queueDepth.Record(ctx, int64(len(q.queue)))
295293
q.metrics.activeWorkers.Record(ctx, int64(len(q.active)))
296-
q.metrics.activeClones.Record(ctx, int64(q.activeClones))
294+
q.metrics.activeCost.Record(ctx, int64(q.activeCost))
297295
}

0 commit comments

Comments
 (0)