|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +File watcher for auto-importing test results to MongoDB. |
| 4 | +
|
| 5 | +Usage: |
| 6 | + # Start watching for new files |
| 7 | + python -m db.watcher |
| 8 | +
|
| 9 | + # One-time scan (import existing files) |
| 10 | + python -m db.watcher --scan |
| 11 | +""" |
| 12 | + |
import argparse
import json
import logging
import signal
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Set

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from .client import MongoDBClient
from .config import DatabaseConfig
from .importer import DataImporter
from .repository import DispatcherSummaryRepository, TestRunRepository
from .utils import is_dispatcher_summary_file, should_skip_file
| 30 | + |
| 31 | +logger = logging.getLogger(__name__) |
| 32 | + |
| 33 | + |
class Watcher:
    """Watch directories and auto-import test results to MongoDB.

    - output_dir: imports test result files to test_runs collection
    - summary_dir: stores summary metadata to dispatcher_summaries collection
    """

    def __init__(
        self,
        output_dir: Optional[Path] = None,
        summary_dir: Optional[Path] = None,
        mongo_uri: Optional[str] = None,
    ):
        """Connect to MongoDB and prepare the repositories.

        Args:
            output_dir: Directory holding test result JSON files
                (defaults to ./output).
            summary_dir: Directory holding dispatcher summary JSON files
                (defaults to ./summary_output).
            mongo_uri: Optional connection URI; overrides the value loaded
                from the environment by DatabaseConfig.from_env().

        Raises:
            ConnectionError: If the MongoDB health check fails.
        """
        self.output_dir = Path(output_dir) if output_dir else Path("./output")
        self.summary_dir = Path(summary_dir) if summary_dir else Path("./summary_output")

        # Initialize MongoDB connection; an explicit URI wins over the env.
        config = DatabaseConfig.from_env()
        if mongo_uri:
            config.mongo_uri = mongo_uri

        client = MongoDBClient(config)
        if not client.health_check():
            raise ConnectionError(f"Cannot connect to MongoDB: {config.mongo_uri}")

        # Repositories for the two collections this watcher writes to.
        self.test_run_repo = TestRunRepository(
            client.get_collection(config.collection_name)
        )
        self.summary_repo = DispatcherSummaryRepository(
            client.get_collection(config.summary_collection_name)
        )
        self.importer = DataImporter(self.test_run_repo)

        self._observer = None
        # Paths already imported in this process; shared with the event
        # handler so modified-events don't trigger duplicate imports.
        self._processed: Set[str] = set()

    def scan(self) -> Dict[str, Dict[str, List[str]]]:
        """
        One-time scan and import all existing files.

        Returns:
            {
                "output": {"imported": [...], "skipped": [...], "failed": [...]},
                "summary": {"imported": [...], "skipped": [...], "failed": [...]}
            }
        """
        result = {
            "output": {"imported": [], "skipped": [], "failed": []},
            "summary": {"imported": [], "skipped": [], "failed": []},
        }

        # Import test results from output directory
        if self.output_dir.exists():
            logger.info("Scanning output: %s", self.output_dir)
            output_result = self.importer.import_directory(
                self.output_dir, recursive=True, include_summaries=False
            )
            result["output"]["imported"].extend(output_result.get("imported", []))
            result["output"]["skipped"].extend(output_result.get("skipped", []))
            result["output"]["failed"].extend(output_result.get("failed", []))
            logger.info(
                "Output: %d imported, %d skipped, %d failed",
                len(result["output"]["imported"]),
                len(result["output"]["skipped"]),
                len(result["output"]["failed"]),
            )

        # Store summary metadata (not re-import test results)
        if self.summary_dir.exists():
            logger.info("Scanning summaries: %s", self.summary_dir)
            for summary_file in sorted(self.summary_dir.glob("dispatcher_summary_*.json")):
                summary_result = self._import_summary_metadata(summary_file)
                result["summary"]["imported"].extend(summary_result.get("imported", []))
                result["summary"]["skipped"].extend(summary_result.get("skipped", []))
                result["summary"]["failed"].extend(summary_result.get("failed", []))
            logger.info(
                "Summary: %d imported, %d skipped, %d failed",
                len(result["summary"]["imported"]),
                len(result["summary"]["skipped"]),
                len(result["summary"]["failed"]),
            )

        return result

    def _import_summary_metadata(self, summary_path: Path) -> Dict[str, List[str]]:
        """
        Import summary metadata to dispatcher_summaries collection.

        This stores the summary info (which tests ran, success/failure)
        without re-importing the actual test results.

        Returns:
            {"imported": [...], "skipped": [...], "failed": [...]} — timestamps
            for imported/skipped entries, file paths for failures.
        """
        result = {"imported": [], "skipped": [], "failed": []}

        try:
            with open(summary_path, "r", encoding="utf-8") as f:
                summary_data = json.load(f)

            # The timestamp doubles as the dedup key; a summary without one
            # cannot be stored meaningfully.
            timestamp = summary_data.get("timestamp")
            if not timestamp:
                logger.warning("No timestamp in summary: %s", summary_path)
                result["failed"].append(str(summary_path))
                return result

            # Check if already exists
            if self.summary_repo.exists(timestamp):
                logger.debug("Summary already exists: %s", timestamp)
                result["skipped"].append(timestamp)
                return result

            # Add source file info
            summary_data.setdefault("_metadata", {})
            summary_data["_metadata"]["source_file"] = summary_path.name

            # Insert summary metadata
            self.summary_repo.upsert(summary_data)
            result["imported"].append(timestamp)
            logger.info("Imported summary: %s", timestamp)

        except json.JSONDecodeError as e:
            logger.error("Invalid JSON in %s: %s", summary_path, e)
            result["failed"].append(str(summary_path))
        except Exception as e:
            # Best-effort: one bad summary must not abort the whole scan.
            logger.error("Failed to import summary %s: %s", summary_path, e)
            result["failed"].append(str(summary_path))

        return result

    def start(self):
        """Start watching for new files (after an initial catch-up scan)."""
        logger.info("Performing initial scan...")
        self.scan()

        # Setup file event handler
        handler = _Handler(self, self._processed)

        self._observer = Observer()
        if self.output_dir.exists():
            self._observer.schedule(handler, str(self.output_dir), recursive=True)
            logger.info("Watching: %s", self.output_dir)
        if self.summary_dir.exists():
            self._observer.schedule(handler, str(self.summary_dir), recursive=True)
            logger.info("Watching: %s", self.summary_dir)

        self._observer.start()
        logger.info("Watcher started. Press Ctrl+C to stop.")

    def stop(self):
        """Stop watching and wait for the observer thread to exit."""
        if self._observer:
            self._observer.stop()
            self._observer.join()
            logger.info("Watcher stopped")

    def run_forever(self):
        """Start watcher and block until interrupted."""
        self.start()
        try:
            while self._observer and self._observer.is_alive():
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Interrupted by user")
        finally:
            self.stop()
| 196 | + |
| 197 | + |
class _Handler(FileSystemEventHandler):
    """Internal handler for file system events.

    Routes new/modified JSON files to the owning Watcher: dispatcher
    summary files are stored as summary metadata; other non-skipped
    files are imported as test results.
    """

    def __init__(self, watcher: Watcher, processed: Set[str]):
        # `processed` is owned by the Watcher so dedup state survives
        # across events and restarts of the handler.
        self.watcher = watcher
        self.processed = processed

    def on_created(self, event):
        """Import a file as soon as it appears."""
        if event.is_directory:
            return
        self._process_file(Path(event.src_path))

    def on_modified(self, event):
        """Handle files written in place (editors, slow writers)."""
        if event.is_directory:
            return
        path = Path(event.src_path)
        if str(path) not in self.processed:
            self._process_file(path)

    def _process_file(self, path: Path):
        """Import a single JSON file, deduplicating by path string."""
        if path.suffix != ".json":
            return

        path_str = str(path)
        if path_str in self.processed:
            return

        # Wait for file to be fully written
        time.sleep(0.5)

        if is_dispatcher_summary_file(path):
            logger.info("New summary: %s", path.name)
            result = self.watcher._import_summary_metadata(path)
            if result["imported"]:
                self.processed.add(path_str)
                logger.info("Imported summary metadata")
        elif not should_skip_file(path):
            logger.info("New test result: %s", path.name)
            run_id = self.watcher.importer.import_test_result(path)
            if run_id:
                self.processed.add(path_str)
                logger.info("Imported: %s", run_id)
| 240 | + |
| 241 | + |
def main():
    """CLI entry point: parse arguments, then scan once or watch forever.

    Exits with status 1 if MongoDB is unreachable; a Ctrl+C during
    watching is treated as a clean shutdown.
    """
    parser = argparse.ArgumentParser(
        description="Watch for test results and import to MongoDB"
    )
    parser.add_argument("--scan", action="store_true", help="One-time scan only")
    parser.add_argument("--output-dir", default="./output", help="Output directory")
    parser.add_argument("--summary-dir", default="./summary_output", help="Summary directory")
    parser.add_argument("--mongo-uri", help="MongoDB connection URI")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%H:%M:%S",
    )

    try:
        watcher = Watcher(
            output_dir=args.output_dir,
            summary_dir=args.summary_dir,
            mongo_uri=args.mongo_uri,
        )

        if args.scan:
            result = watcher.scan()
            # One report section per collection; output format is identical
            # for both, so drive it from a table.
            for title, key in (
                ("Output Files (test_runs)", "output"),
                ("Summary Files (dispatcher_summaries)", "summary"),
            ):
                print(f"\n=== {title} ===")
                print(f"Imported: {len(result[key]['imported'])}")
                print(f"Skipped: {len(result[key]['skipped'])}")
                print(f"Failed: {len(result[key]['failed'])}")
        else:
            watcher.run_forever()

    except ConnectionError as e:
        # Diagnostics belong on stderr so --scan output stays pipeable.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
    except KeyboardInterrupt:
        pass
| 284 | + |
| 285 | + |
# Script entry point when run as `python -m db.watcher`.
if __name__ == "__main__":
    main()
0 commit comments