Skip to content

Commit b96207b

Browse files
Fix imports, annotations, and function naming issues
Co-authored-by: kevinbackhouse <4358136+kevinbackhouse@users.noreply.github.com>
1 parent 5cc8c2e commit b96207b

5 files changed

Lines changed: 38 additions & 19 deletions

File tree

src/seclab_taskflows/mcp_servers/alert_results_models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# SPDX-FileCopyrightText: 2025 GitHub
22
# SPDX-License-Identifier: MIT
33

4+
from __future__ import annotations
5+
46
from typing import Optional
57

68
from sqlalchemy import Column, ForeignKey, Integer, Text

src/seclab_taskflows/mcp_servers/codeql_python/codeql_sqlite_models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# SPDX-FileCopyrightText: 2025 GitHub
22
# SPDX-License-Identifier: MIT
33

4+
from __future__ import annotations
5+
46
from typing import Optional
57

68
from sqlalchemy import Text

src/seclab_taskflows/mcp_servers/codeql_python/mcp_server.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# SPDX-FileCopyrightText: 2025 GitHub
22
# SPDX-License-Identifier: MIT
33

4+
from __future__ import annotations
45

56
import csv
67
import importlib.resources
@@ -18,8 +19,8 @@
1819
from sqlalchemy import create_engine
1920
from sqlalchemy.orm import Session
2021

21-
from ..utils import process_repo
22-
from .codeql_sqlite_models import Base, Source
22+
from seclab_taskflows.mcp_servers.utils import process_repo
23+
from seclab_taskflows.mcp_servers.codeql_python.codeql_sqlite_models import Base, Source
2324

2425
logging.basicConfig(
2526
level=logging.DEBUG,
@@ -53,27 +54,29 @@ def source_to_dict(result):
5354
}
5455

5556
def _resolve_query_path(language: str, query: str) -> Path:
56-
global TEMPLATED_QUERY_PATHS
5757
if language not in TEMPLATED_QUERY_PATHS:
58-
raise RuntimeError(f"Error: Language `{language}` not supported!")
58+
msg = f"Error: Language `{language}` not supported!"
59+
raise RuntimeError(msg)
5960
query_path = TEMPLATED_QUERY_PATHS[language].get(query)
6061
if not query_path:
61-
raise RuntimeError(f"Error: query `{query}` not supported for `{language}`!")
62+
msg = f"Error: query `{query}` not supported for `{language}`!"
63+
raise RuntimeError(msg)
6264
return Path(query_path)
6365

6466

6567
def _resolve_db_path(relative_db_path: str | Path):
66-
global CODEQL_DBS_BASE_PATH
6768
# path joins will return "/B" if "/A" / "////B" etc. as well
6869
# not windows compatible and probably needs additional hardening
6970
relative_db_path = str(relative_db_path).strip().lstrip('/')
7071
relative_db_path = Path(relative_db_path)
7172
absolute_path = (CODEQL_DBS_BASE_PATH / relative_db_path).resolve()
7273
if not absolute_path.is_relative_to(CODEQL_DBS_BASE_PATH.resolve()):
73-
raise RuntimeError(f"Error: Database path {absolute_path} is outside the base path {CODEQL_DBS_BASE_PATH}")
74+
msg = f"Error: Database path {absolute_path} is outside the base path {CODEQL_DBS_BASE_PATH}"
75+
raise RuntimeError(msg)
7476
if not absolute_path.is_dir():
7577
_debug_log(f"Database path not found: {absolute_path}")
76-
raise RuntimeError(f"Error: Database not found at {absolute_path}!")
78+
msg = f"Error: Database not found at {absolute_path}!"
79+
raise RuntimeError(msg)
7780
return str(absolute_path)
7881

7982
# This sqlite database is specifically made for CodeQL for Python MCP.
@@ -88,7 +91,7 @@ def __init__(self, memcache_state_dir: str):
8891
Base.metadata.create_all(self.engine, tables=[Source.__table__])
8992

9093

91-
def store_new_source(self, repo, source_location, line, source_type, notes, update = False):
94+
def store_new_source(self, repo, source_location, line, source_type, notes, *, update = False):
9295
with Session(self.engine) as session:
9396
existing = session.query(Source).filter_by(repo = repo, source_location = source_location, line = line).first()
9497
if existing:

src/seclab_taskflows/mcp_servers/gh_code_scanning.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# SPDX-FileCopyrightText: 2025 GitHub
22
# SPDX-License-Identifier: MIT
33

4+
from __future__ import annotations
5+
46
import json
57
import logging
68
import os
@@ -17,7 +19,7 @@
1719
from sqlalchemy import create_engine
1820
from sqlalchemy.orm import Session
1921

20-
from .alert_results_models import AlertFlowGraph, AlertResults, Base
22+
from seclab_taskflows.mcp_servers.alert_results_models import AlertFlowGraph, AlertResults, Base
2123

2224
logging.basicConfig(
2325
level=logging.DEBUG,
@@ -30,6 +32,9 @@
3032

3133
GH_TOKEN = os.getenv('GH_TOKEN', default='')
3234

35+
# Minimum number of parts in an HTML URL to extract owner/repo
36+
MIN_URL_PARTS = 5
37+
3338
CODEQL_DBS_BASE_PATH = mcp_data_dir('seclab-taskflows', 'codeql', 'CODEQL_DBS_BASE_PATH')
3439
ALERT_RESULTS_DIR = mcp_data_dir('seclab-taskflows', 'gh_code_scanning', 'ALERT_RESULTS_DIR')
3540

@@ -54,7 +59,7 @@ def _get_repo_from_html_url(html_url: str) -> str:
5459
if not html_url:
5560
return ''
5661
parts = html_url.split('/')
57-
if len(parts) < 5:
62+
if len(parts) < MIN_URL_PARTS:
5863
return ''
5964
return f"{parts[3]}/{parts[4]}".lower()
6065

src/seclab_taskflows/mcp_servers/ghsa.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
# SPDX-FileCopyrightText: 2025 GitHub
2+
# SPDX-License-Identifier: MIT
3+
4+
from __future__ import annotations
5+
16
import json
27
import logging
38
import re
@@ -7,7 +12,9 @@
712
from pydantic import Field
813
from seclab_taskflow_agent.path_utils import log_file_name
914

10-
from .gh_code_scanning import call_api
15+
from seclab_taskflows.mcp_servers.gh_code_scanning import call_api
16+
17+
logger = logging.getLogger(__name__)
1118

1219
logging.basicConfig(
1320
level=logging.DEBUG,
@@ -22,7 +29,7 @@
2229
# The advisories contain a lot of information, so we need to filter
2330
# some of it out to avoid exceeding the maximum prompt size.
2431
def parse_advisory(advisory: dict) -> dict:
25-
logging.debug(f"advisory: {advisory}")
32+
logger.debug("advisory: %s", advisory)
2633
return {
2734
"ghsa_id": advisory.get("ghsa_id", ""),
2835
"cve_id": advisory.get("cve_id", ""),
@@ -31,7 +38,7 @@ def parse_advisory(advisory: dict) -> dict:
3138
"state": advisory.get("state", ""),
3239
}
3340

34-
async def fetch_GHSA_list_from_gh(owner: str, repo: str) -> str | list:
41+
async def fetch_ghsa_list_from_gh(owner: str, repo: str) -> str | list:
3542
"""Fetch all security advisories for a specific repository."""
3643
url = f"https://api.github.com/repos/{owner}/{repo}/security-advisories"
3744
params = {'per_page': 100}
@@ -60,16 +67,16 @@ async def fetch_GHSA_list_from_gh(owner: str, repo: str) -> str | list:
6067
return "No advisories found."
6168

6269
@mcp.tool()
63-
async def fetch_GHSA_list(owner: str = Field(description="The owner of the repo"),
70+
async def fetch_ghsa_list(owner: str = Field(description="The owner of the repo"),
6471
repo: str = Field(description="The repository name")) -> str:
6572
"""Fetch all GitHub Security Advisories (GHSAs) for a specific repository."""
66-
results = await fetch_GHSA_list_from_gh(owner, repo)
73+
results = await fetch_ghsa_list_from_gh(owner, repo)
6774
if isinstance(results, str):
6875
return results
6976
return json.dumps(results, indent=2)
7077

7178

72-
async def fetch_GHSA_details_from_gh(owner: str, repo: str, ghsa_id: str) -> str | dict:
79+
async def fetch_ghsa_details_from_gh(owner: str, repo: str, ghsa_id: str) -> str | dict:
7380
"""Fetch the details of a repository security advisory."""
7481
url = f"https://api.github.com/repos/{owner}/{repo}/security-advisories/{ghsa_id}"
7582
resp = await call_api(url, {})
@@ -80,11 +87,11 @@ async def fetch_GHSA_details_from_gh(owner: str, repo: str, ghsa_id: str) -> str
8087
return "Not found."
8188

8289
@mcp.tool()
83-
async def fetch_GHSA_details(owner: str = Field(description="The owner of the repo"),
90+
async def fetch_ghsa_details(owner: str = Field(description="The owner of the repo"),
8491
repo: str = Field(description="The repository name"),
8592
ghsa_id: str = Field(description="The ghsa_id of the advisory")) -> str:
8693
"""Fetch a GitHub Security Advisory for a specific repository and GHSA ID."""
87-
results = await fetch_GHSA_details_from_gh(owner, repo, ghsa_id)
94+
results = await fetch_ghsa_details_from_gh(owner, repo, ghsa_id)
8895
if isinstance(results, str):
8996
return results
9097
return json.dumps(results, indent=2)

0 commit comments

Comments
 (0)