Skip to content

Commit 14e8f8a

Browse files
alecthomasclaude
andcommitted
feat(scheduler): add admission cost, dedup, and RunSync cancellation
Add per-job admission cost floor to prevent volume-based DoS by charging a minimum fairness cost regardless of actual job duration. Deduplicate jobs by (type, id): Submit silently drops duplicates, RunSync coalesces callers onto the existing job. Sync and async jobs can be mixed on the same key. When all RunSync waiters cancel and no Submit owns the job, the job's context is cancelled. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e551e65 commit 14e8f8a

5 files changed

Lines changed: 393 additions & 75 deletions

File tree

internal/scheduler/README.md

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ All work — foreground and background — goes through a single scheduler. The
2020

2121
**Cost**: A `time.Duration` representing the relative system impact of a job. The scheduler automatically learns the cost of each `(job_type, job_id)` pair using an exponential moving average of observed execution time. On first encounter, a small default (1 second) is used. Callers never specify cost explicitly.
2222

23-
**Accumulated cost**: A running total of cost consumed per fairness key. Every time a job is admitted, its estimated cost is added to its fairness key's accumulated cost. The scheduler always picks the job whose fairness key has the lowest accumulated cost — whoever has consumed the least goes next.
23+
**Admission cost**: A configurable minimum fairness cost charged per job admission (`Config.AdmissionCost`, default 5 seconds). At admission time, the fairness charge is `max(estimated_cost, admission_cost)`. This prevents a client from monopolising the scheduler by submitting many cheap jobs — even a job that completes in 100ms still costs 5 seconds of fairness budget, so mass-submission is expensive in fairness terms without affecting actual execution time.
24+
25+
**Accumulated cost**: A running total of cost consumed per fairness key. Every time a job is admitted, the admission cost (the max of estimated and configured minimum) is added to its fairness key's accumulated cost. The scheduler always picks the job whose fairness key has the lowest accumulated cost — whoever has consumed the least goes next.
2426

2527
**Fairness key**: An opaque string on the job, populated by the caller. For foreground jobs, this is typically the client IP or identity. For background jobs, this is empty. The scheduler doesn't know what it represents — it just uses it for ordering.
2628

@@ -159,6 +161,17 @@ scheduler.Submit(
159161

160162
Both enter the same pending queue and the same admission logic. `RunSync` blocks on a completion signal before returning to the caller.
161163

164+
### Deduplication
165+
166+
Jobs are deduplicated by `(job_type, job_id)`. If a job with the same key is already pending or running:
167+
168+
- **`Submit`**: the call is silently dropped. The existing job's `fn` will be used.
169+
- **`RunSync`**: the caller coalesces onto the existing job and receives its result when it completes. Multiple callers can wait on the same job simultaneously.
170+
171+
Sync and async jobs can be mixed for the same `(job_type, job_id)`. A `RunSync` onto an active `Submit` job attaches a waiter. A `Submit` onto an active `RunSync` job marks it as submitted so it survives waiter cancellation.
172+
173+
This means if ten clients request the same clone concurrently, only one clone runs — all ten callers receive the result. If a coalesced `RunSync` caller's context is cancelled, that caller is detached but the job continues for the remaining waiters. When **all** waiters cancel and no `Submit` created the job, the job itself is cancelled via its context — both pending and running jobs are stopped.
174+
162175
## Dispatch Algorithm
163176

164177
The entire scheduling algorithm:
@@ -174,7 +187,7 @@ for each job in sorted order:
174187
if type_running_count >= type_slots → skip
175188
if any running job has same job_id AND same non-empty conflict_group → skip
176189
admit job
177-
estimated_cost = cost_estimates[(job_type, job_id)] or 1s
190+
estimated_cost = max(cost_estimates[(job_type, job_id)] or 1s, config.admission_cost)
178191
accumulated_cost[fairness_key] += estimated_cost
179192
```
180193

@@ -194,6 +207,7 @@ Key properties of this algorithm:
194207
- **Per-type limits**: within a tier, individual job types can be capped to a fraction of the tier's allocation, preventing expensive operations from monopolising the tier.
195208
- **Fairness**: within a priority level, jobs from the fairness key with the lowest accumulated cost go first. A client that has consumed a lot of capacity yields to one that has consumed little.
196209
- **Cost-awareness**: expensive jobs advance accumulated cost faster, so they naturally yield to cheaper work from other clients. A `linux.git` clone that takes 60 seconds advances the client's accumulated cost by ~60s, while a `git.git` clone that takes 5 seconds advances it by ~5s.
210+
- **Volume-awareness**: the admission cost floor ensures that submitting many cheap jobs is still expensive in fairness terms. A client submitting 1000 trivial jobs accumulates at least `1000 * admission_cost`, causing it to yield to clients that have submitted fewer jobs.
197211
- **Adaptive**: the scheduler automatically learns the cost of each `(job_type, job_id)` pair. No manual cost tuning required. After one execution, estimates are already meaningful.
198212
- **Conflict safety**: conflicting jobs on the same resource stay in the pending queue, not consuming concurrency slots while they wait.
199213
- **No head-of-line blocking**: if the next job by ordering is blocked (conflict or concurrency limit), the scheduler skips it and admits the next admissible job.

internal/scheduler/scheduler.go

Lines changed: 96 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ type Config struct {
7575
CostTTL time.Duration `hcl:"cost-ttl,optional" help:"TTL for cost estimate entries." default:"1h"`
7676
FairnessTTL time.Duration `hcl:"fairness-ttl,optional" help:"TTL for accumulated cost entries." default:"10m"`
7777
CleanupInterval time.Duration `hcl:"cleanup-interval,optional" help:"How often to run TTL cleanup." default:"1m"`
78+
// AdmissionCost is the minimum fairness cost charged per job admission, regardless of the
79+
// job's estimated duration. This prevents a client from monopolising the scheduler by
80+
// submitting many cheap jobs.
81+
AdmissionCost time.Duration `hcl:"admission-cost,optional" help:"Minimum fairness cost per job admission." default:"5s"`
82+
}
83+
84+
type dedupKey struct {
85+
jobType JobType
86+
jobID string
7887
}
7988

8089
type job struct {
@@ -83,7 +92,10 @@ type job struct {
8392
fairnessKey string
8493
fn func(ctx context.Context) error
8594
arrivalTime time.Time
86-
done chan error // non-nil for RunSync
95+
submitted bool // true if created or referenced by Submit
96+
waiters []chan error // RunSync callers waiting for the result
97+
ctx context.Context // context passed to fn; cancellable for RunSync jobs
98+
cancel context.CancelFunc // non-nil for RunSync jobs
8799
}
88100

89101
func (j *job) String() string { return j.jobType.String() + ":" + j.jobID }
@@ -115,6 +127,7 @@ type Scheduler struct {
115127
types map[JobType]JobTypeConfig
116128
pending []*job
117129
running []*runningJob
130+
active map[dedupKey]*job // all pending + running jobs for dedup
118131
fairness map[string]*fairnessEntry
119132
costs map[costKey]*costEntry
120133
config Config
@@ -142,6 +155,7 @@ func New(ctx context.Context, config Config, ns *metadatadb.Namespace) (*Schedul
142155
s := &Scheduler{
143156
priorities: make(map[Priority]bool),
144157
types: make(map[JobType]JobTypeConfig),
158+
active: make(map[dedupKey]*job),
145159
fairness: make(map[string]*fairnessEntry),
146160
costs: make(map[costKey]*costEntry),
147161
lastRunsLocal: make(map[string]time.Time),
@@ -199,55 +213,75 @@ func (s *Scheduler) validateType(jt JobType) {
199213
}
200214

201215
// Submit queues a background job for async execution. Returns immediately.
216+
// If a job with the same (type, id) is already pending or running, the
217+
// submission is silently deduplicated. Panics if a RunSync job with the same
218+
// (type, id) is active — sync and async jobs must not be mixed.
202219
func (s *Scheduler) Submit(jobType JobType, jobID string, fn func(ctx context.Context) error) {
203220
s.mu.Lock()
204221
s.validateType(jobType)
205-
s.pending = append(s.pending, &job{
222+
key := dedupKey{jobType, jobID}
223+
if existing, ok := s.active[key]; ok {
224+
existing.submitted = true
225+
s.mu.Unlock()
226+
return
227+
}
228+
j := &job{
206229
jobType: jobType,
207230
jobID: jobID,
208231
fn: fn,
209232
arrivalTime: s.now(),
210-
})
233+
submitted: true,
234+
}
235+
s.active[key] = j
236+
s.pending = append(s.pending, j)
211237
s.mu.Unlock()
212238
s.signal()
213239
}
214240

215241
// RunSync submits a foreground job and blocks until it completes or ctx is
216-
// cancelled. The fn receives a context that is cancelled when either the
217-
// caller's ctx or the scheduler's context is done.
242+
// cancelled. If a job with the same (type, id) is already pending or running,
243+
// the caller coalesces onto the existing job and receives its result. When all
244+
// coalesced callers cancel, the job itself is cancelled. Panics if a Submit
245+
// job with the same (type, id) is active — sync and async jobs must not be mixed.
218246
func (s *Scheduler) RunSync(ctx context.Context, jobType JobType, jobID, fairnessKey string, fn func(ctx context.Context) error) error {
219-
jobCtx, jobCancel := context.WithCancel(ctx)
220-
stop := context.AfterFunc(s.ctx, jobCancel)
247+
done := make(chan error, 1)
221248

222249
s.mu.Lock()
223250
s.validateType(jobType)
224-
s.mu.Unlock()
225-
226-
done := make(chan error, 1)
251+
key := dedupKey{jobType, jobID}
252+
if existing, ok := s.active[key]; ok {
253+
existing.waiters = append(existing.waiters, done)
254+
s.mu.Unlock()
255+
return s.awaitDone(ctx, existing, done)
256+
}
257+
jobCtx, jobCancel := context.WithCancel(s.ctx)
227258
j := &job{
228259
jobType: jobType,
229260
jobID: jobID,
230261
fairnessKey: fairnessKey,
231-
fn: func(_ context.Context) error { return fn(jobCtx) },
262+
fn: fn,
232263
arrivalTime: s.now(),
233-
done: done,
264+
waiters: []chan error{done},
265+
ctx: jobCtx,
266+
cancel: jobCancel,
234267
}
235-
s.mu.Lock()
268+
s.active[key] = j
236269
s.pending = append(s.pending, j)
237270
s.mu.Unlock()
238271
s.signal()
239272

273+
return s.awaitDone(ctx, j, done)
274+
}
275+
276+
func (s *Scheduler) awaitDone(ctx context.Context, j *job, done chan error) error {
240277
select {
241278
case err := <-done:
242-
stop()
243-
jobCancel()
244279
return err
245280
case <-ctx.Done():
246281
s.mu.Lock()
247-
s.removePendingLocked(j)
282+
s.removeWaiterLocked(j, done)
283+
s.maybeRemoveJobLocked(j)
248284
s.mu.Unlock()
249-
stop()
250-
jobCancel()
251285
return errors.WithStack(ctx.Err())
252286
}
253287
}
@@ -383,7 +417,11 @@ func (s *Scheduler) tierSlotsLocked(weight float64) int {
383417
func (s *Scheduler) executeJob(j *job) {
384418
start := s.now()
385419
s.logger.InfoContext(s.ctx, "Starting job", "job", j)
386-
err := j.fn(s.ctx)
420+
fnCtx := s.ctx
421+
if j.ctx != nil {
422+
fnCtx = j.ctx
423+
}
424+
err := j.fn(fnCtx)
387425
elapsed := s.now().Sub(start)
388426

389427
if err != nil {
@@ -395,11 +433,14 @@ func (s *Scheduler) executeJob(j *job) {
395433
s.mu.Lock()
396434
s.updateCostEstimateLocked(j.jobType, j.jobID, elapsed)
397435
s.removeFromRunningLocked(j)
436+
delete(s.active, dedupKey{j.jobType, j.jobID})
437+
waiters := j.waiters
438+
j.waiters = nil
398439
s.recordMetricsLocked()
399440
s.mu.Unlock()
400441

401-
if j.done != nil {
402-
j.done <- err
442+
for _, w := range waiters {
443+
w <- err
403444
}
404445
s.signal()
405446
}
@@ -450,10 +491,11 @@ func (s *Scheduler) hasConflictLocked(j *job) bool {
450491
const defaultCost = time.Second
451492

452493
func (s *Scheduler) estimatedCostLocked(j *job) time.Duration {
494+
est := defaultCost
453495
if entry, ok := s.costs[costKey{j.jobType, j.jobID}]; ok {
454-
return entry.estimate
496+
est = entry.estimate
455497
}
456-
return defaultCost
498+
return max(est, s.config.AdmissionCost)
457499
}
458500

459501
func (s *Scheduler) updateCostEstimateLocked(jt JobType, jobID string, elapsed time.Duration) {
@@ -530,6 +572,37 @@ func (s *Scheduler) removePendingLocked(j *job) {
530572
s.pending = slices.DeleteFunc(s.pending, func(pj *job) bool { return pj == j })
531573
}
532574

575+
func (s *Scheduler) removeWaiterLocked(j *job, done chan error) {
576+
j.waiters = slices.DeleteFunc(j.waiters, func(w chan error) bool { return w == done })
577+
}
578+
579+
func (s *Scheduler) isRunningLocked(j *job) bool {
580+
for _, rj := range s.running {
581+
if rj.job == j {
582+
return true
583+
}
584+
}
585+
return false
586+
}
587+
588+
// maybeRemoveJobLocked is called when a RunSync waiter is removed. If no
589+
// waiters remain it cancels the job's context. Pending jobs are also removed
590+
// from the queue and active map; running jobs are left for executeJob to
591+
// clean up after the fn returns.
592+
func (s *Scheduler) maybeRemoveJobLocked(j *job) {
593+
if len(j.waiters) > 0 || j.submitted {
594+
return
595+
}
596+
if j.cancel != nil {
597+
j.cancel()
598+
}
599+
if s.isRunningLocked(j) {
600+
return
601+
}
602+
s.removePendingLocked(j)
603+
delete(s.active, dedupKey{j.jobType, j.jobID})
604+
}
605+
533606
// --- TTL cleanup ---
534607

535608
func (s *Scheduler) cleanupLoop() {

0 commit comments

Comments
 (0)