Skip to content

Commit d896c90

Browse files
Refactor path validation and error handling in discovery functions
- Consolidated path validation logic to prevent traversal attacks. - Improved error handling in subprocess calls for test discovery, preserving command arguments for better context. - Updated Windows-specific wait times for file release during cleanup. - Enhanced readability and maintainability of path resolution functions.
1 parent 207d671 commit d896c90

3 files changed

Lines changed: 47 additions & 56 deletions

File tree

codeflash/code_utils/git_worktree_utils.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@
88
import tempfile
99
import time
1010
from pathlib import Path
11-
from typing import TYPE_CHECKING, Any, Optional
12-
13-
if TYPE_CHECKING:
14-
from collections.abc import Callable
11+
from typing import Any, Callable, Optional
1512

1613
import git
1714

@@ -24,6 +21,8 @@
2421
# Constants for Windows retry logic
2522
MAX_WINDOWS_RETRIES = 3
2623
INITIAL_RETRY_DELAY_SECONDS = 0.5
24+
WINDOWS_FILE_RELEASE_WAIT_LONG = 0.3
25+
WINDOWS_FILE_RELEASE_WAIT_SHORT = 0.1
2726

2827

2928
def create_worktree_snapshot_commit(worktree_dir: Path, commit_message: str) -> None:
@@ -212,14 +211,18 @@ def _manual_cleanup_worktree_directory(worktree_dir: Path) -> None:
212211

213212
# Brief wait on Windows to allow file handles to be released
214213
if is_windows:
215-
wait_time = 0.3 if attempt_num < max_retries else 0.1
214+
wait_time = (
215+
WINDOWS_FILE_RELEASE_WAIT_LONG if attempt_num < max_retries else WINDOWS_FILE_RELEASE_WAIT_SHORT
216+
)
216217
time.sleep(wait_time)
217218

218219
# Check if removal was successful
219220
if not worktree_dir.exists():
220221
return
221222

222-
except Exception: # noqa: S110
223+
except (OSError, PermissionError):
224+
# Silently ignore permission errors during cleanup attempts
225+
# These are expected on Windows when files are locked
223226
pass
224227

225228

@@ -251,9 +254,11 @@ def _validate_worktree_path_safety(worktree_dir: Path) -> bool:
251254
return worktree_dir_resolved != worktree_dirs_resolved
252255

253256

254-
def _create_windows_rmtree_error_handler() -> Callable[
255-
[Callable[[str], None], str, tuple[type[BaseException], BaseException, Any]], None
256-
]:
257+
# Type alias for shutil.rmtree error handler signature
258+
ErrorHandler = Callable[[Callable[[str], None], str, tuple[type[BaseException], BaseException, Any]], None]
259+
260+
261+
def _create_windows_rmtree_error_handler() -> ErrorHandler:
257262
"""Create an error handler for shutil.rmtree that handles Windows-specific issues.
258263
259264
This handler attempts to remove read-only attributes when encountering permission errors.

codeflash/discovery/discover_unit_tests.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -599,26 +599,25 @@ def discover_tests_pytest(
599599
# Prevent console window spawning on Windows which can cause hangs in LSP
600600
run_kwargs["creationflags"] = subprocess.CREATE_NO_WINDOW
601601

602+
cmd_list = [SAFE_SYS_EXECUTABLE, discovery_script, str(project_root), str(tests_root), str(tmp_pickle_path)]
602603
with custom_addopts():
603604
try:
604-
result = subprocess.run(
605-
[SAFE_SYS_EXECUTABLE, discovery_script, str(project_root), str(tests_root), str(tmp_pickle_path)],
606-
check=False,
607-
**run_kwargs,
608-
)
605+
result = subprocess.run(cmd_list, check=False, **run_kwargs)
609606
except subprocess.TimeoutExpired:
610607
logger.error(
611608
f"Test discovery subprocess timed out after {run_kwargs.get('timeout', 600)} seconds. "
612609
f"Command: {discovery_script}"
613610
)
614-
result = subprocess.CompletedProcess(args=[], returncode=-1, stdout="", stderr="Timeout")
611+
# Preserve command args for better error context
612+
result = subprocess.CompletedProcess(args=cmd_list, returncode=-1, stdout="", stderr="Timeout")
615613
except (OSError, subprocess.SubprocessError, ValueError) as e:
616614
logger.error(
617615
f"Test discovery subprocess failed with error: {e}. "
618616
f"Command: {discovery_script}, "
619617
f"Project root: {project_root}, Tests root: {tests_root}"
620618
)
621-
result = subprocess.CompletedProcess(args=[], returncode=-1, stdout="", stderr=str(e))
619+
# Preserve command args for better error context
620+
result = subprocess.CompletedProcess(args=cmd_list, returncode=-1, stdout="", stderr=str(e))
622621

623622
try:
624623
# Check if pickle file exists before trying to read it

codeflash/discovery/functions_to_optimize.py

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -679,22 +679,17 @@ def _validate_path_no_traversal(path: Path | str) -> bool:
679679
True if path is safe (no traversal components), False otherwise
680680
681681
"""
682-
path_str = str(path)
683-
# Check for path traversal attempts
684-
# Check for absolute paths that might escape (additional safety check)
685-
# Note: We allow absolute paths as they're needed for worktree paths
686-
return ".." not in path_str
687-
688-
def _resolve_path(path: Path | str) -> Path:
689-
# Use strict=False so we don't fail on paths that don't exist yet (e.g. worktree paths)
690-
# SECURITY: Validate path before resolution to prevent traversal attacks
691-
if not _validate_path_no_traversal(path):
692-
error_msg = f"Path contains traversal components: {path}"
693-
raise ValueError(error_msg)
694-
return Path(path).resolve(strict=False)
682+
try:
683+
path_obj = Path(path)
684+
except (ValueError, OSError):
685+
return False
686+
else:
687+
# Check if any part is exactly '..' (not just containing '..' as a substring)
688+
# This avoids false positives like files named "config..bak"
689+
return ".." not in path_obj.parts
695690

696-
def _resolve_path_consistent(path: Path | str) -> Path:
697-
"""Resolve path consistently: use strict resolution if path exists, otherwise non-strict.
691+
def _resolve_path_safely(path: Path | str) -> Path:
692+
"""Resolve path, preferring strict resolution but falling back to non-strict.
698693
699694
SECURITY: This function validates paths to prevent traversal attacks before resolution.
700695
Paths should come from trusted sources (git operations, file system discovery),
@@ -716,42 +711,34 @@ def _resolve_path_consistent(path: Path | str) -> Path:
716711
raise ValueError(error_msg)
717712

718713
path_obj = Path(path)
719-
if path_obj.exists():
720-
try:
721-
return path_obj.resolve()
722-
except (OSError, RuntimeError):
723-
return _resolve_path(path)
724-
return _resolve_path(path)
714+
try:
715+
# Prefer strict resolution if path exists, otherwise use non-strict
716+
return path_obj.resolve(strict=True) if path_obj.exists() else path_obj.resolve(strict=False)
717+
except (OSError, RuntimeError):
718+
# Fallback to non-strict resolution on errors
719+
return path_obj.resolve(strict=False)
725720

726721
# Resolve all root paths to absolute paths for consistent comparison
727722
# Use consistent resolution: strict for existing paths, non-strict for non-existent
728-
tests_root_resolved = _resolve_path_consistent(tests_root)
729-
module_root_resolved = _resolve_path_consistent(module_root)
723+
tests_root_resolved = _resolve_path_safely(tests_root)
724+
module_root_resolved = _resolve_path_safely(module_root)
730725

731726
# Resolve ignore paths and submodule paths
732-
ignore_paths_resolved = [_resolve_path_consistent(p) for p in ignore_paths]
733-
submodule_paths_resolved = [_resolve_path_consistent(p) for p in submodule_paths]
727+
ignore_paths_resolved = [_resolve_path_safely(p) for p in ignore_paths]
728+
submodule_paths_resolved = [_resolve_path_safely(p) for p in submodule_paths]
734729

735730
# We desperately need Python 3.10+ only support to make this code readable with structural pattern matching
736731
for file_path_path, functions in modified_functions.items():
737732
_functions = functions
738733
# SECURITY: Validate file path before processing to prevent traversal attacks
734+
# Note: Paths come from trusted sources (git operations), but we validate defensively
739735
if not _validate_path_no_traversal(file_path_path):
740-
logger.warning(f"Skipping file with traversal components: {file_path_path}")
741-
continue
742-
# Resolve file path to absolute path
743-
# Convert to Path if it's a string (e.g., from get_functions_within_git_diff)
744-
file_path_obj = Path(file_path_path)
745-
# Use consistent resolution: try strict first (for existing files), fall back to non-strict
746-
if file_path_obj.exists():
747-
try:
748-
file_path_resolved = file_path_obj.resolve()
749-
except (OSError, RuntimeError):
750-
file_path_resolved = _resolve_path(file_path_path)
751-
else:
752-
file_path_resolved = _resolve_path(file_path_path)
753-
754-
file_path = str(file_path_obj)
736+
error_msg = f"Path contains traversal components: {file_path_path}"
737+
logger.error(error_msg)
738+
raise ValueError(error_msg)
739+
# Resolve file path to absolute path using consolidated resolution logic
740+
file_path_resolved = _resolve_path_safely(file_path_path)
741+
file_path = str(file_path_path) # Keep original path string for compatibility
755742

756743
# Check if file is in tests root using resolved paths
757744
try:

0 commit comments

Comments
 (0)