diff --git a/alert_system/admin.py b/alert_system/admin.py index 8e0608ea3..e5a3a2475 100644 --- a/alert_system/admin.py +++ b/alert_system/admin.py @@ -15,14 +15,10 @@ class EventAdmin(admin.ModelAdmin): "stac_id", "created_at", "collection", - "guid", ) list_filter = ("connector", "collection") readonly_fields = ("connector",) - search_fields = ( - "stac_id", - "correlation_id", - ) + search_fields = ("stac_id",) @admin.register(LoadItem) @@ -31,7 +27,7 @@ class LoadItemAdmin(admin.ModelAdmin): "id", "event_title", "created_at", - "guid", + "event_id", "item_eligible", "is_past_event", ) @@ -46,21 +42,18 @@ class LoadItemAdmin(admin.ModelAdmin): "related_montandon_events", "related_go_events", ) - search_fields = ( - "id", - "correlation_id", - ) + search_fields = ("id",) @admin.register(AlertEmailThread) class AlertEmailThreadAdmin(admin.ModelAdmin): list_display = ( "user", - "parent_guid", + "parent_event_id", "root_email_message_id", ) search_fields = ( - "parent_guid", + "parent_event_id", "root_email_message_id", "user__username", ) diff --git a/alert_system/email_processing.py b/alert_system/email_processing.py index 3ecff07b6..5e5e970f7 100644 --- a/alert_system/email_processing.py +++ b/alert_system/email_processing.py @@ -66,14 +66,15 @@ def send_alert_email_notification( if not is_reply: thread = AlertEmailThread.objects.create( user=user, - parent_guid=load_item.parent_guid, + parent_event_id=load_item.parent_event_id, root_email_message_id=message_id, root_message_sent_at=timezone.now(), ) email_log.thread = thread email_log.save(update_fields=["thread"]) logger.info( - f"Alert Email thread created for user [{user.get_full_name()}] " f"with parent_guid [{load_item.parent_guid}]" + f"Alert Email thread created for user [{user.get_full_name()}] " + f"with parent event [{load_item.parent_event_id}]" ) logger.info(f"Alert email sent to [{user.get_full_name()}] for LoadItem ID [{load_item.id}]") @@ -127,7 +128,7 @@ def process_email_alert(load_item_id: int) -> None: existing_threads = { thread.user_id: thread for thread in AlertEmailThread.objects.filter( - parent_guid=load_item.parent_guid, + parent_event_id=load_item.parent_event_id, user_id__in=user_ids, ) } diff --git a/alert_system/etl/base/extraction.py b/alert_system/etl/base/extraction.py index f7a1827ea..13efd26a6 100644 --- a/alert_system/etl/base/extraction.py +++ b/alert_system/etl/base/extraction.py @@ -1,13 +1,17 @@ import logging +import math from abc import ABC from datetime import timedelta -from typing import Dict, Generator, List, Optional, Type +from typing import Dict, List, Optional, Type -import httpx from django.db import transaction from django.utils import timezone -from alert_system.helpers import build_stac_search +from alert_system.helpers import ( + build_stac_search, + fetch_paginated_stac_data, + fetch_stac_data, +) from alert_system.models import Connector, ExtractionItem, ImpactDetailsEnum, LoadItem from .config import ExtractionConfig @@ -65,38 +69,10 @@ def _validate_required_attributes(self): if missing_attr: raise NotImplementedError(f"{self.__class__.__name__} must define: {', '.join(missing_attr)}") - @staticmethod - def fetch_stac_data(url: str, filters: Optional[Dict] = None, timeout: int | None = 60) -> Generator[Dict, None, None]: - """ - Fetch STAC data with pagination support. - - """ - current_url = url - current_payload = filters.copy() if filters else None - - while current_url: - response = httpx.get(current_url, params=current_payload, timeout=timeout) - response.raise_for_status() - data = response.json() - - yield from data.get("features", []) - - # Find next page link - current_url = next((link["href"] for link in data.get("links", []) if link.get("rel") == "next"), None) - current_payload = None # Only use params on first request - - def _get_correlation_id(self, feature: Dict) -> str: - return feature.get("properties", {}).get("monty:corr_id") - - def _get_guid(self, feature: Dict) -> str: - return feature.get("properties", {}).get("monty:guid") - - def _build_base_defaults(self, feature: Dict, run_id: str, collection_type: ExtractionItem.CollectionType) -> Dict: + def _build_base_defaults(self, item_dict: Dict, run_id: str, collection_type: ExtractionItem.CollectionType) -> Dict: """Build common default fields for all STAC items.""" return { - "guid": self._get_guid(feature=feature), - "correlation_id": self._get_correlation_id(feature=feature), - "resp_data": feature, + "resp_data": item_dict, "connector": self.connector, "extraction_run_id": run_id, "collection": collection_type, @@ -129,6 +105,9 @@ def _save_stac_item(self, stac_id: str, defaults: Dict) -> Optional[ExtractionIt logger.warning(f"Failed to save {stac_id}: {e}", exc_info=True) return None + def _extract_related_url(self, links, collection_type) -> list[str]: + return [link["href"] for link in links if collection_type in link.get("roles", [])] + # Extraction methods def _extract_impact_items(self, stac_obj: ExtractionItem, run_id: str) -> List[ExtractionItem]: """Process impact items related to a STAC event object.""" @@ -136,28 +115,32 @@ def _extract_impact_items(self, stac_obj: ExtractionItem, run_id: str) -> List[E logger.info("No impact endpoint defined.") return [] impact_objects: List[ExtractionItem] = [] - try: - impact_features = self.fetch_stac_data( - self.base_url, - build_stac_search( - collections=self.impact_collection_type, - guid=stac_obj.guid, - ), - ) - except Exception as e: - logger.warning(f"Failed to fetch impacts for event {stac_obj.stac_id}: {e}") - return [] + links = stac_obj.resp_data["links"] + impact_urls = self._extract_related_url(links=links, collection_type=ExtractionItem.CollectionType.IMPACT) - for feature in impact_features: - impact_id = feature.get("id", None) + for impact_url in impact_urls: + try: + impact_item = fetch_stac_data( + impact_url, + ) + except Exception as e: + logger.warning(f"Failed to fetch impacts for event {stac_obj.stac_id}: {e}") + return [] + + if not impact_item: + logger.info("No impact features found — skipping impact processing.") + continue + + impact_id = impact_item.get("id", None) if not impact_id: - logger.warning(f"Impact feature missing 'id': {feature}") + logger.warning(f"No impact id found for {impact_item}") continue - defaults = self._build_base_defaults(feature, run_id=run_id, collection_type=ExtractionItem.CollectionType.IMPACT) + defaults = self._build_base_defaults(impact_item, run_id=run_id, collection_type=ExtractionItem.CollectionType.IMPACT) impact_object = self._save_stac_item(impact_id, defaults) if impact_object: impact_objects.append(impact_object) + return impact_objects def _extract_hazard_items(self, stac_obj: ExtractionItem, run_id: str) -> ExtractionItem | None: @@ -165,98 +148,115 @@ def _extract_hazard_items(self, stac_obj: ExtractionItem, run_id: str) -> Extrac if not self.hazard_collection_type: logger.info("Source does not contain hazard.") return - + links = stac_obj.resp_data["links"] + hazard_url = self._extract_related_url(links=links, collection_type=ExtractionItem.CollectionType.HAZARD) + if len(hazard_url) > 1: + logger.info("Event item contains multiple hazards") + return try: - hazard_features = self.fetch_stac_data( - self.base_url, - build_stac_search( - collections=self.hazard_collection_type, - guid=stac_obj.guid, - ), + hazard_item = fetch_stac_data( + hazard_url[0], ) except Exception as e: logger.warning(f"Failed to fetch hazards for event {stac_obj.stac_id}: {e}") raise - - hazard_feature = next(hazard_features, None) - if not hazard_feature: + if not hazard_item: logger.info("No hazard features found — skipping hazard processing.") return - hazard_id = hazard_feature.get("id", None) + hazard_id = hazard_item.get("id", None) if not hazard_id: - logger.warning(f"No hazard id found for {hazard_feature}") + logger.warning(f"No hazard id found for {hazard_item}") return - defaults = self._build_base_defaults(hazard_feature, run_id=run_id, collection_type=ExtractionItem.CollectionType.HAZARD) + defaults = self._build_base_defaults(hazard_item, run_id=run_id, collection_type=ExtractionItem.CollectionType.HAZARD) hazard_obj = self._save_stac_item(hazard_id, defaults) return hazard_obj - def process_event_items(self, extraction_run_id: str, guid: str | None = None, is_past_event: bool = False) -> None: + # TODO: Add pydantic validators here. + def process_event_item(self, event_item: Dict, extraction_run_id: str, is_past_event: bool): + loader = self.loader_class() + event_id = event_item.get("id", None) + if not event_id: + logger.warning(f"No event id found for {event_item}") + return + defaults = self._build_base_defaults( + item_dict=event_item, run_id=extraction_run_id, collection_type=ExtractionItem.CollectionType.EVENT + ) + + try: + with transaction.atomic(): + event_obj = self._save_stac_item(event_id, defaults) + if not event_obj: + logger.info("No event item extracted") + return + hazard_obj = self._extract_hazard_items(event_obj, run_id=extraction_run_id) + impact_obj = self._extract_impact_items(event_obj, run_id=extraction_run_id) + + transformer = self.transformer_class( + event_obj=event_obj, + hazard_obj=hazard_obj, + impact_obj=impact_obj, + ) + transformed_data = transformer.transform_stac_item() + + load_obj = loader.load(transformed_data, self.connector, is_past_event=is_past_event, run_id=extraction_run_id) + + logger.info(f"Successfully processed event {event_id}") + + return load_obj + + except Exception as e: + logger.warning(f"Failed to process event {event_id}: {e}", exc_info=True) + raise + + def _extract_event_items(self, extraction_run_id: str, is_past_event: bool = False) -> None: """Process all event items from the connector source.""" filters = [] - if self.filter_event: - hazard_codes = self.filter_event.get("hazard_codes", []) - hazard_cql = " OR ".join(f"a_contains(monty:hazard_codes, '{hc}')" for hc in hazard_codes) - filters.append(f"({hazard_cql})") - loader = self.loader_class() try: - event_items = self.fetch_stac_data( + event_items = fetch_paginated_stac_data( self.base_url, build_stac_search( collections=self.event_collection_type, additional_filters=filters, - guid=guid, start_datetime=None if is_past_event else self.get_start_datetime(), end_datetime=None if is_past_event else f"{timezone.now().isoformat()}", + hazard_codes=self.filter_event.get("hazard_codes") if self.filter_event else None, ), ) except Exception as e: logger.warning(f"Failed to fetch events: {e}") raise - for feature in event_items: - event_id = feature.get("id", None) - if not event_id: - logger.warning(f"No event id found for {feature}") - continue - defaults = self._build_base_defaults( - feature=feature, run_id=extraction_run_id, collection_type=ExtractionItem.CollectionType.EVENT - ) - - try: - with transaction.atomic(): - event_obj = self._save_stac_item(event_id, defaults) - if not event_obj: - logger.info("No event item extracted") - continue - hazard_obj = self._extract_hazard_items(event_obj, run_id=extraction_run_id) - impact_obj = self._extract_impact_items(event_obj, run_id=extraction_run_id) - - transformer = self.transformer_class( - event_obj=event_obj, - hazard_obj=hazard_obj, - impact_obj=impact_obj, - ) - transformed_data = transformer.transform_stac_item() + first_event_item = next(event_items, None) + if not first_event_item: + msg = f"No event items found for extraction_run_id={extraction_run_id}" + logger.warning(msg) + raise ValueError(msg) - loader.load(transformed_data, self.connector, is_past_event=is_past_event, run_id=extraction_run_id) + for event_item in event_items: + self.process_event_item(event_item=event_item, extraction_run_id=extraction_run_id, is_past_event=is_past_event) - logger.info(f"Successfully processed event {event_id}") + def process_event_from_url(self, event_url: str, extraction_run_id: str, is_past_event: bool) -> LoadItem | None: + try: + event_item = fetch_stac_data(event_url) + if not event_item: + return - except Exception as e: - logger.warning(f"Failed to process event {event_id}: {e}", exc_info=True) - raise + return self.process_event_item(event_item, extraction_run_id, is_past_event) - def run(self, extraction_run_id: str, guid: str | None = None, is_past_event: bool = False) -> None: - """Main entry point for running the connector.""" - try: - self.process_event_items(extraction_run_id, guid, is_past_event) except Exception as e: - logger.warning(f"Connector run failed: {e}", exc_info=True) + logger.warning(f"Failed to process event from URL {event_url}: {e}", exc_info=True) raise + def run(self, extraction_run_id: str, url: str | None = None, is_past_event: bool = False) -> None: + """Main entry point for running the connector.""" + if url: + self.process_event_from_url(event_url=url, extraction_run_id=extraction_run_id, is_past_event=True) + else: + self._extract_event_items(extraction_run_id, is_past_event) + class PastEventExtractionClass: def __init__(self, extractor: BaseExtractionClass): @@ -267,15 +267,24 @@ def _impact_filter(self, impact_metadata: list[dict]) -> str: filters = [] for data in impact_metadata or []: + category = data.get("category") + impact_type = data.get("type") + value = data.get("value") + if ( - data.get("category") == ImpactDetailsEnum.Category.PEOPLE - and data.get("type") == ImpactDetailsEnum.Type.AFFECTED_TOTAL # TODO: Add other possible types here. - and data.get("value") is not None + category == ImpactDetailsEnum.Category.PEOPLE + and impact_type == ImpactDetailsEnum.Type.AFFECTED_TOTAL + and value is not None + and value > 0 ): + lower_bound = 10 ** (math.floor(math.log10(value)) - 1) + upper_bound = 10 ** (math.floor(math.log10(value)) + 1) + filters.append( - f"monty:impact_detail.category = '{data['category']}' AND " - f"monty:impact_detail.type = '{data['type']}' AND " - f"monty:impact_detail.value >= {data['value']}" + f"monty:impact_detail.category = '{category}' AND " + f"monty:impact_detail.type = '{impact_type}' AND " + f"monty:impact_detail.value >= {lower_bound} AND " + f"monty:impact_detail.value <= {upper_bound}" ) return " OR ".join(f"({filter})" for filter in filters) @@ -287,19 +296,10 @@ def _country_filter(self, country_codes) -> list[str]: filters.append(f"({country_cql})") return filters - def _collect_guids(self, features, exclude: str) -> set[str]: - guids = set() - for feature in features or []: - guid = self.extractor._get_guid(feature) - if guid and guid != exclude: - guids.add(guid) - return guids - - def find_related_guids(self, load_obj: LoadItem) -> set[str]: + def find_related_events(self, load_obj: LoadItem, extraction_run_id: str) -> set[LoadItem]: start_datetime = timezone.now() - timedelta(weeks=self.extractor.connector.lookback_weeks) end_datetime = timezone.now() - - guids = set() + events = set() if self.extractor.impact_collection_type: impact_filter = self._impact_filter(load_obj.impact_metadata) @@ -312,44 +312,64 @@ def find_related_guids(self, load_obj: LoadItem) -> set[str]: additional_filters.extend(country_filters) - features = self.extractor.fetch_stac_data( + past_impact_data = fetch_paginated_stac_data( self.base_url, build_stac_search( collections=self.extractor.impact_collection_type, additional_filters=additional_filters, start_datetime=f"{start_datetime.isoformat()}", end_datetime=f"{end_datetime.isoformat()}", + hazard_codes=self.extractor.filter_event.get("hazard_codes") if self.extractor.filter_event else None, ), ) - guids |= self._collect_guids(features, load_obj.guid) - return guids + existing_events = LoadItem.objects.all() + event_map = {e.event_url: e for e in existing_events} - def extract_past_events(self, load_obj: LoadItem) -> None: - guids = self.find_related_guids(load_obj) + for data in past_impact_data: + links = data.get("links") + event_url = self.extractor._extract_related_url(links=links, collection_type=ExtractionItem.CollectionType.EVENT) - if not guids: - return + if len(event_url) != 1: + raise ValueError(f"Expected 1 EVENT url, got: {event_url}") - existing_items = LoadItem.objects.filter(guid__in=guids) - existing_map = {i.guid: i for i in existing_items} + event_url = event_url[0] - related_ids = [] + event = event_map.get(event_url) - for guid in guids: - item = existing_map.get(guid) + if not event: + event = self.extractor.process_event_from_url( + event_url=event_url, extraction_run_id=extraction_run_id, is_past_event=True + ) + if event: + event_map[event_url] = event - if not item: - self.extractor.run( - extraction_run_id=load_obj.extraction_run_id, - guid=guid, - is_past_event=True, - ) - item = LoadItem.objects.filter(guid=guid).first() + events.add(event) + return events + + # Need to make changes here if event_id is implemented. + # Collect all the events, find the latest of all the event ids. + # Attach only the value of the latest episode in impact fields. + + def extract_past_events( + self, + load_obj: LoadItem, + extraction_run_id: str, + ) -> None: + + past_events = self.find_related_events( + load_obj=load_obj, + extraction_run_id=extraction_run_id, + ) + + valid_events = [event for event in past_events if event.event_id and event.event_id != load_obj.event_id] + + if not valid_events: + return + + related_ids = [event.id for event in valid_events] - if item: - related_ids.append(item.id) - item.related_montandon_events.add(load_obj.id) + for event in valid_events: + event.related_montandon_events.add(load_obj) - if related_ids: - load_obj.related_montandon_events.set(related_ids) + load_obj.related_montandon_events.set(related_ids) diff --git a/alert_system/etl/base/loader.py b/alert_system/etl/base/loader.py index 23998ae86..667904857 100644 --- a/alert_system/etl/base/loader.py +++ b/alert_system/etl/base/loader.py @@ -14,15 +14,9 @@ class BaseLoaderClass(ABC): def filter_eligible_items(self, load_obj): raise NotImplementedError() - def extract_parent_guid(self, guid: str) -> str: - parts = guid.split("-") - - BASE_PART_COUNT = 7 - - if len(parts) > BASE_PART_COUNT: - return "-".join(parts[:BASE_PART_COUNT]) - - return guid + def extract_event_id_without_episode(self, event_id: str) -> str: + parts = event_id.split("-") + return "-".join(parts[0 : len(parts) - 1]) def load(self, transformed_data: Dict, connector: Connector, run_id: str, is_past_event: bool = False) -> LoadItem: """ @@ -35,16 +29,15 @@ def load(self, transformed_data: Dict, connector: Connector, run_id: str, is_pas Returns: Created LoadItem object """ - guid = transformed_data["guid"] - parent_guid = self.extract_parent_guid(guid) + event_id = transformed_data["event_id"] + parent_event_id = self.extract_event_id_without_episode(event_id) is_item_eligible = self.filter_eligible_items(transformed_data) load_obj, created = LoadItem.objects.update_or_create( - guid=guid, + event_id=event_id, defaults={ "connector": connector, - "parent_guid": parent_guid, - "correlation_id": transformed_data.get("correlation_id"), + "parent_event_id": parent_event_id, "event_title": transformed_data.get("title"), "event_description": transformed_data.get("description"), "country_codes": transformed_data.get("country"), @@ -59,10 +52,12 @@ def load(self, transformed_data: Dict, connector: Connector, run_id: str, is_pas "item_eligible": is_item_eligible, "is_past_event": is_past_event, "extraction_run_id": run_id, + "episode_number": transformed_data.get("episode_number"), + "event_url": transformed_data.get("event_url"), }, ) action = "Created" if created else "Updated" - logger.info(f"{action} Event for {guid=}") + logger.info(f"{action} Event for {event_id=}") return load_obj diff --git a/alert_system/etl/base/transform.py b/alert_system/etl/base/transform.py index c5c457b9f..3612b3d70 100644 --- a/alert_system/etl/base/transform.py +++ b/alert_system/etl/base/transform.py @@ -26,6 +26,8 @@ class EventType(TypedDict): country: str start_datetime: datetime end_datetime: datetime + episode_number: int + event_url: str def __init__( self, event_obj: ExtractionItem, hazard_obj: Optional[ExtractionItem] = None, impact_obj: List[ExtractionItem] = [] @@ -33,8 +35,7 @@ def __init__( self.event_obj = event_obj self.hazard_obj = hazard_obj self.impact_obj = impact_obj - self.correlation_id = event_obj.correlation_id - self.guid = event_obj.guid + self.event_id = event_obj.stac_id @abstractmethod def process_hazard(self, hazard_item: ExtractionItem | None) -> HazardType: @@ -55,7 +56,7 @@ def transform_stac_item(self): Fetches event, hazard and impact items separately, processes them, and returns processed data if available. """ - logger.info(f"Starting transformer for correlation_id={self.correlation_id}") + logger.info(f"Starting transformer for event_id={self.event_id}") # Process event, hazard and impact. event_result = self.process_event(self.event_obj) @@ -63,8 +64,7 @@ def transform_stac_item(self): impact_result = self.process_impact(self.impact_obj) return { - "guid": self.guid, - "correlation_id": self.correlation_id, + "event_id": self.event_id, **event_result, **hazard_result, **impact_result, diff --git a/alert_system/etl/gdacs_cyclone/transform.py b/alert_system/etl/gdacs_cyclone/transform.py index a9478b9d2..103f55010 100644 --- a/alert_system/etl/gdacs_cyclone/transform.py +++ b/alert_system/etl/gdacs_cyclone/transform.py @@ -29,23 +29,42 @@ def compute_buildings_exposed(self, metadata_list) -> Optional[int]: return None def process_impact(self, impact_items) -> BaseTransformerClass.ImpactType: + latest_episode = -1 metadata = [] + for item in impact_items: properties = item.resp_data.get("properties", {}) impact_detail = properties.get("monty:impact_detail", {}) + + if properties.get("forecasted"): + continue + + episode_number = properties.get("monty:episode_number") + + if episode_number is None: + logger.info("No episode number found.") + continue + category = impact_detail.get("category") type_ = impact_detail.get("type") - value = impact_detail.get("value") - if category and type_: + + if not (category and type_): + logger.info("Skipping impact item: missing category or type ") + continue + + if episode_number > latest_episode: + latest_episode = episode_number + metadata = [ { "category": category, "type": type_, - "value": value, + "value": impact_detail.get("value"), "unit": impact_detail.get("unit", ""), "estimate_type": impact_detail.get("estimate_type", ""), } ] + return { "people_exposed": self.compute_people_exposed(metadata), "buildings_exposed": self.compute_buildings_exposed(metadata), @@ -71,10 +90,13 @@ def process_hazard(self, hazard_item) -> BaseTransformerClass.HazardType: def process_event(self, event_item) -> BaseTransformerClass.EventType: properties = event_item.resp_data.get("properties", {}) + urls = event_item.resp_data.get("links") return { "title": properties.get("title", ""), "description": properties.get("description", ""), "country": properties.get("monty:country_codes", ""), "start_datetime": properties.get("start_datetime"), "end_datetime": properties.get("end_datetime"), + "episode_number": properties.get("monty:episode_number"), + "event_url": next(item["href"] for item in urls if item["rel"] == "self"), } diff --git a/alert_system/etl/gdacs_flood/transform.py b/alert_system/etl/gdacs_flood/transform.py index 5a7be2df9..d125da3d0 100644 --- a/alert_system/etl/gdacs_flood/transform.py +++ b/alert_system/etl/gdacs_flood/transform.py @@ -82,10 +82,13 @@ def process_hazard(self, hazard_item) -> BaseTransformerClass.HazardType: def process_event(self, event_item) -> BaseTransformerClass.EventType: properties = event_item.resp_data.get("properties", {}) + urls = event_item.resp_data.get("links") return { "title": properties.get("title", ""), "description": properties.get("description", ""), "country": properties.get("monty:country_codes", ""), "start_datetime": properties.get("start_datetime"), "end_datetime": properties.get("end_datetime"), + "episode_number": properties.get("monty:episode_number"), + "event_url": next(item["href"] for item in urls if item["rel"] == "self"), } diff --git a/alert_system/etl/usgs_earthquake/transform.py b/alert_system/etl/usgs_earthquake/transform.py index a35d4f3d4..b7f2cd2dc 100644 --- a/alert_system/etl/usgs_earthquake/transform.py +++ b/alert_system/etl/usgs_earthquake/transform.py @@ -77,10 +77,13 @@ def process_hazard(self, hazard_item) -> BaseTransformerClass.HazardType: def process_event(self, event_item) -> BaseTransformerClass.EventType: properties = event_item.resp_data.get("properties", {}) + urls = event_item.resp_data.get("links") return { "title": properties.get("title", ""), "description": properties.get("description", ""), "country": properties.get("monty:country_codes", ""), - "start_datetime": properties.get("datetime"), + "start_datetime": properties.get("start_datetime"), "end_datetime": properties.get("end_datetime"), + "episode_number": properties.get("monty:episode_number"), + "event_url": next(item["href"] for item in urls if item["rel"] == "self"), } diff --git a/alert_system/factories.py b/alert_system/factories.py index 0bddd70df..0753ba5dd 100644 --- a/alert_system/factories.py +++ b/alert_system/factories.py @@ -6,7 +6,9 @@ class LoadItemFactory(factory.django.DjangoModelFactory): - guid = factory.LazyFunction(lambda: str(uuid4())) + parent_event_id = factory.LazyFunction(lambda: str(uuid4())) + event_id = factory.LazyFunction(lambda: str(uuid4())) + event_url = factory.Sequence(lambda n: f"https://test-events.com/event/{n}") class Meta: model = LoadItem diff --git a/alert_system/helpers.py b/alert_system/helpers.py index f873671c3..20c646dc9 100644 --- a/alert_system/helpers.py +++ b/alert_system/helpers.py @@ -1,4 +1,7 @@ # Helper functions to build search params. +from typing import Dict, Generator, Optional + +import httpx def build_search_params( @@ -26,22 +29,23 @@ def build_search_params( return params -def build_guid_filter(guid: str) -> str: - return f"monty:guid = '{guid}'" +def build_hazard_filter(hazard_codes: list) -> str: + hazard_cql = " OR ".join(f"a_contains(monty:hazard_codes, '{hc}')" for hc in hazard_codes) + return hazard_cql def build_stac_search( collections: str, - guid: str | None = None, additional_filters: list[str] | None = None, start_datetime: str | None = None, end_datetime: str | None = None, extra_params: dict | None = None, + hazard_codes: list | None = None, ) -> dict: filters = additional_filters.copy() if additional_filters else [] - if guid: - filters.append(build_guid_filter(guid)) + if hazard_codes: + filters.append(f"({build_hazard_filter(hazard_codes=hazard_codes)})") return build_search_params( collections=collections, @@ -50,3 +54,27 @@ def build_stac_search( start_datetime=start_datetime, end_datetime=end_datetime, ) + + +def fetch_stac_data(url: str, payload: dict | None = None, timeout: int | None = 60): + response = httpx.get(url=url, params=payload, timeout=timeout) + response.raise_for_status() + return response.json() + + +def fetch_paginated_stac_data(url: str, filters: Optional[Dict] = None, timeout: int | None = 60) -> Generator[Dict, None, None]: + """ + Fetch STAC data with pagination support. + + """ + current_url = url + current_payload = filters.copy() if filters else None + + while current_url: + data = fetch_stac_data(current_url, current_payload) + + yield from data.get("features", []) + + # Find next page link + current_url = next((link["href"] for link in data.get("links", []) if link.get("rel") == "next"), None) + current_payload = None # Only use params on first request diff --git a/alert_system/migrations/0001_initial.py b/alert_system/migrations/0001_initial.py index 5a3d4ae1e..9c1146108 100644 --- a/alert_system/migrations/0001_initial.py +++ b/alert_system/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2.26 on 2026-02-09 08:34 +# Generated by Django 4.2.30 on 2026-05-18 09:45 from django.conf import settings import django.contrib.postgres.fields @@ -11,7 +11,7 @@ class Migration(migrations.Migration): initial = True dependencies = [ - ('api', '0227_alter_eventseveritylevelhistory_options'), + ('api', '0231_alter_export_export_type'), migrations.swappable_dependency(settings.AUTH_USER_MODEL), ('notifications', '0016_alertsubscription'), ] @@ -41,9 +41,8 @@ class Migration(migrations.Migration): ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('extraction_run_id', models.UUIDField(blank=True, db_index=True, help_text='UUID field for tracking the extraction run through ETL pipeline', null=True)), ('created_at', models.DateTimeField(auto_now_add=True, help_text='Timestamp when the record was created', verbose_name='Created At')), - ('correlation_id', models.CharField(help_text='Correlation identifier linking all models', max_length=255, verbose_name='Correlation ID')), - ('guid', models.CharField(help_text='Globally unique ID for events', verbose_name='GUID')), - ('parent_guid', models.CharField(help_text='GUID without the episode number.', verbose_name='Parent GUID')), + ('event_id', models.CharField(help_text='Event ID of the LoadItem.', unique=True, verbose_name='Event ID')), + ('parent_event_id', models.CharField(help_text='Event ID without the episode number.', verbose_name='Parent event ID')), ('event_title', models.CharField(help_text='Title of the event', max_length=255, verbose_name='Event Title')), ('event_description', models.TextField(help_text='Description of the event', verbose_name='Event Description')), ('start_datetime', models.DateTimeField(help_text='Start datetime of the event')), @@ -52,18 +51,20 @@ class Migration(migrations.Migration): ('severity_unit', models.CharField(blank=True, help_text='Unit of measurement for severity', max_length=100, null=True, verbose_name='Severity Unit')), ('severity_label', models.CharField(blank=True, help_text='Human-readable severity label (e.g., Red, Orange, Green)', max_length=255, null=True, verbose_name='Severity Label')), ('severity_value', models.IntegerField(blank=True, help_text='Human-readable severity value', null=True, verbose_name='Severity Value')), + ('episode_number', models.IntegerField(blank=True, help_text='Episode Id of the event', null=True, verbose_name='Episode ID')), ('total_people_exposed', models.IntegerField(help_text='Total number of people exposed to event', null=True, verbose_name='Total People Exposed')), ('total_buildings_exposed', models.IntegerField(help_text='Total number of buildings exposed to event', null=True, verbose_name='Total Buildings Exposed')), ('impact_metadata', models.JSONField(help_text='Metadata constructed from all events associated', verbose_name='Impact Metadata')), ('item_eligible', models.BooleanField(help_text='Is the item eligible for alerting')), ('is_past_event', models.BooleanField(default=False, help_text='Is the item in load table a past event')), + ('event_url', models.URLField(unique=True)), ('connector', models.ForeignKey(help_text='Data source connector', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_items', to='alert_system.connector', verbose_name='Connector')), ('related_go_events', models.ManyToManyField(blank=True, to='api.event', verbose_name='Related GO Events')), ('related_montandon_events', models.ManyToManyField(blank=True, related_name='related_to', to='alert_system.loaditem')), ], options={ - 'verbose_name': 'Eligible Item', - 'verbose_name_plural': 'Eligible Items', + 'ordering': ['-created_at'], + 'abstract': False, }, ), migrations.CreateModel( @@ -72,9 +73,7 @@ class Migration(migrations.Migration): ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('extraction_run_id', models.UUIDField(blank=True, db_index=True, help_text='UUID field for tracking the extraction run through ETL pipeline', null=True)), ('created_at', models.DateTimeField(auto_now_add=True, help_text='Timestamp when the record was created', verbose_name='Created At')), - ('correlation_id', models.CharField(help_text='Correlation identifier linking all models', max_length=255, verbose_name='Correlation ID')), - ('guid', models.CharField(help_text='Globally unique ID for events', verbose_name='GUID')), - ('collection', models.IntegerField(choices=[(100, 'event'), (200, 'Hazard'), (300, 'Impacts')], help_text='Collection type of the item', verbose_name='Collection')), + ('collection', models.CharField(choices=[('event', 'Event'), ('hazard', 'Hazard'), ('impact', 'Impacts')], help_text='Collection type of the item', verbose_name='Collection')), ('stac_id', models.CharField(db_index=True, help_text='Unique identifier for the event item', max_length=255, unique=True, verbose_name='Event ID')), ('resp_data', models.JSONField(blank=True, help_text='Raw JSON response from the STAC API', null=True, verbose_name='Response Data')), ('connector', models.ForeignKey(help_text='Data source connector', on_delete=django.db.models.deletion.CASCADE, related_name='%(class)s_items', to='alert_system.connector', verbose_name='Connector')), @@ -118,10 +117,6 @@ class Migration(migrations.Migration): 'ordering': ['-id'], }, ), - migrations.AddConstraint( - model_name='loaditem', - constraint=models.UniqueConstraint(fields=('guid',), name='unique_guid'), - ), migrations.AddIndex( model_name='alertemailthread', index=models.Index(fields=['parent_guid', 'user'], name='alert_syste_parent__737a31_idx'), diff --git a/alert_system/migrations/0002_remove_alertemailthread_unique_user_guid_and_more.py b/alert_system/migrations/0002_remove_alertemailthread_unique_user_guid_and_more.py new file mode 100644 index 000000000..d7af4adfa --- /dev/null +++ b/alert_system/migrations/0002_remove_alertemailthread_unique_user_guid_and_more.py @@ -0,0 +1,34 @@ +# Generated by Django 4.2.30 on 2026-05-18 08:24 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('alert_system', '0001_initial'), + ] + + operations = [ + migrations.RemoveConstraint( + model_name='alertemailthread', + name='unique_user_guid', + ), + migrations.RemoveIndex( + model_name='alertemailthread', + name='alert_syste_parent__737a31_idx', + ), + migrations.RenameField( + model_name='alertemailthread', + old_name='parent_guid', + new_name='parent_event_id', + ), + migrations.AddIndex( + model_name='alertemailthread', + index=models.Index(fields=['parent_event_id', 'user'], name='alert_syste_parent__7efaa4_idx'), + ), + migrations.AddConstraint( + model_name='alertemailthread', + constraint=models.UniqueConstraint(fields=('parent_event_id', 'user'), name='unique_user_parent_event'), + ), + ] diff --git a/alert_system/models.py b/alert_system/models.py index 50c11c7bd..2dafa4eb8 100644 --- a/alert_system/models.py +++ b/alert_system/models.py @@ -133,17 +133,6 @@ class BaseItem(models.Model): auto_now_add=True, verbose_name=_("Created At"), help_text=_("Timestamp when the record was created") ) - correlation_id = models.CharField( - max_length=255, - verbose_name=_("Correlation ID"), - help_text=_("Correlation identifier linking all models"), - ) - - guid = models.CharField( - verbose_name=_("GUID"), - help_text=_("Globally unique ID for events"), - ) - id: int class Meta: @@ -156,12 +145,12 @@ class ExtractionItem(BaseItem): Model for Extraction items. """ - class CollectionType(models.IntegerChoices): - EVENT = 100, _("event") - HAZARD = 200, _("Hazard") - IMPACT = 300, _("Impacts") + class CollectionType(models.TextChoices): + EVENT = "event", _("Event") + HAZARD = "hazard", _("Hazard") + IMPACT = "impact", _("Impacts") - collection = models.IntegerField( + collection = models.CharField( choices=CollectionType.choices, verbose_name=_("Collection"), help_text=_("Collection type of the item"), @@ -188,9 +177,15 @@ class LoadItem(BaseItem): Model for Load items. """ - parent_guid = models.CharField( - verbose_name=_("Parent GUID"), - help_text=_("GUID without the episode number."), + event_id = models.CharField( + verbose_name=_("Event ID"), + help_text=_("Event ID of the LoadItem."), + unique=True, + ) + + parent_event_id = models.CharField( + verbose_name=_("Parent event ID"), + help_text=_("Event ID without the episode number."), ) # TODO: New id to be used in the future. @@ -231,6 +226,13 @@ class LoadItem(BaseItem): help_text=_("Human-readable severity value"), ) + episode_number = models.IntegerField( + null=True, + blank=True, + verbose_name=_("Episode ID"), + help_text=_("Episode Id of the event"), + ) + total_people_exposed = models.IntegerField( null=True, verbose_name=_("Total People Exposed"), @@ -262,16 +264,11 @@ class LoadItem(BaseItem): related_go_events = models.ManyToManyField(Event, verbose_name="Related GO Events", blank=True) + event_url = models.URLField(null=False, blank=False, unique=True) + def __str__(self): return self.event_title - class Meta: - verbose_name = _("Eligible Item") - verbose_name_plural = _("Eligible Items") - constraints = [ - models.UniqueConstraint(fields=["guid"], name="unique_guid") - ] # NOTE: GUID should be unique in the load table. - class AlertEmailThread(models.Model): """ @@ -283,8 +280,8 @@ class AlertEmailThread(models.Model): on_delete=models.CASCADE, related_name="alert_email_threads", ) - - parent_guid = models.CharField( + # NOTE: parent_event_id field is same field form the LoadItem model. + parent_event_id = models.CharField( help_text=_("Identifier linking related LoadItems into the same email thread."), ) @@ -307,13 +304,13 @@ class Meta: verbose_name = _("Email Thread") verbose_name_plural = _("Email Threads") ordering = ["-id"] - constraints = [models.UniqueConstraint(fields=["parent_guid", "user"], name="unique_user_guid")] + constraints = [models.UniqueConstraint(fields=["parent_event_id", "user"], name="unique_user_parent_event")] indexes = [ - models.Index(fields=["parent_guid", "user"]), + models.Index(fields=["parent_event_id", "user"]), ] def __str__(self): - return f"Thread: {self.user.get_full_name()}-{self.parent_guid}" + return f"Thread: {self.user.get_full_name()}-{self.parent_event_id}" class AlertEmailLog(models.Model): diff --git a/alert_system/tasks.py b/alert_system/tasks.py index c4646b318..7e313e5ea 100644 --- a/alert_system/tasks.py +++ b/alert_system/tasks.py @@ -5,7 +5,7 @@ from celery import chain, group, shared_task from celery.exceptions import MaxRetriesExceededError from django.db import transaction -from django.db.models import Max +from django.db.models import Max, OuterRef, Subquery from alert_system.etl.base.extraction import PastEventExtractionClass from api.models import Event @@ -42,7 +42,7 @@ def polling_task(self, connector_id): if self.request.retries >= self.max_retries: logger.error(f"[ETL] Max retries exceeded for connector {connector}", exc_info=True) connector.set_connector_status(Connector.Status.FAILED) - raise MaxRetriesExceededError(f"Task {self.request.id} exceeded max retries") + raise else: raise self.retry(exc=exc) @@ -56,19 +56,28 @@ def fetch_past_events_from_monty(self, extraction_run_id): if not extraction_run_id: logger.warning("No extraction_run_id provided, skipping past events fetch") return - + # NOTE: Group by event id latest, only run on those events try: eligible_items_qs = LoadItem.objects.filter( extraction_run_id=extraction_run_id, item_eligible=True, is_past_event=False, ) + max_episode_sq = ( + LoadItem.objects.filter(parent_event_id=OuterRef("parent_event_id")) + .values("parent_event_id") + .annotate(max_episode=Max("episode_number")) + .values("max_episode")[:1] + ) - count = eligible_items_qs.count() + # Step 2: filter to only rows that match that max episode_id + filtered_qs = eligible_items_qs.filter(episode_number=Subquery(max_episode_sq)) + + count = filtered_qs.count() logger.info(f"[Past Events] Processing {count} items from run {extraction_run_id}") - first_item = eligible_items_qs.first() + first_item = filtered_qs.first() if not first_item: logger.info("No item found to extract related montandon events.") return @@ -83,10 +92,10 @@ def fetch_past_events_from_monty(self, extraction_run_id): processed = 0 failed = 0 - for load_obj in eligible_items_qs.iterator(): + for load_obj in filtered_qs.iterator(): try: with transaction.atomic(): - past_event_extraction_service.extract_past_events(load_obj=load_obj) + past_event_extraction_service.extract_past_events(load_obj=load_obj, extraction_run_id=extraction_run_id) processed += 1 except Exception as e: failed += 1 @@ -173,3 +182,7 @@ def process_connector_task(connector_id): return chain( polling_task.s(connector_id), group(fetch_past_events_from_go.s(connector_id), fetch_past_events_from_monty.s()) ).apply_async() + + # return chain( + # polling_task.s(connector_id), group(fetch_past_events_from_go.s(connector_id)) + # ).apply_async() diff --git a/alert_system/tests.py b/alert_system/tests.py index 5077d8cf7..22873284a 100644 --- a/alert_system/tests.py +++ b/alert_system/tests.py @@ -60,7 +60,7 @@ def setUp(self): ) self.eligible_item = LoadItemFactory.create( - parent_guid=str(uuid4()), + parent_event_id=str(uuid4()), connector=self.connector, item_eligible=True, is_past_event=False, @@ -107,7 +107,7 @@ def test_sent_email_for_eligible_item(self, mock_send_notification): self.assertIsNotNone(log.email_sent_at) self.assertEqual(thread.user, self.user1) - self.assertEqual(thread.parent_guid, self.eligible_item.parent_guid) + self.assertEqual(thread.parent_event_id, self.eligible_item.parent_event_id) self.assertEqual(thread.root_email_message_id, log.message_id) self.assertEqual(log.thread, thread) @@ -121,7 +121,7 @@ def test_sent_email_to_multiple_users(self, mock_send_notification): logs = AlertEmailLog.objects.filter(item=self.eligible_item, status=AlertEmailLog.Status.SENT) self.assertEqual(logs.count(), 2) - threads = AlertEmailThread.objects.filter(parent_guid=self.eligible_item.parent_guid) + threads = AlertEmailThread.objects.filter(parent_event_id=self.eligible_item.parent_event_id) self.assertEqual(threads.count(), 2) self.assertEqual(mock_send_notification.call_count, 2) @@ -217,7 +217,7 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): ) initial_item = LoadItemFactory.create( - parent_guid=str(uuid4()), + parent_event_id=str(uuid4()), connector=self.connector, item_eligible=True, is_past_event=False, @@ -231,7 +231,7 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): thread = AlertEmailThreadFactory.create( user=user, - parent_guid=initial_item.parent_guid, + parent_event_id=initial_item.parent_event_id, root_email_message_id=str(uuid4()), root_message_sent_at=timezone.now(), ) @@ -247,7 +247,7 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): ) update_item = LoadItemFactory.create( - parent_guid=initial_item.parent_guid, + parent_event_id=initial_item.parent_event_id, connector=self.connector, item_eligible=True, is_past_event=False, @@ -267,17 +267,17 @@ def test_reply_email_for_existing_thread(self, mock_send_notification): mock_send_notification.assert_called_once() - threads = AlertEmailThread.objects.filter(parent_guid=initial_item.parent_guid) + threads = AlertEmailThread.objects.filter(parent_event_id=initial_item.parent_event_id) self.assertEqual(threads.count(), 1) @mock.patch("alert_system.email_processing.send_notification") def test_reply_email_to_multiple_users(self, mock_send_notification): - parent_guid = str(uuid4()) + parent_event_id = str(uuid4()) # Create initial item initial_item = LoadItemFactory.create( - parent_guid=parent_guid, + parent_event_id=parent_event_id, connector=self.connector, item_eligible=True, is_past_event=False, @@ -292,14 +292,14 @@ def test_reply_email_to_multiple_users(self, mock_send_notification): # Create threads for both users thread1 = AlertEmailThreadFactory.create( user=self.user1, - parent_guid=parent_guid, + parent_event_id=parent_event_id, root_email_message_id="message-id-1", root_message_sent_at=timezone.now(), ) thread2 = AlertEmailThreadFactory.create( user=self.user2, - parent_guid=parent_guid, + parent_event_id=parent_event_id, root_email_message_id="message-id-2", root_message_sent_at=timezone.now(), ) @@ -325,7 +325,7 @@ def test_reply_email_to_multiple_users(self, mock_send_notification): ) related_item = LoadItemFactory.create( - parent_guid=parent_guid, + parent_event_id=parent_event_id, connector=self.connector, item_eligible=True, is_past_event=False, @@ -350,7 +350,7 @@ def test_reply_email_to_multiple_users(self, mock_send_notification): @mock.patch("alert_system.email_processing.send_notification") def test_duplicate_reply(self, mock_send_notification): - parent_guid = str(uuid4()) + parent_event_id = str(uuid4()) user = UserFactory.create() country = CountryFactory.create( @@ -366,7 +366,7 @@ def test_duplicate_reply(self, mock_send_notification): ) LoadItemFactory.create( - parent_guid=parent_guid, + parent_event_id=parent_event_id, connector=self.connector, item_eligible=True, is_past_event=False, @@ -380,13 +380,13 @@ def test_duplicate_reply(self, mock_send_notification): thread = AlertEmailThreadFactory.create( user=user, - parent_guid=parent_guid, + parent_event_id=parent_event_id, root_email_message_id="root-123", root_message_sent_at=timezone.now(), ) update_item = LoadItemFactory.create( - parent_guid=parent_guid, + parent_event_id=parent_event_id, connector=self.connector, item_eligible=True, is_past_event=False, diff --git a/alert_system/utils.py b/alert_system/utils.py index 7cdfeb2f9..ff3a25afb 100644 --- a/alert_system/utils.py +++ b/alert_system/utils.py @@ -11,12 +11,26 @@ logger = logging.getLogger(__name__) +def get_latest_episode_load_item(load_item: LoadItem) -> LoadItem: + """ + Given a load_item, return the sibling LoadItem with the highest + episode_number for the same parent_event_id. + Falls back to the original load_item if no siblings exist. + """ + latest = LoadItem.objects.filter(parent_event_id=load_item.parent_event_id).order_by("-episode_number").first() + return latest or load_item + + def get_alert_email_context(load_item: LoadItem, user: User): country_names = [] if load_item.country_codes: country_names = list(Country.objects.filter(iso3__in=load_item.country_codes).values_list("name", flat=True)) + + # Fetch related_montandon_events from the latest episode + latest_episode_item = get_latest_episode_load_item(load_item) + email_context = { "user_name": user.get_full_name(), "event_title": load_item.event_title, @@ -26,7 +40,7 @@ def get_alert_email_context(load_item: LoadItem, user: User): "total_buildings_exposed": load_item.total_buildings_exposed, "hazard_types": load_item.connector.dtype, "related_go_events": load_item.related_go_events.all(), - "related_montandon_events": load_item.related_montandon_events.filter(item_eligible=True).order_by( + "related_montandon_events": latest_episode_item.related_montandon_events.filter(item_eligible=True).order_by( "-total_people_exposed" ), "frontend_url": settings.GO_WEB_URL,