Skip to content

Commit ec0b7ea

Browse files
JAORMX and claude committed
Expose RunnerPID and terminate stale runners before cleanup
Add a VM.RunnerPID() convenience method that returns the runner process ID directly, complementing the existing VM.ID() string accessor. Update WithCleanDataDir to terminate orphaned runner processes before wiping the data directory: it reads the persisted state file, checks whether the recorded PID is still alive, sends SIGTERM with a 5s grace period, and then sends SIGKILL if needed. This prevents orphaned runners from holding KVM file descriptors and virtiofs mounts after a parent process crash. Closes #3. Closes #5. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent bbf6c8b commit ec0b7ea

5 files changed

Lines changed: 286 additions & 0 deletions

File tree

options.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"os"
99
"path/filepath"
10+
"syscall"
1011

1112
"github.com/stacklok/propolis/hypervisor"
1213
"github.com/stacklok/propolis/image"
@@ -81,6 +82,8 @@ type config struct {
8182
cleanDataDir bool
8283
removeAll func(string) error
8384
stat func(string) (os.FileInfo, error)
85+
killProcess func(pid int, sig syscall.Signal) error
86+
processAlive func(pid int) bool
8487
}
8588

8689
func defaultConfig() *config {
@@ -96,6 +99,14 @@ func defaultConfig() *config {
9699
dataDir: dataDir,
97100
removeAll: forceRemoveAll,
98101
stat: os.Stat,
102+
killProcess: func(pid int, sig syscall.Signal) error { return syscall.Kill(pid, sig) },
103+
processAlive: func(pid int) bool {
104+
proc, err := os.FindProcess(pid)
105+
if err != nil {
106+
return false
107+
}
108+
return proc.Signal(syscall.Signal(0)) == nil
109+
},
99110
}
100111
}
101112

propolis.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"path/filepath"
2525
"strconv"
2626
"strings"
27+
"syscall"
28+
"time"
2729

2830
"github.com/stacklok/propolis/hypervisor"
2931
"github.com/stacklok/propolis/hypervisor/libkrun"
@@ -226,6 +228,15 @@ func Run(ctx context.Context, imageRef string, opts ...Option) (*VM, error) {
226228
return vm, nil
227229
}
228230

231+
// Timing parameters used by terminateStaleRunner when shutting down a
// runner process left behind by a previous run.
const (
	// staleTermPoll is how long terminateStaleRunner sleeps between two
	// consecutive liveness checks after SIGTERM has been sent.
	staleTermPoll = 250 * time.Millisecond
	// staleTermTimeout bounds the grace period granted to a stale runner
	// after SIGTERM; once it elapses the runner is sent SIGKILL.
	staleTermTimeout = 5 * time.Second
)
239+
229240
func cleanDataDir(cfg *config) error {
230241
if cfg.dataDir == "" {
231242
return nil
@@ -238,6 +249,9 @@ func cleanDataDir(cfg *config) error {
238249
return fmt.Errorf("check data dir: %w", err)
239250
}
240251

252+
// Best-effort: terminate any stale runner process before wiping.
253+
terminateStaleRunner(cfg)
254+
241255
var keep []string
242256
cache := cacheDir(cfg)
243257
if cache != "" && isWithin(cfg.dataDir, cache) {
@@ -259,6 +273,49 @@ func cleanDataDir(cfg *config) error {
259273
return nil
260274
}
261275

276+
// terminateStaleRunner checks the state file in the data directory for a
277+
// previously-running runner process. If the PID is alive, it sends SIGTERM
278+
// and waits up to staleTermTimeout before sending SIGKILL. This prevents
279+
// orphaned runner processes from holding KVM file descriptors and virtiofs
280+
// mounts when the parent process was hard-killed.
281+
func terminateStaleRunner(cfg *config) {
282+
mgr := state.NewManager(cfg.dataDir)
283+
st, err := mgr.Load()
284+
if err != nil {
285+
slog.Debug("could not load state for stale runner check", "error", err)
286+
return
287+
}
288+
if st.PID <= 0 {
289+
return
290+
}
291+
if !cfg.processAlive(st.PID) {
292+
slog.Debug("stale runner already dead", "pid", st.PID)
293+
return
294+
}
295+
296+
slog.Warn("terminating stale runner process", "pid", st.PID)
297+
if err := cfg.killProcess(st.PID, syscall.SIGTERM); err != nil {
298+
slog.Warn("failed to send SIGTERM to stale runner", "pid", st.PID, "error", err)
299+
return
300+
}
301+
302+
// Poll until the process exits or the timeout expires.
303+
deadline := time.Now().Add(staleTermTimeout)
304+
for time.Now().Before(deadline) {
305+
time.Sleep(staleTermPoll)
306+
if !cfg.processAlive(st.PID) {
307+
slog.Info("stale runner terminated gracefully", "pid", st.PID)
308+
return
309+
}
310+
}
311+
312+
// Force kill.
313+
slog.Warn("stale runner did not exit after SIGTERM, sending SIGKILL", "pid", st.PID)
314+
if err := cfg.killProcess(st.PID, syscall.SIGKILL); err != nil {
315+
slog.Warn("failed to send SIGKILL to stale runner", "pid", st.PID, "error", err)
316+
}
317+
}
318+
262319
// forceRemoveAll removes the given path, handling read-only directory trees
263320
// such as Go module caches (whose entries are set to 0444/0555). It first
264321
// attempts a plain os.RemoveAll; on failure it walks the tree making every

propolis_test.go

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"fmt"
99
"os"
1010
"path/filepath"
11+
"sync"
12+
"syscall"
1113
"testing"
1214

1315
v1 "github.com/google/go-containerregistry/pkg/v1"
@@ -726,3 +728,191 @@ func TestWithBackend(t *testing.T) {
726728
WithBackend(backend).apply(cfg)
727729
assert.Equal(t, backend, cfg.backend)
728730
}
731+
732+
// --- terminateStaleRunner tests ---
733+
734+
func TestTerminateStaleRunner_NoStateFile(t *testing.T) {
735+
t.Parallel()
736+
737+
dataDir := t.TempDir()
738+
cfg := defaultConfig()
739+
cfg.dataDir = dataDir
740+
741+
// Should not panic or error when no state file exists.
742+
terminateStaleRunner(cfg)
743+
}
744+
745+
func TestTerminateStaleRunner_DeadProcess(t *testing.T) {
746+
t.Parallel()
747+
748+
dataDir := t.TempDir()
749+
750+
// Write state with a PID that doesn't exist.
751+
mgr := state.NewManager(dataDir)
752+
ls, err := mgr.LoadAndLock(context.Background())
753+
require.NoError(t, err)
754+
ls.State.Active = true
755+
ls.State.PID = 2147483647 // max PID, almost certainly dead
756+
require.NoError(t, ls.Save())
757+
ls.Release()
758+
759+
cfg := defaultConfig()
760+
cfg.dataDir = dataDir
761+
762+
var killCalled bool
763+
cfg.killProcess = func(_ int, _ syscall.Signal) error {
764+
killCalled = true
765+
return nil
766+
}
767+
cfg.processAlive = func(_ int) bool { return false }
768+
769+
terminateStaleRunner(cfg)
770+
assert.False(t, killCalled, "should not attempt to kill a dead process")
771+
}
772+
773+
func TestTerminateStaleRunner_AliveProcess_GracefulExit(t *testing.T) {
774+
t.Parallel()
775+
776+
dataDir := t.TempDir()
777+
778+
mgr := state.NewManager(dataDir)
779+
ls, err := mgr.LoadAndLock(context.Background())
780+
require.NoError(t, err)
781+
ls.State.Active = true
782+
ls.State.PID = 99999
783+
require.NoError(t, ls.Save())
784+
ls.Release()
785+
786+
cfg := defaultConfig()
787+
cfg.dataDir = dataDir
788+
789+
var mu sync.Mutex
790+
var signals []syscall.Signal
791+
aliveCount := 0
792+
793+
cfg.killProcess = func(pid int, sig syscall.Signal) error {
794+
assert.Equal(t, 99999, pid)
795+
mu.Lock()
796+
signals = append(signals, sig)
797+
mu.Unlock()
798+
return nil
799+
}
800+
cfg.processAlive = func(_ int) bool {
801+
mu.Lock()
802+
defer mu.Unlock()
803+
aliveCount++
804+
// Process is alive on first check (before SIGTERM), dead on second
805+
// (after SIGTERM + first poll).
806+
return aliveCount <= 1
807+
}
808+
809+
terminateStaleRunner(cfg)
810+
811+
mu.Lock()
812+
defer mu.Unlock()
813+
require.Len(t, signals, 1, "should only send SIGTERM")
814+
assert.Equal(t, syscall.SIGTERM, signals[0])
815+
}
816+
817+
func TestTerminateStaleRunner_AliveProcess_RequiresKill(t *testing.T) {
818+
t.Parallel()
819+
820+
dataDir := t.TempDir()
821+
822+
mgr := state.NewManager(dataDir)
823+
ls, err := mgr.LoadAndLock(context.Background())
824+
require.NoError(t, err)
825+
ls.State.Active = true
826+
ls.State.PID = 99999
827+
require.NoError(t, ls.Save())
828+
ls.Release()
829+
830+
cfg := defaultConfig()
831+
cfg.dataDir = dataDir
832+
833+
var mu sync.Mutex
834+
var signals []syscall.Signal
835+
836+
cfg.killProcess = func(pid int, sig syscall.Signal) error {
837+
assert.Equal(t, 99999, pid)
838+
mu.Lock()
839+
signals = append(signals, sig)
840+
mu.Unlock()
841+
return nil
842+
}
843+
// Process never exits on its own.
844+
cfg.processAlive = func(_ int) bool { return true }
845+
846+
terminateStaleRunner(cfg)
847+
848+
mu.Lock()
849+
defer mu.Unlock()
850+
require.Len(t, signals, 2, "should send SIGTERM then SIGKILL")
851+
assert.Equal(t, syscall.SIGTERM, signals[0])
852+
assert.Equal(t, syscall.SIGKILL, signals[1])
853+
}
854+
855+
func TestTerminateStaleRunner_ZeroPID(t *testing.T) {
856+
t.Parallel()
857+
858+
dataDir := t.TempDir()
859+
860+
// Write state with PID=0 (clean shutdown).
861+
mgr := state.NewManager(dataDir)
862+
ls, err := mgr.LoadAndLock(context.Background())
863+
require.NoError(t, err)
864+
ls.State.Active = false
865+
ls.State.PID = 0
866+
require.NoError(t, ls.Save())
867+
ls.Release()
868+
869+
cfg := defaultConfig()
870+
cfg.dataDir = dataDir
871+
872+
var killCalled bool
873+
cfg.killProcess = func(_ int, _ syscall.Signal) error {
874+
killCalled = true
875+
return nil
876+
}
877+
878+
terminateStaleRunner(cfg)
879+
assert.False(t, killCalled, "should not attempt to kill PID 0")
880+
}
881+
882+
func TestRun_WithCleanDataDir_TerminatesStaleRunner(t *testing.T) {
883+
t.Parallel()
884+
885+
dataDir := t.TempDir()
886+
887+
// Pre-populate state as if a previous runner crashed.
888+
mgr := state.NewManager(dataDir)
889+
ls, err := mgr.LoadAndLock(context.Background())
890+
require.NoError(t, err)
891+
ls.State.Active = true
892+
ls.State.PID = 2147483647 // dead PID
893+
require.NoError(t, ls.Save())
894+
ls.Release()
895+
896+
rootfsDir := filepath.Join(dataDir, "rootfs")
897+
require.NoError(t, os.MkdirAll(rootfsDir, 0o755))
898+
899+
handle := &mockVMHandle{id: "1234", alive: true}
900+
netProv := &mockNetProvider{sockPath: "/tmp/fake.sock"}
901+
902+
vm, err := Run(context.Background(), "test:latest",
903+
WithDataDir(dataDir),
904+
WithCleanDataDir(),
905+
WithPreflightChecker(preflight.NewEmpty()),
906+
WithRootFSPath(rootfsDir),
907+
WithNetProvider(netProv),
908+
WithBackend(&mockBackend{startHandle: handle}),
909+
)
910+
require.NoError(t, err)
911+
require.NotNil(t, vm)
912+
913+
// The new state should reflect the new VM, not the stale one.
914+
loaded, loadErr := mgr.Load()
915+
require.NoError(t, loadErr)
916+
assert.True(t, loaded.Active)
917+
assert.Equal(t, 1234, loaded.PID)
918+
}

vm.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,16 @@ func (vm *VM) Remove(ctx context.Context) error {
125125
return nil
126126
}
127127

128+
// RunnerPID returns the runner process ID, or 0 if the ID cannot be parsed
129+
// as a PID (e.g. non-process-based backends).
130+
func (vm *VM) RunnerPID() int {
131+
pid, err := pidFromID(vm.handle.ID())
132+
if err != nil {
133+
return 0
134+
}
135+
return pid
136+
}
137+
128138
// Name returns the VM name.
129139
func (vm *VM) Name() string { return vm.name }
130140

vm_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,24 @@ func TestVM_Remove_RemovesRootfsOutsideCache(t *testing.T) {
226226
require.NoError(t, err)
227227
}
228228

229+
func TestVM_RunnerPID_Valid(t *testing.T) {
230+
t.Parallel()
231+
232+
handle := &mockVMHandle{id: "1234", alive: true}
233+
vm := &VM{name: "test-vm", handle: handle}
234+
235+
assert.Equal(t, 1234, vm.RunnerPID())
236+
}
237+
238+
func TestVM_RunnerPID_NonNumericID(t *testing.T) {
239+
t.Parallel()
240+
241+
handle := &mockVMHandle{id: "abc-123", alive: true}
242+
vm := &VM{name: "test-vm", handle: handle}
243+
244+
assert.Equal(t, 0, vm.RunnerPID())
245+
}
246+
229247
func TestVM_Accessors(t *testing.T) {
230248
t.Parallel()
231249

0 commit comments

Comments
 (0)