Skip to content

Commit 6b4af25

Browse files
committed
fix: update logic to work with new detection strategy objects
1 parent 2a82ddf commit 6b4af25

1 file changed

Lines changed: 71 additions & 23 deletions

File tree

mitreattack/diffStix/changelog_helper.py

Lines changed: 71 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -495,14 +495,23 @@ def find_technique_detection_changes(self, new_stix_obj: dict, domain: str):
495495
continue
496496
if stix_id == detection_relationship["target_ref"]:
497497
old_sourceref_id = detection_relationship["source_ref"]
498+
# Datacomponent -> Data source relation used to exist via x_mitre_data_source_ref.
499+
# New STIX may not include parent datasources; attempt explicit ref first, then a heuristic lookup.
498500
if old_sourceref_id in all_old_domain_datacomponents:
499501
old_datacomponent = all_old_domain_datacomponents[old_sourceref_id]
500-
old_datasource_id = old_datacomponent["x_mitre_data_source_ref"]
501-
old_datasource = all_old_domain_datasources[old_datasource_id]
502-
old_datasource_attack_id = get_attack_id(stix_obj=old_datasource)
503-
old_datacomponent_detections[old_sourceref_id] = (
504-
f"{old_datasource_attack_id}: {old_datasource['name']} ({old_datacomponent['name']})"
505-
)
502+
old_datasource_id = old_datacomponent.get("x_mitre_data_source_ref")
503+
if not old_datasource_id:
504+
# Best-effort fallback: try to resolve a parent datasource from available datasource objects.
505+
old_datasource_id = resolve_datacomponent_parent(old_datacomponent, all_old_domain_datasources)
506+
if old_datasource_id and old_datasource_id in all_old_domain_datasources:
507+
old_datasource = all_old_domain_datasources[old_datasource_id]
508+
old_datasource_attack_id = get_attack_id(stix_obj=old_datasource)
509+
old_datacomponent_detections[old_sourceref_id] = (
510+
f"{old_datasource_attack_id}: {old_datasource['name']} ({old_datacomponent['name']})"
511+
)
512+
else:
513+
# No parent datasource identified — show the datacomponent name as standalone.
514+
old_datacomponent_detections[old_sourceref_id] = f"{old_datacomponent['name']}"
506515
if old_sourceref_id in all_old_domain_detectionstrategies:
507516
old_detectionstrategy = all_old_domain_detectionstrategies[old_sourceref_id]
508517
old_detectionstrategy_attack_id = get_attack_id(stix_obj=old_detectionstrategy)
@@ -515,14 +524,22 @@ def find_technique_detection_changes(self, new_stix_obj: dict, domain: str):
515524
continue
516525
if stix_id == detection_relationship["target_ref"]:
517526
new_sourceref_id = detection_relationship["source_ref"]
527+
# Handle datacomponents that may no longer reference a datasource.
518528
if new_sourceref_id in all_new_domain_datacomponents:
519529
new_datacomponent = all_new_domain_datacomponents[new_sourceref_id]
520-
new_datasource_id = new_datacomponent["x_mitre_data_source_ref"]
521-
new_datasource = all_new_domain_datasources[new_datasource_id]
522-
new_datasource_attack_id = get_attack_id(stix_obj=new_datasource)
523-
new_datacomponent_detections[new_sourceref_id] = (
524-
f"{new_datasource_attack_id}: {new_datasource['name']} ({new_datacomponent['name']})"
525-
)
530+
new_datasource_id = new_datacomponent.get("x_mitre_data_source_ref")
531+
if not new_datasource_id:
532+
# Best-effort fallback lookup into datasources
533+
new_datasource_id = resolve_datacomponent_parent(new_datacomponent, all_new_domain_datasources)
534+
if new_datasource_id and new_datasource_id in all_new_domain_datasources:
535+
new_datasource = all_new_domain_datasources[new_datasource_id]
536+
new_datasource_attack_id = get_attack_id(stix_obj=new_datasource)
537+
new_datacomponent_detections[new_sourceref_id] = (
538+
f"{new_datasource_attack_id}: {new_datasource['name']} ({new_datacomponent['name']})"
539+
)
540+
else:
541+
# No parent datasource identified — show the datacomponent name as standalone.
542+
new_datacomponent_detections[new_sourceref_id] = f"{new_datacomponent['name']}"
526543
if new_sourceref_id in all_new_domain_detectionstrategies:
527544
new_detectionstrategy = all_new_domain_detectionstrategies[new_sourceref_id]
528545
new_detectionstrategy_attack_id = get_attack_id(stix_obj=new_detectionstrategy)
@@ -813,11 +830,15 @@ def get_groupings(self, object_type: str, stix_objects: List, section: str, doma
813830
if datacomponent["id"] not in children:
814831
continue
815832

816-
parent_datasource_id = datacomponent["x_mitre_data_source_ref"]
833+
# Prefer explicit reference, otherwise try a heuristic lookup
834+
parent_datasource_id = datacomponent.get("x_mitre_data_source_ref")
835+
if not parent_datasource_id:
836+
parent_datasource_id = resolve_datacomponent_parent(datacomponent, datasources)
817837
the_datacomponent = children[datacomponent["id"]]
818-
if parent_datasource_id not in parentToChildren:
819-
parentToChildren[parent_datasource_id] = []
820-
parentToChildren[parent_datasource_id].append(the_datacomponent)
838+
if parent_datasource_id:
839+
if parent_datasource_id not in parentToChildren:
840+
parentToChildren[parent_datasource_id] = []
841+
parentToChildren[parent_datasource_id].append(the_datacomponent)
821842

822843
# now group parents and children
823844
groupings = []
@@ -898,7 +919,11 @@ def get_parent_stix_object(self, stix_object: dict, datastore_version: str, doma
898919
parent_id = subtechnique_relationship["target_ref"]
899920
return techniques[parent_id]
900921
elif stix_object["type"] == "x-mitre-data-component":
901-
return datasources[stix_object.get("x_mitre_data_source_ref")]
922+
parent_ref = stix_object.get("x_mitre_data_source_ref")
923+
if parent_ref and parent_ref in datasources:
924+
return datasources[parent_ref]
925+
# No parent datasource available for this datacomponent.
926+
return {}
902927

903928
# possible reasons for no parent object: deprecated/revoked/wrong object type passed in
904929
return {}
@@ -944,12 +969,16 @@ def placard(self, stix_object: dict, section: str, domain: str) -> str:
944969
parent_object = self.get_parent_stix_object(
945970
stix_object=revoker, datastore_version=datastore_version, domain=domain
946971
)
947-
parent_name = parent_object.get("name", "ERROR NO PARENT")
948-
relative_url = get_relative_data_component_url(datasource=parent_object, datacomponent=stix_object)
949-
revoker_link = f"{self.site_prefix}/{relative_url}"
950-
placard_string = (
951-
f"{stix_object['name']} (revoked by {parent_name}: [{revoker['name']}]({revoker_link}))"
952-
)
972+
if parent_object:
973+
parent_name = parent_object.get("name", "ERROR NO PARENT")
974+
relative_url = get_relative_data_component_url(datasource=parent_object, datacomponent=stix_object)
975+
revoker_link = f"{self.site_prefix}/{relative_url}"
976+
placard_string = (
977+
f"{stix_object['name']} (revoked by {parent_name}: [{revoker['name']}]({revoker_link}))"
978+
)
979+
else:
980+
# No parent datasource available — fall back to a plain-text representation.
981+
placard_string = f"{stix_object['name']} (revoked by {revoker['name']})"
953982

954983
else:
955984
relative_url = get_relative_url_from_stix(stix_object=revoker)
@@ -964,6 +993,9 @@ def placard(self, stix_object: dict, section: str, domain: str) -> str:
964993
if parent_object:
965994
relative_url = get_relative_data_component_url(datasource=parent_object, datacomponent=stix_object)
966995
placard_string = f"[{stix_object['name']}]({self.site_prefix}/{relative_url})"
996+
else:
997+
# No parent datasource available — display datacomponent name as plain text.
998+
placard_string = stix_object["name"]
967999

9681000
else:
9691001
relative_url = get_relative_url_from_stix(stix_object=stix_object)
@@ -1273,6 +1305,22 @@ def cleanup_values(groupings: List[dict]) -> List[dict]:
12731305
return new_values
12741306

12751307

1308+
def resolve_datacomponent_parent(datacomponent: dict, datasources: Dict[str, dict]) -> Optional[str]:
1309+
"""Best-effort resolution of a datacomponent's parent datasource when an explicit x_mitre_data_source_ref is not present.
1310+
1311+
Strategy:
1312+
1. If the datacomponent contains an explicit 'x_mitre_data_source_ref', return it.
1313+
2. If no match, return None.
1314+
"""
1315+
# explicit ref
1316+
parent_ref = datacomponent.get("x_mitre_data_source_ref")
1317+
if parent_ref:
1318+
return parent_ref
1319+
1320+
# nothing matched
1321+
return None
1322+
1323+
12761324
def version_increment_is_valid(
12771325
old_version: AttackObjectVersion | None, new_version: AttackObjectVersion | None, section: str
12781326
) -> bool:

0 commit comments

Comments
 (0)