Skip to content

Commit 90b5bc4

Browse files
committed
start campaign support
1 parent f96f9d6 commit 90b5bc4

2 files changed

Lines changed: 141 additions & 7 deletions

File tree

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,5 @@ venv.bak/
108108
# intellij
109109
.idea/
110110

111-
.DS_Store
111+
.DS_Store
112+
.history

attackcti/attack_api.py

Lines changed: 139 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,19 @@ def translate_stix_objects(self, stix_objects):
208208
"x_mitre_collection_layers": "collection_layers",
209209
"x_mitre_contributors": "contributors"
210210
}
211+
campaign_stix_mapping = {
212+
"type": "type",
213+
"id": "id",
214+
"created_by_ref": "created_by_ref",
215+
"created": "created",
216+
"modified": "modified",
217+
"title": "name",
218+
"description": "campaign_description",
219+
"aliases": "campaign_aliases",
220+
"object_marking_refs": "object_marking_refs",
221+
"external_references": "external_references",
222+
223+
}
211224

212225
# ******** Helper Functions ********
213226
def handle_list(list_object, object_type):
@@ -230,6 +243,8 @@ def handle_list(list_object, object_type):
230243
obj_dict['tactic_id'] = list_object[0]['external_id']
231244
elif obj_dict['type'] == 'matrix':
232245
obj_dict['matrix_id'] = list_object[0]['external_id']
246+
elif obj_dict['type'] == 'campaign':
247+
obj_dict['campaign_id'] = list_object[0]['external_id']
233248
elif object_type == "kill_chain_phases":
234249
tactic_list = list()
235250
for phase in list_object:
@@ -266,6 +281,8 @@ def handle_list(list_object, object_type):
266281
stix_mapping = marking_stix_mapping
267282
elif obj['type'] == "x-mitre-data-source":
268283
stix_mapping = data_source_stix_mapping
284+
elif obj['type'] == "campaign":
285+
stix_mapping = campaign_stix_mapping
269286
else:
270287
return stix_objects_list
271288

@@ -330,7 +347,8 @@ def get_enterprise(self, stix_format=True):
330347
"tactics": self.get_enterprise_tactics,
331348
"matrix": Filter("type", "=", "x-mitre-matrix"),
332349
"identity": Filter("type", "=", "identity"),
333-
"marking-definition": Filter("type", "=", "marking-definition")
350+
"marking-definition": Filter("type", "=", "marking-definition"),
351+
"campaign": self.get_enterprise_campaigns
334352
}
335353
enterprise_stix_objects = dict()
336354
for key in enterprise_filter_objects:
@@ -339,6 +357,25 @@ def get_enterprise(self, stix_format=True):
339357
enterprise_stix_objects[key] = self.translate_stix_objects(enterprise_stix_objects[key])
340358
return enterprise_stix_objects
341359

360+
def get_enterprise_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
361+
""" Extracts all the available campaigns STIX objects in the Enterprise ATT&CK matrix
362+
363+
Args:
364+
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
365+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
366+
367+
Returns:
368+
List of STIX objects
369+
"""
370+
enterprise_campaigns = self.TC_ENTERPRISE_SOURCE.query([Filter("type", "=", "campaign")])
371+
372+
if skip_revoked_deprecated:
373+
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)
374+
375+
if not stix_format:
376+
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)
377+
return enterprise_campaigns
378+
342379
def get_enterprise_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, enrich_data_sources = False, stix_format=True):
343380
""" Extracts all the available techniques STIX objects in the Enterprise ATT&CK matrix
344381
@@ -636,15 +673,36 @@ def get_mobile(self, stix_format=True):
636673
"tactics": self.get_mobile_tactics,
637674
"matrix": Filter("type", "=", "x-mitre-matrix"),
638675
"identity": Filter("type", "=", "identity"),
639-
"marking-definition": Filter("type", "=", "marking-definition")
676+
"marking-definition": Filter("type", "=", "marking-definition"),
677+
"campaigns": self.get_mobile_campaigns
640678
}
641679
mobile_stix_objects = {}
642680
for key in mobile_filter_objects:
643681
mobile_stix_objects[key] = self.TC_MOBILE_SOURCE.query(mobile_filter_objects[key]) if isinstance(mobile_filter_objects[key], Filter) else mobile_filter_objects[key]()
644682
if not stix_format:
645683
mobile_stix_objects[key] = self.translate_stix_objects(mobile_stix_objects[key])
646684
return mobile_stix_objects
647-
685+
686+
def get_mobile_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
687+
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix
688+
689+
Args:
690+
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
691+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
692+
693+
Returns:
694+
List of STIX objects
695+
"""
696+
697+
mobile_campaigns = self.TC_MOBILE_SOURCE.query(Filter("type", "=", "campaigns"))
698+
699+
if skip_revoked_deprecated:
700+
mobile_campaigns = self.remove_revoked_deprecated(mobile_campaigns)
701+
702+
if not stix_format:
703+
mobile_campaigns = self.translate_stix_objects(mobile_campaigns)
704+
return mobile_campaigns
705+
648706
def get_mobile_techniques(self, skip_revoked_deprecated=True, include_subtechniques=True, stix_format=True):
649707
""" Extracts all the available techniques STIX objects in the Mobile ATT&CK matrix
650708
@@ -945,7 +1003,32 @@ def get_stix_objects(self, stix_format=True):
9451003
for resource_type in attack_stix_objects[matrix].keys():
9461004
attack_stix_objects[matrix][resource_type] = self.translate_stix_objects(attack_stix_objects[matrix][resource_type])
9471005
return attack_stix_objects
948-
1006+
1007+
def get_campaigns(self, skip_revoked_deprecated=True, stix_format=True):
1008+
""" Extracts all the available campaigns STIX objects across all ATT&CK matrices
1009+
1010+
Args:
1011+
skip_revoked_deprecated (bool): default True. Skip revoked and deprecated STIX objects.
1012+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
1013+
1014+
Returns:
1015+
List of STIX objects
1016+
"""
1017+
1018+
enterprise_campaigns = self.get_enterprise_campaigns()
1019+
mobile_campaigns = self.get_mobile_campaigns()
1020+
for mc in mobile_campaigns:
1021+
if mc not in enterprise_campaigns:
1022+
enterprise_campaigns.append(mc)
1023+
1024+
if skip_revoked_deprecated:
1025+
enterprise_campaigns = self.remove_revoked_deprecated(enterprise_campaigns)
1026+
1027+
if not stix_format:
1028+
enterprise_campaigns = self.translate_stix_objects(enterprise_campaigns)
1029+
1030+
return enterprise_campaigns
1031+
9491032
def get_techniques(self, include_subtechniques=True, skip_revoked_deprecated=True, enrich_data_sources=False, stix_format=True):
9501033
""" Extracts all the available techniques STIX objects across all ATT&CK matrices
9511034
@@ -1282,6 +1365,36 @@ def get_object_by_attack_id(self, object_type, attack_id, stix_format=True):
12821365
all_stix_objects = self.translate_stix_objects(all_stix_objects)
12831366
return all_stix_objects
12841367

1368+
def get_campaign_by_alias(self, campaign_alias, case=True, stix_format=True):
1369+
""" Extracts campaign STIX objects by alias name accross all ATT&CK matrices
1370+
1371+
Args:
1372+
campaign_alias (str) : Alias of threat actor group
1373+
case (bool) : case sensitive or not
1374+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
1375+
1376+
Returns:
1377+
List of STIX objects
1378+
1379+
"""
1380+
if not case:
1381+
all_campaigns = self.get_campaigns()
1382+
all_campaigns_list = list()
1383+
for campaign in all_campaigns:
1384+
if "aliases" in campaign.keys():
1385+
for alias in campaign['aliases']:
1386+
if campaign_alias.lower() in alias.lower():
1387+
all_campaigns_list.append(campaign)
1388+
else:
1389+
filter_objects = [
1390+
Filter('type', '=', 'campaign'),
1391+
Filter('aliases', '=', campaign_alias)
1392+
]
1393+
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
1394+
if not stix_format:
1395+
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
1396+
return all_campaigns_list
1397+
12851398
def get_group_by_alias(self, group_alias, case=True, stix_format=True):
12861399
""" Extracts group STIX objects by alias name accross all ATT&CK matrices
12871400
@@ -1311,7 +1424,27 @@ def get_group_by_alias(self, group_alias, case=True, stix_format=True):
13111424
if not stix_format:
13121425
all_groups_list = self.translate_stix_objects(all_groups_list)
13131426
return all_groups_list
1314-
1427+
1428+
def get_campaigns_since_time(self, timestamp, stix_format=True):
1429+
""" Extracts campaings STIX objects since specific time accross all ATT&CK matrices
1430+
1431+
Args:
1432+
timestamp (timestamp): Timestamp
1433+
stix_format (bool): Returns results in original STIX format or friendly syntax (e.g. 'attack-pattern' or 'technique')
1434+
1435+
Returns:
1436+
List of STIX objects
1437+
1438+
"""
1439+
filter_objects = [
1440+
Filter('type', '=', 'campaigns'),
1441+
Filter('created', '>', timestamp)
1442+
]
1443+
all_campaigns_list = self.COMPOSITE_DS.query(filter_objects)
1444+
if not stix_format:
1445+
all_campaigns_list = self.translate_stix_objects(all_campaigns_list)
1446+
return all_campaigns_list
1447+
13151448
def get_techniques_since_time(self, timestamp, stix_format=True):
13161449
""" Extracts techniques STIX objects since specific time accross all ATT&CK matrices
13171450
@@ -1804,4 +1937,4 @@ def enrich_techniques_data_sources(self, stix_object):
18041937
if technique_ds:
18051938
new_data_sources = [ v for v in technique_ds.values()]
18061939
stix_object[i] = stix_object[i].new_version(x_mitre_data_sources = new_data_sources)
1807-
return stix_object
1940+
return stix_object

0 commit comments

Comments
 (0)