|
| 1 | +"""Hierarchy builder for grouping STIX objects in parent-child relationships.""" |
| 2 | + |
| 3 | +from typing import Dict, List |
| 4 | + |
| 5 | +from mitreattack.diffStix.utils.stix_utils import has_subtechniques, resolve_datacomponent_parent |
| 6 | + |
| 7 | + |
| 8 | +class HierarchyBuilder: |
| 9 | + """Builds hierarchical groupings of STIX objects (techniques/subtechniques, datasources/components).""" |
| 10 | + |
| 11 | + def __init__(self, diff_stix_instance): |
| 12 | + """Initialize HierarchyBuilder with a DiffStix instance. |
| 13 | +
|
| 14 | + Parameters |
| 15 | + ---------- |
| 16 | + diff_stix_instance : DiffStix |
| 17 | + The DiffStix instance containing data and helper methods |
| 18 | + """ |
| 19 | + self.diff_stix = diff_stix_instance |
| 20 | + |
| 21 | + def get_groupings(self, object_type: str, stix_objects: List, section: str, domain: str) -> List[Dict[str, object]]: |
| 22 | + """Group STIX objects together within a section. |
| 23 | +
|
| 24 | + A "group" in this sense is a set of STIX objects that are all in the same section, e.g. new minor version. |
| 25 | + In this case, since a domain/object type are implied before we get here, it would be |
| 26 | + e.g. "All Enterprise Techniques & Subtechniques, grouped alphabetically by name, and the |
| 27 | + sub-techniques are 'grouped' under their parent technique" |
| 28 | +
|
| 29 | + Parameters |
| 30 | + ---------- |
| 31 | + object_type : str |
| 32 | + Type of STIX object that is being worked with. |
| 33 | + stix_objects : List |
| 34 | + List of STIX objects that need to be grouped. |
| 35 | + section : str |
| 36 | + Section of the changelog that is being created with the objects, |
| 37 | + e.g. new major version, revocation, etc. |
| 38 | + domain : str |
| 39 | + ATT&CK domain (e.g., "enterprise-attack") |
| 40 | +
|
| 41 | + Returns |
| 42 | + ------- |
| 43 | + List[Dict[str, object]] |
| 44 | + A list of sorted, complex dictionary objects that tell if this "group" of objects have |
| 45 | + their parent objects in the same section. |
| 46 | + """ |
| 47 | + datastore_version = "old" if section == "deletions" else "new" |
| 48 | + subtechnique_relationships = self.diff_stix.data[datastore_version][domain]["relationships"]["subtechniques"] |
| 49 | + techniques = self.diff_stix.data[datastore_version][domain]["attack_objects"]["techniques"] |
| 50 | + datacomponents = self.diff_stix.data[datastore_version][domain]["attack_objects"]["datacomponents"] |
| 51 | + datasources = self.diff_stix.data[datastore_version][domain]["attack_objects"]["datasources"] |
| 52 | + |
| 53 | + childless = [] |
| 54 | + parents = [] |
| 55 | + children = {} |
| 56 | + # get parents which have children |
| 57 | + if object_type == "datasource": |
| 58 | + for stix_object in stix_objects: |
| 59 | + if stix_object.get("x_mitre_data_source_ref"): |
| 60 | + children[stix_object["id"]] = stix_object |
| 61 | + else: |
| 62 | + parents.append(stix_object) |
| 63 | + else: |
| 64 | + for stix_object in stix_objects: |
| 65 | + is_subtechnique = stix_object.get("x_mitre_is_subtechnique") |
| 66 | + |
| 67 | + if is_subtechnique: |
| 68 | + children[stix_object["id"]] = stix_object |
| 69 | + elif has_subtechniques(stix_object=stix_object, subtechnique_relationships=subtechnique_relationships): |
| 70 | + parents.append(stix_object) |
| 71 | + else: |
| 72 | + childless.append(stix_object) |
| 73 | + |
| 74 | + parentToChildren = {} |
| 75 | + # subtechniques |
| 76 | + for relationship in subtechnique_relationships.values(): |
| 77 | + if relationship["source_ref"] not in children: |
| 78 | + continue |
| 79 | + |
| 80 | + parent_technique_stix_id = relationship["target_ref"] |
| 81 | + the_subtechnique = children[relationship["source_ref"]] |
| 82 | + if parent_technique_stix_id not in parentToChildren: |
| 83 | + parentToChildren[parent_technique_stix_id] = [] |
| 84 | + parentToChildren[parent_technique_stix_id].append(the_subtechnique) |
| 85 | + |
| 86 | + # datacomponents |
| 87 | + for datacomponent in datacomponents.values(): |
| 88 | + if datacomponent["id"] not in children: |
| 89 | + continue |
| 90 | + |
| 91 | + # Prefer explicit reference, otherwise try a heuristic lookup |
| 92 | + parent_datasource_id = datacomponent.get("x_mitre_data_source_ref") |
| 93 | + if not parent_datasource_id: |
| 94 | + parent_datasource_id = resolve_datacomponent_parent(datacomponent, datasources) |
| 95 | + the_datacomponent = children[datacomponent["id"]] |
| 96 | + if parent_datasource_id: |
| 97 | + if parent_datasource_id not in parentToChildren: |
| 98 | + parentToChildren[parent_datasource_id] = [] |
| 99 | + parentToChildren[parent_datasource_id].append(the_datacomponent) |
| 100 | + |
| 101 | + # now group parents and children |
| 102 | + groupings = [] |
| 103 | + for parent_stix_object in childless + parents: |
| 104 | + child_objects = ( |
| 105 | + parentToChildren.pop(parent_stix_object["id"]) if parent_stix_object["id"] in parentToChildren else [] |
| 106 | + ) |
| 107 | + groupings.append( |
| 108 | + { |
| 109 | + "parent": parent_stix_object, |
| 110 | + "parentInSection": True, |
| 111 | + "children": child_objects, |
| 112 | + } |
| 113 | + ) |
| 114 | + |
| 115 | + for parent_stix_id, child_objects in parentToChildren.items(): |
| 116 | + parent_stix_object = None |
| 117 | + if parent_stix_id in techniques: |
| 118 | + parent_stix_object = techniques[parent_stix_id] |
| 119 | + elif parent_stix_id in datasources: |
| 120 | + parent_stix_object = datasources[parent_stix_id] |
| 121 | + |
| 122 | + if parent_stix_object: |
| 123 | + groupings.append( |
| 124 | + { |
| 125 | + "parent": parent_stix_object, |
| 126 | + "parentInSection": False, |
| 127 | + "children": child_objects, |
| 128 | + } |
| 129 | + ) |
| 130 | + |
| 131 | + groupings = sorted(groupings, key=lambda grouping: grouping["parent"]["name"]) |
| 132 | + return groupings |
0 commit comments