Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 57 additions & 13 deletions pkg/functions/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
"os"
"os/exec"
"path/filepath"
"runtime"
"time"
)

const (
defaultRunHost = "127.0.0.1" // TODO allow to be altered via a runOpt
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just removing the comment, we are already parsing the address

defaultRunHost = "127.0.0.1"
defaultRunPort = "8080"
readinessEndpoint = "/health/readiness"
)
Expand Down Expand Up @@ -169,13 +170,23 @@ func runGo(ctx context.Context, job *Job) (err error) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

// cmd.Cancel = stop // TODO: use when we upgrade to go 1.20
cmd.Cancel = func() error {
Comment thread
gauron99 marked this conversation as resolved.
if runtime.GOOS == "windows" {
// Interrupt is not implemented on windows apparently
return cmd.Process.Kill()
}
return cmd.Process.Signal(os.Interrupt)
}
Comment on lines +173 to +179
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the new Cancel function.
Windows apparently does not implement safer interrupt so we have to Kill

// force kill after delay if Interrupt signal did not work to not hang indefinitely
cmd.WaitDelay = 5 * time.Second

// See the 1.19 [release notes](https://tip.golang.org/doc/go1.19) which state:
// A Cmd with a non-empty Dir field and nil Env now implicitly sets the PWD environment variable for the subprocess to match Dir.
// The new method Cmd.Environ reports the environment that would be used to run the command, including the implicitly set PWD variable.
// cmd.Env = append(cmd.Environ(), "PORT="+job.Port) // requires go 1.19
cmd.Env = append(cmd.Env, "LISTEN_ADDRESS="+net.JoinHostPort(job.Host, job.Port), "PWD="+cmd.Dir)
cmd.Env, err = buildRunnerEnv(job, map[string]string{
"LISTEN_ADDRESS": net.JoinHostPort(job.Host, job.Port),
"PWD": cmd.Dir,
})
if err != nil {
return fmt.Errorf("error building runner environment: %w", err)
}

// Running asynchronously allows for the client Run method to return
// metadata about the running function such as its chosen port.
Expand Down Expand Up @@ -244,12 +255,22 @@ func runPython(ctx context.Context, job *Job) (err error) {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

// See 1.19 [release notes](https://tip.golang.org/doc/go1.19) which state:
// A Cmd with a non-empty Dir field and nil Env now implicitly sets the
// PWD environment variable for the subprocess to match Dir.
// The new method Cmd.Environ reports the environment that would be used
// to run the command, including the implicitly set PWD variable.
cmd.Env = append(cmd.Env, "PORT="+job.Port, "LISTEN_ADDRESS="+listenAddress, "PWD="+cmd.Dir)
cmd.Cancel = func() error {
if runtime.GOOS == "windows" {
return cmd.Process.Kill()
}
return cmd.Process.Signal(os.Interrupt)
}
cmd.WaitDelay = 5 * time.Second

cmd.Env, err = buildRunnerEnv(job, map[string]string{
"PORT": job.Port,
"LISTEN_ADDRESS": listenAddress,
"PWD": cmd.Dir,
})
if err != nil {
return fmt.Errorf("error building runner environment: %w", err)
}

// Running asynchronously allows for the client Run method to return
// metadata about the running function such as its chosen port.
Expand All @@ -266,6 +287,29 @@ func runPython(ctx context.Context, job *Job) (err error) {
return
}

// buildRunnerEnv constructs the environment for a host-run subprocess.
// It starts with the parent process environment (os.Environ), layers on the
// provided extras (e.g. PORT, LISTEN_ADDRESS, PWD), and then applies any
// environment variables defined in func.yaml or passed via -e flags.
func buildRunnerEnv(job *Job, extras map[string]string) ([]string, error) {
env := os.Environ()

for k, v := range extras {
env = append(env, k+"="+v)
}

// Interpolate and append env vars from func.yaml / -e flags.
funcEnvs, err := Interpolate(job.Function.Run.Envs)
if err != nil {
return nil, fmt.Errorf("error interpolating environment variables: %w", err)
}
for k, v := range funcEnvs {
env = append(env, k+"="+v)
}

return env, nil
}

func waitFor(ctx context.Context, job *Job, timeout time.Duration) error {
var (
uri = fmt.Sprintf("http://%s%s", net.JoinHostPort(job.Host, job.Port), readinessEndpoint)
Expand Down
150 changes: 150 additions & 0 deletions pkg/functions/runner_env_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package functions

import (
"os"
"slices"
"strings"
"testing"

"k8s.io/utils/ptr"
)

// TestBuildRunnerEnv_InheritsParentEnv ensures that the parent process
// environment is inherited by the subprocess.
func TestBuildRunnerEnv_InheritsParentEnv(t *testing.T) {
const testKey = "FUNC_TEST_INHERIT_CHECK"
const testVal = "inherited_value"
t.Setenv(testKey, testVal)

job := &Job{Function: Function{}}
env, err := buildRunnerEnv(job, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

expected := testKey + "=" + testVal
if !slices.Contains(env, expected) {
t.Errorf("expected parent env var %q in result, but not found", expected)
}
}

// TestBuildRunnerEnv_ExtrasAreIncluded ensures that extras like PORT,
// LISTEN_ADDRESS, and PWD are present in the environment.
func TestBuildRunnerEnv_ExtrasAreIncluded(t *testing.T) {
job := &Job{Function: Function{}}
extras := map[string]string{
"PORT": "8080",
"LISTEN_ADDRESS": "127.0.0.1:8080",
"PWD": "/tmp/func",
}

env, err := buildRunnerEnv(job, extras)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

for k, v := range extras {
expected := k + "=" + v
if !slices.Contains(env, expected) {
t.Errorf("expected extra env var %q in result, but not found", expected)
}
}
}

// TestBuildRunnerEnv_FuncYamlEnvsIncluded ensures that envs from func.yaml
// (Function.Run.Envs) are interpolated and included.
func TestBuildRunnerEnv_FuncYamlEnvsIncluded(t *testing.T) {
job := &Job{
Function: Function{
Run: RunSpec{
Envs: Envs{
{Name: ptr.To("MY_VAR"), Value: ptr.To("my_value")},
{Name: ptr.To("ANOTHER"), Value: ptr.To("another_value")},
},
},
},
}

env, err := buildRunnerEnv(job, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !slices.Contains(env, "MY_VAR=my_value") {
t.Error("expected MY_VAR=my_value in result")
}
if !slices.Contains(env, "ANOTHER=another_value") {
t.Error("expected ANOTHER=another_value in result")
}
}

// TestBuildRunnerEnv_FuncYamlEnvsOverrideParent ensures that func.yaml envs
// take precedence over parent environment variables (last value wins in exec).
func TestBuildRunnerEnv_FuncYamlEnvsOverrideParent(t *testing.T) {
const testKey = "FUNC_TEST_OVERRIDE"
t.Setenv(testKey, "parent_value")

job := &Job{
Function: Function{
Run: RunSpec{
Envs: Envs{
{Name: ptr.To(testKey), Value: ptr.To("func_yaml_value")},
},
},
},
}

env, err := buildRunnerEnv(job, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// The func.yaml value should appear after the parent value.
// In exec.Cmd, the last duplicate key wins.
lastValue := ""
for _, e := range env {
if v, ok := strings.CutPrefix(e, testKey+"="); ok {
lastValue = v
}
}
if lastValue != "func_yaml_value" {
t.Errorf("expected func.yaml value to override parent, got %q", lastValue)
}
}

// TestBuildRunnerEnv_PathInherited is a quick check that PATH specifically
// is inherited from parent env.
func TestBuildRunnerEnv_PathInherited(t *testing.T) {
job := &Job{Function: Function{}}
env, err := buildRunnerEnv(job, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

pathPresent := false
for _, e := range env {
if strings.HasPrefix(e, "PATH=") {
pathPresent = true
break
}
}

// PATH should always be set in a normal Unix environment
if path := os.Getenv("PATH"); path != "" && !pathPresent {
t.Error("expected PATH to be inherited from parent environment")
}
}

// TestBuildRunnerEnv_EmptyRunEnvs ensures no error when there are no
// func.yaml envs configured.
func TestBuildRunnerEnv_EmptyRunEnvs(t *testing.T) {
job := &Job{Function: Function{}}
env, err := buildRunnerEnv(job, map[string]string{"PORT": "8080"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !slices.Contains(env, "PORT=8080") {
t.Error("expected PORT=8080 in result")
}
}
Loading