diff --git a/scripts/coding_discovery_tools/ai_tools_discovery.py b/scripts/coding_discovery_tools/ai_tools_discovery.py index a9a0c94..e039ec5 100644 --- a/scripts/coding_discovery_tools/ai_tools_discovery.py +++ b/scripts/coding_discovery_tools/ai_tools_discovery.py @@ -370,7 +370,7 @@ def get_device_id(self) -> str: """ return self._device_id_extractor.extract_device_id() - def detect_all_tools(self, user_home: Optional[Path] = None) -> List[Dict]: + def detect_all_tools(self, user_home: Optional[Path] = None, failures: Optional[set] = None) -> List[Dict]: """ Detect all supported AI tools. @@ -401,6 +401,9 @@ def detect_all_tools(self, user_home: Optional[Path] = None) -> List[Dict]: except Exception as e: logger.warning(f"Error detecting {detector.tool_name}: {e}") report_to_sentry(e, {"phase": "detect", "tool_name": detector.tool_name}, level="warning") + # Detection errored: record the detector's tool_name so the caller knows presence is unknown (unknown != uninstalled). + if failures is not None: + failures.add(detector.tool_name) return tools @@ -2802,6 +2805,11 @@ def _on_term_signal(signum, _frame) -> None: # Track failed reports for persistence failed_reports = [] + # (home_user, tool_name) detected present this run; backend set-diffs it in "completed" to prune the rest. + scanned_manifest = set() + # Detector errors this run (per-tool extraction errors are not recorded here); if non-empty the scan is marked unclean so the backend skips pruning. 
+ incomplete_reasons = [] + # --- Drain pending reports from previous run --- with time_step("drain_pending_queue", "queue"): pending = load_pending_reports() @@ -2896,7 +2904,20 @@ def _on_term_signal(signum, _frame) -> None: user_home = Path.home() logger.info(f" Detecting tools for user: {user} (home: {user_home})") with time_step("detect_tools", "detect"): - user_tools = detector.detect_all_tools(user_home=user_home) + user_detect_failures = set() + user_tools = detector.detect_all_tools( + user_home=user_home, failures=user_detect_failures + ) + # Record per-user presence at detection: a tool this user actually has stays in the + # manifest even if reading/uploading it later errors (a read failure isn't an + # uninstall), and only users who detected the tool get an entry (no phantom ownership). + for detected in user_tools: + scanned_manifest.add((user, detected.get('name', 'Unknown'))) + # A detector ERRORED -> presence unknown. detector.tool_name is an umbrella label + # (e.g. "GitHub Copilot"), not the concrete row name ("GitHub Copilot (VS Code)"), + # so it can't safely target the manifest. Skip pruning this run instead. + if user_detect_failures: + incomplete_reasons.append(f"detector error for user {user}") if user_tools: logger.info(f" Found {len(user_tools)} tool(s) for {user}:") @@ -2959,6 +2980,13 @@ def _on_term_signal(signum, _frame) -> None: user_home = Path.home() try: + # Only report a tool for users who actually detected it (the manifest is + # the per-user presence set). all_tools is deduped globally, so without this + # a user-scoped tool one user has would otherwise be reported for every + # enumerated user — a phantom install the backend could never prune. 
+ if (user_name, tool_name) not in scanned_manifest: + continue + # Filter projects to only include this user's projects with time_step("filter_projects", "process"): tool_filtered = detector.filter_tool_projects_by_user(tool_with_projects, user_home) @@ -2974,6 +3002,8 @@ def _on_term_signal(signum, _frame) -> None: f"{tool_filtered.get('_config_path') or tool_filtered.get('install_path')!r} " f"not owned by this user and no per-user data" ) + # Detected globally but not owned by this user -> drop the presence entry. + scanned_manifest.discard((user_name, tool_name)) continue # Ownership gate (Augment surfaces): same ~/.augment-keyed @@ -2986,6 +3016,8 @@ def _on_term_signal(signum, _frame) -> None: f"{tool_filtered.get('_config_path') or tool_filtered.get('install_path')!r} " f"not owned by this user and no per-user data" ) + # Detected globally but not owned by this user -> drop the presence entry. + scanned_manifest.discard((user_name, tool_name)) continue # Detect subscription plan for Claude Code @@ -3197,6 +3229,8 @@ def _on_term_signal(signum, _frame) -> None: except Exception as e: logger.error(f"Error processing tool {tool_name}: {e}", exc_info=True) + # Detected tools are already in the manifest from the detection phase, so a + # device-wide extraction failure here cannot drop a live tool (no re-add needed). 
report_to_sentry(e, {**sentry_ctx, "phase": "process_tool", "tool_name": tool_name}, level="warning") logger.info("") @@ -3233,6 +3267,8 @@ def _on_term_signal(signum, _frame) -> None: "os": platform.system(), "tool_count": len(tools), "user_count": len(all_users), + "manifest_size": len(scanned_manifest), + "scan_incomplete": bool(incomplete_reasons), "python_version": f"{sys.version_info.major}.{sys.version_info.minor}", "script_version": SCRIPT_VERSION, }, @@ -3244,11 +3280,30 @@ def _on_term_signal(signum, _frame) -> None: except Exception as metrics_err: logger.debug(f"Building/sending discovery metrics failed: {metrics_err}") - # Send scan completed event AFTER all scanning + # A detector errored this run -> mark the scan unclean BEFORE the + # completed event so the backend's reconcile skips pruning (a missing tool may mean + # "couldn't detect", not "uninstalled"). Sent first so scan_error is persisted before + # the completed event dispatches the reconcile. + if incomplete_reasons: + send_scan_event( + args.domain, args.api_key, device_id, run_id, "failed", + args.app_name, + scan_error={ + "error_type": "ScanIncomplete", + "message": "; ".join(incomplete_reasons[:20]), + "timestamp": datetime.utcnow().isoformat() + "Z", + }, + sentry_context=sentry_ctx, system_user=system_user, + ) + + # only the completed event carries manifest + covered users (backend prunes from them) logger.info("Sending scan completed event...") + # manifest = (home_user, tool_name) detected present; backend set-diffs it to prune the rest. 
+ manifest = [{"home_user": hu, "tool_name": tn} for hu, tn in sorted(scanned_manifest)] success, _ = send_scan_event( args.domain, args.api_key, device_id, run_id, "completed", - args.app_name, sentry_context=sentry_ctx, system_user=system_user + args.app_name, sentry_context=sentry_ctx, system_user=system_user, + manifest=manifest, covered_home_users=all_users, ) if success: logger.info("✓ Scan completed event sent successfully") diff --git a/scripts/coding_discovery_tools/utils.py b/scripts/coding_discovery_tools/utils.py index 4ae1878..06d0993 100644 --- a/scripts/coding_discovery_tools/utils.py +++ b/scripts/coding_discovery_tools/utils.py @@ -716,6 +716,8 @@ def send_scan_event( scan_error: Optional[Dict] = None, sentry_context: Optional[Dict] = None, system_user: Optional[str] = None, + manifest: Optional[List[Dict]] = None, + covered_home_users: Optional[List[str]] = None, ) -> Tuple[bool, bool]: """ Send scan lifecycle event to backend (in_progress, completed, failed). @@ -733,6 +735,10 @@ def send_scan_event( system_user: Optional real human user running the scan (or None). Used by the backend to attribute empty machines. MUST be a real human or None (see ``get_audit_user``), never a junk/service identity. + manifest: Optional [{"home_user", "tool_name"}] seen this run; sent only on + "completed" so the backend set-diffs it to prune the rest. + covered_home_users: Optional home users covered; sent only on "completed" to + bound the prune scope. 
Returns: Tuple of (success, retryable): success=True if sent, retryable=True if caller should queue @@ -755,6 +761,12 @@ def send_scan_event( if scan_error: payload["scan_error"] = scan_error + if manifest is not None: + payload["manifest"] = manifest + + if covered_home_users is not None: + payload["covered_home_users"] = covered_home_users + return send_report_to_backend( backend_url, api_key, diff --git a/tests/test_scan_completed_manifest.py b/tests/test_scan_completed_manifest.py new file mode 100644 index 0000000..23da1cc --- /dev/null +++ b/tests/test_scan_completed_manifest.py @@ -0,0 +1,511 @@ +""" +Tests for WEB-4679: the scan "completed" event carries a manifest of the +(home_user, tool_name) pairs that were actually read this run, plus the full +set of enumerated home users, so the backend can set-diff and soft-delete +(prune) tools that are no longer installed. + +Correctness property under test (the load-bearing one): the manifest is built +from per-user DETECTION/presence, not extraction success. A tool that was detected +present is recorded even if reading its config/rules errored, so a read failure can +never be mistaken for an uninstall (and never fail-closes the manifest to None). +Only users who actually detected a tool get an entry (no phantom ownership). A tool +whose DETECTOR errored marks the scan unclean (the backend skips pruning) rather than +recording a name, since detector.tool_name is an umbrella label, not the concrete row. + +Seams (mirroring the existing suite in test_send_and_persist.py / +test_discovery_flow.py): + * TestSendScanEventManifest -> utils.send_scan_event against a real + localhost HTTP server (records POST bodies). Covers the payload-shaping + contract + backward compatibility. + * TestCompletedEventManifestCLI -> main() via subprocess against a real + localhost HTTP server. Covers the end-to-end completed-event payload and + that in_progress/failed events do NOT carry a manifest. 
+ * TestManifestFromPresence -> main() driven IN-PROCESS with a mock + detector so a per-tool read error / hash-match / success can be forced + deterministically. This is the only seam where the errored-read branch can + be isolated, because the per-(tool, user) loop body lives inline in main(). + +Only external environment is mocked: HTTP backend (real server on localhost), +HOME (so the subprocess gets an isolated discovery lock/cache and never exits +early on a live lock from another run), _SENTRY_DSN (no real Sentry calls), +discovery_cache / detector (in-process seam only). No network is required. +""" + +import json +import os +import subprocess +import sys +import tempfile +import threading +import unittest +from http.server import HTTPServer, BaseHTTPRequestHandler +from pathlib import Path +from unittest.mock import Mock, patch + +import scripts.coding_discovery_tools.utils as utils_mod +from scripts.coding_discovery_tools.utils import send_scan_event + +REPO_ROOT = Path(__file__).resolve().parent.parent + + +class _RecordingHandler(BaseHTTPRequestHandler): + """Records every POST body (parsed as JSON) and returns 200.""" + + def do_POST(self): + length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(length) + try: + self.server.requests.append(json.loads(body)) + except ValueError: + self.server.requests.append({"_raw": body.decode("utf-8", "replace")}) + + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(b'{"ok": true}') + + def log_message(self, format, *args): + pass # suppress server logs + + +class _ServerTestCase(unittest.TestCase): + """Spins up one recording HTTP server on localhost for the whole class.""" + + @classmethod + def setUpClass(cls): + cls.server = HTTPServer(("127.0.0.1", 0), _RecordingHandler) + cls.server.requests = [] + cls.port = cls.server.server_address[1] + cls.base_url = f"http://127.0.0.1:{cls.port}" + cls.thread = 
threading.Thread(target=cls.server.serve_forever) + cls.thread.daemon = True + cls.thread.start() + + @classmethod + def tearDownClass(cls): + cls.server.shutdown() + cls.thread.join(timeout=5) + + def setUp(self): + self.server.requests.clear() + + +class TestSendScanEventManifest(_ServerTestCase): + """utils.send_scan_event seam: manifest + covered_home_users are inserted + into the POST body only when provided (backward compatible).""" + + @patch("time.sleep") + @patch.object(utils_mod, "_SENTRY_DSN", "") + def test_completed_event_carries_manifest_and_covered_users(self, _sleep): + manifest = [{"home_user": "alice", "tool_name": "Cursor"}] + covered = ["alice", "bob"] + + success, _retryable = send_scan_event( + self.base_url, + "test-key", + "DEV-1", + "run-1", + "completed", + manifest=manifest, + covered_home_users=covered, + ) + + self.assertTrue(success) + self.assertEqual(len(self.server.requests), 1) + body = self.server.requests[0] + # Exact passthrough of both new fields. + self.assertEqual(body["scan_event"], "completed") + self.assertEqual(body["manifest"], manifest) + self.assertEqual(body["covered_home_users"], covered) + + @patch("time.sleep") + @patch.object(utils_mod, "_SENTRY_DSN", "") + def test_legacy_call_omits_both_keys(self, _sleep): + # No manifest / covered_home_users supplied -> neither key may appear + # in the payload (backward compatibility with the old call sites). 
+ success, _retryable = send_scan_event( + self.base_url, "test-key", "DEV-1", "run-1", "in_progress" + ) + + self.assertTrue(success) + self.assertEqual(len(self.server.requests), 1) + body = self.server.requests[0] + self.assertNotIn("manifest", body) + self.assertNotIn("covered_home_users", body) + + @patch("time.sleep") + @patch.object(utils_mod, "_SENTRY_DSN", "") + def test_empty_manifest_still_sent(self, _sleep): + # An empty manifest is meaningfully different from "no manifest": it + # tells the backend "this scope had zero readable tools" (prune-all + # within scope). It must be sent (key present), since the production + # guard is `is not None`, not truthiness. + success, _retryable = send_scan_event( + self.base_url, + "test-key", + "DEV-1", + "run-1", + "completed", + manifest=[], + covered_home_users=["alice"], + ) + + self.assertTrue(success) + body = self.server.requests[0] + self.assertIn("manifest", body) + self.assertEqual(body["manifest"], []) + self.assertEqual(body["covered_home_users"], ["alice"]) + + +class TestCompletedEventManifestCLI(_ServerTestCase): + """End-to-end via main() subprocess: the completed event carries a + well-formed manifest + covered_home_users; lifecycle events that are not + "completed" carry neither.""" + + def _run_cli(self, timeout=600): + env = os.environ.copy() + # Isolate the discovery state dir (lock + cache) under a throwaway HOME + # so the run never exits early on a live lock left by another process, + # and starts from a cold cache (deterministic hash-match behavior). 
+ env["HOME"] = tempfile.mkdtemp(prefix="web4679_home_") + return subprocess.run( + [ + sys.executable, + "scripts/coding_discovery_tools/ai_tools_discovery.py", + "--api-key", + "test-key-000000", + "--domain", + self.base_url, + ], + cwd=str(REPO_ROOT), + capture_output=True, + text=True, + timeout=timeout, + env=env, + ) + + def test_completed_event_has_manifest_and_covered_users(self): + result = self._run_cli() + self.assertEqual(result.returncode, 0, f"stderr: {result.stderr[-2000:]}") + + completed = [ + r for r in self.server.requests if r.get("scan_event") == "completed" + ] + self.assertEqual(len(completed), 1, "expected exactly one completed event") + body = completed[0] + + # manifest: list of {home_user, tool_name} objects. + self.assertIn("manifest", body) + self.assertIsInstance(body["manifest"], list) + for entry in body["manifest"]: + self.assertIsInstance(entry, dict) + self.assertIn("home_user", entry) + self.assertIn("tool_name", entry) + self.assertIsInstance(entry["home_user"], str) + self.assertIsInstance(entry["tool_name"], str) + + # covered_home_users: list of user names (strings). + self.assertIn("covered_home_users", body) + self.assertIsInstance(body["covered_home_users"], list) + for user in body["covered_home_users"]: + self.assertIsInstance(user, str) + + def test_non_completed_events_have_no_manifest(self): + result = self._run_cli() + self.assertEqual(result.returncode, 0, f"stderr: {result.stderr[-2000:]}") + + # An in_progress event is always sent before scanning. 
+ non_completed = [ + r + for r in self.server.requests + if r.get("scan_event") in ("in_progress", "failed") + ] + self.assertGreaterEqual( + len(non_completed), 1, "expected at least an in_progress event" + ) + for body in non_completed: + self.assertNotIn( + "manifest", body, f"{body.get('scan_event')} must not carry a manifest" + ) + self.assertNotIn( + "covered_home_users", + body, + f"{body.get('scan_event')} must not carry covered_home_users", + ) + + def test_covered_home_users_matches_full_enumeration_not_manifest(self): + # covered_home_users is sourced from the full user enumeration + # (all_users), NOT only from users that produced manifest entries. So a + # user who contributed zero manifest entries still appears in + # covered_home_users. We assert this invariant without needing to force + # a specific zero-tool user on the host: every home_user that appears in + # the manifest must also appear in covered_home_users, and + # covered_home_users must be a superset of the manifest's user set. + result = self._run_cli() + self.assertEqual(result.returncode, 0, f"stderr: {result.stderr[-2000:]}") + + completed = [ + r for r in self.server.requests if r.get("scan_event") == "completed" + ] + self.assertEqual(len(completed), 1) + body = completed[0] + + covered = set(body["covered_home_users"]) + manifest_users = {e["home_user"] for e in body["manifest"]} + # The enumerated set must cover every user that yielded a manifest entry. + self.assertTrue( + manifest_users.issubset(covered), + f"manifest users {manifest_users} not all in covered {covered}", + ) + + +class TestManifestFromPresence(unittest.TestCase): + """The load-bearing property: the manifest is built from per-user DETECTION/presence, + not extraction success. 
A detected tool is recorded even if reading its config errored + (so a read failure is never mistaken for an uninstall and never nulls the manifest); + only users who detected the tool get an entry; and a DETECTOR error marks the scan + unclean instead of recording an umbrella name. + + Seam: main() driven in-process with a mocked detector + discovery_cache and a + captured send_scan_event. The per-(tool, user) loop body lives inline inside main(), + so this is the only seam where the success / hash-match / read-error branches can be + forced deterministically. No production code was changed to enable this. + """ + + def setUp(self): + import scripts.coding_discovery_tools.ai_tools_discovery as adm + + self.adm = adm + self.argv = [ + "ai_tools_discovery.py", + "--api-key", + "k", + "--domain", + "http://127.0.0.1:1", + ] + + @staticmethod + def _make_tool(name): + # Distinct install_path per tool so the (name:path) dedup keeps all three. + return {"name": name, "version": "1.0", "install_path": f"/opt/{name}", "projects": []} + + def _run_main_capture_manifest(self, send_report_result=(True, False), filter_error=None, detector_failure=None): + """Run main() with three crafted tools for one user: + ToolOK -> hash mismatch -> send path + ToolHashMatch -> hash match -> dedup short-circuit + ToolErr -> filter raises (read/extraction error) + All three are DETECTED, so all three must appear in the manifest (presence-based). + send_report_result controls send_report_to_backend's (success, retryable). + detector_failure: if set, detect_all_tools reports that tool_name via its `failures` + set (a detector error) — it must NOT be added to the manifest; the scan is marked unclean instead. + Returns the captured dict: manifest and covered_home_users from the completed event, plus failed_events if any "failed" event was sent. 
+ """ + adm = self.adm + + tool_ok = self._make_tool("ToolOK") + tool_hm = self._make_tool("ToolHashMatch") + tool_err = self._make_tool("ToolErr") + + detector = Mock() + detector.get_device_id.return_value = "dev-xyz" + + def _detect_all(user_home=None, failures=None): + # A detector error surfaces via the `failures` set (presence unknown -> scan marked unclean, name NOT added to manifest). + if detector_failure and failures is not None: + failures.add(detector_failure) + return [tool_ok, tool_hm, tool_err] + detector.detect_all_tools.side_effect = _detect_all + detector._set_canonical_vscode_copilot.return_value = None + detector.process_single_tool.side_effect = lambda t: t + + def _filter(tool_with_projects, _user_home): + if tool_with_projects["name"] == "ToolErr": + raise (filter_error if filter_error is not None else PermissionError("simulated read failure")) + return tool_with_projects + + detector.filter_tool_projects_by_user.side_effect = _filter + detector.generate_single_tool_report.side_effect = ( + lambda tool, device_id, home_user, system_user=None, run_id=None: { + "tools": [tool] + } + ) + + # Hash is derived from the tool name; the cache "matches" only for + # ToolHashMatch, forcing ToolOK down the send path and ToolHashMatch + # down the dedup short-circuit. 
+ def _hash(tool_dict): + return "hash-" + tool_dict["name"] + + def _cached(tool_name, _user_name): + return "hash-ToolHashMatch" if tool_name == "ToolHashMatch" else None + + dc = Mock() + dc.acquire_lock.return_value = "acquired" + dc.heartbeat_start.return_value = Mock() + dc.get_cached_hash.side_effect = _cached + dc.update_tool.return_value = None + dc.UNBOUND_DIR = "/tmp/unbound-test" + dc.last_lock_error = None + + captured = {} + + def _send_scan_event(domain, api_key, device_id, run_id, scan_event, app_name=None, **kw): + if scan_event == "completed": + captured["manifest"] = kw.get("manifest") + captured["covered_home_users"] = kw.get("covered_home_users") + elif scan_event == "failed": + captured.setdefault("failed_events", []).append(kw.get("scan_error")) + return (True, None) + + with patch.object(adm.platform, "system", return_value="Darwin"), \ + patch.object(adm, "AIToolsDetector", return_value=detector), \ + patch.object(adm, "discovery_cache", dc), \ + patch.object(adm, "get_all_users_macos", return_value=["alice"]), \ + patch.object(adm, "compute_payload_hash", side_effect=_hash), \ + patch.object(adm, "send_report_to_backend", return_value=send_report_result), \ + patch.object(adm, "send_scan_event", side_effect=_send_scan_event), \ + patch.object(adm, "send_discovery_metrics", Mock()), \ + patch.object(adm, "load_pending_reports", return_value=[]), \ + patch.object(adm, "save_failed_reports", Mock()), \ + patch.object(adm, "report_to_sentry", Mock()), \ + patch.object(utils_mod, "_SENTRY_DSN", ""), \ + patch.object(sys, "argv", self.argv): + try: + adm.main() + except SystemExit: + pass + + return captured + + def test_read_error_keeps_tool_in_manifest(self): + # A tool whose config read ERRORS is still detected present -> stays in the manifest (a read failure isn't an uninstall). 
+ captured = self._run_main_capture_manifest() + + self.assertIn("manifest", captured, "completed event was never sent") + self.assertIsNotNone(captured["manifest"], "a read error must NOT fail-close the manifest to None") + pairs = {(e["home_user"], e["tool_name"]) for e in captured["manifest"]} + + self.assertIn(("alice", "ToolOK"), pairs) # sent path + self.assertIn(("alice", "ToolHashMatch"), pairs) # hash-match (unchanged, still installed) + self.assertIn(("alice", "ToolErr"), pairs) # read errored but DETECTED -> kept + self.assertEqual(len(captured["manifest"]), 3) + + def test_upload_failure_keeps_tool_in_manifest(self): + # Presence is recorded before extraction, so a transient UPLOAD failure still keeps the tool in the manifest. + captured = self._run_main_capture_manifest(send_report_result=(False, True)) + + self.assertIn("manifest", captured, "completed event was never sent") + pairs = {(e["home_user"], e["tool_name"]) for e in captured["manifest"]} + # All three detected tools present, regardless of upload outcome / read error. + self.assertEqual( + pairs, + {("alice", "ToolOK"), ("alice", "ToolHashMatch"), ("alice", "ToolErr")}, + ) + + def test_covered_home_users_includes_user_with_no_manifest_entry(self): + # covered_home_users must come from the full enumeration (all_users), + # so even though "alice" is the only user and one of her tools errored, + # she still appears. More importantly, this proves covered_home_users is + # not derived from the manifest: a user whose every tool errored would + # still be covered (bounding the prune scope correctly). + captured = self._run_main_capture_manifest() + self.assertEqual(captured.get("covered_home_users"), ["alice"]) + + def test_generic_read_error_does_not_fail_close(self): + # Regression: a generic read error used to fail-close the manifest to None (blocking all pruning); it must no longer. 
+ captured = self._run_main_capture_manifest( + filter_error=RuntimeError("simulated generic read failure") + ) + self.assertIn("manifest", captured, "completed event was never sent") + self.assertIsNotNone( + captured["manifest"], + "a generic read error must NOT fail-close the manifest to None", + ) + pairs = {(e["home_user"], e["tool_name"]) for e in captured["manifest"]} + self.assertIn(("alice", "ToolErr"), pairs) + self.assertEqual(len(captured["manifest"]), 3) + self.assertEqual(captured.get("covered_home_users"), ["alice"]) + + def test_detector_error_marks_scan_unclean(self): + # A DETECTOR error means presence is unknown, and detector.tool_name is only an umbrella + # label (e.g. "GitHub Copilot") that can't safely target the concrete install rows. So the + # run is marked unclean (a "failed" event) and the umbrella name is NOT added to the + # manifest -> the backend skips pruning rather than prune a real surface row. + captured = self._run_main_capture_manifest(detector_failure="ToolGhost") + pairs = {(e["home_user"], e["tool_name"]) for e in captured["manifest"]} + self.assertNotIn(("alice", "ToolGhost"), pairs) + # Only the three actually-detected tools remain. + self.assertEqual(len(captured["manifest"]), 3) + self.assertTrue( + captured.get("failed_events"), + "a detector error must mark the scan unclean so the backend skips pruning this run", + ) + + def test_per_user_detection_no_phantom_ownership(self): + # Phantom-ownership regression: all_tools is deduped globally, so a user-scoped tool one + # user has must NOT be attributed to a co-resident user who did not detect it. Alice has + # ToolA, Bob has ToolB; the manifest must contain exactly each user's own tool. 
+ adm = self.adm + tool_a = self._make_tool("ToolA") + tool_b = self._make_tool("ToolB") + + detector = Mock() + detector.get_device_id.return_value = "dev-xyz" + + def _detect_all(user_home=None, failures=None): + home = str(user_home or "") + if home.endswith("alice"): + return [tool_a] + if home.endswith("bob"): + return [tool_b] + return [] + detector.detect_all_tools.side_effect = _detect_all + detector._set_canonical_vscode_copilot.return_value = None + detector._set_canonical_augment_surface.return_value = None + detector.process_single_tool.side_effect = lambda t: t + detector.filter_tool_projects_by_user.side_effect = lambda t, _h: t + detector.generate_single_tool_report.side_effect = ( + lambda tool, device_id, home_user, system_user=None, run_id=None: {"tools": [tool]} + ) + + dc = Mock() + dc.acquire_lock.return_value = "acquired" + dc.heartbeat_start.return_value = Mock() + dc.get_cached_hash.return_value = None + dc.update_tool.return_value = None + dc.UNBOUND_DIR = "/tmp/unbound-test" + dc.last_lock_error = None + + captured = {} + + def _send_scan_event(domain, api_key, device_id, run_id, scan_event, app_name=None, **kw): + if scan_event == "completed": + captured["manifest"] = kw.get("manifest") + return (True, None) + + with patch.object(adm.platform, "system", return_value="Darwin"), \ + patch.object(adm, "AIToolsDetector", return_value=detector), \ + patch.object(adm, "discovery_cache", dc), \ + patch.object(adm, "get_all_users_macos", return_value=["alice", "bob"]), \ + patch.object(adm, "compute_payload_hash", side_effect=lambda t: "h-" + t["name"]), \ + patch.object(adm, "send_report_to_backend", return_value=(True, False)), \ + patch.object(adm, "send_scan_event", side_effect=_send_scan_event), \ + patch.object(adm, "send_discovery_metrics", Mock()), \ + patch.object(adm, "load_pending_reports", return_value=[]), \ + patch.object(adm, "save_failed_reports", Mock()), \ + patch.object(adm, "report_to_sentry", Mock()), \ + 
patch.object(utils_mod, "_SENTRY_DSN", ""), \ + patch.object(sys, "argv", self.argv): + try: + adm.main() + except SystemExit: + pass + + pairs = {(e["home_user"], e["tool_name"]) for e in captured["manifest"]} + self.assertEqual(pairs, {("alice", "ToolA"), ("bob", "ToolB")}) + self.assertNotIn(("bob", "ToolA"), pairs) + self.assertNotIn(("alice", "ToolB"), pairs) + + +if __name__ == "__main__": + unittest.main()