diff --git a/dojo/finding/deduplication.py b/dojo/finding/deduplication.py index 99d416f44f0..900476926a8 100644 --- a/dojo/finding/deduplication.py +++ b/dojo/finding/deduplication.py @@ -217,14 +217,17 @@ def is_deduplication_on_engagement_mismatch(new_finding, to_duplicate_finding): return False -def get_endpoints_as_url(finding): - # Fix for https://github.com/DefectDojo/django-DefectDojo/issues/10215 - # When endpoints lack a protocol (scheme), str(e) returns a string like "10.20.197.218:6379" - # without the "//" prefix. hyperlink.parse() then misinterprets the hostname as the scheme. - # We replicate the behavior from dojo/endpoint/utils.py line 265: prepend "//" if "://" is missing - # to ensure hyperlink.parse() correctly identifies host, port, and path components. +def get_endpoints_as_url(endpoints): + """ + Convert a list of Endpoint objects to parsed hyperlink URLs. + + Fix for https://github.com/DefectDojo/django-DefectDojo/issues/10215 + When endpoints lack a protocol (scheme), str(e) returns a string like "10.20.197.218:6379" + without the "//" prefix. hyperlink.parse() then misinterprets the hostname as the scheme. + We prepend "//" if "://" is missing to ensure correct parsing. 
+ """ urls = [] - for e in finding.endpoints.all(): + for e in endpoints: endpoint_str = str(e) if "://" not in endpoint_str: endpoint_str = "//" + endpoint_str @@ -242,8 +245,9 @@ def are_urls_equal(url1, url2, fields): return True -def finding_locations(finding): - return [ref.location.url for ref in finding.locations.all()] +def finding_locations(location_refs): + """Extract URLs from a list of location references.""" + return [ref.location.url for ref in location_refs] def are_location_urls_equal(url1, url2, fields): @@ -266,8 +270,11 @@ def are_locations_duplicates(new_finding, to_duplicate_finding): return True if settings.V3_FEATURE_LOCATIONS: - list1 = finding_locations(new_finding) - list2 = finding_locations(to_duplicate_finding) + # Use unsaved_locations for unsaved findings (preview mode), saved M2M otherwise + locs1 = new_finding.locations.all() if new_finding.pk else getattr(new_finding, "unsaved_locations", []) + locs2 = to_duplicate_finding.locations.all() if to_duplicate_finding.pk else getattr(to_duplicate_finding, "unsaved_locations", []) + list1 = finding_locations(locs1) + list2 = finding_locations(locs2) deduplicationLogger.debug( f"Starting deduplication by location fields for finding {new_finding.id} with locations {list1} and finding {to_duplicate_finding.id} with locations {list2}", @@ -284,8 +291,11 @@ def are_locations_duplicates(new_finding, to_duplicate_finding): deduplicationLogger.debug(f"locations are not duplicates: {new_finding.id} and {to_duplicate_finding.id}") return False # TODO: Delete this after the move to Locations - list1 = get_endpoints_as_url(new_finding) - list2 = get_endpoints_as_url(to_duplicate_finding) + # Use unsaved_endpoints for unsaved findings (preview mode), saved M2M otherwise + eps1 = new_finding.endpoints.all() if new_finding.pk else getattr(new_finding, "unsaved_endpoints", []) + eps2 = to_duplicate_finding.endpoints.all() if to_duplicate_finding.pk else getattr(to_duplicate_finding, "unsaved_endpoints", 
[]) + list1 = get_endpoints_as_url(eps1) + list2 = get_endpoints_as_url(eps2) deduplicationLogger.debug( f"Starting deduplication by endpoint fields for finding {new_finding.id} with urls {list1} and finding {to_duplicate_finding.id} with urls {list2}", @@ -535,6 +545,9 @@ def find_candidates_for_reimport_legacy(test, findings, service=None): def _is_candidate_older(new_finding, candidate): + # Unsaved findings (e.g. preview mode) have no PK — all DB candidates are older by definition + if new_finding.pk is None: + return True # Ensure the newer finding is marked as duplicate of the older finding is_older = candidate.id < new_finding.id if not is_older: @@ -715,7 +728,116 @@ def _flush_duplicate_changes(modified_new_findings): return modified_new_findings +# --------------------------------------------------------------------------- +# Match-only functions (read-only, no DB writes) +# These return [(new_finding, matched_candidate), ...] without persisting. +# Used by both the regular dedup pipeline and the Pro import/reimport preview engine. +# --------------------------------------------------------------------------- + + +def match_batch_hash_code(findings): + """Find dedup matches by hash_code without persisting. Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_hash = find_candidates_for_deduplication_hash(test, findings) + if not candidates_by_hash: + return [] + matches = [] + for new_finding in findings: + for match in get_matches_from_hash_candidates(new_finding, candidates_by_hash): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_unique_id(findings): + """Find dedup matches by unique_id_from_tool without persisting. 
Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_uid = find_candidates_for_deduplication_unique_id(test, findings) + if not candidates_by_uid: + return [] + matches = [] + for new_finding in findings: + for match in get_matches_from_unique_id_candidates(new_finding, candidates_by_uid): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_uid_or_hash(findings): + """Find dedup matches by uid or hash_code without persisting. Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_uid, existing_by_hash = find_candidates_for_deduplication_uid_or_hash(test, findings) + if not (candidates_by_uid or existing_by_hash): + return [] + matches = [] + for new_finding in findings: + if new_finding.duplicate: + continue + for match in get_matches_from_uid_or_hash_candidates(new_finding, candidates_by_uid, existing_by_hash): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_legacy(findings): + """Find dedup matches by legacy algorithm without persisting. Returns [(finding, candidate), ...].""" + if not findings: + return [] + test = findings[0].test + candidates_by_title, candidates_by_cwe = find_candidates_for_deduplication_legacy(test, findings) + if not (candidates_by_title or candidates_by_cwe): + return [] + matches = [] + for new_finding in findings: + for match in get_matches_from_legacy_candidates(new_finding, candidates_by_title, candidates_by_cwe): + matches.append((new_finding, match)) + break + return matches + + +def match_batch_of_findings(findings): + """ + Batch match findings against existing candidates without persisting. + + Returns list of (new_finding, matched_candidate) tuples. + Works with both saved and unsaved findings. 
+ """ + if not findings: + return [] + enabled = System_Settings.objects.get().enable_deduplication + if not enabled: + return [] + # Only sort by id for saved findings; unsaved findings have no id + if findings[0].pk is not None: + findings = sorted(findings, key=attrgetter("id")) + test = findings[0].test + dedup_alg = test.deduplication_algorithm + if dedup_alg == settings.DEDUPE_ALGO_HASH_CODE: + return match_batch_hash_code(findings) + if dedup_alg == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL: + return match_batch_unique_id(findings) + if dedup_alg == settings.DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE: + return match_batch_uid_or_hash(findings) + return match_batch_legacy(findings) + + +# --------------------------------------------------------------------------- +# Batch dedup functions (match + persist) +# These call the match-only functions above and then persist the results. +# --------------------------------------------------------------------------- + + def _dedupe_batch_hash_code(findings): + # NOTE: These functions intentionally interleave matching and set_duplicate() + # rather than calling the match_batch_*() functions above. This is because + # set_duplicate() modifies finding.duplicate in-memory, which affects the + # duplicate check in subsequent loop iterations (especially for uid_or_hash). if not findings: return [] test = findings[0].test diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index a57b6884152..103bb2f48f1 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -308,35 +308,12 @@ def process_findings( return new_findings - def close_old_findings( - self, - findings: list[Finding], - **kwargs: dict, - ) -> list[Finding]: + def get_close_old_findings_queryset(self, new_hash_codes, new_unique_ids_from_tool): """ - Closes old findings based on a hash code match at either the product - or the engagement scope. 
Closing an old finding entails setting the - finding to mitigated status, setting all location statuses to mitigated, - as well as leaving a not on the finding indicating that it was mitigated - because the vulnerability is no longer present in the submitted scan report. - """ - # First check if close old findings is desired - if not self.close_old_findings_toggle: - return [] + Build queryset of findings that would be closed, without closing them. - logger.debug("IMPORT_SCAN: Closing findings no longer present in scan report") - # Remove all the findings that are coming from the report already mitigated - new_hash_codes = [] - new_unique_ids_from_tool = [] - for finding in findings.values(): - # Do not process closed findings in the report - if finding.get("is_mitigated", False): - continue - # Grab the hash code - if (hash_code := finding.get("hash_code")) is not None: - new_hash_codes.append(hash_code) - if (unique_id_from_tool := finding.get("unique_id_from_tool")) is not None: - new_unique_ids_from_tool.append(unique_id_from_tool) + Reusable by preview engines to count findings that would be closed. + """ # Get the initial filtered list of old findings to be closed without # considering the scope of the product or engagement # Include both active findings and risk-accepted findings (which have active=False) @@ -373,6 +350,38 @@ def close_old_findings( old_findings = old_findings.filter(service=self.service) else: old_findings = old_findings.filter(Q(service__isnull=True) | Q(service__exact="")) + return old_findings + + def close_old_findings( + self, + findings: list[Finding], + **kwargs: dict, + ) -> list[Finding]: + """ + Closes old findings based on a hash code match at either the product + or the engagement scope. 
Closing an old finding entails setting the + finding to mitigated status, setting all location statuses to mitigated, + as well as leaving a note on the finding indicating that it was mitigated + because the vulnerability is no longer present in the submitted scan report. + """ + # First check if close old findings is desired + if not self.close_old_findings_toggle: + return [] + + logger.debug("IMPORT_SCAN: Closing findings no longer present in scan report") + # Remove all the findings that are coming from the report already mitigated + new_hash_codes = [] + new_unique_ids_from_tool = [] + for finding in findings.values(): + # Do not process closed findings in the report + if finding.get("is_mitigated", False): + continue + # Grab the hash code + if (hash_code := finding.get("hash_code")) is not None: + new_hash_codes.append(hash_code) + if (unique_id_from_tool := finding.get("unique_id_from_tool")) is not None: + new_unique_ids_from_tool.append(unique_id_from_tool) + old_findings = self.get_close_old_findings_queryset(new_hash_codes, new_unique_ids_from_tool) # Update the status of the findings and any locations for old_finding in old_findings: url = str(get_full_url(reverse("view_test", args=(self.test.id,)))) diff --git a/dojo/tools/anchore_grype/parser.py b/dojo/tools/anchore_grype/parser.py index f53935d8ee2..16132890c7e 100644 --- a/dojo/tools/anchore_grype/parser.py +++ b/dojo/tools/anchore_grype/parser.py @@ -76,7 +76,8 @@ def get_findings(self, file, test): rel_epss = related_vulnerability.get("epss") rel_vuln_id = related_vulnerability.get("id") vulnerability_ids = self.get_vulnerability_ids( - vuln_id, related_vulnerabilities, + vuln_id, + related_vulnerabilities, ) matches = item["matchDetails"] @@ -87,11 +88,7 @@ def get_findings(self, file, test): artifact_purl = artifact.get("purl") artifact_location = artifact.get("locations") file_path = None - if ( - artifact_location - and len(artifact_location) > 0 - and artifact_location[0].get("path") - ): + 
if artifact_location and len(artifact_location) > 0 and artifact_location[0].get("path"): file_path = artifact_location[0].get("path") finding_title = f"{vuln_id} in {artifact_name}:{artifact_version}" @@ -99,25 +96,17 @@ def get_findings(self, file, test): finding_tags = None finding_description = "" if vuln_namespace: - finding_description += ( - f"**Vulnerability Namespace:** {vuln_namespace}" - ) + finding_description += f"**Vulnerability Namespace:** {vuln_namespace}" if vuln_description: - finding_description += ( - f"\n**Vulnerability Description:** {vuln_description}" - ) + finding_description += f"\n**Vulnerability Description:** {vuln_description}" if rel_description and rel_description != vuln_description: finding_description += f"\n**Related Vulnerability Description:** {rel_description}" if matches: if isinstance(item["matchDetails"], dict): - finding_description += ( - f"\n**Matcher:** {matches['matcher']}" - ) + finding_description += f"\n**Matcher:** {matches['matcher']}" finding_tags = [matches["matcher"].replace("-matcher", "")] elif len(matches) == 1: - finding_description += ( - f"\n**Matcher:** {matches[0]['matcher']}" - ) + finding_description += f"\n**Matcher:** {matches[0]['matcher']}" finding_tags = [ matches[0]["matcher"].replace("-matcher", ""), ] @@ -148,30 +137,22 @@ def get_findings(self, file, test): finding_references = "" if vuln_datasource: - finding_references += ( - f"**Vulnerability Datasource:** {vuln_datasource}\n" - ) + finding_references += f"**Vulnerability Datasource:** {vuln_datasource}\n" if vuln_urls: if len(vuln_urls) == 1: if vuln_urls[0] != vuln_datasource: - finding_references += ( - f"**Vulnerability URL:** {vuln_urls[0]}\n" - ) + finding_references += f"**Vulnerability URL:** {vuln_urls[0]}\n" else: finding_references += "**Vulnerability URLs:**\n" for url in vuln_urls: if url != vuln_datasource: finding_references += f"- {url}\n" if rel_datasource: - finding_references += ( - f"**Related Vulnerability 
Datasource:** {rel_datasource}\n" - ) + finding_references += f"**Related Vulnerability Datasource:** {rel_datasource}\n" if rel_urls: if len(rel_urls) == 1: if rel_urls[0] != vuln_datasource: - finding_references += ( - f"**Related Vulnerability URL:** {rel_urls[0]}\n" - ) + finding_references += f"**Related Vulnerability URL:** {rel_urls[0]}\n" else: finding_references += "**Related Vulnerability URLs:**\n" for url in rel_urls: @@ -246,7 +227,8 @@ def get_cvss(self, cvss): vector = cvss_item["vector"] cvss_objects = cvss_parser.parse_cvss_from_text(vector) if len(cvss_objects) > 0 and isinstance( - cvss_objects[0], CVSS3, + cvss_objects[0], + CVSS3, ): return vector return None @@ -276,8 +258,11 @@ def get_vulnerability_ids(self, vuln_id, related_vulnerabilities): if vuln_id: vulnerability_ids.append(vuln_id) if related_vulnerabilities: - vulnerability_ids.extend(related_vulnerability_id for related_vulnerability in related_vulnerabilities - if (related_vulnerability_id := related_vulnerability.get("id"))) + vulnerability_ids.extend( + related_vulnerability_id + for related_vulnerability in related_vulnerabilities + if (related_vulnerability_id := related_vulnerability.get("id")) + ) if vulnerability_ids: return vulnerability_ids return None diff --git a/dojo/tools/cargo_audit/parser.py b/dojo/tools/cargo_audit/parser.py index cb7eeb97e31..cd84b8100dc 100644 --- a/dojo/tools/cargo_audit/parser.py +++ b/dojo/tools/cargo_audit/parser.py @@ -80,24 +80,13 @@ def get_findings(self, filename, test): vuln_id = advisory.get("id") vulnerability_ids = [advisory.get("id")] categories = f"**Categories:** {', '.join(advisory['categories'])}" if "categories" in advisory else "" - description = ( - categories - + f"\n**Description:** `{advisory.get('description')}`" - ) + description = categories + f"\n**Description:** `{advisory.get('description')}`" - if ( - item["affected"] is not None - and "functions" in item["affected"] - ): + if item["affected"] is not None and 
"functions" in item["affected"]: affected_func = [ - f'{func}: {", ".join(versions)}' - for func, versions in item["affected"][ - "functions" - ].items() + f"{func}: {', '.join(versions)}" for func, versions in item["affected"]["functions"].items() ] - description += ( - f"\n**Affected functions**: {', '.join(affected_func)}" - ) + description += f"\n**Affected functions**: {', '.join(affected_func)}" references = f"{advisory.get('url')}\n" + "\n".join( advisory["references"], diff --git a/dojo/tools/dependency_check/parser.py b/dojo/tools/dependency_check/parser.py index e976d0a4d96..4c472e2f4c4 100644 --- a/dojo/tools/dependency_check/parser.py +++ b/dojo/tools/dependency_check/parser.py @@ -87,7 +87,10 @@ def add_finding(self, finding, dupes): dupes[key] = finding def get_filename_and_path_from_dependency( - self, dependency, related_dependency, namespace, + self, + dependency, + related_dependency, + namespace, ): if related_dependency is None: return dependency.findtext( @@ -104,7 +107,10 @@ def get_filename_and_path_from_dependency( return None, None def get_component_name_and_version_from_dependency( - self, dependency, related_dependency, namespace, + self, + dependency, + related_dependency, + namespace, ): identifiers_node = dependency.find(namespace + "identifiers") if identifiers_node is not None: @@ -116,20 +122,13 @@ def get_component_name_and_version_from_dependency( purl_parts = purl.to_dict() component_name = ( purl_parts["namespace"] + ":" - if purl_parts["namespace"] - and len(purl_parts["namespace"]) > 0 - else "" - ) - component_name += ( - purl_parts["name"] - if purl_parts["name"] and len(purl_parts["name"]) > 0 + if purl_parts["namespace"] and len(purl_parts["namespace"]) > 0 else "" ) + component_name += purl_parts["name"] if purl_parts["name"] and len(purl_parts["name"]) > 0 else "" component_name = component_name or None component_version = ( - purl_parts["version"] - if purl_parts["version"] and len(purl_parts["version"]) > 0 - else "" + 
purl_parts["version"] if purl_parts["version"] and len(purl_parts["version"]) > 0 else "" ) return component_name, component_version, pck_id @@ -149,20 +148,10 @@ def get_component_name_and_version_from_dependency( if cpe_node: cpe_id = cpe_node.findtext(f"{namespace}name") cpe = CPE(cpe_id) - component_name = ( - cpe.get_vendor()[0] + ":" - if len(cpe.get_vendor()) > 0 - else "" - ) - component_name += ( - cpe.get_product()[0] if len(cpe.get_product()) > 0 else "" - ) + component_name = cpe.get_vendor()[0] + ":" if len(cpe.get_vendor()) > 0 else "" + component_name += cpe.get_product()[0] if len(cpe.get_product()) > 0 else "" component_name = component_name or None - component_version = ( - cpe.get_version()[0] - if len(cpe.get_version()) > 0 - else None - ) + component_version = cpe.get_version()[0] if len(cpe.get_version()) > 0 else None return component_name, component_version, None maven_node = identifiers_node.find( @@ -251,7 +240,8 @@ def get_severity_and_cvss_meta(self, vulnerability, namespace) -> dict: if severity: if severity.strip().lower() not in self.SEVERITY_MAPPING: logger.warning( - "Warning: Unknow severity value detected '%s'. Bypass to 'Medium' value", severity, + "Warning: Unknow severity value detected '%s'. 
Bypass to 'Medium' value", + severity, ) severity = "Medium" else: @@ -266,13 +256,20 @@ def get_severity_and_cvss_meta(self, vulnerability, namespace) -> dict: } def get_finding_from_vulnerability( - self, dependency, related_dependency, vulnerability, test, namespace, + self, + dependency, + related_dependency, + vulnerability, + test, + namespace, ): ( dependency_filename, dependency_filepath, ) = self.get_filename_and_path_from_dependency( - dependency, related_dependency, namespace, + dependency, + related_dependency, + namespace, ) # logger.debug('dependency_filename: %s', dependency_filename) @@ -318,13 +315,17 @@ def get_finding_from_vulnerability( component_version, component_purl, ) = self.get_component_name_and_version_from_dependency( - dependency, related_dependency, namespace, + dependency, + related_dependency, + namespace, ) stripped_name = name # startswith CVE-XXX-YYY stripped_name = re.sub( - r"^CVE-\d{4}-\d{4,7}", "", stripped_name, + r"^CVE-\d{4}-\d{4,7}", + "", + stripped_name, ).strip() # startswith CWE-XXX: stripped_name = re.sub(r"^CWE-\d+\:", "", stripped_name).strip() @@ -333,7 +334,8 @@ def get_finding_from_vulnerability( if component_name is None: logger.warning( - "component_name was None for File: %s, using dependency file name instead.", dependency_filename, + "component_name was None for File: %s, using dependency file name instead.", + dependency_filename, ) component_name = dependency_filename @@ -352,15 +354,9 @@ def get_finding_from_vulnerability( ref_url = reference_node.findtext(f"{namespace}url") ref_name = reference_node.findtext(f"{namespace}name") if ref_url == ref_name: - reference_detail += ( - f"**Source:** {ref_source}\n**URL:** {ref_url}\n\n" - ) + reference_detail += f"**Source:** {ref_source}\n**URL:** {ref_url}\n\n" else: - reference_detail += ( - f"**Source:** {ref_source}\n" - f"**URL:** {ref_url}\n" - f"**Name:** {ref_name}\n\n" - ) + reference_detail += f"**Source:** {ref_source}\n**URL:** {ref_url}\n**Name:** 
{ref_name}\n\n" if related_dependency is not None: tags.append("related") @@ -370,14 +366,18 @@ def get_finding_from_vulnerability( notes = "Document on why we are suppressing this vulnerability is missing!" tags.append("no_suppression_document") mitigation = f"**This vulnerability is mitigated and/or suppressed:** {notes}\n" - mitigation += f"Update {component_name}:{component_version} to at least the version recommended in the description" + mitigation += ( + f"Update {component_name}:{component_version} to at least the version recommended in the description" + ) mitigated = datetime.datetime.now(datetime.UTC) is_Mitigated = True active = False tags.append("suppressed") else: - mitigation = f"Update {component_name}:{component_version} to at least the version recommended in the description" + mitigation = ( + f"Update {component_name}:{component_version} to at least the version recommended in the description" + ) description += "\n**Filepath:** " + str(dependency_filepath) active = True @@ -467,19 +467,15 @@ def get_findings(self, filename, test): namespace + "relatedDependencies", ) if relatedDependencies is not None: - for ( - relatedDependency - ) in relatedDependencies.findall( + for relatedDependency in relatedDependencies.findall( namespace + "relatedDependency", ): - finding = ( - self.get_finding_from_vulnerability( - dependency, - relatedDependency, - vulnerability, - test, - namespace, - ) + finding = self.get_finding_from_vulnerability( + dependency, + relatedDependency, + vulnerability, + test, + namespace, ) if finding: # could be None if scan_date: @@ -503,7 +499,9 @@ def get_findings(self, filename, test): elif settings.V3_FEATURE_LOCATIONS: # Collect product-level dependency locations _, _, component_purl = self.get_component_name_and_version_from_dependency( - dependency, None, namespace, + dependency, + None, + namespace, ) if component_purl: test.unsaved_metadata.append( diff --git a/dojo/tools/jfrog_xray_unified/parser.py 
b/dojo/tools/jfrog_xray_unified/parser.py index f391e3b001c..9235d0e20b1 100644 --- a/dojo/tools/jfrog_xray_unified/parser.py +++ b/dojo/tools/jfrog_xray_unified/parser.py @@ -49,9 +49,7 @@ def get_item(vulnerability, test): # not all cves have cvssv3 scores, so skip these. If no v3 scores, # we'll default to index 0 if "cvss_v3_score" in vulnerability["cves"][thisCveIndex]: - thisCvssV3Score = vulnerability["cves"][thisCveIndex][ - "cvss_v3_score" - ] + thisCvssV3Score = vulnerability["cves"][thisCveIndex]["cvss_v3_score"] if thisCvssV3Score > highestCvssV3Score: highestCvssV3Index = thisCveIndex highestCvssV3Score = thisCvssV3Score @@ -84,23 +82,13 @@ def get_item(vulnerability, test): cvss_v2 = worstCve["cvss_v2_vector"] fix_available = False - if ( - "fixed_versions" in vulnerability - and len(vulnerability["fixed_versions"]) > 0 - ): + if "fixed_versions" in vulnerability and len(vulnerability["fixed_versions"]) > 0: mitigation = "Versions containing a fix:\n" mitigation += "\n".join(vulnerability["fixed_versions"]) fix_available = True - if ( - "external_advisory_source" in vulnerability - and "external_advisory_severity" in vulnerability - ): - extra_desc = ( - vulnerability["external_advisory_source"] - + ": " - + vulnerability["external_advisory_severity"] - ) + if "external_advisory_source" in vulnerability and "external_advisory_severity" in vulnerability: + extra_desc = vulnerability["external_advisory_source"] + ": " + vulnerability["external_advisory_severity"] if vulnerability["issue_id"]: title = vulnerability["issue_id"] + " - " + vulnerability["summary"] @@ -108,10 +96,15 @@ def get_item(vulnerability, test): title = vulnerability["summary"] references_str = vulnerability.get("references") - references = "\n".join(references_str) if isinstance(references_str, list) else (references_str if isinstance(references_str, str) else "") + references = ( + "\n".join(references_str) + if isinstance(references_str, list) + else (references_str if 
isinstance(references_str, str) else "") + ) scan_time = datetime.strptime( - vulnerability["artifact_scan_time"], "%Y-%m-%dT%H:%M:%S%z", + vulnerability["artifact_scan_time"], + "%Y-%m-%dT%H:%M:%S%z", ) # component has several parts separated by colons. Last part is the @@ -132,9 +125,7 @@ def get_item(vulnerability, test): title=title, test=test, severity=severity, - description=( - vulnerability.get("description", vulnerability.get("summary")) + "\n\n" + extra_desc - ).strip(), + description=(vulnerability.get("description", vulnerability.get("summary")) + "\n\n" + extra_desc).strip(), mitigation=mitigation, component_name=component_name, component_version=component_version, @@ -162,7 +153,9 @@ def get_item(vulnerability, test): if settings.V3_FEATURE_LOCATIONS and package_type and component_name: purl_type = package_type.lower() finding.unsaved_locations.append( - LocationData.dependency(purl_type=purl_type, name=component_name, version=component_version, file_path=vulnerability["path"]), + LocationData.dependency( + purl_type=purl_type, name=component_name, version=component_version, file_path=vulnerability["path"], + ), ) return finding diff --git a/dojo/tools/threat_composer/parser.py b/dojo/tools/threat_composer/parser.py index 4e347ae82e7..e7e95b6ca77 100644 --- a/dojo/tools/threat_composer/parser.py +++ b/dojo/tools/threat_composer/parser.py @@ -66,7 +66,6 @@ def get_findings(self, file, test): mitigation_links[linked_id].append(mitigations[mitigation_id]) for threat in data["threats"]: - if "threatAction" in threat: title = threat["threatAction"] severity, impact, comments = self.parse_threat_metadata(threat.get("metadata", []))