Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ $ docker compose build --build-arg uid=1000
|`unittests/scans/<parser_dir>/{many_vulns,no_vuln,one_vuln}.json` | Sample files containing meaningful data for unit tests. The minimal set.
|`unittests/tools/test_<parser_name>_parser.py` | Unit tests of the parser.
|`dojo/settings/settings.dist.py` | If you want to use a modern hashcode based deduplication algorithm
|`docs/content/en/connecting_your_tools/parsers/<file/api>/<parser_file>.md` | Documentation, what kind of file format is required and how it should be obtained
|`docs/content/en/connecting_your_tools/parsers/<file/api>/<parser_file>.md` | Documentation, what kind of file format is required and how it should be obtained


## Factory contract

Expand All @@ -57,6 +57,7 @@ Parsers are loaded dynamically with a factory pattern. To have your parser loaded
3. `def get_description_for_scan_types(self, scan_type):` This function returns a string used to provide some text in the UI (long description)
4. `def get_findings(self, file, test)` This function returns a list of findings
6. If your parser has more than 1 scan_type (for detailed mode) you **MUST** implement the `def set_mode(self, mode)` method
7. The parser instance is re-used over all imports performed for this scan_type, so do not store any import-specific data on the parser instance (`self`) or at class level

Example:

Expand Down Expand Up @@ -145,7 +146,7 @@ Very bad example:
Various file formats are handled through libraries. In order to keep DefectDojo slim and to avoid extending the attack surface, keep the number of libraries used minimal and take other parsers as an example.

#### defusedXML in favour of lxml
As XML is by default an insecure format, the information from various XML outputs has to be parsed in a secure way. Within an evaluation, we determined that defusedxml is the library which we will use in the future to parse XML files in parsers, as this library is rated more secure. Thus, we will only accept PRs with the defusedxml library.
As XML is by default an insecure format, the information from various XML outputs has to be parsed in a secure way. Within an evaluation, we determined that defusedxml is the library which we will use in the future to parse XML files in parsers, as this library is rated more secure. Thus, we will only accept PRs with the defusedxml library.

### Not all attributes are mandatory

Expand Down Expand Up @@ -366,4 +367,3 @@ Please add a new .md file in [`docs/content/en/connecting_your_tools/parsers`] w
* A link to the scanner itself - (e.g. GitHub or vendor link)

Here is an example of a completed Parser documentation page: [https://github.com/DefectDojo/django-DefectDojo/blob/master/docs/content/en/connecting_your_tools/parsers/file/acunetix.md](https://github.com/DefectDojo/django-DefectDojo/blob/master/docs/content/en/connecting_your_tools/parsers/file/acunetix.md)

51 changes: 30 additions & 21 deletions dojo/tools/fortify/fpr_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
logger = logging.getLogger(__name__)


class FortifyFPRParser:
class FortifyRelatedData:
def __init__(self):
self.descriptions: dict[str, DescriptionData] = {}
self.snippets: dict[str, SnippetData] = {}
Expand All @@ -20,6 +20,11 @@ def __init__(self):
self.suppressed: dict[str, bool] = {}
self.threaded_comments: dict[str, list[str]] = {}


class FortifyFPRParser:
def __init__(self):
pass

def parse_fpr(self, filename, test):
if str(filename.__class__) == "<class '_io.TextIOWrapper'>":
input_zip = zipfile.ZipFile(filename.name, "r")
Expand Down Expand Up @@ -60,41 +65,44 @@ def identify_namespace(self, root: Element) -> dict[str, str]:

def parse_related_data(self, root: Element, test: Test) -> FortifyRelatedData:
    """
    Collect the lookup tables that the vulnerabilities refer to.

    Descriptions are keyed by their ``classID`` attribute, snippets and rules
    by their ``id`` attribute; elements without that attribute are skipped.
    A new ``FortifyRelatedData`` is created on every call and returned, so
    nothing is stored on the parser instance.

    ``test`` is not used in this method.
    """
    related_data = FortifyRelatedData()

    for description in root.findall("Description", self.namespaces):
        class_id = description.attrib.get("classID")
        logger.debug(f"Description: {class_id}")
        if class_id:
            related_data.descriptions[class_id] = self.parse_description_information(description)

    # find() returns None when the element is absent; compare with None explicitly
    # because an Element without children is falsy.
    snippets = root.find("Snippets", self.namespaces)
    for snippet in snippets if snippets is not None else ():
        snippet_id = snippet.attrib.get("id")
        logger.debug(f"Snippet: {snippet_id}")
        if snippet_id:
            related_data.snippets[snippet_id] = self.parse_snippet_information(snippet)

    engine_data = root.find("EngineData", self.namespaces)
    rule_info = engine_data.find("RuleInfo", self.namespaces) if engine_data is not None else None
    for rule in rule_info if rule_info is not None else ():
        rule_id = rule.attrib.get("id")
        logger.debug(f"Rule: {rule_id}")
        if rule_id:
            related_data.rules[rule_id] = self.parse_rule_information(rule.find("MetaInfo", self.namespaces))

    return related_data

def add_audit_log(self, related_data, audit_log: Element) -> FortifyRelatedData:
    """
    Add suppression flags and threaded comments from the audit log to ``related_data``.

    ``audit_log`` may be ``None``; in that case, or when it contains no
    ``IssueList`` element, ``related_data`` is returned unchanged. Issues
    without an ``instanceId`` attribute are skipped.

    Returns the same ``related_data`` object that was passed in.
    """
    logger.debug("Parse audit log")
    if audit_log is None:
        return related_data

    issue_list = audit_log.find("IssueList", self.namespaces_audit_log)
    if issue_list is None:
        # Nothing to merge; avoids AttributeError on None.findall(...)
        return related_data

    for issue in issue_list.findall("Issue", self.namespaces_audit_log):
        instance_id = issue.attrib.get("instanceId")
        if not instance_id:
            continue

        # A missing or empty attribute counts as "not suppressed"
        suppressed = issue.attrib.get("suppressed", "").lower() == "true"
        logger.debug(f"Issue: {instance_id} - Suppressed: {suppressed}")
        related_data.suppressed[instance_id] = suppressed

        threaded_comments = issue.find("ThreadedComments", self.namespaces_audit_log)
        logger.debug(f"ThreadedComments: {threaded_comments}")
        if threaded_comments is not None:
            related_data.threaded_comments[instance_id] = [
                self.get_comment_text(comment)
                for comment in threaded_comments.findall("Comment", self.namespaces_audit_log)
            ]

    return related_data

def get_comment_text(self, comment: Element) -> str:
content = comment.findtext("Content", "", self.namespaces_audit_log)
Expand All @@ -107,8 +115,9 @@ def convert_vulnerabilities_to_findings(self, root: Element, audit_log: Element,
"""Convert the list of vulnerabilities to a list of findings."""
"""Try to mimic the logic from the xml parser"""
"""Future Improvement: share code between xml and fpr parser (it was split up earlier)"""
self.parse_related_data(root, test)
self.parse_audit_log(audit_log)
related_data = self.parse_related_data(root, test)
# add audit log information to related data
related_data = self.add_audit_log(related_data, audit_log)

findings = []
for vuln in root.find("Vulnerabilities", self.namespaces):
Expand All @@ -117,18 +126,18 @@ def convert_vulnerabilities_to_findings(self, root: Element, audit_log: Element,
self.parse_class_information(vuln, vuln_data)
self.parse_analysis_information(vuln, vuln_data)

snippet = self.snippets.get(vuln_data.snippet_id)
description = self.descriptions.get(vuln_data.class_id)
rule = self.rules.get(vuln_data.class_id)
snippet = related_data.snippets.get(vuln_data.snippet_id)
description = related_data.descriptions.get(vuln_data.class_id)
rule = related_data.rules.get(vuln_data.class_id)

finding = Finding(test=test, static_finding=True)

finding.active, finding.false_p = self.compute_status(vuln_data)
finding.active, finding.false_p = self.compute_status(related_data, vuln_data)
finding.title = self.format_title(vuln_data, snippet, description, rule)
finding.description = self.format_description(vuln_data, snippet, description, rule)
finding.mitigation = self.format_mitigation(vuln_data, snippet, description, rule)
finding.severity = self.compute_severity(vuln_data, snippet, description, rule)
finding.impact = self.format_impact(vuln_data)
finding.impact = self.format_impact(related_data, vuln_data)

finding.file_path = vuln_data.source_location_path
finding.line = int(self.compute_line(vuln_data, snippet, description, rule))
Expand Down Expand Up @@ -302,22 +311,22 @@ def compute_severity(self, vulnerability, snippet, description, rule) -> str:

return "Informational"

def format_impact(self, related_data, vuln_data) -> str:
    """
    Build the impact text from the threaded comments of a vulnerability.

    Returns an empty string when ``related_data.threaded_comments`` has no
    (or an empty) entry for ``vuln_data.instance_id``; otherwise a header line
    followed by one line per comment.
    """
    logger.debug(f"Threaded comments: {related_data.threaded_comments}")
    comments = related_data.threaded_comments.get(vuln_data.instance_id)
    if not comments:
        return ""

    return "Threaded Comments:\n" + "".join(f"{comment}\n" for comment in comments)

def compute_status(self, related_data, vulnerability) -> tuple[bool, bool]:
    """
    Compute the status of the vulnerability based on the instance ID.

    Returns ``(active, false_p)``: ``(False, True)`` when the audit log marked
    the instance as suppressed, ``(True, False)`` otherwise.
    """
    # ``suppressed`` maps instance id -> bool; use the stored flag rather than
    # key membership, otherwise issues recorded with suppressed="false" would
    # be reported as false positives as well.
    if related_data.suppressed.get(vulnerability.instance_id, False):
        return False, True
    return True, False

Expand Down
20 changes: 10 additions & 10 deletions dojo/tools/ms_defender/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ class MSDefenderParser:

"""Import from MSDefender findings"""

def __init__(self):
self.findings = []

def get_scan_types(self):
    """Return the list of scan type names handled by this parser."""
    scan_types = ["MSDefender Parser"]
    return scan_types

Expand All @@ -24,11 +21,12 @@ def get_description_for_scan_types(self, scan_type):
return ("MSDefender findings can be retrieved using the REST API")

def get_findings(self, file, test):
findings = []
if str(file.name).endswith(".json"):
vulnerabilityfile = json.load(file)
vulnerabilitydata = vulnerabilityfile["value"]
for vulnerability in vulnerabilitydata:
self.process_json(vulnerability)
findings.append(self.process_json(vulnerability))
elif str(file.name).endswith(".zip"):
if str(file.__class__) == "<class '_io.TextIOWrapper'>":
input_zip = zipfile.ZipFile(file.name, "r")
Expand All @@ -51,27 +49,29 @@ def get_findings(self, file, test):
vulnerabilities = []
machines = {}
for vulnerabilityfile in vulnerabilityfiles:
logger.debug("Loading vulnerabilitiy file: %s", vulnerabilityfile)
output = json.loads(zipdata[vulnerabilityfile].decode("ascii"))["value"]
for data in output:
vulnerabilities.append(data)
for machinefile in machinefiles:
logger.debug("Loading machine file: %s", machinefile)
output = json.loads(zipdata[machinefile].decode("ascii"))["value"]
for data in output:
machines[data.get("id")] = data
for vulnerability in vulnerabilities:
try:
machine = machines.get(vulnerability["machineId"], None)
if machine is not None:
self.process_zip(vulnerability, machine)
findings.append(self.process_json_with_machine_info(vulnerability, machine))
else:
logger.debug("fallback to process without machine: no machine id")
self.process_json(vulnerability)
findings.append(self.process_json(vulnerability))
except (IndexError, KeyError):
logger.exception("fallback to process without machine: exception")
findings.append(self.process_json(vulnerability))
else:
return []
return self.findings
return findings

def process_json(self, vulnerability):
description = ""
Expand All @@ -95,10 +95,10 @@ def process_json(self, vulnerability):
if vulnerability["cveId"] is not None:
finding.unsaved_vulnerability_ids = []
finding.unsaved_vulnerability_ids.append(vulnerability["cveId"])
self.findings.append(finding)
finding.unsaved_endpoints = []
return finding

def process_zip(self, vulnerability, machine):
def process_json_with_machine_info(self, vulnerability, machine):
description = ""
description += "cveId: " + str(vulnerability.get("cveId", "")) + "\n"
description += "machineId: " + str(vulnerability.get("machineId", "")) + "\n"
Expand Down Expand Up @@ -142,14 +142,14 @@ def process_zip(self, vulnerability, machine):
if "cveId" in vulnerability:
finding.unsaved_vulnerability_ids = []
finding.unsaved_vulnerability_ids.append(vulnerability["cveId"])
self.findings.append(finding)
finding.unsaved_endpoints = []
if "computerDnsName" in machine and machine["computerDnsName"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["computerDnsName"]).replace(" ", "").replace("(", "_").replace(")", "_")))
if "lastIpAddress" in machine and machine["lastIpAddress"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["lastIpAddress"])))
if "lastExternalIpAddress" in machine and machine["lastExternalIpAddress"] is not None:
finding.unsaved_endpoints.append(Endpoint(host=str(machine["lastExternalIpAddress"])))
return finding

def severity_check(self, severity_input):
if severity_input in {"Informational", "Low", "Medium", "High", "Critical"}:
Expand Down
1 change: 1 addition & 0 deletions dojo/tools/ptart/retest_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def __init__(self):
self.cvss_type = None

def get_test_data(self, tree):
self.cvss_type = None
if "retests" in tree:
self.cvss_type = tree.get("cvss_type", None)
retests = tree["retests"]
Expand Down
16 changes: 16 additions & 0 deletions unittests/tools/test_ms_defender_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ def test_parser_defender_zip(self):
endpoint.clean()
self.assertEqual("1.1.1.1", finding.unsaved_endpoints[0].host)

def test_parser_defender_zip_repeated(self):
    """
    It was found that the defender parser was caching findings across different runs of the parser.
    This test might be a good default test for any parser to make sure nothing is cached.
    """
    scan_path = get_unit_tests_scans_path("ms_defender") / "defender.zip"
    parser = MSDefenderParser()

    with scan_path.open(encoding="utf-8") as testfile:
        findings = parser.get_findings(testfile, Test())
    self.assertEqual(4, len(findings))

    # Second run on the SAME parser instance, with its own freshly opened file handle
    # (the first handle is already closed at this point).
    with scan_path.open(encoding="utf-8") as testfile_repeated:
        findings_repeated = parser.get_findings(testfile_repeated, Test())
    self.assertEqual(4, len(findings_repeated))

def test_parser_defender_wrong_machines_zip(self):
testfile = (get_unit_tests_scans_path("ms_defender") / "defender_wrong_machines.zip").open(encoding="utf-8")
parser = MSDefenderParser()
Expand Down