Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 55 additions & 31 deletions bbot_server/modules/findings/findings_api.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
from fastapi import Query
from typing import Annotated, Literal, Optional
from typing import Annotated, Optional

from bbot_server.assets import CustomAssetFields
from bbot_server.applets.base import BaseApplet, api_endpoint
from bbot_server.modules.findings.findings_models import Finding, SEVERITY_COLORS, SeverityScore, FindingsQuery

# Max CVSS score for each severity band (top of range).
# Used to derive a default risk score from finding_max_severity.
SEVERITY_TO_CVSS = {
"INFO": 0.0,
"LOW": 0.1,
"MEDIUM": 4.0,
"HIGH": 7.0,
"CRITICAL": 9.0,
}

Comment thread
ausmaster marked this conversation as resolved.

# add 'findings' field to the main asset model
class FindingFields(CustomAssetFields):
findings: Annotated[list[str], "indexed", "indexed-text"] = []
finding_severities: Annotated[dict[str, int], "indexed"] = {}
finding_max_severity: Annotated[Optional[str], "indexed"] = None
finding_max_severity_score: Annotated[int, "indexed"] = 0
# Effective risk level for this asset. Uses the same enum as severity:
# "INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL", or None.
# Auto-synced from finding_max_severity unless risk_override is True.
# IMPORTANT: Every code path that updates finding_max_severity must also
# update risk (when risk_override is False) to keep these fields in sync.
risk: Annotated[Optional[Literal["INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL"]], "indexed"] = None
# Effective risk score for this asset: None or a float from 0.0 to 10.0
# (1 decimal place). Auto-synced from finding_max_severity (using CVSS
# thresholds) unless risk_override is True.
risk: Annotated[Optional[float], "indexed"] = None
# Whether risk has been manually overridden. When True, new findings
# will NOT auto-update risk. Clearing the override resets this to False
# and reverts risk to finding_max_severity.
# and reverts risk to the CVSS-derived value.
risk_override: Annotated[bool, "indexed"] = False


Expand Down Expand Up @@ -144,45 +152,59 @@ async def severity_counts(
findings = dict(sorted(findings.items(), key=lambda x: x[1], reverse=True))
return findings

@api_endpoint("/set_risk", methods=["PATCH"], summary="Set or clear a manual risk override for an asset")
@api_endpoint("/set_risk", methods=["PATCH"], summary="Set or clear a manual risk score for an asset")
async def set_risk(
self,
host: Annotated[str, Query(description="The host of the asset to update")],
risk: Annotated[
Optional[str],
Optional[float],
Query(
description="Risk level: INFO, LOW, MEDIUM, HIGH, or CRITICAL. Omit or set to null to clear the override and revert to finding_max_severity."
description=(
"Risk score from 0.0 to 10.0 (1 decimal place). "
"Omit to clear the override and revert to the auto-calculated CVSS value."
)
),
] = None,
override_none: Annotated[
bool,
Query(
description=(
"Set to true to explicitly override risk to None (no risk score). "
"Takes precedence over the risk parameter."
)
),
] = False,
) -> dict:
"""
Manually override an asset's risk level, or clear the override to revert
to the auto-calculated value (finding_max_severity).
Manually set or clear an asset's risk score.

IMPORTANT: When risk is set to a value different from finding_max_severity,
it is considered manually overridden and will not be auto-updated by new findings.
Clearing it (risk=null) reverts to finding_max_severity.
Three modes:
- risk=<float> → override risk to the given value (0.0–10.0, 1 decimal).
- override_none=true → override risk to None (e.g. "no risk score").
- (omit both) → clear the override and revert to the CVSS-derived
value from finding_max_severity.
"""
asset = await self.root._get_asset(host=host, fields=["finding_max_severity"])
if not asset:
raise self.BBOTServerNotFoundError(f"Asset {host} not found")

if risk is not None:
# Validate the risk value against known severity levels
risk = risk.upper()
SeverityScore.to_score(risk) # raises if invalid
# IMPORTANT: Both risk and risk_override must be updated together.
if override_none:
# Explicit override to None
update = {"risk": None, "risk_override": True}
description = f"Risk manually set to [bold]None[/bold] on [bold]{host}[/bold]"
elif risk is not None:
# Override to a specific float value
if risk < 0.0 or risk > 10.0:
raise self.BBOTServerValueError("risk must be between 0.0 and 10.0")
risk = round(risk, 1)
update = {"risk": risk, "risk_override": True}
description = f"Risk manually set to [bold]{risk}[/bold] on [bold]{host}[/bold]"
else:
# Clear the override: revert risk to finding_max_severity.
# IMPORTANT: risk_override must be set to False and risk must be
# recalculated from finding_max_severity to restore the invariant.
# Clear the override: revert to CVSS-derived value
finding_max_severity = asset.get("finding_max_severity", None)
update = {"risk": finding_max_severity, "risk_override": False}
description = (
f"Risk override cleared on [bold]{host}[/bold], reverted to [bold]{finding_max_severity}[/bold]"
)
reverted_risk = SEVERITY_TO_CVSS.get(finding_max_severity) if finding_max_severity else None
update = {"risk": reverted_risk, "risk_override": False}
description = f"Risk override cleared on [bold]{host}[/bold], reverted to [bold]{reverted_risk}[/bold]"

await self.root._update_asset(host, update)
await self.emit_activity(
Expand Down Expand Up @@ -291,11 +313,13 @@ async def _insert_or_update_finding(self, finding: Finding, asset, event=None):
else:
asset.finding_max_severity_score = 0
asset.finding_max_severity = None
# IMPORTANT: Only sync risk when not manually overridden.
# When risk_override is True, risk is user-controlled and must not be touched.
# Auto-sync risk from finding_max_severity when not manually overridden.
old_risk = getattr(asset, "risk", None)
if not getattr(asset, "risk_override", False):
asset.risk = asset.finding_max_severity
if asset.finding_max_severity is not None:
asset.risk = SEVERITY_TO_CVSS[asset.finding_max_severity]
else:
asset.risk = None

# insert the new vulnerability
await self.root._insert_asset(finding.model_dump())
Expand Down
90 changes: 70 additions & 20 deletions tests/test_applets/test_applet_findings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,42 @@ async def after_scan_1(self):
assert {f.confidence for f in findings} == {"UNKNOWN"}
assert {f.confidence_score for f in findings} == {1}

# risk should auto-sync from finding_max_severity
# risk should auto-sync from finding_max_severity via CVSS: HIGH -> 7.0
www_asset = await self.bbot_server.get_asset(host="www.evilcorp.com")
assert www_asset.risk == "HIGH"
assert www_asset.risk == 7.0
assert www_asset.risk_override == False
www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com")
assert www2_asset.risk == "HIGH"
assert www2_asset.risk == 7.0
assert www2_asset.risk_override == False

# api.evilcorp.com has no findings yet → risk should be None
api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com")
assert api_asset.risk is None
assert api_asset.risk_override == False

# set risk on asset with no findings, then clear → should revert to None
result = await self.bbot_server.set_risk(host="api.evilcorp.com", risk=5.0)
assert result["risk"] == 5.0
assert result["risk_override"] == True
result = await self.bbot_server.set_risk(host="api.evilcorp.com")
assert result["risk"] is None
assert result["risk_override"] == False
api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com")
assert api_asset.risk is None
assert api_asset.risk_override == False

# override risk to None on asset with no findings (explicit "no risk score")
result = await self.bbot_server.set_risk(host="api.evilcorp.com", override_none=True)
assert result["risk"] is None
assert result["risk_override"] == True
api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com")
assert api_asset.risk is None
assert api_asset.risk_override == True
# clear → should revert to None (no findings = no CVSS value)
result = await self.bbot_server.set_risk(host="api.evilcorp.com")
assert result["risk"] is None
assert result["risk_override"] == False

async def after_scan_2(self):
findings = [f async for f in self.bbot_server.list_findings()]
assert len(findings) == 4
Expand Down Expand Up @@ -166,38 +194,60 @@ async def after_scan_2(self):

# --- risk field tests ---

# after scan 2, www2 and api have CRITICAL findings, so risk should auto-update
# after scan 2, www2 and api have CRITICAL findings → CVSS 9.0
www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com")
assert www2_asset.risk == "CRITICAL"
assert www2_asset.risk == 9.0
assert www2_asset.risk_override == False
api_asset = await self.bbot_server.get_asset(host="api.evilcorp.com")
assert api_asset.risk == "CRITICAL"
assert api_asset.risk == 9.0
assert api_asset.risk_override == False
# www only had HIGH findings from scan 1, risk should still be HIGH
# www only had HIGH findings from scan 1 → CVSS 7.0
www_asset = await self.bbot_server.get_asset(host="www.evilcorp.com")
assert www_asset.risk == "HIGH"
assert www_asset.risk == 7.0
assert www_asset.risk_override == False

# manually override risk on www2
result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk="LOW")
assert result["risk"] == "LOW"
# manually set risk on www2 (float 0.0-10.0)
result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=7.5)
assert result["risk"] == 7.5
assert result["risk_override"] == True
www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com")
assert www2_asset.risk == 7.5
assert www2_asset.risk_override == True

# set risk with extra precision — should round to 1 decimal
result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=3.14)
assert result["risk"] == 3.1
assert result["risk_override"] == True

# boundary values
result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=0.0)
assert result["risk"] == 0.0
assert result["risk_override"] == True
result = await self.bbot_server.set_risk(host="www2.evilcorp.com", risk=10.0)
assert result["risk"] == 10.0
assert result["risk_override"] == True

# override risk to None — explicit "no risk score"
result = await self.bbot_server.set_risk(host="www2.evilcorp.com", override_none=True)
assert result["risk"] is None
assert result["risk_override"] == True
www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com")
assert www2_asset.risk == "LOW"
assert www2_asset.risk is None
assert www2_asset.risk_override == True

# clear the override, risk should revert to finding_max_severity
# clear override should revert to CVSS-derived value (CRITICAL → 9.0)
result = await self.bbot_server.set_risk(host="www2.evilcorp.com")
assert result["risk"] == "CRITICAL"
assert result["risk"] == 9.0
assert result["risk_override"] == False
www2_asset = await self.bbot_server.get_asset(host="www2.evilcorp.com")
assert www2_asset.risk == "CRITICAL"
assert www2_asset.risk == 9.0
assert www2_asset.risk_override == False

# verify RISK_UPDATED activities were emitted (allow time for async queue processing)
# expected: 2 from scan 1 (www + www2: None->HIGH),
# 2 from scan 2 (www2: HIGH->CRITICAL, api: None->CRITICAL),
# 2 from manual set + clear above
# verify RISK_UPDATED activities were emitted
# expected: 2 from scan 1 auto-sync (www + www2: None->7.0),
# 4 from after_scan_1 manual set_risk (api: set 5.0, clear, set None, clear),
# 2 from scan 2 auto-sync (www2: 7.0->9.0, api: None->9.0),
# 6 from after_scan_2 manual set_risk (7.5, 3.1, 0.0, 10.0, None, clear)
await asyncio.sleep(1.0)
activities = [a async for a in self.bbot_server.list_activities() if a.type == "RISK_UPDATED"]
assert len(activities) == 6
assert len(activities) == 14
Loading