Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion external-import/google-ti-feeds/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ CONVERTER_CONFIGS["{entity_type}"] = GTI_{ENTITY_TYPE}_CONVERTER_CONFIG

```python
# File: src/custom/client_api.py
# Add to fetch_subentities_ids method
# Add to fetch_subentities method
subentity_types = [
"malware_families",
"threat_actors",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,38 +175,24 @@ async def fetch_campaigns(
async for campaign_data in self.campaign_client.fetch_campaigns(initial_state):
yield campaign_data

async def fetch_subentities_ids(
async def fetch_subentities(
self, entity_name: str, entity_id: str, subentity_types: list[str]
) -> dict[str, list[str]]:
"""Fetch subentities IDs from the API.
) -> dict[str, list[Any]]:
"""Fetch related subentities with full payloads from the API.

Args:
entity_name (str): The name of the entity.
entity_id (str): The ID of the entity.
subentity_types (list[str]): The type of subentities to fetch.

Returns:
dict[str, list[str]]: The fetched subentities IDs.
dict[str, list[Any]]: The fetched related subentities.

"""
return await self.shared_client.fetch_subentities_ids(
return await self.shared_client.fetch_subentities(
entity_name, entity_id, subentity_types
)

async def fetch_subentity_details(
self, subentity_ids: dict[str, list[str]]
) -> dict[str, list[Any]]:
"""Fetch subentity details in parallel for multiple IDs.

Args:
subentity_ids: dictionary mapping entity types to lists of IDs

Returns:
dictionary mapping entity types to lists of fetched entities

"""
return await self.shared_client.fetch_subentity_details(subentity_ids)

def _create_fetcher_factory(self) -> GenericFetcherFactory:
"""Create and configure the fetcher factory with all configurations."""
base_headers = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from typing import Any

from connector.src.custom.client_api.client_api_base import BaseClientAPI
from connector.src.utils.api_engine.exceptions.api_http_error import ApiHttpError

LOG_PREFIX = "[FetcherShared]"

Expand All @@ -22,28 +21,29 @@ def __init__(
"""Initialize Shared Client API."""
super().__init__(config, logger, api_client, fetcher_factory)

async def fetch_subentities_ids(
async def fetch_subentities(
self, entity_name: str, entity_id: str, subentity_types: list[str]
) -> dict[str, list[str]]:
"""Fetch subentities IDs from the API.
) -> dict[str, list[Any]]:
"""Fetch related subentities with full payloads from the API.

Args:
entity_name (str): The name of the entity.
entity_id (str): The ID of the entity.
subentity_types (list[str]): The type of subentities to fetch.

Returns:
dict[str, list[str]]: The fetched subentities IDs.
dict[str, list[Any]]: Related subentities grouped by type.

"""
subentities_ids = {}
subentities: dict[str, list[Any]] = {}
total_collection_calls = 0

relationships_fetcher = self.fetcher_factory.create_fetcher_by_name(
"relationships", base_url=self.config.api_url.unicode_string()
)
try:
for subentity_type in subentity_types:
all_ids = []
all_entities: list[Any] = []

params = {
entity_name: entity_id,
Expand All @@ -55,24 +55,17 @@ async def fetch_subentities_ids(
async for page_data in self._paginate_with_cursor(
relationships_fetcher, params, f"{subentity_type} relationships"
):
total_collection_calls += 1
if isinstance(page_data, list):
all_ids.extend(
[
item["id"]
for item in page_data
if isinstance(item, dict) and item.get("id")
]
)
all_entities.extend(page_data)
elif isinstance(page_data, dict) and "data" in page_data:
data = page_data["data"]
if isinstance(data, list):
all_ids.extend(
[
item["id"]
for item in data
if isinstance(item, dict) and item.get("id")
]
)
all_entities.extend(data)
elif isinstance(data, dict):
all_entities.append(data)
elif isinstance(page_data, dict):
all_entities.append(page_data)

except Exception as e:
self.logger.debug(
Expand All @@ -84,21 +77,24 @@ async def fetch_subentities_ids(
},
)

if all_ids:
if all_entities:
typed_entities = self._deserialize_subentities(
subentity_type, all_entities
)
self.logger.info(
"Retrieved relationship IDs",
"Retrieved related entities",
{
"prefix": LOG_PREFIX,
"count": len(all_ids),
"count": len(typed_entities),
"subentity_type": subentity_type,
"entity_name": entity_name,
"entity_id": entity_id,
},
)
subentities_ids[subentity_type] = all_ids
subentities[subentity_type] = typed_entities
else:
self.logger.debug(
"No relationship IDs found",
"No related entities found",
{
"prefix": LOG_PREFIX,
"subentity_type": subentity_type,
Expand All @@ -107,7 +103,6 @@ async def fetch_subentities_ids(
},
)

return subentities_ids
except Exception as e:
self.logger.error(
"Failed to gather relationships",
Expand All @@ -119,7 +114,7 @@ async def fetch_subentities_ids(
},
)
return {entity_type: [] for entity_type in subentity_types}
finally:
else:
self.logger.info(
"Finished gathering relationships",
{
Expand All @@ -128,85 +123,43 @@ async def fetch_subentities_ids(
"entity_id": entity_id,
},
)
return subentities

async def fetch_subentity_details(
self, subentity_ids: dict[str, list[str]]
) -> dict[str, list[Any]]:
"""Fetch subentity details in parallel for multiple IDs.

Args:
subentity_ids: dictionary mapping entity types to lists of IDs

Returns:
dictionary mapping entity types to lists of fetched entities

"""
subentities: dict[str, list[Any]] = {}
total_to_fetch = sum(len(ids) for ids in subentity_ids.values())

if total_to_fetch > 0:
self.logger.info(
"Fetching details for subentities",
{"prefix": LOG_PREFIX, "total_to_fetch": total_to_fetch},
def _deserialize_subentities(
self, subentity_type: str, entities: list[Any]
) -> list[Any]:
"""Deserialize related entities to their configured model when available."""
try:
fetcher = self.fetcher_factory.create_fetcher_by_name(
subentity_type, base_url=self.config.api_url.unicode_string()
)
response_model = fetcher.config.response_model
except Exception as e:
self.logger.debug(
"Could not create typed fetcher for related entities",
{
"prefix": LOG_PREFIX,
"subentity_type": subentity_type,
"error": str(e),
},
)
response_model = None

for entity_type, ids in subentity_ids.items():
if not ids:
subentities[entity_type] = []
continue

try:
fetcher = self.fetcher_factory.create_fetcher_by_name(
entity_type, base_url=self.config.api_url.unicode_string()
)
entities = await fetcher.fetch_multiple(ids)
subentities[entity_type] = entities
self.logger.debug(
"Fetched entities",
{
"prefix": LOG_PREFIX,
"count": len(entities),
"entity_type": entity_type,
},
)

except ApiHttpError as e:
if e.status_code == 404 and entity_type == "files":
self.logger.info(
"404 errors expected for files (files may no longer exist in VirusTotal). Treating as normal behavior.",
{"prefix": LOG_PREFIX},
)
subentities[entity_type] = []
else:
self.logger.error(
"HTTP error fetching details",
deserialized_entities: list[Any] = []
for entity in entities:
if response_model and isinstance(entity, dict):
try:
deserialized_entities.append(response_model.model_validate(entity))
continue
except Exception as e:
self.logger.debug(
"Failed to deserialize related entity, keeping raw payload",
{
"prefix": LOG_PREFIX,
"status_code": e.status_code,
"entity_type": entity_type,
"subentity_type": subentity_type,
"error": str(e),
},
)
subentities[entity_type] = []
except Exception as e:
self.logger.error(
"Failed to fetch details",
{
"prefix": LOG_PREFIX,
"entity_type": entity_type,
"error": str(e),
},
)
subentities[entity_type] = []

if total_to_fetch > 0:
fetched_summary = ", ".join(
[f"{k}: {len(v)}" for k, v in subentities.items() if len(v) > 0]
)
if fetched_summary:
self.logger.info(
"Fetched details",
{"prefix": LOG_PREFIX, "summary": fetched_summary},
)
deserialized_entities.append(entity)

return subentities
return deserialized_entities
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@

GTI_RELATIONSHIP_FETCHER_CONFIG = GenericFetcherConfig(
entity_type="relationships",
endpoint="/collections/{entity_id}/relationships/{entity_type}",
endpoint="/collections/{entity_id}/{entity_type}",
display_name="relationships",
exception_class=GTIRelationshipFetchError,
response_model=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ async def run(self, initial_state: dict[str, Any] | None) -> None:
campaign_entities = self.converter.convert_campaign_to_stix(
campaign
)
subentities_ids = await self.client_api.fetch_subentities_ids(
subentities = await self.client_api.fetch_subentities(
entity_name="entity_id",
entity_id=campaign.id,
subentity_types=subentity_types,
)

rel_summary = ", ".join(
[f"{k}: {len(v)}" for k, v in subentities_ids.items()]
[f"{k}: {len(v)}" for k, v in subentities.items()]
)
if len(rel_summary) > 0:
self.logger.info(
Expand All @@ -117,13 +117,12 @@ async def run(self, initial_state: dict[str, Any] | None) -> None:
},
)

# Skip fetch_subentity_details for attack_techniques (quota optimization)
attack_technique_ids = subentities_ids.get("attack_techniques", [])
filtered_subentities_ids = {
k: v
for k, v in subentities_ids.items()
if k != "attack_techniques"
}
attack_technique_entities = subentities.pop("attack_techniques", [])
attack_technique_ids = [
attack_technique.id
for attack_technique in attack_technique_entities
if getattr(attack_technique, "id", None)
]

if attack_technique_ids:
self.logger.info(
Expand All @@ -134,11 +133,7 @@ async def run(self, initial_state: dict[str, Any] | None) -> None:
},
)

subentities_detailed = (
await self.client_api.fetch_subentity_details(
filtered_subentities_ids
)
)
subentities_detailed = subentities

# Convert attack technique IDs to proper model format for conversion
if attack_technique_ids:
Expand Down
Loading
Loading