diff --git a/dojo/tools/deepfence_threatmapper/compliance.py b/dojo/tools/deepfence_threatmapper/compliance.py index 32b24cde2c4..36b71a4d796 100644 --- a/dojo/tools/deepfence_threatmapper/compliance.py +++ b/dojo/tools/deepfence_threatmapper/compliance.py @@ -3,7 +3,13 @@ class DeepfenceThreatmapperCompliance: def get_findings(self, row, headers, test): - description = "" + if "compliance_check_type" in headers and "test_number" in headers: + return self._parse_old_format(row, headers, test) + if "Compliance Standard" in headers and "Control ID" in headers: + return self._parse_new_format(row, headers, test) + return None + + def _parse_old_format(self, row, headers, test): compliance_check_type = row[headers["compliance_check_type"]] count = row[headers["count"]] doc_id = row[headers["doc_id"]] @@ -18,34 +24,76 @@ def get_findings(self, row, headers, test): test_desc = row[headers["test_desc"]] test_info = row[headers["test_info"]] test_number = row[headers["test_number"]] - description += "**compliance_check_type:** " + str(compliance_check_type) + "\n" - description += "**host_name:** " + str(host_name) + "\n" - description += "**cloud_account_id:** " + str(cloud_account_id) + "\n" - description += "**masked:** " + str(masked) + "\n" - description += "**node_id:** " + str(node_id) + "\n" - description += "**node_name:** " + str(node_name) + "\n" - description += "**node_type:** " + str(node_type) + "\n" - description += "**status:** " + str(status) + "\n" - description += "**test_category:** " + str(test_category) + "\n" - description += "**test_desc:** " + str(test_desc) + "\n" - description += "**test_info:** " + str(test_info) + "\n" - description += "**test_number:** " + str(test_number) + "\n" - description += "**count:** " + str(count) + "\n" - description += "**doc_id:** " + str(doc_id) + "\n" + + description = ( + f"**Compliance Check Type:** {compliance_check_type}\n" + f"**Host Name:** {host_name}\n" + f"**Cloud Account ID:** {cloud_account_id}\n" + f"**Masked:** {masked}\n" + f"**Node ID:** {node_id}\n" + f"**Node Name:** {node_name}\n" + f"**Node Type:** {node_type}\n" + f"**Status:** {status}\n" + f"**Test Category:** {test_category}\n" + f"**Test Description:** {test_desc}\n" + f"**Test Info:** {test_info}\n" + f"**Test Number:** {test_number}\n" + f"**Count:** {count}\n" + f"**Doc ID:** {doc_id}\n" + ) + + return Finding( + title=f"Threatmapper_Compliance_Report-{test_number}", + description=description, + severity=self.compliance_severity(status), + static_finding=False, + dynamic_finding=True, + test=test, + ) + + def _parse_new_format(self, row, headers, test): + compliance_standard = row[headers["Compliance Standard"]] + status = row[headers["Status"]] + category = row[headers["Category"]] + description_text = row[headers["Description"]] + info = row[headers["Info"]] + control_id = row[headers["Control ID"]] + node_name = row[headers["Node Name"]] + node_type = row[headers["Node Type"]] + remediation = row[headers["Remediation"]] + masked = row[headers["Masked"]] + + description = ( + f"**Compliance Standard:** {compliance_standard}\n" + f"**Status:** {status}\n" + f"**Category:** {category}\n" + f"**Description:** {description_text}\n" + f"**Info:** {info}\n" + f"**Control ID:** {control_id}\n" + f"**Node Name:** {node_name}\n" + f"**Node Type:** {node_type}\n" + f"**Remediation:** {remediation}\n" + f"**Masked:** {masked}\n" + ) + return Finding( - title="Threatmapper_Compliance_Report-" + test_number, + title=f"Threatmapper_Compliance_Report-{control_id}", description=description, severity=self.compliance_severity(status), static_finding=False, dynamic_finding=True, + mitigation=remediation, test=test, ) def compliance_severity(self, severity_input): + if severity_input is None: + return "Info" + severity_input = severity_input.lower() if severity_input in {"pass", "info"}: - output = "Info" - elif severity_input == "warn": - output = "Medium" - else: - output = "Info" - return output + return "Info" + if severity_input == "warn": + return "Medium" + if severity_input == "fail": + return "High" + return "Info" diff --git a/dojo/tools/deepfence_threatmapper/malware.py b/dojo/tools/deepfence_threatmapper/malware.py index a1defd18401..3fa0a230920 100644 --- a/dojo/tools/deepfence_threatmapper/malware.py +++ b/dojo/tools/deepfence_threatmapper/malware.py @@ -3,7 +3,13 @@ class DeepfenceThreatmapperMalware: def get_findings(self, row, headers, test): - description = "" + if "Rule Name" in headers and "Class" in headers: + return self._parse_old_format(row, headers, test) + if "Rule Name" in headers and "Node Type" in headers: + return self._parse_new_format(row, headers, test) + return None + + def _parse_old_format(self, row, headers, test): Rule_Name = row[headers["Rule Name"]] Class = row[headers["Class"]] File_Name = row[headers["File Name"]] @@ -13,14 +19,48 @@ def get_findings(self, row, headers, test): NodeType = row[headers["NodeType"]] Container_Name = row[headers["Container Name"]] Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]] - description += "**Summary:** " + str(Summary) + "\n" - description += "**Rule Name:** " + str(Rule_Name) + "\n" - description += "**Class:** " + str(Class) + "\n" - description += "**File Name:** " + str(File_Name) + "\n" - description += "**Node Name:** " + str(Node_Name) + "\n" - description += "**NodeType:** " + str(NodeType) + "\n" - description += "**Container Name:** " + str(Container_Name) + "\n" - description += "**Kubernetes Cluster Name:** " + str(Kubernetes_Cluster_Name) + "\n" + + description = ( + f"**Summary:** {Summary}\n" + f"**Rule Name:** {Rule_Name}\n" + f"**Class:** {Class}\n" + f"**File Name:** {File_Name}\n" + f"**Node Name:** {Node_Name}\n" + f"**NodeType:** {NodeType}\n" + f"**Container Name:** {Container_Name}\n" + f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n" + ) + + return Finding( + title=Rule_Name, + description=description, + file_path=File_Name, + severity=self.severity(Severity), + static_finding=False, + dynamic_finding=True, + test=test, + ) + + def _parse_new_format(self, row, headers, test): + Rule_Name = row[headers["Rule Name"]] + File_Name = row[headers["File Name"]] + Summary = row[headers["Summary"]] + Severity = row[headers["Severity"]] + Node_Name = row[headers["Node Name"]] + Node_Type = row[headers["Node Type"]] + Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]] + Masked = row[headers["Masked"]] + + description = ( + f"**Summary:** {Summary}\n" + f"**Rule Name:** {Rule_Name}\n" + f"**File Name:** {File_Name}\n" + f"**Node Name:** {Node_Name}\n" + f"**Node Type:** {Node_Type}\n" + f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n" + f"**Masked:** {Masked}\n" + ) + return Finding( title=Rule_Name, description=description, diff --git a/dojo/tools/deepfence_threatmapper/parser.py b/dojo/tools/deepfence_threatmapper/parser.py index 3f5fd2a5a18..2b95a385f09 100644 --- a/dojo/tools/deepfence_threatmapper/parser.py +++ b/dojo/tools/deepfence_threatmapper/parser.py @@ -27,14 +27,23 @@ def get_findings(self, filename, test): first = False for i in range(len(row)): headers[row[i]] = i - elif headers.get("Rule Name") is not None and headers.get("Class") is not None: + elif ( + ("Rule Name" in headers and "Class" in headers) or + ("Rule Name" in headers and "Node Type" in headers) + ): findings.append(DeepfenceThreatmapperMalware().get_findings(row, headers, test)) elif headers.get("Filename") is not None and headers.get("Content") is not None: value = DeepfenceThreatmapperSecret().get_findings(row, headers, test) if value is not None: findings.append(value) - elif headers.get("@timestamp") is not None and headers.get("cve_attack_vector") is not None: + elif ( + ("cve_id" in headers and "cve_attack_vector" in headers) or + ("CVE ID" in headers and "Attack Vector" in headers) + ): findings.append(DeepfenceThreatmapperVulnerability().get_findings(row, headers, test)) - elif headers.get("@timestamp") is not None and headers.get("compliance_check_type") is not None: + elif ( + ("compliance_check_type" in headers and "test_number" in headers) or + ("Compliance Standard" in headers and "Control ID" in headers) + ): findings.append(DeepfenceThreatmapperCompliance().get_findings(row, headers, test)) return findings diff --git a/dojo/tools/deepfence_threatmapper/secret.py b/dojo/tools/deepfence_threatmapper/secret.py index 3d9f2584149..1915e4be694 100644 --- a/dojo/tools/deepfence_threatmapper/secret.py +++ b/dojo/tools/deepfence_threatmapper/secret.py @@ -3,6 +3,13 @@ class DeepfenceThreatmapperSecret: def get_findings(self, row, headers, test): + if "Name" in headers and "Signature" in headers: + return self._parse_old_format(row, headers, test) + if "Content Starting Index" in headers and "Masked" in headers: + return self._parse_new_format(row, headers, test) + return None + + def _parse_old_format(self, row, headers, test): description = "" Filename = row[headers["Filename"]] Content = row[headers["Content"]] @@ -13,27 +20,57 @@ def get_findings(self, row, headers, test): Container_Name = row[headers["Container Name"]] Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]] Signature = row[headers["Signature"]] - description += "**Filename:** " + str(Filename) + "\n" - description += "**Name:** " + str(Name) + "\n" - description += "**Rule:** " + str(Rule) + "\n" - description += "**Node Name:** " + str(Node_Name) + "\n" - description += "**Container Name:** " + str(Container_Name) + "\n" - description += "**Kubernetes Cluster Name:** " + str(Kubernetes_Cluster_Name) + "\n" - description += "**Content:** " + str(Content) + "\n" - description += "**Signature:** " + str(Signature) + "\n" - if Name is not None and Severity is not None: - finding = Finding( - title=str(Name), - description=description, - file_path=Filename, - severity=self.severity(Severity), - static_finding=False, - dynamic_finding=True, - test=test, + description += f"**Filename:** {Filename}\n" + description += f"**Name:** {Name}\n" + description += f"**Rule:** {Rule}\n" + description += f"**Node Name:** {Node_Name}\n" + description += f"**Container Name:** {Container_Name}\n" + description += f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n" + description += f"**Content:** {Content}\n" + description += f"**Signature:** {Signature}\n" + if Name and Severity: + return Finding( + title=str(Name), + description=description, + file_path=Filename, + severity=self.severity(Severity), + static_finding=False, + dynamic_finding=True, + test=test, + ) + return None + + def _parse_new_format(self, row, headers, test): + description = "" + Filename = row[headers["Filename"]] + Content = row[headers["Content"]] + Rule = row[headers["Rule"]] + Severity = row[headers["Severity"]] + Content_Starting_Index = row[headers["Content Starting Index"]] + Node_Name = row[headers["Node Name"]] + Node_Type = row[headers["Node Type"]] + Kubernetes_Cluster_Name = row[headers["Kubernetes Cluster Name"]] + Masked = row[headers["Masked"]] + description += f"**Filename:** {Filename}\n" + description += f"**Rule:** {Rule}\n" + description += f"**Node Name:** {Node_Name}\n" + description += f"**Node Type:** {Node_Type}\n" + description += f"**Kubernetes Cluster Name:** {Kubernetes_Cluster_Name}\n" + description += f"**Content:** {Content}\n" + description += f"**Content Starting Index:** {Content_Starting_Index}\n" + description += f"**Masked:** {Masked}\n" + title = f"{Rule} in {Filename}" if Rule else "Secret Finding" + if Severity: + return Finding( + title=title, + description=description, + file_path=Filename, + severity=self.severity(Severity), + static_finding=False, + dynamic_finding=True, + test=test, ) - else: - finding = None - return finding + return None def severity(self, severity_input): if severity_input is None: diff --git a/dojo/tools/deepfence_threatmapper/vulnerability.py b/dojo/tools/deepfence_threatmapper/vulnerability.py index 3539518177b..69a01850e7a 100644 --- a/dojo/tools/deepfence_threatmapper/vulnerability.py +++ b/dojo/tools/deepfence_threatmapper/vulnerability.py @@ -3,7 +3,13 @@ class DeepfenceThreatmapperVulnerability: def get_findings(self, row, headers, test): - description = "" + if "cve_id" in headers and "cve_attack_vector" in headers: + return self._parse_old_format(row, headers, test) + if "CVE ID" in headers and "Attack Vector" in headers: + return self._parse_new_format(row, headers, test) + return None + + def _parse_old_format(self, row, headers, test): cve_attack_vector = row[headers["cve_attack_vector"]] cve_caused_by_package = row[headers["cve_caused_by_package"]] cve_container_image = row[headers["cve_container_image"]] @@ -18,19 +24,23 @@ def get_findings(self, row, headers, test): host_name = row[headers["host_name"]] cloud_account_id = row[headers["cloud_account_id"]] masked = row[headers["masked"]] - description += "**cve_attack_vector:** " + str(cve_attack_vector) + "\n" - description += "**cve_caused_by_package:** " + str(cve_caused_by_package) + "\n" - description += "**cve_container_image:** " + str(cve_container_image) + "\n" - description += "**cve_container_image_id:** " + str(cve_container_image_id) + "\n" - description += "**cve_description:** " + str(cve_description) + "\n" - description += "**cve_severity:** " + str(cve_severity) + "\n" - description += "**cve_overall_score:** " + str(cve_overall_score) + "\n" - description += "**cve_type:** " + str(cve_type) + "\n" - description += "**host_name:** " + str(host_name) + "\n" - description += "**cloud_account_id:** " + str(cloud_account_id) + "\n" - description += "**masked:** " + str(masked) + "\n" + + description = ( + f"**Attack Vector:** {cve_attack_vector}\n" + f"**Caused By Package:** {cve_caused_by_package}\n" + f"**Container Image:** {cve_container_image}\n" + f"**Container Image ID:** {cve_container_image_id}\n" + f"**Description:** {cve_description}\n" + f"**Severity:** {cve_severity}\n" + f"**Overall Score:** {cve_overall_score}\n" + f"**Type:** {cve_type}\n" + f"**Host Name:** {host_name}\n" + f"**Cloud Account ID:** {cloud_account_id}\n" + f"**Masked:** {masked}\n" + ) + return Finding( - title="Threatmapper_Vuln_Report-" + cve_id, + title=f"Threatmapper_Vuln_Report-{cve_id}", description=description, component_name=cve_caused_by_package, severity=self.severity(cve_severity), @@ -42,6 +52,54 @@ def get_findings(self, row, headers, test): test=test, ) + def _parse_new_format(self, row, headers, test): + cve_id = row[headers["CVE ID"]] + severity = row[headers["Severity"]] + attack_vector = row[headers["Attack Vector"]] + caused_by_package = row[headers["Caused By Package"]] + caused_by_package_path = row[headers["Caused By Package Path"]] + cvss_score = row[headers["CVSS Score"]] + description_text = row[headers["Description"]] + fixed_in = row[headers["Fixed In"]] + link = row[headers["Link"]] + overall_score = row[headers["Overall Score"]] + cve_type = row[headers["Type"]] + node_name = row[headers["Node Name"]] + node_type = row[headers["Node Type"]] + cluster_name = row[headers["Kubernetes Cluster Name"]] + masked = row[headers["Masked"]] + + description = ( + f"**CVE ID:** {cve_id}\n" + f"**Severity:** {severity}\n" + f"**Attack Vector:** {attack_vector}\n" + f"**Caused By Package:** {caused_by_package}\n" + f"**Caused By Package Path:** {caused_by_package_path}\n" + f"**CVSS Score:** {cvss_score}\n" + f"**Description:** {description_text}\n" + f"**Fixed In:** {fixed_in}\n" + f"**Link:** {link}\n" + f"**Overall Score:** {overall_score}\n" + f"**Type:** {cve_type}\n" + f"**Node Name:** {node_name}\n" + f"**Node Type:** {node_type}\n" + f"**Kubernetes Cluster Name:** {cluster_name}\n" + f"**Masked:** {masked}\n" + ) + + return Finding( + title=f"Threatmapper_Vuln_Report-{cve_id}", + description=description, + component_name=caused_by_package, + severity=self.severity(severity), + static_finding=False, + dynamic_finding=True, + mitigation=fixed_in, + references=link, + cve=cve_id, + test=test, + ) + def severity(self, severity_input): if severity_input is None: return "Info" diff --git a/unittests/scans/deepfence_threatmapper/compliance_report_newformat.xlsx b/unittests/scans/deepfence_threatmapper/compliance_report_newformat.xlsx new file mode 100644 index 00000000000..b0addbd103a Binary files /dev/null and b/unittests/scans/deepfence_threatmapper/compliance_report_newformat.xlsx differ diff --git a/unittests/scans/deepfence_threatmapper/malware_report_newformat.xlsx b/unittests/scans/deepfence_threatmapper/malware_report_newformat.xlsx new file mode 100644 index 00000000000..46dc625a2c6 Binary files /dev/null and b/unittests/scans/deepfence_threatmapper/malware_report_newformat.xlsx differ diff --git a/unittests/scans/deepfence_threatmapper/secret_report_newformat.xlsx b/unittests/scans/deepfence_threatmapper/secret_report_newformat.xlsx new file mode 100644 index 00000000000..5083eb9ce78 Binary files /dev/null and b/unittests/scans/deepfence_threatmapper/secret_report_newformat.xlsx differ diff --git a/unittests/scans/deepfence_threatmapper/vulnerability_report_newformat.xlsx b/unittests/scans/deepfence_threatmapper/vulnerability_report_newformat.xlsx new file mode 100644 index 00000000000..a91204162df Binary files /dev/null and b/unittests/scans/deepfence_threatmapper/vulnerability_report_newformat.xlsx differ diff --git a/unittests/tools/test_deepfence_threatmapper_parser.py b/unittests/tools/test_deepfence_threatmapper_parser.py index 8561920f61a..47cd67a4e0d 100644 --- a/unittests/tools/test_deepfence_threatmapper_parser.py +++ b/unittests/tools/test_deepfence_threatmapper_parser.py @@ -13,6 +13,14 @@ def test_parse_file_compliance_report(self): self.assertEqual(findings[0].title, "Threatmapper_Compliance_Report-gdpr_3.6") self.assertEqual(findings[0].severity, "Info") + def test_parse_file_compliance_report_newformat(self): + with (get_unit_tests_scans_path("deepfence_threatmapper") / "compliance_report_newformat.xlsx").open("rb") as testfile: + parser = DeepfenceThreatmapperParser() + findings = parser.get_findings(testfile, Test()) + self.assertEqual(66, len(findings)) + self.assertEqual(findings[0].title, "Threatmapper_Compliance_Report-gdpr_3.4") + self.assertEqual(findings[0].severity, "Info") + def test_parse_file_malware_report(self): with (get_unit_tests_scans_path("deepfence_threatmapper") / "malware_report.xlsx").open("rb") as testfile: parser = DeepfenceThreatmapperParser() @@ -22,6 +30,14 @@ def test_parse_file_malware_report(self): self.assertEqual(findings[0].severity, "Low") self.assertEqual(findings[0].file_path, "/tmp/Deepfence/YaraHunter/df_db09257b02e615049e0aecc05be2dc2401735e67db4ab74225df777c62c39753/usr/sbin/mkfs.cramfs") + def test_parse_file_malware_report_newformat(self): + with (get_unit_tests_scans_path("deepfence_threatmapper") / "malware_report_newformat.xlsx").open("rb") as testfile: + parser = DeepfenceThreatmapperParser() + findings = parser.get_findings(testfile, Test()) + self.assertEqual(66, len(findings)) + self.assertEqual(findings[0].title, "spyeye") + self.assertEqual(findings[0].severity, "High") + def test_parse_file_secret_report(self): with (get_unit_tests_scans_path("deepfence_threatmapper") / "secret_report.xlsx").open("rb") as testfile: parser = DeepfenceThreatmapperParser() @@ -31,6 +47,14 @@ def test_parse_file_secret_report(self): self.assertEqual(findings[0].severity, "High") self.assertEqual(findings[0].file_path, "usr/share/doc/curl-8.3.0/TheArtOfHttpScripting.md") + def test_parse_file_secret_report_newformat(self): + with (get_unit_tests_scans_path("deepfence_threatmapper") / "secret_report_newformat.xlsx").open("rb") as testfile: + parser = DeepfenceThreatmapperParser() + findings = parser.get_findings(testfile, Test()) + self.assertEqual(15, len(findings)) + self.assertEqual(findings[0].title, "index-username_and_password_in_uri in /var/lib/host-containerd/io.containerd.snapshotter.v1.overlayfs/snapshots/1/fs/usr/lib64/python2.7/urllib2.py") + self.assertEqual(findings[0].severity, "High") + def test_parse_file_vulnerability_report(self): with (get_unit_tests_scans_path("deepfence_threatmapper") / "vulnerability_report.xlsx").open("rb") as testfile: parser = DeepfenceThreatmapperParser() @@ -40,3 +64,12 @@ def test_parse_file_vulnerability_report(self): self.assertEqual(findings[0].severity, "Low") self.assertEqual(findings[0].mitigation, "2.5-10.amzn2.0.1") self.assertEqual(findings[0].cve, "CVE-2021-36084") + + def test_parse_file_vulnerability_report_newformat(self): + with (get_unit_tests_scans_path("deepfence_threatmapper") / "vulnerability_report_newformat.xlsx").open("rb") as testfile: + parser = DeepfenceThreatmapperParser() + findings = parser.get_findings(testfile, Test()) + self.assertEqual(254, len(findings)) + self.assertEqual(findings[0].title, "Threatmapper_Vuln_Report-CVE-2005-2541") + self.assertEqual(findings[0].severity, "Critical") + self.assertEqual(findings[0].cve, "CVE-2005-2541")