|
9 | 9 |
|
10 | 10 | from pathlib import Path |
11 | 11 |
|
12 | | -import saneyaml |
13 | 12 | from aboutcode.pipeline import LoopProgress |
14 | 13 | from fetchcode.vcs import fetch_via_vcs |
15 | 14 | from yaml import YAMLError |
16 | 15 |
|
17 | 16 | from vulnerabilities.models import AdvisoryAlias |
18 | | -from vulnerabilities.models import AdvisoryDetectionRule |
| 17 | +from vulnerabilities.models import DetectionRule |
| 18 | +from vulnerabilities.models import DetectionRuleTypes |
19 | 19 | from vulnerabilities.pipelines import VulnerableCodePipeline |
20 | 20 | from vulnerabilities.utils import find_all_cve |
21 | 21 |
|
@@ -50,42 +50,48 @@ def collect_and_store_rules(self): |
50 | 50 | self.log(f"Enhancing the vulnerability with {rules_count:,d} rule records") |
51 | 51 | progress = LoopProgress(total_iterations=rules_count, logger=self.log) |
52 | 52 | for file_path in progress.iter(yaml_files): |
53 | | - cve_ids = find_all_cve(str(file_path)) |
54 | | - if not cve_ids or len(cve_ids) > 1: |
| 53 | + if any(part in [".github", "images", "documentation"] for part in file_path.parts): |
55 | 54 | continue |
56 | 55 |
|
57 | | - cve_id = cve_ids[0] |
58 | | - |
59 | 56 | with open(file_path, "r") as f: |
60 | 57 | try: |
61 | | - rule_data = saneyaml.load(f) |
| 58 | + rule_data = f.read() |
62 | 59 | except YAMLError as err: |
63 | 60 | self.log(f"Invalid YAML in {file_path}: {err}. Skipping.") |
64 | 61 | continue |
65 | 62 |
|
66 | | - advisories = set() |
67 | | - try: |
68 | | - if alias := AdvisoryAlias.objects.get(alias=cve_id): |
69 | | - for adv in alias.advisories.all(): |
70 | | - advisories.add(adv) |
71 | | - except AdvisoryAlias.DoesNotExist: |
72 | | - self.log(f"Advisory {file_path.name} not found.") |
73 | | - continue |
74 | | - |
75 | | - rule_text = saneyaml.dump(rule_data) |
76 | 63 | rule_url = f"https://raw.githubusercontent.com/SigmaHQ/sigma/refs/heads/master/{file_path.relative_to(base_directory)}" |
| 64 | + cve_ids = find_all_cve(str(file_path)) |
| 65 | + found_advisories = set() |
| 66 | + for cve_id in cve_ids: |
| 67 | + try: |
| 68 | + alias = AdvisoryAlias.objects.get(alias=cve_id) |
| 69 | + for adv in alias.advisories.all(): |
| 70 | + found_advisories.add(adv) |
| 71 | + except AdvisoryAlias.DoesNotExist: |
| 72 | + self.log(f"Advisory {file_path.name} not found.") |
| 73 | + continue |
77 | 74 |
|
78 | | - for advisory in advisories: |
79 | | - AdvisoryDetectionRule.objects.update_or_create( |
80 | | - advisory=advisory, |
81 | | - rule_type="sigma", |
| 75 | + for adv in found_advisories: |
| 76 | + DetectionRule.objects.update_or_create( |
| 77 | + rule_text=rule_data, |
| 78 | + advisory=adv, |
82 | 79 | defaults={ |
83 | | - "rule_text": rule_text, |
| 80 | + "rule_type": DetectionRuleTypes.SIGMA, |
84 | 81 | "source_url": rule_url, |
85 | 82 | }, |
86 | 83 | ) |
87 | 84 |
|
88 | | - self.log(f"Successfully added {rules_count:,d} rules advisory") |
| 85 | + if not found_advisories: |
| 86 | + DetectionRule.objects.update_or_create( |
| 87 | + rule_text=rule_data, |
| 88 | + advisory=None, |
| 89 | + defaults={ |
| 90 | + "rule_type": DetectionRuleTypes.SIGMA, |
| 91 | + "source_url": rule_url, |
| 92 | + }, |
| 93 | + ) |
| 94 | + self.log(f"Successfully processed rules.") |
89 | 95 |
|
90 | 96 | def clean_downloads(self): |
91 | 97 | if self.vcs_response: |
|
0 commit comments