Skip to content

Commit 2c636d7

Browse files
authored
fix(jobscheduler): stop scheduling new jobs on shutdown (#365)
The scheduler's context is decoupled from SIGTERM so workers can finish in-flight jobs during the HTTP drain window, but this meant new jobs and periodic re-arms kept being scheduled throughout shutdown. Add a Drain phase that halts new submissions and periodic re-arming as soon as shutdown begins, while leaving workers running until the context is cancelled for the final teardown.
1 parent 8f32327 commit 2c636d7

4 files changed

Lines changed: 109 additions & 11 deletions

File tree

AGENTS.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ keep a local bare checkout of upstream Git repositories and serve packs from the
1616

1717
The codebase uses Hermit to manage toolchains. It is written in Go, and uses Just for running common tasks.
1818

19-
Only add comments for relatively large blocks of code, 20+ lines or more, and ONLY if it is not obvious what the code is
19+
ONLY add comments for relatively large blocks of code, 20+ lines or more, and ONLY if it is not obvious what the code is
2020
doing. ALWAYS add Go-style documentation comments for public variables/types/functions. If you do add comments, the
21-
comments should explain WHY something is happening, not WHAT is happening.
21+
comments MUST explain WHY something is happening, not WHAT is happening. NEVER add comments to top-level tests. BE
22+
SUCCINCT WHEN COMMENTING.
2223

2324
Functions should return errors, not log them internally. Logging belongs at the call site so callers retain control over
2425
how failures are reported and handled.

cmd/cachewd/main.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,10 @@ func main() {
179179
stopSignals()
180180
}
181181

182+
// Stop scheduling new work once shutdown begins; cancelScheduler below
183+
// performs the hard teardown after in-flight jobs drain.
184+
drainSchedulerIntake(schedulerProvider)
185+
182186
gracefulShutdown(ctx, logger, server, &shuttingDown, globalConfig.ShutdownReadinessDelay, globalConfig.ShutdownTimeout)
183187

184188
cancelScheduler()
@@ -211,6 +215,14 @@ func gracefulShutdown(
211215
}
212216
}
213217

218+
func drainSchedulerIntake(provider jobscheduler.Provider) {
219+
scheduler, err := provider()
220+
if err != nil {
221+
return
222+
}
223+
scheduler.Drain()
224+
}
225+
214226
const schedulerDrainTimeout = 10 * time.Second
215227

216228
func drainScheduler(ctx context.Context, logger *slog.Logger, provider jobscheduler.Provider) {

internal/jobscheduler/jobs.go

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,18 @@ type RootScheduler struct {
7878
cond *sync.Cond
7979
lock sync.Mutex
8080
done bool
81+
draining bool
8182
queue []queueJob
8283
active map[string]string // queue -> job id
8384
activeClones int
8485
maxCloneConcurrency int
85-
// ctx is cancelled when the scheduler is shutting down. Periodic re-arm
86-
// goroutines select on it so they exit cleanly instead of submitting to a
87-
// dead scheduler.
88-
ctx context.Context //nolint:containedctx
89-
cancel context.CancelFunc
86+
// ctx is cancelled when the scheduler is torn down; periodic re-arm
87+
// goroutines select on it so they exit instead of submitting to a dead
88+
// scheduler.
89+
ctx context.Context //nolint:containedctx
90+
cancel context.CancelFunc
91+
// drain is closed by Drain so intake stops while workers keep running.
92+
drain chan struct{}
9093
wg sync.WaitGroup
9194
store ScheduleStore
9295
metrics *schedulerMetrics
@@ -127,6 +130,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
127130
maxCloneConcurrency: maxClones,
128131
store: store,
129132
metrics: m,
133+
drain: make(chan struct{}),
130134
}
131135
q.cond = sync.NewCond(&q.lock)
132136
ctx, cancel := context.WithCancel(ctx)
@@ -147,6 +151,19 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
147151
return q, nil
148152
}
149153

154+
// Drain stops the scheduler accepting new submissions and re-arming periodic
155+
// jobs, while leaving workers running to finish in-flight and queued jobs. The
156+
// worker pool is torn down later by cancelling the context passed to New.
157+
func (q *RootScheduler) Drain() {
158+
q.lock.Lock()
159+
defer q.lock.Unlock()
160+
if q.draining {
161+
return
162+
}
163+
q.draining = true
164+
close(q.drain)
165+
}
166+
150167
func (q *RootScheduler) Close() error {
151168
if q.store != nil {
152169
return errors.WithStack(q.store.Close())
@@ -163,7 +180,7 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {
163180

164181
func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
165182
q.lock.Lock()
166-
if q.done {
183+
if q.done || q.draining {
167184
q.lock.Unlock()
168185
return
169186
}
@@ -174,7 +191,7 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e
174191
}
175192

176193
func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
177-
if q.ctx.Err() != nil {
194+
if q.ctx.Err() != nil || q.isDraining() {
178195
return
179196
}
180197
key := jobKey(queue, id)
@@ -204,14 +221,22 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
204221
go q.sleepThenSubmit(delay, submit)
205222
}
206223

207-
// sleepThenSubmit waits for d, then runs fn — unless the scheduler is
208-
// shutting down, in which case it returns immediately.
224+
func (q *RootScheduler) isDraining() bool {
225+
q.lock.Lock()
226+
defer q.lock.Unlock()
227+
return q.draining
228+
}
229+
230+
// sleepThenSubmit waits for d, then runs fn — unless the scheduler is draining
231+
// or being torn down, in which case it returns immediately.
209232
func (q *RootScheduler) sleepThenSubmit(d time.Duration, fn func()) {
210233
timer := time.NewTimer(d)
211234
defer timer.Stop()
212235
select {
213236
case <-q.ctx.Done():
214237
return
238+
case <-q.drain:
239+
return
215240
case <-timer.C:
216241
fn()
217242
}

internal/jobscheduler/jobs_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,66 @@ func TestJobSchedulerSubmitDroppedAfterShutdown(t *testing.T) {
335335
assert.False(t, executed.Load(), "submissions after shutdown should be dropped")
336336
}
337337

338+
func TestJobSchedulerDrainStopsNewWork(t *testing.T) {
339+
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
340+
ctx, cancel := context.WithCancel(ctx)
341+
defer cancel()
342+
343+
scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
344+
assert.NoError(t, err)
345+
t.Cleanup(func() { scheduler.Close() })
346+
347+
var periodicExecutions atomic.Int32
348+
scheduler.SubmitPeriodicJob("queue1", "periodic", 50*time.Millisecond, func(_ context.Context) error {
349+
periodicExecutions.Add(1)
350+
return nil
351+
})
352+
eventually(t, time.Second, func() bool { return periodicExecutions.Load() >= 2 },
353+
"periodic job should fire before drain")
354+
355+
scheduler.Drain()
356+
time.Sleep(150 * time.Millisecond)
357+
before := periodicExecutions.Load()
358+
time.Sleep(300 * time.Millisecond)
359+
assert.Equal(t, before, periodicExecutions.Load(),
360+
"periodic job should not re-arm after drain")
361+
362+
var executed atomic.Bool
363+
scheduler.Submit("queue2", "post-drain", func(_ context.Context) error {
364+
executed.Store(true)
365+
return nil
366+
})
367+
time.Sleep(100 * time.Millisecond)
368+
assert.False(t, executed.Load(), "submissions after drain should be dropped")
369+
}
370+
371+
func TestJobSchedulerDrainLetsInFlightFinish(t *testing.T) {
372+
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
373+
ctx, cancel := context.WithCancel(ctx)
374+
defer cancel()
375+
376+
scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
377+
assert.NoError(t, err)
378+
t.Cleanup(func() { scheduler.Close() })
379+
380+
started := make(chan struct{})
381+
release := make(chan struct{})
382+
var finished atomic.Bool
383+
scheduler.Submit("queue1", "in-flight", func(_ context.Context) error {
384+
close(started)
385+
<-release
386+
finished.Store(true)
387+
return nil
388+
})
389+
390+
<-started
391+
scheduler.Drain()
392+
close(release)
393+
394+
eventually(t, time.Second, finished.Load,
395+
"in-flight job should finish after drain")
396+
}
397+
338398
// TestJobSchedulerSurvivesParentCancel verifies the shutdown ordering fix:
339399
// when the scheduler is created with context.WithoutCancel, cancelling the
340400
// parent (simulating SIGTERM) does NOT kill workers. Jobs submitted after the

0 commit comments

Comments
 (0)