|
| 1 | +"""Parser for OpenReports (https://github.com/openreports/reports-api) vulnerability scan reports""" |
| 2 | + |
| 3 | +import json |
| 4 | +import logging |
| 5 | + |
| 6 | +from dojo.models import Finding |
| 7 | + |
| 8 | +logger = logging.getLogger(__name__) |
| 9 | + |
| 10 | + |
| 11 | +OPENREPORTS_SEVERITIES = { |
| 12 | + "critical": "Critical", |
| 13 | + "high": "High", |
| 14 | + "medium": "Medium", |
| 15 | + "low": "Low", |
| 16 | + "info": "Info", |
| 17 | +} |
| 18 | + |
| 19 | +DESCRIPTION_TEMPLATE = """{message} |
| 20 | +
|
| 21 | +**Category:** {category} |
| 22 | +**Policy:** {policy} |
| 23 | +**Result:** {result} |
| 24 | +**Source:** {source} |
| 25 | +**Package Name:** {pkg_name} |
| 26 | +**Installed Version:** {installed_version} |
| 27 | +**Fixed Version:** {fixed_version} |
| 28 | +**Primary URL:** {primary_url} |
| 29 | +""" |
| 30 | + |
| 31 | + |
| 32 | +class OpenreportsParser: |
| 33 | + def get_scan_types(self): |
| 34 | + return ["OpenReports Scan"] |
| 35 | + |
| 36 | + def get_label_for_scan_types(self, scan_type): |
| 37 | + return "OpenReports Scan" |
| 38 | + |
| 39 | + def get_description_for_scan_types(self, scan_type): |
| 40 | + return "Import OpenReports JSON scan report." |
| 41 | + |
| 42 | + def get_findings(self, scan_file, test): |
| 43 | + scan_data = scan_file.read() |
| 44 | + |
| 45 | + try: |
| 46 | + data = json.loads(str(scan_data, "utf-8")) |
| 47 | + except Exception: |
| 48 | + data = json.loads(scan_data) |
| 49 | + |
| 50 | + if data is None: |
| 51 | + return [] |
| 52 | + |
| 53 | + findings = [] |
| 54 | + |
| 55 | + # Handle both single report and list of reports |
| 56 | + reports = [] |
| 57 | + if isinstance(data, dict): |
| 58 | + # Check if it's a Kubernetes List object |
| 59 | + if data.get("kind") == "List" and "items" in data: |
| 60 | + reports = data["items"] |
| 61 | + # Check if it's a single Report object |
| 62 | + elif data.get("kind") == "Report": |
| 63 | + reports = [data] |
| 64 | + elif isinstance(data, list): |
| 65 | + reports = data |
| 66 | + |
| 67 | + for report in reports: |
| 68 | + if not isinstance(report, dict) or report.get("kind") != "Report": |
| 69 | + continue |
| 70 | + |
| 71 | + findings.extend(self._parse_report(test, report)) |
| 72 | + |
| 73 | + return findings |
| 74 | + |
| 75 | + def _parse_report(self, test, report): |
| 76 | + findings = [] |
| 77 | + |
| 78 | + # Extract metadata |
| 79 | + metadata = report.get("metadata", {}) |
| 80 | + report_name = metadata.get("name", "") |
| 81 | + namespace = metadata.get("namespace", "") |
| 82 | + |
| 83 | + # Extract scope information |
| 84 | + scope = report.get("scope", {}) |
| 85 | + scope_kind = scope.get("kind", "") |
| 86 | + scope_name = scope.get("name", "") |
| 87 | + |
| 88 | + # Create service identifier from scope and metadata |
| 89 | + service_name = f"{namespace}/{scope_kind}/{scope_name}" if namespace else f"{scope_kind}/{scope_name}" |
| 90 | + |
| 91 | + # Extract results |
| 92 | + results = report.get("results", []) |
| 93 | + |
| 94 | + for result in results: |
| 95 | + if not isinstance(result, dict): |
| 96 | + continue |
| 97 | + |
| 98 | + finding = self._create_finding_from_result(test, result, service_name, report_name) |
| 99 | + if finding: |
| 100 | + findings.append(finding) |
| 101 | + |
| 102 | + return findings |
| 103 | + |
| 104 | + def _create_finding_from_result(self, test, result, service_name, report_name): |
| 105 | + try: |
| 106 | + # Extract basic fields |
| 107 | + message = result.get("message", "") |
| 108 | + category = result.get("category", "") |
| 109 | + policy = result.get("policy", "") |
| 110 | + result_status = result.get("result", "") |
| 111 | + severity = result.get("severity", "info").lower() |
| 112 | + source = result.get("source", "") |
| 113 | + |
| 114 | + # Extract properties |
| 115 | + properties = result.get("properties", {}) |
| 116 | + pkg_name = properties.get("pkgName", "") |
| 117 | + installed_version = properties.get("installedVersion", "") |
| 118 | + fixed_version = properties.get("fixedVersion", "") |
| 119 | + primary_url = properties.get("primaryURL", "") |
| 120 | + |
| 121 | + # Convert severity to DefectDojo format |
| 122 | + severity_normalized = OPENREPORTS_SEVERITIES.get(severity, "Info") |
| 123 | + |
| 124 | + # Create title |
| 125 | + if policy.startswith("CVE-"): |
| 126 | + title = f"{policy} in {pkg_name}" |
| 127 | + else: |
| 128 | + title = f"{policy}: {message}" |
| 129 | + |
| 130 | + # Create description |
| 131 | + description = DESCRIPTION_TEMPLATE.format( |
| 132 | + message=message, |
| 133 | + category=category, |
| 134 | + policy=policy, |
| 135 | + result=result_status, |
| 136 | + source=source, |
| 137 | + pkg_name=pkg_name, |
| 138 | + installed_version=installed_version, |
| 139 | + fixed_version=fixed_version, |
| 140 | + primary_url=primary_url, |
| 141 | + ) |
| 142 | + |
| 143 | + # Determine if fix is available |
| 144 | + fix_available = bool(fixed_version and fixed_version.strip()) |
| 145 | + |
| 146 | + # Set mitigation based on fixed version |
| 147 | + mitigation = f"Upgrade to version: {fixed_version}" if fixed_version else "" |
| 148 | + |
| 149 | + # Set references |
| 150 | + references = primary_url if primary_url else "" |
| 151 | + |
| 152 | + # Determine active status based on result |
| 153 | + active = result_status not in ["skip", "pass"] |
| 154 | + verified = result_status in ["fail", "warn"] |
| 155 | + |
| 156 | + # Create tags |
| 157 | + tags = [category, source] |
| 158 | + if scope_kind := service_name.split("/")[1] if "/" in service_name else "": |
| 159 | + tags.append(scope_kind) |
| 160 | + |
| 161 | + finding = Finding( |
| 162 | + test=test, |
| 163 | + title=title, |
| 164 | + description=description, |
| 165 | + severity=severity_normalized, |
| 166 | + references=references, |
| 167 | + mitigation=mitigation, |
| 168 | + component_name=pkg_name, |
| 169 | + component_version=installed_version, |
| 170 | + service=service_name, |
| 171 | + active=active, |
| 172 | + verified=verified, |
| 173 | + static_finding=True, |
| 174 | + dynamic_finding=False, |
| 175 | + fix_available=fix_available, |
| 176 | + tags=tags, |
| 177 | + ) |
| 178 | + |
| 179 | + # Add vulnerability ID if it's a CVE |
| 180 | + if policy.startswith("CVE-"): |
| 181 | + finding.unsaved_vulnerability_ids = [policy] |
| 182 | + |
| 183 | + return finding |
| 184 | + |
| 185 | + except KeyError as exc: |
| 186 | + logger.warning("Failed to parse OpenReports result due to missing key: %r", exc) |
| 187 | + return None |
| 188 | + except Exception as exc: |
| 189 | + logger.warning("Failed to parse OpenReports result: %r", exc) |
| 190 | + return None |
0 commit comments