Skip to content

Commit 6a5afa6

Browse files
committed
defined clients for each domain leveraging code objects
1 parent fe0b183 commit 6a5afa6

5 files changed

Lines changed: 186 additions & 0 deletions

File tree

src/attackcti/domains/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
"""Domain helper package (enterprise/mobile/ics)."""
2+
3+
from .base import DomainClientBase
4+
from .enterprise import EnterpriseClient
5+
from .ics import ICSClient
6+
from .mobile import MobileClient
7+
8+
__all__ = ["DomainClientBase", "EnterpriseClient", "MobileClient", "ICSClient"]
9+

src/attackcti/domains/base.py

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
"""Shared domain client implementation."""
2+
3+
from __future__ import annotations
4+
5+
from typing import Any, Dict
6+
7+
from stix2 import CompositeDataSource, Filter, MemorySource, TAXIICollectionSource
8+
9+
from ..core.query_client import QueryClient
10+
from ..models import pydantic_model_mapping
11+
12+
13+
def _filter_software_by_type(items: list[Any], *, stix_type: str) -> list[Any]:
14+
"""Return software objects matching a specific STIX type."""
15+
out: list[Any] = []
16+
for item in items:
17+
if isinstance(item, dict):
18+
if item.get("type") == stix_type:
19+
out.append(item)
20+
else:
21+
item_type = getattr(item, "type", None)
22+
if item_type == stix_type:
23+
out.append(item)
24+
return out
25+
26+
27+
class DomainClientBase:
28+
"""Base class for domain-scoped clients (enterprise/mobile/ics)."""
29+
30+
def __init__(
31+
self,
32+
*,
33+
data_source: TAXIICollectionSource | MemorySource | None,
34+
) -> None:
35+
self.data_source = data_source
36+
if self.data_source is None:
37+
raise RuntimeError("domain source is not loaded")
38+
# Set up a composite data source.
39+
composite = CompositeDataSource()
40+
# Add the domain-specific data source.
41+
composite.add_data_sources([self.data_source])
42+
# Initialize the query client.
43+
self._query_client = QueryClient(composite, pydantic_map=pydantic_model_mapping)
44+
45+
def get(self, stix_format: bool = True) -> dict[str, list[Any]]:
46+
"""Return a bundle of common ATT&CK objects for the domain."""
47+
return {
48+
"techniques": self.get_techniques(stix_format=stix_format),
49+
"data-component": self.get_data_components(stix_format=stix_format),
50+
"mitigations": self.get_mitigations(stix_format=stix_format),
51+
"groups": self.get_groups(stix_format=stix_format),
52+
"malware": self.get_malware(stix_format=stix_format),
53+
"tools": self.get_tools(stix_format=stix_format),
54+
"data-source": self.get_data_sources(stix_format=stix_format),
55+
"relationships": self.get_relationships(stix_format=stix_format),
56+
"tactics": self.get_tactics(stix_format=stix_format),
57+
"matrix": self.data_source.query(Filter("type", "=", "x-mitre-matrix")),
58+
"identity": self.data_source.query(Filter("type", "=", "identity")),
59+
"marking-definition": self.data_source.query(Filter("type", "=", "marking-definition")),
60+
"campaigns": self.get_campaigns(stix_format=stix_format),
61+
}
62+
63+
# Domain-scoped wrappers that delegate to the core query helpers.
64+
65+
def get_techniques(self, stix_format: bool = True) -> list[Any]:
66+
"""Return techniques for this domain."""
67+
return self._query_client.techniques.get_techniques(stix_format=stix_format)
68+
69+
def get_data_components(self, stix_format: bool = True) -> list[Dict[str, Any]]:
70+
"""Return data components for this domain."""
71+
return self._query_client.data_sources.get_data_components(stix_format=stix_format)
72+
73+
def get_mitigations(self, stix_format: bool = True) -> list[Any]:
74+
"""Return mitigations for this domain."""
75+
return self._query_client.mitigations.get_mitigations(stix_format=stix_format)
76+
77+
def get_groups(self, stix_format: bool = True) -> list[Any]:
78+
"""Return intrusion-set groups for this domain."""
79+
return self._query_client.groups.get_groups(stix_format=stix_format)
80+
81+
def get_malware(self, stix_format: bool = True) -> list[Any]:
82+
"""Return malware for this domain."""
83+
software = self._query_client.software.get_software(stix_format=stix_format)
84+
return _filter_software_by_type(software, stix_type="malware")
85+
86+
def get_tools(self, stix_format: bool = True) -> list[Any]:
87+
"""Return tools for this domain."""
88+
software = self._query_client.software.get_software(stix_format=stix_format)
89+
return _filter_software_by_type(software, stix_type="tool")
90+
91+
def get_data_sources(self, stix_format: bool = True) -> list[Dict[str, Any]]:
92+
"""Return data sources for this domain."""
93+
return self._query_client.data_sources.get_data_sources(stix_format=stix_format)
94+
95+
def get_relationships(self, stix_format: bool = True) -> list[Any]:
96+
"""Return relationships for this domain."""
97+
return self._query_client.relationships.get_relationships(stix_format=stix_format)
98+
99+
def get_tactics(self, stix_format: bool = True) -> list[Dict[str, Any]]:
100+
"""Return tactics for this domain."""
101+
return self._query_client.tactics.get_tactics(stix_format=stix_format)
102+
103+
def get_campaigns(self, stix_format: bool = True) -> list[Any]:
104+
"""Return campaigns for this domain."""
105+
return self._query_client.campaigns.get_campaigns(stix_format=stix_format)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Enterprise ATT&CK domain client."""
2+
3+
from __future__ import annotations
4+
5+
from stix2 import MemorySource, TAXIICollectionSource
6+
7+
from .base import DomainClientBase
8+
9+
10+
class EnterpriseClient(DomainClientBase):
11+
"""Enterprise-domain client."""
12+
13+
def __init__(
14+
self,
15+
*,
16+
data_source: TAXIICollectionSource | MemorySource | None,
17+
) -> None:
18+
super().__init__(
19+
data_source=data_source,
20+
)
21+
22+
def get_enterprise(self, stix_format: bool = True) -> dict[str, list[object]]:
23+
"""Alias for `get` to preserve older naming."""
24+
return self.get(stix_format=stix_format)

src/attackcti/domains/ics.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""ICS ATT&CK domain client."""
2+
3+
from __future__ import annotations
4+
5+
from stix2 import MemorySource, TAXIICollectionSource
6+
7+
from .base import DomainClientBase
8+
9+
10+
class ICSClient(DomainClientBase):
11+
"""ICS-domain client."""
12+
13+
def __init__(
14+
self,
15+
*,
16+
data_source: TAXIICollectionSource | MemorySource | None,
17+
) -> None:
18+
super().__init__(
19+
data_source=data_source,
20+
)
21+
22+
def get_ics(self, stix_format: bool = True) -> dict[str, list[object]]:
23+
"""Alias for `get` to preserve older naming."""
24+
return self.get(stix_format=stix_format)

src/attackcti/domains/mobile.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""Mobile ATT&CK domain client."""
2+
3+
from __future__ import annotations
4+
5+
from stix2 import MemorySource, TAXIICollectionSource
6+
7+
from .base import DomainClientBase
8+
9+
10+
class MobileClient(DomainClientBase):
11+
"""Mobile-domain client."""
12+
13+
def __init__(
14+
self,
15+
*,
16+
data_source: TAXIICollectionSource | MemorySource | None,
17+
) -> None:
18+
super().__init__(
19+
data_source=data_source,
20+
)
21+
22+
def get_mobile(self, stix_format: bool = True) -> dict[str, list[object]]:
23+
"""Alias for `get` to preserve older naming."""
24+
return self.get(stix_format=stix_format)

0 commit comments

Comments
 (0)