Skip to content

Commit d05a2d1

Browse files
author
Carlos-Projects
committed
chore: apply ruff fixes
1 parent df94e09 commit d05a2d1

5 files changed

Lines changed: 94 additions & 49 deletions

File tree

mcpscope/api/routes.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
from mcpscope.models.finding import Severity
1212
from mcpscope.models.security_event import SecurityEvent
1313
from mcp_taxonomy import (
14-
AttackCategory, Severity as TaxSeverity, DetectionMethod,
14+
AttackCategory,
15+
Severity as TaxSeverity,
16+
DetectionMethod,
1517
palisade_finding_to_taxonomy,
1618
mcpguard_event_to_taxonomy,
1719
mcpwn_finding_to_taxonomy,
@@ -92,20 +94,24 @@ def normalize_taxonomy(body: dict):
9294
)
9395
else:
9496
return JSONResponse({"error": f"Unknown source: {source}"}, status_code=400)
95-
return JSONResponse({
96-
"source": event.source,
97-
"attack_category": event.attack_category.value,
98-
"severity": event.severity.value,
99-
"confidence": event.confidence.value,
100-
"detection_method": event.detection_method.value if isinstance(event.detection_method, DetectionMethod) else event.detection_method,
101-
"title": event.title,
102-
"description": event.description,
103-
"recommendation": event.recommendation,
104-
"target": event.target,
105-
"snippet": event.snippet[:200] if event.snippet else "",
106-
"blocked": event.blocked,
107-
"risk_score": event.risk_score,
108-
})
97+
return JSONResponse(
98+
{
99+
"source": event.source,
100+
"attack_category": event.attack_category.value,
101+
"severity": event.severity.value,
102+
"confidence": event.confidence.value,
103+
"detection_method": event.detection_method.value
104+
if isinstance(event.detection_method, DetectionMethod)
105+
else event.detection_method,
106+
"title": event.title,
107+
"description": event.description,
108+
"recommendation": event.recommendation,
109+
"target": event.target,
110+
"snippet": event.snippet[:200] if event.snippet else "",
111+
"blocked": event.blocked,
112+
"risk_score": event.risk_score,
113+
}
114+
)
109115

110116

111117
def _session_value(password: str, client_ip: str) -> str:
@@ -122,7 +128,7 @@ def login_page(request: Request):
122128

123129

124130
@router.post("/api/login")
125-
def login(request: Request, response: Response, body: dict = None):
131+
def login(request: Request, response: Response, body: dict | None = None):
126132
cfg = request.app.state
127133
if not cfg.dashboard_password:
128134
return JSONResponse({"error": "Dashboard auth not configured"}, status_code=403)

mcpscope/api/server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818

1919
class JSONLogFormatter(logging.Formatter):
20-
def format(self, record: logging.LoggerRecord) -> str:
20+
def format(self, record: logging.LogRecord) -> str:
2121
return json.dumps(
2222
{
2323
"time": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),

mcpscope/config.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
from __future__ import annotations
22
import json
3+
import os
34
from pathlib import Path
45
from dataclasses import dataclass, field
6+
from typing import Any
57

68
DEFAULT_CONFIG_PATH = Path.home() / ".mcpscope" / "config.json"
79

10+
ENV_PREFIX = "MCPSCOPE_"
11+
FIELD_TYPE_MAP: dict[str, type] = {
12+
"port": int,
13+
"auto_refresh_seconds": int,
14+
"max_upload_mb": int,
15+
"log_json": bool,
16+
}
17+
818

919
@dataclass
1020
class Settings:
@@ -22,17 +32,24 @@ class Settings:
2232

2333
@classmethod
2434
def load(cls, path: str | Path | None = None) -> Settings:
35+
data: dict[str, Any] = {}
2536
path = Path(path) if path else DEFAULT_CONFIG_PATH
2637
if path.exists():
2738
try:
2839
with open(path) as f:
29-
data = json.load(f)
30-
return cls(
31-
**{k: v for k, v in data.items() if k in cls.__dataclass_fields__}
32-
)
40+
data.update(json.load(f))
3341
except (json.JSONDecodeError, TypeError):
3442
pass
35-
return cls()
43+
for field_name in cls.__dataclass_fields__:
44+
env_key = f"{ENV_PREFIX}{field_name.upper()}"
45+
if env_key in os.environ:
46+
raw = os.environ[env_key]
47+
field_type = FIELD_TYPE_MAP.get(field_name, str)
48+
if field_type is bool:
49+
data[field_name] = raw.lower() in ("1", "true", "yes")
50+
else:
51+
data[field_name] = field_type(raw)
52+
return cls(**{k: v for k, v in data.items() if k in cls.__dataclass_fields__})
3653

3754
def save(self, path: str | Path | None = None):
3855
path = Path(path) if path else DEFAULT_CONFIG_PATH

mcpscope/ingest/cisco_a2a.py

Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,32 @@ def parse(self, data: dict) -> list[Finding]:
1919
scanner = self.SCANNER_NAME
2020
target = data.get("target", data.get("card", data.get("endpoint", "unknown")))
2121

22-
raw_findings = data.get("findings", data.get("results", data.get("assessments", [])))
22+
raw_findings = data.get(
23+
"findings", data.get("results", data.get("assessments", []))
24+
)
2325
if not isinstance(raw_findings, list):
24-
raise ParseError(f"Expected 'findings' to be a list, got {type(raw_findings).__name__}")
26+
raise ParseError(
27+
f"Expected 'findings' to be a list, got {type(raw_findings).__name__}"
28+
)
2529

2630
for item in raw_findings:
2731
if not isinstance(item, dict):
2832
continue
2933
threat_name = item.get("threat_name", "")
30-
sev_raw = item.get("severity", item.get("risk", "info")).upper()
31-
sev = Severity.CRITICAL if sev_raw == "CRITICAL" else (
32-
Severity.HIGH if sev_raw == "HIGH" else (
33-
Severity.MEDIUM if sev_raw == "MEDIUM" else (
34-
Severity.LOW if sev_raw == "LOW" else Severity.INFO)))
34+
sev_raw = (item.get("severity") or item.get("risk") or "info").upper()
35+
sev = (
36+
Severity.CRITICAL
37+
if sev_raw == "CRITICAL"
38+
else (
39+
Severity.HIGH
40+
if sev_raw == "HIGH"
41+
else (
42+
Severity.MEDIUM
43+
if sev_raw == "MEDIUM"
44+
else (Severity.LOW if sev_raw == "LOW" else Severity.INFO)
45+
)
46+
)
47+
)
3548
analyzer = item.get("analyzer", "unknown")
3649
summary = item.get("summary", "")
3750
description = item.get("description", summary)
@@ -44,29 +57,35 @@ def parse(self, data: dict) -> list[Finding]:
4457
if isinstance(item.get("details"), dict):
4558
location = item["details"].get("field", "")
4659

47-
title = threat_name or f"[{analyzer}] {summary[:60]}" if summary else f"{analyzer} finding"
60+
title = (
61+
threat_name or f"[{analyzer}] {summary[:60]}"
62+
if summary
63+
else f"{analyzer} finding"
64+
)
4865
if aitech:
4966
title = f"[{aitech}] {aitech_name}"
5067
if aisubtech:
5168
title = f"[{aisubtech}] {aisubtech_name or title}"
5269

53-
findings.append(Finding(
54-
scan_id=target,
55-
scanner=scanner,
56-
tool_name=f"a2a-{analyzer.lower()}",
57-
severity=sev,
58-
title=title,
59-
description=description or summary,
60-
raw_data={
61-
"threat_name": threat_name,
62-
"analyzer": analyzer,
63-
"aitech": aitech,
64-
"aitech_name": aitech_name,
65-
"aisubtech": aisubtech,
66-
"aisubtech_name": aisubtech_name,
67-
"location": location,
68-
"details": item.get("details"),
69-
},
70-
))
70+
findings.append(
71+
Finding(
72+
scan_id=target,
73+
scanner=scanner,
74+
tool_name=f"a2a-{analyzer.lower()}",
75+
severity=sev,
76+
title=title,
77+
description=description or summary,
78+
raw_data={
79+
"threat_name": threat_name,
80+
"analyzer": analyzer,
81+
"aitech": aitech,
82+
"aitech_name": aitech_name,
83+
"aisubtech": aisubtech,
84+
"aisubtech_name": aisubtech_name,
85+
"location": location,
86+
"details": item.get("details"),
87+
},
88+
)
89+
)
7190

7291
return findings

mcpscope/scanner.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
import json
33
import subprocess
44
import tempfile
5+
import uuid
56
from pathlib import Path
7+
from typing import Any
68

79
from mcpscope.ingest.cisco_mcp import CiscoMCPParser
810
from mcpscope.ingest.mcpscan import MCPScanParser
@@ -12,7 +14,7 @@
1214

1315

1416
class ScannerRunner:
15-
PARSERS = {
17+
PARSERS: dict[str, dict[str, Any]] = {
1618
"mcp-scan": {
1719
"parser": MCPScanParser(),
1820
"install": "pip install mcp-scan",
@@ -68,6 +70,7 @@ def scan(
6870
raise RuntimeError("Scanner completed but no findings were detected")
6971

7072
scan = ScanRun(
73+
id=str(uuid.uuid4()),
7174
scanner=findings[0].scanner,
7275
target=target,
7376
)

0 commit comments

Comments
 (0)