|
10 | 10 |
|
11 | 11 | from mcpscope.models.finding import Severity |
12 | 12 | from mcpscope.models.security_event import SecurityEvent |
| 13 | +from mcp_taxonomy import ( |
| 14 | + AttackCategory, Severity as TaxSeverity, DetectionMethod, |
| 15 | + palisade_finding_to_taxonomy, |
| 16 | + mcpguard_event_to_taxonomy, |
| 17 | + mcpwn_finding_to_taxonomy, |
| 18 | + agentgate_signal_to_taxonomy, |
| 19 | +) |
| 20 | +from mcp_taxonomy.core import CATEGORY_SEVERITY |
13 | 21 |
|
14 | 22 | router = APIRouter() |
15 | 23 | PAGE_SIZE = 50 |
@@ -51,6 +59,55 @@ def health(): |
51 | 59 | return {"status": "ok"} |
52 | 60 |
|
53 | 61 |
|
| 62 | +@router.get("/api/taxonomy") |
| 63 | +def taxonomy_info(): |
| 64 | + """Return the canonical taxonomy definition.""" |
| 65 | + return { |
| 66 | + "attack_categories": [c.value for c in AttackCategory], |
| 67 | + "severities": [s.value for s in TaxSeverity], |
| 68 | + "detection_methods": [m.value for m in DetectionMethod], |
| 69 | + "category_severity": {c.value: s.value for c, s in CATEGORY_SEVERITY.items()}, |
| 70 | + } |
| 71 | + |
| 72 | + |
| 73 | +@router.post("/api/taxonomy/normalize") |
| 74 | +def normalize_taxonomy(body: dict): |
| 75 | + """Normalize a finding/event from any project into the canonical taxonomy.""" |
| 76 | + source = body.get("source", "") |
| 77 | + raw = body.get("raw", body) |
| 78 | + if source == "palisade-scanner": |
| 79 | + event = palisade_finding_to_taxonomy(raw) |
| 80 | + elif source == "mcpguard": |
| 81 | + event = mcpguard_event_to_taxonomy(raw) |
| 82 | + elif source == "mcpwn": |
| 83 | + event = mcpwn_finding_to_taxonomy(raw) |
| 84 | + elif source == "agentgate": |
| 85 | + event = agentgate_signal_to_taxonomy( |
| 86 | + signal_type=raw.get("signal_type", ""), |
| 87 | + weight=raw.get("weight", 0), |
| 88 | + action=raw.get("action", ""), |
| 89 | + path=raw.get("path", ""), |
| 90 | + user_agent=raw.get("userAgent", raw.get("user_agent", "")), |
| 91 | + score=raw.get("score", 0), |
| 92 | + ) |
| 93 | + else: |
| 94 | + return JSONResponse({"error": f"Unknown source: {source}"}, status_code=400) |
| 95 | + return JSONResponse({ |
| 96 | + "source": event.source, |
| 97 | + "attack_category": event.attack_category.value, |
| 98 | + "severity": event.severity.value, |
| 99 | + "confidence": event.confidence.value, |
| 100 | + "detection_method": event.detection_method.value if isinstance(event.detection_method, DetectionMethod) else event.detection_method, |
| 101 | + "title": event.title, |
| 102 | + "description": event.description, |
| 103 | + "recommendation": event.recommendation, |
| 104 | + "target": event.target, |
| 105 | + "snippet": event.snippet[:200] if event.snippet else "", |
| 106 | + "blocked": event.blocked, |
| 107 | + "risk_score": event.risk_score, |
| 108 | + }) |
| 109 | + |
| 110 | + |
54 | 111 | def _session_value(password: str, client_ip: str) -> str: |
55 | 112 | h = hmac.new(password.encode(), client_ip.encode(), "sha256") |
56 | 113 | return h.hexdigest() |
|
0 commit comments