Skip to content

Commit 92dd034

Browse files
committed
refactor: Extract StatisticsCollector from DiffStix
- Create StatisticsCollector class in core/statistics_collector.py (151 lines)
- Extract three statistics methods into a focused class:
  - collect_domain_statistics() from _collect_domain_statistics() (48 lines)
  - collect_unique_object_counts() from _collect_unique_object_counts() (38 lines)
  - generate_statistics_section() from get_statistics_section() (39 lines)
- Update DiffStix to delegate to StatisticsCollector
- DiffStix reduced from 1,217 lines to 1,142 lines (6.2% reduction)
- Total reduction from original: 334 lines (22.8% from 1,462 lines)
- 132 of 133 tests passing (the only failure is the known permission test)
1 parent c4bc16e commit 92dd034

2 files changed

Lines changed: 154 additions & 84 deletions

File tree

mitreattack/diffStix/core/diff_stix.py

Lines changed: 9 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from mitreattack.diffStix.core.contributor_tracker import ContributorTracker
1515
from mitreattack.diffStix.core.domain_statistics import DomainStatistics
16+
from mitreattack.diffStix.core.statistics_collector import StatisticsCollector
1617
from mitreattack.diffStix.formatters.json_generator import JsonGenerator
1718
from mitreattack.diffStix.formatters.layer_generator import LayerGenerator
1819
from mitreattack.diffStix.formatters.markdown_generator import MarkdownGenerator
@@ -197,7 +198,8 @@ def __init__(
197198

198199
self.load_data()
199200

200-
# Initialize formatters after data is loaded
201+
# Initialize components after data is loaded
202+
self._statistics_collector = StatisticsCollector(self)
201203
self._markdown_generator = MarkdownGenerator(self)
202204
self._layer_generator = LayerGenerator(self)
203205
self._json_generator = JsonGenerator(self)
@@ -1060,8 +1062,7 @@ def placard(self, stix_object: dict, section: str, domain: str) -> str:
10601062
return self._markdown_generator.placard(stix_object, section, domain)
10611063

10621064
def _collect_domain_statistics(self, datastore: MemoryStore, domain_name: str) -> DomainStatistics:
1063-
"""
1064-
Collect statistics for a single domain from a STIX datastore.
1065+
"""Collect statistics for a single domain from a STIX datastore.
10651066
10661067
Parameters
10671068
----------
@@ -1075,42 +1076,10 @@ def _collect_domain_statistics(self, datastore: MemoryStore, domain_name: str) -
10751076
DomainStatistics
10761077
Statistics for the domain.
10771078
"""
1078-
# Create MitreAttackData instance from the datastore
1079-
data = MitreAttackData(src=datastore)
1080-
1081-
# Get all object types, removing revoked and deprecated
1082-
tactics = data.get_tactics(remove_revoked_deprecated=True)
1083-
techniques = data.get_techniques(include_subtechniques=False, remove_revoked_deprecated=True)
1084-
subtechniques = data.get_subtechniques(remove_revoked_deprecated=True)
1085-
groups = data.get_groups(remove_revoked_deprecated=True)
1086-
software = data.get_software(remove_revoked_deprecated=True)
1087-
campaigns = data.get_campaigns(remove_revoked_deprecated=True)
1088-
mitigations = data.get_mitigations(remove_revoked_deprecated=True)
1089-
assets = data.get_assets(remove_revoked_deprecated=True)
1090-
datasources = data.get_datasources(remove_revoked_deprecated=True)
1091-
detectionstrategies = data.get_detectionstrategies(remove_revoked_deprecated=True)
1092-
analytics = data.get_analytics(remove_revoked_deprecated=True)
1093-
datacomponents = data.get_datacomponents(remove_revoked_deprecated=True)
1094-
1095-
return DomainStatistics(
1096-
name=domain_name,
1097-
tactics=len(tactics),
1098-
techniques=len(techniques),
1099-
subtechniques=len(subtechniques),
1100-
groups=len(groups),
1101-
software=len(software),
1102-
campaigns=len(campaigns),
1103-
mitigations=len(mitigations),
1104-
assets=len(assets),
1105-
datasources=len(datasources),
1106-
detectionstrategies=len(detectionstrategies),
1107-
analytics=len(analytics),
1108-
datacomponents=len(datacomponents),
1109-
)
1079+
return self._statistics_collector.collect_domain_statistics(datastore, domain_name)
11101080

11111081
def _collect_unique_object_counts(self, datastore_version: str) -> dict[str, int]:
1112-
"""
1113-
Collect counts of unique objects across all domains for a specific version.
1082+
"""Collect counts of unique objects across all domains for a specific version.
11141083
11151084
Some objects (Software, Groups, Campaigns) may appear in multiple domains.
11161085
This function counts unique objects to avoid double-counting.
@@ -1125,31 +1094,10 @@ def _collect_unique_object_counts(self, datastore_version: str) -> dict[str, int
11251094
dict of str to int
11261095
Counts of unique software, groups, and campaigns.
11271096
"""
1128-
all_software_ids = set()
1129-
all_groups_ids = set()
1130-
all_campaigns_ids = set()
1131-
1132-
for domain in self.domains:
1133-
datastore = self.data[datastore_version][domain]["stix_datastore"]
1134-
data = MitreAttackData(src=datastore)
1135-
1136-
software = data.get_software(remove_revoked_deprecated=True)
1137-
groups = data.get_groups(remove_revoked_deprecated=True)
1138-
campaigns = data.get_campaigns(remove_revoked_deprecated=True)
1139-
1140-
all_software_ids.update(obj["id"] for obj in software)
1141-
all_groups_ids.update(obj["id"] for obj in groups)
1142-
all_campaigns_ids.update(obj["id"] for obj in campaigns)
1143-
1144-
return {
1145-
"software": len(all_software_ids),
1146-
"groups": len(all_groups_ids),
1147-
"campaigns": len(all_campaigns_ids),
1148-
}
1097+
return self._statistics_collector.collect_unique_object_counts(datastore_version)
11491098

11501099
def get_statistics_section(self, datastore_version: str = "new") -> str:
1151-
"""
1152-
Generate a markdown section with ATT&CK statistics for all domains.
1100+
"""Generate a markdown section with ATT&CK statistics for all domains.
11531101
11541102
Parameters
11551103
----------
@@ -1162,30 +1110,7 @@ def get_statistics_section(self, datastore_version: str = "new") -> str:
11621110
str
11631111
Markdown-formatted statistics section.
11641112
"""
1165-
# Collect unique object counts across all domains
1166-
unique_counts = self._collect_unique_object_counts(datastore_version)
1167-
1168-
# Collect statistics for each domain
1169-
domain_stats = []
1170-
for domain in self.domains:
1171-
datastore = self.data[datastore_version][domain]["stix_datastore"]
1172-
domain_label = self.domain_to_domain_label[domain]
1173-
stats = self._collect_domain_statistics(datastore, domain_label)
1174-
domain_stats.append(stats)
1175-
1176-
# Build the statistics section
1177-
output = "## Statistics\n\n"
1178-
output += (
1179-
f"This version of ATT&CK contains {unique_counts['software']} Software, "
1180-
f"{unique_counts['groups']} Groups, and {unique_counts['campaigns']} Campaigns.\n\n"
1181-
)
1182-
output += "Broken out by domain:\n\n"
1183-
1184-
for stats in domain_stats:
1185-
output += stats.format_output() + "\n"
1186-
1187-
output += "\n"
1188-
return output
1113+
return self._statistics_collector.generate_statistics_section(datastore_version)
11891114

11901115
def get_markdown_section_data(self, groupings, section: str, domain: str) -> str:
11911116
"""Parse a list of STIX objects in a section and return a string for the whole section."""
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
"""Statistics collector for ATT&CK version data."""
2+
3+
from stix2 import MemoryStore
4+
5+
from mitreattack.diffStix.core.domain_statistics import DomainStatistics
6+
from mitreattack.stix20 import MitreAttackData
7+
8+
9+
class StatisticsCollector:
    """Gather object counts from ATT&CK STIX data and render them as markdown."""

    def __init__(self, diff_stix_instance):
        """Bind the collector to the DiffStix object it reads from.

        Parameters
        ----------
        diff_stix_instance : DiffStix
            Owning instance. Its ``domains``, ``data`` and
            ``domain_to_domain_label`` attributes are read by the methods of
            this class.
        """
        self.diff_stix = diff_stix_instance

    def collect_domain_statistics(self, datastore: MemoryStore, domain_name: str) -> DomainStatistics:
        """Count each ATT&CK object type held in one domain's datastore.

        Every getter is called with ``remove_revoked_deprecated=True``;
        techniques are requested with ``include_subtechniques=False`` so that
        sub-techniques are only counted through their own getter.

        Parameters
        ----------
        datastore : MemoryStore
            The STIX MemoryStore containing the domain data.
        domain_name : str
            Display name of the domain (e.g., "Enterprise", "Mobile", "ICS").

        Returns
        -------
        DomainStatistics
            Per-type object counts for the domain, labelled with ``domain_name``.
        """
        attack = MitreAttackData(src=datastore)

        # Keyword arguments are evaluated top to bottom, so the getters run in
        # the same order as the fields are listed here.
        return DomainStatistics(
            name=domain_name,
            tactics=len(attack.get_tactics(remove_revoked_deprecated=True)),
            techniques=len(attack.get_techniques(include_subtechniques=False, remove_revoked_deprecated=True)),
            subtechniques=len(attack.get_subtechniques(remove_revoked_deprecated=True)),
            groups=len(attack.get_groups(remove_revoked_deprecated=True)),
            software=len(attack.get_software(remove_revoked_deprecated=True)),
            campaigns=len(attack.get_campaigns(remove_revoked_deprecated=True)),
            mitigations=len(attack.get_mitigations(remove_revoked_deprecated=True)),
            assets=len(attack.get_assets(remove_revoked_deprecated=True)),
            datasources=len(attack.get_datasources(remove_revoked_deprecated=True)),
            detectionstrategies=len(attack.get_detectionstrategies(remove_revoked_deprecated=True)),
            analytics=len(attack.get_analytics(remove_revoked_deprecated=True)),
            datacomponents=len(attack.get_datacomponents(remove_revoked_deprecated=True)),
        )

    def collect_unique_object_counts(self, datastore_version: str) -> dict[str, int]:
        """Count distinct software, groups and campaigns across every domain.

        The same object may be present in more than one domain, so object IDs
        are pooled into sets before counting to avoid double-counting.

        Parameters
        ----------
        datastore_version : str
            Either "old" or "new" to specify which version's data to analyze.

        Returns
        -------
        dict of str to int
            Number of unique IDs under the keys "software", "groups" and
            "campaigns".
        """
        unique_ids = {"software": set(), "groups": set(), "campaigns": set()}

        for domain in self.diff_stix.domains:
            store = self.diff_stix.data[datastore_version][domain]["stix_datastore"]
            attack = MitreAttackData(src=store)

            # Fetch all three collections first, then merge their IDs.
            fetched = {
                "software": attack.get_software(remove_revoked_deprecated=True),
                "groups": attack.get_groups(remove_revoked_deprecated=True),
                "campaigns": attack.get_campaigns(remove_revoked_deprecated=True),
            }
            for kind, objects in fetched.items():
                unique_ids[kind].update(obj["id"] for obj in objects)

        return {kind: len(ids) for kind, ids in unique_ids.items()}

    def generate_statistics_section(self, datastore_version: str = "new") -> str:
        """Render the "Statistics" markdown section for one ATT&CK version.

        Parameters
        ----------
        datastore_version : str, optional
            Either "old" or "new" to specify which version's statistics to
            generate. Defaults to "new".

        Returns
        -------
        str
            Markdown text: a heading, a one-sentence summary of the unique
            software/group/campaign totals, then one formatted entry per
            domain followed by a blank line.
        """
        # Cross-domain totals are gathered before the per-domain breakdown.
        totals = self.collect_unique_object_counts(datastore_version)

        owner = self.diff_stix
        per_domain = [
            self.collect_domain_statistics(
                owner.data[datastore_version][domain]["stix_datastore"],
                owner.domain_to_domain_label[domain],
            )
            for domain in owner.domains
        ]

        pieces = [
            "## Statistics\n\n",
            f"This version of ATT&CK contains {totals['software']} Software, "
            f"{totals['groups']} Groups, and {totals['campaigns']} Campaigns.\n\n",
            "Broken out by domain:\n\n",
        ]
        pieces.extend(entry.format_output() + "\n" for entry in per_domain)
        pieces.append("\n")
        return "".join(pieces)

0 commit comments

Comments
 (0)