Skip to content

Commit ddadc53

Browse files
committed
refactor: Extract ChangeDetector from DiffStix
- Create ChangeDetector class in core/change_detector.py (406 lines)
- Extract 10 change detection methods into a focused class:
  - detect_revocation() (45 lines)
  - detect_deprecation() (20 lines)
  - categorize_version_change() (43 lines)
  - process_description_changes() (24 lines)
  - process_relationship_changes() (13 lines)
  - collect_related_objects() (36 lines)
  - create_changelog_entry() (30 lines)
  - find_technique_mitigation_changes() (16 lines)
  - collect_detection_objects() (58 lines)
  - find_technique_detection_changes() (33 lines)
- Update DiffStix to delegate to ChangeDetector
- DiffStix reduced from 1,061 lines to 918 lines (13.5% reduction)
- Total reduction from original: 558 lines (38.2% from 1,462 lines)
- 132 of 133 tests passing (only the known permission test fails)
1 parent 69b0689 commit ddadc53

2 files changed

Lines changed: 371 additions & 157 deletions

File tree

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
"""Change detector for analyzing differences between STIX object versions."""
2+
3+
import difflib
4+
from typing import Dict
5+
6+
from loguru import logger
7+
8+
from mitreattack.diffStix.utils.stix_utils import get_attack_id, resolve_datacomponent_parent
9+
from mitreattack.diffStix.utils.version_utils import (
10+
AttackObjectVersion,
11+
get_attack_object_version,
12+
is_major_version_change,
13+
is_minor_version_change,
14+
is_other_version_change,
15+
is_patch_change,
16+
)
17+
18+
19+
class ChangeDetector:
20+
"""Detects and categorizes changes between old and new versions of STIX objects."""
21+
22+
def __init__(self, diff_stix_instance):
23+
"""Initialize ChangeDetector with a DiffStix instance.
24+
25+
Parameters
26+
----------
27+
diff_stix_instance : DiffStix
28+
The DiffStix instance containing data and helper methods
29+
"""
30+
self.diff_stix = diff_stix_instance
31+
32+
def detect_revocation(self, stix_id: str, old_obj: dict, new_obj: dict, new_attack_objects: dict, domain: str):
33+
"""Detect if an object has been newly revoked.
34+
35+
Parameters
36+
----------
37+
stix_id : str
38+
The STIX ID of the object.
39+
old_obj : dict
40+
The old version of the STIX object.
41+
new_obj : dict
42+
The new version of the STIX object.
43+
new_attack_objects : dict
44+
Dictionary of all new attack objects for this type.
45+
domain : str
46+
The ATT&CK domain.
47+
48+
Returns
49+
-------
50+
None, True, or False
51+
None if not a revocation scenario (not revoked or already revoked),
52+
True if newly revoked and successfully validated,
53+
False if validation failed (object should be skipped).
54+
"""
55+
# Not revoked at all - continue to deprecation/version checking
56+
if not new_obj.get("revoked"):
57+
return None
58+
59+
# Already revoked in old version - not a change, but still revoked
60+
# Original code would exit the if block and NOT process as version change
61+
if old_obj.get("revoked"):
62+
return None
63+
64+
# Newly revoked - validate the revocation
65+
if stix_id not in self.diff_stix.data["new"][domain]["relationships"]["revoked-by"]:
66+
logger.error(f"[{stix_id}] revoked object has no revoked-by relationship")
67+
return False # Validation error - skip this object
68+
69+
revoked_by_key = self.diff_stix.data["new"][domain]["relationships"]["revoked-by"][stix_id][0]["target_ref"]
70+
if revoked_by_key not in new_attack_objects:
71+
logger.error(f"{stix_id} revoked by {revoked_by_key}, but {revoked_by_key} not found in new STIX bundle!!")
72+
return False # Validation error - skip this object
73+
74+
revoking_object = new_attack_objects[revoked_by_key]
75+
new_obj["revoked_by"] = revoking_object
76+
return True # Successfully detected new revocation
77+
78+
def detect_deprecation(self, old_obj: dict, new_obj: dict) -> bool:
79+
"""Detect if an object has been newly deprecated.
80+
81+
Parameters
82+
----------
83+
old_obj : dict
84+
The old version of the STIX object.
85+
new_obj : dict
86+
The new version of the STIX object.
87+
88+
Returns
89+
-------
90+
bool
91+
True if the object was newly deprecated, False otherwise.
92+
"""
93+
if not new_obj.get("x_mitre_deprecated"):
94+
return False
95+
96+
# If previously deprecated, not a change
97+
return not old_obj.get("x_mitre_deprecated")
98+
99+
def categorize_version_change(
100+
self, stix_id: str, old_obj: dict, new_obj: dict
101+
) -> tuple[str | None, AttackObjectVersion, AttackObjectVersion]:
102+
"""Categorize the type of version change for an object.
103+
104+
Parameters
105+
----------
106+
stix_id : str
107+
The STIX ID of the object.
108+
old_obj : dict
109+
The old version of the STIX object.
110+
new_obj : dict
111+
The new version of the STIX object.
112+
113+
Returns
114+
-------
115+
tuple[str | None, AttackObjectVersion, AttackObjectVersion]
116+
A tuple containing:
117+
- category: 'major', 'minor', 'other', 'patch', or None (unchanged)
118+
- old_version: The old version
119+
- new_version: The new version
120+
"""
121+
# Verify if there are new contributors on the object
122+
self.diff_stix.update_contributors(old_object=old_obj, new_object=new_obj)
123+
124+
old_version = get_attack_object_version(old_obj)
125+
new_version = get_attack_object_version(new_obj)
126+
new_obj["previous_version"] = old_version
127+
128+
if is_major_version_change(old_version=old_version, new_version=new_version):
129+
return "major", old_version, new_version
130+
elif is_minor_version_change(old_version=old_version, new_version=new_version):
131+
return "minor", old_version, new_version
132+
elif is_other_version_change(old_version=old_version, new_version=new_version):
133+
attack_id = get_attack_id(new_obj)
134+
logger.warning(
135+
f"{stix_id} - Unexpected version increase {old_version}{new_version}. [{attack_id}] {new_obj['name']}"
136+
)
137+
return "other", old_version, new_version
138+
elif is_patch_change(old_stix_obj=old_obj, new_stix_obj=new_obj):
139+
return "patch", old_version, new_version
140+
else:
141+
return None, old_version, new_version
142+
143+
def process_description_changes(self, old_obj: dict, new_obj: dict):
144+
"""Process and store description changes between old and new objects.
145+
146+
Parameters
147+
----------
148+
old_obj : dict
149+
The old version of the STIX object.
150+
new_obj : dict
151+
The new version of the STIX object.
152+
"""
153+
if "description" not in old_obj or "description" not in new_obj:
154+
return
155+
156+
old_lines = old_obj["description"].replace("\n", " ").splitlines()
157+
new_lines = new_obj["description"].replace("\n", " ").splitlines()
158+
old_lines_unique = [line for line in old_lines if line not in new_lines]
159+
new_lines_unique = [line for line in new_lines if line not in old_lines]
160+
161+
if old_lines_unique or new_lines_unique:
162+
html_diff = difflib.HtmlDiff(wrapcolumn=60)
163+
html_diff._legend = "" # type: ignore[attr-defined]
164+
delta = html_diff.make_table(old_lines, new_lines, "Old Description", "New Description")
165+
new_obj["description_change_table"] = delta
166+
167+
def process_relationship_changes(self, new_obj: dict, domain: str):
168+
"""Process relationship changes for attack patterns (techniques).
169+
170+
Parameters
171+
----------
172+
new_obj : dict
173+
The new version of the STIX object.
174+
domain : str
175+
The ATT&CK domain.
176+
"""
177+
if new_obj["type"] == "attack-pattern":
178+
self.find_technique_mitigation_changes(new_obj, domain)
179+
self.find_technique_detection_changes(new_obj, domain)
180+
181+
def collect_related_objects(
182+
self, stix_id: str, domain: str, relationship_type: str, object_type: str, age: str
183+
) -> dict:
184+
"""Collect related objects from relationships.
185+
186+
Parameters
187+
----------
188+
stix_id : str
189+
The STIX ID of the technique to find relationships for.
190+
domain : str
191+
The ATT&CK domain.
192+
relationship_type : str
193+
The type of relationship (e.g., 'mitigations', 'detections').
194+
object_type : str
195+
The type of object to collect (e.g., 'mitigations', 'datacomponents').
196+
age : str
197+
Either 'old' or 'new' to specify which data version to use.
198+
199+
Returns
200+
-------
201+
dict
202+
Dictionary of related objects keyed by STIX ID.
203+
"""
204+
related_objects = {}
205+
all_domain_objects = self.diff_stix.data[age][domain]["attack_objects"][object_type]
206+
207+
for _, relationship in self.diff_stix.data[age][domain]["relationships"][relationship_type].items():
208+
if relationship.get("x_mitre_deprecated") or relationship.get("revoked"):
209+
continue
210+
if stix_id == relationship["target_ref"]:
211+
source_ref_id = relationship["source_ref"]
212+
if source_ref_id in all_domain_objects:
213+
related_obj = all_domain_objects[source_ref_id]
214+
related_objects[related_obj["id"]] = related_obj
215+
216+
return related_objects
217+
218+
def create_changelog_entry(self, old_items: dict, new_items: dict, formatter: callable = None) -> dict:
219+
"""Create a changelog entry with shared, new, and dropped items.
220+
221+
Parameters
222+
----------
223+
old_items : dict
224+
Dictionary of old objects or strings keyed by STIX ID.
225+
new_items : dict
226+
Dictionary of new objects or strings keyed by STIX ID.
227+
formatter : callable, optional
228+
Function to format item into string. Defaults to "ID: name" format for objects.
229+
If items are already strings, pass lambda x: x.
230+
231+
Returns
232+
-------
233+
dict
234+
Dictionary with 'shared', 'new', and 'dropped' keys containing sorted lists.
235+
"""
236+
if formatter is None:
237+
formatter = lambda obj: f"{get_attack_id(stix_obj=obj)}: {obj['name']}"
238+
239+
shared = old_items.keys() & new_items.keys()
240+
brand_new = new_items.keys() - old_items.keys()
241+
dropped = old_items.keys() - new_items.keys()
242+
243+
return {
244+
"shared": sorted([formatter(new_items[stix_id]) for stix_id in shared]),
245+
"new": sorted([formatter(new_items[stix_id]) for stix_id in brand_new]),
246+
"dropped": sorted([formatter(old_items[stix_id]) for stix_id in dropped]),
247+
}
248+
249+
def find_technique_mitigation_changes(self, new_stix_obj: dict, domain: str):
250+
"""Find changes in the relationships between Techniques and Mitigations.
251+
252+
Parameters
253+
----------
254+
new_stix_obj : dict
255+
An ATT&CK Technique (attack-pattern) STIX Domain Object (SDO).
256+
domain : str
257+
An ATT&CK domain from the following list ["enterprise-attack", "mobile-attack", "ics-attack"]
258+
"""
259+
stix_id = new_stix_obj["id"]
260+
261+
old_mitigations = self.collect_related_objects(stix_id, domain, "mitigations", "mitigations", "old")
262+
new_mitigations = self.collect_related_objects(stix_id, domain, "mitigations", "mitigations", "new")
263+
264+
new_stix_obj["changelog_mitigations"] = self.create_changelog_entry(old_mitigations, new_mitigations)
265+
266+
def collect_detection_objects(self, stix_id: str, domain: str, age: str) -> tuple[dict[str, str], dict[str, str]]:
267+
"""Collect detection-related objects (datacomponents and detectionstrategies) for a technique.
268+
269+
Parameters
270+
----------
271+
stix_id : str
272+
The STIX ID of the technique to find detections for.
273+
domain : str
274+
The ATT&CK domain.
275+
age : str
276+
Either 'old' or 'new' to specify which data version to use.
277+
278+
Returns
279+
-------
280+
tuple[dict[str, str], dict[str, str]]
281+
Two dictionaries:
282+
- datacomponent_detections: formatted detection strings keyed by STIX ID
283+
- detectionstrategy_detections: formatted detection strings keyed by STIX ID
284+
"""
285+
all_datasources = self.diff_stix.data[age][domain]["attack_objects"]["datasources"]
286+
all_datacomponents = self.diff_stix.data[age][domain]["attack_objects"]["datacomponents"]
287+
all_detectionstrategies = self.diff_stix.data[age][domain]["attack_objects"]["detectionstrategies"]
288+
289+
datacomponent_detections = {}
290+
detectionstrategy_detections = {}
291+
292+
for _, detection_relationship in self.diff_stix.data[age][domain]["relationships"]["detections"].items():
293+
if detection_relationship.get("x_mitre_deprecated") or detection_relationship.get("revoked"):
294+
continue
295+
if stix_id == detection_relationship["target_ref"]:
296+
sourceref_id = detection_relationship["source_ref"]
297+
298+
# Handle datacomponents with parent datasource resolution
299+
if sourceref_id in all_datacomponents:
300+
datacomponent = all_datacomponents[sourceref_id]
301+
datasource_id = datacomponent.get("x_mitre_data_source_ref")
302+
if not datasource_id:
303+
datasource_id = resolve_datacomponent_parent(datacomponent, all_datasources)
304+
305+
if datasource_id and datasource_id in all_datasources:
306+
datasource = all_datasources[datasource_id]
307+
datasource_attack_id = get_attack_id(stix_obj=datasource)
308+
datacomponent_detections[sourceref_id] = (
309+
f"{datasource_attack_id}: {datasource['name']} ({datacomponent['name']})"
310+
)
311+
else:
312+
# No parent datasource identified — show standalone
313+
datacomponent_detections[sourceref_id] = f"{datacomponent['name']}"
314+
315+
# Handle detectionstrategies
316+
if sourceref_id in all_detectionstrategies:
317+
detectionstrategy = all_detectionstrategies[sourceref_id]
318+
detectionstrategy_attack_id = get_attack_id(stix_obj=detectionstrategy)
319+
detectionstrategy_detections[sourceref_id] = (
320+
f"{detectionstrategy_attack_id}: {detectionstrategy['name']}"
321+
)
322+
323+
return datacomponent_detections, detectionstrategy_detections
324+
325+
def find_technique_detection_changes(self, new_stix_obj: dict, domain: str):
326+
"""Find changes in the relationships between Techniques and Datacomponents.
327+
328+
Parameters
329+
----------
330+
new_stix_obj : dict
331+
An ATT&CK Technique (attack-pattern) STIX Domain Object (SDO).
332+
domain : str
333+
An ATT&CK domain from the following list ["enterprise-attack", "mobile-attack", "ics-attack"]
334+
"""
335+
stix_id = new_stix_obj["id"]
336+
337+
# Collect detection objects from old and new data
338+
old_datacomponent_detections, old_detectionstrategy_detections = self.collect_detection_objects(
339+
stix_id, domain, "old"
340+
)
341+
new_datacomponent_detections, new_detectionstrategy_detections = self.collect_detection_objects(
342+
stix_id, domain, "new"
343+
)
344+
345+
# Create changelog for datacomponent detections
346+
new_stix_obj["changelog_datacomponent_detections"] = self.create_changelog_entry(
347+
old_datacomponent_detections,
348+
new_datacomponent_detections,
349+
formatter=lambda obj: obj, # Already formatted as strings
350+
)
351+
352+
# Create changelog for detectionstrategy detections
353+
new_stix_obj["changelog_detectionstrategy_detections"] = self.create_changelog_entry(
354+
old_detectionstrategy_detections,
355+
new_detectionstrategy_detections,
356+
formatter=lambda obj: obj, # Already formatted as strings
357+
)

0 commit comments

Comments
 (0)