Skip to content

Commit fe0b183

Browse files
committed
created clients for core objects to organize code
1 parent 22a6213 commit fe0b183

13 files changed

Lines changed: 2303 additions & 0 deletions

src/attackcti/core/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
"""Core helpers for querying and enriching ATT&CK STIX content."""
2+
3+
from __future__ import annotations
4+
5+
from .query_client import QueryClient
6+
7+
__all__ = [
8+
"QueryClient",
9+
]
10+
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
"""Initialization of the attackcti.core.objects module."""
2+
3+
from .analytics import AnalyticsClient
4+
from .campaigns import CampaignsClient
5+
from .data_sources import DataSourcesClient
6+
from .detections import DetectionsClient
7+
from .groups import GroupsClient
8+
from .mitigations import MitigationsClient
9+
from .relationships import RelationshipsClient
10+
from .software import SoftwareClient
11+
from .tactics import TacticsClient
12+
from .techniques import TechniquesClient
13+
14+
__all__ = [
15+
"CampaignsClient",
16+
"DataSourcesClient",
17+
"AnalyticsClient",
18+
"DetectionsClient",
19+
"GroupsClient",
20+
"MitigationsClient",
21+
"RelationshipsClient",
22+
"SoftwareClient",
23+
"TacticsClient",
24+
"TechniquesClient",
25+
]
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
"""Cross-domain analytics query helpers."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any, Callable, Iterable
6+
7+
from stix2 import CompositeDataSource, Filter
8+
9+
from ...models import Analytic as AnalyticModel
10+
from ...utils.stix import (
11+
as_dict,
12+
parse_stix_objects,
13+
query_stix_objects_by_ids,
14+
remove_revoked_deprecated,
15+
)
16+
17+
18+
class AnalyticsClient:
19+
"""Cross-domain analytics client (COMPOSITE_DS-backed)."""
20+
21+
def __init__(
22+
self,
23+
*,
24+
data_source: CompositeDataSource,
25+
remove_fn: Callable = remove_revoked_deprecated,
26+
parse_fn: Callable = parse_stix_objects,
27+
) -> None:
28+
"""Initialize the client with a data source and helper callbacks."""
29+
self._data_source = data_source
30+
self._remove_fn = remove_fn
31+
self._parse_fn = parse_fn
32+
33+
def get_analytics(self, *, stix_format: bool = True) -> list[dict[str, Any]]:
34+
"""Return all analytic objects."""
35+
analytics = self._data_source.query(Filter("type", "=", "x-mitre-analytic"))
36+
if not stix_format:
37+
return self._parse_fn(analytics, AnalyticModel)
38+
return [as_dict(a) for a in analytics]
39+
40+
def get_analytics_by_ids(self, ids: Iterable[str]) -> dict[str, dict[str, Any]]:
41+
"""Return analytics keyed by their STIX id."""
42+
analytics_dict: dict[str, dict[str, Any]] = {}
43+
analytic_ids = {aid for aid in ids if isinstance(aid, str) and aid}
44+
if not analytic_ids:
45+
return analytics_dict
46+
47+
analytics = query_stix_objects_by_ids(
48+
data_source=self._data_source,
49+
stix_type="x-mitre-analytic",
50+
ids=analytic_ids
51+
)
52+
for analytic in analytics:
53+
analytic_dict = as_dict(analytic)
54+
analytic_id = analytic_dict.get("id")
55+
if not isinstance(analytic_id, str) or not analytic_id:
56+
continue
57+
log_sources = analytic_dict.get("x_mitre_log_source_references") or []
58+
analytic_dict["x_attackcti_log_sources"] = [ls for ls in log_sources if isinstance(ls, dict)]
59+
analytics_dict[analytic_id] = analytic_dict
60+
return analytics_dict
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
"""Cross-domain campaign query helpers."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any, Callable, Dict, List, Union
6+
7+
from stix2 import CompositeDataSource, Filter
8+
from stix2.v21.sdo import Campaign as CampaignV21
9+
10+
from ...models import Campaign as CampaignModel
11+
12+
13+
class CampaignsClient:
14+
"""Campaigns query helper class."""
15+
16+
def __init__(
17+
self,
18+
*,
19+
data_source: CompositeDataSource,
20+
remove_fn: Callable | None = None,
21+
parse_fn: Callable | None = None,
22+
) -> None:
23+
"""Initialize the client with a composite data source."""
24+
self._data_source = data_source
25+
self._remove_fn = remove_fn
26+
self._parse_fn = parse_fn
27+
28+
def get_campaigns(self, *, skip_revoked_deprecated: bool = True, stix_format: bool = True) -> List[Union[CampaignV21, Dict[str, Any]]]:
29+
"""Return campaigns across ATT&CK matrices.
30+
31+
Parameters
32+
----------
33+
skip_revoked_deprecated
34+
When `True`, omit revoked/deprecated campaigns.
35+
stix_format
36+
When `True`, return STIX objects; when `False`, parse to the
37+
`Campaign` Pydantic model.
38+
39+
Returns
40+
-------
41+
List[Union[CampaignV21, Dict[str, Any]]]
42+
Campaign objects in the requested format.
43+
"""
44+
all_campaigns = self._data_source.query([Filter("type", "=", "campaign")])
45+
if skip_revoked_deprecated:
46+
all_campaigns = self._remove_fn(all_campaigns)
47+
if not stix_format:
48+
all_campaigns = self._parse_fn(all_campaigns, CampaignModel)
49+
return all_campaigns
50+
51+
52+
def get_campaign_by_alias(self, *, alias: str, case: bool = True, stix_format: bool = True) -> List[Union[CampaignV21, Dict[str, Any]]]:
53+
"""Return campaigns that match a provided alias.
54+
55+
Parameters
56+
----------
57+
alias
58+
Alias to match.
59+
case
60+
When `True`, perform case-sensitive match; otherwise performs
61+
case-insensitive containment match.
62+
stix_format
63+
When `True`, return STIX objects; when `False`, parse to the
64+
`Campaign` Pydantic model.
65+
66+
Returns
67+
-------
68+
List[Union[CampaignV21, Dict[str, Any]]]
69+
Matching campaign objects in the requested format.
70+
"""
71+
if not case:
72+
all_campaigns = self.get_campaigns(stix_format=True)
73+
out: list[Any] = []
74+
for campaign in all_campaigns:
75+
if "aliases" in campaign.keys():
76+
for campaign_alias in campaign["aliases"]:
77+
if alias.lower() in campaign_alias.lower():
78+
out.append(CampaignModel)
79+
else:
80+
filter_objects = [Filter("type", "=", "campaign"), Filter("aliases", "contains", alias)]
81+
out = self._data_source.query(filter_objects)
82+
83+
if not stix_format:
84+
out = self._parse_fn(out, CampaignModel)
85+
return out
86+
87+
88+
def get_campaigns_since_time(self, *, timestamp: str, stix_format: bool = True) -> List[Union[CampaignV21, Dict[str, Any]]]:
89+
"""Return campaigns created after the provided timestamp.
90+
91+
Parameters
92+
----------
93+
timestamp
94+
Timestamp string for filtering.
95+
stix_format
96+
When `True`, return STIX objects; when `False`, parse to the
97+
`Campaign` Pydantic model.
98+
99+
Returns
100+
-------
101+
List[Union[CampaignV21, Dict[str, Any]]]
102+
Campaign objects in the requested format.
103+
"""
104+
filter_objects = [Filter("type", "=", "campaign"), Filter("created", ">", timestamp)]
105+
out = self._data_source.query(filter_objects)
106+
if not stix_format:
107+
out = self._parse_fn(out, CampaignModel)
108+
return out
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Cross-domain data source/component helpers."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any, Callable, Dict, Iterable, List
6+
from warnings import warn
7+
8+
from stix2 import CompositeDataSource, Filter
9+
10+
from ...models import DataComponent as DataComponentModel
11+
from ...models import DataSource as DataSourceModel
12+
13+
14+
class DataSourcesClient:
15+
"""Data source and data component queries."""
16+
17+
def __init__(
18+
self,
19+
*,
20+
data_source: CompositeDataSource,
21+
remove_fn: Callable | None = None,
22+
parse_fn: Callable | None = None,
23+
) -> None:
24+
"""Initialize the client with a composite data source."""
25+
self._data_source = data_source
26+
self._remove_fn = remove_fn
27+
self._parse_fn = parse_fn
28+
29+
def get_data_components(self, *, skip_revoked_deprecated: bool = True, stix_format: bool = True) -> List[Dict[str, Any]]:
30+
"""Return data components across all domains.
31+
32+
Parameters
33+
----------
34+
skip_revoked_deprecated
35+
When `True`, omit revoked/deprecated data components.
36+
stix_format
37+
When `True`, return STIX objects; when `False`, parse to the
38+
`DataComponent` Pydantic model.
39+
40+
Returns
41+
-------
42+
List[Dict[str, Any]]
43+
Data component objects in the requested format.
44+
"""
45+
all_data_components = self._data_source.query([Filter("type", "=", "x-mitre-data-component")])
46+
if skip_revoked_deprecated:
47+
all_data_components = self._remove_fn(all_data_components)
48+
if not stix_format:
49+
all_data_components = self._parse_fn(all_data_components, DataComponentModel)
50+
return all_data_components
51+
52+
def get_data_sources(self, *, include_data_components: bool = False, stix_format: bool = True) -> List[Dict[str, Any]]:
53+
"""Return data sources across all domains.
54+
55+
Parameters
56+
----------
57+
include_data_components
58+
When `True`, enrich data sources with related data components
59+
(requires Pydantic parsing).
60+
stix_format
61+
When `True`, return STIX objects; when `False`, parse to the
62+
`DataSource` Pydantic model.
63+
64+
Returns
65+
-------
66+
List[Dict[str, Any]]
67+
Data source objects in the requested format.
68+
"""
69+
warn(
70+
"Data Sources (`x-mitre-data-source`) are deprecated as of ATT&CK Specification 3.3.0. "
71+
"Data Sources are superseded by the Detection Strategy framework..",
72+
DeprecationWarning,
73+
stacklevel=2,
74+
)
75+
all_data_sources = self._data_source.query([Filter("type", "=", "x-mitre-data-source")])
76+
all_data_sources = self._remove_fn(all_data_sources)
77+
if include_data_components:
78+
all_data_sources = self._parse_fn(all_data_sources, DataSourceModel, include_data_components=True)
79+
elif not stix_format:
80+
all_data_sources = self._parse_fn(all_data_sources, DataSourceModel)
81+
return all_data_sources
82+
83+
def get_data_components_by_ids(
84+
self,
85+
ids: Iterable[str],
86+
*,
87+
stix_format: bool = True,
88+
data_components: list[dict[str, Any]] | None = None,
89+
skip_revoked_deprecated: bool = True,
90+
) -> list[dict[str, Any]]:
91+
"""Return data component objects for the requested ids."""
92+
dc_ids = {did for did in ids if isinstance(did, str) and did}
93+
if not dc_ids:
94+
return []
95+
96+
if data_components is None:
97+
data_components = self.get_data_components(
98+
skip_revoked_deprecated=skip_revoked_deprecated,
99+
stix_format=True,
100+
)
101+
102+
selected = [dc for dc in data_components if dc.get("id") in dc_ids]
103+
if not stix_format:
104+
return self._parse_fn(selected, DataComponentModel)
105+
return selected

0 commit comments

Comments
 (0)