Skip to content

Commit 28ef11d

Browse files
committed
chore: update excel files
1 parent a55039f commit 28ef11d

2 files changed

Lines changed: 85 additions & 30 deletions

File tree

mitreattack/attackToExcel/attackToExcel.py

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -148,24 +148,32 @@ def build_dataframes(src: MemoryStore, domain: str) -> Dict:
148148

149149

150150
def build_ds_an_lg_relationships(dataframes: Dict) -> Dict[str, pd.DataFrame]:
    """Combine the DS -> Analytic and Analytic -> LogSource relationship tables into one sheet.

    This function only assembles the dataframe; it does not write any Excel file itself.

    Parameters
    ----------
    dataframes : dict
        Dictionary of per-object-type dataframes. The relationship tables are read from
        ``dataframes["detectionstrategies"]["detectionstrategies-analytic"]`` and
        ``dataframes["analytics"]["analytic-logsource"]``.

    Returns
    -------
    dict
        A single-entry mapping ``{"ds_an_ls": DataFrame}``. The dataframe is empty when either
        relationship table is missing or empty; otherwise it is the left join of the
        DS -> Analytic table with the Analytic -> LogSource table.
    """
    # An object type may be absent from `dataframes` altogether; treat that the same way as a
    # missing relationship table instead of raising KeyError.
    ds_an = dataframes.get("detectionstrategies", {}).get("detectionstrategies-analytic", pd.DataFrame())
    an_ls = dataframes.get("analytics", {}).get("analytic-logsource", pd.DataFrame())

    # A column-less empty frame cannot be merged on named keys, so short-circuit.
    if ds_an.empty or an_ls.empty:
        return {"ds_an_ls": pd.DataFrame()}

    combined = ds_an.merge(
        an_ls,
        on=["analytic_id", "analytic_name", "platforms"],
        how="left",  # keep every DS -> Analytic row, even if the analytic has no log sources
    )
    return {"ds_an_ls": combined}
166173

167174

168-
def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, output_dir: str = ".") -> List:
175+
176+
def write_excel(dataframes: Dict, domain: str, src: MemoryStore, version: Optional[str] = None, output_dir: str = ".") -> List:
169177
"""Given a set of dataframes from build_dataframes, write the ATT&CK dataset to output directory.
170178
171179
Parameters
@@ -174,6 +182,9 @@ def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, ou
174182
A dictionary of pandas dataframes as built by build_dataframes()
175183
domain : str
176184
Domain of ATT&CK the dataframes correspond to, e.g "enterprise-attack"
185+
src : stix2.MemoryStore
186+
A STIX bundle containing ATT&CK data for a domain already loaded into memory.
187+
Used to build the "detection mappings" sheet (detection strategy / analytic / log source).
177188
version : str, optional
178189
The version of ATT&CK the dataframes correspond to, e.g "v8.1".
179190
If omitted, the output files will not be labelled with the version number, by default None
@@ -199,6 +210,10 @@ def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, ou
199210
os.makedirs(output_directory)
200211
# master dataset file
201212
master_fp = os.path.join(output_directory, f"{domain_version_string}.xlsx")
213+
214+
ds_an_ls_df = stixToDf.detectionStrategiesAnalyticsLogSourcesDf(src)
215+
add_ds_an_ls_to = {"detectionstrategies", "analytics", "datacomponents"}
216+
202217
with pd.ExcelWriter(path=master_fp, engine="xlsxwriter") as master_writer:
203218
# master list of citations
204219
citations = pd.DataFrame()
@@ -217,6 +232,10 @@ def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, ou
217232
for sheet_name in object_data:
218233
logger.debug(f"Writing sheet to {fp}: {sheet_name}")
219234
object_data[sheet_name].to_excel(object_writer, sheet_name=sheet_name, index=False)
235+
236+
# Write Detection strategy - Analytics - Log sources file
237+
if object_type in add_ds_an_ls_to and isinstance(ds_an_ls_df, pd.DataFrame) and not ds_an_ls_df.empty:
238+
ds_an_ls_df.to_excel(object_writer, sheet_name="detection mappings", index=False)
220239
written_files.append(fp)
221240

222241
# add citations to master citations list
@@ -303,6 +322,8 @@ def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, ou
303322

304323
written_files.append(fp)
305324

325+
if isinstance(ds_an_ls_df, pd.DataFrame) and not ds_an_ls_df.empty:
326+
ds_an_ls_df.to_excel(master_writer, sheet_name="detection mappings", index=False)
306327
# remove duplicate citations and add sheet to master file
307328
logger.debug(f"Writing sheet to {master_fp}: citations")
308329
citations.drop_duplicates(subset="reference", ignore_index=True).sort_values("reference").to_excel(
@@ -311,17 +332,6 @@ def write_excel(dataframes: Dict, domain: str, version: Optional[str] = None, ou
311332

312333
written_files.append(master_fp)
313334

314-
# Write Detection strategy - Analytics - Log sources file
315-
ds_an_lg_frames = build_ds_an_lg_relationships(dataframes)
316-
ds_an_lg_fp = os.path.join(output_directory, f"{domain_version_string}-detectionstrategy-analytic-logsources.xlsx")
317-
318-
with pd.ExcelWriter(ds_an_lg_fp) as rel_writer:
319-
for sheet_name, df in ds_an_lg_frames.items():
320-
if not df.empty:
321-
df.to_excel(rel_writer, sheet_name=sheet_name, index=False)
322-
323-
written_files.append(ds_an_lg_fp)
324-
325335
for thefile in written_files:
326336
logger.info(f"Excel file created: {thefile}")
327337
return written_files
@@ -398,7 +408,7 @@ def export(
398408
return
399409

400410
dataframes = build_dataframes(src=mem_store, domain=domain)
401-
write_excel(dataframes=dataframes, domain=domain, version=version, output_dir=output_dir)
411+
write_excel(dataframes=dataframes, domain=domain, src=mem_store, version=version, output_dir=output_dir)
402412

403413

404414
def main():
@@ -410,7 +420,7 @@ def main():
410420
"-domain",
411421
type=str,
412422
choices=["enterprise-attack", "mobile-attack", "ics-attack"],
413-
default="enterprise-attack",
423+
default="ics-attack",
414424
help="which domain of ATT&CK to convert",
415425
)
416426
parser.add_argument(

mitreattack/attackToExcel/stixToDf.py

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ def analyticsToDf(src):
382382
for ds in detection_strategies:
383383
for analytic_id in ds.get("x_mitre_analytic_refs", []):
384384
analytic_to_ds_map.setdefault(analytic_id, []).append(
385-
{"detection_strategy_id": ds["id"], "detection_strategy_name": ds.get("name", "")}
385+
{"detection_strategy_attack_id": ds["external_references"][0]["external_id"], "detection_strategy_id": ds["id"], "detection_strategy_name": ds.get("name", "")}
386386
)
387387

388388
for analytic in tqdm(analytics, desc="parsing analytics"):
@@ -393,15 +393,18 @@ def analyticsToDf(src):
393393
data_comp_id = logsrc.get("x_mitre_data_component_ref", "")
394394
data_comp = src.get(data_comp_id)
395395
data_comp_name = data_comp.get("name", "") if data_comp else ""
396+
data_comp_attack_id = data_comp["external_references"][0]["external_id"]
396397

397398
logsource_rows.append(
398399
{
399400
"analytic_id": analytic["id"],
400401
"analytic_name": analytic["external_references"][0]["external_id"],
401402
"data_component_id": data_comp_id,
402403
"data_component_name": data_comp_name,
404+
"data_component_attack_id": data_comp_attack_id,
403405
"log_source_name": logsrc.get("name", ""),
404406
"channel": logsrc.get("channel", ""),
407+
"platforms": ", ".join(sorted(analytic.get("x_mitre_platforms", [])))
405408
}
406409
)
407410

@@ -412,13 +415,14 @@ def analyticsToDf(src):
412415
"analytic_id": analytic["id"],
413416
"analytic_name": analytic["external_references"][0]["external_id"],
414417
"detection_strategy_id": ds_info["detection_strategy_id"],
418+
"detection_strategy_attack_id": ds_info["detection_strategy_attack_id"],
415419
"detection_strategy_name": ds_info["detection_strategy_name"],
420+
"platforms": ", ".join(sorted(analytic.get("x_mitre_platforms", [])))
421+
416422
}
417423
)
418424

419425
dataframes["analytics"] = pd.DataFrame(analytic_rows).sort_values("name")
420-
dataframes["analytic-logsource"] = pd.DataFrame(logsource_rows)
421-
dataframes["analytic-detectionstrategy"] = pd.DataFrame(analytic_to_ds_rows)
422426

423427
citations = get_citations(analytics)
424428
if not citations.empty:
@@ -454,18 +458,19 @@ def detectionstrategiesToDf(src):
454458

455459
rel_rows.append(
456460
{
461+
"detection_strategy_attack_id": detection_strategy["external_references"][0]["external_id"],
457462
"detection_strategy_id": detection_strategy["id"],
458463
"detection_strategy_name": detection_strategy.get("name", ""),
459464
"analytic_id": analytic_id,
460465
"analytic_name": analytic_obj["external_references"][0]["external_id"],
466+
"platforms": ", ".join(sorted(analytic_obj.get("x_mitre_platforms", [])))
467+
461468
}
462469
)
463470

464471
# Build main dataframes
465472
dataframes["detectionstrategies"] = pd.DataFrame(detection_strategy_rows).sort_values("name")
466473

467-
dataframes["detectionstrategies-analytic"] = pd.DataFrame(rel_rows)
468-
469474
citations = get_citations(detection_strategies)
470475
if not citations.empty:
471476
dataframes["citations"] = citations.sort_values("reference")
@@ -520,6 +525,46 @@ def softwareToDf(src):
520525

521526
return dataframes
522527

528+
def detectionStrategiesAnalyticsLogSourcesDf(src):
    """Build a single DS -> Analytic -> LogSource dataframe directly from STIX.

    One row is emitted per (detection strategy, analytic, log source reference) triple.
    Analytics that have no log source references produce no rows.

    Parameters
    ----------
    src : stix2.MemoryStore
        STIX data to query for detection strategies, analytics and data components.

    Returns
    -------
    pd.DataFrame
        The flattened mapping; an empty dataframe when no rows could be built.
    """

    def attack_id(stix_obj):
        """Return the external_id of the object's first external reference, or "" if absent."""
        if not stix_obj:
            return ""
        refs = stix_obj.get("external_references") or [{}]
        return refs[0].get("external_id", "")

    detection_strategies = src.query([Filter("type", "=", "x-mitre-detection-strategy")])
    detection_strategies = remove_revoked_deprecated(detection_strategies)

    analytics = src.query([Filter("type", "=", "x-mitre-analytic")])
    analytics = remove_revoked_deprecated(analytics)
    analytics_by_id = {a["id"]: a for a in analytics}

    rows = []
    for ds in detection_strategies:
        ds_attack_id = attack_id(ds)
        ds_id = ds.get("id", "")
        ds_name = ds.get("name", "")

        for analytic_id in ds.get("x_mitre_analytic_refs", []):
            analytic = analytics_by_id.get(analytic_id)
            if analytic is None:
                # The referenced analytic was filtered out as revoked/deprecated or is not
                # present in the bundle; there is nothing to map for it.
                continue

            analytic_attack_id = attack_id(analytic)
            platforms = ", ".join(sorted(analytic.get("x_mitre_platforms", [])))

            for logsrc in analytic.get("x_mitre_log_source_references", []):
                data_comp_id = logsrc.get("x_mitre_data_component_ref", "")
                # The data component may be unreferenced or unresolvable; keep the row and
                # leave the data component name / ATT&CK ID blank in that case.
                data_comp = src.get(data_comp_id) if data_comp_id else None

                rows.append(
                    {
                        "detection_strategy_attack_id": ds_attack_id,
                        "detection_strategy_id": ds_id,
                        "detection_strategy_name": ds_name,
                        "analytic_id": analytic_id,
                        "analytic_name": analytic_attack_id,
                        "platforms": platforms,
                        "log_source_name": logsrc.get("name", ""),
                        "channel": logsrc.get("channel", ""),
                        "data_component_id": data_comp_id,
                        "data_component_name": data_comp.get("name", "") if data_comp else "",
                        "data_component_attack_id": attack_id(data_comp),
                    }
                )

    return pd.DataFrame(rows)
523568

524569
def groupsToDf(src):
525570
"""Parse STIX groups from the given data and return corresponding pandas dataframes.
@@ -1264,4 +1309,4 @@ def _get_relationship_citations(object_dataframe, relationship_df):
12641309
else:
12651310
for i in range(0, len(new_citations)):
12661311
new_citations[i] = ",".join([new_citations[i], subset[i]])
1267-
return new_citations
1312+
return new_citations

0 commit comments

Comments
 (0)