Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ keep a local bare checkout of upstream Git repositories and serve packs from the

The codebase uses Hermit to manage toolchains. It is written in Go, and uses Just for running common tasks.

Only add comments for relatively large blocks of code, 20+ lines or more, and ONLY if it is not obvious what the code is
ONLY add comments for relatively large blocks of code (20+ lines), and ONLY if it is not obvious what the code is
doing. ALWAYS add Go-style documentation comments for public variables/types/functions. If you do add comments, the
comments should explain WHY something is happening, not WHAT is happening.
comments MUST explain WHY something is happening, not WHAT is happening. NEVER add comments to top-level tests. BE
SUCCINCT WHEN COMMENTING.

Functions should return errors, not log them internally. Logging belongs at the call site so callers retain control over
how failures are reported and handled.
Expand Down
12 changes: 12 additions & 0 deletions cmd/cachewd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ func main() {
stopSignals()
}

// Stop scheduling new work once shutdown begins; cancelScheduler below
// performs the hard teardown after in-flight jobs drain.
drainSchedulerIntake(schedulerProvider)

gracefulShutdown(ctx, logger, server, &shuttingDown, globalConfig.ShutdownReadinessDelay, globalConfig.ShutdownTimeout)

cancelScheduler()
Expand Down Expand Up @@ -211,6 +215,14 @@ func gracefulShutdown(
}
}

// drainSchedulerIntake resolves the scheduler from provider and calls Drain on
// it. If provider returns an error the drain is skipped and the error is
// dropped.
// NOTE(review): presumably acceptable because there is no scheduler to drain
// in that case — confirm the error does not need to be surfaced to the caller.
func drainSchedulerIntake(provider jobscheduler.Provider) {
	if scheduler, err := provider(); err == nil {
		scheduler.Drain()
	}
}

const schedulerDrainTimeout = 10 * time.Second

func drainScheduler(ctx context.Context, logger *slog.Logger, provider jobscheduler.Provider) {
Expand Down
43 changes: 34 additions & 9 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,18 @@ type RootScheduler struct {
cond *sync.Cond
lock sync.Mutex
done bool
draining bool
queue []queueJob
active map[string]string // queue -> job id
activeClones int
maxCloneConcurrency int
// ctx is cancelled when the scheduler is shutting down. Periodic re-arm
// goroutines select on it so they exit cleanly instead of submitting to a
// dead scheduler.
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
// ctx is cancelled when the scheduler is torn down; periodic re-arm
// goroutines select on it so they exit instead of submitting to a dead
// scheduler.
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
// drain is closed by Drain so intake stops while workers keep running.
drain chan struct{}
wg sync.WaitGroup
store ScheduleStore
metrics *schedulerMetrics
Expand Down Expand Up @@ -127,6 +130,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
maxCloneConcurrency: maxClones,
store: store,
metrics: m,
drain: make(chan struct{}),
}
q.cond = sync.NewCond(&q.lock)
ctx, cancel := context.WithCancel(ctx)
Expand All @@ -147,6 +151,19 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
return q, nil
}

// Drain puts the scheduler into draining mode: it sets the draining flag and
// closes the drain channel, which Submit, SubmitPeriodicJob and pending
// sleepThenSubmit goroutines observe to stop feeding new work. It does not
// mark the scheduler done or cancel its context, so workers are left alone.
// Calling Drain more than once is safe; only the first call has any effect.
func (q *RootScheduler) Drain() {
	q.lock.Lock()
	defer q.lock.Unlock()
	// Guarded by the flag because closing an already-closed channel panics.
	if !q.draining {
		q.draining = true
		close(q.drain)
	}
}

func (q *RootScheduler) Close() error {
if q.store != nil {
return errors.WithStack(q.store.Close())
Expand All @@ -163,7 +180,7 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {

func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
q.lock.Lock()
if q.done {
if q.done || q.draining {
q.lock.Unlock()
return
}
Expand All @@ -174,7 +191,7 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e
}

func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
if q.ctx.Err() != nil {
if q.ctx.Err() != nil || q.isDraining() {
return
}
key := jobKey(queue, id)
Expand Down Expand Up @@ -204,14 +221,22 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
go q.sleepThenSubmit(delay, submit)
}

// sleepThenSubmit waits for d, then runs fn — unless the scheduler is
// shutting down, in which case it returns immediately.
func (q *RootScheduler) isDraining() bool {
	// Reports whether Drain has run. The flag is read under q.lock because
	// Drain writes it while holding the same lock.
	q.lock.Lock()
	draining := q.draining
	q.lock.Unlock()
	return draining
}

// sleepThenSubmit waits for d, then runs fn — unless the scheduler is draining
// or being torn down, in which case it returns immediately.
func (q *RootScheduler) sleepThenSubmit(d time.Duration, fn func()) {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-q.ctx.Done():
return
case <-q.drain:
return
case <-timer.C:
fn()
}
Expand Down
60 changes: 60 additions & 0 deletions internal/jobscheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,66 @@ func TestJobSchedulerSubmitDroppedAfterShutdown(t *testing.T) {
assert.False(t, executed.Load(), "submissions after shutdown should be dropped")
}

// TestJobSchedulerDrainStopsNewWork checks that after Drain a periodic job no
// longer re-arms and a fresh Submit is never executed.
func TestJobSchedulerDrainStopsNewWork(t *testing.T) {
	_, logCtx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
	ctx, cancel := context.WithCancel(logCtx)
	defer cancel()

	sched, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
	assert.NoError(t, err)
	t.Cleanup(func() { sched.Close() })

	var ticks atomic.Int32
	tick := func(_ context.Context) error {
		ticks.Add(1)
		return nil
	}
	sched.SubmitPeriodicJob("queue1", "periodic", 50*time.Millisecond, tick)
	eventually(t, time.Second, func() bool { return ticks.Load() >= 2 },
		"periodic job should fire before drain")

	sched.Drain()
	// Presumably gives a run that was already queued at Drain time a chance to
	// finish before the baseline is sampled.
	time.Sleep(150 * time.Millisecond)
	settled := ticks.Load()
	time.Sleep(300 * time.Millisecond)
	assert.Equal(t, settled, ticks.Load(),
		"periodic job should not re-arm after drain")

	var ran atomic.Bool
	sched.Submit("queue2", "post-drain", func(_ context.Context) error {
		ran.Store(true)
		return nil
	})
	time.Sleep(100 * time.Millisecond)
	assert.False(t, ran.Load(), "submissions after drain should be dropped")
}

// TestJobSchedulerDrainLetsInFlightFinish checks that a job already running
// when Drain is called is still allowed to complete.
func TestJobSchedulerDrainLetsInFlightFinish(t *testing.T) {
	_, logCtx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
	ctx, cancel := context.WithCancel(logCtx)
	defer cancel()

	sched, err := jobscheduler.New(ctx, jobscheduler.Config{Concurrency: 2})
	assert.NoError(t, err)
	t.Cleanup(func() { sched.Close() })

	var (
		running = make(chan struct{})
		unblock = make(chan struct{})
		done    atomic.Bool
	)
	sched.Submit("queue1", "in-flight", func(_ context.Context) error {
		close(running)
		<-unblock
		done.Store(true)
		return nil
	})

	// Drain only once the job is known to be executing, then let it proceed.
	<-running
	sched.Drain()
	close(unblock)

	eventually(t, time.Second, done.Load,
		"in-flight job should finish after drain")
}

// TestJobSchedulerSurvivesParentCancel verifies the shutdown ordering fix:
// when the scheduler is created with context.WithoutCancel, cancelling the
// parent (simulating SIGTERM) does NOT kill workers. Jobs submitted after the
Expand Down