diff --git a/components/egress/Dockerfile b/components/egress/Dockerfile index 94ec3d236..512043e94 100644 --- a/components/egress/Dockerfile +++ b/components/egress/Dockerfile @@ -40,6 +40,9 @@ WORKDIR /workspace/components/egress # Static-ish build (no cgo by default) to simplify runtime deps. RUN go mod download +# Pre-download internal-module deps for the supervisor build below. +RUN cd /workspace/components/internal && go mod download + # Copy the rest of the egress sources COPY components/egress ./ RUN if [ -n "${CC}" ]; then export CC; fi; \ @@ -55,6 +58,23 @@ RUN if [ -n "${CC}" ]; then export CC; fi; \ -X 'github.com/alibaba/opensandbox/internal/version.GitCommit=${GIT_COMMIT}'" \ -o /out/egress . +# Build the opensandbox-supervisor binary from the internal module. +# Installed alongside /egress so a future ENTRYPOINT switch can wrap egress +# without changing this stage again. +RUN cd /workspace/components/internal && \ + if [ -n "${CC}" ]; then export CC; fi; \ + if [ -n "${CXX}" ]; then export CXX; fi; \ + export CGO_ENABLED="${CGO_ENABLED}" \ + CGO_CFLAGS="${CGO_CFLAGS:-${CFLAGS}}" \ + CGO_CXXFLAGS="${CGO_CXXFLAGS:-${CXXFLAGS}}" \ + CGO_LDFLAGS="${CGO_LDFLAGS}"; \ + go build ${GOFLAGS} -trimpath -buildvcs=false \ + -ldflags "${LDFLAGS} -buildid= -B none \ + -X 'github.com/alibaba/opensandbox/internal/version.Version=${VERSION}' \ + -X 'github.com/alibaba/opensandbox/internal/version.BuildTime=${BUILD_TIME}' \ + -X 'github.com/alibaba/opensandbox/internal/version.GitCommit=${GIT_COMMIT}'" \ + -o /out/opensandbox-supervisor ./cmd/supervisor + FROM debian:bookworm-slim # iptables is needed for DNS REDIRECT; ca-certificates for TLS to upstream resolvers @@ -91,9 +111,29 @@ RUN useradd -r -u 10042 -d /var/lib/mitmproxy -s /usr/sbin/nologin mitmproxy \ && (command -v mitmdump && mitmdump --version) \ && mkdir -p /var/egress/mitmscripts -COPY --from=builder /out/egress /egress +# All egress runtime artifacts live under one directory to keep paths grouped. 
+COPY --from=builder /out/egress /opt/opensandbox-egress/egress +COPY --from=builder /out/opensandbox-supervisor /opt/opensandbox-egress/supervisor +# Pre-start hook: reap any mitmdump left over from a previous crashed +# egress so the new launch can bind the transparent-MITM listen port. +# Intentionally does NOT touch iptables/nft rules — the sidecar shares +# a network namespace with the workload, so leaving rules in place keeps +# egress filtering active across the supervisor's backoff window. +COPY components/egress/scripts/cleanup.sh /opt/opensandbox-egress/cleanup.sh +RUN chmod 0755 /opt/opensandbox-egress/cleanup.sh \ + /opt/opensandbox-egress/egress \ + /opt/opensandbox-egress/supervisor COPY components/egress/mitmscripts /var/egress/mitmscripts -# Default entrypoint; expects OPENSANDBOX_NETWORK_POLICY env at runtime. -ENTRYPOINT ["/egress"] +# Supervisor wraps the egress binary: restarts on crash with backoff and +# forwards SIGTERM gracefully. The cleanup hook runs only as pre-start; +# running it on post-exit would tear down enforcement during the backoff +# window and leave the workload unprotected. +# Expects OPENSANDBOX_NETWORK_POLICY env at runtime. +ENTRYPOINT ["/opt/opensandbox-egress/supervisor", \ + "--pre-start=/opt/opensandbox-egress/cleanup.sh", \ + "--name=egress", \ + "--grace-period=20s", \ + "--", \ + "/opt/opensandbox-egress/egress"] diff --git a/components/egress/scripts/cleanup.sh b/components/egress/scripts/cleanup.sh new file mode 100755 index 000000000..e7f4ec1e2 --- /dev/null +++ b/components/egress/scripts/cleanup.sh @@ -0,0 +1,68 @@ +#!/bin/sh +# Copyright 2026 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# Pre-start hook for opensandbox-supervisor wrapping the egress worker.
# Reaps any mitmdump left over from a previous crashed egress so the next
# launch can bind the transparent-MITM listen port (default 18081).
#
# Scope is deliberately narrow:
#   * iptables NAT rules are NOT torn down here. The egress sidecar shares
#     a network namespace with the workload it protects; tearing rules
#     down between crashes would leave the workload with unfiltered egress
#     for the full backoff window. Egress's own SetupRedirect is additive
#     and tolerates pre-existing rules (first match wins).
#   * The `inet opensandbox` nft table is NOT touched here either. The
#     egress nftables manager already prepends `delete table inet
#     opensandbox` to its ruleset script, so ApplyStatic is idempotent.
#
# Hard contract: this script MUST NOT exit non-zero. A misbehaving cleanup
# hook is worse than a stray mitmdump; supervisor would treat the hook
# failure as a launch attempt and trip its crashloop budget faster.

# Intentionally no `set -e`. `set -u` for typo safety on env names only;
# every variable read below uses a ${VAR:-default} expansion so `set -u`
# cannot abort the script.
set -u

# log writes one tagged line to stderr.
log() { printf '[egress-cleanup] %s\n' "$*" >&2; }

# Wraps a command so non-zero exit is silently absorbed. The command's
# stdout and stderr are merged, prefixed by sed, and sent to stderr so they
# show up in container logs without polluting the event log.
try() { "$@" 2>&1 | sed 's/^/ /' >&2; return 0; }

# ─── stray mitmdump (orphaned after hard crash) ──────────────────────
kill_stray_mitmdump() {
    command -v pkill >/dev/null 2>&1 || { log "pkill not present; skipping mitmdump reap"; return 0; }
    # mitmdump runs as the `mitmproxy` user (uid 10042 per egress Dockerfile).
    # `-u mitmproxy` scopes pkill to that uid so we never touch anything else;
    # `-f mitmdump` is the cmdline match safety net inside that uid.
    # SIGTERM first; give it a moment; SIGKILL anything that ignored TERM.
    try pkill -TERM -u mitmproxy -f mitmdump
    # Short sleep, but bounded so this hook still finishes inside the
    # supervisor's PreStartTimeout (default 30s) with plenty of headroom.
    sleep 1
    try pkill -KILL -u mitmproxy -f mitmdump
    log "stray mitmdump processes reaped (best-effort)"
}

# main logs the WORKER_* context passed in the environment (each falls back
# to `?` when unset), runs the reap step, and exits 0.
main() {
    log "starting (worker_exit_code=${WORKER_EXIT_CODE:-?} signal=${WORKER_SIGNAL:-?} attempt=${WORKER_ATTEMPT:-?})"
    kill_stray_mitmdump
    log "done"
    exit 0
}

# POSIX sh has no ERR trap, so this handler only ever fires on a signal.
# Trap HUP/INT/TERM so a signal delivered mid-cleanup still yields exit 0,
# and say so accurately in the log instead of reporting a "shell error".
trap 'log "cleanup interrupted by signal; exiting 0 anyway"; exit 0' HUP INT TERM
# main already exits 0; the `|| true` and trailing `exit 0` are a
# belt-and-braces guard for the hard contract above.
main "$@" || true
exit 0
+// See the License for the specific language governing permissions and +// limitations under the License. + +// Command opensandbox-supervisor wraps a single worker process with restart +// backoff, lifecycle hooks, and a structured event log. It is designed to +// run as a container ENTRYPOINT or as a child of another process; it does +// not assume PID 1 and performs no zombie reaping. +// +// Usage: +// +// opensandbox-supervisor [flags] -- [worker-args...] +package main + +import ( + "context" + "errors" + "flag" + "fmt" + "io" + "os" + "os/signal" + "path/filepath" + "syscall" + "time" + + "github.com/alibaba/opensandbox/internal/logger" + "github.com/alibaba/opensandbox/internal/supervisor" + "github.com/alibaba/opensandbox/internal/version" + "gopkg.in/natefinch/lumberjack.v2" +) + +// multiFlag collects a repeatable flag into a string slice. +type multiFlag []string + +func (m *multiFlag) String() string { return fmt.Sprintf("%v", *m) } +func (m *multiFlag) Set(v string) error { *m = append(*m, v); return nil } + +func main() { + version.EchoVersion("OpenSandbox Supervisor") + + var ( + preStart multiFlag + postExit multiFlag + eventLog string + backoffMin time.Duration + backoffMax time.Duration + backoffJitter float64 + stableAfter time.Duration + burstWindow time.Duration + burstMax int + onBurst bool + grace time.Duration + preTimeout time.Duration + postTimeout time.Duration + name string + logLevel string + ) + + fs := flag.NewFlagSet("opensandbox-supervisor", flag.ExitOnError) + fs.Var(&preStart, "pre-start", "Executable to run before each worker launch (repeatable). No shell expansion; wrap in a script if needed.") + fs.Var(&postExit, "post-exit", "Executable to run after each worker exit (repeatable). Receives WORKER_* env. Failures are logged, not fatal.") + fs.StringVar(&eventLog, "event-log", "", "Path to JSONL event log. 
Empty = stderr.") + fs.DurationVar(&backoffMin, "backoff-min", time.Second, "Minimum restart backoff.") + fs.DurationVar(&backoffMax, "backoff-max", 30*time.Second, "Maximum restart backoff (exponential capped here).") + fs.Float64Var(&backoffJitter, "backoff-jitter", 0.1, "Backoff jitter fraction (0 disables, e.g. 0.1 = ±10%). Negative clamped to 0.") + fs.DurationVar(&stableAfter, "stable-after", 60*time.Second, "Worker uptime after which backoff resets.") + fs.DurationVar(&burstWindow, "burst-window", 5*time.Minute, "Crashloop budget sliding window.") + fs.IntVar(&burstMax, "burst-max", 10, "Max launches inside burst-window before tripping the breaker.") + fs.BoolVar(&onBurst, "on-burst-exit", true, "true: supervisor exits non-zero when the burst budget trips, so a higher-level supervisor (e.g. kubelet) reacts. false: keep retrying.") + fs.DurationVar(&grace, "grace-period", 10*time.Second, "Time between SIGTERM and SIGKILL when shutting the worker down.") + fs.DurationVar(&preTimeout, "pre-start-timeout", 30*time.Second, "Timeout for each pre-start hook.") + fs.DurationVar(&postTimeout, "post-exit-timeout", 30*time.Second, "Timeout for each post-exit hook.") + fs.StringVar(&name, "name", "", "Worker name shown in logs and events (default: basename of the worker cmd).") + fs.StringVar(&logLevel, "log-level", "info", "Supervisor diagnostic log level (debug|info|warn|error).") + + args := os.Args[1:] + workerArgs := splitOnDoubleDash(&args) + if err := fs.Parse(args); err != nil { + os.Exit(2) + } + if len(workerArgs) == 0 { + fmt.Fprintln(os.Stderr, "opensandbox-supervisor: missing worker command after `--`") + fs.Usage() + os.Exit(2) + } + + log := logger.MustNew(logger.Config{Level: logLevel}).Named("supervisor") + defer log.Sync() + + eventWriter, closer, err := openEventLog(eventLog) + if err != nil { + log.Errorf("event log: %v", err) + os.Exit(2) + } + defer closer() + + spec := supervisor.Spec{ + Name: name, + Cmd: workerArgs[0], + Args: workerArgs[1:], + 
PreStart: toHooks(preStart), + PostExit: toHooks(postExit), + BackoffMin: backoffMin, + BackoffMax: backoffMax, + BackoffJitter: &backoffJitter, + StableAfter: stableAfter, + BurstWindow: burstWindow, + BurstMax: burstMax, + OnBurstExit: &onBurst, + GracePeriod: grace, + PreStartTimeout: preTimeout, + PostExitTimeout: postTimeout, + EventLog: eventWriter, + Logger: log, + } + if spec.Name == "" { + spec.Name = filepath.Base(spec.Cmd) + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + log.Infof("supervising %q (event-log=%s)", spec.Cmd, eventLogDest(eventLog)) + err = supervisor.Run(ctx, spec) + switch { + case err == nil, errors.Is(err, context.Canceled): + os.Exit(0) + case errors.Is(err, supervisor.ErrBurstExceeded): + log.Errorf("supervisor: %v", err) + os.Exit(1) + default: + log.Errorf("supervisor: %v", err) + os.Exit(2) + } +} + +// splitOnDoubleDash takes everything after the first "--" as the worker +// argv and trims the supervisor flag slice in place. +func splitOnDoubleDash(args *[]string) []string { + for i, a := range *args { + if a == "--" { + worker := append([]string(nil), (*args)[i+1:]...) 
+ *args = (*args)[:i] + return worker + } + } + return nil +} + +func toHooks(paths []string) []supervisor.Hook { + if len(paths) == 0 { + return nil + } + out := make([]supervisor.Hook, 0, len(paths)) + for _, p := range paths { + out = append(out, supervisor.Hook{Argv: []string{p}}) + } + return out +} + +func openEventLog(path string) (io.Writer, func(), error) { + if path == "" { + return os.Stderr, func() {}, nil + } + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return nil, nil, fmt.Errorf("mkdir %s: %w", filepath.Dir(path), err) + } + lj := &lumberjack.Logger{ + Filename: path, + MaxSize: logger.DefaultRotateMaxSize, + MaxAge: logger.DefaultRotateMaxAge, + MaxBackups: logger.DefaultRotateMaxBackups, + Compress: true, + } + return lj, func() { _ = lj.Close() }, nil +} + +func eventLogDest(path string) string { + if path == "" { + return "stderr" + } + return path +} diff --git a/components/internal/cmd/supervisor/main_test.go b/components/internal/cmd/supervisor/main_test.go new file mode 100644 index 000000000..d141a0ce7 --- /dev/null +++ b/components/internal/cmd/supervisor/main_test.go @@ -0,0 +1,130 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "os" + "path/filepath" + "reflect" + "testing" +) + +func TestSplitOnDoubleDash(t *testing.T) { + cases := []struct { + name string + in []string + wantSupArgs []string + wantWorkerArgs []string + }{ + { + name: "typical: flags then worker", + in: []string{"--flag=a", "--", "/bin/egress", "-foo"}, + wantSupArgs: []string{"--flag=a"}, + wantWorkerArgs: []string{"/bin/egress", "-foo"}, + }, + { + name: "no double-dash returns nil worker, args untouched", + in: []string{"--flag=a", "/bin/egress"}, + wantSupArgs: []string{"--flag=a", "/bin/egress"}, + wantWorkerArgs: nil, + }, + { + name: "trailing double-dash, no worker", + in: []string{"--flag=a", "--"}, + wantSupArgs: []string{"--flag=a"}, + // append(nil, emptySlice...) returns nil, not []string{}. + wantWorkerArgs: nil, + }, + { + name: "second '--' belongs to worker argv", + in: []string{"--flag=a", "--", "/bin/sh", "-c", "foo -- bar"}, + wantSupArgs: []string{"--flag=a"}, + wantWorkerArgs: []string{"/bin/sh", "-c", "foo -- bar"}, + }, + { + name: "double-dash first means no supervisor flags", + in: []string{"--", "/bin/egress"}, + wantSupArgs: []string{}, + wantWorkerArgs: []string{"/bin/egress"}, + }, + { + name: "empty input", + in: []string{}, + wantSupArgs: nil, // append(nil, []string{}...) returns nil + wantWorkerArgs: nil, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + args := append([]string(nil), c.in...) 
+ gotWorker := splitOnDoubleDash(&args) + if !reflect.DeepEqual(args, c.wantSupArgs) { + t.Errorf("supervisor args = %v, want %v", args, c.wantSupArgs) + } + if !reflect.DeepEqual(gotWorker, c.wantWorkerArgs) { + t.Errorf("worker args = %v, want %v", gotWorker, c.wantWorkerArgs) + } + }) + } +} + +func TestToHooks(t *testing.T) { + if got := toHooks(nil); got != nil { + t.Errorf("nil input: got %v, want nil", got) + } + if got := toHooks([]string{}); got != nil { + t.Errorf("empty input: got %v, want nil", got) + } + got := toHooks([]string{"/a/b.sh", "/c/d.sh"}) + if len(got) != 2 || got[0].Argv[0] != "/a/b.sh" || got[1].Argv[0] != "/c/d.sh" { + t.Errorf("unexpected hooks: %+v", got) + } +} + +func TestOpenEventLog_StderrWhenEmpty(t *testing.T) { + w, closer, err := openEventLog("") + if err != nil { + t.Fatal(err) + } + defer closer() + if w != os.Stderr { + t.Errorf("empty path should return os.Stderr, got %T", w) + } +} + +func TestOpenEventLog_CreatesParentDir(t *testing.T) { + dir := t.TempDir() + target := filepath.Join(dir, "nested", "deeper", "events.jsonl") + w, closer, err := openEventLog(target) + if err != nil { + t.Fatal(err) + } + defer closer() + if _, statErr := os.Stat(filepath.Dir(target)); statErr != nil { + t.Errorf("parent dir not created: %v", statErr) + } + if w == nil { + t.Error("writer is nil") + } +} + +func TestEventLogDest(t *testing.T) { + if eventLogDest("") != "stderr" { + t.Error("empty path label") + } + if eventLogDest("/var/log/x.jsonl") != "/var/log/x.jsonl" { + t.Error("path label") + } +} diff --git a/components/internal/supervisor/backoff.go b/components/internal/supervisor/backoff.go new file mode 100644 index 000000000..948812d8e --- /dev/null +++ b/components/internal/supervisor/backoff.go @@ -0,0 +1,62 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supervisor + +import ( + "math/rand/v2" + "time" +) + +// nextBackoff returns the next sleep duration given the previous one. +// +// prev <= 0 returns min. Otherwise the previous value is doubled, clamped to +// [min, max], and perturbed by ±jitter*value. The result is clamped a second +// time so jitter cannot exceed max or go below 1ns. +func nextBackoff(prev, min, max time.Duration, jitter float64, rng func() float64) time.Duration { + if prev <= 0 { + return clampJitter(min, min, max, jitter, rng) + } + d := prev * 2 + if d < min { + d = min + } + if d > max { + d = max + } + return clampJitter(d, min, max, jitter, rng) +} + +func clampJitter(d, min, max time.Duration, jitter float64, rng func() float64) time.Duration { + if jitter <= 0 { + return d + } + span := float64(d) * jitter + // rng returns [0,1); shift to [-1,1). + delta := time.Duration((rng()*2 - 1) * span) + out := d + delta + if out < min { + out = min + } + if out > max { + out = max + } + if out < time.Nanosecond { + out = time.Nanosecond + } + return out +} + +// defaultRNG wraps math/rand/v2 for production use. +func defaultRNG() float64 { return rand.Float64() } diff --git a/components/internal/supervisor/backoff_test.go b/components/internal/supervisor/backoff_test.go new file mode 100644 index 000000000..8d8044e99 --- /dev/null +++ b/components/internal/supervisor/backoff_test.go @@ -0,0 +1,128 @@ +// Copyright 2026 Alibaba Group Holding Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supervisor + +import ( + "testing" + "time" +) + +// fixedRNG returns a constant value to make jitter deterministic. +func fixedRNG(v float64) func() float64 { return func() float64 { return v } } + +func TestNextBackoff_NoJitterDoublesAndClamps(t *testing.T) { + min := 1 * time.Second + max := 8 * time.Second + noJitter := fixedRNG(0.5) // would be zero delta anyway with jitter=0 + + cases := []struct { + prev time.Duration + want time.Duration + }{ + {0, min}, // initial + {-1 * time.Second, min}, // negative + {500 * time.Millisecond, min}, // below min after doubling -> clamp up + {1 * time.Second, 2 * time.Second}, + {2 * time.Second, 4 * time.Second}, + {4 * time.Second, 8 * time.Second}, + {8 * time.Second, max}, // would be 16s -> clamp to max + {100 * time.Second, max}, + } + for _, c := range cases { + got := nextBackoff(c.prev, min, max, 0, noJitter) + if got != c.want { + t.Errorf("nextBackoff(prev=%s) = %s, want %s", c.prev, got, c.want) + } + } +} + +func TestNextBackoff_JitterWithinBounds(t *testing.T) { + min := 1 * time.Second + max := 10 * time.Second + jitter := 0.5 + + // rng=0 -> delta = -1 * 0.5 * d = -50% + // rng=0.5 -> delta = 0 + // rng=1- -> delta ≈ +50% + // Approximate by checking the two extremes plus the midpoint. + for _, v := range []float64{0.0, 0.5, 0.9999} { + got := nextBackoff(2*time.Second, min, max, jitter, fixedRNG(v)) + // Base after doubling = 4s. 
jitter range ±2s. So [2s, 6s]. + if got < 2*time.Second || got > 6*time.Second { + t.Errorf("rng=%v: got %s, want in [2s, 6s]", v, got) + } + } +} + +func TestNextBackoff_JitterClampsToMax(t *testing.T) { + min := 1 * time.Second + max := 5 * time.Second + // Base = max after doubling. Positive jitter must not push above max. + got := nextBackoff(max, min, max, 0.5, fixedRNG(0.9999)) + if got > max { + t.Errorf("got %s > max %s", got, max) + } +} + +func TestNextBackoff_JitterClampsToMin(t *testing.T) { + min := 2 * time.Second + max := 10 * time.Second + // Base = min. Negative jitter must not push below min. + got := nextBackoff(0, min, max, 0.5, fixedRNG(0.0)) + if got < min { + t.Errorf("got %s < min %s", got, min) + } +} + +// Spec.BackoffJitter is *float64 specifically so callers can pass &zero to +// disable jitter; verify the underlying nextBackoff respects jitter=0. +func TestNextBackoff_JitterZeroDisablesJitter(t *testing.T) { + min := 1 * time.Second + max := 16 * time.Second + // With jitter=0, output must be the exact doubled value regardless of rng. 
+ for _, rng := range []float64{0.0, 0.5, 0.9999} { + got := nextBackoff(2*time.Second, min, max, 0, fixedRNG(rng)) + if got != 4*time.Second { + t.Errorf("rng=%v: got %s, want exactly 4s", rng, got) + } + } +} + +func TestApplyDefaults_BackoffJitter(t *testing.T) { + cases := []struct { + name string + in *float64 + want float64 + }{ + {"nil applies default", nil, defaultBackoffJitter}, + {"zero stays zero", floatPtr(0), 0}, + {"explicit value preserved", floatPtr(0.25), 0.25}, + {"negative clamped to zero", floatPtr(-0.5), 0}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + s := Spec{Cmd: "/bin/true", BackoffJitter: c.in} + s.applyDefaults() + if s.BackoffJitter == nil { + t.Fatal("BackoffJitter still nil after applyDefaults") + } + if *s.BackoffJitter != c.want { + t.Errorf("BackoffJitter = %v, want %v", *s.BackoffJitter, c.want) + } + }) + } +} + +func floatPtr(v float64) *float64 { return &v } diff --git a/components/internal/supervisor/burst.go b/components/internal/supervisor/burst.go new file mode 100644 index 000000000..532a613a7 --- /dev/null +++ b/components/internal/supervisor/burst.go @@ -0,0 +1,73 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supervisor + +import ( + "time" +) + +// burstTracker counts launches within a sliding window. 
// burstTracker remembers the timestamps of the most recent launches in a
// fixed-size ring of max slots, so older launches are overwritten
// automatically. exceeded() answers "did the last max launches all happen
// within window?".
type burstTracker struct {
	max    int              // ring capacity; at least 1
	window time.Duration    // span the launches must fit in to trip
	now    func() time.Time // clock, injectable for tests
	ring   []time.Time      // launch timestamps, written round-robin
	idx    int              // next slot to write
	filled int              // number of slots written so far, capped at max
}

// newBurstTracker builds a tracker holding up to max launches. A max below
// 1 is raised to 1.
func newBurstTracker(max int, window time.Duration, now func() time.Time) *burstTracker {
	if max < 1 {
		max = 1
	}
	bt := new(burstTracker)
	bt.max = max
	bt.window = window
	bt.now = now
	bt.ring = make([]time.Time, max)
	return bt
}

// record stores the current time in the next ring slot, overwriting the
// oldest entry once the ring is full.
func (b *burstTracker) record() {
	stamp := b.now()
	b.ring[b.idx] = stamp
	b.idx++
	if b.idx == b.max {
		b.idx = 0
	}
	if b.filled < b.max {
		b.filled++
	}
}

// exceeded reports whether the oldest of the last max launches is no older
// than window. It is always false until max launches have been recorded.
func (b *burstTracker) exceeded() bool {
	if b.filled < b.max {
		return false
	}
	// With a full ring, the slot about to be overwritten holds the oldest
	// launch.
	age := b.now().Sub(b.ring[b.idx])
	return age <= b.window
}

// count returns how many recorded launches are not older than window.
func (b *burstTracker) count() int {
	cutoff := b.now().Add(-b.window)
	total := 0
	for _, stamp := range b.ring[:b.filled] {
		if stamp.Before(cutoff) {
			continue
		}
		total++
	}
	return total
}
// Event kinds. Stable string values; downstream log pipelines may filter on them.
const (
	EventStart     = "start"
	EventExit      = "exit"
	EventPreStart  = "prestart"
	EventPostExit  = "postexit"
	EventBackoff   = "backoff"
	EventStable    = "stable"
	EventBurstExit = "burst_exit"
	EventShutdown  = "shutdown"
)

// Event is one structured record in the supervisor's event log. All kinds
// share this one type: ts and event are always emitted, every other field
// is tagged omitempty and only appears when set.
type Event struct {
	TS           time.Time `json:"ts"`
	Name         string    `json:"name,omitempty"`
	Event        string    `json:"event"`
	PID          int       `json:"pid,omitempty"`
	Gen          uint64    `json:"gen,omitempty"`
	Attempt      int       `json:"attempt,omitempty"`
	ExitCode     *int      `json:"exit_code,omitempty"`
	Signal       string    `json:"signal,omitempty"`
	DurationMS   int64     `json:"duration_ms,omitempty"`
	Reason       string    `json:"reason,omitempty"`
	SleepMS      int64     `json:"sleep_ms,omitempty"`
	NextAttempt  int       `json:"next_attempt,omitempty"`
	Hook         string    `json:"hook,omitempty"`
	Attempts     int       `json:"attempts,omitempty"`
	Window       string    `json:"window,omitempty"`
	Error        string    `json:"error,omitempty"`
	ResetBackoff bool      `json:"reset_backoff,omitempty"`
}

// eventWriter funnels Event writes through a mutex so that concurrent
// emitters never interleave bytes within a line.
type eventWriter struct {
	mu   sync.Mutex
	w    io.Writer
	name string
	now  func() time.Time
}

// newEventWriter returns a writer that emits to w, stamping events with
// name and with timestamps taken from now when the event leaves them unset.
func newEventWriter(w io.Writer, name string, now func() time.Time) *eventWriter {
	ew := new(eventWriter)
	ew.w = w
	ew.name = name
	ew.now = now
	return ew
}

// emit writes e as a single JSON line. A zero TS and an empty Name are
// filled in from the writer's clock and name first. A nil receiver or a nil
// underlying writer makes emit a no-op. Marshal and write errors are
// returned so the caller can decide whether to surface or ignore them
// (event logging must not abort the main loop).
func (ew *eventWriter) emit(e Event) error {
	if ew == nil {
		return nil
	}
	if ew.w == nil {
		return nil
	}

	if e.TS.IsZero() {
		e.TS = ew.now()
	}
	if e.Name == "" {
		e.Name = ew.name
	}

	line, err := json.Marshal(e)
	if err != nil {
		return err
	}
	line = append(line, '\n')

	// Only the write itself is serialized; encoding happens outside the lock.
	ew.mu.Lock()
	defer ew.mu.Unlock()
	if _, err := ew.w.Write(line); err != nil {
		return err
	}
	return nil
}

// intPtr is a small helper for Event.ExitCode (which is *int so 0 vs unset
// is distinguishable in JSON output).
func intPtr(v int) *int {
	p := v
	return &p
}
+ +package supervisor + +import ( + "bufio" + "bytes" + "encoding/json" + "strings" + "sync" + "testing" + "time" +) + +func TestEventWriter_Roundtrip(t *testing.T) { + var buf bytes.Buffer + fixed := time.Date(2026, 5, 28, 10, 0, 0, 0, time.UTC) + ew := newEventWriter(&buf, "egress", func() time.Time { return fixed }) + + if err := ew.emit(Event{Event: EventStart, PID: 1234, Attempt: 1, Gen: 1}); err != nil { + t.Fatal(err) + } + if err := ew.emit(Event{Event: EventExit, PID: 1234, Gen: 1, ExitCode: intPtr(0), DurationMS: 1500}); err != nil { + t.Fatal(err) + } + + sc := bufio.NewScanner(strings.NewReader(buf.String())) + + if !sc.Scan() { + t.Fatal("expected first line") + } + var e1 Event + if err := json.Unmarshal(sc.Bytes(), &e1); err != nil { + t.Fatal(err) + } + if e1.Event != EventStart || e1.PID != 1234 || e1.Name != "egress" || !e1.TS.Equal(fixed) { + t.Fatalf("e1 unexpected: %+v", e1) + } + + if !sc.Scan() { + t.Fatal("expected second line") + } + var e2 Event + if err := json.Unmarshal(sc.Bytes(), &e2); err != nil { + t.Fatal(err) + } + if e2.Event != EventExit || e2.ExitCode == nil || *e2.ExitCode != 0 || e2.DurationMS != 1500 { + t.Fatalf("e2 unexpected: %+v", e2) + } +} + +func TestEventWriter_ConcurrentWritesDoNotInterleave(t *testing.T) { + var buf bytes.Buffer + ew := newEventWriter(&buf, "n", time.Now) + + const goroutines = 16 + const perGoroutine = 64 + var wg sync.WaitGroup + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < perGoroutine; j++ { + if err := ew.emit(Event{Event: EventBackoff, SleepMS: 1}); err != nil { + t.Errorf("emit: %v", err) + } + } + }() + } + wg.Wait() + + sc := bufio.NewScanner(strings.NewReader(buf.String())) + count := 0 + for sc.Scan() { + var e Event + if err := json.Unmarshal(sc.Bytes(), &e); err != nil { + t.Fatalf("line %d not valid JSON: %v\nline=%q", count, err, sc.Text()) + } + count++ + } + if count != goroutines*perGoroutine { + t.Fatalf("got %d events, want %d", 
count, goroutines*perGoroutine) + } +} + +func TestEventWriter_NilSafe(t *testing.T) { + var ew *eventWriter + if err := ew.emit(Event{Event: EventStart}); err != nil { + t.Fatal(err) + } +} diff --git a/components/internal/supervisor/pgid_other.go b/components/internal/supervisor/pgid_other.go new file mode 100644 index 000000000..081c90a3f --- /dev/null +++ b/components/internal/supervisor/pgid_other.go @@ -0,0 +1,21 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !unix + +package supervisor + +import "os/exec" + +func applyChildPgid(cmd *exec.Cmd) {} diff --git a/components/internal/supervisor/pgid_unix.go b/components/internal/supervisor/pgid_unix.go new file mode 100644 index 000000000..1317a100c --- /dev/null +++ b/components/internal/supervisor/pgid_unix.go @@ -0,0 +1,33 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build unix + +package supervisor + +import ( + "os/exec" + "syscall" +) + +// applyChildPgid puts the worker in its own process group so that signals +// sent to the supervisor's pgid (e.g. by a parent process tree) do not +// accidentally reach the worker. The supervisor sends explicit signals to +// the worker pid when it intends to. +func applyChildPgid(cmd *exec.Cmd) { + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + cmd.SysProcAttr.Setpgid = true +} diff --git a/components/internal/supervisor/spec.go b/components/internal/supervisor/spec.go new file mode 100644 index 000000000..c51944f12 --- /dev/null +++ b/components/internal/supervisor/spec.go @@ -0,0 +1,218 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package supervisor runs a single child process under a restart loop with +// exponential backoff, pre-start / post-exit hooks, a crashloop circuit +// breaker, and a structured event log. It is intentionally scoped to one +// worker per supervisor; multi-process supervision is delegated to higher +// layers (e.g. Kubernetes pods). +// +// Signal handling. Callers control the supervisor's lifecycle via the +// context passed to Run: cancelling ctx triggers a SIGTERM to the worker +// followed by SIGKILL after GracePeriod. This package does NOT install any +// signal.Notify itself; the caller (e.g. 
cmd/supervisor) is responsible for +// translating OS signals into context cancellation. As a result: +// +// - SIGINT and SIGTERM, when wired to ctx by the caller, both result in +// SIGTERM being sent to the worker (the supervisor does not preserve +// which signal triggered shutdown). +// - SIGHUP, SIGUSR1, SIGUSR2, SIGWINCH, SIGQUIT, and similar +// application-level signals are NOT forwarded to the worker. If the +// worker needs them (e.g. config reload, log rotate, tty resize), the +// caller must add forwarding around Run. +// +// Process group. The worker is started with Setpgid=true on Unix so that +// signals delivered to the supervisor's process group do not reach the +// worker by side channel. The supervisor signals the worker explicitly via +// its PID. +// +// PID 1 / reaping. The supervisor does not call PR_SET_CHILD_SUBREAPER and +// does not reap arbitrary children, only the worker it launched. If the +// worker spawns its own descendants and is killed, those descendants are +// reparented per usual kernel rules. Run this supervisor as PID 1 only when +// the worker itself does not orphan grandchildren. +package supervisor + +import ( + "io" + "os" + "path/filepath" + "time" + + "github.com/alibaba/opensandbox/internal/logger" +) + +// Hook describes an auxiliary process invoked around the worker lifecycle. +// Argv[0] is the executable; remaining entries are arguments. Hooks are +// invoked directly (no shell); wrap in a shell script if expansion is needed. +type Hook struct { + Argv []string +} + +// Spec configures a supervisor run. +type Spec struct { + // Name identifies the supervised worker in logs and events. Defaults to + // basename(Cmd). + Name string + + // Cmd is the worker executable path. Required. + Cmd string + Args []string + Env []string // defaults to os.Environ() + Dir string // working directory; empty = inherit + + // PreStart hooks run before each worker launch. 
A non-zero exit aborts + // the launch and counts toward the crashloop budget. + PreStart []Hook + // PostExit hooks run after the worker has been reaped. Failures are + // logged but do not block the restart loop. + PostExit []Hook + PreStartTimeout time.Duration // default 30s + PostExitTimeout time.Duration // default 30s + + // Backoff controls inter-restart sleep. Sleep grows exponentially from + // BackoffMin to BackoffMax with ±*BackoffJitter*prev jitter. After the + // worker has been alive at least StableAfter, the backoff resets. + BackoffMin time.Duration // default 1s + BackoffMax time.Duration // default 30s + // BackoffJitter is a *float64 so callers can distinguish "unset" + // (defaults to 0.1) from "explicitly disabled" (pass &zero). Negative + // values are clamped to 0. + BackoffJitter *float64 + StableAfter time.Duration // default 60s + + // Crashloop circuit breaker. If more than BurstMax launches occur + // within BurstWindow, the supervisor either returns (OnBurstExit=true, + // default) so the surrounding runtime can react, or continues looping. + BurstWindow time.Duration // default 5m + BurstMax int // default 10 + // OnBurstExit selects burst behavior. Default true. + // A *bool lets callers override the non-zero default; nil means default. + OnBurstExit *bool + + // GracePeriod is how long SIGTERM is given to the worker on shutdown + // before SIGKILL. Default 10s. + GracePeriod time.Duration + + // EventLog receives one JSON object per line. nil => os.Stderr. + EventLog io.Writer + + // WorkerStdout / WorkerStderr forward the worker's standard streams. + // nil defaults to the supervisor's own streams. + WorkerStdout io.Writer + WorkerStderr io.Writer + + // Logger receives free-form supervisor diagnostics. nil => a no-op logger. + Logger logger.Logger + + // Clock is injected for tests; nil => real clock. + Clock Clock +} + +// Clock abstracts time for tests. Implementations must be goroutine-safe. 
+type Clock interface { + Now() time.Time + // After is identical to time.After. + After(d time.Duration) <-chan time.Time +} + +type realClock struct{} + +func (realClock) Now() time.Time { return time.Now() } +func (realClock) After(d time.Duration) <-chan time.Time { return time.After(d) } + +// Defaults applied to zero-valued fields. +const ( + defaultBackoffMin = time.Second + defaultBackoffMax = 30 * time.Second + defaultBackoffJitter = 0.1 + defaultStableAfter = 60 * time.Second + defaultBurstWindow = 5 * time.Minute + defaultBurstMax = 10 + defaultGracePeriod = 10 * time.Second + defaultPreStartTimeout = 30 * time.Second + defaultPostExitTimeout = 30 * time.Second +) + +func (s *Spec) applyDefaults() { + if s.Name == "" && s.Cmd != "" { + s.Name = filepath.Base(s.Cmd) + } + if s.Env == nil { + s.Env = os.Environ() + } + if s.BackoffMin <= 0 { + s.BackoffMin = defaultBackoffMin + } + if s.BackoffMax <= 0 { + s.BackoffMax = defaultBackoffMax + } + if s.BackoffMax < s.BackoffMin { + s.BackoffMax = s.BackoffMin + } + if s.BackoffJitter == nil { + v := defaultBackoffJitter + s.BackoffJitter = &v + } else if *s.BackoffJitter < 0 { + v := 0.0 + s.BackoffJitter = &v + } + if s.StableAfter <= 0 { + s.StableAfter = defaultStableAfter + } + if s.BurstWindow <= 0 { + s.BurstWindow = defaultBurstWindow + } + if s.BurstMax <= 0 { + s.BurstMax = defaultBurstMax + } + if s.OnBurstExit == nil { + v := true + s.OnBurstExit = &v + } + if s.GracePeriod <= 0 { + s.GracePeriod = defaultGracePeriod + } + if s.PreStartTimeout <= 0 { + s.PreStartTimeout = defaultPreStartTimeout + } + if s.PostExitTimeout <= 0 { + s.PostExitTimeout = defaultPostExitTimeout + } + if s.EventLog == nil { + s.EventLog = os.Stderr + } + if s.WorkerStdout == nil { + s.WorkerStdout = os.Stdout + } + if s.WorkerStderr == nil { + s.WorkerStderr = os.Stderr + } + if s.Logger == nil { + s.Logger = noopLogger{} + } + if s.Clock == nil { + s.Clock = realClock{} + } +} + +type noopLogger struct{} + +func 
(noopLogger) Debugf(string, ...any) {} +func (noopLogger) Infof(string, ...any) {} +func (noopLogger) Warnf(string, ...any) {} +func (noopLogger) Errorf(string, ...any) {} +func (noopLogger) With(...logger.Field) logger.Logger { return noopLogger{} } +func (noopLogger) Named(string) logger.Logger { return noopLogger{} } +func (noopLogger) Sync() error { return nil } diff --git a/components/internal/supervisor/stop.go b/components/internal/supervisor/stop.go new file mode 100644 index 000000000..25ec9cf49 --- /dev/null +++ b/components/internal/supervisor/stop.go @@ -0,0 +1,41 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supervisor + +import ( + "os" + "syscall" + "time" +) + +// gracefulStop sends SIGTERM, then either waits for the worker to exit +// (signalled via done) or escalates to SIGKILL after grace. cmd.Wait in the +// caller is what actually reaps the process; this only signals. +// +// Wall-clock time is used deliberately (rather than the injected Clock): +// the worker is a real OS process, so the grace timeout must elapse in +// real time regardless of how tests fast-forward Spec.Clock. +func gracefulStop(p *os.Process, grace time.Duration, done <-chan struct{}) { + if p == nil { + return + } + _ = p.Signal(syscall.SIGTERM) + select { + case <-time.After(grace): + _ = p.Kill() + case <-done: + // Worker exited within grace; no kill needed. 
+ } +} diff --git a/components/internal/supervisor/supervisor.go b/components/internal/supervisor/supervisor.go new file mode 100644 index 000000000..325eaa573 --- /dev/null +++ b/components/internal/supervisor/supervisor.go @@ -0,0 +1,280 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supervisor + +import ( + "context" + "errors" + "fmt" + "os/exec" + "strconv" + "sync/atomic" + "syscall" + "time" +) + +// ErrBurstExceeded is returned by Run when the crashloop budget is exhausted +// and Spec.OnBurstExit is true. +var ErrBurstExceeded = errors.New("supervisor: crashloop budget exceeded") + +// Run supervises the worker described by spec until ctx is cancelled or the +// crashloop budget is exhausted. It returns ctx.Err() on graceful shutdown, +// ErrBurstExceeded on burst exit, or a setup error if Spec is invalid. 
+func Run(ctx context.Context, spec Spec) error { + if spec.Cmd == "" { + return errors.New("supervisor: Spec.Cmd is required") + } + spec.applyDefaults() + + ew := newEventWriter(spec.EventLog, spec.Name, spec.Clock.Now) + starts := newBurstTracker(spec.BurstMax, spec.BurstWindow, spec.Clock.Now) + + var ( + gen uint64 + attempt int + backoff time.Duration + ) + + shutdown := func() error { + _ = ew.emit(Event{Event: EventShutdown, Reason: "ctx cancelled"}) + return ctx.Err() + } + + for { + if ctx.Err() != nil { + return shutdown() + } + + attempt++ + curGen := atomic.AddUint64(&gen, 1) + + // Pre-start hooks. Failure aborts the launch, counts toward burst. + if hookErr := runHooks(ctx, spec.PreStart, spec.PreStartTimeout, spec.Env, ew, EventPreStart); hookErr != nil { + spec.Logger.Warnf("supervisor: pre-start hook failed: %v", hookErr) + starts.record() + if exitOnBurst(starts, spec, ew, attempt) { + return ErrBurstExceeded + } + backoff = sleepBackoff(ctx, spec, ew, backoff, attempt+1) + if backoff < 0 { + return shutdown() + } + continue + } + + // Launch worker. Worker run-duration is measured in wall-clock time + // because the child is a real process; fake clocks (used in tests + // for backoff control) would otherwise report 0 and never trip the + // StableAfter threshold. + starts.record() + runStart := time.Now() + cmd := exec.Command(spec.Cmd, spec.Args...) 
+ cmd.Env = spec.Env + cmd.Dir = spec.Dir + cmd.Stdout = spec.WorkerStdout + cmd.Stderr = spec.WorkerStderr + applyChildPgid(cmd) + + if err := cmd.Start(); err != nil { + spec.Logger.Errorf("supervisor: launch failed: %v", err) + _ = ew.emit(Event{ + Event: EventExit, + Gen: curGen, + Attempt: attempt, + Reason: "launch_failed", + Error: err.Error(), + }) + if exitOnBurst(starts, spec, ew, attempt) { + return ErrBurstExceeded + } + backoff = sleepBackoff(ctx, spec, ew, backoff, attempt+1) + if backoff < 0 { + return shutdown() + } + continue + } + + pid := cmd.Process.Pid + spec.Logger.Infof("supervisor: worker started (pid=%d, gen=%d, attempt=%d)", pid, curGen, attempt) + _ = ew.emit(Event{Event: EventStart, PID: pid, Gen: curGen, Attempt: attempt}) + + // Graceful-shutdown goroutine: on ctx cancel, send SIGTERM then SIGKILL + // unless the worker exits within GracePeriod on its own. + stopped := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + gracefulStop(cmd.Process, spec.GracePeriod, stopped) + case <-stopped: + } + }() + + waitErr := cmd.Wait() + close(stopped) + runDur := time.Since(runStart) + + exitCode, sigName, reason := classifyExit(cmd, waitErr, ctx.Err() != nil) + spec.Logger.Infof("supervisor: worker exited (pid=%d, gen=%d, dur=%s, code=%d, signal=%s, reason=%s)", + pid, curGen, runDur, exitCode, sigName, reason) + _ = ew.emit(Event{ + Event: EventExit, + PID: pid, + Gen: curGen, + Attempt: attempt, + ExitCode: intPtr(exitCode), + Signal: sigName, + DurationMS: runDur.Milliseconds(), + Reason: reason, + }) + + // Stable: reset backoff if the worker stayed alive long enough. + if runDur >= spec.StableAfter { + if backoff > 0 { + _ = ew.emit(Event{ + Event: EventStable, PID: pid, Gen: curGen, ResetBackoff: true, + DurationMS: runDur.Milliseconds(), + }) + } + backoff = 0 + } + + // Post-exit hooks. Receive context env. Errors are logged, not fatal. 
+ // Build hookEnv into a fresh slice so we never mutate spec.Env's + // underlying array (which `append(spec.Env, ...)` may do when + // cap > len). + hookEnv := make([]string, 0, len(spec.Env)+5) + hookEnv = append(hookEnv, spec.Env...) + hookEnv = append(hookEnv, + "WORKER_EXIT_CODE="+strconv.Itoa(exitCode), + "WORKER_SIGNAL="+sigName, + "WORKER_DURATION_MS="+strconv.FormatInt(runDur.Milliseconds(), 10), + "WORKER_PID="+strconv.Itoa(pid), + "WORKER_ATTEMPT="+strconv.Itoa(attempt), + ) + // Post-exit hooks must run to completion even during shutdown so + // cleanup (iptables / nft / temp files) is not aborted. Use a + // detached ctx bounded by PostExitTimeout instead of ctx. + if hookErr := runHooks(context.Background(), spec.PostExit, spec.PostExitTimeout, hookEnv, ew, EventPostExit); hookErr != nil { + spec.Logger.Warnf("supervisor: post-exit hook failed: %v", hookErr) + } + + if ctx.Err() != nil { + return shutdown() + } + + if exitOnBurst(starts, spec, ew, attempt) { + return ErrBurstExceeded + } + + backoff = sleepBackoff(ctx, spec, ew, backoff, attempt+1) + if backoff < 0 { + return shutdown() + } + } +} + +// sleepBackoff computes the next backoff, emits a backoff event, and sleeps. +// Returns the slept duration, or -1 if ctx was cancelled mid-sleep. +func sleepBackoff(ctx context.Context, spec Spec, ew *eventWriter, prev time.Duration, nextAttempt int) time.Duration { + d := nextBackoff(prev, spec.BackoffMin, spec.BackoffMax, *spec.BackoffJitter, defaultRNG) + _ = ew.emit(Event{Event: EventBackoff, SleepMS: d.Milliseconds(), NextAttempt: nextAttempt}) + select { + case <-ctx.Done(): + return -1 + case <-spec.Clock.After(d): + } + return d +} + +// exitOnBurst checks the burst tracker. Returns true if Run should bail out. 
+func exitOnBurst(b *burstTracker, spec Spec, ew *eventWriter, attempt int) bool { + if !b.exceeded() { + return false + } + _ = ew.emit(Event{ + Event: EventBurstExit, + Attempts: b.count(), + Window: spec.BurstWindow.String(), + Attempt: attempt, + Reason: "crashloop budget exceeded", + }) + return *spec.OnBurstExit +} + +// classifyExit extracts the worker's exit code and (if killed) the signal +// name, plus a coarse reason string for the event log. +func classifyExit(cmd *exec.Cmd, waitErr error, ctxCancelled bool) (exitCode int, sigName, reason string) { + if cmd.ProcessState == nil { + return -1, "", "no_processstate" + } + exitCode = cmd.ProcessState.ExitCode() + if ws, ok := cmd.ProcessState.Sys().(syscall.WaitStatus); ok && ws.Signaled() { + sigName = ws.Signal().String() + } + switch { + case ctxCancelled: + reason = "shutdown" + case waitErr == nil: + reason = "exited" + case sigName != "": + reason = "signaled" + default: + reason = "crashed" + } + return +} + +// runHooks invokes each hook sequentially. The first non-zero exit (or +// timeout / launch error) is recorded and returned; subsequent hooks still +// run so cleanup paths complete. +func runHooks(ctx context.Context, hooks []Hook, timeout time.Duration, env []string, ew *eventWriter, kind string) error { + var firstErr error + for _, h := range hooks { + if len(h.Argv) == 0 { + continue + } + hctx, cancel := context.WithTimeout(ctx, timeout) + cmd := exec.CommandContext(hctx, h.Argv[0], h.Argv[1:]...) 
+ cmd.Env = env + start := time.Now() + err := cmd.Run() + dur := time.Since(start) + cancel() + + ev := Event{ + Event: kind, + Hook: h.Argv[0], + DurationMS: dur.Milliseconds(), + } + exitCode := 0 + if err != nil { + ev.Error = err.Error() + if cmd.ProcessState != nil { + exitCode = cmd.ProcessState.ExitCode() + } else { + exitCode = -1 + } + } else if cmd.ProcessState != nil { + exitCode = cmd.ProcessState.ExitCode() + } + ev.ExitCode = intPtr(exitCode) + _ = ew.emit(ev) + + if err != nil && firstErr == nil { + firstErr = fmt.Errorf("%s hook %q: %w", kind, h.Argv[0], err) + } + } + return firstErr +} diff --git a/components/internal/supervisor/supervisor_test.go b/components/internal/supervisor/supervisor_test.go new file mode 100644 index 000000000..0117c6665 --- /dev/null +++ b/components/internal/supervisor/supervisor_test.go @@ -0,0 +1,391 @@ +// Copyright 2026 Alibaba Group Holding Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package supervisor + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/signal" + "strconv" + "strings" + "sync" + "syscall" + "testing" + "time" +) + +// The test binary re-execs itself to act as a fake child process. The mode +// is selected by env so callers do not need a separate helper binary on disk. 
const childModeEnv = "SUPERVISOR_TEST_CHILD"

// TestMain dispatches on childModeEnv. With the variable unset it runs the
// regular test suite; otherwise the process impersonates a worker with the
// requested behavior and exits without running any tests.
func TestMain(m *testing.M) {
	switch mode := os.Getenv(childModeEnv); mode {
	case "":
		os.Exit(m.Run())
	case "exit0":
		os.Exit(0)
	case "exit1":
		os.Exit(1)
	case "crash-after-100ms":
		time.Sleep(100 * time.Millisecond)
		os.Exit(2)
	case "sleep-then-exit0":
		// Fail loudly on a missing or malformed CHILD_SLEEP: silently
		// sleeping for 0s and exiting 0 would let a misconfigured test
		// pass or fail for the wrong reason.
		dur, err := time.ParseDuration(os.Getenv("CHILD_SLEEP"))
		if err != nil {
			fmt.Fprintf(os.Stderr, "supervisor test child: bad CHILD_SLEEP: %v\n", err)
			os.Exit(98)
		}
		time.Sleep(dur)
		os.Exit(0)
	case "hang-until-sigterm":
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGTERM)
		<-c
		os.Exit(0)
	case "hang-ignore-sigterm":
		// Only SIGKILL can end this child; exit code 99 is reached only
		// if the hour-long sleep somehow completes.
		signal.Ignore(syscall.SIGTERM)
		time.Sleep(time.Hour)
		os.Exit(99)
	case "write-stdout":
		fmt.Println("hello from child")
		os.Exit(0)
	default:
		fmt.Fprintf(os.Stderr, "supervisor test child: unknown mode %q\n", mode)
		os.Exit(99)
	}
}

// childCmd builds the args needed to re-invoke the test binary in a given
// child mode. The supervisor sees this as a normal external command.
//
// The returned env is the current process environment followed by the mode
// selector and then extraEnv, in that order. "-test.run=TestMain" is passed
// as the only argument; the child exits from TestMain before any test runs.
func childCmd(mode string, extraEnv ...string) (cmd string, args []string, env []string) {
	env = append(os.Environ(), childModeEnv+"="+mode)
	env = append(env, extraEnv...)
	return os.Args[0], []string{"-test.run=TestMain"}, env
}

// fakeClock implements Clock with controllable time advancement. After
// advances the fake time by d and returns a channel that fires immediately,
// so we don't have to build a full scheduler; tests use sub-second backoffs.
+type fakeClock struct { + mu sync.Mutex + t time.Time +} + +func newFakeClock() *fakeClock { + return &fakeClock{t: time.Unix(0, 0)} +} + +func (c *fakeClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.t +} + +func (c *fakeClock) After(d time.Duration) <-chan time.Time { + c.mu.Lock() + c.t = c.t.Add(d) + c.mu.Unlock() + ch := make(chan time.Time, 1) + ch <- time.Now() + return ch +} + +func parseEvents(t *testing.T, buf *bytes.Buffer) []Event { + t.Helper() + var out []Event + sc := bufio.NewScanner(strings.NewReader(buf.String())) + for sc.Scan() { + if len(sc.Bytes()) == 0 { + continue + } + var e Event + if err := json.Unmarshal(sc.Bytes(), &e); err != nil { + t.Fatalf("bad event JSON: %v\nline=%q", err, sc.Text()) + } + out = append(out, e) + } + return out +} + +func filterEvents(events []Event, kind string) []Event { + var out []Event + for _, e := range events { + if e.Event == kind { + out = append(out, e) + } + } + return out +} + +// baseSpec returns a Spec with short timeouts suitable for tests. +func baseSpec(buf *bytes.Buffer, clk Clock) Spec { + false_ := false + jitter := 0.01 + return Spec{ + Name: "test-worker", + BackoffMin: 10 * time.Millisecond, + BackoffMax: 20 * time.Millisecond, + BackoffJitter: &jitter, + StableAfter: 50 * time.Millisecond, + BurstWindow: time.Second, + BurstMax: 1000, // disabled by default for most tests + OnBurstExit: &false_, + GracePeriod: 200 * time.Millisecond, + EventLog: buf, + WorkerStdout: &bytes.Buffer{}, + WorkerStderr: &bytes.Buffer{}, + Clock: clk, + } +} + +func TestRun_CrashRestartsThenShutdown(t *testing.T) { + var buf bytes.Buffer + spec := baseSpec(&buf, newFakeClock()) + spec.Cmd, spec.Args, spec.Env = childCmd("exit1") + + ctx, cancel := context.WithCancel(context.Background()) + // Let it crash a couple of times then cancel. 
+ go func() { + time.Sleep(300 * time.Millisecond) + cancel() + }() + err := Run(ctx, spec) + if !errors.Is(err, context.Canceled) { + t.Fatalf("want context.Canceled, got %v", err) + } + + events := parseEvents(t, &buf) + starts := filterEvents(events, EventStart) + exits := filterEvents(events, EventExit) + if len(starts) < 2 { + t.Fatalf("expected ≥2 starts, got %d", len(starts)) + } + if len(exits) < 2 { + t.Fatalf("expected ≥2 exits, got %d", len(exits)) + } + // The last exit may be a SIGTERM-on-shutdown race where the child gets + // killed before its os.Exit(1) runs. Only assert on completed exits. + for i, e := range exits { + if e.Reason == "shutdown" || e.Reason == "signaled" { + continue + } + if e.ExitCode == nil || *e.ExitCode != 1 { + code := "" + if e.ExitCode != nil { + code = strconv.Itoa(*e.ExitCode) + } + t.Errorf("exit[%d] code = %s reason=%q, want exit code 1", i, code, e.Reason) + } + } + if len(filterEvents(events, EventBackoff)) < 1 { + t.Errorf("expected backoff events") + } + if len(filterEvents(events, EventShutdown)) != 1 { + t.Errorf("expected one shutdown event") + } +} + +func TestRun_GracefulShutdownOnSIGTERM(t *testing.T) { + var buf bytes.Buffer + spec := baseSpec(&buf, newFakeClock()) + spec.Cmd, spec.Args, spec.Env = childCmd("hang-until-sigterm") + spec.GracePeriod = 2 * time.Second + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(150 * time.Millisecond) + cancel() + }() + err := Run(ctx, spec) + if !errors.Is(err, context.Canceled) { + t.Fatalf("want context.Canceled, got %v", err) + } + events := parseEvents(t, &buf) + exits := filterEvents(events, EventExit) + if len(exits) != 1 { + t.Fatalf("expected 1 exit, got %d", len(exits)) + } + if exits[0].ExitCode == nil || *exits[0].ExitCode != 0 { + t.Errorf("expected clean exit 0 after SIGTERM, got code=%v signal=%q", exits[0].ExitCode, exits[0].Signal) + } +} + +func TestRun_SIGKILLAfterGracePeriod(t *testing.T) { + var buf bytes.Buffer + 
spec := baseSpec(&buf, newFakeClock()) + spec.Cmd, spec.Args, spec.Env = childCmd("hang-ignore-sigterm") + spec.GracePeriod = 250 * time.Millisecond + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(150 * time.Millisecond) + cancel() + }() + err := Run(ctx, spec) + if !errors.Is(err, context.Canceled) { + t.Fatalf("want context.Canceled, got %v", err) + } + events := parseEvents(t, &buf) + exits := filterEvents(events, EventExit) + if len(exits) != 1 { + t.Fatalf("expected 1 exit, got %d", len(exits)) + } + if exits[0].Signal != syscall.SIGKILL.String() { + t.Errorf("expected SIGKILL, got %q", exits[0].Signal) + } +} + +func TestRun_StableResetsBackoff(t *testing.T) { + var buf bytes.Buffer + spec := baseSpec(&buf, newFakeClock()) + // Sleep child for longer than StableAfter so we mark stable. + spec.Cmd, spec.Args, spec.Env = childCmd("sleep-then-exit0", "CHILD_SLEEP=100ms") + spec.StableAfter = 50 * time.Millisecond + + ctx, cancel := context.WithTimeout(context.Background(), 600*time.Millisecond) + defer cancel() + _ = Run(ctx, spec) + + events := parseEvents(t, &buf) + if len(filterEvents(events, EventStable)) == 0 { + t.Fatalf("expected at least one stable event\nall events: %+v", events) + } +} + +func TestRun_BurstExitTriggers(t *testing.T) { + var buf bytes.Buffer + spec := baseSpec(&buf, newFakeClock()) + spec.Cmd, spec.Args, spec.Env = childCmd("exit1") + spec.BurstMax = 3 + spec.BurstWindow = 5 * time.Second + true_ := true + spec.OnBurstExit = &true_ + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := Run(ctx, spec) + if !errors.Is(err, ErrBurstExceeded) { + t.Fatalf("want ErrBurstExceeded, got %v", err) + } + events := parseEvents(t, &buf) + if len(filterEvents(events, EventBurstExit)) != 1 { + t.Fatalf("expected 1 burst_exit event\nevents: %+v", events) + } +} + +func TestRun_PreStartFailureCountsAsBurst(t *testing.T) { + var buf bytes.Buffer + spec := 
baseSpec(&buf, newFakeClock()) + spec.Cmd, spec.Args, spec.Env = childCmd("exit0") + // A pre-start hook that always fails (false on POSIX). + spec.PreStart = []Hook{{Argv: []string{"/bin/false"}}} + spec.BurstMax = 2 + spec.BurstWindow = 5 * time.Second + true_ := true + spec.OnBurstExit = &true_ + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := Run(ctx, spec) + if !errors.Is(err, ErrBurstExceeded) { + t.Fatalf("want ErrBurstExceeded, got %v", err) + } + events := parseEvents(t, &buf) + ps := filterEvents(events, EventPreStart) + if len(ps) < 2 { + t.Fatalf("expected ≥2 prestart events\nevents: %+v", events) + } + for i, e := range ps { + if e.ExitCode == nil || *e.ExitCode == 0 { + t.Errorf("prestart[%d] expected non-zero exit, got %v", i, e.ExitCode) + } + } + // No worker should have launched. + if got := filterEvents(events, EventStart); len(got) != 0 { + t.Errorf("expected zero start events, got %d", len(got)) + } +} + +func TestRun_PostExitHookRunsWithWorkerEnv(t *testing.T) { + var buf bytes.Buffer + spec := baseSpec(&buf, newFakeClock()) + spec.Cmd, spec.Args, spec.Env = childCmd("exit0") + + // Use a hook that writes its env to a file we can inspect. + tmp, err := os.CreateTemp(t.TempDir(), "posthook-*.env") + if err != nil { + t.Fatal(err) + } + tmp.Close() + script := fmt.Sprintf("#!/bin/sh\nenv | grep ^WORKER_ > %s\n", tmp.Name()) + hookPath := tmp.Name() + ".sh" + if err := os.WriteFile(hookPath, []byte(script), 0o755); err != nil { + t.Fatal(err) + } + spec.PostExit = []Hook{{Argv: []string{hookPath}}} + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + _ = Run(ctx, spec) + + got, err := os.ReadFile(tmp.Name()) + if err != nil { + t.Fatal(err) + } + // The script truncates the file each invocation, so we see only the + // last hook run's env. Assert the env var keys are present; the value + // may be the shutdown iteration's (e.g. 
WORKER_EXIT_CODE=-1). + needles := []string{"WORKER_EXIT_CODE=", "WORKER_PID=", "WORKER_ATTEMPT=", "WORKER_DURATION_MS="} + for _, n := range needles { + if !strings.Contains(string(got), n) { + t.Errorf("hook env missing %q\nfile=\n%s", n, string(got)) + } + } +} + +func TestRun_WorkerStdoutForwarded(t *testing.T) { + var buf bytes.Buffer + var stdout bytes.Buffer + spec := baseSpec(&buf, newFakeClock()) + spec.Cmd, spec.Args, spec.Env = childCmd("write-stdout") + spec.WorkerStdout = &stdout + + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + _ = Run(ctx, spec) + + if !strings.Contains(stdout.String(), "hello from child") { + t.Errorf("expected child stdout, got %q", stdout.String()) + } +} + +func TestRun_RejectsEmptyCmd(t *testing.T) { + err := Run(context.Background(), Spec{}) + if err == nil { + t.Fatal("expected error for empty Cmd") + } + if !strings.Contains(err.Error(), "Cmd") { + t.Errorf("unexpected error: %v", err) + } +} + +// Sanity: parse a duration into a string and back to verify the suite runs +// when triggered by the real go test entry (not the re-exec path). +func TestSanity(t *testing.T) { + _, err := strconv.Atoi("1") + if err != nil { + t.Fatal(err) + } +}