Skip to content

Commit b4a61ec

Browse files
fuldaxxx and ldmonster committed
[shell-operator] fix: track child PIDs to avoid zombie reaper race (#886)
Signed-off-by: Ruslan Gorbunov <ruslan.gorbunov@flant.com> Co-authored-by: Pavel Okhlopkov <pavel.okhlopkov@flant.com>
1 parent bc46d63 commit b4a61ec

3 files changed

Lines changed: 365 additions & 4 deletions

File tree

pkg/executor/executor.go

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"bytes"
66
"context"
7+
"errors"
78
"fmt"
89
"io"
910
"log/slog"
@@ -24,12 +25,20 @@ const (
2425
serviceName = "executor"
2526
)
2627

28+
// Run starts the command, waits for it to complete, and returns the error.
29+
// The child PID is registered in the global process registry while the process
30+
// is running so that a PID-1 zombie reaper does not steal it.
2731
func Run(cmd *exec.Cmd) error {
2832
// TODO context: hook name, hook phase, hook binding
2933
// TODO observability
3034
log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir))
3135

32-
return cmd.Run()
36+
if err := startAndRegister(cmd); err != nil {
37+
return err
38+
}
39+
defer unregisterPID(cmd.Process.Pid)
40+
41+
return cmd.Wait()
3342
}
3443

3544
// StderrError is returned by RunAndLogLines when a command fails and produces
@@ -113,7 +122,34 @@ func (e *Executor) Output() ([]byte, error) {
113122
e.logger.Debug("Executing command",
114123
slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")),
115124
slog.String(pkg.LogKeyDir, e.cmd.Dir))
116-
return e.cmd.Output()
125+
126+
// Reproduce cmd.Output() but interleave PID registration so that the
127+
// PID-1 zombie reaper skips this process.
128+
if e.cmd.Stdout != nil {
129+
return nil, errors.New("exec: Stdout already set")
130+
}
131+
var stdout bytes.Buffer
132+
e.cmd.Stdout = &stdout
133+
134+
captureErr := e.cmd.Stderr == nil
135+
var stderrBuf bytes.Buffer
136+
if captureErr {
137+
e.cmd.Stderr = &stderrBuf
138+
}
139+
140+
if err := startAndRegister(e.cmd); err != nil {
141+
return nil, err
142+
}
143+
defer unregisterPID(e.cmd.Process.Pid)
144+
145+
err := e.cmd.Wait()
146+
if err != nil && captureErr {
147+
if ee, ok := err.(*exec.ExitError); ok {
148+
ee.Stderr = stderrBuf.Bytes()
149+
}
150+
}
151+
152+
return stdout.Bytes(), err
117153
}
118154

119155
type CmdUsage struct {
@@ -154,7 +190,12 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri
154190
e.cmd.Stdout = plo
155191
e.cmd.Stderr = io.MultiWriter(ple, stdErr)
156192

157-
err := e.cmd.Run()
193+
if err := startAndRegister(e.cmd); err != nil {
194+
return nil, fmt.Errorf("cmd start: %w", err)
195+
}
196+
defer unregisterPID(e.cmd.Process.Pid)
197+
198+
err := e.cmd.Wait()
158199
if err != nil {
159200
if len(stdErr.Bytes()) > 0 {
160201
return nil, &StderrError{Message: stdErr.String()}

pkg/executor/executor_test.go

Lines changed: 221 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package executor
33
import (
44
"bytes"
55
"context"
6-
json "github.com/flant/shell-operator/pkg/utils/json"
76
"fmt"
87
"io"
98
"math/rand/v2"
109
"os"
10+
"os/exec"
1111
"regexp"
1212
"strings"
1313
"testing"
@@ -16,6 +16,8 @@ import (
1616
"github.com/deckhouse/deckhouse/pkg/log"
1717
"github.com/stretchr/testify/assert"
1818
"github.com/stretchr/testify/require"
19+
20+
json "github.com/flant/shell-operator/pkg/utils/json"
1921
)
2022

2123
func TestRunAndLogLines(t *testing.T) {
@@ -250,3 +252,221 @@ func randStringRunes(n int) string {
250252
}
251253
return string(b)
252254
}
255+
256+
// newTestRegistry creates a fresh processRegistry for tests, swaps the
257+
// global singleton, and restores the original with t.Cleanup. It returns
258+
// the fresh test registry.
259+
func newTestRegistry(t *testing.T) *processRegistry {
260+
t.Helper()
261+
262+
r := &processRegistry{activePIDs: make(map[int]struct{})}
263+
orig := registry
264+
registry = r
265+
t.Cleanup(func() { registry = orig })
266+
267+
return r
268+
}
269+
270+
func TestProcessRegistry_Basic(t *testing.T) {
271+
r := &processRegistry{activePIDs: make(map[int]struct{})}
272+
273+
// Initially empty
274+
assert.False(t, r.IsActive(1), "IsActive should return false for unknown PID")
275+
assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID")
276+
277+
// Register and check
278+
r.register(42)
279+
assert.True(t, r.IsActive(42), "IsActive should return true for registered PID")
280+
assert.False(t, r.IsActive(43), "IsActive should return false for different PID")
281+
282+
// Unregister and check
283+
r.unregister(42)
284+
assert.False(t, r.IsActive(42), "IsActive should return false after unregister")
285+
}
286+
287+
func TestProcessRegistry_DoubleUnregister(t *testing.T) {
288+
r := &processRegistry{activePIDs: make(map[int]struct{})}
289+
290+
r.register(100)
291+
r.unregister(100)
292+
r.unregister(100) // should not panic
293+
294+
assert.False(t, r.IsActive(100))
295+
}
296+
297+
func TestProcessRegistry_Concurrent(t *testing.T) {
298+
r := &processRegistry{activePIDs: make(map[int]struct{})}
299+
const goroutines = 100
300+
const pidsPerGoroutine = 100
301+
302+
done := make(chan struct{})
303+
304+
// Concurrently register PIDs
305+
for i := range goroutines {
306+
go func() {
307+
defer func() { done <- struct{}{} }()
308+
for j := 0; j < pidsPerGoroutine; j++ {
309+
r.register(i*pidsPerGoroutine + j)
310+
}
311+
}()
312+
}
313+
314+
for range goroutines {
315+
<-done
316+
}
317+
318+
// All PIDs should be registered
319+
for i := range goroutines {
320+
for j := 0; j < pidsPerGoroutine; j++ {
321+
assert.True(t, r.IsActive(i*pidsPerGoroutine+j))
322+
}
323+
}
324+
325+
// Concurrently unregister PIDs
326+
for i := range goroutines {
327+
go func() {
328+
defer func() { done <- struct{}{} }()
329+
for j := 0; j < pidsPerGoroutine; j++ {
330+
r.unregister(i*pidsPerGoroutine + j)
331+
}
332+
}()
333+
}
334+
335+
for range goroutines {
336+
<-done
337+
}
338+
339+
// All PIDs should be unregistered
340+
for i := range goroutines {
341+
for j := 0; j < pidsPerGoroutine; j++ {
342+
assert.False(t, r.IsActive(i*pidsPerGoroutine+j))
343+
}
344+
}
345+
}
346+
347+
func TestTracker_IsActive(t *testing.T) {
348+
newTestRegistry(t)
349+
tracker := Tracker()
350+
351+
// PID not registered
352+
assert.False(t, tracker.IsActive(42))
353+
354+
// Register via internal helper (same path as executor methods)
355+
registerPID(42)
356+
assert.True(t, tracker.IsActive(42))
357+
358+
unregisterPID(42)
359+
assert.False(t, tracker.IsActive(42))
360+
}
361+
362+
func TestStartAndRegister_AtomicWithReaper(t *testing.T) {
363+
r := newTestRegistry(t)
364+
365+
// StartAndRegister must hold the write-lock across both cmd.Start() and
366+
// PID registration, so there is no window where a zombie reaper could
367+
// observe IsActive(pid) == false for a child that cmd.Wait will later reap.
368+
cmd := exec.Command("sleep", "2")
369+
require.NoError(t, startAndRegister(cmd))
370+
defer cmd.Process.Kill()
371+
372+
pid := cmd.Process.Pid
373+
374+
// The PID must already be visible in the registry — no race window.
375+
assert.True(t, r.IsActive(pid), "PID should be registered immediately after StartAndRegister returns")
376+
377+
// Simulate what the reaper does: check via the ProcessTracker interface.
378+
tracker := Tracker()
379+
assert.True(t, tracker.IsActive(pid), "ProcessTracker must see the PID as active")
380+
381+
// Clean up: wait for the process to finish after killing it.
382+
_ = cmd.Process.Kill()
383+
_ = cmd.Wait()
384+
385+
unregisterPID(pid)
386+
assert.False(t, r.IsActive(pid), "PID should be gone after unregister")
387+
}
388+
389+
func TestGlobalRegistry_Output_RegistersPID(t *testing.T) {
390+
r := newTestRegistry(t)
391+
392+
ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo hello"}, []string{})
393+
394+
outputCh := make(chan []byte, 1)
395+
errCh := make(chan error, 1)
396+
go func() {
397+
output, err := ex.Output()
398+
outputCh <- output
399+
errCh <- err
400+
}()
401+
402+
assert.Eventually(t, func() bool {
403+
r.mu.RLock()
404+
defer r.mu.RUnlock()
405+
return len(r.activePIDs) > 0
406+
}, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while Output is running")
407+
408+
output := <-outputCh
409+
err := <-errCh
410+
assert.NoError(t, err)
411+
assert.Contains(t, string(output), "hello")
412+
413+
r.mu.RLock()
414+
count := len(r.activePIDs)
415+
r.mu.RUnlock()
416+
assert.Empty(t, count, "expected registry to be empty after Output returns")
417+
}
418+
419+
func TestGlobalRegistry_Output_FailedStart(t *testing.T) {
420+
newTestRegistry(t)
421+
422+
// Command that doesn't exist — Start() should fail.
423+
ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{})
424+
_, err := ex.Output()
425+
assert.Error(t, err)
426+
}
427+
428+
func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) {
429+
r := newTestRegistry(t)
430+
431+
logger := log.NewLogger()
432+
logger.SetLevel(log.LevelInfo)
433+
434+
ex := NewExecutor("", "sh", []string{"-c", "sleep 0.2; echo test-output"}, []string{}).
435+
WithLogger(logger)
436+
437+
usageCh := make(chan *CmdUsage, 1)
438+
errCh := make(chan error, 1)
439+
go func() {
440+
usage, err := ex.RunAndLogLines(context.Background(), map[string]string{})
441+
usageCh <- usage
442+
errCh <- err
443+
}()
444+
445+
assert.Eventually(t, func() bool {
446+
r.mu.RLock()
447+
defer r.mu.RUnlock()
448+
return len(r.activePIDs) > 0
449+
}, time.Second, 10*time.Millisecond, "expected registry to contain an active PID while RunAndLogLines is running")
450+
451+
usage := <-usageCh
452+
err := <-errCh
453+
assert.NoError(t, err)
454+
assert.NotNil(t, usage)
455+
456+
r.mu.RLock()
457+
count := len(r.activePIDs)
458+
r.mu.RUnlock()
459+
assert.Empty(t, count, "expected registry to be empty after RunAndLogLines returns")
460+
}
461+
462+
func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) {
463+
newTestRegistry(t)
464+
465+
logger := log.NewLogger()
466+
467+
ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}).
468+
WithLogger(logger)
469+
470+
_, err := ex.RunAndLogLines(context.Background(), map[string]string{})
471+
assert.Error(t, err)
472+
}

0 commit comments

Comments
 (0)