
Commit dc0fb39

worstell and ampcode-com committed
feat(scheduler): add cost-based admission and Job struct
Add a cost field to scheduler jobs so strategies declare resource weight at submit time. The scheduler tracks total active cost against a configurable budget (max-cost), providing a general mechanism to limit total resource pressure from heavy background work.

Additionally, replace positional Submit parameters with a Job struct that includes an explicit Clone bool, removing the isCloneJob string matching. MaxCloneConcurrency is preserved as a direct, independent admission check alongside cost.

Cost constants defined in the git strategy: clone=4, snapshot=3, repack=2, fetch=1.

Two independent admission checks in takeNextJob:

1. Cost budget: activeCost + job.Cost <= maxCost
2. Clone limit: Clone jobs capped at MaxCloneConcurrency

Co-authored-by: Amp <amp@ampcode.com>
Amp-Thread-ID: https://ampcode.com/threads/T-019d4a57-1477-707c-bb89-5543fddff0e7
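For illustration, a call site under the new API might look like the following sketch. The `scheduler` value, the queue name, and the `cloneRepo` helper are hypothetical; the cost and Clone flag follow the git strategy constants quoted above.

```go
// Hypothetical submission of a clone job under the new Job-struct API.
scheduler.Submit(jobscheduler.Job{
	Queue: "repo/github.com/acme/widgets", // one serialised queue per repo (illustrative)
	ID:    "clone",
	Cost:  4,    // clone=4 in the git strategy's cost table
	Clone: true, // counted against MaxCloneConcurrency
	Run: func(ctx context.Context) error {
		return cloneRepo(ctx) // hypothetical helper
	},
})
```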
1 parent 88105a0 commit dc0fb39

7 files changed

Lines changed: 155 additions & 117 deletions

File tree

internal/jobscheduler/jobs.go

Lines changed: 61 additions & 40 deletions
```diff
@@ -18,19 +18,23 @@ import (
 type Config struct {
 	Concurrency         int    `hcl:"concurrency" help:"The maximum number of concurrent jobs to run (0 means number of cores)." default:"4"`
 	MaxCloneConcurrency int    `hcl:"max-clone-concurrency" help:"Maximum number of concurrent clone jobs. Remaining worker slots are reserved for fetch/repack/snapshot jobs. 0 means no limit." default:"0"`
+	MaxCost             int    `hcl:"max-cost" help:"Maximum total cost of concurrently running jobs. Each job declares its own cost at submission. 0 means Concurrency * 4." default:"0"`
 	SchedulerDB         string `hcl:"scheduler-db" help:"Path to the scheduler state database." default:"${CACHEW_STATE}/scheduler.db"`
 }
 
-type queueJob struct {
-	id    string
-	queue string
-	run   func(ctx context.Context) error
+// Job describes a unit of work to submit to the scheduler.
+type Job struct {
+	Queue string
+	ID    string
+	Cost  int
+	Clone bool // Subject to MaxCloneConcurrency limits.
+	Run   func(ctx context.Context) error
 }
 
 func jobKey(queue, id string) string { return id + ":" + queue }
 
-func (j *queueJob) String() string                { return jobKey(j.queue, j.id) }
-func (j *queueJob) Run(ctx context.Context) error { return errors.WithStack(j.run(ctx)) }
+func (j *Job) String() string                { return jobKey(j.Queue, j.ID) }
+func (j *Job) run(ctx context.Context) error { return errors.WithStack(j.Run(ctx)) }
 
 // Scheduler runs background jobs concurrently across multiple serialised queues.
 //
```
```diff
@@ -43,27 +47,29 @@ type Scheduler interface {
 	//
 	// This is useful to avoid collisions across strategies.
 	WithQueuePrefix(prefix string) Scheduler
-	// Submit a job to the queue.
+	// Submit a job to the scheduler.
 	//
 	// Jobs run concurrently across queues, but never within a queue.
-	Submit(queue, id string, run func(ctx context.Context) error)
-	// SubmitPeriodicJob submits a job to the queue that runs immediately, and then periodically after the interval.
+	Submit(job Job)
+	// SubmitPeriodicJob submits a job that runs immediately, then repeats after the interval.
 	//
 	// Jobs run concurrently across queues, but never within a queue.
-	SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error)
+	SubmitPeriodicJob(job Job, interval time.Duration)
 }
 
 type prefixedScheduler struct {
 	prefix    string
 	scheduler Scheduler
 }
 
-func (p *prefixedScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
-	p.scheduler.Submit(queue, p.prefix+id, run)
+func (p *prefixedScheduler) Submit(job Job) {
+	job.ID = p.prefix + job.ID
+	p.scheduler.Submit(job)
 }
 
-func (p *prefixedScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
-	p.scheduler.SubmitPeriodicJob(queue, p.prefix+id, interval, run)
+func (p *prefixedScheduler) SubmitPeriodicJob(job Job, interval time.Duration) {
+	job.ID = p.prefix + job.ID
+	p.scheduler.SubmitPeriodicJob(job, interval)
 }
 
 func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
```
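As the diff above shows, the prefixed view rewrites only the job ID before delegating, so distinct strategies can share a queue without colliding on job keys. A hypothetical use (the `root` and `doFetch` names are illustrative):

```go
// With prefix "git-", a submitted ID "fetch" becomes "git-fetch",
// so jobKey produces "git-fetch:repo-queue".
s := root.WithQueuePrefix("git-")
s.Submit(jobscheduler.Job{Queue: "repo-queue", ID: "fetch", Cost: 1, Run: doFetch})
```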
```diff
@@ -76,9 +82,13 @@ func (p *prefixedScheduler) WithQueuePrefix(prefix string) Scheduler {
 type RootScheduler struct {
 	workAvailable       chan bool
 	lock                sync.Mutex
-	queue               []queueJob
+	queue               []Job
 	active              map[string]string // queue -> job id
+	activeCost          int
+	activeCosts         map[string]int // queue -> cost of running job
+	maxCost             int
 	activeClones        int
+	activeCloneQueues   map[string]bool
 	maxCloneConcurrency int
 	cancel              context.CancelFunc
 	store               ScheduleStore
```
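To make the new bookkeeping concrete, this is the state the struct would hold after admitting a cost-4 clone on queue "a" and a cost-1 fetch on queue "b" (illustrative values, not from the diff):

```go
// active:            {"a": "clone", "b": "fetch"} // queue -> job id
// activeCosts:       {"a": 4, "b": 1}             // per-queue cost, refunded on completion
// activeCost:        5                            // sum of activeCosts values
// activeCloneQueues: {"a": true}                  // queues currently holding a clone slot
// activeClones:      1
```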
```diff
@@ -111,16 +121,22 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
 	}
 	maxClones := config.MaxCloneConcurrency
 	if maxClones == 0 && config.Concurrency > 1 {
-		// Default: reserve at least half the workers for non-clone jobs.
 		maxClones = max(1, config.Concurrency/2)
 	}
+	maxCost := config.MaxCost
+	if maxCost == 0 {
+		maxCost = config.Concurrency * 4
+	}
 	m, err := newSchedulerMetrics()
 	if err != nil {
 		return nil, errors.Wrap(err, "create scheduler metrics")
 	}
 	q := &RootScheduler{
 		workAvailable:       make(chan bool, 1024),
 		active:              make(map[string]string),
+		activeCosts:         make(map[string]int),
+		activeCloneQueues:   make(map[string]bool),
+		maxCost:             maxCost,
 		maxCloneConcurrency: maxClones,
 		store:               store,
 		metrics:             m,
```
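A worked example of the defaults above, assuming the stock Concurrency of 4 with MaxCost and MaxCloneConcurrency both left at 0:

```go
concurrency := 4                   // Config default
maxCost := concurrency * 4         // MaxCost=0            -> budget of 16
maxClones := max(1, concurrency/2) // MaxCloneConcurrency=0 -> 2
// The cost budget alone would admit four cost-4 clones (4*4 = 16), but the
// independent clone cap of 2 blocks the third and fourth. Two clones plus
// one snapshot, repack, and fetch total 4+4+3+2+1 = 14 and all fit.
_, _ = maxCost, maxClones
```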
```diff
@@ -147,31 +163,33 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {
 	}
 }
 
-func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
+func (q *RootScheduler) Submit(job Job) {
 	q.lock.Lock()
-	q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
+	q.queue = append(q.queue, job)
 	q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue)))
 	q.lock.Unlock()
 	q.workAvailable <- true
 }
 
-func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
-	key := jobKey(queue, id)
+func (q *RootScheduler) SubmitPeriodicJob(job Job, interval time.Duration) {
+	key := jobKey(job.Queue, job.ID)
 	delay := q.periodicDelay(key, interval)
+	origRun := job.Run
 	submit := func() {
-		q.Submit(queue, id, func(ctx context.Context) error {
-			err := run(ctx)
+		job.Run = func(ctx context.Context) error {
+			err := origRun(ctx)
 			if q.store != nil {
 				if storeErr := q.store.SetLastRun(key, time.Now()); storeErr != nil {
 					logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr)
 				}
 			}
 			go func() {
 				time.Sleep(interval)
-				q.SubmitPeriodicJob(queue, id, interval, run)
+				q.SubmitPeriodicJob(Job{Queue: job.Queue, ID: job.ID, Cost: job.Cost, Clone: job.Clone, Run: origRun}, interval) // resubmit with the original Run, not the wrapper
 			}()
 			return errors.WithStack(err)
-		})
+		}
+		q.Submit(job)
 	}
 	if delay <= 0 {
 		submit()
```
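Periodic submission keeps the same Job shape. A hypothetical caller for a recurring repack (cost 2 per the commit message; `doRepack` is illustrative) might be:

```go
// Runs immediately, or after periodicDelay if the store already records a
// last run, then re-queues itself every 6 hours.
s.SubmitPeriodicJob(jobscheduler.Job{
	Queue: "repo-queue",
	ID:    "repack",
	Cost:  2,
	Run:   doRepack,
}, 6*time.Hour)
```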
```diff
@@ -210,7 +228,7 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
 		if !ok {
 			continue
 		}
-		jobAttrs := attribute.String("job.type", jobType(job.id))
+		jobAttrs := attribute.String("job.type", jobType(job.ID))
 		start := time.Now()
 		logger.InfoContext(ctx, "Starting job", "job", job)
 		err := job.run(ctx)
@@ -225,7 +243,7 @@ func (q *RootScheduler) worker(ctx context.Context, id int) {
 		statusAttr := attribute.String("status", status)
 		q.metrics.jobsTotal.Add(ctx, 1, metric.WithAttributes(jobAttrs, statusAttr))
 		q.metrics.jobDuration.Record(ctx, elapsed.Seconds(), metric.WithAttributes(jobAttrs, statusAttr))
-		q.markQueueInactive(job.queue)
+		q.markQueueInactive(job.Queue)
 		q.workAvailable <- true
 	}
 }
```
```diff
@@ -252,46 +270,49 @@ func jobType(id string) {
 func (q *RootScheduler) markQueueInactive(queue string) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
-	if isCloneJob(q.active[queue]) {
+	q.activeCost -= q.activeCosts[queue]
+	delete(q.activeCosts, queue)
+	if q.activeCloneQueues[queue] {
 		q.activeClones--
+		delete(q.activeCloneQueues, queue)
 	}
 	delete(q.active, queue)
 	q.recordGaugesLocked()
 }
 
-// isCloneJob returns true for job IDs that represent long-running clone operations
-// which should be subject to concurrency limits.
-func isCloneJob(id string) bool {
-	return strings.HasSuffix(id, "clone") || strings.HasSuffix(id, "deferred-mirror-restore")
-}
-
-// Take the next job for any queue that is not already running a job.
-func (q *RootScheduler) takeNextJob() (queueJob, bool) {
+func (q *RootScheduler) takeNextJob() (Job, bool) {
 	q.lock.Lock()
 	defer q.lock.Unlock()
 	for i, job := range q.queue {
-		if _, active := q.active[job.queue]; active {
+		if _, active := q.active[job.Queue]; active {
+			continue
+		}
+		if q.activeCost > 0 && q.activeCost+job.Cost > q.maxCost {
 			continue
 		}
-		if q.maxCloneConcurrency > 0 && isCloneJob(job.id) && q.activeClones >= q.maxCloneConcurrency {
+		if job.Clone && q.maxCloneConcurrency > 0 && q.activeClones >= q.maxCloneConcurrency {
 			continue
 		}
 		q.queue = append(q.queue[:i], q.queue[i+1:]...)
 		q.workAvailable <- true
-		q.active[job.queue] = job.id
-		if isCloneJob(job.id) {
+		q.active[job.Queue] = job.ID
+		q.activeCost += job.Cost
+		q.activeCosts[job.Queue] = job.Cost
+		if job.Clone {
 			q.activeClones++
+			q.activeCloneQueues[job.Queue] = true
 		}
 		q.recordGaugesLocked()
 		return job, true
 	}
-	return queueJob{}, false
+	return Job{}, false
 }
 
 // recordGaugesLocked updates gauge metrics. Must be called with q.lock held.
 func (q *RootScheduler) recordGaugesLocked() {
 	ctx := context.Background()
 	q.metrics.queueDepth.Record(ctx, int64(len(q.queue)))
 	q.metrics.activeWorkers.Record(ctx, int64(len(q.active)))
+	q.metrics.activeCost.Record(ctx, int64(q.activeCost))
 	q.metrics.activeClones.Record(ctx, int64(q.activeClones))
 }
```
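A minimal self-contained sketch of the two admission checks, distilled from takeNextJob above. It drops the per-queue serialisation and locking, but keeps the detail that the cost check is skipped while nothing is running, so an oversized job can still start on an idle scheduler:

```go
package main

import "fmt"

// admit mirrors takeNextJob's two independent checks.
func admit(cost int, clone bool, activeCost, maxCost, activeClones, maxClones int) bool {
	if activeCost > 0 && activeCost+cost > maxCost {
		return false // would exceed the cost budget
	}
	if clone && maxClones > 0 && activeClones >= maxClones {
		return false // clone cap reached
	}
	return true
}

func main() {
	// Budget 16, clone cap 2 (the defaults worked out earlier).
	fmt.Println(admit(4, true, 4, 16, 1, 2))   // true: fits budget and cap
	fmt.Println(admit(4, true, 8, 16, 2, 2))   // false: clone cap reached
	fmt.Println(admit(3, false, 14, 16, 2, 2)) // false: 14+3 > 16
	fmt.Println(admit(20, false, 0, 16, 0, 2)) // true: scheduler idle, budget check skipped
}
```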
