-
Notifications
You must be signed in to change notification settings - Fork 46
feat: add local no-docker mode for nemo-evaluator-launcher #786
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ca28143
ff8c248
07e6372
665f689
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,5 +15,6 @@ | |
| # | ||
| type: local | ||
| output_dir: ??? | ||
| use_docker: true | ||
| extra_docker_args: "" | ||
| mode: sequential | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
| import platform | ||
| import shlex | ||
| import shutil | ||
| import signal | ||
| import subprocess | ||
| import time | ||
| from typing import Iterator, List, Optional, Tuple, Union | ||
|
|
@@ -67,6 +68,61 @@ | |
| from nemo_evaluator_launcher.executors.registry import register_executor | ||
|
|
||
|
|
||
| def _get_local_available_tasks() -> dict[str, set[str]]: | ||
| """Return locally installed NeMo Evaluator tasks grouped by harness.""" | ||
| try: | ||
| from nemo_evaluator.api import get_available_evaluations | ||
| except ImportError as e: | ||
| raise RuntimeError( | ||
| "execution.use_docker=false requires `nemo-evaluator` to be installed locally. " | ||
| "Install nemo-evaluator (with the harness/task wheels you need), or enable Docker execution." | ||
| ) from e | ||
|
|
||
| framework_task_mapping, _, _ = get_available_evaluations() | ||
| return { | ||
| framework: set(tasks.keys()) | ||
| for framework, tasks in framework_task_mapping.items() | ||
| } | ||
|
|
||
|
|
||
| def _validate_task_available_locally( | ||
| *, | ||
| task_query: str, | ||
| task_definition: dict, | ||
| available_tasks_by_harness: dict[str, set[str]], | ||
| ) -> None: | ||
| """Validate that a task exists in locally installed NeMo Evaluator packages.""" | ||
| harness_name = str(task_definition.get("harness") or "") | ||
| task_name = str(task_definition.get("task") or "") | ||
|
|
||
| if harness_name: | ||
| harness_tasks = available_tasks_by_harness.get(harness_name) | ||
| if harness_tasks is None: | ||
| available_harnesses = sorted(available_tasks_by_harness.keys()) | ||
| raise ValueError( | ||
| f"Task '{task_query}' requires harness '{harness_name}', but this harness is not installed locally. " | ||
| f"Installed harnesses: {available_harnesses or ['<none>']}. " | ||
| "Install the corresponding NeMo Evaluator wheel, or run with Docker." | ||
| ) | ||
| if task_name not in harness_tasks: | ||
| available_tasks = sorted(harness_tasks) | ||
| raise ValueError( | ||
| f"Task '{task_query}' is not available in installed harness '{harness_name}'. " | ||
| f"Available tasks in this harness: {available_tasks or ['<none>']}. " | ||
| "Install a wheel that contains this task, or run with Docker." | ||
| ) | ||
| return | ||
|
|
||
| matching_harnesses = [ | ||
| harness for harness, tasks in available_tasks_by_harness.items() if task_name in tasks | ||
| ] | ||
| if not matching_harnesses: | ||
| raise ValueError( | ||
| f"Task '{task_query}' is not available in locally installed NeMo Evaluator packages. " | ||
| "Install a wheel that contains this task, or run with Docker." | ||
| ) | ||
|
|
||
|
|
||
| @register_executor("local") | ||
| class LocalExecutor(BaseExecutor): | ||
| @classmethod | ||
|
|
@@ -83,12 +139,21 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| Raises: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] Per the knowledge document (Monolithic Functions >50 lines): 'AI generates long, flat functions instead of composing smaller units. Functions exceeding ~50 lines are harder to test, review, and maintain.' Based on the diff line numbers, 💡 Suggestion: Extract cohesive blocks into well-named helpers: validation/preconditions ( |
||
| RuntimeError: If the run script fails. | ||
| """ | ||
| use_docker = bool(cfg.execution.get("use_docker", True)) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] Per the knowledge document (Geological Layers): 'New code bolted on top of old code without integrating — duplicate data paths, parallel config mechanisms, wrapper-on-wrapper stacking.' The diff adds ~11 💡 Suggestion: Extract Docker-specific and host-specific task-building logic into separate private methods (e.g.,
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] 💡 Missing test for boundary between use_docker=true and use_docker=false in kill_job · 80% confidence Per the knowledge document (Missing Test for 'Just Inside / Just Outside' Pair): the 💡 Suggestion: Add test cases in |
||
|
|
||
| # Check if docker is available (skip in dry_run mode) | ||
| if not dry_run and shutil.which("docker") is None: | ||
| if use_docker and not dry_run and shutil.which("docker") is None: | ||
| raise RuntimeError( | ||
| "Docker is not installed or not in PATH. " | ||
| "Please install Docker to run local evaluations." | ||
| ) | ||
| if not use_docker and cfg.deployment.type != "none": | ||
| raise ValueError( | ||
| "execution.use_docker=false is only supported with deployment.type=none." | ||
| ) | ||
| local_available_tasks: dict[str, set[str]] | None = None | ||
| if not use_docker: | ||
| local_available_tasks = _get_local_available_tasks() | ||
|
|
||
| # Generate invocation ID for this evaluation run | ||
| invocation_id = generate_invocation_id() | ||
|
|
@@ -136,6 +201,12 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| container=task.get("container"), | ||
| endpoint_type=task.get("endpoint_type"), | ||
| ) | ||
| if not use_docker: | ||
| _validate_task_available_locally( | ||
| task_query=task.name, | ||
| task_definition=task_definition, | ||
| available_tasks_by_harness=local_available_tasks or {}, | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] 💡 Defensive At line 208, 💡 Suggestion: Replace |
||
| ) | ||
|
|
||
| # Track unlisted tasks for safeguard check | ||
| if task_definition.get("is_unlisted", False): | ||
|
|
@@ -194,11 +265,15 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| dataset_mount_container = None | ||
| dataset_env_var_value = None | ||
| if "dataset_dir" in task: | ||
| dataset_mount_host = task["dataset_dir"] | ||
| # Get container mount path (default to /datasets if not specified) | ||
| dataset_mount_container = task.get("dataset_mount_path", "/datasets") | ||
| # Set NEMO_EVALUATOR_DATASET_DIR to the container mount path | ||
| dataset_env_var_value = dataset_mount_container | ||
| if use_docker: | ||
| dataset_mount_host = task["dataset_dir"] | ||
| # Get container mount path (default to /datasets if not specified) | ||
| dataset_mount_container = task.get("dataset_mount_path", "/datasets") | ||
| # Set NEMO_EVALUATOR_DATASET_DIR to the container mount path | ||
| dataset_env_var_value = dataset_mount_container | ||
| else: | ||
| # In no-docker mode, pass dataset_dir directly to local process. | ||
| dataset_env_var_value = task["dataset_dir"] | ||
|
|
||
| # Build env_groups for secrets file generation | ||
| env_groups = {} | ||
|
|
@@ -225,7 +300,12 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| task_output_dir = output_dir / task.name | ||
| task_output_dir.mkdir(parents=True, exist_ok=True) | ||
| eval_factory_command_struct = get_eval_factory_command( | ||
| cfg, task, task_definition | ||
| cfg, | ||
| task, | ||
| task_definition, | ||
| output_dir=( | ||
| "/results" if use_docker else str(task_output_dir / "artifacts") | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] The 💡 Suggestion: Import and use the |
||
| ), | ||
| ) | ||
| eval_factory_command = eval_factory_command_struct.cmd | ||
| # The debug comment for placing into the script and easy debug. Reason | ||
|
|
@@ -257,6 +337,7 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| "dataset_mount_host": dataset_mount_host, | ||
| "dataset_mount_container": dataset_mount_container, | ||
| "dataset_env_var_value": dataset_env_var_value, | ||
| "run_with_docker": use_docker, | ||
| } | ||
| evaluation_tasks.append(evaluation_task) | ||
|
|
||
|
|
@@ -271,6 +352,7 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| evaluation_tasks=[evaluation_task], | ||
| auto_export_destinations=auto_export_destinations, | ||
| extra_docker_args=extra_docker_args, | ||
| has_docker_tasks=use_docker, | ||
| ).rstrip("\n") | ||
| + "\n" | ||
| ) | ||
|
|
@@ -288,6 +370,7 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| evaluation_tasks=evaluation_tasks, | ||
| auto_export_destinations=auto_export_destinations, | ||
| extra_docker_args=extra_docker_args, | ||
| has_docker_tasks=use_docker, | ||
| ).rstrip("\n") | ||
| + "\n" | ||
| ) | ||
|
|
@@ -386,8 +469,13 @@ def execute_eval(cls, cfg: DictConfig, dry_run: bool = False) -> str: | |
| executor="local", | ||
| data={ | ||
| "output_dir": str(evaluation_task["output_dir"]), | ||
| "container": evaluation_task["client_container_name"], | ||
| "container": ( | ||
| evaluation_task["client_container_name"] | ||
| if use_docker | ||
| else "" | ||
| ), | ||
| "eval_image": evaluation_task["eval_image"], | ||
| "use_docker": use_docker, | ||
| }, | ||
| config=OmegaConf.to_object(cfg), | ||
| ) | ||
|
|
@@ -711,33 +799,46 @@ def kill_job(job_id: str) -> None: | |
| f"Job {job_id} is not a local job (executor: {job_data.executor})" | ||
| ) | ||
|
|
||
| # Get container name from database | ||
| container_name = job_data.data.get("container") | ||
| if not container_name: | ||
| raise ValueError(f"No container name found for job {job_id}") | ||
| use_docker = bool(job_data.data.get("use_docker", True)) | ||
| output_dir = pathlib.Path(job_data.data.get("output_dir", "")) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] Per the knowledge document (Unhandled Empty or Null Input): a function receives an empty string and proceeds without a guard. 💡 Suggestion: Validate that
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] 💡 Empty-string fallback for output_dir could create relative PID file path · 70% confidence Per the knowledge document (WARNING: Error Message Without Context / ERROR: Null/None Dereference on Fallible Return): 💡 Suggestion: Validate that output_dir_str = job_data.data.get("output_dir", "")
if not output_dir_str:
raise ValueError(f"No output directory found for job {job_id}")
output_dir = pathlib.Path(output_dir_str) |
||
| container_name = job_data.data.get("container") or "" | ||
|
|
||
| killed_something = False | ||
|
|
||
| # First, try to stop the Docker container if it's running | ||
| result = subprocess.run( | ||
| shlex.split(f"docker stop {container_name}"), | ||
| capture_output=True, | ||
| text=True, | ||
| timeout=30, | ||
| ) | ||
| if result.returncode == 0: | ||
| killed_something = True | ||
| # Don't raise error if container doesn't exist (might be still pulling) | ||
|
|
||
| # Find and kill Docker processes for this container | ||
| result = subprocess.run( | ||
| shlex.split(f"pkill -f 'docker run.*{container_name}'"), | ||
| capture_output=True, | ||
| text=True, | ||
| timeout=10, | ||
| ) | ||
| if result.returncode == 0: | ||
| killed_something = True | ||
| # Try to stop script process group if a pid file is present. | ||
| pid_file = output_dir / "logs" / "stage.pid" | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This unfortunately introduces a TOCTOU vulnerability. If anything happens to
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] The knowledge document explicitly flags 💡 Suggestion: Remove the try:
pid = int(pid_file.read_text().strip())
if hasattr(os, 'killpg'):
os.killpg(pid, signal.SIGTERM)
else:
os.kill(pid, signal.SIGTERM)
killed_something = True
except (OSError, ValueError):
pass
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [Power Review] Per the knowledge document's 'Temporary File or Directory Not Cleaned Up' rule: a temporary file is created but never deleted, especially on error paths. The kill_job method reads 💡 Suggestion: After the try/except block (around line 819), add cleanup of the PID file regardless of whether the kill succeeded or failed. For example: |
||
| if pid_file.exists(): | ||
| try: | ||
| pid = int(pid_file.read_text().strip()) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] 🔴 Missing validation that PID is positive before os.killpg — PID 0 kills caller's process group · 95% confidence Per the knowledge document (Missing Input Validation, CWE-20): data arriving from outside the trust boundary (file read) is used without checking range. The PID is read from a file ( 💡 Suggestion: After parsing the PID, validate it is strictly positive before use: |
||
| if hasattr(os, "killpg"): | ||
| os.killpg(pid, signal.SIGTERM) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] Per knowledge doc 'Wrong Variable or Field Referenced' and 'Stale or Cached Value Used After Update': 💡 Suggestion: Either: (1) use
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] os.killpg(pgid, sig) sends a signal to an entire process group identified by its process group ID (PGID). The code reads a PID from a file (written by 💡 Suggestion: Either (a) use os.getpgid(pid) to resolve the actual PGID before calling os.killpg, or (b) use os.kill(pid, signal.SIGTERM) to signal the script process directly (though children may survive), or (c) ensure the script is started in its own process group (e.g., subprocess.Popen with start_new_session=True) so that PID == PGID is guaranteed. |
||
| else: | ||
| os.kill(pid, signal.SIGTERM) | ||
| killed_something = True | ||
| except (OSError, ValueError): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] Per the knowledge document (ERROR: Swallowed Exception With No Action): 'An exception is caught and silently ignored. The caller has no way to know the operation failed.' Here, 💡 Suggestion: At minimum, log the exception at debug/warning level so operators can diagnose kill failures. Consider also narrowing the OSError handling — e.g., only silencing
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] The knowledge document states: 'Never swallow exceptions without logging or re-raising.' While the exceptions caught here are specific (OSError, ValueError) rather than bare, they are silently discarded with 💡 Suggestion: Add a debug/warning log for the caught exceptions. For example: |
||
| pass | ||
|
|
||
| if use_docker and container_name: | ||
| # First, try to stop the Docker container if it's running | ||
| result = subprocess.run( | ||
| shlex.split(f"docker stop {container_name}"), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Power Review] Per knowledge doc 'Injection: Untrusted Data in Executable Context' (CWE-78): 💡 Suggestion: Use an explicit argument list instead of |
||
| capture_output=True, | ||
| text=True, | ||
| timeout=30, | ||
| ) | ||
| if result.returncode == 0: | ||
| killed_something = True | ||
| # Don't raise error if container doesn't exist (might be still pulling) | ||
|
|
||
| # Find and kill Docker processes for this container | ||
| result = subprocess.run( | ||
| shlex.split(f"pkill -f 'docker run.*{container_name}'"), | ||
| capture_output=True, | ||
| text=True, | ||
| timeout=10, | ||
| ) | ||
| if result.returncode == 0: | ||
| killed_something = True | ||
|
|
||
| # If we successfully killed something, mark as killed | ||
| if killed_something: | ||
|
|
@@ -758,7 +859,13 @@ def kill_job(job_id: str) -> None: | |
| # Use common helper to get informative error message based on job status | ||
| current_status = status_list[0].state if status_list else None | ||
| error_msg = LocalExecutor.get_kill_failure_message( | ||
| job_id, f"container: {container_name}", current_status | ||
| job_id, | ||
| ( | ||
| f"container: {container_name}" | ||
| if container_name | ||
| else f"pid_file: {pid_file}" | ||
| ), | ||
| current_status, | ||
| ) | ||
| raise RuntimeError(error_msg) | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
[Power Review] 💡 Missing defensive handling when both harness and task name are empty in _validate_task_available_locally · 70% confidence
Per the knowledge document (Lack of Defensive default/else Branch): the function handles two cases —
`harness_name` is truthy (lines 98-114) and `harness_name` is falsy (lines 116-123). In the falsy branch, if `task_name` is also empty (both `harness` and `task` keys are missing or empty in `task_definition`), the code searches for `""` across all harnesses and produces a generic error message. A more explicit guard at the top of the function would provide a clearer, earlier error message for this malformed-input boundary. 💡 Suggestion: Add an early guard:
`if not harness_name and not task_name: raise ValueError(f"Task '{task_query}' has no harness or task name defined in its task definition.")` before the main branching logic.