Skip to content
This repository was archived by the owner on Apr 15, 2026. It is now read-only.

Commit 6dd788b

Browse files
Merge branch 'shutdown-fixes' into cleanup-tmp
2 parents 6139564 + d6d9f2b commit 6dd788b

3 files changed

Lines changed: 32 additions & 49 deletions

File tree

internal/runner/runner.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -707,44 +707,32 @@ func (r *Runner) ForceKill() {
707707

708708
func (r *Runner) verifyProcessCleanup(pid int) {
709709
log := r.logger.Sugar()
710-
const checkInterval = 10 * time.Millisecond
711-
712710
log.Infow("starting process cleanup verification", "pid", pid)
713711

714712
timeout := r.cleanupTimeout
715713
if timeout == 0 {
716-
timeout = 10 * time.Second // Default fallback
714+
timeout = 10 * time.Second
717715
}
718716

719717
timer := time.NewTimer(timeout)
720718
defer timer.Stop()
721719

722-
ticker := time.NewTicker(checkInterval)
723-
defer ticker.Stop()
724-
725-
for {
720+
select {
721+
case <-r.stopped:
722+
log.Infow("process cleanup verified successfully", "pid", pid)
726723
select {
727-
case <-timer.C:
728-
log.Errorw("process cleanup timeout exceeded, forcing server exit",
729-
"pid", pid, "timeout", timeout)
730-
// Signal forced shutdown - this is idempotent and never blocks
731-
if r.forceShutdown.TriggerForceShutdown() {
732-
log.Errorw("triggered force shutdown signal")
733-
}
734-
return
724+
case r.cleanupSlot <- struct{}{}:
725+
default:
726+
}
727+
return
735728

736-
case <-ticker.C:
737-
// Verify if process group has been terminated
738-
if err := r.verifyFn(pid); err == nil {
739-
log.Infow("process cleanup verified successfully", "pid", pid)
740-
// Return cleanup token to allow future cleanup
741-
select {
742-
case r.cleanupSlot <- struct{}{}:
743-
default:
744-
}
745-
return
746-
}
729+
case <-timer.C:
730+
log.Errorw("process cleanup timeout exceeded, forcing server exit",
731+
"pid", pid, "timeout", timeout)
732+
if r.forceShutdown.TriggerForceShutdown() {
733+
log.Errorw("triggered force shutdown signal")
747734
}
735+
return
748736
}
749737
}
750738

internal/runner/runner_test.go

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,7 @@ func TestRunnerTempDirectoryCleanup(t *testing.T) {
10171017
err = r.Stop()
10181018
require.NoError(t, err, "Runner.Stop() should not error")
10191019

1020-
time.Sleep(100 * time.Millisecond)
1020+
time.Sleep(1 * time.Millisecond)
10211021

10221022
_, err = os.Stat(tmpDir)
10231023
assert.True(t, os.IsNotExist(err), "Temp directory should be cleaned up after Stop()")
@@ -1428,30 +1428,25 @@ func TestForceKillCleanupFailures(t *testing.T) {
14281428

14291429
t.Run("procedure mode cleanup success returns token", func(t *testing.T) {
14301430
forceShutdown := config.NewForceShutdownSignal()
1431-
verifyCallCount := 0
14321431

14331432
r := &Runner{
1434-
cleanupTimeout: 100 * time.Millisecond,
1433+
cleanupTimeout: 1 * time.Millisecond,
14351434
forceShutdown: forceShutdown,
14361435
cleanupSlot: make(chan struct{}, 1),
1437-
verifyFn: func(pid int) error {
1438-
verifyCallCount++
1439-
if verifyCallCount >= 3 {
1440-
return nil // Success after a few attempts
1441-
}
1442-
return fmt.Errorf("process still exists")
1443-
},
1444-
logger: zaptest.NewLogger(t),
1436+
stopped: make(chan bool),
1437+
logger: zaptest.NewLogger(t),
14451438
}
14461439

14471440
// Start verification process
14481441
go r.verifyProcessCleanup(12345)
14491442

1450-
// Wait for verification to complete
1451-
time.Sleep(150 * time.Millisecond)
1443+
// Signal process stopped to trigger cleanup completion
1444+
close(r.stopped)
1445+
1446+
// Give a moment for cleanup to complete
1447+
time.Sleep(1 * time.Millisecond)
14521448

14531449
// Verify cleanup token was returned
14541450
assert.Len(t, r.cleanupSlot, 1)
1455-
assert.GreaterOrEqual(t, verifyCallCount, 3)
14561451
})
14571452
}

internal/tests/shutdown_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestShutdownEndpointE2E(t *testing.T) {
2525
module: "async_sleep",
2626
predictorClass: "Predictor",
2727
concurrencyMax: 1,
28-
runnerShutdownGracePeriod: 10 * time.Second,
28+
runnerShutdownGracePeriod: 100 * time.Millisecond,
2929
})
3030
defer httpTestServer.Close()
3131

@@ -51,7 +51,7 @@ func TestShutdownEndpointE2E(t *testing.T) {
5151
// Service should shutdown gracefully
5252
require.Eventually(t, func() bool {
5353
return svc.IsStopped()
54-
}, 10*time.Second, 100*time.Millisecond, "service should have stopped after shutdown")
54+
}, 1*time.Second, 10*time.Millisecond, "service should have stopped after shutdown")
5555

5656
// Service should no longer be running
5757
assert.False(t, svc.IsRunning())
@@ -73,7 +73,7 @@ func TestShutdownEndpointWaitsForInflightPredictions(t *testing.T) {
7373
module: "async_sleep", // Use async predictor for cancellation support
7474
predictorClass: "Predictor",
7575
concurrencyMax: 1,
76-
runnerShutdownGracePeriod: 30 * time.Second, // Allow time for graceful shutdown
76+
runnerShutdownGracePeriod: 200 * time.Millisecond, // Allow time for graceful shutdown
7777
})
7878
defer httpTestServer.Close()
7979

@@ -108,7 +108,7 @@ func TestShutdownEndpointWaitsForInflightPredictions(t *testing.T) {
108108
case webhookEvent := <-receiverServer.webhookReceiverChan:
109109
assert.Equal(t, runner.PredictionProcessing, webhookEvent.Response.Status)
110110
assert.Equal(t, predictionID, webhookEvent.Response.ID)
111-
case <-time.After(10 * time.Second):
111+
case <-time.After(5 * time.Second):
112112
t.Fatal("did not receive prediction started webhook")
113113
}
114114

@@ -143,15 +143,15 @@ func TestShutdownEndpointWaitsForInflightPredictions(t *testing.T) {
143143
case webhookEvent := <-receiverServer.webhookReceiverChan:
144144
assert.Equal(t, predictionID, webhookEvent.Response.ID)
145145
assert.Equal(t, runner.PredictionCanceled, webhookEvent.Response.Status)
146-
case <-time.After(15 * time.Second):
146+
case <-time.After(5 * time.Second):
147147
t.Fatal("did not receive prediction canceled webhook")
148148
}
149149

150150
// The key test: verify the SERVICE itself has shut down gracefully
151151
// Wait for service to stop (it should stop automatically after shutdown)
152152
require.Eventually(t, func() bool {
153153
return svc.IsStopped()
154-
}, 10*time.Second, 10*time.Millisecond, "service should have stopped after shutdown")
154+
}, 1*time.Second, 10*time.Millisecond, "service should have stopped after shutdown")
155155

156156
// Service should no longer be running
157157
assert.False(t, svc.IsRunning())
@@ -187,7 +187,7 @@ func TestShutdownEndpointFailsInNonAwaitMode(t *testing.T) {
187187
assert.Equal(t, http.StatusOK, shutdownResp.StatusCode)
188188

189189
// Give it a moment for shutdown to process
190-
time.Sleep(1 * time.Second)
190+
time.Sleep(50 * time.Millisecond)
191191

192192
// Verify server has shut down by attempting health check
193193
// Should fail after shutdown completes since the server should be down
@@ -197,7 +197,7 @@ func TestShutdownEndpointFailsInNonAwaitMode(t *testing.T) {
197197
resp.Body.Close()
198198
}
199199
return err != nil // Server should be down
200-
}, 10*time.Second, 500*time.Millisecond, "server should have shut down")
200+
}, 2*time.Second, 50*time.Millisecond, "server should have shut down")
201201
}
202202

203203
func TestShutdownEndpointMultipleCallsIdempotent(t *testing.T) {
@@ -238,7 +238,7 @@ func TestShutdownEndpointMultipleCallsIdempotent(t *testing.T) {
238238
// Service should shutdown gracefully (only once)
239239
require.Eventually(t, func() bool {
240240
return svc.IsStopped()
241-
}, 10*time.Second, 100*time.Millisecond, "service should have stopped after shutdown")
241+
}, 1*time.Second, 10*time.Millisecond, "service should have stopped after shutdown")
242242

243243
assert.False(t, svc.IsRunning())
244244
assert.True(t, svc.IsStopped())

0 commit comments

Comments
 (0)