diff --git a/AGENTS.md b/AGENTS.md index e242ab9a..d2d4b511 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -16,9 +16,10 @@ keep a local bare checkout of upstream Git repositories and serve packs from the The codebase uses Hermit to manage toolchains. It is written in Go, and uses Just for running common tasks. -Only add comments for relatively large blocks of code, 20+ lines or more, and ONLY if it is not obvious what the code is +ONLY add comments for relatively large blocks of code, 20+ lines or more, and ONLY if it is not obvious what the code is doing. ALWAYS add Go-style documentation comments for public variables/types/functions. If you do add comments, the -comments should explain WHY something is happening, not WHAT is happening. +comments MUST explain WHY something is happening, not WHAT is happening. NEVER add comments to top-level tests. BE +SUCCINCT WHEN COMMENTING. Functions should return errors, not log them internally. Logging belongs at the call site so callers retain control over how failures are reported and handled. diff --git a/cmd/cachewd/main.go b/cmd/cachewd/main.go index 625d0121..0e95d893 100644 --- a/cmd/cachewd/main.go +++ b/cmd/cachewd/main.go @@ -179,6 +179,10 @@ func main() { stopSignals() } + // Stop scheduling new work once shutdown begins; cancelScheduler below + // performs the hard teardown after in-flight jobs drain. + drainSchedulerIntake(schedulerProvider) + gracefulShutdown(ctx, logger, server, &shuttingDown, globalConfig.ShutdownReadinessDelay, globalConfig.ShutdownTimeout) cancelScheduler() @@ -211,6 +215,14 @@ func gracefulShutdown( } } +func drainSchedulerIntake(provider jobscheduler.Provider) { + scheduler, err := provider() + if err != nil { + return + } + scheduler.Drain() +} + const schedulerDrainTimeout = 10 * time.Second func drainScheduler(ctx context.Context, logger *slog.Logger, provider jobscheduler.Provider) { diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index 123b08c5..ceee18dd 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -78,15 +78,18 @@ type RootScheduler struct { cond *sync.Cond lock sync.Mutex done bool + draining bool queue []queueJob active map[string]string // queue -> job id activeClones int maxCloneConcurrency int - // ctx is cancelled when the scheduler is shutting down. Periodic re-arm - // goroutines select on it so they exit cleanly instead of submitting to a - // dead scheduler. - ctx context.Context //nolint:containedctx - cancel context.CancelFunc + // ctx is cancelled when the scheduler is torn down; periodic re-arm + // goroutines select on it so they exit instead of submitting to a dead + // scheduler. + ctx context.Context //nolint:containedctx + cancel context.CancelFunc + // drain is closed by Drain so intake stops while workers keep running. + drain chan struct{} wg sync.WaitGroup store ScheduleStore metrics *schedulerMetrics @@ -127,6 +130,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { maxCloneConcurrency: maxClones, store: store, metrics: m, + drain: make(chan struct{}), } q.cond = sync.NewCond(&q.lock) ctx, cancel := context.WithCancel(ctx) @@ -147,6 +151,19 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { return q, nil } +// Drain stops the scheduler accepting new submissions and re-arming periodic +// jobs, while leaving workers running to finish in-flight and queued jobs. The +// worker pool is torn down later by cancelling the context passed to New. +func (q *RootScheduler) Drain() { + q.lock.Lock() + defer q.lock.Unlock() + if q.draining { + return + } + q.draining = true + close(q.drain) +} + func (q *RootScheduler) Close() error { if q.store != nil { return errors.WithStack(q.store.Close()) @@ -163,7 +180,7 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler { func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) { q.lock.Lock() - if q.done { + if q.done || q.draining { q.lock.Unlock() return } @@ -174,7 +191,7 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e } func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { - if q.ctx.Err() != nil { + if q.ctx.Err() != nil || q.isDraining() { return } key := jobKey(queue, id) @@ -204,14 +221,22 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati go q.sleepThenSubmit(delay, submit) } -// sleepThenSubmit waits for d, then runs fn — unless the scheduler is -// shutting down, in which case it returns immediately. +func (q *RootScheduler) isDraining() bool { + q.lock.Lock() + defer q.lock.Unlock() + return q.draining +} + +// sleepThenSubmit waits for d, then runs fn — unless the scheduler is draining +// or being torn down, in which case it returns immediately. func (q *RootScheduler) sleepThenSubmit(d time.Duration, fn func()) { timer := time.NewTimer(d) defer timer.Stop() select { case <-q.ctx.Done(): return + case <-q.drain: + return case <-timer.C: fn() } diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 75516b4c..17cae747 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -335,6 +335,66 @@ func TestJobSchedulerSubmitDroppedAfterShutdown(t *testing.T) { assert.False(t, executed.Load(), "submissions after shutdown should be dropped") } +func TestJobSchedulerDrainStopsNewWork(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2}) + assert.NoError(t, err) + t.Cleanup(func() { scheduler.Close() }) + + var periodicExecutions atomic.Int32 + scheduler.SubmitPeriodicJob("queue1", "periodic", 50*time.Millisecond, func(_ context.Context) error { + periodicExecutions.Add(1) + return nil + }) + eventually(t, time.Second, func() bool { return periodicExecutions.Load() >= 2 }, + "periodic job should fire before drain") + + scheduler.Drain() + time.Sleep(150 * time.Millisecond) + before := periodicExecutions.Load() + time.Sleep(300 * time.Millisecond) + assert.Equal(t, before, periodicExecutions.Load(), + "periodic job should not re-arm after drain") + + var executed atomic.Bool + scheduler.Submit("queue2", "post-drain", func(_ context.Context) error { + executed.Store(true) + return nil + }) + time.Sleep(100 * time.Millisecond) + assert.False(t, executed.Load(), "submissions after drain should be dropped") +} + +func TestJobSchedulerDrainLetsInFlightFinish(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + scheduler, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2}) + assert.NoError(t, err) + t.Cleanup(func() { scheduler.Close() }) + + started := make(chan struct{}) + release := make(chan struct{}) + var finished atomic.Bool + scheduler.Submit("queue1", "in-flight", func(_ context.Context) error { + close(started) + <-release + finished.Store(true) + return nil + }) + + <-started + scheduler.Drain() + close(release) + + eventually(t, time.Second, finished.Load, + "in-flight job should finish after drain") +} + // TestJobSchedulerSurvivesParentCancel verifies the shutdown ordering fix: // when the scheduler is created with context.WithoutCancel, cancelling the // parent (simulating SIGTERM) does NOT kill workers. Jobs submitted after the