diff --git a/scripts/coding_discovery_tools/ai_tools_discovery.py b/scripts/coding_discovery_tools/ai_tools_discovery.py index 26392484..1371fdb6 100644 --- a/scripts/coding_discovery_tools/ai_tools_discovery.py +++ b/scripts/coding_discovery_tools/ai_tools_discovery.py @@ -76,7 +76,7 @@ CursorSkillsExtractorFactory, ClineSkillsExtractorFactory, ) - from .utils import send_report_to_backend, send_scan_event, send_discovery_metrics, get_user_info, get_all_users_macos, get_all_users_windows, get_all_users_linux, load_pending_reports, save_failed_reports, report_to_sentry, get_claude_subscription_type, get_cursor_subscription_type, in_container, _get_queue_file_path + from .utils import send_report_to_backend, send_scan_event, send_discovery_metrics, get_user_info, get_audit_user, get_all_users_macos, get_all_users_windows, get_all_users_linux, load_pending_reports, save_failed_reports, report_to_sentry, get_claude_subscription_type, get_cursor_subscription_type, in_container, _get_queue_file_path from .linux_extraction_helpers import linux_home_for_user from .logging_helpers import configure_logger, log_rules_details, log_mcp_details, log_settings_details from .settings_transformers import transform_settings_to_backend_format @@ -130,7 +130,7 @@ CursorSkillsExtractorFactory, ClineSkillsExtractorFactory, ) - from scripts.coding_discovery_tools.utils import send_report_to_backend, send_scan_event, send_discovery_metrics, get_user_info, get_all_users_macos, get_all_users_windows, get_all_users_linux, load_pending_reports, save_failed_reports, report_to_sentry, get_claude_subscription_type, get_cursor_subscription_type, in_container, _get_queue_file_path + from scripts.coding_discovery_tools.utils import send_report_to_backend, send_scan_event, send_discovery_metrics, get_user_info, get_audit_user, get_all_users_macos, get_all_users_windows, get_all_users_linux, load_pending_reports, save_failed_reports, report_to_sentry, get_claude_subscription_type, get_cursor_subscription_type, in_container, _get_queue_file_path from scripts.coding_discovery_tools.linux_extraction_helpers import linux_home_for_user from scripts.coding_discovery_tools.logging_helpers import configure_logger, log_rules_details, log_mcp_details, log_settings_details from scripts.coding_discovery_tools.settings_transformers import transform_settings_to_backend_format @@ -141,6 +141,7 @@ logger = logging.getLogger(__name__) payload_logger = logging.getLogger(__name__ + ".payload") +detail_logger = logging.getLogger(__name__ + ".detail") configure_logger() @@ -834,7 +835,7 @@ def _merge_mcp_configs_into_projects( "rules": [] } new_count += 1 - logger.info(f" Added new project from MCP config: {project_path} ({num_servers} MCP servers)") + detail_logger.info(f" Added new project from MCP config: {project_path} ({num_servers} MCP servers)") if mcp_projects: logger.info(f" MCP config merge complete: {len(mcp_projects)} projects processed ({merged_count} merged, {new_count} new), {total_mcp_servers} total MCP servers") @@ -883,7 +884,7 @@ def _merge_claude_mcp_configs_into_projects( if additional_mcp_data and "additionalMcpData" not in projects_dict[project_path]: projects_dict[project_path]["additionalMcpData"] = additional_mcp_data merged_count += 1 - logger.info(f" Merged Claude MCP config into existing project: {project_path} ({num_servers} MCP servers)") + detail_logger.info(f" Merged Claude MCP config into existing project: {project_path} ({num_servers} MCP servers)") # Ensure rules field exists if "rules" not in projects_dict[project_path]: projects_dict[project_path]["rules"] = [] @@ -898,7 +899,7 @@ def _merge_claude_mcp_configs_into_projects( new_project["additionalMcpData"] = additional_mcp_data projects_dict[project_path] = new_project new_count += 1 - logger.info(f" Added new project from Claude MCP config: {project_path} ({num_servers} MCP servers)") + detail_logger.info(f" Added new project from Claude MCP config: {project_path} ({num_servers} MCP servers)") if mcp_projects: logger.info(f" Claude MCP config merge complete: {len(mcp_projects)} projects processed ({merged_count} merged, {new_count} new), {total_mcp_servers} total MCP servers") @@ -939,7 +940,7 @@ def _merge_skills_into_projects( projects_dict[project_path]["skills"] = [] projects_dict[project_path]["skills"].extend(skills) merged_count += 1 - logger.info(f" Merged skills into project: {project_path} ({num_skills} skills)") + detail_logger.info(f" Merged skills into project: {project_path} ({num_skills} skills)") else: # Create new project entry with skills array projects_dict[project_path] = { @@ -949,7 +950,7 @@ def _merge_skills_into_projects( "mcpServers": [] } new_count += 1 - logger.info(f" Added new project from skills: {project_path} ({num_skills} skills)") + detail_logger.info(f" Added new project from skills: {project_path} ({num_skills} skills)") if skills_projects: logger.info(f" Skills merge complete: {len(skills_projects)} projects processed ({merged_count} merged, {new_count} new), {total_skills} total skills") @@ -986,7 +987,7 @@ def _merge_rules_into_projects( projects_dict[project_path]["rules"] = [] projects_dict[project_path]["rules"].extend(rules) merged_count += 1 - logger.info(f" Merged rules into existing project: {project_path} ({num_rules} rules)") + detail_logger.info(f" Merged rules into existing project: {project_path} ({num_rules} rules)") else: # Create new project entry with rules projects_dict[project_path] = { @@ -996,7 +997,7 @@ def _merge_rules_into_projects( "mcpServers": [] } new_count += 1 - logger.info(f" Added new project from rules: {project_path} ({num_rules} rules)") + detail_logger.info(f" Added new project from rules: {project_path} ({num_rules} rules)") if rules_projects: logger.info(f" Rules merge complete: {len(rules_projects)} projects processed ({merged_count} merged, {new_count} new), {total_rules} total rules") @@ -1175,12 +1176,12 @@ def _process_claude_code_tool(self, tool: Dict) -> Dict[str, Dict]: if self._claude_settings_extractor: try: settings = self._claude_settings_extractor.extract_settings() - logger.info(f" Settings extraction returned: {settings is not None}, count: {len(settings) if settings else 0}") + detail_logger.info(f" Settings extraction returned: {settings is not None}, count: {len(settings) if settings else 0}") if settings: num_settings = len(settings) logger.info(f" ✓ Found {num_settings} settings file(s)") tool["_settings"] = settings - logger.info(f" ✓ Stored _settings in tool dict (keys: {list(tool.keys())})") + detail_logger.info(f" ✓ Stored _settings in tool dict (keys: {list(tool.keys())})") log_settings_details(settings, tool_name) else: logger.warning(" ⚠ No settings found - extract_settings() returned None or empty list") @@ -2091,8 +2092,8 @@ def process_single_tool(self, tool: Dict) -> Dict: if "_config_path" in tool: tool_dict["_config_path"] = tool["_config_path"] - logger.info(f" Checking for settings in tool dict for {tool_name}...") - logger.info(f" Tool dict keys: {list(tool.keys())}") + detail_logger.info(f" Checking for settings in tool dict for {tool_name}...") + detail_logger.info(f" Tool dict keys: {list(tool.keys())}") if "_settings" in tool: try: @@ -2110,13 +2111,13 @@ def process_single_tool(self, tool: Dict) -> Dict: permissions = None else: settings_list = tool["_settings"] - logger.info(f" ✓ Found _settings in tool dict, count: {len(settings_list) if settings_list else 0}") + detail_logger.info(f" ✓ Found _settings in tool dict, count: {len(settings_list) if settings_list else 0}") permissions = transform_settings_to_backend_format(settings_list) if permissions: tool_dict["permissions"] = permissions logger.info(f" ✓ Added permissions to {tool_name} report") - logger.info(f" Permissions keys: {list(permissions.keys())}") + detail_logger.info(f" Permissions keys: {list(permissions.keys())}") else: logger.warning(f" ⚠ Permissions transformation returned None for {tool_name}") logger.warning(f" Settings that were passed: {tool['_settings']}") @@ -2199,14 +2200,11 @@ def main(): verbosity.add_argument( '--dump', action='store_true', - help='Also log the full per-tool JSON payload sent to the backend.', - ) - verbosity.add_argument( - '--summary', - action='store_true', - help='Suppress per-tool detail output: Report Summary box, transport ' - 'lines (Sending / ✓ sent), and logging_helpers sub-boxes. Keeps ' - 'headline output, per-tool totals, warnings, and errors.', + help='Show ALL detail logs: per-file rule/MCP/settings boxes, ' + 'per-project merge/add lines, internal diagnostics, the per-tool ' + 'Report Summary box, and transport lines (Sending / ✓ sent). ' + 'Default output is concise (headlines, per-tool totals, warnings, ' + 'errors). For the JSON sent to the backend, use --payload.', ) verbosity.add_argument( '--payload', @@ -2219,12 +2217,17 @@ def main(): if args.payload: logging.getLogger().setLevel(logging.WARNING) payload_logger.setLevel(logging.INFO) - elif args.summary: + elif not args.dump: + # Concise is the DEFAULT; --dump restores full detail. Quiet the + # per-file rule/MCP/settings sub-boxes (logging_helpers)... try: from scripts.coding_discovery_tools import logging_helpers as _lh except ImportError: from . import logging_helpers as _lh logging.getLogger(_lh.__name__).setLevel(logging.WARNING) + # ...plus the per-item merge/add lines and internal dict diagnostics. + # Roll-up totals and headlines stay on `logger`. + detail_logger.setLevel(logging.WARNING) # Hook-triggered invocations pass the api_key via env so it never appears # in argv / /proc//cmdline. CLI --api-key remains supported for MDM # and direct-script usage. @@ -2324,6 +2327,10 @@ def time_step(name: str, phase: str) -> Iterator[None]: # of one idempotent "failed" event. _cleanup_done = [False] _finished = [False] + # The real-human-or-None audit identity, captured once below via + # get_audit_user(). Initialized here so the failure closures can pass it + # safely even if they fire before capture (in which case it is still None). + system_user = None def _mark_run_failed(error_type: str, message: str) -> None: """Best-effort: tell the backend this run failed. Idempotent (a flag race @@ -2344,6 +2351,7 @@ def _mark_run_failed(error_type: str, message: str) -> None: "timestamp": datetime.utcnow().isoformat() + "Z", }, sentry_context=sentry_ctx, + system_user=system_user, ) except Exception: pass @@ -2488,9 +2496,20 @@ def _on_term_signal(signum, _frame) -> None: logger.info(f" - {user}") logger.info("") - # Get system_user once (who is running the script) for audit purposes + # Get the real human running the script (or None) once, for audit + # attribution. This must be a real human OR None — never root / + # _service / SYSTEM / a Windows machine account / "unknown" — so the + # backend never attributes an empty machine to a service identity. The + # data-report path (generate_single_tool_report) falls back to + # home_user when this is None. with time_step("get_system_user", "detect"): - system_user = get_user_info() + system_user = get_audit_user() + if system_user is None: + logger.debug( + "Audit system_user resolved to None (non-human or undetectable " + "context); tool reports fall back to home_user and lifecycle " + "events omit system_user" + ) sentry_ctx["system_user"] = system_user # Send scan in_progress event BEFORE scanning @@ -2498,7 +2517,7 @@ def _on_term_signal(signum, _frame) -> None: with time_step("send_in_progress", "send"): success, _ = send_scan_event( args.domain, args.api_key, device_id, run_id, "in_progress", - args.app_name, sentry_context=sentry_ctx + args.app_name, sentry_context=sentry_ctx, system_user=system_user ) if success: logger.info("✓ Scan in_progress event sent successfully") @@ -2674,7 +2693,7 @@ def _on_term_signal(signum, _frame) -> None: 'rules': num_rules }) - if not args.summary and not args.payload: + if args.dump: logger.info("") logger.info(" ┌─ Report Summary ────────────────────────────────────────────────") logger.info(f" │ User: {user_name}") @@ -2707,7 +2726,7 @@ def _on_term_signal(signum, _frame) -> None: logger.info(" └──────────────────────────────────────────────────────────────────") logger.info("") - if args.dump or args.payload: + if args.payload: payload_logger.info(" Complete JSON payload being sent to backend:") payload_logger.info(" " + "=" * 70) try: @@ -2747,16 +2766,16 @@ def _on_term_signal(signum, _frame) -> None: cached_hash = discovery_cache.get_cached_hash(tool_name, user_name) if local_payload_hash and cached_hash == local_payload_hash: - if not args.summary and not args.payload: + if args.dump: logger.info(f" · {tool_name} unchanged for user {user_name} (hash match), skipping upload") else: - if not args.summary and not args.payload: + if args.dump: logger.info(f" Sending {tool_name} report for user {user_name} to backend...") with time_step("send_report_per_tool_user", "send"): success, retryable = send_report_to_backend(args.domain, args.api_key, single_tool_report, args.app_name, sentry_context=sentry_ctx) if success: - if not args.summary and not args.payload: + if args.dump: logger.info(f" ✓ {tool_name} report for user {user_name} sent successfully") if local_payload_hash: discovery_cache.update_tool(tool_name, user_name, local_payload_hash) @@ -2765,7 +2784,7 @@ def _on_term_signal(signum, _frame) -> None: if retryable: failed_reports.append(single_tool_report) - if not args.summary and not args.payload: + if args.dump: logger.info("") except PermissionError as e: @@ -2783,7 +2802,7 @@ def _on_term_signal(signum, _frame) -> None: success, _ = send_scan_event( args.domain, args.api_key, device_id, run_id, "failed", args.app_name, home_user=user_name, scan_error=scan_error, - sentry_context=sentry_ctx + sentry_context=sentry_ctx, system_user=system_user ) if not success: logger.warning("✗ Failed to send scan failed event") @@ -2860,7 +2879,7 @@ def _on_term_signal(signum, _frame) -> None: logger.info("Sending scan completed event...") success, _ = send_scan_event( args.domain, args.api_key, device_id, run_id, "completed", - args.app_name, sentry_context=sentry_ctx + args.app_name, sentry_context=sentry_ctx, system_user=system_user ) if success: logger.info("✓ Scan completed event sent successfully") diff --git a/scripts/coding_discovery_tools/coding_tool_factory.py b/scripts/coding_discovery_tools/coding_tool_factory.py index 36358da2..e9842252 100644 --- a/scripts/coding_discovery_tools/coding_tool_factory.py +++ b/scripts/coding_discovery_tools/coding_tool_factory.py @@ -736,7 +736,7 @@ def create_all_tool_detectors(os_name: Optional[str] = None) -> list: ToolDetectorFactory.create_roo_detector(os_name), ] - # Add Claude Cowork detector for macOS and Windows + # Add Claude Cowork detector for macOS, Windows, and Linux claude_cowork_detector = ToolDetectorFactory.create_claude_cowork_detector(os_name) if claude_cowork_detector is not None: detectors.append(claude_cowork_detector) diff --git a/scripts/coding_discovery_tools/linux/claude_cowork/claude_cowork.py b/scripts/coding_discovery_tools/linux/claude_cowork/claude_cowork.py index 8f06e9d4..034b901a 100644 --- a/scripts/coding_discovery_tools/linux/claude_cowork/claude_cowork.py +++ b/scripts/coding_discovery_tools/linux/claude_cowork/claude_cowork.py @@ -3,16 +3,20 @@ Cowork is the agentic feature of the Claude Desktop app. We treat it as a distinct tool from Claude Code (which is the CLI). A device is considered -to have Cowork installed if the on-disk session tree exists at: +to have Cowork installed if BOTH: - ~/.config/Claude/local-agent-mode-sessions/ + - A Claude Desktop installation is discoverable on disk, AND + - The on-disk session tree exists at + ~/.config/Claude/local-agent-mode-sessions/ (Claude Desktop is an Electron app and follows the XDG convention of storing its data under ~/.config/Claude/ on Linux.) -If only the app is present (Cowork never enabled / never used), there is -nothing on disk to report so we return None. When running as root we check -every user's home (and /root) so MDM-style deployments are covered. +If only the app is present (Cowork never enabled / never used) there is +nothing on disk to report, and if only the sessions tree is present (app +uninstalled, config residue left behind) it is not a real install — in both +cases we return None. When running as root we check every user's home (and +/root) so MDM-style deployments are covered. """ import logging @@ -52,17 +56,34 @@ def detect(self) -> Optional[Dict]: for user_home in get_linux_user_homes(): sessions_dir = _sessions_dir_for_user(user_home) try: - if sessions_dir.exists() and sessions_dir.is_dir(): - app_install = self._find_install_dir() - install_path = str(app_install) if app_install else str(sessions_dir) - return { - "name": self.tool_name, - "version": self.get_version(), - "install_path": install_path, - } + if not (sessions_dir.exists() and sessions_dir.is_dir()): + continue except OSError as e: logger.debug(f"Error checking Cowork sessions dir {sessions_dir}: {e}") continue + + # Require the Claude Desktop install to be present. The per-user + # ``~/.config/Claude`` tree (which holds the sessions dir) survives + # an uninstall (anthropics/claude-code#25013), so reporting on the + # sessions dir alone produced false positives. Gate on a real + # install dir. + app_install = self._find_install_dir() + if app_install is None: + logger.debug( + "Cowork sessions present under %s but no Claude Desktop " + "install found; treating as residue (not installed).", + user_home, + ) + continue + + return { + "name": self.tool_name, + "version": self.get_version(), + # Report the sessions dir (consistent with the macOS detector and + # the central ``_detect_claude_cowork`` path). ``app_install`` is + # the gate, not the reported path. + "install_path": str(sessions_dir), + } return None def get_version(self) -> Optional[str]: @@ -74,7 +95,9 @@ def get_version(self) -> Optional[str]: """ return None - def _find_install_dir(self) -> Optional[Path]: + def _find_install_dir(self, user_home: Optional[Path] = None) -> Optional[Path]: + # ``user_home`` is accepted for a uniform call signature with the central + # path; Linux install dirs are machine-global so it is unused here. for candidate in _candidate_install_dirs(): try: if candidate.exists() and candidate.is_dir(): diff --git a/scripts/coding_discovery_tools/linux/junie/junie.py b/scripts/coding_discovery_tools/linux/junie/junie.py index fbeca2b2..155fc68f 100644 --- a/scripts/coding_discovery_tools/linux/junie/junie.py +++ b/scripts/coding_discovery_tools/linux/junie/junie.py @@ -9,6 +9,8 @@ from ...coding_tool_base import BaseToolDetector from ...linux_extraction_helpers import get_linux_user_homes +from ...user_tool_detector import find_junie_binary_for_user +from ..jetbrains.jetbrains import LinuxJetBrainsDetector logger = logging.getLogger(__name__) @@ -48,22 +50,75 @@ def get_version(self) -> Optional[str]: return None def _detect_junie_for_user(self, user_home: Path) -> Optional[Dict]: - """Detect Junie installation for a specific user.""" - junie_dir = user_home / self.JUNIE_DIR_NAME + """Detect Junie installation for a specific user. - if not junie_dir.exists() or not junie_dir.is_dir(): + Gates on a real install signal — the Junie CLI binary OR the Junie + plugin in a JetBrains IDE — not on the ``~/.junie`` directory, which is + user-authored guidelines residue that survives uninstall. ``~/.junie`` + is still used as the version source. + """ + junie_bin = find_junie_binary_for_user(user_home) + install_path: Optional[str] = junie_bin + + if not install_path: + install_path = self._has_junie_jetbrains_plugin(user_home) + + if not install_path: return None - logger.debug(f"Found Junie directory at: {junie_dir}") + logger.debug(f"Detected Junie install signal at: {install_path}") - version = self._get_version_from_config(junie_dir) + version = self._get_version_from_config(user_home / self.JUNIE_DIR_NAME) return { "name": self.tool_name, "version": version or "Unknown", - "install_path": str(junie_dir) + "install_path": install_path } + def _has_junie_jetbrains_plugin(self, user_home: Path) -> Optional[str]: + """Return an install_path if the Junie plugin is present in a JetBrains + IDE belonging to ``user_home``, else None. + + Scoping matters: ``LinuxJetBrainsDetector.detect()`` ignores ``user_home`` + and iterates every user home internally, so calling it would attribute + another user's Junie plugin to whichever user is currently being scanned + (cross-user false positive). Instead we drive the detector's per-user + config-dir scan for ``user_home`` only, then enrich with plugins. The + JetBrains detector itself is never modified — only its existing per-user + methods are reused read-only. + """ + try: + jetbrains_detector = LinuxJetBrainsDetector() + scoped_ides = jetbrains_detector._scan_jetbrains_config_dir(user_home) + except (PermissionError, OSError) as e: + logger.debug(f"JetBrains scan for Junie failed under {user_home}: {e}") + return None + + for ide in scoped_ides: + config_path = ide.get("config_path") + if not self._path_under_user_home(config_path, user_home): + continue + try: + plugins = jetbrains_detector._get_plugins(config_path) + except (PermissionError, OSError) as e: + logger.debug(f"Plugin scan for Junie failed under {config_path}: {e}") + continue + for plugin_name in plugins: + if "junie" in plugin_name.lower(): + return config_path + return None + + @staticmethod + def _path_under_user_home(config_path: Optional[str], user_home: Path) -> bool: + """True if ``config_path`` is inside ``user_home`` (strict scoping guard).""" + if not config_path: + return False + try: + return Path(config_path).resolve().is_relative_to(user_home.resolve()) + except (OSError, ValueError): + return False + def _get_version_from_config(self, junie_dir: Path) -> Optional[str]: """Try to extract Junie version from configuration files.""" config_files = [ @@ -76,7 +131,7 @@ def _get_version_from_config(self, junie_dir: Path) -> Optional[str]: if config_file.exists(): with open(config_file, 'r', encoding='utf-8') as f: data = json.load(f) - if 'version' in data: + if isinstance(data, dict) and isinstance(data.get('version'), str): return data['version'] except (json.JSONDecodeError, OSError, PermissionError) as e: logger.debug(f"Could not read config file {config_file}: {e}") diff --git a/scripts/coding_discovery_tools/macos/junie/junie.py b/scripts/coding_discovery_tools/macos/junie/junie.py index 34b1df71..d697b373 100644 --- a/scripts/coding_discovery_tools/macos/junie/junie.py +++ b/scripts/coding_discovery_tools/macos/junie/junie.py @@ -7,7 +7,9 @@ from typing import Optional, Dict, List from ...coding_tool_base import BaseToolDetector +from ...macos.jetbrains.jetbrains import MacOSJetBrainsDetector from ...macos_extraction_helpers import is_running_as_root +from ...user_tool_detector import find_junie_binary_for_user logger = logging.getLogger(__name__) @@ -56,22 +58,74 @@ def get_version(self) -> Optional[str]: def _detect_junie_for_user(self, user_home: Path) -> Optional[Dict]: """ Detect Junie installation for a specific user. + + Gates on a real install signal — the Junie CLI binary OR the Junie + plugin in a JetBrains IDE — not on the ``~/.junie`` directory, which is + user-authored guidelines residue that survives uninstall. ``~/.junie`` + is still used as the version source. """ - junie_dir = user_home / self.JUNIE_DIR_NAME + junie_bin = find_junie_binary_for_user(user_home) + install_path: Optional[str] = junie_bin + + if not install_path: + install_path = self._has_junie_jetbrains_plugin(user_home) - if not junie_dir.exists() or not junie_dir.is_dir(): + if not install_path: return None - logger.debug(f"Found Junie directory at: {junie_dir}") + logger.debug(f"Detected Junie install signal at: {install_path}") - version = self._get_version_from_config(junie_dir) + version = self._get_version_from_config(user_home / self.JUNIE_DIR_NAME) return { "name": self.tool_name, "version": version or "Unknown", - "install_path": str(junie_dir) + "install_path": install_path } + def _has_junie_jetbrains_plugin(self, user_home: Path) -> Optional[str]: + """Return an install_path if the Junie plugin is present in a JetBrains + IDE belonging to ``user_home``, else None. + + Scoping matters: ``MacOSJetBrainsDetector.detect()`` ignores ``user_home`` + and, under root, scans every user under ``/Users``. Calling it here would + attribute another user's Junie plugin to whichever user is currently + being scanned (cross-user false positive). Instead we drive the + detector's per-user config-dir scan for ``user_home`` only, then enrich + with plugins. The JetBrains detector itself is never modified — only its + existing per-user methods are reused read-only. + """ + try: + jetbrains_detector = MacOSJetBrainsDetector() + scoped_ides = jetbrains_detector._scan_jetbrains_config_dir(user_home) + except (PermissionError, OSError) as e: + logger.debug(f"JetBrains scan for Junie failed under {user_home}: {e}") + return None + + for ide in scoped_ides: + config_path = ide.get("config_path") + if not self._path_under_user_home(config_path, user_home): + continue + try: + plugins = jetbrains_detector._get_plugins(config_path) + except (PermissionError, OSError) as e: + logger.debug(f"Plugin scan for Junie failed under {config_path}: {e}") + continue + for plugin_name in plugins: + if "junie" in plugin_name.lower(): + return config_path + return None + + @staticmethod + def _path_under_user_home(config_path: Optional[str], user_home: Path) -> bool: + """True if ``config_path`` is inside ``user_home`` (strict scoping guard).""" + if not config_path: + return False + try: + return Path(config_path).resolve().is_relative_to(user_home.resolve()) + except (OSError, ValueError): + return False + def _get_version_from_config(self, junie_dir: Path) -> Optional[str]: """ Try to extract Junie version from configuration files. @@ -87,7 +141,7 @@ def _get_version_from_config(self, junie_dir: Path) -> Optional[str]: import json with open(config_file, 'r', encoding='utf-8') as f: data = json.load(f) - if 'version' in data: + if isinstance(data, dict) and isinstance(data.get('version'), str): return data['version'] except (json.JSONDecodeError, OSError, PermissionError) as e: logger.debug(f"Could not read config file {config_file}: {e}") diff --git a/scripts/coding_discovery_tools/mcp_extraction_helpers.py b/scripts/coding_discovery_tools/mcp_extraction_helpers.py index 8327eb8a..43657025 100644 --- a/scripts/coding_discovery_tools/mcp_extraction_helpers.py +++ b/scripts/coding_discovery_tools/mcp_extraction_helpers.py @@ -6,6 +6,7 @@ """ import datetime +import hashlib import json import logging import os @@ -375,6 +376,102 @@ def _check_expired_token(cfg: Dict[str, Any], now_ms: int) -> Optional[Dict[str, } +_MCP_REMOTE_HEADER_RE = re.compile(r"^([A-Za-z0-9_-]+):\s*(.*)$") + + +def _mcp_remote_server_url_hash(cfg: Dict[str, Any]) -> Optional[str]: + """If cfg is a stdio config that launches `mcp-remote`, return the hash + mcp-remote uses to key that server's cached credentials. Else None. + + Mirrors mcp-remote's getServerUrlHash: md5 over the server URL, then the + `--resource` value (if given), then a compact JSON dump of `--header` pairs + sorted by key (if any), joined with '|'. Header values are hashed verbatim + because mcp-remote expands `${ENV}` only after computing the hash. + """ + if cfg.get("url"): + return None + command = cfg.get("command") + args = cfg.get("args") + if not command or not isinstance(args, list): + return None + str_args = [a for a in args if isinstance(a, str)] + launches_mcp_remote = ( + os.path.basename(str(command)).split("@", 1)[0] == "mcp-remote" + or any(a == "mcp-remote" or a.startswith("mcp-remote@") for a in str_args) + ) + if not launches_mcp_remote: + return None + + server_url = next((a for a in str_args if a.startswith(("http://", "https://"))), None) + if not server_url: + return None + + headers: Dict[str, str] = {} + resource = "" + i = 0 + while i < len(str_args) - 1: + if str_args[i] == "--header": + match = _MCP_REMOTE_HEADER_RE.match(str_args[i + 1]) + if match: + headers[match.group(1)] = match.group(2) + i += 2 + continue + if str_args[i] == "--resource": + resource = str_args[i + 1] + i += 2 + continue + i += 1 + + parts = [server_url] + if resource: + parts.append(resource) + if headers: + ordered = {k: headers[k] for k in sorted(headers)} + parts.append(json.dumps(ordered, separators=(",", ":"), ensure_ascii=False)) + return hashlib.md5("|".join(parts).encode("utf-8")).hexdigest() + + +def _mcp_remote_has_cached_token(server_url_hash: str) -> bool: + """True if mcp-remote already has a cached token for this server hash under + any installed version dir. Honors mcp-remote's $MCP_REMOTE_CONFIG_DIR.""" + base = os.environ.get("MCP_REMOTE_CONFIG_DIR") or str(Path.home() / ".mcp-auth") + try: + return any( + p.is_file() and p.stat().st_size > 0 + for p in Path(base).glob(f"mcp-remote-*/{server_url_hash}_tokens.json") + ) + except OSError: + return False + + +def _mcp_remote_unauthed_result(cfg: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Return an auth_required scan result if this config launches `mcp-remote` + for a server it has no cached token for. Else None. + + mcp-remote opens an OAuth browser tab whenever it's spawned without a usable + token. A discovery scan must never do that, so when no token is cached we skip + spawning entirely and report auth_required — the browser flow can only occur + if the process runs, and here it never does. When a token IS cached, we fall + through and scan normally (mcp-remote uses the token non-interactively).""" + server_url_hash = _mcp_remote_server_url_hash(cfg) + if server_url_hash is None or _mcp_remote_has_cached_token(server_url_hash): + return None + + scanned_at = ( + datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat() + ) + return { + "scanned_at": scanned_at, + "tools": None, + "tool_count": None, + "server_info": None, + "error": { + "code": "auth_required", + "details": {"reason": "mcp_remote_no_cached_token"}, + }, + } + + def _scan_servers_in_mapping( mcp_servers: Dict[str, Any], ) -> Dict[str, Dict[str, Any]]: @@ -399,6 +496,13 @@ def _scan_servers_in_mapping( _SCAN_CACHE[key] = expired results[name] = expired continue + # Skip spawning mcp-remote when it has no cached token: spawning it + # unauthenticated opens an OAuth browser tab. Report auth_required instead. + unauthed = _mcp_remote_unauthed_result(cfg) + if unauthed is not None: + _SCAN_CACHE[key] = unauthed + results[name] = unauthed + continue pending.append((name, _maybe_inject_bearer(cfg, now_ms), key)) if not pending: @@ -955,6 +1059,31 @@ def _iter_admin_user_homes(is_admin: bool, users_dir: Optional[Path]) -> List[Pa return [] +def _own_home_already_scanned(admin_homes: List[Path]) -> bool: + """True when the current process's home is already among ``admin_homes``. + + The admin/root branches scan every dir under the users root and then + separately re-process ``Path.home()`` to cover root's own home. That extra + step is correct on macOS/Linux, where the privileged account's home + (``/var/root`` / ``/root``) lives *outside* the users root. On Windows the + admin is a normal ``C:\\Users\\`` profile already covered by the + ``C:\\Users`` scan, so re-processing ``Path.home()`` double-counts its + configs (WEB-4673). This guard lets callers skip the re-add only in that + case (case-insensitive path match, since Windows paths are not case-sensitive). + """ + def _norm(p: Path) -> str: + try: + return os.path.normcase(os.path.normpath(str(p.resolve()))) + except OSError: + return os.path.normcase(os.path.normpath(str(p))) + + try: + home_n = _norm(Path.home()) + except Exception: + return False + return any(_norm(d) == home_n for d in admin_homes) + + def read_global_mcp_config( config_path: Path, tool_name: str = "MCP", @@ -1224,9 +1353,11 @@ def extract_ide_global_configs_with_root_support( logger.debug(f"Skipping user directory {user_dir} for {tool_name}: {e}") continue - # On Darwin/Windows also check root/admin's own home (not included in users_dir) + # On Darwin also check root's own home (/var/root, not under /Users). + # On Windows the admin is a normal C:\Users\ profile already in + # admin_homes, so re-adding it would double-count (WEB-4673) — skip then. import platform as _platform - if _platform.system() != "Linux": + if _platform.system() != "Linux" and not _own_home_already_scanned(admin_homes): try: root_configs = extract_configs_for_user_func(Path.home()) all_configs.extend(root_configs) @@ -1536,9 +1667,11 @@ def extract_dual_path_configs_with_root_support( except (ValueError, OSError): pass - # On Darwin/Windows also check root/admin's own home (not included in users_dir) + # On Darwin also check root's own home (/var/root, not under /Users). + # On Windows the admin is a normal C:\Users\ profile already in + # admin_homes, so re-adding it would double-count (WEB-4673) — skip then. import platform as _platform - if _platform.system() != "Linux": + if _platform.system() != "Linux" and not _own_home_already_scanned(admin_homes): if preferred_path.exists(): root_projects = extract_from_file_func(preferred_path) if root_projects: @@ -1700,9 +1833,11 @@ def extract_claudeai_mcp_servers_with_root_support(projects: List[Dict]) -> None except (PermissionError, OSError) as e: logger.debug(f"Error scanning claude.ai servers for user {user_dir.name}: {e}") - # On Darwin/Windows also scan the admin's own home (not in users_dir) + # On Darwin also scan the admin's own home (/var/root, not under /Users). + # On Windows the admin's home is already in admin_homes — skip to avoid + # double-counting its claude.ai MCP servers (WEB-4673). import platform as _platform - if _platform.system() != "Linux": + if _platform.system() != "Linux" and not _own_home_already_scanned(admin_homes): extract_claudeai_mcp_servers(Path.home() / ".claude", projects) else: extract_claudeai_mcp_servers(Path.home() / ".claude", projects) @@ -1954,9 +2089,11 @@ def extract_claude_plugin_mcp_configs_with_root_support( _scan_plugin_cache_dir(plugins_dir / "cache", projects, plugin_lookup=plugin_lookup) - # On Darwin/Windows also scan admin's own home plugins (not in users_dir) + # On Darwin also scan the admin's own home plugins (not under /Users). + # On Windows the admin's home is already in admin_homes — skip to avoid + # double-counting its plugin MCP configs (WEB-4673). import platform as _platform - if _platform.system() != "Linux": + if _platform.system() != "Linux" and not _own_home_already_scanned(admin_homes): extract_claude_plugin_mcp_configs(projects, plugin_lookup=plugin_lookup) else: extract_claude_plugin_mcp_configs(projects, plugin_lookup=plugin_lookup) diff --git a/scripts/coding_discovery_tools/scan_single_mcp_server.py b/scripts/coding_discovery_tools/scan_single_mcp_server.py index 249087b4..6e8629a8 100644 --- a/scripts/coding_discovery_tools/scan_single_mcp_server.py +++ b/scripts/coding_discovery_tools/scan_single_mcp_server.py @@ -28,7 +28,15 @@ def _normalize_url(url): def scan_one(server_name, server_config): """Scan a single server; returns the per-server object {name, command, url, args, scan}.""" servers = transform_mcp_servers_to_array({server_name: server_config}) - return servers[0] if servers else None + obj = servers[0] if servers else None + # Forward the base64 script body the hook attached for local-script servers + # (it resolved the path with cwd; we run detached without it). The backend + # recomputes sha256 -> `script:` fingerprint and stores the body. + if obj is not None and isinstance(server_config, dict) and server_config.get('script_content'): + obj['script_content'] = server_config['script_content'] + print(f"info: forwarding script_content ({len(server_config['script_content'])} b64 chars) for {server_name}", + file=sys.stderr) + return obj def _curl_config_quote(value): @@ -103,12 +111,6 @@ def main(): print(f"error: scan produced no result [{ctx}]", file=sys.stderr) return 1 - scan = server_obj.get("scan") or {} - if not (scan.get("tools") or []): - reason = (scan.get("error") or {}).get("code") or "no tools" - print(f"skip: not reporting ({reason}, tools=0) [{ctx}]", file=sys.stderr) - return 0 - try: result = report(args.domain, args.api_key, server_obj) except Exception as e: diff --git a/scripts/coding_discovery_tools/user_tool_detector.py b/scripts/coding_discovery_tools/user_tool_detector.py index f5107a6f..0da27024 100644 --- a/scripts/coding_discovery_tools/user_tool_detector.py +++ b/scripts/coding_discovery_tools/user_tool_detector.py @@ -5,6 +5,7 @@ user-specific paths like ~/.nvm, ~/.bun, and user configuration directories. """ +import json import logging import os import platform @@ -81,6 +82,10 @@ def detect_tool_for_user(detector: BaseToolDetector, user_home: Path) -> Optiona elif tool_name == "claude cowork": return _detect_claude_cowork(detector, user_home) + # Junie detection + elif tool_name == "junie": + return _detect_junie(detector, user_home) + # Default: Use detector's standard detection return detector.detect() @@ -336,7 +341,16 @@ def _detect_cursor_cli(detector: BaseToolDetector, user_home: Path) -> Optional[ def _detect_claude_cowork(detector: BaseToolDetector, user_home: Path) -> Optional[Dict]: - """Detect Claude Cowork installation for a user.""" + """Detect Claude Cowork installation for a user. + + Requires BOTH the on-disk Cowork sessions tree AND a present Claude Desktop + install. The per-user Claude config tree (which holds the sessions dir) + survives uninstall (anthropics/claude-code#25013), so on Linux/Windows + gating on the sessions dir alone produced false positives. macOS already + AND-requires ``/Applications/Claude.app``; Linux/Windows now AND-require an + install dir resolved by the OS detector's ``_find_install_dir`` (keeping the + install-dir candidate lists in the OS modules — one source of truth). + """ system = platform.system() if system == "Darwin": app_path = Path("/Applications/Claude.app") @@ -346,20 +360,219 @@ def _detect_claude_cowork(detector: BaseToolDetector, user_home: Path) -> Option except OSError: return None sessions_dir = user_home / "Library" / "Application Support" / "Claude" / COWORK_SESSIONS_DIR + require_install_dir = False elif system == "Linux": sessions_dir = user_home / ".config" / "Claude" / COWORK_SESSIONS_DIR + require_install_dir = True else: sessions_dir = user_home / "AppData" / "Roaming" / "Claude" / COWORK_SESSIONS_DIR + require_install_dir = True try: - if sessions_dir.exists() and sessions_dir.is_dir(): + if not (sessions_dir.exists() and sessions_dir.is_dir()): + return None + except (PermissionError, OSError): + return None + + if require_install_dir: + find_install_dir = getattr(detector, "_find_install_dir", None) + if not callable(find_install_dir): + return None + try: + # Pass the scanned user's home so an admin/MDM multi-user scan probes + # THIS user's per-user install dir, not the scanner's (Windows). + if find_install_dir(user_home) is None: + return None + except (PermissionError, OSError): + return None + + return { + "name": detector.tool_name, + "version": detector.get_version(), + "install_path": str(sessions_dir) + } + + +def _junie_version_from_config(user_home: Path) -> Optional[str]: + """Read the Junie version from ``~/.junie/config.json`` (or settings.json). + + ``~/.junie`` is residue and must NOT gate detection, but once Junie is + confirmed via the binary/plugin it is still the authoritative version source. + Best-effort: any read/parse error yields None ("Unknown" downstream). + """ + junie_dir = user_home / ".junie" + for name in ("config.json", "settings.json"): + config_file = junie_dir / name + try: + if not config_file.exists(): + continue + with open(config_file, "r", encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict) and isinstance(data.get("version"), str): + return data["version"] + except (json.JSONDecodeError, OSError, PermissionError) as e: + logger.debug(f"Could not read Junie config file {config_file}: {e}") + continue + return None + + +def _detect_junie(detector: BaseToolDetector, user_home: Path) -> Optional[Dict]: + """Detect Junie installation for a user. + + Gates on a real install signal — the Junie CLI **binary** OR the **Junie + plugin** present in a JetBrains IDE — not on the ``~/.junie`` directory. + ``~/.junie`` is a user-authored guidelines dir (AGENTS.md / config.json): + it survives uninstall AND is created by usage rather than install, so + gating on it produced false positives. ``~/.junie`` remains the version + source and the rules/MCP extraction source. + + The JetBrains plugin check is delegated to the OS detector's + ``_has_junie_jetbrains_plugin`` (keeps the OS-specific JetBrains detector + choice in the OS module, mirroring the ``_find_install_dir`` delegation used + for Claude Cowork). + """ + junie_bin = find_junie_binary_for_user(user_home) + if junie_bin: + return { + "name": detector.tool_name, + "version": _junie_version_from_config(user_home) or "Unknown", + "install_path": junie_bin, + } + + plugin_check = getattr(detector, "_has_junie_jetbrains_plugin", None) + if callable(plugin_check): + try: + plugin_path = plugin_check(user_home) + except (PermissionError, OSError) as e: + logger.debug(f"Junie JetBrains plugin check failed for {user_home}: {e}") + plugin_path = None + if plugin_path: return { "name": detector.tool_name, - "version": detector.get_version(), - "install_path": str(sessions_dir) + "version": _junie_version_from_config(user_home) or "Unknown", + "install_path": plugin_path, } - except (PermissionError, OSError): - pass + + return None + + +def find_junie_binary_for_user(user_home: Path) -> Optional[str]: + """Find the absolute path to the ``junie`` CLI binary for a specific user. + + Mirrors ``find_claude_binary_for_user``. Junie's CLI ships a shim at + ``~/.local/bin/junie`` plus versioned builds under + ``~/.local/share/junie/versions//junie``, and is also installable via + Homebrew, Bun and npm-global. Machine-global candidates (Homebrew, + /usr/local) are owner-attributed under a root/MDM scan so one user's install + is not fanned out to every user (the 93b5fc2 cross-user FP class). Never + raises. + + Args: + user_home: Path to the user's home directory. + + Returns: + Absolute path to the junie binary as a string, or None if not found. + """ + if platform.system() == "Windows": + # Existence-gated, not os.access(X_OK): on Windows X_OK is True for any + # file, so it cannot distinguish a binary. + npm_dir = user_home / "AppData" / "Roaming" / "npm" + candidates = [ + user_home / ".local" / "bin" / "junie.exe", + user_home / ".local" / "bin" / "junie", + npm_dir / "junie.cmd", + npm_dir / "junie.exe", + user_home / ".bun" / "bin" / "junie.exe", + ] + for candidate in candidates: + try: + if candidate.exists(): + return str(candidate) + except (PermissionError, OSError): + continue + + # Native installer keeps the real binary under a versioned subdir; pick + # the newest version so a stale older build isn't reported. + versions_dir = user_home / ".local" / "share" / "junie" / "versions" + try: + if versions_dir.exists(): + version_dirs = sorted( + (d for d in versions_dir.iterdir() if d.is_dir()), + key=_cursor_agent_version_key, + reverse=True, + ) + for version_dir in version_dirs: + versioned = version_dir / "junie.exe" + try: + if versioned.exists(): + return str(versioned) + except (PermissionError, OSError): + continue + except (PermissionError, OSError) as e: + logger.debug(f"Could not enumerate Windows junie versions: {e}") + return None + + # POSIX (macOS / Linux). + user_relative = [ + user_home / ".local" / "bin" / "junie", # Official installer shim + user_home / ".bun" / "bin" / "junie", # Bun global install + user_home / ".npm-global" / "bin" / "junie", # npm global prefix + ] + # Homebrew and /usr/local are MACHINE-GLOBAL — always probed, but under a + # root/MDM multi-user scan each is owner-attributed (the loop below) so one + # user's install isn't fanned out to every user. + machine_global = [ + Path("/opt/homebrew/bin/junie"), # Apple Silicon Homebrew + Path("/usr/local/bin/junie"), # Intel Mac / manual install + ] + + is_root = is_running_as_root() + for candidate in machine_global + user_relative: + try: + if candidate.exists() and os.access(str(candidate), os.X_OK): + if is_root and candidate in machine_global \ + and not machine_global_binary_owned_by_user(candidate, user_home): + continue + return str(candidate) + except (PermissionError, OSError): + continue + + # Versioned install dir: pick the newest version's junie. + versions_dir = user_home / ".local" / "share" / "junie" / "versions" + try: + if versions_dir.exists(): + version_dirs = sorted( + (d for d in versions_dir.iterdir() if d.is_dir()), + key=_cursor_agent_version_key, + reverse=True, + ) + for version_dir in version_dirs: + versioned = version_dir / "junie" + try: + if versioned.exists() and os.access(str(versioned), os.X_OK): + return str(versioned) + except (PermissionError, OSError): + continue + except (PermissionError, OSError) as e: + logger.debug(f"Could not enumerate junie versions: {e}") + + # npm-global prefix backstop. Root-guarded inside the helper: ``npm prefix + # -g`` resolves the scanner's prefix, not the user's. + npm_resolved = resolve_npm_global_tool_bin("junie", user_home, is_root) + if npm_resolved: + return npm_resolved + + # PATH backstop: non-root only — under root ``which`` resolves the scanner's + # PATH, mis-attributing its junie to a user who has only residue. + if not is_root: + which_path = run_command(["which", "junie"], VERSION_TIMEOUT) + if which_path: + try: + resolved = Path(which_path) + if resolved.exists() and os.access(str(resolved), os.X_OK): + return str(resolved) + except (PermissionError, OSError): + pass return None diff --git a/scripts/coding_discovery_tools/utils.py b/scripts/coding_discovery_tools/utils.py index 03d0d6e2..4ae18786 100644 --- a/scripts/coding_discovery_tools/utils.py +++ b/scripts/coding_discovery_tools/utils.py @@ -477,6 +477,126 @@ def get_all_users_linux() -> List[str]: return users +# Identities that are never a real human end-user. We map these to None for the +# audit/payload value so the backend never attributes a machine to a service +# account. Matching is case-insensitive against the whole, trimmed name. +# Trade-off: a rare human whose login literally equals one of these (e.g. an +# admin named "administrator", or a person named "daemon"/"nginx") is also +# rejected. We accept that — for audit attribution a false None is safer than +# mislabelling a machine with a service identity (the FE shows "No AI tools +# detected" rather than a wrong owner). +_NON_HUMAN_USERS: FrozenSet[str] = frozenset( + { + "root", + "system", + "unknown", + # Windows built-in / service identities (may also appear bare, without a + # DOMAIN prefix, depending on how whoami resolves them). + "administrator", + "localsystem", + "local service", + "network service", + # Common Linux service accounts. + "www-data", + "postgres", + "nobody", + "daemon", + "nginx", + "mysql", + } +) + +# Windows domains that only ever own service principals (e.g. +# ``NT AUTHORITY\LOCAL SERVICE``, ``NT SERVICE\MSSQLSERVER``). Any user under +# these is a service account, never a human end-user. +_NON_HUMAN_WINDOWS_DOMAINS: FrozenSet[str] = frozenset({"nt authority", "nt service"}) + + +def _strip_windows_domain(name: str) -> str: + """Return the bare username from a Windows ``DOMAIN\\username`` string. + + ``whoami`` on Windows returns ``DOMAIN\\username`` (or ``MACHINE\\username``); + we only want the trailing username component. Names without a backslash are + returned unchanged. + + Args: + name: Raw whoami output (possibly ``DOMAIN\\username``). + + Returns: + The bare username with any domain prefix stripped. + """ + if name and "\\" in name: + return name.split("\\")[-1] + return name + + +def _real_user_or_none(name: Optional[str]) -> Optional[str]: + """Return the trimmed username if it is a real human, otherwise None. + + Maps junk / service / machine identities to None so scan-lifecycle audit + payloads never attribute a machine to a non-human account. This is the + canonical filter and is self-contained: it strips any Windows ``DOMAIN\\`` + prefix itself, so it is safe regardless of the caller's path. Rejected + (case-insensitive, after trimming + domain-stripping): + - empty / whitespace-only + - anything under the ``NT AUTHORITY`` / ``NT SERVICE`` Windows domains + (e.g. ``NT AUTHORITY\\LOCAL SERVICE``, ``NT SERVICE\\MSSQLSERVER``) + - the literal ``"unknown"`` + - ``"root"``, ``"system"``, and Windows built-ins (administrator, + localsystem, local service, network service) + - anything starting with ``"_"`` (macOS daemon accounts) + - anything ending with ``"$"`` (Windows machine accounts) + - common Linux service accounts (www-data, postgres, nobody, daemon, + nginx, mysql) + + Args: + name: Candidate username (may be None, may be ``DOMAIN\\username``). + + Returns: + The trimmed, domain-stripped username if it is a real human, else None. + """ + if not name: + return None + raw = name.strip() + # Reject service principals by their Windows domain before stripping it. + if "\\" in raw and raw.split("\\")[0].strip().lower() in _NON_HUMAN_WINDOWS_DOMAINS: + return None + stripped = _strip_windows_domain(raw).strip() + if not stripped: + return None + if stripped.startswith("_"): + return None + if stripped.endswith("$"): + return None + if stripped.lower() in _NON_HUMAN_USERS: + return None + return stripped + + +def get_audit_user() -> Optional[str]: + """Return the real human user running the scan, or None. + + This is the value to attach to scan-lifecycle audit payloads: it is the + real human OR None, never a junk/service/machine identity. For + container/daemon/root scans where no human can be resolved, returns None + rather than a synthesized owner. + + Returns: + The real human username, or None when no human user can be resolved. + """ + # On Windows, resolve the RAW, domain-qualified identity (``whoami`` → + # ``DOMAIN\\user``) so _real_user_or_none can apply its NT AUTHORITY / + # NT SERVICE domain rejection. get_user_info() pre-strips the ``DOMAIN\\`` + # prefix (path-building needs the bare name), which would otherwise hide a + # service principal like ``NT SERVICE\\MSSQLSERVER`` behind its bare, + # non-denylisted name. Fall back to get_user_info() if whoami yields nothing. + if platform.system() == "Windows": + raw = run_command(["whoami"], COMMAND_TIMEOUT) + if raw: + return _real_user_or_none(raw) + return _real_user_or_none(get_user_info()) + + def get_user_info() -> str: """ Get current user information (whoami equivalent). @@ -485,23 +605,28 @@ def get_user_info() -> str: On macOS, when running as root, finds the user with the most storage space in /Users directory to get the actual user instead of "root". - - On Windows, when running as administrator, finds the actual logged-in user - by querying explorer.exe process owner, Win32_ComputerSystem, or active console - session instead of returning "Administrator" or "admin". - + + On Windows, returns ``whoami`` with any ``DOMAIN\\`` prefix stripped, falling + back to ``getpass.getuser()``. (It does NOT currently resolve the real + interactive user when running as a service/SYSTEM — get_audit_user() maps + such non-human identities to None.) + + NOTE: This ALWAYS returns a usable, non-None string (falling back to + "unknown"). Callers that build filesystem paths like ``/Users/`` rely + on that guarantee. For an audit/payload value that is the real human OR + None, use ``get_audit_user()`` instead. + Returns: - Current username as string + Current username as string (never None) """ try: username = None - + if platform.system() == "Windows": # Use whoami command on Windows (works reliably) whoami_output = run_command(["whoami"], COMMAND_TIMEOUT) # Extract just the username if whoami returns DOMAIN\username format - if username and "\\" in username: - username = username.split("\\")[-1] + username = _strip_windows_domain(whoami_output) if whoami_output else None else: # On macOS/Linux, check if running as root first current_user = run_command(["whoami"], COMMAND_TIMEOUT) @@ -589,7 +714,8 @@ def send_scan_event( app_name: Optional[str] = None, home_user: Optional[str] = None, scan_error: Optional[Dict] = None, - sentry_context: Optional[Dict] = None + sentry_context: Optional[Dict] = None, + system_user: Optional[str] = None, ) -> Tuple[bool, bool]: """ Send scan lifecycle event to backend (in_progress, completed, failed). @@ -604,6 +730,9 @@ def send_scan_event( home_user: Optional user context (for user-specific failures) scan_error: Optional error data (required when scan_event="failed") sentry_context: Optional context dict forwarded to Sentry on failure + system_user: Optional real human user running the scan (or None). Used by + the backend to attribute empty machines. MUST be a real human or + None (see ``get_audit_user``), never a junk/service identity. Returns: Tuple of (success, retryable): success=True if sent, retryable=True if caller should queue @@ -617,6 +746,9 @@ def send_scan_event( if app_name: payload["app_name"] = app_name + if system_user: + payload["system_user"] = system_user + if home_user: payload["home_user"] = home_user diff --git a/scripts/coding_discovery_tools/windows/claude_cowork/claude_cowork.py b/scripts/coding_discovery_tools/windows/claude_cowork/claude_cowork.py index 4bd61a86..89572760 100644 --- a/scripts/coding_discovery_tools/windows/claude_cowork/claude_cowork.py +++ b/scripts/coding_discovery_tools/windows/claude_cowork/claude_cowork.py @@ -34,9 +34,14 @@ def _get_cowork_sessions_dir() -> Optional[Path]: return Path(appdata) / "Claude" / COWORK_SESSIONS_DIR -def _candidate_install_dirs() -> List[Path]: - """Common locations where Claude Desktop is installed on Windows.""" - user_home = Path.home() +def _candidate_install_dirs(user_home: Path) -> List[Path]: + """Common locations where Claude Desktop is installed on Windows. + + ``user_home`` is the home of the user being scanned (not necessarily the + scanner's own home): under an admin/MDM multi-user scan the per-user + ``AppData\\Local\\Programs\\Claude`` install lives under the scanned user's + profile, so probing ``Path.home()`` would miss it. + """ return [ user_home / "AppData" / "Local" / "Programs" / "Claude", user_home / "AppData" / "Local" / "Programs" / "claude", @@ -67,19 +72,25 @@ def detect(self) -> Optional[Dict]: if not sessions_present: return None - # If we can locate the actual Claude Desktop install dir, report it - # as the install_path; otherwise fall back to the sessions dir, which - # is at least a stable location associated with this device. + # Require the Claude Desktop install to be present. The per-user + # ``%APPDATA%\Claude`` tree (which holds the sessions dir) survives an + # uninstall (anthropics/claude-code#25013), so reporting on the sessions + # dir alone produced false positives. Gate on a real install dir. app_install = self._find_install_dir() - install_path = str(app_install) if app_install else str(sessions_dir) + if app_install is None: + logger.debug( + "Cowork sessions present but no Claude Desktop install found; " + "treating as residue (not installed)." + ) + return None - # An on-disk sessions tree without the app present is unusual but - # not impossible (uninstall + leftover state). Still report it — - # the sessions dir is the data we actually care about. return { "name": self.tool_name, "version": self.get_version(), - "install_path": install_path, + # Report the sessions dir (consistent with the macOS detector and the + # central ``_detect_claude_cowork`` path). ``app_install`` is the gate, + # not the reported path. + "install_path": str(sessions_dir), } def get_version(self) -> Optional[str]: @@ -92,8 +103,9 @@ def get_version(self) -> Optional[str]: """ return None - def _find_install_dir(self) -> Optional[Path]: - for candidate in _candidate_install_dirs(): + def _find_install_dir(self, user_home: Optional[Path] = None) -> Optional[Path]: + home = user_home or getattr(self, "user_home", None) or Path.home() + for candidate in _candidate_install_dirs(Path(home)): try: if candidate.exists() and candidate.is_dir(): return candidate diff --git a/scripts/coding_discovery_tools/windows/github_copilot/detect_copilot.py b/scripts/coding_discovery_tools/windows/github_copilot/detect_copilot.py index 121b65d3..42109914 100644 --- a/scripts/coding_discovery_tools/windows/github_copilot/detect_copilot.py +++ b/scripts/coding_discovery_tools/windows/github_copilot/detect_copilot.py @@ -5,6 +5,7 @@ from typing import Optional, Dict, List from ...coding_tool_base import BaseCopilotDetector +from ...vscode_extension_helpers import find_extension_in_editor from ...windows_extraction_helpers import is_running_as_admin from ..jetbrains.jetbrains import WindowsJetBrainsDetector @@ -110,58 +111,30 @@ def _detect_vscode_for_user(self, user_home: Path) -> List[Dict]: """ Detect VS Code Copilot for a specific user. - Scans %USERPROFILE%\\.vscode\\extensions for folders starting with github.copilot*. + Reads the LIVE ``.vscode\\extensions\\extensions.json`` registry rather + than globbing for ``github.copilot*`` folders. VS Code rewrites this + registry on uninstall, but the extension FOLDER survives + (microsoft/vscode#81046), so the old folder glob produced phantom rows + for uninstalled Copilot. This matches the SAFE macOS/Linux path. """ results = [] vscode_ext_dir = user_home / ".vscode" / "extensions" - # Note: don't early-return when the marketplace extensions dir is absent - # — a built-in-Copilot user may have no .vscode\extensions at all, and the - # built-in fallback below still needs to run. - if not vscode_ext_dir.exists(): - logger.debug(f"VS Code extensions directory not found: {vscode_ext_dir}") - copilot_dirs = [] - else: - try: - # Look for github.copilot* directories - copilot_dirs = list(vscode_ext_dir.glob("github.copilot*")) - except (PermissionError, OSError) as e: - logger.debug(f"Error scanning VS Code extensions: {e}") - copilot_dirs = [] - - try: - for copilot_dir in copilot_dirs: - if not copilot_dir.is_dir(): - continue - - version = "unknown" - pkg_json = copilot_dir / "package.json" - - if pkg_json.exists(): - data = _load_jsonc(pkg_json) - if data: - version = data.get('version', 'unknown') - - if version == "unknown" and "-" in copilot_dir.name: - try: - version = copilot_dir.name.rsplit('-', 1)[1] - except IndexError: - pass - - ext_name = "GitHub Copilot (VS Code)" - if "copilot-chat" in copilot_dir.name.lower(): - ext_name = "GitHub Copilot Chat (VS Code)" - - results.append({ - "name": ext_name, - "version": version, - "publisher": "GitHub", - "install_path": str(copilot_dir) - }) - logger.info(f"Detected: {ext_name} v{version} at {copilot_dir}") - - except (PermissionError, OSError) as e: - logger.debug(f"Error scanning VS Code extensions: {e}") + for ext_id, name in ( + ("github.copilot", "GitHub Copilot (VS Code)"), + ("github.copilot-chat", "GitHub Copilot Chat (VS Code)"), + ): + entry = find_extension_in_editor(user_home, "Code", ext_id) + if entry is None: + continue + _location, version = entry + results.append({ + "name": name, + "version": version or "unknown", + "publisher": "GitHub", + "install_path": str(vscode_ext_dir), + }) + logger.info(f"Detected: {name} v{version or 'unknown'} at {vscode_ext_dir}") # Fall back to BUILT-IN Copilot (bundled in the VS Code install) when no # marketplace Copilot extension is present, so built-in users — and their diff --git a/scripts/coding_discovery_tools/windows/junie/junie.py b/scripts/coding_discovery_tools/windows/junie/junie.py index 3f50a1cd..94c1ef6e 100644 --- a/scripts/coding_discovery_tools/windows/junie/junie.py +++ b/scripts/coding_discovery_tools/windows/junie/junie.py @@ -13,7 +13,9 @@ from typing import Optional, Dict, List from ...coding_tool_base import BaseToolDetector +from ...user_tool_detector import find_junie_binary_for_user from ...windows_extraction_helpers import scan_windows_user_directories +from ..jetbrains.jetbrains import WindowsJetBrainsDetector logger = logging.getLogger(__name__) @@ -55,22 +57,70 @@ def get_version(self) -> Optional[str]: return None def _detect_junie_for_user(self, user_home: Path) -> Optional[Dict]: - """Detect Junie installation for a specific user.""" - junie_dir = user_home / self.JUNIE_DIR_NAME + """Detect Junie installation for a specific user. - if not junie_dir.exists() or not junie_dir.is_dir(): + Gates on a real install signal — the Junie CLI binary OR the Junie + plugin in a JetBrains IDE — not on the ``%USERPROFILE%\\.junie`` + directory, which is user-authored guidelines residue that survives + uninstall. ``.junie`` is still used as the version source. + """ + junie_bin = find_junie_binary_for_user(user_home) + install_path: Optional[str] = junie_bin + + if not install_path: + install_path = self._has_junie_jetbrains_plugin(user_home) + + if not install_path: return None - logger.debug(f"Found Junie directory at: {junie_dir}") + logger.debug(f"Detected Junie install signal at: {install_path}") - version = self._get_version_from_config(junie_dir) + version = self._get_version_from_config(user_home / self.JUNIE_DIR_NAME) return { "name": self.tool_name, "version": version or "Unknown", - "install_path": str(junie_dir) + "install_path": install_path } + def _has_junie_jetbrains_plugin(self, user_home: Path) -> Optional[str]: + """Return an install_path if the Junie plugin is present in a JetBrains + IDE belonging to ``user_home``, else None. + + On Windows ``WindowsJetBrainsDetector.detect()`` already honors + ``self.user_home`` (its ``jetbrains_config_dir`` property derives from it), + so the scan is scoped by construction. We additionally guard each match by + confirming the IDE config path is under ``user_home`` so a stray + cross-user entry can never be attributed to the user being scanned. The + JetBrains detector itself is never modified. + """ + try: + jetbrains_detector = WindowsJetBrainsDetector() + jetbrains_detector.user_home = user_home + all_ides = jetbrains_detector.detect() or [] + except (PermissionError, OSError) as e: + logger.debug(f"JetBrains scan for Junie failed under {user_home}: {e}") + return None + + for ide in all_ides: + config_path = ide.get("_config_path") or ide.get("install_path") + if not self._path_under_user_home(config_path, user_home): + continue + for plugin_name in ide.get("plugins", []): + if "junie" in plugin_name.lower(): + return config_path + return None + + @staticmethod + def _path_under_user_home(config_path: Optional[str], user_home: Path) -> bool: + """True if ``config_path`` is inside ``user_home`` (strict scoping guard).""" + if not config_path: + return False + try: + return Path(config_path).resolve().is_relative_to(user_home.resolve()) + except (OSError, ValueError): + return False + def _get_version_from_config(self, junie_dir: Path) -> Optional[str]: """Try to extract Junie version from configuration files.""" config_files = [ @@ -83,7 +133,7 @@ def _get_version_from_config(self, junie_dir: Path) -> Optional[str]: if config_file.exists(): with open(config_file, 'r', encoding='utf-8') as f: data = json.load(f) - if 'version' in data: + if isinstance(data, dict) and isinstance(data.get('version'), str): return data['version'] except (json.JSONDecodeError, OSError, PermissionError) as e: logger.debug(f"Could not read config file {config_file}: {e}") diff --git a/setup-scheduled-scan.ps1 b/setup-scheduled-scan.ps1 index 50dee7b6..3d8f1151 100644 --- a/setup-scheduled-scan.ps1 +++ b/setup-scheduled-scan.ps1 @@ -97,13 +97,13 @@ Unbound Scheduled Run Setup (Windows) Usage: Install (discover): .\setup-scheduled-scan.ps1 -ApiKey -Domain - Install (onboard): .\setup-scheduled-scan.ps1 -Command onboard -ApiKey -DiscoveryKey [-Domain ] + Install (onboard): .\setup-scheduled-scan.ps1 -Command onboard -ApiKey [-Domain ] Uninstall: .\setup-scheduled-scan.ps1 -Uninstall Options: -Command 'discover' (default) or 'onboard' -ApiKey User API key (or discovery key when -Command discover) - -DiscoveryKey Discovery key (required for -Command onboard) + -DiscoveryKey Org discovery key (optional; only for sudo/MDM all-users scans) -Domain Backend URL -Uninstall Remove the scheduled task '@ @@ -252,10 +252,8 @@ switch ($Command) { Write-Log ("Discover exited with code {0}" -f $ec) } 'onboard' { - if ([string]::IsNullOrEmpty($DiscoveryKey)) { - Write-Log "ERROR: discovery_key missing from Credential Manager (required for onboard command)" - exit 1 - } + # Per-user onboarding scans with the user's own API key (WEB-4891), so a + # discovery key is optional — forward it only when one was stored. $unbound = (Get-Command unbound -ErrorAction SilentlyContinue).Source if (-not $unbound) { Write-Log "ERROR: 'unbound' CLI not found in PATH. Install with: npm install -g unbound-cli" @@ -264,7 +262,7 @@ switch ($Command) { # Credentials go via env vars — Win32_Process.CommandLine is readable by any # authenticated user and Windows Event Log 4688 captures full command lines. $env:UNBOUND_API_KEY = $ApiKey - $env:UNBOUND_DISCOVERY_KEY = $DiscoveryKey + if (-not [string]::IsNullOrEmpty($DiscoveryKey)) { $env:UNBOUND_DISCOVERY_KEY = $DiscoveryKey } $cmdArgs = @('onboard') if (-not [string]::IsNullOrEmpty($Domain)) { $cmdArgs += @('--domain', $Domain) } Write-Log "Executing: unbound onboard (credentials via env vars) ..." @@ -401,10 +399,8 @@ if ([string]::IsNullOrEmpty($ApiKey)) { Write-Host "Error: -ApiKey is required" Show-Usage } -if ($Command -eq 'onboard' -and [string]::IsNullOrEmpty($DiscoveryKey)) { - Write-Host "Error: -DiscoveryKey is required when -Command onboard" - Show-Usage -} +# -DiscoveryKey is optional for onboard: per-user onboarding scans with the +# user's own API key (WEB-4891). It is only needed for sudo/MDM all-users scans. if ($Command -eq 'discover' -and [string]::IsNullOrEmpty($Domain)) { Write-Host "Error: -Domain is required when -Command discover" Show-Usage diff --git a/setup-scheduled-scan.sh b/setup-scheduled-scan.sh index 652e36c3..2404df1d 100755 --- a/setup-scheduled-scan.sh +++ b/setup-scheduled-scan.sh @@ -67,13 +67,13 @@ usage() { echo "" echo "Usage:" echo " Install (discover): $0 [--command discover] --api-key --domain " - echo " Install (onboard): $0 --command onboard --api-key --discovery-key [--domain ]" + echo " Install (onboard): $0 --command onboard --api-key [--domain ]" echo " Uninstall: $0 --uninstall" echo "" echo "Options:" echo " --command Subcommand to schedule: 'discover' (default) or 'onboard'" echo " --api-key User API key (or discovery key when --command discover)" - echo " --discovery-key Discovery key (required for --command onboard)" + echo " --discovery-key Org discovery key (optional; only for sudo/MDM all-users scans)" echo " --domain Backend URL (e.g., https://backend.getunbound.ai)" echo " --uninstall Remove the scheduled job" echo " --help Show this help message" @@ -309,11 +309,9 @@ case "\$COMMAND" in fi ;; onboard) - # Re-run the full unbound onboard flow daily. - if [ -z "\$DISCOVERY_KEY" ]; then - log "ERROR: discovery_key missing from stored credentials (required for onboard)" - exit 1 - fi + # Re-run the full unbound onboard flow daily. Per-user onboarding scans + # with the user's own API key (WEB-4891), so a separate discovery key is + # optional here — it is forwarded only when one was stored (older setups). # Use the path that was resolved at setup time first. This survives nvm version # switches, non-standard npm prefix layouts, and any other case where the # scheduler's minimal PATH wouldn't find the binary. Fall back to a fresh PATH @@ -331,9 +329,13 @@ case "\$COMMAND" in ARGS=(onboard) [ -n "\$DOMAIN" ] && ARGS+=(--domain "\$DOMAIN") log "Executing: unbound onboard (keys via env vars) \${DOMAIN:+--domain \$DOMAIN}" - UNBOUND_API_KEY="\$API_KEY" UNBOUND_DISCOVERY_KEY="\$DISCOVERY_KEY" "\$UNBOUND_BIN" "\${ARGS[@]}" >> "\$LOG_DIR/scheduled.log" 2>&1 || EXIT_CODE=\$? + if [ -n "\$DISCOVERY_KEY" ]; then + UNBOUND_API_KEY="\$API_KEY" UNBOUND_DISCOVERY_KEY="\$DISCOVERY_KEY" "\$UNBOUND_BIN" "\${ARGS[@]}" >> "\$LOG_DIR/scheduled.log" 2>&1 || EXIT_CODE=\$? + else + UNBOUND_API_KEY="\$API_KEY" "\$UNBOUND_BIN" "\${ARGS[@]}" >> "\$LOG_DIR/scheduled.log" 2>&1 || EXIT_CODE=\$? + fi if [ \$EXIT_CODE -ne 0 ] && tail -40 "\$LOG_DIR/scheduled.log" | grep -qiE "401|[Ii]nvalid.*(api.?key|key)|[Uu]nauthorized"; then - log "HINT: Auth error detected — your API key may have been rotated in the Unbound dashboard. Re-run to update stored credentials: unbound onboard --set-cron --api-key --discovery-key " + log "HINT: Auth error detected — your API key may have been rotated in the Unbound dashboard. Re-run to update stored credentials: unbound onboard --set-cron --api-key " fi ;; *) @@ -647,10 +649,9 @@ if [ -z "$API_KEY" ]; then echo "Error: --api-key is required" usage fi -if [ "$COMMAND" = "onboard" ] && [ -z "$DISCOVERY_KEY" ]; then - echo "Error: --discovery-key is required when --command onboard" - usage -fi +# --discovery-key is optional for onboard: per-user onboarding scans with the +# user's own API key (WEB-4891). It is only needed for sudo/MDM (all-users) +# enrollment, which is scheduled via --command discover, not onboard. if [ "$COMMAND" = "discover" ] && [ -z "$DOMAIN" ]; then echo "Error: --domain is required when --command discover" usage diff --git a/tests/test_cli_flags.py b/tests/test_cli_flags.py index 650cd4a0..1756561c 100644 --- a/tests/test_cli_flags.py +++ b/tests/test_cli_flags.py @@ -1,5 +1,5 @@ """ -Tests for the --dump / --summary / --payload verbosity flags. +Tests for the --dump / --payload verbosity flags. """ import argparse import io @@ -15,13 +15,12 @@ def _build_parser(): parser.add_argument('--app_name') g = parser.add_mutually_exclusive_group() g.add_argument('--dump', action='store_true') - g.add_argument('--summary', action='store_true') g.add_argument('--payload', action='store_true') return parser class TestVerbosityFlagsMutex(unittest.TestCase): - """argparse should reject any two-of-three verbosity flag combination.""" + """argparse should reject combining the verbosity flags (--dump/--payload).""" def setUp(self): self.parser = _build_parser() @@ -33,47 +32,34 @@ def _expect_exit(self, *flags): def test_dump_alone_ok(self): args = self.parser.parse_args(['--api-key', 'k', '--domain', 'd', '--dump']) self.assertTrue(args.dump) - self.assertFalse(args.summary) - self.assertFalse(args.payload) - - def test_summary_alone_ok(self): - args = self.parser.parse_args(['--api-key', 'k', '--domain', 'd', '--summary']) - self.assertTrue(args.summary) - self.assertFalse(args.dump) self.assertFalse(args.payload) def test_payload_alone_ok(self): args = self.parser.parse_args(['--api-key', 'k', '--domain', 'd', '--payload']) self.assertTrue(args.payload) self.assertFalse(args.dump) - self.assertFalse(args.summary) def test_no_flags_defaults_all_false(self): args = self.parser.parse_args(['--api-key', 'k', '--domain', 'd']) self.assertFalse(args.dump) - self.assertFalse(args.summary) self.assertFalse(args.payload) - def test_dump_and_summary_rejected(self): - self._expect_exit('--dump', '--summary') + def test_summary_flag_rejected(self): + # --summary was removed; concise is the default. A stale caller passing + # it must fail loudly (argparse exit), not silently get default output. + self._expect_exit('--summary') def test_dump_and_payload_rejected(self): self._expect_exit('--dump', '--payload') - def test_summary_and_payload_rejected(self): - self._expect_exit('--summary', '--payload') - - def test_all_three_rejected(self): - self._expect_exit('--dump', '--summary', '--payload') - class TestLoggingHelpersSuppression(unittest.TestCase): """ Setting the logging_helpers module's logger to WARNING must silence the INFO output of log_rules_details / log_mcp_details / log_settings_details. - This is the property --summary relies on; we test it directly so a future - rename of the module doesn't silently break --summary. + This is the property the concise default relies on; we test it directly so + a future module rename doesn't silently break the default suppression. """ def setUp(self): diff --git a/tests/test_copilot_cli_discovery.py b/tests/test_copilot_cli_discovery.py index 0d881735..c7e87d56 100644 --- a/tests/test_copilot_cli_discovery.py +++ b/tests/test_copilot_cli_discovery.py @@ -25,6 +25,7 @@ from unittest.mock import ANY, MagicMock, patch import scripts.coding_discovery_tools.utils as utils_mod +import scripts.coding_discovery_tools.mcp_extraction_helpers as mcp_helpers from scripts.coding_discovery_tools.ai_tools_discovery import AIToolsDetector from scripts.coding_discovery_tools.coding_tool_factory import ( CopilotCliMCPConfigExtractorFactory, @@ -493,6 +494,7 @@ def test_jsonc_strippers_are_single_source_of_truth(self): _MCP_MOD = ( "scripts.coding_discovery_tools.macos.copilot_cli.mcp_config_extractor" ) +_MCP_HELPERS_MOD = "scripts.coding_discovery_tools.mcp_extraction_helpers" class TestCopilotCliWorkspaceMcpExtraction(unittest.TestCase): @@ -2657,6 +2659,134 @@ def test_standalone_not_macos_subclass(self): self.assertNotIn(MacOSCopilotCliSkillsExtractor, WindowsCopilotCliSkillsExtractor.__mro__) +class TestAdminScanOwnHomeNotDoubleCounted(unittest.TestCase): + """WEB-4673 (bug 2): an admin all-users scan must not re-process the admin's + own home when it is already among the scanned user dirs (the Windows case, + where the admin is a normal C:\\Users\\ profile). On macOS the root home + (/var/root) is outside /Users, so it is still added.""" + + def setUp(self): + utils_mod._SENTRY_DSN = "" + self.tmp_dir = tempfile.mkdtemp() + self.home = Path(self.tmp_dir) / "Alice" + self.home.mkdir(parents=True) + + def tearDown(self): + shutil.rmtree(self.tmp_dir, ignore_errors=True) + + def test_helper_true_when_home_in_list(self): + with patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=self.home): + self.assertTrue(mcp_helpers._own_home_already_scanned([self.home])) + + def test_helper_false_when_home_outside_list(self): + other = Path(self.tmp_dir) / "Bob" + other.mkdir() + with patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=self.home): + self.assertFalse(mcp_helpers._own_home_already_scanned([other])) + + def test_global_helper_no_double_count(self): + """extract_ide_global_configs_with_root_support: own home processed once.""" + calls = [] + + def extract_func(uh): + calls.append(Path(uh)) + return [{"path": str(Path(uh) / ".copilot"), "mcpServers": [{"name": "s"}]}] + + with patch(f"{_MCP_HELPERS_MOD}._iter_admin_user_homes", return_value=[self.home]), \ + patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=self.home), \ + patch("platform.system", return_value="Darwin"): + result = mcp_helpers.extract_ide_global_configs_with_root_support(extract_func) + self.assertEqual(calls.count(self.home), 1, f"home processed {calls.count(self.home)}x") + self.assertEqual(len(result), 1) # not duplicated + + def test_global_helper_still_adds_root_home_outside_users(self): + """macOS case: root's own home (outside /Users) is still added separately.""" + root_home = Path(self.tmp_dir) / "var_root" + root_home.mkdir() + calls = [] + + def extract_func(uh): + calls.append(Path(uh)) + return [] + + with patch(f"{_MCP_HELPERS_MOD}._iter_admin_user_homes", return_value=[self.home]), \ + patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=root_home), \ + patch("platform.system", return_value="Darwin"): + mcp_helpers.extract_ide_global_configs_with_root_support(extract_func) + self.assertIn(self.home, calls) + self.assertIn(root_home, calls) # added via the root re-add step + + def test_dual_path_helper_no_double_count(self): + """extract_dual_path_configs_with_root_support: own preferred path read once + (the admin home is already covered by the loop, so the re-add is skipped).""" + preferred = self.home / "config.json" + preferred.write_text("{}", encoding="utf-8") + fallback = self.home / "fallback.json" # absent; preferred wins + calls = [] + + def extract_from_file(path): + calls.append(Path(path)) + return [{"path": str(Path(path).parent), "mcpServers": [{"name": "s"}]}] + + with patch(f"{_MCP_HELPERS_MOD}._iter_admin_user_homes", return_value=[self.home]), \ + patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=self.home), \ + patch("platform.system", return_value="Darwin"): + result = mcp_helpers.extract_dual_path_configs_with_root_support( + preferred, fallback, extract_from_file) + self.assertEqual(calls.count(preferred), 1, f"preferred read {calls.count(preferred)}x") + self.assertEqual(len(result), 1) # not duplicated + + def test_dual_path_helper_still_adds_root_home_outside_users(self): + """macOS case: root's own preferred path (outside /Users) is still read.""" + root_home = Path(self.tmp_dir) / "var_root" + root_home.mkdir() + preferred = root_home / "config.json" + preferred.write_text("{}", encoding="utf-8") + fallback = root_home / "fallback.json" + calls = [] + + def extract_from_file(path): + calls.append(Path(path)) + return [{"path": str(Path(path).parent), "mcpServers": [{"name": "s"}]}] + + with patch(f"{_MCP_HELPERS_MOD}._iter_admin_user_homes", return_value=[self.home]), \ + patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=root_home), \ + patch("platform.system", return_value="Darwin"): + mcp_helpers.extract_dual_path_configs_with_root_support( + preferred, fallback, extract_from_file) + self.assertIn(preferred, calls) # root home outside /Users -> re-add still fires + + def test_claudeai_helper_no_double_count(self): + """extract_claudeai_mcp_servers_with_root_support: own ~/.claude scanned once.""" + (self.home / ".claude").mkdir() + calls = [] + + def fake_scan(claude_dir, projects): + calls.append(Path(claude_dir)) + + with patch(f"{_MCP_HELPERS_MOD}._iter_admin_user_homes", return_value=[self.home]), \ + patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=self.home), \ + patch(f"{_MCP_HELPERS_MOD}.extract_claudeai_mcp_servers", side_effect=fake_scan), \ + patch("platform.system", return_value="Darwin"): + mcp_helpers.extract_claudeai_mcp_servers_with_root_support([]) + self.assertEqual(calls.count(self.home / ".claude"), 1) + + def test_plugin_helper_no_double_count(self): + """extract_claude_plugin_mcp_configs_with_root_support: own home not re-scanned.""" + calls = [] + + def fake_own(projects, plugin_lookup=None): + calls.append("own-home") + + with patch(f"{_MCP_HELPERS_MOD}._iter_admin_user_homes", return_value=[self.home]), \ + patch(f"{_MCP_HELPERS_MOD}.Path.home", return_value=self.home), \ + patch(f"{_MCP_HELPERS_MOD}.extract_claude_plugin_mcp_configs", side_effect=fake_own), \ + patch("platform.system", return_value="Darwin"): + mcp_helpers.extract_claude_plugin_mcp_configs_with_root_support([]) + # admin home already covered by the loop -> the own-home re-add is skipped + self.assertEqual(calls.count("own-home"), 0) + + # --------------------------------------------------------------------------- # 20. Linux skills user-level guard: a root/MDM scan must NOT double-count a # non-scanner user's ~/.agents (or ~/.claude) skills as project skills diff --git a/tests/test_cowork_residue_detection.py b/tests/test_cowork_residue_detection.py new file mode 100644 index 00000000..d41ae6d2 --- /dev/null +++ b/tests/test_cowork_residue_detection.py @@ -0,0 +1,289 @@ +"""Residue-vs-real detection tests for Claude Cowork (FIX 3). + +On Linux and Windows the detector used to report Cowork whenever the on-disk +session tree (``/Claude/local-agent-mode-sessions/``) existed. But the +per-user Claude config tree survives an uninstall (anthropics/claude-code#25013), +so the sessions dir alone is residue and produced false positives. Detection now +AND-requires a present Claude Desktop install (resolved by the OS detector's +``_find_install_dir``). macOS already AND-required ``/Applications/Claude.app`` +and is unchanged. + +Both routing entry points are covered: + +* the central ``_detect_claude_cowork`` (``user_tool_detector.py``) — the + production root/MDM path, which builds ``sessions_dir`` itself and delegates + the install check to ``detector._find_install_dir``; and +* the OS ``detect()`` modules (Windows / Linux). +""" + +import os +import tempfile +import unittest +from pathlib import Path +from unittest.mock import Mock, patch + +import scripts.coding_discovery_tools.utils as utils_mod +from scripts.coding_discovery_tools.claude_cowork_skills_helpers import COWORK_SESSIONS_DIR +from scripts.coding_discovery_tools.user_tool_detector import _detect_claude_cowork + +_MOD = "scripts.coding_discovery_tools.user_tool_detector" + + +def _make_detector(install_dir=None): + det = Mock() + det.tool_name = "Claude Cowork" + det.get_version.return_value = None + det._find_install_dir = Mock(return_value=install_dir) + return det + + +class TestCentralCoworkLinux(unittest.TestCase): + """Central ``_detect_claude_cowork`` — Linux branch (root/MDM path).""" + + def setUp(self): + utils_mod._SENTRY_DSN = "" + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def _make_sessions(self): + sdir = self.home / ".config" / "Claude" / COWORK_SESSIONS_DIR + sdir.mkdir(parents=True) + return sdir + + def test_residue_sessions_only_not_detected(self): + """Sessions tree present but NO install dir -> not detected (FP fix).""" + self._make_sessions() + det = _make_detector(install_dir=None) + with patch(f"{_MOD}.platform.system", return_value="Linux"): + result = _detect_claude_cowork(det, self.home) + self.assertIsNone(result) + + def test_sessions_plus_install_detected(self): + sdir = self._make_sessions() + det = _make_detector(install_dir=Path("/opt/Claude")) + with patch(f"{_MOD}.platform.system", return_value="Linux"): + result = _detect_claude_cowork(det, self.home) + self.assertIsNotNone(result) + self.assertEqual(result["name"], "Claude Cowork") + self.assertEqual(result["install_path"], str(sdir)) + + def test_no_sessions_not_detected_even_with_install(self): + det = _make_detector(install_dir=Path("/opt/Claude")) + with patch(f"{_MOD}.platform.system", return_value="Linux"): + result = _detect_claude_cowork(det, self.home) + self.assertIsNone(result) + # The install check is short-circuited (sessions absent first). + det._find_install_dir.assert_not_called() + + def test_detector_missing_find_install_dir_not_detected(self): + """Defensive: a detector without ``_find_install_dir`` (shouldn't happen + on Linux/Windows) -> not detected rather than crashing.""" + self._make_sessions() + det = Mock(spec=["tool_name", "get_version"]) + det.tool_name = "Claude Cowork" + with patch(f"{_MOD}.platform.system", return_value="Linux"): + result = _detect_claude_cowork(det, self.home) + self.assertIsNone(result) + + +class TestCentralCoworkWindows(unittest.TestCase): + """Central ``_detect_claude_cowork`` — Windows branch.""" + + def setUp(self): + utils_mod._SENTRY_DSN = "" + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def _make_sessions(self): + sdir = self.home / "AppData" / "Roaming" / "Claude" / COWORK_SESSIONS_DIR + sdir.mkdir(parents=True) + return sdir + + def test_residue_sessions_only_not_detected(self): + self._make_sessions() + det = _make_detector(install_dir=None) + with patch(f"{_MOD}.platform.system", return_value="Windows"): + result = _detect_claude_cowork(det, self.home) + self.assertIsNone(result) + + def test_sessions_plus_install_detected(self): + sdir = self._make_sessions() + det = _make_detector(install_dir=self.home / "AppData" / "Local" / "Programs" / "Claude") + with patch(f"{_MOD}.platform.system", return_value="Windows"): + result = _detect_claude_cowork(det, self.home) + self.assertIsNotNone(result) + self.assertEqual(result["install_path"], str(sdir)) + + +class TestCentralCoworkMacUnchanged(unittest.TestCase): + """macOS branch is unchanged: it AND-gates ``/Applications/Claude.app`` and + does NOT consult ``_find_install_dir``.""" + + def setUp(self): + utils_mod._SENTRY_DSN = "" + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def _make_sessions(self): + sdir = self.home / "Library" / "Application Support" / "Claude" / COWORK_SESSIONS_DIR + sdir.mkdir(parents=True) + return sdir + + def test_app_absent_not_detected(self): + self._make_sessions() + det = _make_detector(install_dir=None) + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.Path.exists", return_value=False): + result = _detect_claude_cowork(det, self.home) + self.assertIsNone(result) + + @unittest.skipIf(os.name == "nt", "POSIX-only: macOS /Applications/Claude.app path semantics (backslash on Windows)") + def test_app_present_and_sessions_detected_without_find_install_dir(self): + sdir = self._make_sessions() + det = _make_detector(install_dir=None) + real_exists = Path.exists + + def fake_exists(self): + if str(self) == "/Applications/Claude.app": + return True + return real_exists(self) + + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch("pathlib.Path.exists", fake_exists): + result = _detect_claude_cowork(det, self.home) + self.assertIsNotNone(result) + self.assertEqual(result["install_path"], str(sdir)) + # macOS path never consults the install-dir delegate. + det._find_install_dir.assert_not_called() + + +# ── OS detect() modules ────────────────────────────────────────────────────── + +_WIN_MOD = "scripts.coding_discovery_tools.windows.claude_cowork.claude_cowork" +_LINUX_MOD = "scripts.coding_discovery_tools.linux.claude_cowork.claude_cowork" + + +class TestWindowsCoworkDetect(unittest.TestCase): + def setUp(self): + utils_mod._SENTRY_DSN = "" + from scripts.coding_discovery_tools.windows.claude_cowork.claude_cowork import ( + WindowsClaudeCoworkDetector, + ) + self.Detector = WindowsClaudeCoworkDetector + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + self.appdata = self.home / "AppData" / "Roaming" + + def tearDown(self): + self.tmp.cleanup() + + def _make_sessions(self): + sdir = self.appdata / "Claude" / COWORK_SESSIONS_DIR + sdir.mkdir(parents=True) + return sdir + + def test_residue_sessions_only_not_detected(self): + """Sessions present, no install dir on disk -> not detected (FP fix).""" + self._make_sessions() + with patch.dict(os.environ, {"APPDATA": str(self.appdata)}), \ + patch.object(self.Detector, "_find_install_dir", return_value=None): + self.assertIsNone(self.Detector().detect()) + + def test_sessions_plus_install_detected(self): + sdir = self._make_sessions() + install = self.home / "AppData" / "Local" / "Programs" / "Claude" + with patch.dict(os.environ, {"APPDATA": str(self.appdata)}), \ + patch.object(self.Detector, "_find_install_dir", return_value=install): + result = self.Detector().detect() + self.assertIsNotNone(result) + # install_path is the gated SESSIONS dir (consistent with macOS + central path). + self.assertEqual(result["install_path"], str(sdir)) + + def test_no_appdata_not_detected(self): + with patch.dict(os.environ, {}, clear=True): + self.assertIsNone(self.Detector().detect()) + + def test_central_scan_uses_scanned_user_home_not_scanner(self): + """Regression (admin/MDM FN): the central path must resolve the install + dir under the SCANNED user's home, not the scanner's Path.home(). User B + has the per-user install; the scanner home has none -> B is still + detected. (Real _find_install_dir, not mocked.)""" + b_sessions = self.home / "AppData" / "Roaming" / "Claude" / COWORK_SESSIONS_DIR + b_sessions.mkdir(parents=True) + (self.home / "AppData" / "Local" / "Programs" / "Claude").mkdir(parents=True) + scanner_home = Path(self.tmp.name + "_scanner") + scanner_home.mkdir() + with patch(f"{_MOD}.platform.system", return_value="Windows"), \ + patch(f"{_WIN_MOD}.Path.home", return_value=scanner_home): + result = _detect_claude_cowork(self.Detector(), self.home) + self.assertIsNotNone(result) + self.assertEqual(result["install_path"], str(b_sessions)) + + def test_central_scan_scanner_install_not_attributed_to_other_user(self): + """Inverse: install only under the scanner's home, user B has only the + sessions residue -> B not detected (no cross-user attribution).""" + b_sessions = self.home / "AppData" / "Roaming" / "Claude" / COWORK_SESSIONS_DIR + b_sessions.mkdir(parents=True) + scanner_home = Path(self.tmp.name + "_scanner") + (scanner_home / "AppData" / "Local" / "Programs" / "Claude").mkdir(parents=True) + with patch(f"{_MOD}.platform.system", return_value="Windows"), \ + patch(f"{_WIN_MOD}.Path.home", return_value=scanner_home): + result = _detect_claude_cowork(self.Detector(), self.home) + self.assertIsNone(result) + + +class TestLinuxCoworkDetect(unittest.TestCase): + def setUp(self): + utils_mod._SENTRY_DSN = "" + from scripts.coding_discovery_tools.linux.claude_cowork.claude_cowork import ( + LinuxClaudeCoworkDetector, + ) + self.Detector = LinuxClaudeCoworkDetector + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def _make_sessions(self): + sdir = self.home / ".config" / "Claude" / COWORK_SESSIONS_DIR + sdir.mkdir(parents=True) + return sdir + + def test_residue_sessions_only_not_detected(self): + self._make_sessions() + with patch(f"{_LINUX_MOD}.get_linux_user_homes", return_value=[self.home]), \ + patch.object(self.Detector, "_find_install_dir", return_value=None): + self.assertIsNone(self.Detector().detect()) + + def test_sessions_plus_install_detected(self): + sdir = self._make_sessions() + with patch(f"{_LINUX_MOD}.get_linux_user_homes", return_value=[self.home]), \ + patch.object(self.Detector, "_find_install_dir", return_value=Path("/opt/Claude")): + result = self.Detector().detect() + self.assertIsNotNone(result) + # install_path is the gated SESSIONS dir (consistent with macOS + central path). + self.assertEqual(result["install_path"], str(sdir)) + + def test_multi_user_residue_does_not_leak(self): + """Two users with sessions but NO install -> not detected for either.""" + self._make_sessions() + home2 = Path(self.tmp.name + "_2") + home2.mkdir() + (home2 / ".config" / "Claude" / COWORK_SESSIONS_DIR).mkdir(parents=True) + with patch(f"{_LINUX_MOD}.get_linux_user_homes", return_value=[self.home, home2]), \ + patch.object(self.Detector, "_find_install_dir", return_value=None): + self.assertIsNone(self.Detector().detect()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_github_copilot_vscode_detection.py b/tests/test_github_copilot_vscode_detection.py index 6824068b..27187a8c 100644 --- a/tests/test_github_copilot_vscode_detection.py +++ b/tests/test_github_copilot_vscode_detection.py @@ -204,18 +204,54 @@ def test_builtin_not_attributed_when_user_does_not_use_vscode(self): self.assertEqual(self._detect(), []) def test_marketplace_present_does_not_invoke_builtin_fallback(self): - """Existing Windows glob path unchanged: a marketplace github.copilot* - folder is detected and the new built-in fallback is never consulted.""" + """Windows now reads the LIVE extensions.json registry (matching + macOS/Linux): a registered github.copilot entry is detected and the + built-in fallback is never consulted.""" self._make_code_user_dir() - mk = self.user_home / ".vscode" / "extensions" / "github.copilot-1.250.0" - mk.mkdir(parents=True) - (mk / "package.json").write_text(json.dumps({"version": "1.250.0"}), encoding="utf-8") + reg = self.user_home / ".vscode" / "extensions" / "extensions.json" + reg.parent.mkdir(parents=True) + reg.write_text( + json.dumps([{"identifier": {"id": "github.copilot"}, "version": "1.250.0"}]), + encoding="utf-8", + ) with patch.object(self.Detector, "_detect_vscode_builtin_copilot") as spy, \ patch(f"{_WIN_MOD}._VSCODE_SYSTEM_APP_EXTENSION_ROOTS", [self.app_ext]): res = self.Detector()._detect_vscode_for_user(self.user_home) spy.assert_not_called() self.assertEqual([r["version"] for r in res], ["1.250.0"]) + def test_residue_extension_folder_only_not_detected(self): + """REGRESSION GUARD (FIX 2): an uninstalled Copilot whose extension + FOLDER survives (microsoft/vscode#81046) but is absent from the live + extensions.json registry must NOT be detected. The old folder glob + produced a phantom row here; the registry read does not. The built-in + fallback is still consulted (and finds nothing, since no Code/User dir).""" + # Surviving extension folder, but extensions.json lists nothing. + ext_dir = self.user_home / ".vscode" / "extensions" + residue = ext_dir / "github.copilot-1.250.0" + residue.mkdir(parents=True) + (residue / "package.json").write_text(json.dumps({"version": "1.250.0"}), encoding="utf-8") + (ext_dir / "extensions.json").write_text(json.dumps([]), encoding="utf-8") + with patch(f"{_WIN_MOD}._VSCODE_SYSTEM_APP_EXTENSION_ROOTS", [self.app_ext]): + res = self.Detector()._detect_vscode_for_user(self.user_home) + self.assertEqual(res, []) + + def test_live_registry_entry_detected(self): + """A live github.copilot-chat registry entry -> detected with the + registered version and the extensions dir as install_path.""" + ext_dir = self.user_home / ".vscode" / "extensions" + ext_dir.mkdir(parents=True) + (ext_dir / "extensions.json").write_text( + json.dumps([{"identifier": {"id": "github.copilot-chat"}, "version": "0.30.0"}]), + encoding="utf-8", + ) + with patch(f"{_WIN_MOD}._VSCODE_SYSTEM_APP_EXTENSION_ROOTS", [self.app_ext]): + res = self.Detector()._detect_vscode_for_user(self.user_home) + self.assertEqual(len(res), 1) + self.assertEqual(res[0]["name"], "GitHub Copilot Chat (VS Code)") + self.assertEqual(res[0]["version"], "0.30.0") + self.assertEqual(res[0]["install_path"], str(ext_dir)) + def test_builtin_plain_copilot_labeled_generic(self): self._make_code_user_dir() (self.copilot / "package.json").write_text( diff --git a/tests/test_junie_residue_detection.py b/tests/test_junie_residue_detection.py new file mode 100644 index 00000000..bced9abb --- /dev/null +++ b/tests/test_junie_residue_detection.py @@ -0,0 +1,289 @@ +"""Residue-vs-real detection tests for Junie (FIX 1). + +The fix stops treating the ``~/.junie`` directory as proof of installation. +``~/.junie`` is a user-authored guidelines dir (AGENTS.md / config.json): it +survives an uninstall AND is created by usage rather than install, so gating on +it produced false positives. Detection now gates on a real install signal — the +Junie CLI *binary* (via ``find_junie_binary_for_user``) OR the *Junie plugin* +present in a JetBrains IDE. ``~/.junie`` stays the version source and the +rules/MCP extraction source. + +Both routing entry points are exercised: + +* ``detect_tool_for_user`` -> ``_detect_junie`` — the production root/MDM + central path (``user_tool_detector.py``). +* ``find_junie_binary_for_user`` — the per-user binary resolver, including the + root owner-attribution guard. + +Both directions are proven: real binary/plugin -> detected (false-NEGATIVE +guard); residue-only ``~/.junie`` -> NOT detected (the FP fix). +""" + +import os +import tempfile +import unittest +from pathlib import Path +from unittest.mock import Mock, patch + +import scripts.coding_discovery_tools.utils as utils_mod +from scripts.coding_discovery_tools.user_tool_detector import ( + detect_tool_for_user, + find_junie_binary_for_user, +) + +_MOD = "scripts.coding_discovery_tools.user_tool_detector" +_UTILS = "scripts.coding_discovery_tools.utils" + +_HOMEBREW = Path("/opt/homebrew/bin/junie") +_USR_LOCAL = Path("/usr/local/bin/junie") +_ABS_LITERALS = (_HOMEBREW, _USR_LOCAL) + + +def _isolate_abs(present: Path = None): + """Mask the two absolute junie literals as absent (so a real install on the + test host can't leak), except an optional ``present`` target which also + reports executable. All other paths fall through to the real os/Path.""" + real_exists = Path.exists + real_access = os.access + + def fake_exists(self): + if self in _ABS_LITERALS: + return self == present + return real_exists(self) + + def fake_access(path, mode): + if present is not None and str(path) == str(present): + return True + if Path(path) in _ABS_LITERALS: + return False + return real_access(path, mode) + + return patch("pathlib.Path.exists", fake_exists), patch.object(os, "access", fake_access) + + +def _stat_for_uid(target: Path, uid: int): + real_stat = os.stat + + def fake_stat(path, *args, **kwargs): + if str(path) == str(target): + return Mock(st_uid=uid) + return real_stat(path, *args, **kwargs) + + return fake_stat + + +def _pwd_home(uid_to_home: dict): + def fake_getpwuid(uid): + if uid in uid_to_home: + return Mock(pw_dir=str(uid_to_home[uid])) + raise KeyError(uid) + + return fake_getpwuid + + +def _make_junie_detector(plugin_path=None): + """A junie detector stub for the central ``_detect_junie`` path. The plugin + signal is delegated via ``_has_junie_jetbrains_plugin``; default None.""" + det = Mock() + det.tool_name = "Junie" + det._has_junie_jetbrains_plugin = Mock(return_value=plugin_path) + return det + + +def _make_exec(path: Path) -> Path: + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("#!/bin/sh\n") + os.chmod(path, 0o755) + return path + + +class TestJunieCentralPathPosix(unittest.TestCase): + """``detect_tool_for_user`` -> ``_detect_junie`` (root/MDM path), POSIX.""" + + def setUp(self): + utils_mod._SENTRY_DSN = "" + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + self._install_isolation() + + def tearDown(self): + self.tmp.cleanup() + + def _install_isolation(self, present: Path = None): + p_exists, p_access = _isolate_abs(present) + p_exists.start() + p_access.start() + self.addCleanup(p_exists.stop) + self.addCleanup(p_access.stop) + + # --- residue-only: NOT detected -------------------------------------- + + def test_residue_junie_dir_only_not_detected(self): + """Only ``~/.junie`` (with config.json) present, no binary, no plugin + -> not detected. The central FP fix.""" + cdir = self.home / ".junie" + cdir.mkdir() + (cdir / "config.json").write_text('{"version": "1.0.0"}') + det = _make_junie_detector(plugin_path=None) + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_MOD}.run_command", return_value=None): + result = detect_tool_for_user(det, self.home) + self.assertIsNone(result) + + # --- real binary: detected ------------------------------------------- + + def test_local_bin_binary_detected(self): + junie = _make_exec(self.home / ".local" / "bin" / "junie") + (self.home / ".junie").mkdir() + (self.home / ".junie" / "config.json").write_text('{"version": "3.4.5"}') + det = _make_junie_detector(plugin_path=None) + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_MOD}.run_command", return_value=None): + result = detect_tool_for_user(det, self.home) + self.assertIsNotNone(result) + self.assertEqual(result["name"], "Junie") + self.assertEqual(result["install_path"], str(junie)) + self.assertEqual(result["version"], "3.4.5") + + def test_versioned_binary_detected_newest(self): + base = self.home / ".local" / "share" / "junie" / "versions" + _make_exec(base / "1.9.0" / "junie") + newest = _make_exec(base / "1.10.0" / "junie") + det = _make_junie_detector(plugin_path=None) + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_MOD}.run_command", return_value=None): + result = detect_tool_for_user(det, self.home) + self.assertIsNotNone(result) + # Numeric version sort picks 1.10.0 over 1.9.0 (not a string sort). + self.assertEqual(result["install_path"], str(newest)) + + # --- plugin signal: detected ----------------------------------------- + + def test_jetbrains_plugin_detected_when_no_binary(self): + det = _make_junie_detector(plugin_path="/cfg/PyCharm2024.1") + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_MOD}.run_command", return_value=None): + result = detect_tool_for_user(det, self.home) + self.assertIsNotNone(result) + self.assertEqual(result["install_path"], "/cfg/PyCharm2024.1") + + def test_version_unknown_when_no_config(self): + _make_exec(self.home / ".local" / "bin" / "junie") + det = _make_junie_detector(plugin_path=None) + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_MOD}.run_command", return_value=None): + result = detect_tool_for_user(det, self.home) + self.assertEqual(result["version"], "Unknown") + + +class TestFindJunieBinaryRootAttribution(unittest.TestCase): + """``find_junie_binary_for_user`` machine-global owner attribution + guards.""" + + def setUp(self): + utils_mod._SENTRY_DSN = "" + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + self._install_isolation() + + def tearDown(self): + self.tmp.cleanup() + + def _install_isolation(self, present: Path = None): + p_exists, p_access = _isolate_abs(present) + p_exists.start() + p_access.start() + self.addCleanup(p_exists.stop) + self.addCleanup(p_access.stop) + + @unittest.skipIf(os.name == "nt", "POSIX-only: machine-global owner attribution uses pwd (absent on Windows)") + def test_homebrew_skipped_when_root_owned_by_other(self): + """Under root, /opt/homebrew/bin/junie owned by a DIFFERENT user's home + is skipped (not fanned out); no user-local binary -> None.""" + self._install_isolation(_HOMEBREW) + other = self.home.parent / "someone_else" + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_UTILS}.os.stat", side_effect=_stat_for_uid(_HOMEBREW, 502)), \ + patch(f"{_UTILS}.pwd.getpwuid", side_effect=_pwd_home({502: other})), \ + patch(f"{_MOD}.run_command", return_value=None): + self.assertIsNone(find_junie_binary_for_user(self.home)) + + @unittest.skipIf(os.name == "nt", "POSIX-only: machine-global owner attribution uses pwd (absent on Windows)") + def test_homebrew_attributed_when_root_owned_by_this_user(self): + """Under root, /opt/homebrew/bin/junie owned by THIS user's home is + attributed (returned).""" + self._install_isolation(_HOMEBREW) + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_UTILS}.os.stat", side_effect=_stat_for_uid(_HOMEBREW, 501)), \ + patch(f"{_UTILS}.pwd.getpwuid", side_effect=_pwd_home({501: self.home})), \ + patch(f"{_MOD}.run_command", return_value=None): + self.assertEqual(find_junie_binary_for_user(self.home), str(_HOMEBREW)) + + def test_which_backstop_skipped_when_root(self): + """The ``which junie`` PATH backstop is skipped under root (it resolves + the scanner's PATH). With no user-local binary -> None, and run_command + is never called.""" + run_mock = Mock(return_value=str(self.home / "x" / "junie")) + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=True), \ + patch(f"{_MOD}.run_command", run_mock): + self.assertIsNone(find_junie_binary_for_user(self.home)) + run_mock.assert_not_called() + + def test_which_backstop_used_when_not_root(self): + target = _make_exec(self.home / "custom" / "junie") + with patch(f"{_MOD}.platform.system", return_value="Darwin"), \ + patch(f"{_MOD}.is_running_as_root", return_value=False), \ + patch(f"{_MOD}.run_command", return_value=str(target)): + self.assertEqual(find_junie_binary_for_user(self.home), str(target)) + + +class TestFindJunieBinaryWindows(unittest.TestCase): + """Windows candidate list (existence-gated, no X_OK semantics).""" + + def setUp(self): + utils_mod._SENTRY_DSN = "" + self.tmp = tempfile.TemporaryDirectory() + self.home = Path(self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def test_local_bin_exe_detected(self): + exe = self.home / ".local" / "bin" / "junie.exe" + exe.parent.mkdir(parents=True, exist_ok=True) + exe.write_text("") + with patch(f"{_MOD}.platform.system", return_value="Windows"): + self.assertEqual(find_junie_binary_for_user(self.home), str(exe)) + + def test_npm_cmd_detected(self): + cmd = self.home / "AppData" / "Roaming" / "npm" / "junie.cmd" + cmd.parent.mkdir(parents=True, exist_ok=True) + cmd.write_text("") + with patch(f"{_MOD}.platform.system", return_value="Windows"): + self.assertEqual(find_junie_binary_for_user(self.home), str(cmd)) + + def test_versioned_exe_detected_newest(self): + base = self.home / ".local" / "share" / "junie" / "versions" + (base / "1.9.0").mkdir(parents=True) + (base / "1.9.0" / "junie.exe").write_text("") + (base / "1.10.0").mkdir(parents=True) + newest = base / "1.10.0" / "junie.exe" + newest.write_text("") + with patch(f"{_MOD}.platform.system", return_value="Windows"): + self.assertEqual(find_junie_binary_for_user(self.home), str(newest)) + + def test_residue_dir_only_returns_none(self): + (self.home / ".junie").mkdir() + with patch(f"{_MOD}.platform.system", return_value="Windows"): + self.assertIsNone(find_junie_binary_for_user(self.home)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_junie_skills_extraction.py b/tests/test_junie_skills_extraction.py index 26ba6b33..d7a88602 100644 --- a/tests/test_junie_skills_extraction.py +++ b/tests/test_junie_skills_extraction.py @@ -9,6 +9,7 @@ """ import json +import os import shutil import sys import tempfile @@ -45,7 +46,47 @@ def test_user_level_rule_returns_home(self): self.assertEqual(find_junie_project_root(rule), Path("/Users/test")) +# Module path for the central binary/plugin resolver the junie detectors call. +_UTD = "scripts.coding_discovery_tools.user_tool_detector" + + +def _make_exec(path: Path) -> Path: + """Create an executable file (a fake junie binary) at ``path``.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("#!/bin/sh\n") + os.chmod(path, 0o755) + return path + + +def _write_jetbrains_plugin(ide_config_dir: Path, plugin_id: str, plugin_name: str) -> None: + """Write a realistic JetBrains plugin (``plugins//META-INF/plugin.xml``) + under an IDE config dir so the detector's real ``_get_plugins`` reads it.""" + meta_inf = ide_config_dir / "plugins" / plugin_id / "META-INF" + meta_inf.mkdir(parents=True, exist_ok=True) + (meta_inf / "plugin.xml").write_text( + f"{plugin_id}{plugin_name}", + encoding="utf-8", + ) + + +def _macos_jetbrains_ide_dir(user_home: Path, ide_folder: str) -> Path: + """Path to a per-user JetBrains IDE config dir on macOS, created on disk.""" + ide_dir = user_home / "Library" / "Application Support" / "JetBrains" / ide_folder + ide_dir.mkdir(parents=True, exist_ok=True) + return ide_dir + + +def _windows_jetbrains_ide_dir(user_home: Path, ide_folder: str) -> Path: + """Path to a per-user JetBrains IDE config dir on Windows, created on disk.""" + ide_dir = user_home / "AppData" / "Roaming" / "JetBrains" / ide_folder + ide_dir.mkdir(parents=True, exist_ok=True) + return ide_dir + + class TestMacOSJunieDetector(unittest.TestCase): + """Detection now gates on the junie binary OR a Junie JetBrains plugin — + NOT on the ``~/.junie`` guidelines dir, which is residue that survives + uninstall. ``~/.junie`` stays the version source only.""" def setUp(self): self._tmp = tempfile.mkdtemp() @@ -55,30 +96,124 @@ def setUp(self): def tearDown(self): shutil.rmtree(self._tmp, ignore_errors=True) + # --- residue-only: NOT detected (the FP fix / regression guard) ------- + + @patch(f"{_UTD}.run_command", return_value=None) + @patch(f"{_UTD}.platform.system", return_value="Darwin") + @patch(f"{_UTD}.is_running_as_root", return_value=False) @patch("scripts.coding_discovery_tools.macos.junie.junie.is_running_as_root", return_value=False) @patch("scripts.coding_discovery_tools.macos.junie.junie.Path.home") - def test_detects_when_junie_dir_exists(self, mock_home, _): + def test_residue_junie_dir_only_not_detected(self, mock_home, *_): mock_home.return_value = self.home (self.home / ".junie").mkdir() + with patch.object(MacOSJunieDetector, "_has_junie_jetbrains_plugin", return_value=None): + self.assertIsNone(MacOSJunieDetector().detect()) + + @patch("scripts.coding_discovery_tools.macos.junie.junie.is_running_as_root", return_value=False) + @patch("scripts.coding_discovery_tools.macos.junie.junie.Path.home") + def test_returns_none_when_nothing_present(self, mock_home, _): + mock_home.return_value = self.home + with patch.object(MacOSJunieDetector, "_has_junie_jetbrains_plugin", return_value=None): + self.assertIsNone(MacOSJunieDetector().detect()) + + # --- real install signals: detected ----------------------------------- + + @patch(f"{_UTD}.run_command", return_value=None) + @patch(f"{_UTD}.platform.system", return_value="Darwin") + @patch(f"{_UTD}.is_running_as_root", return_value=False) + @patch("scripts.coding_discovery_tools.macos.junie.junie.is_running_as_root", return_value=False) + @patch("scripts.coding_discovery_tools.macos.junie.junie.Path.home") + def test_detects_when_binary_present(self, mock_home, *_): + mock_home.return_value = self.home + junie_bin = _make_exec(self.home / ".local" / "bin" / "junie") result = MacOSJunieDetector().detect() self.assertIsNotNone(result) self.assertEqual(result["name"], "Junie") + self.assertEqual(result["install_path"], str(junie_bin)) + @patch(f"{_UTD}.run_command", return_value=None) + @patch(f"{_UTD}.platform.system", return_value="Darwin") + @patch(f"{_UTD}.is_running_as_root", return_value=False) @patch("scripts.coding_discovery_tools.macos.junie.junie.is_running_as_root", return_value=False) @patch("scripts.coding_discovery_tools.macos.junie.junie.Path.home") - def test_returns_none_when_no_junie_dir(self, mock_home, _): + def test_detects_when_jetbrains_plugin_present(self, mock_home, *_): mock_home.return_value = self.home - self.assertIsNone(MacOSJunieDetector().detect()) + plugin_path = "/Users/test/Library/Application Support/JetBrains/PyCharm2024.1" + with patch.object(MacOSJunieDetector, "_has_junie_jetbrains_plugin", return_value=plugin_path): + result = MacOSJunieDetector().detect() + self.assertIsNotNone(result) + self.assertEqual(result["install_path"], plugin_path) + @patch(f"{_UTD}.run_command", return_value=None) + @patch(f"{_UTD}.platform.system", return_value="Darwin") + @patch(f"{_UTD}.is_running_as_root", return_value=False) @patch("scripts.coding_discovery_tools.macos.junie.junie.is_running_as_root", return_value=False) @patch("scripts.coding_discovery_tools.macos.junie.junie.Path.home") - def test_version_read_from_config_json(self, mock_home, _): + def test_version_read_from_config_json_when_binary_present(self, mock_home, *_): mock_home.return_value = self.home + _make_exec(self.home / ".local" / "bin" / "junie") junie_dir = self.home / ".junie" junie_dir.mkdir() (junie_dir / "config.json").write_text(json.dumps({"version": "1.2.3"})) self.assertEqual(MacOSJunieDetector().detect()["version"], "1.2.3") + def test_jetbrains_plugin_helper_matches_junie(self): + """The plugin helper matches a real on-disk JetBrains IDE that has a + 'Junie' plugin and returns its config path; a Copilot-only IDE yields + None. Exercises the real per-user ``_scan_jetbrains_config_dir`` + + ``_get_plugins`` path (no mocked-out helper) so the ``config_path`` key + wiring is covered.""" + det = MacOSJunieDetector() + + ide_dir = _macos_jetbrains_ide_dir(self.home, "PyCharm2024.1") + _write_jetbrains_plugin(ide_dir, "github-copilot", "GitHub Copilot") + # Junie-only present: matched, returns the scoped config path. + _write_jetbrains_plugin(ide_dir, "intellij-junie", "Junie") + self.assertEqual(det._has_junie_jetbrains_plugin(self.home), str(ide_dir)) + + def test_jetbrains_plugin_helper_no_junie_returns_none(self): + """A Copilot-only IDE (no Junie plugin) yields None.""" + det = MacOSJunieDetector() + ide_dir = _macos_jetbrains_ide_dir(self.home, "PyCharm2024.1") + _write_jetbrains_plugin(ide_dir, "github-copilot", "GitHub Copilot") + self.assertIsNone(det._has_junie_jetbrains_plugin(self.home)) + + @patch("scripts.coding_discovery_tools.macos.jetbrains.jetbrains.is_running_as_root", return_value=False) + @patch("scripts.coding_discovery_tools.macos.jetbrains.jetbrains.Path.home") + def test_cross_user_scan_not_misattributed(self, mock_home, _mock_root): + """REGRESSION (cross-user FP): user A has the Junie JetBrains plugin and + user B does NOT. Asking the helper about user B must return None — B must + never inherit A's plugin. + + The bug mechanism: ``MacOSJetBrainsDetector.detect()`` ignores the + ``user_home`` the junie helper sets and scans ``Path.home()`` (here = user + A). The old ``detect()``-based helper therefore returned user A's Junie + config path even when asked about user B. The per-user scoped scan reads + only user B's config dir, so it correctly returns None. + + FAILS before the C1 fix, PASSES after. + """ + users = Path(self._tmp) / "Users" + user_a = users / "userA_has_junie" + user_b = users / "userB_no_junie" + user_a.mkdir(parents=True) + user_b.mkdir(parents=True) + # detect() (old code) scans Path.home() — pin it to user A. + mock_home.return_value = user_a + + # user A has a JetBrains IDE with the Junie plugin. + ide_a = _macos_jetbrains_ide_dir(user_a, "PyCharm2024.1") + _write_jetbrains_plugin(ide_a, "intellij-junie", "Junie") + # user B has a JetBrains IDE too, but only Copilot — no Junie. + ide_b = _macos_jetbrains_ide_dir(user_b, "PyCharm2024.1") + _write_jetbrains_plugin(ide_b, "github-copilot", "GitHub Copilot") + + det = MacOSJunieDetector() + # user B must NOT be credited with user A's Junie plugin. + self.assertIsNone(det._has_junie_jetbrains_plugin(user_b)) + # user A is still correctly detected (no regression of the happy path). + self.assertEqual(det._has_junie_jetbrains_plugin(user_a), str(ide_a)) + class TestMacOSJunieRulesExtractor(unittest.TestCase): @@ -136,17 +271,89 @@ def setUp(self): def tearDown(self): shutil.rmtree(self._tmp, ignore_errors=True) - def test_detect_via_scan_finds_user(self): + def test_residue_junie_dir_only_not_detected(self): + """A bare ``%USERPROFILE%\\.junie`` (guidelines residue) is NOT a real + install -> not detected (the FP fix / regression guard).""" (self.home / ".junie").mkdir() def fake_scan(cb): cb(self.home) - with patch("scripts.coding_discovery_tools.windows.junie.junie.scan_windows_user_directories", side_effect=fake_scan): + with patch("scripts.coding_discovery_tools.windows.junie.junie.scan_windows_user_directories", side_effect=fake_scan), \ + patch(f"{_UTD}.platform.system", return_value="Windows"), \ + patch.object(WindowsJunieDetector, "_has_junie_jetbrains_plugin", return_value=None): + result = WindowsJunieDetector().detect() + + self.assertIsNone(result) + + def test_detect_via_scan_finds_user_with_binary(self): + """A real junie binary (``~/.local/bin/junie.exe``) -> detected.""" + junie_bin = self.home / ".local" / "bin" / "junie.exe" + junie_bin.parent.mkdir(parents=True, exist_ok=True) + junie_bin.write_text("") + + def fake_scan(cb): + cb(self.home) + + with patch("scripts.coding_discovery_tools.windows.junie.junie.scan_windows_user_directories", side_effect=fake_scan), \ + patch(f"{_UTD}.platform.system", return_value="Windows"): result = WindowsJunieDetector().detect() self.assertIsNotNone(result) self.assertEqual(result["name"], "Junie") + self.assertEqual(result["install_path"], str(junie_bin)) + + def test_detect_via_scan_finds_user_with_plugin(self): + """A Junie JetBrains plugin (no binary) -> detected.""" + def fake_scan(cb): + cb(self.home) + + with patch("scripts.coding_discovery_tools.windows.junie.junie.scan_windows_user_directories", side_effect=fake_scan), \ + patch(f"{_UTD}.platform.system", return_value="Windows"), \ + patch.object(WindowsJunieDetector, "_has_junie_jetbrains_plugin", return_value="C:\\cfg\\PyCharm"): + result = WindowsJunieDetector().detect() + + self.assertIsNotNone(result) + self.assertEqual(result["install_path"], "C:\\cfg\\PyCharm") + + def test_jetbrains_plugin_helper_matches_junie(self): + """The helper matches a real on-disk JetBrains IDE with a 'Junie' plugin + and returns its per-user config path (via the ``_config_path`` key); a + Copilot-only IDE yields None. Exercises the real Windows + ``AppData/Roaming/JetBrains`` user-scoped ``detect()`` + ``_get_plugins`` + path (no mocked-out helper).""" + det = WindowsJunieDetector() + + ide_dir = _windows_jetbrains_ide_dir(self.home, "PyCharm2024.1") + _write_jetbrains_plugin(ide_dir, "github-copilot", "GitHub Copilot") + _write_jetbrains_plugin(ide_dir, "intellij-junie", "Junie") + self.assertEqual(det._has_junie_jetbrains_plugin(self.home), str(ide_dir)) + + def test_jetbrains_plugin_helper_no_junie_returns_none(self): + """A Copilot-only IDE (no Junie plugin) yields None.""" + det = WindowsJunieDetector() + ide_dir = _windows_jetbrains_ide_dir(self.home, "PyCharm2024.1") + _write_jetbrains_plugin(ide_dir, "github-copilot", "GitHub Copilot") + self.assertIsNone(det._has_junie_jetbrains_plugin(self.home)) + + def test_cross_user_scan_not_misattributed(self): + """REGRESSION (cross-user FP): user A has the Junie plugin, user B does + NOT. The Windows JetBrains detector is user_home-scoped, and the junie + helper additionally guards that the matched config path is under + ``user_home``. Asking about user B returns None; user A still matches.""" + user_a = Path(self._tmp) / "Users" / "userA_has_junie" + user_b = Path(self._tmp) / "Users" / "userB_no_junie" + user_a.mkdir(parents=True) + user_b.mkdir(parents=True) + + ide_a = _windows_jetbrains_ide_dir(user_a, "PyCharm2024.1") + _write_jetbrains_plugin(ide_a, "intellij-junie", "Junie") + ide_b = _windows_jetbrains_ide_dir(user_b, "PyCharm2024.1") + _write_jetbrains_plugin(ide_b, "github-copilot", "GitHub Copilot") + + det = WindowsJunieDetector() + self.assertIsNone(det._has_junie_jetbrains_plugin(user_b)) + self.assertEqual(det._has_junie_jetbrains_plugin(user_a), str(ide_a)) class TestWindowsJunieRulesExtractor(unittest.TestCase): diff --git a/tests/test_junie_skills_extraction_linux.py b/tests/test_junie_skills_extraction_linux.py index 3dc82e34..d2627023 100644 --- a/tests/test_junie_skills_extraction_linux.py +++ b/tests/test_junie_skills_extraction_linux.py @@ -9,6 +9,7 @@ """ import json +import os import shutil import sys import tempfile @@ -19,6 +20,36 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from scripts.coding_discovery_tools.linux.junie.junie import LinuxJunieDetector + +# Module path for the central binary/plugin resolver the junie detector calls. +_UTD = "scripts.coding_discovery_tools.user_tool_detector" +_JUNIE_MOD = "scripts.coding_discovery_tools.linux.junie.junie" + + +def _make_exec(path: Path) -> Path: + """Create an executable file (a fake junie binary) at ``path``.""" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("#!/bin/sh\n") + os.chmod(path, 0o755) + return path + + +def _write_jetbrains_plugin(ide_config_dir: Path, plugin_id: str, plugin_name: str) -> None: + """Write a realistic JetBrains plugin (``plugins//META-INF/plugin.xml``) + under an IDE config dir so the detector's real ``_get_plugins`` reads it.""" + meta_inf = ide_config_dir / "plugins" / plugin_id / "META-INF" + meta_inf.mkdir(parents=True, exist_ok=True) + (meta_inf / "plugin.xml").write_text( + f"{plugin_id}{plugin_name}", + encoding="utf-8", + ) + + +def _linux_jetbrains_ide_dir(user_home: Path, ide_folder: str) -> Path: + """Path to a per-user JetBrains IDE config dir on Linux, created on disk.""" + ide_dir = user_home / ".config" / "JetBrains" / ide_folder + ide_dir.mkdir(parents=True, exist_ok=True) + return ide_dir from scripts.coding_discovery_tools.linux.junie.junie_rules_extractor import ( LinuxJunieRulesExtractor, find_junie_project_root, @@ -52,33 +83,119 @@ def setUp(self): def tearDown(self): shutil.rmtree(self._tmp, ignore_errors=True) - def test_detects_when_junie_dir_exists(self): + def _linux_resolver_patches(self): + """Patches that pin the central binary resolver to the Linux POSIX path + deterministically (so these run on the macOS/Windows CI runners): force + ``platform.system='Linux'``, non-root, and a None ``which`` backstop.""" + return ( + patch(f"{_UTD}.platform.system", return_value="Linux"), + patch(f"{_UTD}.is_running_as_root", return_value=False), + patch(f"{_UTD}.run_command", return_value=None), + ) + + # --- residue-only: NOT detected (the FP fix / regression guard) ------- + + def test_residue_junie_dir_only_not_detected(self): + """A bare ``~/.junie`` (guidelines residue) is NOT a real install.""" (self.home / ".junie").mkdir() - with patch("scripts.coding_discovery_tools.linux.junie.junie.get_linux_user_homes", return_value=[self.home]): + p1, p2, p3 = self._linux_resolver_patches() + with patch(f"{_JUNIE_MOD}.get_linux_user_homes", return_value=[self.home]), \ + patch.object(LinuxJunieDetector, "_has_junie_jetbrains_plugin", return_value=None), \ + p1, p2, p3: + self.assertIsNone(LinuxJunieDetector().detect()) + + def test_returns_none_when_nothing_present(self): + p1, p2, p3 = self._linux_resolver_patches() + with patch(f"{_JUNIE_MOD}.get_linux_user_homes", return_value=[self.home]), \ + patch.object(LinuxJunieDetector, "_has_junie_jetbrains_plugin", return_value=None), \ + p1, p2, p3: + self.assertIsNone(LinuxJunieDetector().detect()) + + # --- real install signals: detected ----------------------------------- + + def test_detects_when_binary_present(self): + junie_bin = _make_exec(self.home / ".local" / "bin" / "junie") + p1, p2, p3 = self._linux_resolver_patches() + with patch(f"{_JUNIE_MOD}.get_linux_user_homes", return_value=[self.home]), p1, p2, p3: result = LinuxJunieDetector().detect() self.assertIsNotNone(result) self.assertEqual(result["name"], "Junie") + self.assertEqual(result["install_path"], str(junie_bin)) - def test_returns_none_when_no_junie_dir(self): - with patch("scripts.coding_discovery_tools.linux.junie.junie.get_linux_user_homes", return_value=[self.home]): - self.assertIsNone(LinuxJunieDetector().detect()) + def test_detects_when_jetbrains_plugin_present(self): + p1, p2, p3 = self._linux_resolver_patches() + with patch(f"{_JUNIE_MOD}.get_linux_user_homes", return_value=[self.home]), \ + patch.object(LinuxJunieDetector, "_has_junie_jetbrains_plugin", return_value="/cfg/PyCharm"), \ + p1, p2, p3: + result = LinuxJunieDetector().detect() + self.assertIsNotNone(result) + self.assertEqual(result["install_path"], "/cfg/PyCharm") - def test_version_read_from_config_json(self): + def test_version_read_from_config_json_when_binary_present(self): + _make_exec(self.home / ".local" / "bin" / "junie") junie_dir = self.home / ".junie" junie_dir.mkdir() (junie_dir / "config.json").write_text(json.dumps({"version": "2.1.0"})) - with patch("scripts.coding_discovery_tools.linux.junie.junie.get_linux_user_homes", return_value=[self.home]): + p1, p2, p3 = self._linux_resolver_patches() + with patch(f"{_JUNIE_MOD}.get_linux_user_homes", return_value=[self.home]), p1, p2, p3: result = LinuxJunieDetector().detect() self.assertEqual(result["version"], "2.1.0") def test_multi_user_returns_first_found(self): home2 = Path(self._tmp) / "home" / "bob" home2.mkdir(parents=True) - (self.home / ".junie").mkdir() + # Only alice has a real binary; bob has residue only -> alice is found. + alice_bin = _make_exec(self.home / ".local" / "bin" / "junie") (home2 / ".junie").mkdir() - with patch("scripts.coding_discovery_tools.linux.junie.junie.get_linux_user_homes", return_value=[self.home, home2]): + p1, p2, p3 = self._linux_resolver_patches() + with patch(f"{_JUNIE_MOD}.get_linux_user_homes", return_value=[self.home, home2]), \ + patch.object(LinuxJunieDetector, "_has_junie_jetbrains_plugin", return_value=None), \ + p1, p2, p3: result = LinuxJunieDetector().detect() - self.assertEqual(result["install_path"], str(self.home / ".junie")) + self.assertEqual(result["install_path"], str(alice_bin)) + + def test_jetbrains_plugin_helper_matches_junie(self): + """The helper matches a real on-disk JetBrains IDE with a 'Junie' plugin + and returns its per-user config path; a Copilot-only IDE yields None. + Exercises the real Linux ``.config/JetBrains`` per-user scan + + ``_get_plugins`` path (no mocked-out helper).""" + det = LinuxJunieDetector() + + ide_dir = _linux_jetbrains_ide_dir(self.home, "PyCharm2024.1") + _write_jetbrains_plugin(ide_dir, "github-copilot", "GitHub Copilot") + _write_jetbrains_plugin(ide_dir, "intellij-junie", "Junie") + self.assertEqual(det._has_junie_jetbrains_plugin(self.home), str(ide_dir)) + + def test_jetbrains_plugin_helper_no_junie_returns_none(self): + """A Copilot-only IDE (no Junie plugin) yields None.""" + det = LinuxJunieDetector() + ide_dir = _linux_jetbrains_ide_dir(self.home, "PyCharm2024.1") + _write_jetbrains_plugin(ide_dir, "github-copilot", "GitHub Copilot") + self.assertIsNone(det._has_junie_jetbrains_plugin(self.home)) + + def test_cross_user_scan_not_misattributed(self): + """REGRESSION (cross-user FP): user alice has the Junie plugin, user bob + does NOT. ``LinuxJetBrainsDetector.detect()`` iterates *every* user home + internally, so the old helper attributed alice's plugin to bob. The + per-user scoped scan reads only bob's ``.config/JetBrains`` and returns + None. FAILS before the C1 fix, PASSES after.""" + bob = Path(self._tmp) / "home" / "bob" + bob.mkdir(parents=True) + + alice_ide = _linux_jetbrains_ide_dir(self.home, "PyCharm2024.1") + _write_jetbrains_plugin(alice_ide, "intellij-junie", "Junie") + bob_ide = _linux_jetbrains_ide_dir(bob, "PyCharm2024.1") + _write_jetbrains_plugin(bob_ide, "github-copilot", "GitHub Copilot") + + det = LinuxJunieDetector() + # detect() iterates all user homes — make sure both are visible to the + # detector so the old (unscoped) code would have found alice's plugin. + with patch( + "scripts.coding_discovery_tools.linux.jetbrains.jetbrains.get_linux_user_homes", + return_value=[self.home, bob], + ): + self.assertIsNone(det._has_junie_jetbrains_plugin(bob)) + self.assertEqual(det._has_junie_jetbrains_plugin(self.home), str(alice_ide)) class TestLinuxJunieRulesExtractor(unittest.TestCase): diff --git a/tests/test_send_and_persist.py b/tests/test_send_and_persist.py index a815ab3a..352f538a 100644 --- a/tests/test_send_and_persist.py +++ b/tests/test_send_and_persist.py @@ -401,6 +401,40 @@ def test_scan_completed_event(self, _sleep): payload = json.loads(self.server.requests[0]["body"]) self.assertEqual(payload["scan_event"], "completed") + @patch("time.sleep") + @patch.object(utils_mod, "_SENTRY_DSN", "") + def test_scan_event_includes_system_user_when_provided(self, _sleep): + """A real human system_user is included in the lifecycle payload.""" + success, _retryable = send_scan_event( + self.base_url, + "test-key", + "DEVICE123", + "run-uuid-1234", + "completed", + system_user="alice", + ) + + self.assertTrue(success) + payload = json.loads(self.server.requests[0]["body"]) + self.assertEqual(payload["system_user"], "alice") + + @patch("time.sleep") + @patch.object(utils_mod, "_SENTRY_DSN", "") + def test_scan_event_omits_system_user_when_none(self, _sleep): + """system_user is omitted from the payload when None (no junk owner).""" + success, _retryable = send_scan_event( + self.base_url, + "test-key", + "DEVICE123", + "run-uuid-1234", + "in_progress", + system_user=None, + ) + + self.assertTrue(success) + payload = json.loads(self.server.requests[0]["body"]) + self.assertNotIn("system_user", payload) + @patch("time.sleep") @patch.object(utils_mod, "_SENTRY_DSN", "") def test_scan_failed_with_user_error(self, _sleep): diff --git a/tests/test_user_filtering.py b/tests/test_user_filtering.py index 42a48511..18461ab1 100644 --- a/tests/test_user_filtering.py +++ b/tests/test_user_filtering.py @@ -11,8 +11,12 @@ _fetch_dscl_batch_data, _is_human_user_macos, _parse_dscl_list_output, + _real_user_or_none, + _strip_windows_domain, get_all_users_macos, get_all_users_windows, + get_audit_user, + get_user_info, ) EMPTY_BATCH = DsclBatchData(uid_map={}, shell_map={}, hidden_set=frozenset()) @@ -206,5 +210,161 @@ def test_filters_system_dirs_via_constant(self, _mock_sys): self.assertNotIn(excluded, result) +class TestRealUserOrNone(unittest.TestCase): + """The audit helper: real human in -> trimmed name; junk/service -> None.""" + + def test_real_human_passes_through(self): + self.assertEqual(_real_user_or_none("alice"), "alice") + + def test_real_human_is_trimmed(self): + self.assertEqual(_real_user_or_none(" alice "), "alice") + + def test_root_rejected(self): + self.assertIsNone(_real_user_or_none("root")) + + def test_root_case_insensitive(self): + self.assertIsNone(_real_user_or_none("ROOT")) + + def test_macos_daemon_underscore_prefix_rejected(self): + self.assertIsNone(_real_user_or_none("_windowserver")) + + def test_system_rejected(self): + self.assertIsNone(_real_user_or_none("SYSTEM")) + + def test_windows_machine_account_trailing_dollar_rejected(self): + self.assertIsNone(_real_user_or_none("WIN-ABC$")) + + def test_linux_service_account_rejected(self): + self.assertIsNone(_real_user_or_none("www-data")) + + def test_other_linux_service_accounts_rejected(self): + for name in ("postgres", "nobody", "daemon", "nginx", "mysql"): + self.assertIsNone(_real_user_or_none(name), name) + + def test_unknown_literal_rejected(self): + self.assertIsNone(_real_user_or_none("unknown")) + + def test_empty_string_rejected(self): + self.assertIsNone(_real_user_or_none("")) + + def test_whitespace_only_rejected(self): + self.assertIsNone(_real_user_or_none(" ")) + + def test_none_input_rejected(self): + self.assertIsNone(_real_user_or_none(None)) + + def test_windows_builtin_service_identities_rejected(self): + for name in ( + "NT AUTHORITY\\LOCAL SERVICE", + "NT AUTHORITY\\NETWORK SERVICE", + "NT AUTHORITY\\SYSTEM", + "nt authority\\local service", # case-insensitive domain + "NT SERVICE\\MSSQLSERVER", # any NT SERVICE principal + ): + self.assertIsNone(_real_user_or_none(name), name) + + def test_bare_windows_builtins_rejected(self): + for name in ("Administrator", "LocalSystem", "LOCAL SERVICE", "Network Service"): + self.assertIsNone(_real_user_or_none(name), name) + + def test_domain_qualified_human_is_stripped_self_contained(self): + # Self-contained: strips DOMAIN\\ even if a caller skips get_user_info. + self.assertEqual(_real_user_or_none("CORP\\alice"), "alice") + + +class TestGetAuditUser(unittest.TestCase): + """get_audit_user() returns the real human OR None (never junk). + + These exercise the non-Windows resolution path (get_user_info()), so they + pin platform.system to "Darwin" — otherwise on a Windows CI runner + get_audit_user() takes the raw-whoami branch and the runner's real account + leaks past the get_user_info() patch. The Windows raw-whoami branch is + covered by TestGetAuditUserWindowsService. + """ + + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Darwin") + @patch("scripts.coding_discovery_tools.utils.get_user_info", return_value="alice") + def test_returns_real_human(self, _mock_user, _mock_platform): + self.assertEqual(get_audit_user(), "alice") + + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Darwin") + @patch("scripts.coding_discovery_tools.utils.get_user_info", return_value="root") + def test_root_maps_to_none(self, _mock_user, _mock_platform): + self.assertIsNone(get_audit_user()) + + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Darwin") + @patch("scripts.coding_discovery_tools.utils.get_user_info", return_value="unknown") + def test_unknown_maps_to_none(self, _mock_user, _mock_platform): + self.assertIsNone(get_audit_user()) + + +class TestGetAuditUserWindowsService(unittest.TestCase): + """On Windows, get_audit_user must apply the domain-based rejection to the + RAW whoami output. get_user_info() pre-strips the DOMAIN\\ prefix, so a + service principal like NT SERVICE\\MSSQLSERVER would otherwise leak through + as the bare, non-denylisted name 'MSSQLSERVER'. + """ + + @patch("scripts.coding_discovery_tools.utils.run_command") + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Windows") + def test_nt_service_named_accounts_rejected(self, _sys, mock_cmd): + for raw in ( + "NT SERVICE\\MSSQLSERVER", + "NT SERVICE\\WinDefend", + "NT AUTHORITY\\SYSTEM", + "NT AUTHORITY\\LOCAL SERVICE", + ): + mock_cmd.return_value = raw + self.assertIsNone(get_audit_user(), raw) + + @patch("scripts.coding_discovery_tools.utils.run_command", return_value="CORP\\alice") + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Windows") + def test_domain_qualified_human_passes(self, _sys, _cmd): + self.assertEqual(get_audit_user(), "alice") + + @patch("scripts.coding_discovery_tools.utils.run_command", return_value=None) + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Windows") + def test_empty_whoami_falls_back_to_get_user_info(self, _sys, _cmd): + with patch( + "scripts.coding_discovery_tools.utils.get_user_info", return_value="bob" + ): + self.assertEqual(get_audit_user(), "bob") + + +class TestGetUserInfoGuaranteedString(unittest.TestCase): + """get_user_info() must always return a usable string for /Users paths.""" + + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Darwin") + @patch("scripts.coding_discovery_tools.utils.run_command", return_value="alice") + def test_returns_string_for_normal_user(self, _cmd, _sys): + result = get_user_info() + self.assertEqual(result, "alice") + self.assertIsInstance(result, str) + + @patch("scripts.coding_discovery_tools.utils.platform.system", return_value="Darwin") + @patch("scripts.coding_discovery_tools.utils.run_command", return_value=None) + def test_falls_back_to_unknown_never_none(self, _cmd, _sys): + # When every resolution method yields nothing, the /Users-path resolver + # must still return a non-None string (so callers never build /Users/None). + # getpass is imported locally inside get_user_info, so patch the stdlib module. + with patch("getpass.getuser", return_value=""): + result = get_user_info() + self.assertIsNotNone(result) + self.assertEqual(result, "unknown") + + +class TestStripWindowsDomain(unittest.TestCase): + """Windows DOMAIN\\username parse yields the bare username.""" + + def test_domain_prefix_stripped(self): + self.assertEqual(_strip_windows_domain("CORP\\bob"), "bob") + + def test_machine_prefix_stripped(self): + self.assertEqual(_strip_windows_domain("WIN-ABC\\alice"), "alice") + + def test_no_backslash_returned_unchanged(self): + self.assertEqual(_strip_windows_domain("alice"), "alice") + + if __name__ == "__main__": unittest.main()