Skip to content

Commit 168e2dd

Browse files
authored
[Google TI Feeds] fetch relationship entities directly from collections API (#6236)
1 parent b5f584b commit 168e2dd

11 files changed

Lines changed: 166 additions & 262 deletions

File tree

external-import/google-ti-feeds/CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ CONVERTER_CONFIGS["{entity_type}"] = GTI_{ENTITY_TYPE}_CONVERTER_CONFIG
292292

293293
```python
294294
# File: src/custom/client_api.py
295-
# Add to fetch_subentities_ids method
295+
# Add to fetch_subentities method
296296
subentity_types = [
297297
"malware_families",
298298
"threat_actors",

external-import/google-ti-feeds/connector/src/custom/client_api/client_api.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -175,38 +175,24 @@ async def fetch_campaigns(
175175
async for campaign_data in self.campaign_client.fetch_campaigns(initial_state):
176176
yield campaign_data
177177

178-
async def fetch_subentities_ids(
178+
async def fetch_subentities(
179179
self, entity_name: str, entity_id: str, subentity_types: list[str]
180-
) -> dict[str, list[str]]:
181-
"""Fetch subentities IDs from the API.
180+
) -> dict[str, list[Any]]:
181+
"""Fetch related subentities with full payloads from the API.
182182
183183
Args:
184184
entity_name (str): The name of the entity.
185185
entity_id (str): The ID of the entity.
186186
subentity_types (list[str]): The type of subentities to fetch.
187187
188188
Returns:
189-
dict[str, list[str]]: The fetched subentities IDs.
189+
dict[str, list[Any]]: The fetched related subentities.
190190
191191
"""
192-
return await self.shared_client.fetch_subentities_ids(
192+
return await self.shared_client.fetch_subentities(
193193
entity_name, entity_id, subentity_types
194194
)
195195

196-
async def fetch_subentity_details(
197-
self, subentity_ids: dict[str, list[str]]
198-
) -> dict[str, list[Any]]:
199-
"""Fetch subentity details in parallel for multiple IDs.
200-
201-
Args:
202-
subentity_ids: dictionary mapping entity types to lists of IDs
203-
204-
Returns:
205-
dictionary mapping entity types to lists of fetched entities
206-
207-
"""
208-
return await self.shared_client.fetch_subentity_details(subentity_ids)
209-
210196
def _create_fetcher_factory(self) -> GenericFetcherFactory:
211197
"""Create and configure the fetcher factory with all configurations."""
212198
base_headers = {

external-import/google-ti-feeds/connector/src/custom/client_api/client_api_shared.py

Lines changed: 54 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from typing import Any
55

66
from connector.src.custom.client_api.client_api_base import BaseClientAPI
7-
from connector.src.utils.api_engine.exceptions.api_http_error import ApiHttpError
87

98
LOG_PREFIX = "[FetcherShared]"
109

@@ -22,28 +21,29 @@ def __init__(
2221
"""Initialize Shared Client API."""
2322
super().__init__(config, logger, api_client, fetcher_factory)
2423

25-
async def fetch_subentities_ids(
24+
async def fetch_subentities(
2625
self, entity_name: str, entity_id: str, subentity_types: list[str]
27-
) -> dict[str, list[str]]:
28-
"""Fetch subentities IDs from the API.
26+
) -> dict[str, list[Any]]:
27+
"""Fetch related subentities with full payloads from the API.
2928
3029
Args:
3130
entity_name (str): The name of the entity.
3231
entity_id (str): The ID of the entity.
3332
subentity_types (list[str]): The type of subentities to fetch.
3433
3534
Returns:
36-
dict[str, list[str]]: The fetched subentities IDs.
35+
dict[str, list[Any]]: Related subentities grouped by type.
3736
3837
"""
39-
subentities_ids = {}
38+
subentities: dict[str, list[Any]] = {}
39+
total_collection_calls = 0
4040

4141
relationships_fetcher = self.fetcher_factory.create_fetcher_by_name(
4242
"relationships", base_url=self.config.api_url.unicode_string()
4343
)
4444
try:
4545
for subentity_type in subentity_types:
46-
all_ids = []
46+
all_entities: list[Any] = []
4747

4848
params = {
4949
entity_name: entity_id,
@@ -55,24 +55,17 @@ async def fetch_subentities_ids(
5555
async for page_data in self._paginate_with_cursor(
5656
relationships_fetcher, params, f"{subentity_type} relationships"
5757
):
58+
total_collection_calls += 1
5859
if isinstance(page_data, list):
59-
all_ids.extend(
60-
[
61-
item["id"]
62-
for item in page_data
63-
if isinstance(item, dict) and item.get("id")
64-
]
65-
)
60+
all_entities.extend(page_data)
6661
elif isinstance(page_data, dict) and "data" in page_data:
6762
data = page_data["data"]
6863
if isinstance(data, list):
69-
all_ids.extend(
70-
[
71-
item["id"]
72-
for item in data
73-
if isinstance(item, dict) and item.get("id")
74-
]
75-
)
64+
all_entities.extend(data)
65+
elif isinstance(data, dict):
66+
all_entities.append(data)
67+
elif isinstance(page_data, dict):
68+
all_entities.append(page_data)
7669

7770
except Exception as e:
7871
self.logger.debug(
@@ -84,21 +77,24 @@ async def fetch_subentities_ids(
8477
},
8578
)
8679

87-
if all_ids:
80+
if all_entities:
81+
typed_entities = self._deserialize_subentities(
82+
subentity_type, all_entities
83+
)
8884
self.logger.info(
89-
"Retrieved relationship IDs",
85+
"Retrieved related entities",
9086
{
9187
"prefix": LOG_PREFIX,
92-
"count": len(all_ids),
88+
"count": len(typed_entities),
9389
"subentity_type": subentity_type,
9490
"entity_name": entity_name,
9591
"entity_id": entity_id,
9692
},
9793
)
98-
subentities_ids[subentity_type] = all_ids
94+
subentities[subentity_type] = typed_entities
9995
else:
10096
self.logger.debug(
101-
"No relationship IDs found",
97+
"No related entities found",
10298
{
10399
"prefix": LOG_PREFIX,
104100
"subentity_type": subentity_type,
@@ -107,7 +103,6 @@ async def fetch_subentities_ids(
107103
},
108104
)
109105

110-
return subentities_ids
111106
except Exception as e:
112107
self.logger.error(
113108
"Failed to gather relationships",
@@ -119,7 +114,7 @@ async def fetch_subentities_ids(
119114
},
120115
)
121116
return {entity_type: [] for entity_type in subentity_types}
122-
finally:
117+
else:
123118
self.logger.info(
124119
"Finished gathering relationships",
125120
{
@@ -128,85 +123,43 @@ async def fetch_subentities_ids(
128123
"entity_id": entity_id,
129124
},
130125
)
126+
return subentities
131127

132-
async def fetch_subentity_details(
133-
self, subentity_ids: dict[str, list[str]]
134-
) -> dict[str, list[Any]]:
135-
"""Fetch subentity details in parallel for multiple IDs.
136-
137-
Args:
138-
subentity_ids: dictionary mapping entity types to lists of IDs
139-
140-
Returns:
141-
dictionary mapping entity types to lists of fetched entities
142-
143-
"""
144-
subentities: dict[str, list[Any]] = {}
145-
total_to_fetch = sum(len(ids) for ids in subentity_ids.values())
146-
147-
if total_to_fetch > 0:
148-
self.logger.info(
149-
"Fetching details for subentities",
150-
{"prefix": LOG_PREFIX, "total_to_fetch": total_to_fetch},
128+
def _deserialize_subentities(
129+
self, subentity_type: str, entities: list[Any]
130+
) -> list[Any]:
131+
"""Deserialize related entities to their configured model when available."""
132+
try:
133+
fetcher = self.fetcher_factory.create_fetcher_by_name(
134+
subentity_type, base_url=self.config.api_url.unicode_string()
151135
)
136+
response_model = fetcher.config.response_model
137+
except Exception as e:
138+
self.logger.debug(
139+
"Could not create typed fetcher for related entities",
140+
{
141+
"prefix": LOG_PREFIX,
142+
"subentity_type": subentity_type,
143+
"error": str(e),
144+
},
145+
)
146+
response_model = None
152147

153-
for entity_type, ids in subentity_ids.items():
154-
if not ids:
155-
subentities[entity_type] = []
156-
continue
157-
158-
try:
159-
fetcher = self.fetcher_factory.create_fetcher_by_name(
160-
entity_type, base_url=self.config.api_url.unicode_string()
161-
)
162-
entities = await fetcher.fetch_multiple(ids)
163-
subentities[entity_type] = entities
164-
self.logger.debug(
165-
"Fetched entities",
166-
{
167-
"prefix": LOG_PREFIX,
168-
"count": len(entities),
169-
"entity_type": entity_type,
170-
},
171-
)
172-
173-
except ApiHttpError as e:
174-
if e.status_code == 404 and entity_type == "files":
175-
self.logger.info(
176-
"404 errors expected for files (files may no longer exist in VirusTotal). Treating as normal behavior.",
177-
{"prefix": LOG_PREFIX},
178-
)
179-
subentities[entity_type] = []
180-
else:
181-
self.logger.error(
182-
"HTTP error fetching details",
148+
deserialized_entities: list[Any] = []
149+
for entity in entities:
150+
if response_model and isinstance(entity, dict):
151+
try:
152+
deserialized_entities.append(response_model.model_validate(entity))
153+
continue
154+
except Exception as e:
155+
self.logger.debug(
156+
"Failed to deserialize related entity, keeping raw payload",
183157
{
184158
"prefix": LOG_PREFIX,
185-
"status_code": e.status_code,
186-
"entity_type": entity_type,
159+
"subentity_type": subentity_type,
187160
"error": str(e),
188161
},
189162
)
190-
subentities[entity_type] = []
191-
except Exception as e:
192-
self.logger.error(
193-
"Failed to fetch details",
194-
{
195-
"prefix": LOG_PREFIX,
196-
"entity_type": entity_type,
197-
"error": str(e),
198-
},
199-
)
200-
subentities[entity_type] = []
201-
202-
if total_to_fetch > 0:
203-
fetched_summary = ", ".join(
204-
[f"{k}: {len(v)}" for k, v in subentities.items() if len(v) > 0]
205-
)
206-
if fetched_summary:
207-
self.logger.info(
208-
"Fetched details",
209-
{"prefix": LOG_PREFIX, "summary": fetched_summary},
210-
)
163+
deserialized_entities.append(entity)
211164

212-
return subentities
165+
return deserialized_entities

external-import/google-ti-feeds/connector/src/custom/configs/fetcher_config_common.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
GTI_RELATIONSHIP_FETCHER_CONFIG = GenericFetcherConfig(
4545
entity_type="relationships",
46-
endpoint="/collections/{entity_id}/relationships/{entity_type}",
46+
endpoint="/collections/{entity_id}/{entity_type}",
4747
display_name="relationships",
4848
exception_class=GTIRelationshipFetchError,
4949
response_model=None,

external-import/google-ti-feeds/connector/src/custom/orchestrators/campaign/orchestrator_campaign.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ async def run(self, initial_state: dict[str, Any] | None) -> None:
9797
campaign_entities = self.converter.convert_campaign_to_stix(
9898
campaign
9999
)
100-
subentities_ids = await self.client_api.fetch_subentities_ids(
100+
subentities = await self.client_api.fetch_subentities(
101101
entity_name="entity_id",
102102
entity_id=campaign.id,
103103
subentity_types=subentity_types,
104104
)
105105

106106
rel_summary = ", ".join(
107-
[f"{k}: {len(v)}" for k, v in subentities_ids.items()]
107+
[f"{k}: {len(v)}" for k, v in subentities.items()]
108108
)
109109
if len(rel_summary) > 0:
110110
self.logger.info(
@@ -117,13 +117,12 @@ async def run(self, initial_state: dict[str, Any] | None) -> None:
117117
},
118118
)
119119

120-
# Skip fetch_subentity_details for attack_techniques (quota optimization)
121-
attack_technique_ids = subentities_ids.get("attack_techniques", [])
122-
filtered_subentities_ids = {
123-
k: v
124-
for k, v in subentities_ids.items()
125-
if k != "attack_techniques"
126-
}
120+
attack_technique_entities = subentities.pop("attack_techniques", [])
121+
attack_technique_ids = [
122+
attack_technique.id
123+
for attack_technique in attack_technique_entities
124+
if getattr(attack_technique, "id", None)
125+
]
127126

128127
if attack_technique_ids:
129128
self.logger.info(
@@ -134,11 +133,7 @@ async def run(self, initial_state: dict[str, Any] | None) -> None:
134133
},
135134
)
136135

137-
subentities_detailed = (
138-
await self.client_api.fetch_subentity_details(
139-
filtered_subentities_ids
140-
)
141-
)
136+
subentities_detailed = subentities
142137

143138
# Convert attack technique IDs to proper model format for conversion
144139
if attack_technique_ids:

0 commit comments

Comments
 (0)