|
| 1 | +from pathlib import Path |
| 2 | +from typing import Optional, Tuple |
| 3 | + |
| 4 | +import click |
| 5 | + |
| 6 | +from robotcode.plugin import Application, OutputFormat, pass_application |
| 7 | +from robotcode.robot.config.loader import load_robot_config_from_path |
| 8 | +from robotcode.robot.config.utils import get_config_files |
| 9 | +from robotcode.robot.diagnostics.data_cache import CACHE_DIR_NAME, CacheSection, SqliteDataCache, build_cache_dir |
| 10 | + |
| 11 | +from ..config import AnalyzeConfig |
| 12 | + |
# Lookup table from lower-cased section name (as typed on the CLI via
# --section) to the corresponding CacheSection member.
_SECTION_NAMES = {section.name.lower(): section for section in CacheSection}
| 14 | + |
| 15 | + |
def _resolve_cache(
    app: Application,
    paths: Tuple[Path, ...],
) -> Tuple[Path, Optional[SqliteDataCache]]:
    """Resolve the versioned cache directory for *paths* and open its database.

    Returns the cache directory together with an open ``SqliteDataCache``,
    or ``None`` in place of the cache when no ``cache.db`` exists yet.
    The caller is responsible for closing the returned cache.
    """
    config_files, root_folder, _ = get_config_files(
        paths,
        app.config.config_files,
        root_folder=app.config.root,
        no_vcs=app.config.no_vcs,
        verbose_callback=app.verbose,
    )

    robot_config = load_robot_config_from_path(
        *config_files, extra_tools={"robotcode-analyze": AnalyzeConfig}, verbose_callback=app.verbose
    )

    tool_table = robot_config.tool
    analyzer_config = tool_table.get("robotcode-analyze", None) if tool_table is not None else None

    # Default base is the project root; an explicit cache_dir in the
    # analyzer configuration overrides it.
    base_path = root_folder or Path.cwd()
    if isinstance(analyzer_config, AnalyzeConfig):
        cache_settings = analyzer_config.cache
        if cache_settings is not None and cache_settings.cache_dir is not None:
            base_path = Path(cache_settings.cache_dir)

    cache_dir = build_cache_dir(base_path)

    # No directory or no database file yet -> nothing to open.
    if not cache_dir.exists() or not (cache_dir / "cache.db").exists():
        return cache_dir, None

    from ..__version__ import __version__

    return cache_dir, SqliteDataCache(cache_dir, app_version=__version__)
| 47 | + |
| 48 | + |
def _parse_sections(sections: Tuple[str, ...]) -> Optional[Tuple[CacheSection, ...]]:
    """Map user-supplied section names to ``CacheSection`` members.

    Returns ``None`` when no names were given; raises ``click.BadParameter``
    for any name that is not a known section (case-insensitive).
    """
    if not sections:
        return None

    parsed = []
    for name in sections:
        section = _SECTION_NAMES.get(name.lower())
        if section is None:
            raise click.BadParameter(
                f"Unknown section '{name}'. Choose from: {', '.join(_SECTION_NAMES)}",
                param_hint="'--section'",
            )
        parsed.append(section)
    return tuple(parsed)
| 63 | + |
| 64 | + |
| 65 | +def _format_bytes(n: int) -> str: |
| 66 | + if n < 1024: |
| 67 | + return f"{n} B" |
| 68 | + if n < 1024 * 1024: |
| 69 | + return f"{n / 1024:.1f} KB" |
| 70 | + return f"{n / (1024 * 1024):.1f} MB" |
| 71 | + |
| 72 | + |
@click.group(
    name="cache",
    add_help_option=True,
    invoke_without_command=False,
)
def cache_group() -> None:
    """\
    Manage the RobotCode analysis cache.

    Provides subcommands to inspect, list, and clear cached data
    (library docs, variables, resources, namespaces).
    """
    # Intentionally empty: this group only hosts its subcommands
    # (path, info, list, clear, prune). invoke_without_command=False
    # makes click require one of them, so this body never does work.
| 85 | + |
| 86 | + |
@cache_group.command(name="path")
@click.argument(
    "paths", nargs=-1, type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True, path_type=Path)
)
@pass_application
def cache_path(app: Application, paths: Tuple[Path, ...]) -> None:
    """\
    Print the cache directory path.

    Outputs the resolved cache directory for the current project
    and Python/Robot Framework version combination.
    """
    # Resolution opens the cache database as a side effect when one exists;
    # it must be released even though this command only prints the path.
    resolved_dir, cache_db = _resolve_cache(app, paths)
    try:
        app.echo(str(resolved_dir))
    finally:
        if cache_db is not None:
            cache_db.close()
| 105 | + |
| 106 | + |
@cache_group.command(name="info")
@click.argument(
    "paths", nargs=-1, type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True, path_type=Path)
)
@pass_application
def cache_info(app: Application, paths: Tuple[Path, ...]) -> None:
    """\
    Show cache statistics.

    Displays the cache directory, database size, app version, and
    per-section entry counts with timestamps.
    """
    cache_dir, db = _resolve_cache(app, paths)

    # Without a database there is nothing to report beyond the directory.
    if db is None:
        app.echo(f"Cache directory: {cache_dir}")
        app.echo("No cache database found.")
        return

    try:
        db_path = db.db_path
        # The file may disappear between resolution and stat; report 0 then.
        db_size = db_path.stat().st_size if db_path.exists() else 0

        # Collect per-section stats plus grand totals in a single pass.
        section_data = []
        total_entries = 0
        total_bytes = 0
        for section in CacheSection:
            stats = db.get_section_stats(section)
            total_entries += stats.entry_count
            total_bytes += stats.total_blob_bytes
            section_data.append(
                {
                    "section": section.name.lower(),
                    "entries": stats.entry_count,
                    "size": stats.total_blob_bytes,
                    # "size_formatted" is display-only; it is stripped from
                    # the structured output branch below.
                    "size_formatted": _format_bytes(stats.total_blob_bytes) if stats.entry_count else "—",
                    "created": stats.oldest_created or None,
                    "modified": stats.newest_modified or None,
                }
            )

        if app.config.output_format is None or app.config.output_format == OutputFormat.TEXT:
            if app.colored:
                # Colored terminals: render as a markdown table via echo_as_markdown.
                lines = [
                    f"- **Directory:** {cache_dir}",
                    f"- **Database:** {db_path.name} ({_format_bytes(db_size)})",
                    f"- **Version:** {db.app_version or '(unknown)'}",
                    "",
                    "| Section | Entries | Size | Created | Modified |",
                    "|---|---:|---:|---|---|",
                ]
                for s in section_data:
                    lines.append(
                        f"| {s['section']} | {s['entries']} | {s['size_formatted']}"
                        f" | {s['created'] or '—'} | {s['modified'] or '—'} |"
                    )
                lines.append(f"| **Total** | **{total_entries}** | **{_format_bytes(total_bytes)}** | | |")
                app.echo_as_markdown("\n".join(lines))
            else:
                # Plain text: fixed-width columns with a rule above and below the rows.
                app.echo(f" Directory: {cache_dir}")
                app.echo(f" Database: {db_path.name} ({_format_bytes(db_size)})")
                app.echo(f" Version: {db.app_version or '(unknown)'}")
                app.echo("")
                header = f" {'Section':<12} {'Entries':>7} {'Size':>10} {'Created':19} {'Modified':19}"
                app.echo(header)
                app.echo(f" {'─' * (len(header) - 2)}")
                for s in section_data:
                    app.echo(
                        f" {s['section']:<12} {s['entries']:>7} {s['size_formatted']:>10}"
                        f" {(s['created'] or '—'):19} {(s['modified'] or '—'):19}"
                    )
                app.echo(f" {'─' * (len(header) - 2)}")
                app.echo(f" {'Total':<12} {total_entries:>7} {_format_bytes(total_bytes):>10}")
        else:
            # Structured output (e.g. JSON): raw values only; the display-only
            # "size_formatted" field and None timestamps are dropped.
            app.print_data(
                {
                    "directory": str(cache_dir),
                    "database": db_path.name,
                    "database_size": db_size,
                    "version": db.app_version or "",
                    "sections": [
                        {k: v for k, v in s.items() if k != "size_formatted" and v is not None} for s in section_data
                    ],
                    "total_entries": total_entries,
                    "total_size": total_bytes,
                }
            )
    finally:
        db.close()
| 196 | + |
| 197 | + |
@cache_group.command(name="list")
@click.option(
    "-s",
    "--section",
    "sections",
    multiple=True,
    metavar="SECTION",
    help="Filter by section (library, variables, resource, namespace). Can be specified multiple times.",
)
@click.option(
    "-p",
    "--pattern",
    "patterns",
    multiple=True,
    metavar="PATTERN",
    help="Filter entries by glob pattern (e.g. 'robot.*', '*BuiltIn*'). Can be specified multiple times.",
)
@click.argument(
    "paths", nargs=-1, type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True, path_type=Path)
)
@pass_application
def cache_list(app: Application, sections: Tuple[str, ...], patterns: Tuple[str, ...], paths: Tuple[Path, ...]) -> None:
    """\
    List cached entries.

    Shows all entries in the cache with their timestamps and sizes.
    Use --section to filter by specific cache sections.
    Use --pattern to filter entries by glob pattern.
    """
    # Local import: fnmatch is only needed by this command.
    from fnmatch import fnmatch

    _, db = _resolve_cache(app, paths)

    if db is None:
        app.echo("No cache database found.")
        return

    try:
        selected = _parse_sections(sections)
        # No --section given -> list every known section.
        target_sections = selected if selected else tuple(CacheSection)

        def _matches(name: str) -> bool:
            # Entry passes when no patterns were given, or any glob matches.
            if not patterns:
                return True
            return any(fnmatch(name, p) for p in patterns)

        if app.config.output_format is None or app.config.output_format == OutputFormat.TEXT:
            if app.colored:
                # Markdown rendering: one table per non-empty section.
                lines: list[str] = []
                for section in target_sections:
                    entries = [e for e in db.list_entries(section) if _matches(e.entry_name)]
                    if not entries:
                        continue
                    lines.append(f"### {section.name.lower()} ({len(entries)} entries)")
                    lines.append("")
                    lines.append("| Name | Size | Created | Modified |")
                    lines.append("|---|---:|---|---|")
                    for entry in entries:
                        size = _format_bytes(entry.meta_bytes + entry.data_bytes)
                        created = entry.created_at or "—"
                        modified = entry.modified_at or "—"
                        lines.append(f"| {entry.entry_name} | {size} | {created} | {modified} |")
                    lines.append("")
                if lines:
                    app.echo_as_markdown("\n".join(lines))
                else:
                    app.echo("No entries found.")
            else:
                # Plain-text rendering: a bracketed header per section, one
                # indented line per entry.
                found = False
                for section in target_sections:
                    entries = [e for e in db.list_entries(section) if _matches(e.entry_name)]
                    if not entries:
                        continue
                    found = True
                    app.echo(f"[{section.name.lower()}] ({len(entries)} entries)")
                    for entry in entries:
                        size = _format_bytes(entry.meta_bytes + entry.data_bytes)
                        created = entry.created_at or "—"
                        modified = entry.modified_at or "—"
                        app.echo(f" {entry.entry_name} size={size} created={created} modified={modified}")
                    app.echo("")
                if not found:
                    app.echo("No entries found.")
        else:
            # Structured output: dict keyed by section name; None timestamps
            # are filtered out of each entry mapping.
            result: dict[str, list[dict[str, object]]] = {}
            for section in target_sections:
                entries = [e for e in db.list_entries(section) if _matches(e.entry_name)]
                if entries:
                    result[section.name.lower()] = [
                        {
                            k: v
                            for k, v in {
                                "name": e.entry_name,
                                "size": e.meta_bytes + e.data_bytes,
                                "created": e.created_at,
                                "modified": e.modified_at,
                            }.items()
                            if v is not None
                        }
                        for e in entries
                    ]
            app.print_data(result)
    finally:
        db.close()
| 302 | + |
| 303 | + |
@cache_group.command(name="clear")
@click.option(
    "-s",
    "--section",
    "sections",
    multiple=True,
    metavar="SECTION",
    help="Clear only specific sections (library, variables, resource, namespace). Can be specified multiple times.",
)
@click.argument(
    "paths", nargs=-1, type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True, path_type=Path)
)
@pass_application
def cache_clear(app: Application, sections: Tuple[str, ...], paths: Tuple[Path, ...]) -> None:
    """\
    Clear the analysis cache.

    Removes cached entries from the database. By default clears all sections.
    Use --section to clear specific sections only.
    """
    _, db = _resolve_cache(app, paths)

    if db is None:
        app.echo("No cache database found.")
        return

    try:
        chosen = _parse_sections(sections)

        if not chosen:
            # No filter given: wipe every section in one call.
            removed = db.clear_all()
        else:
            # Clear only the requested sections, reporting each one.
            removed = 0
            for section in chosen:
                count = db.clear_section(section)
                removed += count
                app.echo(f"Cleared {count} entries from {section.name.lower()}.")

        app.echo(f"Removed {removed} entries total.")
    finally:
        db.close()
| 345 | + |
| 346 | + |
def _resolve_cache_root(
    app: Application,
    paths: Tuple[Path, ...],
) -> Path:
    """Resolve the top-level cache directory (not version-specific).

    Mirrors the base-path resolution of ``_resolve_cache``, but returns the
    root ``CACHE_DIR_NAME`` folder that contains the caches for every
    Python / Robot Framework version combination.
    """
    config_files, root_folder, _ = get_config_files(
        paths,
        app.config.config_files,
        root_folder=app.config.root,
        no_vcs=app.config.no_vcs,
        verbose_callback=app.verbose,
    )

    robot_config = load_robot_config_from_path(
        *config_files, extra_tools={"robotcode-analyze": AnalyzeConfig}, verbose_callback=app.verbose
    )

    tool_table = robot_config.tool
    analyzer_config = tool_table.get("robotcode-analyze", None) if tool_table is not None else None

    # Project root by default; an explicit cache_dir setting overrides it.
    base_path = root_folder or Path.cwd()
    if isinstance(analyzer_config, AnalyzeConfig):
        cache_settings = analyzer_config.cache
        if cache_settings is not None and cache_settings.cache_dir is not None:
            base_path = Path(cache_settings.cache_dir)

    return base_path / CACHE_DIR_NAME
| 371 | + |
| 372 | + |
@cache_group.command(name="prune")
@click.argument(
    "paths", nargs=-1, type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True, path_type=Path)
)
@pass_application
def cache_prune(app: Application, paths: Tuple[Path, ...]) -> None:
    """\
    Remove the entire cache directory.

    Deletes the .robotcode_cache directory and all its contents,
    including caches for all Python and Robot Framework versions.
    """
    import shutil

    root = _resolve_cache_root(app, paths)

    if root.exists():
        # Remove the whole tree, covering every cached version combination.
        shutil.rmtree(root)
        app.echo(f"Removed {root}")
    else:
        app.echo(f"Cache directory does not exist: {root}")
0 commit comments