diff --git a/bbot_server/modules/findings/findings_api.py b/bbot_server/modules/findings/findings_api.py index d92b6e5d..58daa195 100644 --- a/bbot_server/modules/findings/findings_api.py +++ b/bbot_server/modules/findings/findings_api.py @@ -1,10 +1,20 @@ from fastapi import Query -from typing import Annotated, Literal, Optional +from typing import Annotated, Optional from bbot_server.assets import CustomAssetFields from bbot_server.applets.base import BaseApplet, api_endpoint from bbot_server.modules.findings.findings_models import Finding, SEVERITY_COLORS, SeverityScore, FindingsQuery +# Max CVSS score for each severity band (top of range). +# Used to derive a default risk score from finding_max_severity. +SEVERITY_TO_CVSS = { + "INFO": 0.0, + "LOW": 0.1, + "MEDIUM": 4.0, + "HIGH": 7.0, + "CRITICAL": 9.0, +} + # add 'findings' field to the main asset model class FindingFields(CustomAssetFields): @@ -12,15 +22,13 @@ class FindingFields(CustomAssetFields): finding_severities: Annotated[dict[str, int], "indexed"] = {} finding_max_severity: Annotated[Optional[str], "indexed"] = None finding_max_severity_score: Annotated[int, "indexed"] = 0 - # Effective risk level for this asset. Uses the same enum as severity: - # "INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL", or None. - # Auto-synced from finding_max_severity unless risk_override is True. - # IMPORTANT: Every code path that updates finding_max_severity must also - # update risk (when risk_override is False) to keep these fields in sync. - risk: Annotated[Optional[Literal["INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL"]], "indexed"] = None + # Effective risk score for this asset: None or a float from 0.0 to 10.0 + # (1 decimal place). Auto-synced from finding_max_severity (using CVSS + # thresholds) unless risk_override is True. + risk: Annotated[Optional[float], "indexed"] = None # Whether risk has been manually overridden. When True, new findings # will NOT auto-update risk. Clearing the override resets this to False - # and reverts risk to finding_max_severity. + # and reverts risk to the CVSS-derived value. risk_override: Annotated[bool, "indexed"] = False @@ -144,45 +152,59 @@ async def severity_counts( findings = dict(sorted(findings.items(), key=lambda x: x[1], reverse=True)) return findings - @api_endpoint("/set_risk", methods=["PATCH"], summary="Set or clear a manual risk override for an asset") + @api_endpoint("/set_risk", methods=["PATCH"], summary="Set or clear a manual risk score for an asset") async def set_risk( self, host: Annotated[str, Query(description="The host of the asset to update")], risk: Annotated[ - Optional[str], + Optional[float], Query( - description="Risk level: INFO, LOW, MEDIUM, HIGH, or CRITICAL. Omit or set to null to clear the override and revert to finding_max_severity." + description=( + "Risk score from 0.0 to 10.0 (1 decimal place). " + "Omit to clear the override and revert to the auto-calculated CVSS value." + ) ), ] = None, + override_none: Annotated[ + bool, + Query( + description=( + "Set to true to explicitly override risk to None (no risk score). " + "Takes precedence over the risk parameter." + ) + ), + ] = False, ) -> dict: """ - Manually override an asset's risk level, or clear the override to revert - to the auto-calculated value (finding_max_severity). + Manually set or clear an asset's risk score. - IMPORTANT: When risk is set to a value different from finding_max_severity, - it is considered manually overridden and will not be auto-updated by new findings. - Clearing it (risk=null) reverts to finding_max_severity. + Three modes: + - risk= → override risk to the given value (0.0–10.0, 1 decimal). + - override_none=true → override risk to None (e.g. "no risk score"). + - (omit both) → clear the override and revert to the CVSS-derived + value from finding_max_severity. """ asset = await self.root._get_asset(host=host, fields=["finding_max_severity"]) if not asset: raise self.BBOTServerNotFoundError(f"Asset {host} not found") - if risk is not None: - # Validate the risk value against known severity levels - risk = risk.upper() - SeverityScore.to_score(risk) # raises if invalid - # IMPORTANT: Both risk and risk_override must be updated together. + if override_none: + # Explicit override to None + update = {"risk": None, "risk_override": True} + description = f"Risk manually set to [bold]None[/bold] on [bold]{host}[/bold]" + elif risk is not None: + # Override to a specific float value + if risk < 0.0 or risk > 10.0: + raise self.BBOTServerValueError("risk must be between 0.0 and 10.0") + risk = round(risk, 1) update = {"risk": risk, "risk_override": True} description = f"Risk manually set to [bold]{risk}[/bold] on [bold]{host}[/bold]" else: - # Clear the override: revert risk to finding_max_severity. - # IMPORTANT: risk_override must be set to False and risk must be - # recalculated from finding_max_severity to restore the invariant. + # Clear the override: revert to CVSS-derived value finding_max_severity = asset.get("finding_max_severity", None) - update = {"risk": finding_max_severity, "risk_override": False} - description = ( - f"Risk override cleared on [bold]{host}[/bold], reverted to [bold]{finding_max_severity}[/bold]" - ) + reverted_risk = SEVERITY_TO_CVSS.get(finding_max_severity) if finding_max_severity else None + update = {"risk": reverted_risk, "risk_override": False} + description = f"Risk override cleared on [bold]{host}[/bold], reverted to [bold]{reverted_risk}[/bold]" await self.root._update_asset(host, update) await self.emit_activity( @@ -291,11 +313,13 @@ async def _insert_or_update_finding(self, finding: Finding, asset, event=None): else: asset.finding_max_severity_score = 0 asset.finding_max_severity = None - # IMPORTANT: Only sync risk when not manually overridden. - # When risk_override is True, risk is user-controlled and must not be touched. + # Auto-sync risk from finding_max_severity when not manually overridden. old_risk = getattr(asset, "risk", None) if not getattr(asset, "risk_override", False): - asset.risk = asset.finding_max_severity + if asset.finding_max_severity is not None: + asset.risk = SEVERITY_TO_CVSS[asset.finding_max_severity] + else: + asset.risk = None # insert the new vulnerability await self.root._insert_asset(finding.model_dump()) diff --git a/tests/test_applets/test_applet_findings.py b/tests/test_applets/test_applet_findings.py index 95b41898..7cc4dcdc 100644 --- a/tests/test_applets/test_applet_findings.py +++ b/tests/test_applets/test_applet_findings.py @@ -28,14 +28,42 @@ async def after_scan_1(self): assert {f.confidence for f in findings} == {"UNKNOWN"} assert {f.confidence_score for f in findings} == {1} - # risk should auto-sync from finding_max_severity + # risk should auto-sync from finding_max_severity via CVSS: HIGH -> 7.0 www_asset = await self.bbot_server.get_asset(host="www.evilcorp.com") - assert www_asset.risk == "HIGH" + assert www_asset.risk == 7.0 assert www_asset.risk_override == False www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com") - assert www2_asset.risk == "HIGH" + assert www2_asset.risk == 7.0 assert www2_asset.risk_override == False + # api.evilcorp.com has no findings yet → risk should be None + api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com") + assert api_asset.risk is None + assert api_asset.risk_override == False + + # set risk on asset with no findings, then clear → should revert to None + result = await self.bbot_server.set_risk(host="api.evilcorp.com", risk=5.0) + assert result["risk"] == 5.0 + assert result["risk_override"] == True + result = await self.bbot_server.set_risk(host="api.evilcorp.com") + assert result["risk"] is None + assert result["risk_override"] == False + api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com") + assert api_asset.risk is None + assert api_asset.risk_override == False + + # override risk to None on asset with no findings (explicit "no risk score") + result = await self.bbot_server.set_risk(host="api.evilcorp.com", override_none=True) + assert result["risk"] is None + assert result["risk_override"] == True + api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com") + assert api_asset.risk is None + assert api_asset.risk_override == True + # clear → should revert to None (no findings = no CVSS value) + result = await self.bbot_server.set_risk(host="api.evilcorp.com") + assert result["risk"] is None + assert result["risk_override"] == False + async def after_scan_2(self): findings = [f async for f in self.bbot_server.list_findings()] assert len(findings) == 4 @@ -166,38 +194,60 @@ async def after_scan_2(self): # --- risk field tests --- - # after scan 2, www2 and api have CRITICAL findings, so risk should auto-update + # after scan 2, www2 and api have CRITICAL findings → CVSS 9.0 www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com") - assert www2_asset.risk == "CRITICAL" + assert www2_asset.risk == 9.0 assert www2_asset.risk_override == False api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com") - assert api_asset.risk == "CRITICAL" + assert api_asset.risk == 9.0 assert api_asset.risk_override == False - # www only had HIGH findings from scan 1, risk should still be HIGH + # www only had HIGH findings from scan 1 → CVSS 7.0 www_asset = await self.bbot_server.get_asset(host="www.evilcorp.com") - assert www_asset.risk == "HIGH" + assert www_asset.risk == 7.0 assert www_asset.risk_override == False - # manually override risk on www2 - result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk="LOW") - assert result["risk"] == "LOW" + # manually set risk on www2 (float 0.0-10.0) + result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=7.5) + assert result["risk"] == 7.5 + assert result["risk_override"] == True + www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com") + assert www2_asset.risk == 7.5 + assert www2_asset.risk_override == True + + # set risk with extra precision — should round to 1 decimal + result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=3.14) + assert result["risk"] == 3.1 + assert result["risk_override"] == True + + # boundary values + result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=0.0) + assert result["risk"] == 0.0 + assert result["risk_override"] == True + result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=10.0) + assert result["risk"] == 10.0 + assert result["risk_override"] == True + + # override risk to None — explicit "no risk score" + result = await self.bbot_server.set_risk(host="www2.evilcorp.com", override_none=True) + assert result["risk"] is None assert result["risk_override"] == True www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com") - assert www2_asset.risk == "LOW" + assert www2_asset.risk is None assert www2_asset.risk_override == True - # clear the override, risk should revert to finding_max_severity + # clear override — should revert to CVSS-derived value (CRITICAL → 9.0) result = await self.bbot_server.set_risk(host="www2.evilcorp.com") - assert result["risk"] == "CRITICAL" + assert result["risk"] == 9.0 assert result["risk_override"] == False www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com") - assert www2_asset.risk == "CRITICAL" + assert www2_asset.risk == 9.0 assert www2_asset.risk_override == False - # verify RISK_UPDATED activities were emitted (allow time for async queue processing) - # expected: 2 from scan 1 (www + www2: None->HIGH), - # 2 from scan 2 (www2: HIGH->CRITICAL, api: None->CRITICAL), - # 2 from manual set + clear above + # verify RISK_UPDATED activities were emitted + # expected: 2 from scan 1 auto-sync (www + www2: None->7.0), + # 4 from after_scan_1 manual set_risk (api: set 5.0, clear, set None, clear), + # 2 from scan 2 auto-sync (www2: 7.0->9.0, api: None->9.0), + # 6 from after_scan_2 manual set_risk (7.5, 3.1, 0.0, 10.0, None, clear) await asyncio.sleep(1.0) activities = [a async for a in self.bbot_server.list_activities() if a.type == "RISK_UPDATED"] - assert len(activities) == 6 + assert len(activities) == 14