Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 71 additions & 21 deletions dojo/tools/anchorectl_policies/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_label_for_scan_types(self, scan_type):
return "AnchoreCTL Policies Report"

def get_description_for_scan_types(self, scan_type):
return "AnchoreCTLs JSON policies report format."
return "AnchoreCTLs JSON policies report format. Both legacy list-based format and new evaluation-based format (from anchorectl policy evaluate -o json) are supported."

def get_findings(self, filename, test):
content = filename.read()
Expand All @@ -29,33 +29,81 @@ def get_findings(self, filename, test):
find_date = datetime.now()
items = []

# Handle new AnchoreCTL format (object with evaluations array)
if isinstance(data, dict) and "evaluations" in data:
logger.info("Detected new AnchoreCTL policies format")
processed_data = []
# Process each evaluation in the evaluations array
for evaluation in data.get("evaluations", []):
# Only process evaluations with findings
if evaluation.get("numberOfFindings", 0) > 0 and evaluation.get("details"):
processed_item = {
"detail": [],
"digest": data.get("imageDigest", ""),
"finalAction": evaluation.get("finalAction", ""),
"finalActionReason": evaluation.get("finalActionReason", ""),
"lastEvaluation": evaluation.get("evaluationTime", ""),
"policyId": data.get("policyId", ""),
"status": evaluation.get("status", ""),
"tag": data.get("evaluatedTag", ""),
}

# Process details if they exist
for detail in evaluation.get("details", []):
processed_item["detail"].append(detail)

processed_data.append(processed_item)

data = processed_data

if not isinstance(data, list):
msg = "This doesn't look like a valid Anchore CTRL Policies report: Expected a list with image data at the root of the JSON data"
msg = "This doesn't look like a valid Anchore CTRL Policies report: Expected a list with image data at the root of the JSON data or an object with 'evaluations' array"
raise TypeError(msg)

for image in data:
if not isinstance(image, dict) or image.get("detail") is None or not isinstance(image.get("detail"), list):
msg = "This doesn't look like a valid Anchore CTRL Policies report, missing 'detail' list object key for image"
# Skip empty images
if len(data) == 0:
continue

# Check for valid structure
if not isinstance(image, dict):
msg = "This doesn't look like a valid Anchore CTRL Policies report, expected dict object for image"
raise TypeError(msg)

# Handle legacy format with detail field
if image.get("detail") is not None and isinstance(image.get("detail"), list):
details = image.get("detail")
# Handle newer format that might have details under a different structure
elif image.get("details") is not None and isinstance(image.get("details"), list):
details = image.get("details")
else:
msg = "This doesn't look like a valid Anchore CTRL Policies report, missing 'detail' or 'details' list object key for image"
raise ValueError(msg)

for result in image["detail"]:
# Process each finding detail
for result in details:
try:
gate = result["gate"]
description = result["description"]
policy_id = result["policyId"]
status = result["status"]
image_name = result["tag"]
trigger_id = result["triggerId"]
repo, tag = image_name.split(":", 2)
# Extract fields with fallbacks for different formats
gate = result.get("gate", "unknown")
description = result.get("description", "No description provided")
policy_id = result.get("policyId", image.get("policyId", "unknown"))
status = result.get("status", "unknown")

# Handle image tag from different possible locations
image_name = result.get("tag", image.get("tag", "unknown:latest"))

trigger_id = result.get("triggerId", "unknown")

# Split repo and tag safely
if ":" in image_name:
repo, tag = image_name.split(":", 1)
else:
repo = image_name
tag = "latest"

severity, active = get_severity(status, description)
vulnerability_id = extract_vulnerability_id(trigger_id)
title = (
policy_id
+ " - gate|"
+ gate
+ " - trigger|"
+ trigger_id
)
title = policy_id + " - gate|" + gate + " - trigger|" + trigger_id
find = Finding(
title=title,
test=test,
Expand All @@ -74,8 +122,10 @@ def get_findings(self, filename, test):
find.unsaved_vulnerability_ids = [vulnerability_id]
items.append(find)
except (KeyError, IndexError) as err:
msg = f"Invalid format: {err} key not found"
raise ValueError(msg)
msg = f"Invalid format or missing key: {err}. This parser supports both legacy AnchoreCTL format and the new format from 'anchorectl policy evaluate -o json'."
logger.warning(msg)
# Continue processing other findings instead of failing completely
continue
return items


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"evaluatedTag": "test/testimage:testtag",
"policyId": "9e104ade-7b57-4cdc-93fb-4949bf3b36b6",
"imageDigest": "sha256:8htz0bf942cfcd6hg8cf6435afd318b65d23e4c1a80044304c6e3ed20",
"evaluations": [
{
"evaluationTime": "2022-09-20T08:25:52Z",
"status": "fail",
"finalAction": "stop",
"finalActionReason": "policy_evaluation",
"numberOfFindings": 3,
"details": [
{
"description": "HIGH Vulnerability found in non-os package type (test) - /usr/local/bin/testbinary (CVE-2022-1234)",
"gate": "vulnerabilities",
"imageId": "d26f0119b9634091a541b081dd8bdca435ab52e114e4b4328575c0bc2c69768b",
"policyId": "SoftwareChecks",
"status": "stop",
"tag": "test/testimage:testtag",
"triggerId": "CVE-2022-1234+test",
"triggerName": "package"
},
{
"description": "MEDIUM Vulnerability found in non-os package type (test2) - /usr/local/bin/testbinary (fixed in: 1.2.3)(GHSA-1234-abcd-5678)",
"gate": "vulnerabilities",
"imageId": "d26f0119b9634091a541b081dd8bdca435ab52e114e4b4328575c0bc2c69768b",
"policyId": "SoftwareChecks",
"status": "stop",
"tag": "test/testimage:testtag",
"triggerId": "GHSA-1234-abcd-5678+test2",
"triggerName": "package"
},
{
"description": "User root found as effective user, which is not on the allowed list",
"gate": "dockerfile",
"imageId": "d26f0119b9634091a541b081dd8bdca435ab52e114e4b4328575c0bc2c69768b",
"policyId": "RootUser",
"status": "warn",
"tag": "test/testimage:testtag",
"triggerId": "b2605c2ddbdb02b8e2365c9248dada5a",
"triggerName": "effective_user"
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"evaluatedTag": "test/testimage:testtag",
"policyId": "9e104ade-7b57-4cdc-93fb-4949bf3b36b6",
"imageDigest": "sha256:8htz0bf942cfcd6hg8cf6435afd318b65d23e4c1a80044304c6e3ed20",
"evaluations": [
{
"evaluationTime": "2022-09-20T08:25:52Z",
"status": "pass",
"finalAction": "go",
"finalActionReason": "policy_evaluation",
"numberOfFindings": 0,
"details": []
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"evaluatedTag": "test/testimage:testtag",
"policyId": "9e104ade-7b57-4cdc-93fb-4949bf3b36b6",
"imageDigest": "sha256:8htz0bf942cfcd6hg8cf6435afd318b65d23e4c1a80044304c6e3ed20",
"evaluations": [
{
"evaluationTime": "2022-09-20T08:25:52Z",
"status": "fail",
"finalAction": "stop",
"finalActionReason": "policy_evaluation",
"numberOfFindings": 1,
"details": [
{
"description": "User root found as effective user, which is not on the allowed list",
"gate": "dockerfile",
"imageId": "d26f0119b9634091a541b081dd8bdca435ab52e114e4b4328575c0bc2c69768b",
"policyId": "RootUser",
"status": "warn",
"tag": "test/testimage:testtag",
"triggerId": "b2605c2ddbdb02b8e2365c9248dada5a",
"triggerName": "effective_user"
}
]
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"evaluatedTag": "test/testimage:testtag",
"policyId": "9e104ade-7b57-4cdc-93fb-4949bf3b36b6",
"imageDigest": "sha256:8htz0bf942cfcd6hg8cf6435afd318b65d23e4c1a80044304c6e3ed20",
"evaluations": [
{
"evaluationTime": "2022-09-20T08:25:52Z",
"status": "fail",
"finalAction": "stop",
"finalActionReason": "policy_evaluation",
"numberOfFindings": 1,
"details": [
{
"description": "CRITICAL User root found as effective user, which is not on the allowed list",
"gate": "dockerfile",
"imageId": "d26f0119b9634091a541b081dd8bdca435ab52e114e4b4328575c0bc2c69768b",
"policyId": "RootUser",
"status": "warn",
"tag": "test/testimage:testtag",
"triggerId": "b2605c2ddbdb02b8e2365c9248dada5a",
"triggerName": "effective_user"
}
]
}
]
}
33 changes: 33 additions & 0 deletions unittests/tools/test_anchorectl_policies_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,36 @@ def test_anchore_engine_parser_has_one_finding_and_description_has_severity(self
self.assertEqual(singleFinding.severity, "Critical")
self.assertEqual(singleFinding.title, "RootUser - gate|dockerfile - trigger|b2605c2ddbdb02b8e2365c9248dada5a")
self.assertEqual(singleFinding.description, "CRITICAL User root found as effective user, which is not on the allowed list")

# Tests for the new AnchoreCTL format
def test_new_format_anchore_engine_parser_has_no_finding(self):
with (get_unit_tests_scans_path("anchorectl_policies") / "new_format_no_violation.json").open(encoding="utf-8") as testfile:
parser = AnchoreCTLPoliciesParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(0, len(findings))

def test_new_format_anchore_engine_parser_has_one_finding_and_it_is_correctly_parsed(self):
with (get_unit_tests_scans_path("anchorectl_policies") / "new_format_one_violation.json").open(encoding="utf-8") as testfile:
parser = AnchoreCTLPoliciesParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(1, len(findings))
singleFinding = findings[0]
self.assertEqual(singleFinding.severity, "Medium")
self.assertEqual(singleFinding.title, "RootUser - gate|dockerfile - trigger|b2605c2ddbdb02b8e2365c9248dada5a")
self.assertEqual(singleFinding.description, "User root found as effective user, which is not on the allowed list")

def test_new_format_anchore_engine_parser_has_many_findings(self):
with (get_unit_tests_scans_path("anchorectl_policies") / "new_format_many_violations.json").open(encoding="utf-8") as testfile:
parser = AnchoreCTLPoliciesParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(3, len(findings))

def test_new_format_anchore_engine_parser_has_one_finding_and_description_has_severity(self):
with (get_unit_tests_scans_path("anchorectl_policies") / "new_format_one_violation_description_severity.json").open(encoding="utf-8") as testfile:
parser = AnchoreCTLPoliciesParser()
findings = parser.get_findings(testfile, Test())
self.assertEqual(1, len(findings))
singleFinding = findings[0]
self.assertEqual(singleFinding.severity, "Critical")
self.assertEqual(singleFinding.title, "RootUser - gate|dockerfile - trigger|b2605c2ddbdb02b8e2365c9248dada5a")
self.assertEqual(singleFinding.description, "CRITICAL User root found as effective user, which is not on the allowed list")