Skip to content

Commit 30b7e21

Browse files
new snyk_issue_api parser for code issues (file based) (#12903)
* snyk_issue_api: support code items * make fix_available backward compatible
1 parent 6e38ffa commit 30b7e21

6 files changed

Lines changed: 574 additions & 0 deletions

File tree

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
---
2+
title: "Snyk Issue API"
3+
toc_hide: true
4+
---
5+
The Snyk Issue API parser supports importing vulnerability data from the Snyk Issue API in JSON format. Currently only parsing issues of type `code` is supported. Samples of ther issue types are welcome.
6+
7+
For more information about the Snyk Issue API, refer to the [official Snyk API documentation](https://docs.snyk.io/snyk-api/reference/issues#get-orgs-org_id-issues).
8+
9+
### API request
10+
Example API request to get only code issues:
11+
```
12+
GET https://api.snyk.io/rest/orgs/{org_id}/issues?version=2025-08-02&type=code
13+
```
14+
15+
For more details see: https://docs.snyk.io/snyk-api/reference/issues#get-orgs-org_id-issues
16+
17+
### Sample Scan Data
18+
Sample Snyk Issue API scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/snyk_issue_api).
19+
20+
### Field Mapping
21+
The parser maps fields from the Snyk Issue API response to DefectDojo's Finding model as follows:
22+
23+
| Finding Field | Snyk Issue API Field | Notes |
24+
|--------------|---------------------|-------|
25+
| title | attributes.title | |
26+
| severity | attributes.effective_severity_level | Mapped to Critical/High/Medium/Low/Info |
27+
| description | attributes.description | |
28+
| unique_id_from_tool | id | Top-level issue ID |
29+
| file_path | coordinates[].representations[].sourceLocation.file | First occurrence |
30+
| line | coordinates[].representations[].sourceLocation.region.start.line | Line where the issue starts |
31+
| date | attributes.created_at | ISO format date |
32+
| cwe | classes[].id | First CWE class found |
33+
| active | attributes.status == "open" AND NOT attributes.ignored | Inactive if ignored or not open |
34+
| verified | true | Always set to true |
35+
| static_finding | true | Always set to true |
36+
| dynamic_finding | false | Always set to false |
37+
| out_of_scope | attributes.ignored | Set to true if issue is ignored |
38+
| fix_available* | coordinates[].is_fixable_* | True if any fixability flag is true. |
39+
40+
#### Impact Field
41+
The impact field combines multiple pieces of information:
42+
1. Problem details:
43+
- Source (e.g., "SNYK")
44+
- Type (e.g., "vulnerability")
45+
- Last update timestamp
46+
- Severity level
47+
2. All source locations, each containing:
48+
- File path
49+
- Commit ID
50+
- Line range (start-end)
51+
- Column range (start-end)
52+
53+
#### Additional Processing
54+
- Multiple CWEs are handled by using the first one as the primary CWE and listing additional ones in the references field
55+
- Risk scores are included in the severity_justification field when available
56+
- Only issues with type="code" are processed
57+
- Line numbers: Only the starting line is stored in the Finding model, but both start and end lines are included in the impact field for reference
58+
59+
### Default Deduplication Hashcode Fields
60+
By default, DefectDojo identifies duplicate Findings using these [hashcode fields](https://docs.defectdojo.com/en/working_with_findings/finding_deduplication/about_deduplication/):
61+
62+
- unique id from tool
63+
- file path

dojo/tools/snyk_issue_api/__init__.py

Whitespace-only changes.
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import json
2+
from contextlib import suppress
3+
from datetime import datetime
4+
5+
from dojo.models import Finding
6+
7+
8+
class SnykIssueApiParser:
9+
def get_scan_types(self):
10+
return ["Snyk Issue API Scan"]
11+
12+
def get_label_for_scan_types(self, scan_type):
13+
return scan_type
14+
15+
def get_description_for_scan_types(self, scan_type):
16+
return "Snyk Issue API output file can be imported in JSON format."
17+
18+
def get_findings(self, json_output, test):
19+
tree = self.parse_json(json_output)
20+
return self.process_tree(tree, test)
21+
22+
def parse_json(self, json_output):
23+
try:
24+
data = json_output.read()
25+
try:
26+
tree = json.loads(str(data, "utf-8"))
27+
except Exception:
28+
tree = json.loads(data)
29+
except Exception:
30+
msg = "Invalid format"
31+
raise ValueError(msg)
32+
33+
return tree
34+
35+
def process_tree(self, tree, test):
36+
if not tree or "data" not in tree:
37+
return []
38+
39+
findings = []
40+
for issue in tree.get("data", []):
41+
finding = self.get_finding(issue, test)
42+
if finding:
43+
findings.append(finding)
44+
return findings
45+
46+
def get_finding(self, issue, test):
47+
# Check top-level type must be "issue" as "packages" have their own API it seems.
48+
if not issue or issue.get("type") != "issue":
49+
return None
50+
51+
attributes = issue.get("attributes", {})
52+
53+
# Check attributes-level type must be "code"
54+
# Other items are not supported yet due to a lack of samples and lack of documentation
55+
# package_vulnerability,license,cloud,code,customconfig
56+
if attributes.get("type") != "code":
57+
return None
58+
59+
# Extract CWE classes
60+
cwes = []
61+
for class_info in attributes.get("classes", []):
62+
if class_info.get("source") == "CWE":
63+
cwe_id = class_info.get("id", "").replace("CWE-", "")
64+
if cwe_id.isdigit():
65+
cwes.append(int(cwe_id))
66+
67+
# Extract location information, fixability and collect all source locations for impact
68+
file_path = None
69+
line = None
70+
fix_available = False
71+
impact_locations = []
72+
73+
for coordinate in attributes.get("coordinates", []):
74+
# Check if any fix is available
75+
if coordinate.get("is_fixable_snyk") or \
76+
coordinate.get("is_fixable_upstream") or \
77+
coordinate.get("is_fixable_manually"):
78+
fix_available = True
79+
80+
for representation in coordinate.get("representations", []):
81+
if "sourceLocation" in representation:
82+
location = representation["sourceLocation"]
83+
region = location.get("region", {})
84+
start = region.get("start", {})
85+
end = region.get("end", {})
86+
87+
# Store location details for impact field
88+
impact_locations.append([
89+
"Source Location:",
90+
f"File: {location.get('file', 'Unknown')}",
91+
f"Commit: {location.get('commit_id', 'Unknown')}",
92+
f"Lines: {start.get('line', '?')}-{end.get('line', '?')}",
93+
f"Columns: {start.get('column', '?')}-{end.get('column', '?')}",
94+
"", # Empty line between locations
95+
])
96+
97+
# Store first location for finding fields
98+
if not file_path:
99+
file_path = location.get("file")
100+
if region:
101+
line = start.get("line")
102+
103+
# Map severity levels
104+
severity_map = {
105+
"critical": "Critical",
106+
"high": "High",
107+
"medium": "Medium",
108+
"low": "Low",
109+
"info": "Info",
110+
}
111+
severity = severity_map.get(attributes.get("effective_severity_level", "").lower(), "Info")
112+
113+
# Parse created_at date
114+
created = None
115+
if attributes.get("created_at"):
116+
with suppress(ValueError):
117+
created = datetime.strptime(attributes["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
118+
if not created:
119+
with suppress(ValueError):
120+
created = datetime.strptime(attributes["created_at"], "%Y-%m-%dT%H:%M:%SZ")
121+
122+
# Create finding
123+
finding = Finding(
124+
title=attributes.get("title", ""),
125+
test=test,
126+
severity=severity,
127+
description=attributes.get("description", ""),
128+
static_finding=True,
129+
dynamic_finding=False,
130+
unique_id_from_tool=issue.get("id"),
131+
file_path=file_path,
132+
line=line,
133+
out_of_scope=attributes.get("ignored", False),
134+
active=attributes.get("status") == "open" and not attributes.get("ignored", False),
135+
verified=True,
136+
cwe=cwes[0] if cwes else None,
137+
date=created,
138+
)
139+
140+
# Set fix_available if the field exists in the model
141+
if hasattr(finding, "fix_available"):
142+
finding.fix_available = fix_available
143+
144+
# Add risk score if available
145+
risk = attributes.get("risk", {})
146+
if risk and "score" in risk:
147+
score = risk["score"]
148+
if isinstance(score, dict):
149+
finding.severity_justification = (
150+
f"Risk Score: {score.get('value', 'N/A')} "
151+
f"(Model: {score.get('model', 'N/A')})"
152+
)
153+
154+
# Add additional CWEs as references
155+
if len(cwes) > 1:
156+
finding.references = "Additional CWEs: " + ", ".join(f"CWE-{cwe}" for cwe in cwes[1:])
157+
158+
# Add problem details and all source locations to impact
159+
impact_details = []
160+
161+
# Add problem information
162+
problems = attributes.get("problems", [])
163+
if problems:
164+
problem = problems[0] # Take the first problem
165+
impact_details.extend([
166+
f"Source: {problem.get('source', 'Unknown')}",
167+
f"Type: {problem.get('type', 'Unknown')}",
168+
f"Last Updated: {problem.get('updated_at', 'Unknown')}",
169+
f"Severity: {severity}",
170+
"", # Empty line before locations
171+
])
172+
173+
# Add all source locations
174+
for location in impact_locations:
175+
impact_details.extend(location)
176+
177+
if impact_details:
178+
finding.impact = "\n".join(impact_details).rstrip()
179+
180+
return finding
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"jsonapi": {
3+
"version": "1.0"
4+
},
5+
"data": []
6+
}

0 commit comments

Comments
 (0)