diff --git a/.github/workflows/llm-metadata-validation.yml b/.github/workflows/llm-metadata-validation.yml new file mode 100644 index 0000000..0a2c649 --- /dev/null +++ b/.github/workflows/llm-metadata-validation.yml @@ -0,0 +1,111 @@ +name: LLM Metadata Validation + +on: + pull_request: + paths: + - "data/**" + - "scripts/validate_llm_metadata.py" + - "tests/test_validate_llm_metadata.py" + - ".github/workflows/llm-metadata-validation.yml" + workflow_dispatch: + +jobs: + validate-llm-metadata: + runs-on: ubuntu-latest + permissions: + contents: read + pull-requests: write + env: + GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }} + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: pip + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Detect newly added entry IDs + id: changed-ids + if: github.event_name == 'pull_request' + run: | + BASE_SHA="${{ github.event.pull_request.base.sha }}" + HEAD_SHA="${{ github.event.pull_request.head.sha }}" + ADDED_IDS="$(git diff --unified=0 "${BASE_SHA}" "${HEAD_SHA}" -- data/models.yaml data/datasets.yaml data/tools.yaml \ + | rg '^\+\s*id:\s*' -o -r '$0' \ + | sed -E 's/^\+\s*id:\s*//' \ + | tr -d '"' \ + | tr '\n' ',' \ + | sed 's/,$//')" + echo "entry_ids=${ADDED_IDS}" >> "$GITHUB_OUTPUT" + + - name: Skip when no new metadata entries + if: github.event_name == 'pull_request' && steps.changed-ids.outputs.entry_ids == '' + run: | + mkdir -p reports + cat < reports/llm_validation_report.json + {"status":"skipped","skipped_reason":"No newly added metadata entries in this PR.","counts":{"pass":0,"warning":0,"fail":0},"results":[]} + EOF + cat < reports/llm_validation_summary.md + # LLM Metadata Validation + - Status: \`skipped\` + - Pass: \`0\` + - Warning: \`0\` + - Fail: \`0\` + - Skipped reason: No newly added metadata entries in this PR. + EOF + + - name: Run LLM metadata validation + if: github.event_name != 'pull_request' || steps.changed-ids.outputs.entry_ids != '' + run: > + python scripts/validate_llm_metadata.py + --entry-ids "${{ steps.changed-ids.outputs.entry_ids }}" + --output reports/llm_validation_report.json + --summary-output reports/llm_validation_summary.md + + - name: Comment validation result on PR + if: github.event_name == 'pull_request' && always() + uses: actions/github-script@v7 + with: + script: | + const fs = require("fs"); + const marker = ""; + const summary = fs.readFileSync("reports/llm_validation_summary.md", "utf8"); + const body = `${marker}\n${summary}`; + + const { owner, repo } = context.repo; + const issue_number = context.issue.number; + + const comments = await github.paginate( + github.rest.issues.listComments, + { owner, repo, issue_number, per_page: 100 } + ); + + const existing = comments.find((comment) => + comment.user?.type === "Bot" && comment.body?.includes(marker) + ); + + if (existing) { + await github.rest.issues.updateComment({ + owner, + repo, + comment_id: existing.id, + body + }); + core.info(`Updated validation comment: ${existing.id}`); + } else { + const created = await github.rest.issues.createComment({ + owner, + repo, + issue_number, + body + }); + core.info(`Created validation comment: ${created.data.id}`); + } + diff --git a/.gitignore b/.gitignore index 3cec0f8..7864666 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ env/ # Generated (docs/data.js is a build artifact from generate_site.py) docs/data.js +reports/ \ No newline at end of file diff --git a/scripts/validate_llm_metadata.py b/scripts/validate_llm_metadata.py new file mode 100644 index 0000000..686d1b7 --- /dev/null +++ b/scripts/validate_llm_metadata.py @@ -0,0 +1,595 @@ +#!/usr/bin/env python3 +""" +LLM-based validation for submitted tags and summaries. + +This script compares repository metadata against README and paper abstract +evidence, asks Gemini for a structured JSON verdict, then writes a report +consumable by pytest and GitHub Actions. + +Usage: + python scripts/validate_llm_metadata.py + python scripts/validate_llm_metadata.py --output reports/llm.json +""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import sys +import time +from dataclasses import asdict, dataclass, field +from html import unescape +from pathlib import Path +from typing import Any +from urllib.parse import urlparse + +import requests +import yaml + +ROOT = Path(__file__).parent.parent +DATA_DIR = ROOT / "data" +DEFAULT_REPORT_PATH = ROOT / "reports" / "llm_validation_report.json" +DEFAULT_SUMMARY_PATH = ROOT / "reports" / "llm_validation_summary.md" + +GEMINI_API_URL = ( + "https://generativelanguage.googleapis.com/v1beta/models/" + "{model}:generateContent" +) +DEFAULT_GEMINI_MODEL = os.getenv("GEMINI_MODEL", "gemini-2.5-flash") +REQUEST_TIMEOUT_SECONDS = 30 +MAX_EVIDENCE_CHARS = 6000 +VALID_VERDICTS = {"pass", "warning", "fail"} +RETRYABLE_STATUS_CODES = {429, 503} +MAX_GEMINI_RETRIES = 3 +RETRY_BACKOFF_SECONDS = (1.0, 2.0, 4.0) + +VALIDATION_SCHEMA: dict[str, Any] = { + "type": "OBJECT", + "properties": { + "tag_score": {"type": "NUMBER"}, + "summary_score": {"type": "NUMBER"}, + "final_verdict": {"type": "STRING", "enum": ["pass", "warning", "fail"]}, + "reason": {"type": "STRING"}, + "unsupported_tags": {"type": "ARRAY", "items": {"type": "STRING"}}, + "unsupported_claims": {"type": "ARRAY", "items": {"type": "STRING"}}, + }, + "required": [ + "tag_score", + "summary_score", + "final_verdict", + "reason", + "unsupported_tags", + "unsupported_claims", + ], +} + + +@dataclass +class EvidenceBundle: + readme_text: str = "" + abstract_text: str = "" + issues: list[str] = field(default_factory=list) + + +@dataclass +class ValidationResult: + entry_id: str + entry_type: str + name: str + tags: list[str] + summary: str + summary_ko: str + tag_score: float + summary_score: float + final_verdict: str + reason: str + unsupported_tags: list[str] + unsupported_claims: list[str] + evidence_sources: list[str] + evidence_issues: list[str] + + +def load_yaml(path: Path) -> list[dict[str, Any]]: + with open(path, encoding="utf-8") as handle: + return yaml.safe_load(handle) or [] + + +def iter_entries() -> list[tuple[str, dict[str, Any]]]: + entries: list[tuple[str, dict[str, Any]]] = [] + for entry_type, file_name in ( + ("model", "models.yaml"), + ("dataset", "datasets.yaml"), + ("tool", "tools.yaml"), + ): + for entry in load_yaml(DATA_DIR / file_name): + entries.append((entry_type, entry)) + return entries + + +def sanitize_text(text: str) -> str: + if not text: + return "" + text = unescape(text) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"`{1,3}.*?`{1,3}", " ", text, flags=re.DOTALL) + text = re.sub(r"\[(.*?)\]\((.*?)\)", r"\1", text) + text = re.sub(r"^#{1,6}\s*", "", text, flags=re.MULTILINE) + text = re.sub(r"\s+", " ", text) + return text.strip() + + +def truncate_text(text: str, limit: int = MAX_EVIDENCE_CHARS) -> str: + text = sanitize_text(text) + if len(text) <= limit: + return text + return text[: limit - 3].rstrip() + "..." + + +def parse_github_repo(github_url: str) -> tuple[str, str] | None: + if not github_url: + return None + parsed = urlparse(github_url) + if parsed.netloc.lower() not in {"github.com", "www.github.com"}: + return None + parts = [part for part in parsed.path.strip("/").split("/") if part] + if len(parts) < 2: + return None + return parts[0], parts[1] + + +class EvidenceFetcher: + def __init__(self, session: requests.Session | None = None) -> None: + self.session = session or requests.Session() + + def fetch(self, entry: dict[str, Any]) -> EvidenceBundle: + bundle = EvidenceBundle() + + github_repo = parse_github_repo(entry.get("github_url", "")) + if github_repo: + owner, repo = github_repo + try: + bundle.readme_text = self.fetch_github_readme(owner, repo) + except Exception as exc: # pragma: no cover - network behavior + bundle.issues.append(f"README fetch failed: {exc}") + else: + bundle.issues.append("README evidence unavailable: missing or unsupported github_url") + + paper_url = entry.get("paper_url", "") + if paper_url: + try: + bundle.abstract_text = self.fetch_paper_abstract(paper_url) + except Exception as exc: # pragma: no cover - network behavior + bundle.issues.append(f"Abstract fetch failed: {exc}") + else: + bundle.issues.append("Abstract evidence unavailable: missing paper_url") + + bundle.readme_text = truncate_text(bundle.readme_text) + bundle.abstract_text = truncate_text(bundle.abstract_text) + return bundle + + def fetch_github_readme(self, owner: str, repo: str) -> str: + url = f"https://api.github.com/repos/{owner}/{repo}/readme" + response = self.session.get( + url, + headers={ + "Accept": "application/vnd.github.raw+json", + "User-Agent": "awesome-physical-ai-validator", + }, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + response.raise_for_status() + return response.text + + def fetch_paper_abstract(self, paper_url: str) -> str: + if "arxiv.org" in paper_url: + return self.fetch_arxiv_abstract(paper_url) + response = self.session.get( + paper_url, + headers={"User-Agent": "awesome-physical-ai-validator"}, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + response.raise_for_status() + return response.text + + def fetch_arxiv_abstract(self, paper_url: str) -> str: + response = self.session.get( + paper_url, + headers={"User-Agent": "awesome-physical-ai-validator"}, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + response.raise_for_status() + html = response.text + match = re.search( + r'\s*]*>Abstract:(.*?)', + html, + flags=re.IGNORECASE | re.DOTALL, + ) + if not match: + raise ValueError("could not find abstract in paper page") + return sanitize_text(match.group(1)) + + +class GeminiValidator: + def __init__( + self, + api_key: str, + model: str = DEFAULT_GEMINI_MODEL, + session: requests.Session | None = None, + ) -> None: + self.api_key = api_key + self.model = model + self.session = session or requests.Session() + + def validate( + self, + entry_type: str, + entry: dict[str, Any], + evidence: EvidenceBundle, + ) -> dict[str, Any]: + prompt = build_prompt(entry_type, entry, evidence) + + last_retryable_error: requests.HTTPError | None = None + + for attempt in range(1, MAX_GEMINI_RETRIES + 1): + response = self.session.post( + GEMINI_API_URL.format(model=self.model), + headers={ + "Content-Type": "application/json", + "x-goog-api-key": self.api_key, + }, + json={ + "contents": [{"role": "user", "parts": [{"text": prompt}]}], + "generationConfig": { + "temperature": 0, + "responseMimeType": "application/json", + "responseSchema": VALIDATION_SCHEMA, + }, + }, + timeout=REQUEST_TIMEOUT_SECONDS, + ) + try: + response.raise_for_status() + except requests.HTTPError as exc: + print("Gemini API request failed.") + print(f"Model: {self.model}") + print(f"Status code: {response.status_code}") + print(f"Request URL: {response.request.url}") + print(f"Response body: {response.text}") + + if response.status_code in RETRYABLE_STATUS_CODES: + last_retryable_error = exc + if attempt < MAX_GEMINI_RETRIES: + backoff = RETRY_BACKOFF_SECONDS[min(attempt - 1, len(RETRY_BACKOFF_SECONDS) - 1)] + print( + f"Retrying Gemini request for {entry.get('id', '')} " + f"({attempt}/{MAX_GEMINI_RETRIES}) after {backoff:.1f}s" + ) + time.sleep(backoff) + continue + raise RetryableGeminiError( + entry_id=entry.get("id", ""), + status_code=response.status_code, + attempts=attempt, + message=f"Gemini request failed after {attempt} attempts", + ) from exc + + raise exc + + payload = response.json() + text = extract_gemini_text(payload) + result = json.loads(text) + return normalize_validation_result(result) + + if last_retryable_error is not None: + raise RetryableGeminiError( + entry_id=entry.get("id", ""), + status_code=0, + attempts=MAX_GEMINI_RETRIES, + message="Gemini request exhausted retries", + ) from last_retryable_error + raise RuntimeError("Gemini validation reached an unexpected state") + + +class MockGeminiValidator: + def __init__(self, responses: dict[str, dict[str, Any]]) -> None: + self.responses = responses + + def validate( + self, + entry_type: str, + entry: dict[str, Any], + evidence: EvidenceBundle, + ) -> dict[str, Any]: + entry_id = entry.get("id", "") + if entry_id not in self.responses: + raise KeyError(f"missing mock response for '{entry_id}'") + response = self.responses[entry_id] + if isinstance(response, Exception): + raise response + return normalize_validation_result(response) + + +class RetryableGeminiError(Exception): + def __init__(self, entry_id: str, status_code: int, attempts: int, message: str) -> None: + super().__init__(message) + self.entry_id = entry_id + self.status_code = status_code + self.attempts = attempts + + +def extract_gemini_text(payload: dict[str, Any]) -> str: + candidates = payload.get("candidates") or [] + if not candidates: + raise ValueError("Gemini response did not include any candidates") + content = candidates[0].get("content") or {} + parts = content.get("parts") or [] + if not parts: + raise ValueError("Gemini response did not include any content parts") + text = parts[0].get("text", "") + if not text: + raise ValueError("Gemini response part did not include text") + return text + + +def normalize_validation_result(result: dict[str, Any]) -> dict[str, Any]: + missing = [key for key in VALIDATION_SCHEMA["required"] if key not in result] + if missing: + raise ValueError(f"Gemini JSON missing required keys: {missing}") + + normalized = { + "tag_score": clamp_score(result["tag_score"]), + "summary_score": clamp_score(result["summary_score"]), + "final_verdict": str(result["final_verdict"]).lower(), + "reason": str(result["reason"]).strip(), + "unsupported_tags": [str(item) for item in result["unsupported_tags"]], + "unsupported_claims": [str(item) for item in result["unsupported_claims"]], + } + + if normalized["final_verdict"] not in VALID_VERDICTS: + raise ValueError( + f"Gemini final_verdict must be one of {sorted(VALID_VERDICTS)}, " + f"got '{normalized['final_verdict']}'" + ) + if not normalized["reason"]: + raise ValueError("Gemini reason must not be empty") + return normalized + + +def clamp_score(value: Any) -> float: + try: + numeric = float(value) + except (TypeError, ValueError) as exc: + raise ValueError(f"score must be numeric, got {value!r}") from exc + if numeric < 0: + return 0.0 + if numeric > 1: + return 1.0 + return round(numeric, 3) + + +def build_prompt(entry_type: str, entry: dict[str, Any], evidence: EvidenceBundle) -> str: + return f""" +당신은 Awesome Physical AI 저장소에 등록된 항목의 메타데이터를 검증하는 엄격한 검수자입니다. + +검증 목표: +사용자가 제출한 태그, 영어 요약문, 한국어 요약문이 GitHub README 및 논문 Abstract 근거에 비추어 적절한지 판단하세요. + +반드시 지켜야 할 기준: +1. 제공된 README evidence와 Abstract evidence만 근거로 사용하세요. +2. 외부 지식이나 일반적인 추측을 사용하지 마세요. +3. 태그가 그럴듯해 보여도 README 또는 Abstract에서 뒷받침되지 않으면 unsupported_tags에 포함하세요. +4. 영어/한국어 요약문에 README 또는 Abstract에서 확인되지 않는 주장, 과장된 표현, 잘못된 적용 분야, 잘못된 성능 주장, 잘못된 모델 설명이 있으면 unsupported_claims에 포함하세요. +5. README는 구현 방식, 프레임워크, 설치/사용법, 코드 기반 기능을 판단할 때 우선 근거로 사용하세요. +6. 논문 Abstract는 연구 목적, 핵심 기여, 적용 도메인, 제안 방법, 실험 대상 등을 판단할 때 우선 근거로 사용하세요. +7. README 또는 Abstract가 누락되었거나 근거가 약한 경우, 명확히 틀린 주장이 아니라면 fail보다 warning을 우선 사용하세요. +8. 명확히 근거와 모순되는 태그나 요약문 주장이 있으면 fail을 사용할 수 있습니다. +9. tag_score와 summary_score는 0.0 이상 1.0 이하의 숫자로 작성하세요. +10. final_verdict는 반드시 pass, warning, fail 중 하나만 사용하세요. +11. reason은 반드시 한국어로 작성하세요. +12. 응답은 설명 문장 없이 JSON만 반환하세요. + +판정 기준: +- pass: 태그와 요약문이 README 및 Abstract 근거로 충분히 뒷받침됨 +- warning: 근거가 부족하거나 일부 태그/주장이 약하게만 뒷받침됨 +- fail: 태그 또는 요약문이 근거와 명확히 모순되거나 핵심 내용이 잘못됨 + +반환해야 하는 JSON 스키마: +{json.dumps(VALIDATION_SCHEMA, ensure_ascii=False)} + +검증 항목 정보: +- Entry type: {entry_type} +- Entry id: {entry.get("id", "")} +- Entry name: {entry.get("name", "")} +- Submitted tags: {json.dumps(entry.get("tags", []), ensure_ascii=False)} +- Submitted English summary: {entry.get("description_en", "")} +- Submitted Korean summary: {entry.get("description_ko", "")} + +README evidence: +{evidence.readme_text or "(missing)"} + +Abstract evidence: +{evidence.abstract_text or "(missing)"} +""".strip() + + +def validate_entry( + entry_type: str, + entry: dict[str, Any], + fetcher: EvidenceFetcher, + validator: GeminiValidator | MockGeminiValidator, +) -> ValidationResult: + evidence = fetcher.fetch(entry) + evidence_sources: list[str] = [] + if evidence.readme_text: + evidence_sources.append("README") + if evidence.abstract_text: + evidence_sources.append("Abstract") + + try: + llm_result = validator.validate(entry_type, entry, evidence) + except RetryableGeminiError as exc: + warning_reason = ( + f"Gemini validation temporarily unavailable after {exc.attempts} attempts " + f"(status {exc.status_code}); marked as warning and continued." + ) + issues = list(evidence.issues) + issues.append(warning_reason) + return ValidationResult( + entry_id=entry.get("id", ""), + entry_type=entry_type, + name=entry.get("name", ""), + tags=entry.get("tags", []), + summary=entry.get("description_en", ""), + summary_ko=entry.get("description_ko", ""), + tag_score=0.0, + summary_score=0.0, + final_verdict="warning", + reason=warning_reason, + unsupported_tags=[], + unsupported_claims=[], + evidence_sources=evidence_sources, + evidence_issues=issues, + ) + + return ValidationResult( + entry_id=entry.get("id", ""), + entry_type=entry_type, + name=entry.get("name", ""), + tags=entry.get("tags", []), + summary=entry.get("description_en", ""), + summary_ko=entry.get("description_ko", ""), + tag_score=llm_result["tag_score"], + summary_score=llm_result["summary_score"], + final_verdict=llm_result["final_verdict"], + reason=llm_result["reason"], + unsupported_tags=llm_result["unsupported_tags"], + unsupported_claims=llm_result["unsupported_claims"], + evidence_sources=evidence_sources, + evidence_issues=evidence.issues, + ) + + +def build_report( + results: list[ValidationResult], + skipped_reason: str | None = None, +) -> dict[str, Any]: + counts = {"pass": 0, "warning": 0, "fail": 0} + for result in results: + counts[result.final_verdict] += 1 + + return { + "status": "skipped" if skipped_reason else "completed", + "skipped_reason": skipped_reason, + "counts": counts, + "results": [asdict(result) for result in results], + } + + +def render_actions_summary(report: dict[str, Any]) -> str: + lines = [ + "# LLM Metadata Validation", + "", + f"- Status: `{report['status']}`", + f"- Pass: `{report['counts']['pass']}`", + f"- Warning: `{report['counts']['warning']}`", + f"- Fail: `{report['counts']['fail']}`", + ] + skipped_reason = report.get("skipped_reason") + if skipped_reason: + lines.extend(["", f"- Skipped reason: {skipped_reason}"]) + return "\n".join(lines) + "\n" + + lines.extend(["", "| Entry | Type | Verdict | Tag score | Summary score | Notes |", "|---|---|---|---:|---:|---|"]) + for result in report["results"]: + notes: list[str] = [] + if result["unsupported_tags"]: + notes.append("unsupported tags: " + ", ".join(result["unsupported_tags"])) + if result["unsupported_claims"]: + notes.append("unsupported claims: " + ", ".join(result["unsupported_claims"])) + if result["evidence_issues"]: + notes.append("evidence issues: " + "; ".join(result["evidence_issues"])) + if not notes: + notes.append(result["reason"]) + lines.append( + f"| {result['entry_id']} | {result['entry_type']} | {result['final_verdict']} " + f"| {result['tag_score']:.2f} | {result['summary_score']:.2f} | {' / '.join(notes)} |" + ) + return "\n".join(lines) + "\n" + + +def determine_exit_code(report: dict[str, Any]) -> int: + return 0 + + +def write_report_files(report: dict[str, Any], output: Path, summary_output: Path) -> None: + output.parent.mkdir(parents=True, exist_ok=True) + output.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8") + summary_output.parent.mkdir(parents=True, exist_ok=True) + summary_output.write_text(render_actions_summary(report), encoding="utf-8") + + +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Validate tags and summaries with Gemini") + parser.add_argument("--output", type=Path, default=DEFAULT_REPORT_PATH) + parser.add_argument("--summary-output", type=Path, default=DEFAULT_SUMMARY_PATH) + parser.add_argument("--mock-response-file", type=Path, default=None) + parser.add_argument( + "--entry-ids", + type=str, + default="", + help="Comma-separated entry IDs to validate. Empty means validate all entries.", + ) + return parser.parse_args(argv) + + +def main(argv: list[str] | None = None) -> int: + args = parse_args(argv) + requested_ids = {item.strip() for item in args.entry_ids.split(",") if item.strip()} + entries = [ + (entry_type, entry) + for entry_type, entry in iter_entries() + if ( + (entry.get("tags") or entry.get("description_en") or entry.get("description_ko")) + and (not requested_ids or entry.get("id", "") in requested_ids) + ) + ] + + fetcher = EvidenceFetcher() + + if args.mock_response_file: + mock_responses = json.loads(args.mock_response_file.read_text(encoding="utf-8")) + validator: GeminiValidator | MockGeminiValidator = MockGeminiValidator(mock_responses) + else: + api_key = os.getenv("GEMINI_API_KEY", "").strip() + if not api_key: + report = build_report([], skipped_reason="GEMINI_API_KEY is not configured") + write_report_files(report, args.output, args.summary_output) + print(f"LLM validation skipped: {report['skipped_reason']}") + return determine_exit_code(report) + validator = GeminiValidator(api_key=api_key) + + results: list[ValidationResult] = [] + for entry_type, entry in entries: + result = validate_entry(entry_type, entry, fetcher=fetcher, validator=validator) + results.append(result) + print( + f"[{result.final_verdict}] {result.entry_type}:{result.entry_id} " + f"(tag={result.tag_score:.2f}, summary={result.summary_score:.2f})" + ) + + report = build_report(results) + write_report_files(report, args.output, args.summary_output) + print(f"Report written to {args.output}") + return determine_exit_code(report) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_validate_llm_metadata.py b/tests/test_validate_llm_metadata.py new file mode 100644 index 0000000..2935fba --- /dev/null +++ b/tests/test_validate_llm_metadata.py @@ -0,0 +1,320 @@ +""" +Unit tests for scripts/validate_llm_metadata.py +""" + +import shutil +from pathlib import Path + +import validate_llm_metadata as vlm + + +class FakeFetcher: + def __init__(self, evidence_by_id): + self.evidence_by_id = evidence_by_id + + def fetch(self, entry): + return self.evidence_by_id[entry["id"]] + + +def _entry(**overrides): + base = { + "id": "test-entry", + "name": "Test Entry", + "description_en": "A robot policy for manipulation.", + "tags": ["manipulation", "pytorch"], + "github_url": "https://github.com/example/project", + "paper_url": "https://arxiv.org/abs/2401.00001", + } + base.update(overrides) + return base + + +def test_normalize_validation_result_accepts_expected_shape(): + result = vlm.normalize_validation_result( + { + "tag_score": 0.9, + "summary_score": 0.8, + "final_verdict": "pass", + "reason": "Tags and summary are supported by the evidence.", + "unsupported_tags": [], + "unsupported_claims": [], + } + ) + + assert result["tag_score"] == 0.9 + assert result["summary_score"] == 0.8 + assert result["final_verdict"] == "pass" + + +def test_normalize_validation_result_rejects_missing_keys(): + try: + vlm.normalize_validation_result({"tag_score": 0.5}) + except ValueError as exc: + assert "missing required keys" in str(exc) + else: + raise AssertionError("normalize_validation_result should reject incomplete JSON") + + +def test_validate_entry_pass_case(): + fetcher = FakeFetcher( + { + "test-entry": vlm.EvidenceBundle( + readme_text="README says the project uses PyTorch for manipulation.", + abstract_text="The paper studies manipulation with an imitation learning policy.", + ) + } + ) + validator = vlm.MockGeminiValidator( + { + "test-entry": { + "tag_score": 0.95, + "summary_score": 0.93, + "final_verdict": "pass", + "reason": "Both tags and summary align with README and abstract.", + "unsupported_tags": [], + "unsupported_claims": [], + } + } + ) + + result = vlm.validate_entry("model", _entry(), fetcher, validator) + + assert result.final_verdict == "pass" + assert result.evidence_sources == ["README", "Abstract"] + + +def test_validate_entry_invalid_tag_case(): + fetcher = FakeFetcher( + { + "test-entry": vlm.EvidenceBundle( + readme_text="README only mentions manipulation and PyTorch.", + abstract_text="The abstract focuses on imitation learning for manipulation.", + ) + } + ) + validator = vlm.MockGeminiValidator( + { + "test-entry": { + "tag_score": 0.3, + "summary_score": 0.84, + "final_verdict": "fail", + "reason": "One tag is not supported by the evidence.", + "unsupported_tags": ["humanoid"], + "unsupported_claims": [], + } + } + ) + + result = vlm.validate_entry( + "model", + _entry(tags=["manipulation", "humanoid"]), + fetcher, + validator, + ) + + assert result.final_verdict == "fail" + assert result.unsupported_tags == ["humanoid"] + + +def test_validate_entry_exaggerated_summary_case(): + fetcher = FakeFetcher( + { + "test-entry": vlm.EvidenceBundle( + readme_text="README describes a research codebase for tabletop manipulation.", + abstract_text="The abstract reports results on tabletop manipulation.", + ) + } + ) + validator = vlm.MockGeminiValidator( + { + "test-entry": { + "tag_score": 0.88, + "summary_score": 0.2, + "final_verdict": "fail", + "reason": "The summary claims whole-body control that is not supported.", + "unsupported_tags": [], + "unsupported_claims": ["supports whole-body humanoid control"], + } + } + ) + + result = vlm.validate_entry( + "model", + _entry(description_en="Supports whole-body humanoid control for many robots."), + fetcher, + validator, + ) + + assert result.final_verdict == "fail" + assert "whole-body humanoid control" in result.unsupported_claims[0] + + +def test_validate_entry_insufficient_evidence_case(): + fetcher = FakeFetcher( + { + "test-entry": vlm.EvidenceBundle( + readme_text="", + abstract_text="", + issues=["README evidence unavailable", "Abstract evidence unavailable"], + ) + } + ) + validator = vlm.MockGeminiValidator( + { + "test-entry": { + "tag_score": 0.55, + "summary_score": 0.52, + "final_verdict": "warning", + "reason": "Evidence is too limited for a confident pass.", + "unsupported_tags": [], + "unsupported_claims": [], + } + } + ) + + result = vlm.validate_entry("model", _entry(), fetcher, validator) + + assert result.final_verdict == "warning" + assert result.evidence_issues == ["README evidence unavailable", "Abstract evidence unavailable"] + + +def test_validate_entry_retry_exhausted_becomes_warning(): + fetcher = FakeFetcher( + { + "test-entry": vlm.EvidenceBundle( + readme_text="README mentions manipulation.", + abstract_text="Abstract mentions manipulation.", + ) + } + ) + validator = vlm.MockGeminiValidator( + { + "test-entry": vlm.RetryableGeminiError( + entry_id="test-entry", + status_code=503, + attempts=3, + message="Gemini request failed after 3 attempts", + ) + } + ) + + result = vlm.validate_entry("model", _entry(), fetcher, validator) + + assert result.final_verdict == "warning" + assert result.tag_score == 0.0 + assert result.summary_score == 0.0 + assert "temporarily unavailable" in result.reason + assert any("status 503" in issue for issue in result.evidence_issues) + + +def test_build_report_counts_and_exit_code(): + results = [ + vlm.ValidationResult( + entry_id="a", + entry_type="model", + name="A", + tags=[], + summary="", + summary_ko="", + tag_score=0.9, + summary_score=0.9, + final_verdict="pass", + reason="ok", + unsupported_tags=[], + unsupported_claims=[], + evidence_sources=["README"], + evidence_issues=[], + ), + vlm.ValidationResult( + entry_id="b", + entry_type="dataset", + name="B", + tags=[], + summary="", + summary_ko="", + tag_score=0.4, + summary_score=0.4, + final_verdict="fail", + reason="bad", + unsupported_tags=["bad-tag"], + unsupported_claims=[], + evidence_sources=["README"], + evidence_issues=[], + ), + ] + + report = vlm.build_report(results) + + assert report["counts"] == {"pass": 1, "warning": 0, "fail": 1} + assert vlm.determine_exit_code(report) == 0 + + +def test_build_report_counts_warning_from_retry_fallback(): + results = [ + vlm.ValidationResult( + entry_id="retry-case", + entry_type="model", + name="Retry Case", + tags=[], + summary="", + summary_ko="", + tag_score=0.0, + summary_score=0.0, + final_verdict="warning", + reason="Gemini validation temporarily unavailable after 3 attempts (status 503); marked as warning and continued.", + unsupported_tags=[], + unsupported_claims=[], + evidence_sources=["README"], + evidence_issues=["Gemini validation temporarily unavailable after 3 attempts (status 503); marked as warning and continued."], + ) + ] + + report = vlm.build_report(results) + + assert report["counts"] == {"pass": 0, "warning": 1, "fail": 0} + assert vlm.determine_exit_code(report) == 0 + + +def test_main_skips_without_api_key(monkeypatch): + tmp_dir = Path("reports") / "pytest_llm_tmp" + if tmp_dir.exists(): + shutil.rmtree(tmp_dir) + tmp_dir.mkdir(parents=True) + output = tmp_dir / "llm.json" + summary = tmp_dir / "llm.md" + monkeypatch.delenv("GEMINI_API_KEY", raising=False) + + exit_code = vlm.main(["--output", str(output), "--summary-output", str(summary)]) + + assert exit_code == 0 + assert output.exists() + assert summary.exists() + assert '"status": "skipped"' in output.read_text(encoding="utf-8") + shutil.rmtree(tmp_dir) + + +def test_render_actions_summary_contains_table(): + report = { + "status": "completed", + "skipped_reason": None, + "counts": {"pass": 1, "warning": 1, "fail": 0}, + "results": [ + { + "entry_id": "foo", + "entry_type": "model", + "final_verdict": "warning", + "tag_score": 0.7, + "summary_score": 0.6, + "unsupported_tags": ["rl"], + "unsupported_claims": [], + "evidence_issues": ["Abstract evidence unavailable"], + "reason": "Need more evidence", + } + ], + } + + summary = vlm.render_actions_summary(report) + + assert "| Entry | Type | Verdict |" in summary + assert "foo" in summary + assert "unsupported tags: rl" in summary