|
| 1 | +# SPDX-FileCopyrightText: GitHub, Inc. |
| 2 | +# SPDX-License-Identifier: MIT |
| 3 | + |
1 | 4 | import logging |
2 | 5 |
|
3 | 6 | from fastmcp import FastMCP |
|
6 | 9 | import json |
7 | 10 | from urllib.parse import urlparse, parse_qs |
8 | 11 | from .gh_code_scanning import call_api |
9 | | -from seclab_taskflow_agent.path_utils import log_file_name |
| 12 | +from seclab_taskflow_agent.path_utils import mcp_data_dir, log_file_name |
| 13 | +from .ghsa_models import GHSA, GHSASummary, Base |
| 14 | +from pathlib import Path |
| 15 | +from sqlalchemy import create_engine |
| 16 | +from sqlalchemy.orm import Session |
| 17 | +from .utils import process_repo |
10 | 18 |
|
11 | 19 | logging.basicConfig( |
12 | 20 | level=logging.DEBUG, |
|
17 | 25 |
|
18 | 26 | mcp = FastMCP("GitHubRepoAdvisories") |
19 | 27 |
|
| 28 | +MEMORY = mcp_data_dir("seclab-taskflows", "ghsa", "GHSA_DIR") |
| 29 | + |
| 30 | + |
| 31 | +def ghsa_to_dict(result): |
| 32 | + return { |
| 33 | + "id": result.id, |
| 34 | + "ghsa_id": result.ghsa_id, |
| 35 | + "repo": result.repo.lower(), |
| 36 | + "severity": result.severity, |
| 37 | + "cve_id": result.cve_id, |
| 38 | + "description": result.description, |
| 39 | + "summary": result.summary, |
| 40 | + "published_at": result.published_at, |
| 41 | + "state": result.state, |
| 42 | + } |
| 43 | + |
| 44 | + |
| 45 | +def ghsa_summary_to_dict(summary): |
| 46 | + return { |
| 47 | + "id": summary.id, |
| 48 | + "repo": summary.repo.lower(), |
| 49 | + "total_advisories": summary.total_advisories, |
| 50 | + "high_severity_count": summary.high_severity_count, |
| 51 | + "medium_severity_count": summary.medium_severity_count, |
| 52 | + "low_severity_count": summary.low_severity_count, |
| 53 | + "summary_notes": summary.summary_notes, |
| 54 | + } |
| 55 | + |
| 56 | +class GHSABackend: |
| 57 | + def __init__(self, db_dir: str): |
| 58 | + # Directory in which the GHSA SQLite database file will be stored. |
| 59 | + self.db_dir = db_dir |
| 60 | + db_uri = "sqlite://" if not Path(self.db_dir).exists() else f"sqlite:///{self.db_dir}/ghsa.db" |
| 61 | + self.engine = create_engine(db_uri, echo=False) |
| 62 | + Base.metadata.create_all( |
| 63 | + self.engine, |
| 64 | + tables=[ |
| 65 | + GHSA.__table__, |
| 66 | + GHSASummary.__table__, |
| 67 | + ], |
| 68 | + ) |
| 69 | + |
| 70 | + def store_new_ghsa(self, repo, ghsa_id, severity, cve_id, description, summary, published_at, state): |
| 71 | + with Session(self.engine) as session: |
| 72 | + existing = session.query(GHSA).filter_by(repo=repo, ghsa_id=ghsa_id).first() |
| 73 | + if existing: |
| 74 | + if severity: |
| 75 | + existing.severity = severity |
| 76 | + if cve_id: |
| 77 | + existing.cve_id = cve_id |
| 78 | + if description: |
| 79 | + existing.description = description |
| 80 | + if summary: |
| 81 | + existing.summary = summary |
| 82 | + if published_at: |
| 83 | + existing.published_at = published_at |
| 84 | + if state: |
| 85 | + existing.state = state |
| 86 | + else: |
| 87 | + new_ghsa = GHSA( |
| 88 | + repo=repo, |
| 89 | + ghsa_id=ghsa_id, |
| 90 | + severity=severity, |
| 91 | + cve_id=cve_id, |
| 92 | + description=description, |
| 93 | + summary=summary, |
| 94 | + published_at=published_at, |
| 95 | + state=state, |
| 96 | + ) |
| 97 | + session.add(new_ghsa) |
| 98 | + session.commit() |
| 99 | + return f"Updated or added GHSA {ghsa_id} for {repo}" |
| 100 | + |
| 101 | + def get_ghsa(self, repo, ghsa_id): |
| 102 | + with Session(self.engine) as session: |
| 103 | + existing = session.query(GHSA).filter_by(repo=repo, ghsa_id=ghsa_id).first() |
| 104 | + if not existing: |
| 105 | + return None |
| 106 | + return ghsa_to_dict(existing) |
| 107 | + |
| 108 | + def get_ghsas(self, repo): |
| 109 | + with Session(self.engine) as session: |
| 110 | + existing = session.query(GHSA).filter_by(repo=repo).all() |
| 111 | + return [ghsa_to_dict(ghsa) for ghsa in existing] |
| 112 | + |
| 113 | + def store_new_ghsa_summary( |
| 114 | + self, |
| 115 | + repo, |
| 116 | + total_advisories, |
| 117 | + high_severity_count, |
| 118 | + medium_severity_count, |
| 119 | + low_severity_count, |
| 120 | + summary_notes, |
| 121 | + ): |
| 122 | + with Session(self.engine) as session: |
| 123 | + existing = session.query(GHSASummary).filter_by(repo=repo).first() |
| 124 | + if existing: |
| 125 | + existing.total_advisories = total_advisories |
| 126 | + existing.high_severity_count = high_severity_count |
| 127 | + existing.medium_severity_count = medium_severity_count |
| 128 | + existing.low_severity_count = low_severity_count |
| 129 | + existing.summary_notes = summary_notes |
| 130 | + else: |
| 131 | + new_summary = GHSASummary( |
| 132 | + repo=repo, |
| 133 | + total_advisories=total_advisories, |
| 134 | + high_severity_count=high_severity_count, |
| 135 | + medium_severity_count=medium_severity_count, |
| 136 | + low_severity_count=low_severity_count, |
| 137 | + summary_notes=summary_notes, |
| 138 | + ) |
| 139 | + session.add(new_summary) |
| 140 | + session.commit() |
| 141 | + return f"Updated or added GHSA summary for {repo}" |
| 142 | + |
| 143 | + def get_ghsa_summary(self, repo): |
| 144 | + with Session(self.engine) as session: |
| 145 | + existing = session.query(GHSASummary).filter_by(repo=repo).first() |
| 146 | + if not existing: |
| 147 | + return None |
| 148 | + return ghsa_summary_to_dict(existing) |
| 149 | + |
| 150 | + def clear_repo(self, repo): |
| 151 | + with Session(self.engine) as session: |
| 152 | + session.query(GHSA).filter_by(repo=repo).delete() |
| 153 | + session.query(GHSASummary).filter_by(repo=repo).delete() |
| 154 | + session.commit() |
| 155 | + return f"Cleared GHSA results for repo {repo}" |
| 156 | + |
| 157 | + |
| 158 | +backend = GHSABackend(MEMORY) |
20 | 159 |
|
21 | 160 | # The advisories contain a lot of information, so we need to filter |
22 | 161 | # some of it out to avoid exceeding the maximum prompt size. |
23 | 162 | def parse_advisory(advisory: dict) -> dict: |
24 | 163 | logging.debug(f"advisory: {advisory}") |
25 | 164 | return { |
26 | | - "ghsa_id": advisory.get("ghsa_id", ""), |
27 | | - "cve_id": advisory.get("cve_id", ""), |
28 | | - "summary": advisory.get("summary", ""), |
29 | | - "published_at": advisory.get("published_at", ""), |
30 | | - "state": advisory.get("state", ""), |
| 165 | + "ghsa_id": advisory.get("ghsa_id") or "", |
| 166 | + "cve_id": advisory.get("cve_id") or "", |
| 167 | + "summary": advisory.get("summary") or "", |
| 168 | + "description": advisory.get("description") or "", |
| 169 | + "severity": advisory.get("severity") or "", |
| 170 | + "published_at": advisory.get("published_at") or "", |
| 171 | + "state": advisory.get("state") or "", |
31 | 172 | } |
32 | 173 |
|
33 | 174 |
|
@@ -70,6 +211,132 @@ async def fetch_GHSA_list( |
70 | 211 | return results |
71 | 212 | return json.dumps(results, indent=2) |
72 | 213 |
|
| 214 | +@mcp.tool() |
| 215 | +async def fetch_and_store_GHSA_list( |
| 216 | + owner: str = Field(description="The owner of the repo"), repo: str = Field(description="The repository name"), |
| 217 | + return_results: bool = Field(description="Whether to return the fetched results as a JSON string", default=False) |
| 218 | +) -> str: |
| 219 | + """Fetch all GitHub Security Advisories (GHSAs) for a specific repository and store them in the database.""" |
| 220 | + results = await fetch_GHSA_list_from_gh(owner, repo) |
| 221 | + if isinstance(results, str): |
| 222 | + return results |
| 223 | + for advisory in results: |
| 224 | + backend.store_new_ghsa( |
| 225 | + process_repo(owner, repo), |
| 226 | + advisory["ghsa_id"], |
| 227 | + advisory["severity"], |
| 228 | + advisory["cve_id"], |
| 229 | + advisory["description"], |
| 230 | + advisory["summary"], |
| 231 | + advisory["published_at"], |
| 232 | + advisory["state"], |
| 233 | + ) |
| 234 | + if return_results: |
| 235 | + return json.dumps(results, indent=2) |
| 236 | + return f"Fetched and stored {len(results)} GHSAs for {owner}/{repo}" |
| 237 | + |
| 238 | +@mcp.tool() |
| 239 | +def store_new_ghsa( |
| 240 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 241 | + repo: str = Field(description="The name of the GitHub repository"), |
| 242 | + ghsa_id: str = Field(description="The GHSA ID of the advisory"), |
| 243 | + severity: str = Field(description="The severity of the advisory"), |
| 244 | + cve_id: str = Field(description="The CVE ID if available", default=""), |
| 245 | + description: str = Field(description="Description for this advisory", default=""), |
| 246 | + summary: str = Field(description="Summary for this advisory", default=""), |
| 247 | + published_at: str = Field(description="Published timestamp for this advisory", default=""), |
| 248 | + state: str = Field(description="State for this advisory (e.g. published, withdrawn)", default=""), |
| 249 | +): |
| 250 | + """Store a GHSA advisory record in the database.""" |
| 251 | + return backend.store_new_ghsa( |
| 252 | + process_repo(owner, repo), ghsa_id, severity, cve_id, description, summary, published_at, state |
| 253 | + ) |
| 254 | + |
| 255 | +@mcp.tool() |
| 256 | +def get_ghsa_from_db( |
| 257 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 258 | + repo: str = Field(description="The name of the GitHub repository"), |
| 259 | + ghsa_id: str = Field(description="The GHSA ID of the advisory"), |
| 260 | +): |
| 261 | + """Get a GHSA advisory record from the database.""" |
| 262 | + repo_name = process_repo(owner, repo) |
| 263 | + result = backend.get_ghsa(repo_name, ghsa_id) |
| 264 | + if not result: |
| 265 | + return f"Error: No GHSA entry exists in repo: {repo_name} and ghsa_id {ghsa_id}" |
| 266 | + return json.dumps(result) |
| 267 | + |
| 268 | + |
| 269 | +@mcp.tool() |
| 270 | +def get_ghsas_for_repo_from_db( |
| 271 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 272 | + repo: str = Field(description="The name of the GitHub repository"), |
| 273 | +): |
| 274 | + """Get all GHSA advisory records for a repository.""" |
| 275 | + return json.dumps(backend.get_ghsas(process_repo(owner, repo))) |
| 276 | + |
| 277 | +@mcp.tool() |
| 278 | +def store_new_ghsa_summary( |
| 279 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 280 | + repo: str = Field(description="The name of the GitHub repository"), |
| 281 | + total_advisories: int = Field(description="Total number of advisories"), |
| 282 | + high_severity_count: int = Field(description="Number of high severity advisories"), |
| 283 | + medium_severity_count: int = Field(description="Number of medium severity advisories"), |
| 284 | + low_severity_count: int = Field(description="Number of low severity advisories"), |
| 285 | + summary_notes: str = Field(description="Notes for the advisory summary", default=""), |
| 286 | +): |
| 287 | + """Store GHSA summary statistics for a repository.""" |
| 288 | + return backend.store_new_ghsa_summary( |
| 289 | + process_repo(owner, repo), |
| 290 | + total_advisories, |
| 291 | + high_severity_count, |
| 292 | + medium_severity_count, |
| 293 | + low_severity_count, |
| 294 | + summary_notes, |
| 295 | + ) |
| 296 | + |
| 297 | + |
| 298 | +@mcp.tool() |
| 299 | +def update_ghsa_summary_notes( |
| 300 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 301 | + repo: str = Field(description="The name of the GitHub repository"), |
| 302 | + summary_notes: str = Field(description="New notes for the advisory summary", default=""), |
| 303 | +): |
| 304 | + """Update summary notes for the GHSA summary for a repository.""" |
| 305 | + repo_name = process_repo(owner, repo) |
| 306 | + existing = backend.get_ghsa_summary(repo_name) |
| 307 | + if not existing: |
| 308 | + return f"Error: No GHSA summary exists in repo: {repo_name}" |
| 309 | + return backend.store_new_ghsa_summary( |
| 310 | + repo_name, |
| 311 | + existing["total_advisories"], |
| 312 | + existing["high_severity_count"], |
| 313 | + existing["medium_severity_count"], |
| 314 | + existing["low_severity_count"], |
| 315 | + summary_notes, |
| 316 | + ) |
| 317 | + |
| 318 | + |
| 319 | +@mcp.tool() |
| 320 | +def get_ghsa_summary( |
| 321 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 322 | + repo: str = Field(description="The name of the GitHub repository"), |
| 323 | +): |
| 324 | + """Get the GHSA summary for a repository.""" |
| 325 | + repo_name = process_repo(owner, repo) |
| 326 | + result = backend.get_ghsa_summary(repo_name) |
| 327 | + if not result: |
| 328 | + return f"Error: No GHSA summary exists in repo: {repo_name}" |
| 329 | + return json.dumps(result) |
| 330 | + |
| 331 | + |
| 332 | +@mcp.tool() |
| 333 | +def clear_repo( |
| 334 | + owner: str = Field(description="The owner of the GitHub repository"), |
| 335 | + repo: str = Field(description="The name of the GitHub repository"), |
| 336 | +): |
| 337 | + """Clear GHSA and GHSA summary records for a repository.""" |
| 338 | + return backend.clear_repo(process_repo(owner, repo)) |
| 339 | + |
73 | 340 |
|
74 | 341 | async def fetch_GHSA_details_from_gh(owner: str, repo: str, ghsa_id: str) -> str | dict: |
75 | 342 | """Fetch the details of a repository security advisory.""" |
|
0 commit comments