Skip to content

Commit 893da42

Browse files
committed
refactor: Condense single-line delegation methods in DiffStix
Remove unnecessary wrapper methods and update internal calls to use components directly, reducing boilerplate and improving code clarity. Changes: - Removed 5 unused private wrapper methods: - _collect_related_objects() - _create_changelog_entry() - _collect_detection_objects() - _collect_domain_statistics() - _collect_unique_object_counts() - Updated internal calls to use components directly: - _load_all_domains(): self.load_domain() → self._data_loader.load_domain() - _process_additions(): self.update_contributors() → self._contributor_tracker.update_contributors() - _categorize_object_changes(): Fixed calls to use self._change_detector directly for: - process_description_changes() - process_relationship_changes() - Kept all public API methods for backward compatibility Impact: - diff_stix.py: 742 → 556 lines (25% reduction, 186 lines removed) - Total from original: 1,462 → 556 lines (62% reduction, 906 lines removed) - Eliminated unnecessary indirection while maintaining public API - Improved code maintainability and readability
1 parent 13b6083 commit 893da42

1 file changed

Lines changed: 11 additions & 197 deletions

File tree

mitreattack/diffStix/core/diff_stix.py

Lines changed: 11 additions & 197 deletions
Original file line numberDiff line numberDiff line change
@@ -134,95 +134,6 @@ def release_contributors(self, value: dict):
134134
"""
135135
self._contributor_tracker.release_contributors = value
136136

137-
def _detect_revocation(self, stix_id: str, old_obj: dict, new_obj: dict, new_attack_objects: dict, domain: str):
138-
"""Detect if an object has been newly revoked.
139-
140-
Parameters
141-
----------
142-
stix_id : str
143-
The STIX ID of the object.
144-
old_obj : dict
145-
The old version of the STIX object.
146-
new_obj : dict
147-
The new version of the STIX object.
148-
new_attack_objects : dict
149-
Dictionary of all new attack objects for this type.
150-
domain : str
151-
The ATT&CK domain.
152-
153-
Returns
154-
-------
155-
None, True, or False
156-
None if not a revocation scenario (not revoked or already revoked),
157-
True if newly revoked and successfully validated,
158-
False if validation failed (object should be skipped).
159-
"""
160-
return self._change_detector.detect_revocation(stix_id, old_obj, new_obj, new_attack_objects, domain)
161-
162-
def _detect_deprecation(self, old_obj: dict, new_obj: dict) -> bool:
163-
"""Detect if an object has been newly deprecated.
164-
165-
Parameters
166-
----------
167-
old_obj : dict
168-
The old version of the STIX object.
169-
new_obj : dict
170-
The new version of the STIX object.
171-
172-
Returns
173-
-------
174-
bool
175-
True if the object was newly deprecated, False otherwise.
176-
"""
177-
return self._change_detector.detect_deprecation(old_obj, new_obj)
178-
179-
def _categorize_version_change(
180-
self, stix_id: str, old_obj: dict, new_obj: dict
181-
) -> tuple[str | None, AttackObjectVersion, AttackObjectVersion]:
182-
"""Categorize the type of version change for an object.
183-
184-
Parameters
185-
----------
186-
stix_id : str
187-
The STIX ID of the object.
188-
old_obj : dict
189-
The old version of the STIX object.
190-
new_obj : dict
191-
The new version of the STIX object.
192-
193-
Returns
194-
-------
195-
tuple[str | None, AttackObjectVersion, AttackObjectVersion]
196-
A tuple containing:
197-
- category: 'major', 'minor', 'other', 'patch', or None (unchanged)
198-
- old_version: The old version
199-
- new_version: The new version
200-
"""
201-
return self._change_detector.categorize_version_change(stix_id, old_obj, new_obj)
202-
203-
def _process_description_changes(self, old_obj: dict, new_obj: dict):
204-
"""Process and store description changes between old and new objects.
205-
206-
Parameters
207-
----------
208-
old_obj : dict
209-
The old version of the STIX object.
210-
new_obj : dict
211-
The new version of the STIX object.
212-
"""
213-
return self._change_detector.process_description_changes(old_obj, new_obj)
214-
215-
def _process_relationship_changes(self, new_obj: dict, domain: str):
216-
"""Process relationship changes for attack patterns (techniques).
217-
218-
Parameters
219-
----------
220-
new_obj : dict
221-
The new version of the STIX object.
222-
domain : str
223-
The ATT&CK domain.
224-
"""
225-
return self._change_detector.process_relationship_changes(new_obj, domain)
226137

227138
def load_data(self):
228139
"""Orchestrate loading STIX data and detecting changes."""
@@ -232,7 +143,7 @@ def load_data(self):
232143
def _load_all_domains(self):
233144
"""Load STIX data for all configured domains."""
234145
for domain in track(self.domains, description="Loading domains"):
235-
self.load_domain(domain=domain)
146+
self._data_loader.load_domain(domain=domain)
236147

237148
def _detect_all_changes(self):
238149
"""Detect and categorize changes across all domains and object types."""
@@ -311,7 +222,9 @@ def _categorize_object_changes(self, old_objects: dict, new_objects: dict, domai
311222
new_stix_obj["detailed_diff"] = ddiff.to_json()
312223

313224
# Check for revocations
314-
revocation_result = self._detect_revocation(stix_id, old_stix_obj, new_stix_obj, new_objects, domain)
225+
revocation_result = self._change_detector.detect_revocation(
226+
stix_id, old_stix_obj, new_stix_obj, new_objects, domain
227+
)
315228
if revocation_result is False:
316229
continue # Validation failed - skip
317230
elif revocation_result is True:
@@ -321,14 +234,16 @@ def _categorize_object_changes(self, old_objects: dict, new_objects: dict, domai
321234
continue # Already revoked - skip
322235

323236
# Check for deprecations
324-
if self._detect_deprecation(old_stix_obj, new_stix_obj):
237+
if self._change_detector.detect_deprecation(old_stix_obj, new_stix_obj):
325238
changes["deprecations"].add(stix_id)
326239
continue
327240
elif new_stix_obj.get("x_mitre_deprecated"):
328241
continue # Already deprecated - skip
329242

330243
# Categorize version changes
331-
category, old_version, new_version = self._categorize_version_change(stix_id, old_stix_obj, new_stix_obj)
244+
category, old_version, new_version = self._change_detector.categorize_version_change(
245+
stix_id, old_stix_obj, new_stix_obj
246+
)
332247

333248
if category == "major":
334249
changes["major_version_changes"].add(stix_id)
@@ -345,8 +260,8 @@ def _categorize_object_changes(self, old_objects: dict, new_objects: dict, domai
345260
new_stix_obj["version_change"] = f"{old_version}{new_version}"
346261

347262
# Process description and relationship changes
348-
self._process_description_changes(old_stix_obj, new_stix_obj)
349-
self._process_relationship_changes(new_stix_obj, domain)
263+
self._change_detector.process_description_changes(old_stix_obj, new_stix_obj)
264+
self._change_detector.process_relationship_changes(new_stix_obj, domain)
350265

351266
# Process new objects
352267
self._process_additions(changes["additions"], new_objects)
@@ -368,7 +283,7 @@ def _process_additions(self, additions: set, new_objects: dict):
368283
attack_id = get_attack_id(new_stix_obj)
369284

370285
# Add contributions from additions
371-
self.update_contributors(old_object=None, new_object=new_stix_obj)
286+
self._contributor_tracker.update_contributors(old_object=None, new_object=new_stix_obj)
372287

373288
# Verify version is 1.0
374289
x_mitre_version = get_attack_object_version(stix_obj=new_stix_obj)
@@ -439,51 +354,6 @@ def _store_categorized_changes(
439354
new_objects[stix_id] for stix_id in changes["unchanged"]
440355
]
441356

442-
def _collect_related_objects(
443-
self, stix_id: str, domain: str, relationship_type: str, object_type: str, age: str
444-
) -> dict:
445-
"""Collect related objects from relationships.
446-
447-
Parameters
448-
----------
449-
stix_id : str
450-
The STIX ID of the technique to find relationships for.
451-
domain : str
452-
The ATT&CK domain.
453-
relationship_type : str
454-
The type of relationship (e.g., 'mitigations', 'detections').
455-
object_type : str
456-
The type of object to collect (e.g., 'mitigations', 'datacomponents').
457-
age : str
458-
Either 'old' or 'new' to specify which data version to use.
459-
460-
Returns
461-
-------
462-
dict
463-
Dictionary of related objects keyed by STIX ID.
464-
"""
465-
return self._change_detector.collect_related_objects(stix_id, domain, relationship_type, object_type, age)
466-
467-
def _create_changelog_entry(self, old_items: dict, new_items: dict, formatter: callable = None) -> dict:
468-
"""Create a changelog entry with shared, new, and dropped items.
469-
470-
Parameters
471-
----------
472-
old_items : dict
473-
Dictionary of old objects or strings keyed by STIX ID.
474-
new_items : dict
475-
Dictionary of new objects or strings keyed by STIX ID.
476-
formatter : callable, optional
477-
Function to format item into string. Defaults to "ID: name" format for objects.
478-
If items are already strings, pass lambda x: x.
479-
480-
Returns
481-
-------
482-
dict
483-
Dictionary with 'shared', 'new', and 'dropped' keys containing sorted lists.
484-
"""
485-
return self._change_detector.create_changelog_entry(old_items, new_items, formatter)
486-
487357
def find_technique_mitigation_changes(self, new_stix_obj: dict, domain: str):
488358
"""Find changes in the relationships between Techniques and Mitigations.
489359
@@ -496,27 +366,6 @@ def find_technique_mitigation_changes(self, new_stix_obj: dict, domain: str):
496366
"""
497367
return self._change_detector.find_technique_mitigation_changes(new_stix_obj, domain)
498368

499-
def _collect_detection_objects(self, stix_id: str, domain: str, age: str) -> tuple[dict[str, str], dict[str, str]]:
500-
"""Collect detection-related objects (datacomponents and detectionstrategies) for a technique.
501-
502-
Parameters
503-
----------
504-
stix_id : str
505-
The STIX ID of the technique to find detections for.
506-
domain : str
507-
The ATT&CK domain.
508-
age : str
509-
Either 'old' or 'new' to specify which data version to use.
510-
511-
Returns
512-
-------
513-
tuple[dict[str, str], dict[str, str]]
514-
Two dictionaries:
515-
- datacomponent_detections: formatted detection strings keyed by STIX ID
516-
- detectionstrategy_detections: formatted detection strings keyed by STIX ID
517-
"""
518-
return self._change_detector.collect_detection_objects(stix_id, domain, age)
519-
520369
def find_technique_detection_changes(self, new_stix_obj: dict, domain: str):
521370
"""Find changes in the relationships between Techniques and Datacomponents.
522371
@@ -661,41 +510,6 @@ def placard(self, stix_object: dict, section: str, domain: str) -> str:
661510
"""
662511
return self._markdown_generator.placard(stix_object, section, domain)
663512

664-
def _collect_domain_statistics(self, datastore: MemoryStore, domain_name: str) -> DomainStatistics:
665-
"""Collect statistics for a single domain from a STIX datastore.
666-
667-
Parameters
668-
----------
669-
datastore : MemoryStore
670-
The STIX MemoryStore containing the domain data.
671-
domain_name : str
672-
Display name of the domain (e.g., "Enterprise", "Mobile", "ICS").
673-
674-
Returns
675-
-------
676-
DomainStatistics
677-
Statistics for the domain.
678-
"""
679-
return self._statistics_collector.collect_domain_statistics(datastore, domain_name)
680-
681-
def _collect_unique_object_counts(self, datastore_version: str) -> dict[str, int]:
682-
"""Collect counts of unique objects across all domains for a specific version.
683-
684-
Some objects (Software, Groups, Campaigns) may appear in multiple domains.
685-
This function counts unique objects to avoid double-counting.
686-
687-
Parameters
688-
----------
689-
datastore_version : str
690-
Either "old" or "new" to specify which version's data to analyze.
691-
692-
Returns
693-
-------
694-
dict of str to int
695-
Counts of unique software, groups, and campaigns.
696-
"""
697-
return self._statistics_collector.collect_unique_object_counts(datastore_version)
698-
699513
def get_statistics_section(self, datastore_version: str = "new") -> str:
700514
"""Generate a markdown section with ATT&CK statistics for all domains.
701515

0 commit comments

Comments
 (0)