Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions dojo/tools/deepfence_threatmapper/compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

class DeepfenceThreatmapperCompliance:
def get_findings(self, row, headers, test):
description = ""
if "compliance_check_type" in headers and "test_number" in headers:
return self._parse_old_format(row, headers, test)
if "Compliance Standard" in headers and "Control ID" in headers:
return self._parse_new_format(row, headers, test)
return None

def _parse_old_format(self, row, headers, test):
compliance_check_type = row[headers["compliance_check_type"]]
count = row[headers["count"]]
doc_id = row[headers["doc_id"]]
Expand All @@ -18,34 +24,76 @@ def get_findings(self, row, headers, test):
test_desc = row[headers["test_desc"]]
test_info = row[headers["test_info"]]
test_number = row[headers["test_number"]]
description += "**compliance_check_type:** " + str(compliance_check_type) + "\n"
description += "**host_name:** " + str(host_name) + "\n"
description += "**cloud_account_id:** " + str(cloud_account_id) + "\n"
description += "**masked:** " + str(masked) + "\n"
description += "**node_id:** " + str(node_id) + "\n"
description += "**node_name:** " + str(node_name) + "\n"
description += "**node_type:** " + str(node_type) + "\n"
description += "**status:** " + str(status) + "\n"
description += "**test_category:** " + str(test_category) + "\n"
description += "**test_desc:** " + str(test_desc) + "\n"
description += "**test_info:** " + str(test_info) + "\n"
description += "**test_number:** " + str(test_number) + "\n"
description += "**count:** " + str(count) + "\n"
description += "**doc_id:** " + str(doc_id) + "\n"

description = (
f"**Compliance Check Type:** {compliance_check_type}\n"
f"**Host Name:** {host_name}\n"
f"**Cloud Account ID:** {cloud_account_id}\n"
f"**Masked:** {masked}\n"
f"**Node ID:** {node_id}\n"
f"**Node Name:** {node_name}\n"
f"**Node Type:** {node_type}\n"
f"**Status:** {status}\n"
f"**Test Category:** {test_category}\n"
f"**Test Description:** {test_desc}\n"
f"**Test Info:** {test_info}\n"
f"**Test Number:** {test_number}\n"
f"**Count:** {count}\n"
f"**Doc ID:** {doc_id}\n"
)

return Finding(
title=f"Threatmapper_Compliance_Report-{test_number}",
description=description,
severity=self.compliance_severity(status),
static_finding=False,
dynamic_finding=True,
test=test,
)

def _parse_new_format(self, row, headers, test):
compliance_standard = row[headers["Compliance Standard"]]
status = row[headers["Status"]]
category = row[headers["Category"]]
description_text = row[headers["Description"]]
info = row[headers["Info"]]
control_id = row[headers["Control ID"]]
node_name = row[headers["Node Name"]]
node_type = row[headers["Node Type"]]
remediation = row[headers["Remediation"]]
masked = row[headers["Masked"]]

description = (
f"**Compliance Standard:** {compliance_standard}\n"
f"**Status:** {status}\n"
f"**Category:** {category}\n"
f"**Description:** {description_text}\n"
f"**Info:** {info}\n"
f"**Control ID:** {control_id}\n"
f"**Node Name:** {node_name}\n"
f"**Node Type:** {node_type}\n"
f"**Remediation:** {remediation}\n"
f"**Masked:** {masked}\n"
)

return Finding(
title="Threatmapper_Compliance_Report-" + test_number,
title=f"Threatmapper_Compliance_Report-{control_id}",
Comment thread
valentijnscholten marked this conversation as resolved.
description=description,
severity=self.compliance_severity(status),
static_finding=False,
dynamic_finding=True,
mitigation=remediation,
test=test,
)

def compliance_severity(self, severity_input):
if severity_input is None:
return "Info"
severity_input = severity_input.lower()
if severity_input in {"pass", "info"}:
output = "Info"
elif severity_input == "warn":
output = "Medium"
else:
output = "Info"
return output
return "Info"
if severity_input == "warn":
return "Medium"
if severity_input == "fail":
return "High"
return "Info"
58 changes: 49 additions & 9 deletions dojo/tools/deepfence_threatmapper/malware.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

class DeepfenceThreatmapperMalware:
def get_findings(self, row, headers, test):
description = ""
if "Rule Name" in headers and "Class" in headers:
return self._parse_old_format(row, headers, test)
if "Rule Name" in headers and "Node Type" in headers:
return self._parse_new_format(row, headers, test)
return None

def _parse_old_format(self, row, headers, test):
Rule_Name = row[headers["Rule Name"]]
Class = row[headers["Class"]]
File_Name = row[headers["File Name"]]
Expand All @@ -13,14 +19,48 @@ def get_findings(self, row, headers, test):
NodeType = row[headers["NodeType"]]
Container_Name = row[headers["Container Name"]]
Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]]
description += "**Summary:** " + str(Summary) + "\n"
description += "**Rule Name:** " + str(Rule_Name) + "\n"
description += "**Class:** " + str(Class) + "\n"
description += "**File Name:** " + str(File_Name) + "\n"
description += "**Node Name:** " + str(Node_Name) + "\n"
description += "**NodeType:** " + str(NodeType) + "\n"
description += "**Container Name:** " + str(Container_Name) + "\n"
description += "**Kubernetes Cluster Name:** " + str(Kubernetes_Cluster_Name) + "\n"

description = (
f"**Summary:** {Summary}\n"
f"**Rule Name:** {Rule_Name}\n"
f"**Class:** {Class}\n"
f"**File Name:** {File_Name}\n"
f"**Node Name:** {Node_Name}\n"
f"**NodeType:** {NodeType}\n"
f"**Container Name:** {Container_Name}\n"
f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n"
)

return Finding(
title=Rule_Name,
description=description,
file_path=File_Name,
severity=self.severity(Severity),
static_finding=False,
dynamic_finding=True,
test=test,
)

def _parse_new_format(self, row, headers, test):
Rule_Name = row[headers["Rule Name"]]
File_Name = row[headers["File Name"]]
Summary = row[headers["Summary"]]
Severity = row[headers["Severity"]]
Node_Name = row[headers["Node Name"]]
Node_Type = row[headers["Node Type"]]
Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]]
Masked = row[headers["Masked"]]

description = (
f"**Summary:** {Summary}\n"
f"**Rule Name:** {Rule_Name}\n"
f"**File Name:** {File_Name}\n"
f"**Node Name:** {Node_Name}\n"
f"**Node Type:** {Node_Type}\n"
f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n"
f"**Masked:** {Masked}\n"
)

return Finding(
title=Rule_Name,
description=description,
Expand Down
15 changes: 12 additions & 3 deletions dojo/tools/deepfence_threatmapper/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,23 @@ def get_findings(self, filename, test):
first = False
for i in range(len(row)):
headers[row[i]] = i
elif headers.get("Rule Name") is not None and headers.get("Class") is not None:
elif (
("Rule Name" in headers and "Class" in headers) or
("Rule Name" in headers and "Node Type" in headers)
):
findings.append(DeepfenceThreatmapperMalware().get_findings(row, headers, test))
elif headers.get("Filename") is not None and headers.get("Content") is not None:
value = DeepfenceThreatmapperSecret().get_findings(row, headers, test)
if value is not None:
findings.append(value)
elif headers.get("@timestamp") is not None and headers.get("cve_attack_vector") is not None:
elif (
("cve_id" in headers and "cve_attack_vector" in headers) or
("CVE ID" in headers and "Attack Vector" in headers)
):
findings.append(DeepfenceThreatmapperVulnerability().get_findings(row, headers, test))
elif headers.get("@timestamp") is not None and headers.get("compliance_check_type") is not None:
elif (
("compliance_check_type" in headers and "test_number" in headers) or
("Compliance Standard" in headers and "Control ID" in headers)
):
findings.append(DeepfenceThreatmapperCompliance().get_findings(row, headers, test))
return findings
77 changes: 57 additions & 20 deletions dojo/tools/deepfence_threatmapper/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

class DeepfenceThreatmapperSecret:
def get_findings(self, row, headers, test):
if "Name" in headers and "Signature" in headers:
return self._parse_old_format(row, headers, test)
if "Content Starting Index" in headers and "Masked" in headers:
return self._parse_new_format(row, headers, test)
return None

def _parse_old_format(self, row, headers, test):
description = ""
Filename = row[headers["Filename"]]
Content = row[headers["Content"]]
Expand All @@ -13,27 +20,57 @@ def get_findings(self, row, headers, test):
Container_Name = row[headers["Container Name"]]
Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]]
Signature = row[headers["Signature"]]
description += "**Filename:** " + str(Filename) + "\n"
description += "**Name:** " + str(Name) + "\n"
description += "**Rule:** " + str(Rule) + "\n"
description += "**Node Name:** " + str(Node_Name) + "\n"
description += "**Container Name:** " + str(Container_Name) + "\n"
description += "**Kubernetes Cluster Name:** " + str(Kubernetes_Cluster_Name) + "\n"
description += "**Content:** " + str(Content) + "\n"
description += "**Signature:** " + str(Signature) + "\n"
if Name is not None and Severity is not None:
finding = Finding(
title=str(Name),
description=description,
file_path=Filename,
severity=self.severity(Severity),
static_finding=False,
dynamic_finding=True,
test=test,
description += f"**Filename:** {Filename}\n"
description += f"**Name:** {Name}\n"
description += f"**Rule:** {Rule}\n"
description += f"**Node Name:** {Node_Name}\n"
description += f"**Container Name:** {Container_Name}\n"
description += f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n"
description += f"**Content:** {Content}\n"
description += f"**Signature:** {Signature}\n"
if Name and Severity:
return Finding(
title=str(Name),
description=description,
file_path=Filename,
severity=self.severity(Severity),
static_finding=False,
dynamic_finding=True,
test=test,
)
return None

def _parse_new_format(self, row, headers, test):
description = ""
Filename = row[headers["Filename"]]
Content = row[headers["Content"]]
Rule = row[headers["Rule"]]
Severity = row[headers["Severity"]]
Content_Starting_Index = row[headers["Content Starting Index"]]
Node_Name = row[headers["Node Name"]]
Node_Type = row[headers["Node Type"]]
Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]]
Masked = row[headers["Masked"]]
description += f"**Filename:** {Filename}\n"
description += f"**Rule:** {Rule}\n"
description += f"**Node Name:** {Node_Name}\n"
description += f"**Node Type:** {Node_Type}\n"
description += f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n"
description += f"**Content:** {Content}\n"
description += f"**Content Starting Index:** {Content_Starting_Index}\n"
description += f"**Masked:** {Masked}\n"
title = f"{Rule} in {Filename}" if Rule else "Secret Finding"
if Severity:
return Finding(
title=title,
description=description,
file_path=Filename,
severity=self.severity(Severity),
static_finding=False,
dynamic_finding=True,
test=test,
)
else:
finding = None
return finding
return None

def severity(self, severity_input):
if severity_input is None:
Expand Down
Loading