Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 93 additions & 3 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"os/exec"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -24,12 +26,64 @@ const (
serviceName = "executor"
)

// ProcessRegistry tracks PIDs of processes started by the executor so that
// a PID-1 zombie reaper can skip them (their parent already calls wait).
// This prevents the reaper from stealing a child that cmd.Wait expects to reap.
type ProcessRegistry struct {
mu sync.RWMutex
activePIDs map[int32]struct{}
}

// NewProcessRegistry creates a new ProcessRegistry.
func NewProcessRegistry() *ProcessRegistry {
return &ProcessRegistry{
activePIDs: make(map[int32]struct{}),
}
}

// Register adds pid to the set of active PIDs.
func (r *ProcessRegistry) Register(pid int) {
r.mu.Lock()
r.activePIDs[int32(pid)] = struct{}{}
r.mu.Unlock()
}

// Unregister removes pid from the set of active PIDs.
func (r *ProcessRegistry) Unregister(pid int) {
r.mu.Lock()
delete(r.activePIDs, int32(pid))
r.mu.Unlock()
}

// IsActive reports whether pid is currently tracked as an active process.
func (r *ProcessRegistry) IsActive(pid int) bool {
r.mu.RLock()
_, ok := r.activePIDs[int32(pid)]
r.mu.RUnlock()
return ok
}

// Registry is the global process registry shared between the executor and
// the PID-1 zombie reaper. All executor methods that spawn child processes
// register their PIDs here so the reaper can skip them.
var Registry = NewProcessRegistry()

// Run starts the command, waits for it to complete, and returns the error.
// The child PID is registered in the global Registry while the process is
// running so that a PID-1 zombie reaper does not steal it.
func Run(cmd *exec.Cmd) error {
// TODO context: hook name, hook phase, hook binding
// TODO observability
log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir))

return cmd.Run()
if err := cmd.Start(); err != nil {
return err
}

Registry.Register(cmd.Process.Pid)
defer Registry.Unregister(cmd.Process.Pid)

return cmd.Wait()
}

// StderrError is returned by RunAndLogLines when a command fails and produces
Expand Down Expand Up @@ -113,7 +167,36 @@ func (e *Executor) Output() ([]byte, error) {
e.logger.Debug("Executing command",
slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")),
slog.String(pkg.LogKeyDir, e.cmd.Dir))
return e.cmd.Output()

// Reproduce cmd.Output() but interleave PID registration so that the
// PID-1 zombie reaper skips this process.
if e.cmd.Stdout != nil {
return nil, errors.New("exec: Stdout already set")
}
var stdout bytes.Buffer
e.cmd.Stdout = &stdout

captureErr := e.cmd.Stderr == nil
var stderrBuf bytes.Buffer
if captureErr {
e.cmd.Stderr = &stderrBuf
}

if err := e.cmd.Start(); err != nil {
return nil, err
}

Registry.Register(e.cmd.Process.Pid)
defer Registry.Unregister(e.cmd.Process.Pid)

err := e.cmd.Wait()
if err != nil && captureErr {
if ee, ok := err.(*exec.ExitError); ok {
ee.Stderr = stderrBuf.Bytes()
}
}

return stdout.Bytes(), err
}

type CmdUsage struct {
Expand Down Expand Up @@ -154,7 +237,14 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri
e.cmd.Stdout = plo
e.cmd.Stderr = io.MultiWriter(ple, stdErr)

err := e.cmd.Run()
if err := e.cmd.Start(); err != nil {
return nil, fmt.Errorf("cmd start: %w", err)
}

Registry.Register(e.cmd.Process.Pid)
defer Registry.Unregister(e.cmd.Process.Pid)

err := e.cmd.Wait()
if err != nil {
if len(stdErr.Bytes()) > 0 {
return nil, &StderrError{Message: stdErr.String()}
Expand Down
146 changes: 146 additions & 0 deletions pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,149 @@ func randStringRunes(n int) string {
}
return string(b)
}

func TestProcessRegistry_Basic(t *testing.T) {
r := NewProcessRegistry()

// Initially empty
assert.False(t, r.IsActive(1), " IsActive should return false for unknown PID")
assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID")

// Register and check
r.Register(42)
assert.True(t, r.IsActive(42), "IsActive should return true for registered PID")
assert.False(t, r.IsActive(43), "IsActive should return false for different PID")

// Unregister and check
r.Unregister(42)
assert.False(t, r.IsActive(42), "IsActive should return false after unregister")
}

func TestProcessRegistry_DoubleUnregister(t *testing.T) {
r := NewProcessRegistry()

r.Register(100)
r.Unregister(100)
r.Unregister(100) // should not panic

assert.False(t, r.IsActive(100))
}

func TestProcessRegistry_Concurrent(t *testing.T) {
r := NewProcessRegistry()
const goroutines = 100
const pidsPerGoroutine = 100

done := make(chan struct{})

// Concurrently register PIDs
for i := range goroutines {
go func() {
defer func() { done <- struct{}{} }()
for j := 0; j < pidsPerGoroutine; j++ {
r.Register(i*pidsPerGoroutine + j)
}
}()
}

for range goroutines {
<-done
}

// All PIDs should be registered
for i := range goroutines {
for j := 0; j < pidsPerGoroutine; j++ {
assert.True(t, r.IsActive(i*pidsPerGoroutine+j))
}
}

// Concurrently unregister PIDs
for i := range goroutines {
go func() {
defer func() { done <- struct{}{} }()
for j := 0; j < pidsPerGoroutine; j++ {
r.Unregister(i*pidsPerGoroutine + j)
}
}()
}

for range goroutines {
<-done
}

// All PIDs should be unregistered
for i := range goroutines {
for j := 0; j < pidsPerGoroutine; j++ {
assert.False(t, r.IsActive(i*pidsPerGoroutine+j))
}
}
}

func TestGlobalRegistry_Output_RegistersPID(t *testing.T) {
// Use a fresh registry to avoid interference with other tests
origRegistry := Registry
Registry = NewProcessRegistry()
defer func() { Registry = origRegistry }()

ex := NewExecutor("", "echo", []string{"hello"}, []string{})

// Before execution, no PID is registered
// After execution, PID should be removed from registry
output, err := ex.Output()
assert.NoError(t, err)
assert.Contains(t, string(output), "hello")

// PID should be unregistered after Output returns
// (We can't easily check that the PID was registered *during* execution
// without a more complex test, but the ProcessRegistry unit tests cover
// the correctness of Register/Unregister.)
}

func TestGlobalRegistry_Output_FailedStart(t *testing.T) {
origRegistry := Registry
Registry = NewProcessRegistry()
defer func() { Registry = origRegistry }()

// Command that doesn't exist — Start() should fail
ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{})
_, err := ex.Output()
assert.Error(t, err)

// Registry should be empty — nothing was registered since Start failed
// (This doesn't panic, which is the important part.)
}

func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) {
origRegistry := Registry
Registry = NewProcessRegistry()
defer func() { Registry = origRegistry }()

logger := log.NewLogger()
logger.SetLevel(log.LevelInfo)

ex := NewExecutor("", "echo", []string{"test-output"}, []string{}).
WithLogger(logger)

usage, err := ex.RunAndLogLines(context.Background(), map[string]string{})
assert.NoError(t, err)
assert.NotNil(t, usage)

// PID should be unregistered after RunAndLogLines returns
}

func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) {
origRegistry := Registry
Registry = NewProcessRegistry()
defer func() { Registry = origRegistry }()

logger := log.NewLogger()

// Command that doesn't exist — Start() should fail
ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}).
WithLogger(logger)

_, err := ex.RunAndLogLines(context.Background(), map[string]string{})
assert.Error(t, err)

// Registry should be empty
}
Loading