From a366e1f8eb5b2b14974592adedd13fe95350f502 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Fri, 22 May 2026 15:04:24 +0300 Subject: [PATCH 1/3] fix: add ProcessRegistry to prevent zombie reaper race Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 96 +++++++++++++++++++++- pkg/executor/executor_test.go | 146 ++++++++++++++++++++++++++++++++++ 2 files changed, 239 insertions(+), 3 deletions(-) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 6e723066..0feb4b8a 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -4,11 +4,13 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "log/slog" "os/exec" "strings" + "sync" "syscall" "time" @@ -24,12 +26,64 @@ const ( serviceName = "executor" ) +// ProcessRegistry tracks PIDs of processes started by the executor so that +// a PID-1 zombie reaper can skip them (their parent already calls wait). +// This prevents the reaper from stealing a child that cmd.Wait expects to reap. +type ProcessRegistry struct { + mu sync.RWMutex + activePIDs map[int32]struct{} +} + +// NewProcessRegistry creates a new ProcessRegistry. +func NewProcessRegistry() *ProcessRegistry { + return &ProcessRegistry{ + activePIDs: make(map[int32]struct{}), + } +} + +// Register adds pid to the set of active PIDs. +func (r *ProcessRegistry) Register(pid int) { + r.mu.Lock() + r.activePIDs[int32(pid)] = struct{}{} + r.mu.Unlock() +} + +// Unregister removes pid from the set of active PIDs. +func (r *ProcessRegistry) Unregister(pid int) { + r.mu.Lock() + delete(r.activePIDs, int32(pid)) + r.mu.Unlock() +} + +// IsActive reports whether pid is currently tracked as an active process. +func (r *ProcessRegistry) IsActive(pid int) bool { + r.mu.RLock() + _, ok := r.activePIDs[int32(pid)] + r.mu.RUnlock() + return ok +} + +// Registry is the global process registry shared between the executor and +// the PID-1 zombie reaper. All executor methods that spawn child processes +// register their PIDs here so the reaper can skip them. +var Registry = NewProcessRegistry() + +// Run starts the command, waits for it to complete, and returns the error. +// The child PID is registered in the global Registry while the process is +// running so that a PID-1 zombie reaper does not steal it. func Run(cmd *exec.Cmd) error { // TODO context: hook name, hook phase, hook binding // TODO observability log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir)) - return cmd.Run() + if err := cmd.Start(); err != nil { + return err + } + + Registry.Register(cmd.Process.Pid) + defer Registry.Unregister(cmd.Process.Pid) + + return cmd.Wait() } // StderrError is returned by RunAndLogLines when a command fails and produces @@ -113,7 +167,36 @@ func (e *Executor) Output() ([]byte, error) { e.logger.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")), slog.String(pkg.LogKeyDir, e.cmd.Dir)) - return e.cmd.Output() + + // Reproduce cmd.Output() but interleave PID registration so that the + // PID-1 zombie reaper skips this process. + if e.cmd.Stdout != nil { + return nil, errors.New("exec: Stdout already set") + } + var stdout bytes.Buffer + e.cmd.Stdout = &stdout + + captureErr := e.cmd.Stderr == nil + var stderrBuf bytes.Buffer + if captureErr { + e.cmd.Stderr = &stderrBuf + } + + if err := e.cmd.Start(); err != nil { + return nil, err + } + + Registry.Register(e.cmd.Process.Pid) + defer Registry.Unregister(e.cmd.Process.Pid) + + err := e.cmd.Wait() + if err != nil && captureErr { + if ee, ok := err.(*exec.ExitError); ok { + ee.Stderr = stderrBuf.Bytes() + } + } + + return stdout.Bytes(), err } type CmdUsage struct { @@ -154,7 +237,14 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri e.cmd.Stdout = plo e.cmd.Stderr = io.MultiWriter(ple, stdErr) - err := e.cmd.Run() + if err := e.cmd.Start(); err != nil { + return nil, fmt.Errorf("cmd start: %w", err) + } + + Registry.Register(e.cmd.Process.Pid) + defer Registry.Unregister(e.cmd.Process.Pid) + + err := e.cmd.Wait() if err != nil { if len(stdErr.Bytes()) > 0 { return nil, &StderrError{Message: stdErr.String()} diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go index ccfc52fb..c2b86f2a 100644 --- a/pkg/executor/executor_test.go +++ b/pkg/executor/executor_test.go @@ -250,3 +250,149 @@ func randStringRunes(n int) string { } return string(b) } + +func TestProcessRegistry_Basic(t *testing.T) { + r := NewProcessRegistry() + + // Initially empty + assert.False(t, r.IsActive(1), " IsActive should return false for unknown PID") + assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID") + + // Register and check + r.Register(42) + assert.True(t, r.IsActive(42), "IsActive should return true for registered PID") + assert.False(t, r.IsActive(43), "IsActive should return false for different PID") + + // Unregister and check + r.Unregister(42) + assert.False(t, r.IsActive(42), "IsActive should return false after unregister") +} + +func TestProcessRegistry_DoubleUnregister(t *testing.T) { + r := NewProcessRegistry() + + r.Register(100) + r.Unregister(100) + r.Unregister(100) // should not panic + + assert.False(t, r.IsActive(100)) +} + +func TestProcessRegistry_Concurrent(t *testing.T) { + r := NewProcessRegistry() + const goroutines = 100 + const pidsPerGoroutine = 100 + + done := make(chan struct{}) + + // Concurrently register PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.Register(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be registered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.True(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } + + // Concurrently unregister PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.Unregister(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be unregistered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.False(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } +} + +func TestGlobalRegistry_Output_RegistersPID(t *testing.T) { + // Use a fresh registry to avoid interference with other tests + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + ex := NewExecutor("", "echo", []string{"hello"}, []string{}) + + // Before execution, no PID is registered + // After execution, PID should be removed from registry + output, err := ex.Output() + assert.NoError(t, err) + assert.Contains(t, string(output), "hello") + + // PID should be unregistered after Output returns + // (We can't easily check that the PID was registered *during* execution + // without a more complex test, but the ProcessRegistry unit tests cover + // the correctness of Register/Unregister.) +} + +func TestGlobalRegistry_Output_FailedStart(t *testing.T) { + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + // Command that doesn't exist — Start() should fail + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}) + _, err := ex.Output() + assert.Error(t, err) + + // Registry should be empty — nothing was registered since Start failed + // (This doesn't panic, which is the important part.) +} + +func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) { + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + logger := log.NewLogger() + logger.SetLevel(log.LevelInfo) + + ex := NewExecutor("", "echo", []string{"test-output"}, []string{}). + WithLogger(logger) + + usage, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + assert.NoError(t, err) + assert.NotNil(t, usage) + + // PID should be unregistered after RunAndLogLines returns +} + +func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) { + origRegistry := Registry + Registry = NewProcessRegistry() + defer func() { Registry = origRegistry }() + + logger := log.NewLogger() + + // Command that doesn't exist — Start() should fail + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}). + WithLogger(logger) + + _, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + assert.Error(t, err) + + // Registry should be empty +} From 358452bef5d921900778c7fff1922a693a0f18c5 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Sat, 23 May 2026 07:07:21 +0300 Subject: [PATCH 2/3] add sleep before command wait for testing Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 0feb4b8a..8e888276 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -189,6 +189,7 @@ func (e *Executor) Output() ([]byte, error) { Registry.Register(e.cmd.Process.Pid) defer Registry.Unregister(e.cmd.Process.Pid) + time.Sleep(2 * time.Second) err := e.cmd.Wait() if err != nil && captureErr { if ee, ok := err.(*exec.ExitError); ok { From 02dbe69259bf5ee6e8a29dad0771620ca5156366 Mon Sep 17 00:00:00 2001 From: Ruslan Gorbunov Date: Sat, 23 May 2026 08:44:29 +0300 Subject: [PATCH 3/3] remove unnecessary sleep from Output method Signed-off-by: Ruslan Gorbunov --- pkg/executor/executor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 8e888276..0feb4b8a 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -189,7 +189,6 @@ func (e *Executor) Output() ([]byte, error) { Registry.Register(e.cmd.Process.Pid) defer Registry.Unregister(e.cmd.Process.Pid) - time.Sleep(2 * time.Second) err := e.cmd.Wait() if err != nil && captureErr { if ee, ok := err.(*exec.ExitError); ok {