diff --git a/functions-python/helpers/query_helper.py b/functions-python/helpers/query_helper.py index c085c867f..b9ab32c2f 100644 --- a/functions-python/helpers/query_helper.py +++ b/functions-python/helpers/query_helper.py @@ -173,3 +173,28 @@ def get_datasets_with_missing_reports_query( Gtfsdataset.stable_id, Gtfsfeed.stable_id ) return query + + +def get_feeds_with_missing_bounding_boxes_query( + db_session: Session, +) -> Query: + """ + Get GTFS feeds with datasets missing bounding boxes. + + Args: + db_session: SQLAlchemy session + + Returns: + A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes + ordered by feed stable id. + """ + query = ( + db_session.query(Gtfsfeed) + .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) + .filter(Gtfsdataset.latest.is_(True)) + .filter(Gtfsdataset.bounding_box.is_(None)) + .filter(~Gtfsfeed.feedlocationgrouppoints.any()) + .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) + .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) + ) + return query diff --git a/functions-python/tasks_executor/README.md b/functions-python/tasks_executor/README.md index f360f0616..6f9654bac 100644 --- a/functions-python/tasks_executor/README.md +++ b/functions-python/tasks_executor/README.md @@ -3,7 +3,9 @@ This directory contains Google Cloud Functions used as a single point of access to multiple _tasks_. ## Usage + The function receive the following payload: + ``` { "task": "string", # [required] Name of the task to execute @@ -12,6 +14,7 @@ The function receive the following payload: ``` Example: + ```json { "task": "rebuild_missing_validation_reports", @@ -21,11 +24,22 @@ Example: "filter_statuses": ["active", "inactive", "future"] } } +{ + "task": "rebuild_missing_bounding_boxes", + "payload": { + "dry_run": true, + "after_date": "2025-06-01" + } +} ``` + To get the list of supported tasks use: `` { - "name": "list_tasks", - "payload": {} +"name": "list_tasks", +"payload": {} } -````` + +``` + +``` diff --git a/functions-python/tasks_executor/requirements.txt b/functions-python/tasks_executor/requirements.txt index 835b45c27..9822923f2 100644 --- a/functions-python/tasks_executor/requirements.txt +++ b/functions-python/tasks_executor/requirements.txt @@ -16,6 +16,7 @@ geoalchemy2==0.14.7 # Google specific packages for this function google-cloud-workflows +google-cloud-pubsub flask # Configuration diff --git a/functions-python/tasks_executor/src/main.py b/functions-python/tasks_executor/src/main.py index 3accd026f..694f2e27c 100644 --- a/functions-python/tasks_executor/src/main.py +++ b/functions-python/tasks_executor/src/main.py @@ -22,6 +22,9 @@ from tasks.validation_reports.rebuild_missing_validation_reports import ( rebuild_missing_validation_reports_handler, ) +from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import ( + rebuild_missing_bounding_boxes_handler, +) init_logger() @@ -42,6 +45,10 @@ "description": "Rebuilds missing validation reports for GTFS datasets.", "handler": rebuild_missing_validation_reports_handler, }, + "rebuild_missing_bounding_boxes": { + "description": "Rebuilds missing bounding boxes for GTFS datasets that contain valid stops.txt files.", + "handler": rebuild_missing_bounding_boxes_handler, + }, } diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py new file mode 100644 index 000000000..4ff0947d9 --- /dev/null +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -0,0 +1,133 @@ +import logging +import os +from typing import Dict, List +from sqlalchemy.orm import Session + +from shared.database.database import with_db_session +from shared.helpers.pub_sub import publish_messages +from shared.helpers.query_helper import ( + get_feeds_with_missing_bounding_boxes_query, +) +from shared.database_gen.sqlacodegen_models import Gtfsdataset +from datetime import datetime + + +def rebuild_missing_bounding_boxes_handler(payload) -> dict: + (dry_run, after_date) = get_parameters(payload) + + return rebuild_missing_bounding_boxes( + dry_run=dry_run, + after_date=after_date, + ) + + +@with_db_session +def rebuild_missing_bounding_boxes( + dry_run: bool = True, + after_date: str = None, + db_session: Session | None = None, +) -> dict: + """ + Find GTFS feeds/datasets missing bounding boxes and either log or publish them for processing. + + Args: + dry_run (bool): If True, only logs the number of feeds found (no publishing). + after_date (str, optional): ISO date string (YYYY-MM-DD). Only datasets downloaded after this date are included. + db_session (Session, optional): SQLAlchemy session, injected by @with_db_session. + + Returns: + dict: Summary message and count of processed feeds. + """ + filter_after = None + if after_date: + try: + filter_after = datetime.fromisoformat(after_date) + except Exception: + logging.warning( + "Invalid after_date format, expected ISO format (YYYY-MM-DD)" + ) + query = get_feeds_with_missing_bounding_boxes_query(db_session) + if filter_after: + query = query.filter(Gtfsdataset.downloaded_at >= filter_after) + feeds = query.all() + + if dry_run: + total_processed = len(feeds) + logging.info( + "Dry run mode: %s feeds with missing bounding boxes found, filtered after %s.", + total_processed, + after_date, + ) + return { + "message": f"Dry run: {total_processed} feeds with missing bounding boxes found." + + (f" Filtered after: {filter_after}" if filter_after else ""), + "total_processed": total_processed, + } + else: + # publish a message to a Pub/Sub topic for each feed + pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None) + project_id = os.getenv("PROJECT_ID") + + logging.info("Publishing to topic: %s", pubsub_topic_name) + feeds_data = prepare_feeds_data(db_session) + publish_messages(feeds_data, project_id, pubsub_topic_name) + + total_processed = len(feeds_data) + logging.info( + "Published %s feeds with missing bounding boxes to Pub/Sub topic: %s, filtered after %s.", + total_processed, + pubsub_topic_name, + after_date, + ) + return { + "message": f"Successfully published {total_processed} feeds with missing bounding boxes." + + (f" Filtered after: {filter_after}" if filter_after else ""), + "total_processed": total_processed, + } + + +def prepare_feeds_data(db_session: Session | None = None) -> List[Dict]: + """ + Format feeds data for Pub/Sub messages. + + Args: + feeds: List of Gtfsfeed objects + + Returns: + List of dictionaries with feed data + """ + data = [] + query = get_feeds_with_missing_bounding_boxes_query(db_session) + feeds = query.all() + + for feed in feeds: + # Get the latest dataset + if feed.gtfsdatasets and any(dataset.latest for dataset in feed.gtfsdatasets): + latest_dataset = next( + dataset for dataset in feed.gtfsdatasets if dataset.latest + ) + + data.append( + { + "stable_id": feed.stable_id, + "dataset_id": latest_dataset.stable_id, + "url": latest_dataset.hosted_url, + } + ) + + return data + + +def get_parameters(payload): + """ + Get parameters from the payload and environment variables. + + Args: + payload (dict): dictionary containing the payload data. + Returns: + tuple: (dry_run, after_date) + """ + dry_run = payload.get("dry_run", True) + dry_run = dry_run if isinstance(dry_run, bool) else str(dry_run).lower() == "true" + after_date = payload.get("after_date", None) + return dry_run, after_date diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py new file mode 100644 index 000000000..7a6cfa147 --- /dev/null +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -0,0 +1,112 @@ +import unittest +from unittest.mock import patch, MagicMock +from datetime import datetime + +from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import ( + get_parameters, + rebuild_missing_bounding_boxes, +) + + +class TestTasksExecutor(unittest.TestCase): + def test_get_parameters(self): + payload = {"dry_run": True} + dry_run, after_date = get_parameters(payload) + self.assertTrue(dry_run) + self.assertIsNone(after_date) + + def test_get_parameters_with_valid_after_date(self): + payload = {"dry_run": False, "after_date": "2024-06-01"} + dry_run, after_date = get_parameters(payload) + self.assertFalse(dry_run) + self.assertEqual(after_date, "2024-06-01") + # Check ISO format + try: + datetime.fromisoformat(after_date) + except ValueError: + self.fail(f"after_date '{after_date}' is not a valid ISO date string") + + def test_get_parameters_with_string_bool(self): + payload = {"dry_run": "false", "after_date": None} + dry_run, after_date = get_parameters(payload) + self.assertFalse(dry_run) + self.assertIsNone(after_date) + + def test_get_parameters_missing_keys(self): + payload = {} + dry_run, after_date = get_parameters(payload) + self.assertTrue(dry_run) + self.assertIsNone(after_date) + + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + ) + def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query): + # Mock Gtfsdataset and Gtfsfeed objects + mock_dataset1 = MagicMock() + mock_dataset1.latest = True + mock_dataset1.stable_id = "dataset1" + mock_dataset1.hosted_url = "http://example.com/dataset1" + mock_feed1 = MagicMock() + mock_feed1.stable_id = "feed1" + mock_feed1.gtfsdatasets = [mock_dataset1] + + mock_dataset2 = MagicMock() + mock_dataset2.latest = True + mock_dataset2.stable_id = "dataset2" + mock_dataset2.hosted_url = "http://example.com/dataset2" + mock_feed2 = MagicMock() + mock_feed2.stable_id = "feed2" + mock_feed2.gtfsdatasets = [mock_dataset2] + + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [mock_feed1, mock_feed2] + + result = rebuild_missing_bounding_boxes( + dry_run=True, after_date=None, db_session=MagicMock() + ) + self.assertIn("Dry run", result["message"]) + self.assertEqual(result["total_processed"], 2) + + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.publish_messages" + ) + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + ) + def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish): + # Mock Gtfsdataset and Gtfsfeed objects + mock_dataset = MagicMock() + mock_dataset.latest = True + mock_dataset.stable_id = "dataset1" + mock_dataset.hosted_url = "http://example.com/dataset1" + mock_feed = MagicMock() + mock_feed.stable_id = "feed1" + mock_feed.gtfsdatasets = [mock_dataset] + + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [mock_feed] + mock_publish.return_value = None + + result = rebuild_missing_bounding_boxes( + dry_run=False, after_date=None, db_session=MagicMock() + ) + self.assertIn("Successfully published", result["message"]) + self.assertEqual(result["total_processed"], 1) + + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + ) + def test_rebuild_missing_bounding_boxes_invalid_after_date(self, mock_query): + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [] + # Should log a warning and not raise + result = rebuild_missing_bounding_boxes( + dry_run=True, after_date="not-a-date", db_session=MagicMock() + ) + self.assertIn("Dry run", result["message"]) + self.assertEqual(result["total_processed"], 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf index 12e3fc5c1..770546879 100644 --- a/infra/functions-python/main.tf +++ b/infra/functions-python/main.tf @@ -1221,6 +1221,7 @@ resource "google_cloudfunctions2_function" "tasks_executor" { environment_variables = { PROJECT_ID = var.project_id ENV = var.environment + PUBSUB_TOPIC_NAME = "rebuild-bounding-boxes-topic" } available_memory = local.function_tasks_executor_config.memory timeout_seconds = local.function_tasks_executor_config.timeout @@ -1245,6 +1246,18 @@ resource "google_cloudfunctions2_function" "tasks_executor" { } } +# Create the Pub/Sub topic used for publishing messages about rebuilding missing bounding boxes +resource "google_pubsub_topic" "rebuild_missing_bounding_boxes" { + name = "rebuild-bounding-boxes-topic" +} + +# Grant the Cloud Functions service account permission to publish messages to the rebuild-bounding-boxes-topic Pub/Sub topic +resource "google_pubsub_topic_iam_member" "rebuild_missing_bounding_boxes_publisher" { + topic = google_pubsub_topic.rebuild_missing_bounding_boxes.name + role = "roles/pubsub.publisher" + member = "serviceAccount:${google_service_account.functions_service_account.email}" +} + # IAM entry for all users to invoke the function resource "google_cloudfunctions2_function_iam_member" "tokens_invoker" { project = var.project_id