@@ -2,13 +2,13 @@

# Local Executor

The Local executor runs evaluations on your machine using Docker. It provides a fast way to iterate if you have Docker installed, evaluating existing endpoints.
The Local executor runs evaluations on your machine. By default it uses Docker containers; it can also run evaluations directly on the host (`execution.use_docker: false`).

See common concepts and commands in {ref}`executors-overview`.

## Prerequisites

- Docker
- Docker (required only when `execution.use_docker: true`, which is the default)
- Python environment with the NeMo Evaluator Launcher CLI available (install the launcher by following {ref}`gs-install`)

## Quick Start
@@ -30,6 +30,24 @@ nemo-evaluator-launcher run --config packages/nemo-evaluator-launcher/examples/l
-o target.api_endpoint.api_key_name=NGC_API_KEY
```

### Run without Docker containers

```bash
nemo-evaluator-launcher run --config packages/nemo-evaluator-launcher/examples/local_basic.yaml \
--no-docker \
-o target.api_endpoint.api_key_name=NGC_API_KEY
```

Equivalent YAML:

```yaml
execution:
type: local
use_docker: false
```

When using `use_docker: false`, the requested benchmark task must be available from locally installed NeMo Evaluator packages (harness wheels). The launcher validates this before execution and fails early if the harness or task is not installed.
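
If the task is missing, the run fails before any evaluation starts. The exact wording may vary; with a hypothetical task name, the failure looks roughly like:

```
ValueError: Task 'my_task' is not available in locally installed NeMo Evaluator packages. Install a wheel that contains this task, or run with Docker.
```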

## Environment Variables and Secrets

Environment variables use the unified prefix syntax (`$host:`, `$lit:`, `$runtime:`) described in {ref}`env-vars-configuration`. Declare them at the top-level `env_vars:` section, at `evaluation.env_vars`, or per-task. Secret values are stored in a `.secrets.env` file alongside the generated `run.sh` and sourced at runtime — they never appear in the script itself.
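
A minimal sketch of the three prefixes (variable names are illustrative; see {ref}`env-vars-configuration` for the exact semantics):

```yaml
env_vars:
  HF_TOKEN: $host:HF_TOKEN           # taken from the environment of the shell that launches the run
  LOG_LEVEL: $lit:INFO               # literal value, stored as-is
  NGC_API_KEY: $runtime:NGC_API_KEY  # resolution deferred to runtime on the execution host
```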
@@ -58,6 +76,7 @@ The Local executor uses Docker volume mounts for data persistence:
You can customize your local executor by specifying `extra_docker_args`.
This parameter allows you to pass any flag to the `docker run` command that is executed by the NeMo Evaluator Launcher.
You can use it to mount additional volumes, set environment variables, or customize your network settings.
`extra_docker_args` is ignored when `execution.use_docker: false`.

For example, if you would like your job to use a specific docker network, you can specify:
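
A plausible sketch (the network name is illustrative):

```yaml
execution:
  type: local
  extra_docker_args: "--network=my-network"
```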

@@ -94,6 +94,13 @@ class Cmd:
"If not specified, loads $PWD/.env if it exists."
},
)
no_docker: bool = field(
default=False,
alias=["--no-docker"],
metadata={
"help": "Run local executor tasks directly on host without launching Docker containers. Equivalent to setting execution.use_docker=false."
},
)

def _parse_requested_tasks(self) -> list[str]:
"""Parse -t arguments into a list of task names.
@@ -207,6 +214,16 @@ def execute(self) -> None:
hydra_overrides=self.override,
)

if self.no_docker:
if config.execution.type != "local":
raise ValueError(
"--no-docker is only supported with execution.type=local."
)
is_struct = OmegaConf.is_struct(config)
OmegaConf.set_struct(config, False)
config.execution.use_docker = False
OmegaConf.set_struct(config, is_struct)

# Apply task filtering if -t is specified
if requested_tasks:
config = filter_tasks(config, requested_tasks)
@@ -16,6 +16,7 @@
import base64
import datetime
import os
import shlex
from dataclasses import dataclass
from typing import Optional

@@ -61,8 +62,9 @@ def _str_to_echo_command(str_to_save: str, filename: str) -> CmdAndReadableComment:
debug_str = "\n".join(
[f"# Contents of {filename}"] + ["# " + s for s in str_to_save.splitlines()]
)
quoted_filename = shlex.quote(filename)
return CmdAndReadableComment(
cmd=f'echo "{str_to_save_b64}" | base64 -d > {filename}', debug=debug_str
cmd=f'echo "{str_to_save_b64}" | base64 -d > {quoted_filename}', debug=debug_str
)


@@ -167,6 +169,7 @@ def get_eval_factory_command(
cfg: DictConfig,
user_task_config: DictConfig,
task_definition: dict,
output_dir: str = CONTAINER_RESULTS_DIR,
) -> CmdAndReadableComment:
# This gets the eval_factory_config merged from both top-level and task-level.
merged_nemo_evaluator_config = get_eval_factory_config(
@@ -214,7 +217,7 @@ def get_eval_factory_command(
_set_nested_optionally_overriding(
merged_nemo_evaluator_config,
["config", "output_dir"],
CONTAINER_RESULTS_DIR,
output_dir,
)
api_key_name = get_api_key_name(cfg)
if api_key_name:
@@ -275,7 +278,7 @@ def get_eval_factory_command(
if config_path:
create_unresolved_config_cmd = _str_to_echo_command(
open(config_path, "r").read(),
filename=f"{CONTAINER_RESULTS_DIR}/launcher_unresolved_config.yaml",
filename=f"{output_dir}/launcher_unresolved_config.yaml",
)
commands.append(create_unresolved_config_cmd.cmd)
debug.append(create_unresolved_config_cmd.debug)
@@ -15,5 +15,6 @@
#
type: local
output_dir: ???
use_docker: true
extra_docker_args: ""
mode: sequential
@@ -23,6 +23,7 @@
import platform
import shlex
import shutil
import signal
import subprocess
import time
from typing import Iterator, List, Optional, Tuple, Union
@@ -67,6 +68,61 @@
from nemo_evaluator_launcher.executors.registry import register_executor


def _get_local_available_tasks() -> dict[str, set[str]]:
"""Return locally installed NeMo Evaluator tasks grouped by harness."""
try:
from nemo_evaluator.api import get_available_evaluations
except ImportError as e:
raise RuntimeError(
"execution.use_docker=false requires `nemo-evaluator` to be installed locally. "
"Install nemo-evaluator (with the harness/task wheels you need), or enable Docker execution."
) from e

framework_task_mapping, _, _ = get_available_evaluations()
return {
framework: set(tasks.keys())
for framework, tasks in framework_task_mapping.items()
}


def _validate_task_available_locally(
*,
task_query: str,
task_definition: dict,
available_tasks_by_harness: dict[str, set[str]],
) -> None:
"""Validate that a task exists in locally installed NeMo Evaluator packages."""
harness_name = str(task_definition.get("harness") or "")
[Power Review] 💡 Missing defensive handling when both harness and task name are empty in _validate_task_available_locally · 70% confidence

Per the knowledge document (Lack of Defensive default/else Branch): the function handles two cases — harness_name is truthy (lines 98-114) and harness_name is falsy (lines 116-123). In the falsy branch, if task_name is also empty (both harness and task keys are missing or empty in task_definition), the code searches for "" across all harnesses and produces a generic error message. A more explicit guard at the top of the function would provide a clearer, earlier error message for this malformed-input boundary.

💡 Suggestion: Add an early guard: if not harness_name and not task_name: raise ValueError(f"Task '{task_query}' has no harness or task name defined in its task definition.") before the main branching logic.

task_name = str(task_definition.get("task") or "")

if harness_name:
harness_tasks = available_tasks_by_harness.get(harness_name)
if harness_tasks is None:
available_harnesses = sorted(available_tasks_by_harness.keys())
raise ValueError(
f"Task '{task_query}' requires harness '{harness_name}', but this harness is not installed locally. "
f"Installed harnesses: {available_harnesses or ['<none>']}. "
"Install the corresponding NeMo Evaluator wheel, or run with Docker."
)
if task_name not in harness_tasks:
available_tasks = sorted(harness_tasks)
raise ValueError(
f"Task '{task_query}' is not available in installed harness '{harness_name}'. "
f"Available tasks in this harness: {available_tasks or ['<none>']}. "
"Install a wheel that contains this task, or run with Docker."
)
return

matching_harnesses = [
harness for harness, tasks in available_tasks_by_harness.items() if task_name in tasks
]
if not matching_harnesses:
raise ValueError(
f"Task '{task_query}' is not available in locally installed NeMo Evaluator packages. "
"Install a wheel that contains this task, or run with Docker."
)


@register_executor("local")
class LocalExecutor(BaseExecutor):
@classmethod
@@ -83,12 +139,21 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
Raises:
[Power Review] ⚠️ Monolithic function: execute_eval spans 340+ lines · 75% confidence

Per the knowledge document (Monolithic Functions >50 lines): 'AI generates long, flat functions instead of composing smaller units. Functions exceeding ~50 lines are harder to test, review, and maintain.' Based on the diff line numbers, execute_eval spans from approximately line 139 to at least line 480, making it well over 300 lines. This PR adds further branches and logic, exacerbating the problem.

💡 Suggestion: Extract cohesive blocks into well-named helpers: validation/preconditions (_validate_execution_config()), task iteration and definition resolution, script generation, and job execution. Each extracted method should be independently testable. The no-docker additions present a natural refactoring opportunity.

RuntimeError: If the run script fails.
"""
use_docker = bool(cfg.execution.get("use_docker", True))
[Power Review] ⚠️ Geological layers: use_docker conditionals scattered throughout execute_eval · 85% confidence

Per the knowledge document (Geological Layers): 'New code bolted on top of old code without integrating — duplicate data paths, parallel config mechanisms, wrapper-on-wrapper stacking.' The diff adds ~11 use_docker conditional branches scattered across execute_eval (lines 142, 145, 150, 155, 204, 268, 306, 340, 355, 373, 472). Each conditional duplicates a small decision about Docker vs host execution rather than abstracting the difference behind a clean boundary. This makes the method harder to reason about and will compound if more execution modes are added.

💡 Suggestion: Extract Docker-specific and host-specific task-building logic into separate private methods (e.g., _build_docker_task_config() and _build_host_task_config()) or introduce a lightweight strategy object. The main execute_eval method should orchestrate at a high level, delegating mode-specific details to these helpers. This keeps the conditional surface to one or two dispatch points instead of eleven.
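
A rough sketch of the shape this suggestion implies (names and fields are hypothetical, not the launcher's actual API):

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class TaskRunConfig:
    output_dir: str
    dataset_env_value: Optional[str]


def _build_docker_task_config(task_output_dir: str, dataset_dir: Optional[str]) -> TaskRunConfig:
    # Docker mode: paths refer to locations inside the container.
    return TaskRunConfig(
        output_dir="/results",
        dataset_env_value="/datasets" if dataset_dir else None,
    )


def _build_host_task_config(task_output_dir: str, dataset_dir: Optional[str]) -> TaskRunConfig:
    # Host mode: paths refer to locations on the local filesystem.
    return TaskRunConfig(
        output_dir=f"{task_output_dir}/artifacts",
        dataset_env_value=dataset_dir,
    )


def build_task_config(use_docker: bool, task_output_dir: str, dataset_dir: Optional[str]) -> TaskRunConfig:
    # Single dispatch point replacing the scattered `if use_docker:` branches.
    builder: Callable[[str, Optional[str]], TaskRunConfig] = (
        _build_docker_task_config if use_docker else _build_host_task_config
    )
    return builder(task_output_dir, dataset_dir)
```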

[Power Review] 💡 Missing test for boundary between use_docker=true and use_docker=false in kill_job · 80% confidence

Per the knowledge document (Missing Test for 'Just Inside / Just Outside' Pair): the kill_job method now branches on use_docker (line 802/821) but the test files in this diff do not include any tests for the kill_job path in no-docker mode — specifically testing: (1) kill with valid PID file, (2) kill with missing PID file, (3) kill with empty/invalid PID file content, (4) kill with use_docker=False and no container name. These boundary transitions should be exercised.

💡 Suggestion: Add test cases in test_local_executor.py covering kill_job for no-docker jobs: valid PID file, missing PID file, malformed PID file content (empty, non-numeric, zero, negative), and the transition where use_docker=False with no container name.


# Check if docker is available (skip in dry_run mode)
if not dry_run and shutil.which("docker") is None:
if use_docker and not dry_run and shutil.which("docker") is None:
raise RuntimeError(
"Docker is not installed or not in PATH. "
"Please install Docker to run local evaluations."
)
if not use_docker and cfg.deployment.type != "none":
raise ValueError(
"execution.use_docker=false is only supported with deployment.type=none."
)
local_available_tasks: dict[str, set[str]] | None = None
if not use_docker:
local_available_tasks = _get_local_available_tasks()

# Generate invocation ID for this evaluation run
invocation_id = generate_invocation_id()
@@ -136,6 +201,12 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
container=task.get("container"),
endpoint_type=task.get("endpoint_type"),
)
if not use_docker:
_validate_task_available_locally(
task_query=task.name,
task_definition=task_definition,
available_tasks_by_harness=local_available_tasks or {},
[Power Review] 💡 Defensive or {} is unreachable — consider assertion instead · 85% confidence

At line 208, available_tasks_by_harness=local_available_tasks or {} uses or {} as a fallback. However, local_available_tasks is set to a dict at line 156 (inside if not use_docker), and _validate_task_available_locally is only called at line 204 (inside if not use_docker). So local_available_tasks is guaranteed to be a non-None dict at this point. The or {} can never activate, making it a tautological guard. This falls under the 'Redundant or Tautological Condition' suggestion pattern.

💡 Suggestion: Replace local_available_tasks or {} with just local_available_tasks, optionally preceded by assert local_available_tasks is not None to document the invariant.

)

# Track unlisted tasks for safeguard check
if task_definition.get("is_unlisted", False):
@@ -194,11 +265,15 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
dataset_mount_container = None
dataset_env_var_value = None
if "dataset_dir" in task:
dataset_mount_host = task["dataset_dir"]
# Get container mount path (default to /datasets if not specified)
dataset_mount_container = task.get("dataset_mount_path", "/datasets")
# Set NEMO_EVALUATOR_DATASET_DIR to the container mount path
dataset_env_var_value = dataset_mount_container
if use_docker:
dataset_mount_host = task["dataset_dir"]
# Get container mount path (default to /datasets if not specified)
dataset_mount_container = task.get("dataset_mount_path", "/datasets")
# Set NEMO_EVALUATOR_DATASET_DIR to the container mount path
dataset_env_var_value = dataset_mount_container
else:
# In no-docker mode, pass dataset_dir directly to local process.
dataset_env_var_value = task["dataset_dir"]

# Build env_groups for secrets file generation
env_groups = {}
@@ -225,7 +300,12 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
task_output_dir = output_dir / task.name
task_output_dir.mkdir(parents=True, exist_ok=True)
eval_factory_command_struct = get_eval_factory_command(
cfg, task, task_definition
cfg,
task,
task_definition,
output_dir=(
"/results" if use_docker else str(task_output_dir / "artifacts")
[Power Review] ⚠️ Hardcoded "/results" duplicates CONTAINER_RESULTS_DIR constant · 80% confidence

The get_eval_factory_command function defines its default parameter as output_dir: str = CONTAINER_RESULTS_DIR (helpers.py line 172). However, the caller in executor.py passes the magic string "/results" for the Docker path instead of referencing the same constant. This is a Tight Coupling to Implementation Detail issue — if CONTAINER_RESULTS_DIR is ever updated, this hardcoded value will silently diverge, leading to path mismatches between the executor and the eval factory command configuration.

💡 Suggestion: Import and use the CONTAINER_RESULTS_DIR constant: output_dir=(CONTAINER_RESULTS_DIR if use_docker else str(task_output_dir / "artifacts")). This ensures the Docker path always matches the function's documented default.

),
)
eval_factory_command = eval_factory_command_struct.cmd
# The debug comment for placing into the script and easy debug. Reason
Expand Down Expand Up @@ -257,6 +337,7 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
"dataset_mount_host": dataset_mount_host,
"dataset_mount_container": dataset_mount_container,
"dataset_env_var_value": dataset_env_var_value,
"run_with_docker": use_docker,
}
evaluation_tasks.append(evaluation_task)

@@ -271,6 +352,7 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
evaluation_tasks=[evaluation_task],
auto_export_destinations=auto_export_destinations,
extra_docker_args=extra_docker_args,
has_docker_tasks=use_docker,
).rstrip("\n")
+ "\n"
)
@@ -288,6 +370,7 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
evaluation_tasks=evaluation_tasks,
auto_export_destinations=auto_export_destinations,
extra_docker_args=extra_docker_args,
has_docker_tasks=use_docker,
).rstrip("\n")
+ "\n"
)
@@ -386,8 +469,13 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str:
executor="local",
data={
"output_dir": str(evaluation_task["output_dir"]),
"container": evaluation_task["client_container_name"],
"container": (
evaluation_task["client_container_name"]
if use_docker
else ""
),
"eval_image": evaluation_task["eval_image"],
"use_docker": use_docker,
},
config=OmegaConf.to_object(cfg),
)
@@ -711,33 +799,46 @@ def kill_job(job_id: str) -> None:
f"Job {job_id} is not a local job (executor: {job_data.executor})"
)

# Get container name from database
container_name = job_data.data.get("container")
if not container_name:
raise ValueError(f"No container name found for job {job_id}")
use_docker = bool(job_data.data.get("use_docker", True))
output_dir = pathlib.Path(job_data.data.get("output_dir", ""))
[Power Review] ⚠️ Empty output_dir from job database creates PID file path relative to cwd · 80% confidence

Per the knowledge document (Unhandled Empty or Null Input): a function receives an empty string and proceeds without a guard. job_data.data.get("output_dir", "") returns an empty string if the key is missing (e.g., for job records created before this PR, or corrupted data). pathlib.Path("") resolves to the current working directory, so pid_file becomes ./logs/stage.pid. If such a file coincidentally exists in the cwd, kill_job could read an unrelated PID and send SIGTERM to the wrong process group — a potentially destructive action.

💡 Suggestion: Validate that output_dir is non-empty before constructing the PID file path. For example: if not output_dir.parts: raise ValueError(f"No output_dir found for job {job_id}") or skip the PID-file kill path entirely when output_dir is empty.

[Power Review] 💡 Empty-string fallback for output_dir could create relative PID file path · 70% confidence

Per the knowledge document (WARNING: Error Message Without Context / ERROR: Null/None Dereference on Fallible Return): job_data.data.get("output_dir", "") defaults to an empty string. pathlib.Path("") / "logs" / "stage.pid" produces the relative path logs/stage.pid, which could inadvertently match a file in the process's current working directory. While pid_file.exists() guards against reading a nonexistent file, if such a relative path happens to exist, kill_job could send SIGTERM to an unrelated process.

💡 Suggestion: Validate that output_dir is non-empty and absolute before using it for PID file lookups. If missing, skip the PID-based kill or raise an error early:

output_dir_str = job_data.data.get("output_dir", "")
if not output_dir_str:
    raise ValueError(f"No output directory found for job {job_id}")
output_dir = pathlib.Path(output_dir_str)

container_name = job_data.data.get("container") or ""

killed_something = False

# First, try to stop the Docker container if it's running
result = subprocess.run(
shlex.split(f"docker stop {container_name}"),
capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
killed_something = True
# Don't raise error if container doesn't exist (might be still pulling)

# Find and kill Docker processes for this container
result = subprocess.run(
shlex.split(f"pkill -f 'docker run.*{container_name}'"),
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
killed_something = True
# Try to stop script process group if a pid file is present.
pid_file = output_dir / "logs" / "stage.pid"
This unfortunately introduces a TOCTOU vulnerability. If anything happens to the stage.pid file (overwrite, symlink race), we risk killing an unrelated process or enabling other malicious actions. Since we operate on a third-party process, we cannot use the typical flock approach. If we stay with a PID file, checking the process start time / process info before killing could be beneficial. For the scope of this MR, it's okay to skip this

[Power Review] ⚠️ Filesystem TOCTOU on pid_file.exists() then pid_file.read_text() · 85% confidence

The knowledge document explicitly flags os.path.exists(path) then file operation as a Non-Atomic Check-Then-Act (TOCTOU) anti-pattern (CWE-367): 'filesystem TOCTOU — os.path.exists(path) then open(path) ... another process can replace the file between check and use. Instead: open first and handle the error.' Here, pid_file.exists() is checked on line 810, then pid_file.read_text() is called on line 812. The shell template's trap 'rm -f "$logs_dir/stage.pid"' EXIT (run.template.sh line 70) actively removes this file on process exit, creating a real window where the file disappears between check and read. While the except (OSError, ValueError): pass block on line 818 would catch the resulting FileNotFoundError, the exists() guard is redundant and constitutes a textbook TOCTOU pattern.

💡 Suggestion: Remove the pid_file.exists() check and rely solely on exception handling. This is both more correct and more concise:

try:
    pid = int(pid_file.read_text().strip())
    if hasattr(os, 'killpg'):
        os.killpg(pid, signal.SIGTERM)
    else:
        os.kill(pid, signal.SIGTERM)
    killed_something = True
except (OSError, ValueError):
    pass

[Power Review] ⚠️ PID file not cleaned up after failed or successful kill in kill_job · 75% confidence

Per the knowledge document's 'Temporary File or Directory Not Cleaned Up' rule: a temporary file is created but never deleted, especially on error paths. The kill_job method reads stage.pid and attempts os.killpg/os.kill. If the process is already dead (stale PID), the OSError is caught and silently ignored (line 818), but the PID file itself is never removed. Additionally, even on a successful SIGTERM send, the method proceeds immediately without waiting for the process's EXIT trap to fire and clean up the file. This leaves stale PID files that could cause subsequent kill_job calls to target a reused PID — an analog to the 'Use After Release' pattern in PID namespace.

💡 Suggestion: After the try/except block (around line 819), add cleanup of the PID file regardless of whether the kill succeeded or failed. For example: try: pid_file.unlink(missing_ok=True) except OSError: pass. This makes cleanup idempotent and prevents stale PID files from accumulating.

if pid_file.exists():
try:
pid = int(pid_file.read_text().strip())
[Power Review] 🔴 Missing validation that PID is positive before os.killpg — PID 0 kills caller's process group · 95% confidence

Per the knowledge document (Missing Input Validation, CWE-20): data arriving from outside the trust boundary (file read) is used without checking range. The PID is read from a file (stage.pid) and passed directly to os.killpg(pid, signal.SIGTERM). If the file contains 0, os.killpg(0, SIGTERM) sends SIGTERM to every process in the calling process group — potentially killing the launcher itself and sibling processes. A negative value would also be invalid. The int() conversion alone does not guard against these dangerous values.

💡 Suggestion: After parsing the PID, validate it is strictly positive before use: if pid <= 0: raise ValueError(f'Invalid PID: {pid}'). This should be placed before the os.killpg() call and within the existing try/except block so that invalid values are caught gracefully.

if hasattr(os, "killpg"):
os.killpg(pid, signal.SIGTERM)
[Power Review] ⚠️ os.killpg assumes PID is a process group leader, but subshell inherits parent PGID · 82% confidence

Per knowledge doc 'Wrong Variable or Field Referenced' and 'Stale or Cached Value Used After Update': os.killpg(pid, signal.SIGTERM) sends a signal to the process GROUP identified by pid. However, the subshell spawned by (...) inherits the parent script's process group — its PID is NOT a process group ID. If $$ is fixed to $BASHPID, then os.killpg(subshell_pid, ...) would fail with ESRCH because the subshell PID is not a PGID. If $$ remains as-is (parent PID), os.killpg kills the entire script's process group (all tasks). Either way, the kill mechanism does not correctly target a single task.

💡 Suggestion: Either: (1) use os.kill(pid, signal.SIGTERM) to signal just the subshell process (and rely on SIGTERM propagation or explicit child cleanup), or (2) have the subshell create its own process group in the template (e.g., prepend the subshell body with a set -m or use setsid) so that os.killpg is valid.

[Power Review] ⚠️ os.killpg uses PID as PGID — only works if script is a process group leader · 72% confidence

os.killpg(pgid, sig) sends a signal to an entire process group identified by its process group ID (PGID). The code reads a PID from a file (written by echo "$$" in the shell template) and passes it directly to os.killpg as if it were a PGID. This only works if the script process happens to be its own process group leader (PID == PGID), which is not guaranteed — especially when the script is launched via subprocess.run() without start_new_session=True, as the child inherits the parent's process group. For non-docker jobs, the PID-file mechanism is the only kill path (docker stop is skipped), so if killpg fails with ProcessLookupError (caught silently by except OSError), kill_job will always report failure for no-docker jobs. This matches the 'Incorrect Operator or Operand' error pattern — PID and PGID are related but distinct values.

💡 Suggestion: Either (a) use os.getpgid(pid) to resolve the actual PGID before calling os.killpg, or (b) use os.kill(pid, signal.SIGTERM) to signal the script process directly (though children may survive), or (c) ensure the script is started in its own process group (e.g., subprocess.Popen with start_new_session=True) so that PID == PGID is guaranteed.
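
A minimal sketch combining option (a) with the positive-PID guard from the earlier comment, assuming a POSIX host (helper name hypothetical):

```python
import os
import signal


def _signal_job_group(pid: int) -> bool:
    """Send SIGTERM to the job's process group; return True if a signal was delivered."""
    if pid <= 0:
        # Guard against killpg(0, ...) signalling the launcher's own process group.
        raise ValueError(f"Invalid PID: {pid}")
    try:
        pgid = os.getpgid(pid)  # resolve the real PGID instead of assuming PID == PGID
        os.killpg(pgid, signal.SIGTERM)
    except ProcessLookupError:
        return False  # process already exited
    return True
```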

else:
os.kill(pid, signal.SIGTERM)
killed_something = True
except (OSError, ValueError):
[Power Review] ⚠️ Swallowed exception in PID-based kill path · 90% confidence

Per the knowledge document (ERROR: Swallowed Exception With No Action): 'An exception is caught and silently ignored. The caller has no way to know the operation failed.' Here, except (OSError, ValueError): pass discards all diagnostic information when the PID file read or process kill fails. A ValueError from int() (corrupt PID file) or an OSError other than 'no such process' (e.g., EPERM — permission denied) would be silently lost, making debugging kill failures very difficult. Downgraded to warning because the killed_something flag does communicate functional outcome and the code falls through to Docker-based killing as a secondary strategy.

💡 Suggestion: At minimum, log the exception at debug/warning level so operators can diagnose kill failures. Consider also narrowing the OSError handling — e.g., only silencing ProcessLookupError (errno ESRCH) when the process is already dead, while letting permission errors propagate.

[Power Review] ⚠️ Swallowed exceptions in kill_job PID cleanup · 85% confidence

The knowledge document states: 'Never swallow exceptions without logging or re-raising.' While the exceptions caught here are specific (OSError, ValueError) rather than bare, they are silently discarded with pass. An OSError could indicate a permission-denied condition (not just process-already-gone), and a ValueError could indicate a corrupt PID file — both worth logging for debuggability of kill failures.

💡 Suggestion: Add a debug/warning log for the caught exceptions. For example: except (OSError, ValueError) as exc: logger.debug("Could not signal process from %s: %s", pid_file, exc). This preserves the non-fatal behavior while leaving a diagnostic trail.

pass

if use_docker and container_name:
# First, try to stop the Docker container if it's running
result = subprocess.run(
shlex.split(f"docker stop {container_name}"),
[Power Review] ⚠️ shlex.split on f-string with container_name may split malformed names · 72% confidence

Per knowledge doc 'Injection: Untrusted Data in Executable Context' (CWE-78): container_name originates from stored job data (job_data.data.get('container')). While subprocess.run is called with a list (not shell=True), shlex.split(f"docker stop {container_name}") will split on whitespace and interpret shell quoting in container_name. A container name containing spaces or shell metacharacters would produce unexpected argument splitting. The same pattern applies to line 835 with pkill -f. Although this pattern existed pre-diff, the new code at line 804 (container_name = job_data.data.get("container") or "") now allows empty string through, and the guard at line 821 prevents execution with empty names, but a name with embedded spaces/quotes would still be mis-split.

💡 Suggestion: Use an explicit argument list instead of shlex.split on an f-string: subprocess.run(["docker", "stop", container_name], ...). This avoids the splitting ambiguity entirely.

capture_output=True,
text=True,
timeout=30,
)
if result.returncode == 0:
killed_something = True
# Don't raise error if container doesn't exist (might be still pulling)

# Find and kill Docker processes for this container
result = subprocess.run(
shlex.split(f"pkill -f 'docker run.*{container_name}'"),
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
killed_something = True

# If we successfully killed something, mark as killed
if killed_something:
@@ -758,7 +859,13 @@ def kill_job(job_id: str) -> None:
# Use common helper to get informative error message based on job status
current_status = status_list[0].state if status_list else None
error_msg = LocalExecutor.get_kill_failure_message(
job_id, f"container: {container_name}", current_status
job_id,
(
f"container: {container_name}"
if container_name
else f"pid_file: {pid_file}"
),
current_status,
)
raise RuntimeError(error_msg)
