From e16a1b584e9f284aee89a92083bcc80d421b4dc7 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Mon, 16 Jun 2025 15:56:55 -0400 Subject: [PATCH 01/19] created a task as part of the tasks executor with dryrun parameter. --- functions-python/helpers/query_helper.py | 28 ++++++++++ functions-python/tasks_executor/src/main.py | 4 ++ .../rebuild_missing_bounding_boxes.py | 52 +++++++++++++++++++ 3 files changed, 84 insertions(+) create mode 100644 functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py diff --git a/functions-python/helpers/query_helper.py b/functions-python/helpers/query_helper.py index c085c867f..9782efd84 100644 --- a/functions-python/helpers/query_helper.py +++ b/functions-python/helpers/query_helper.py @@ -173,3 +173,31 @@ def get_datasets_with_missing_reports_query( Gtfsdataset.stable_id, Gtfsfeed.stable_id ) return query + + +def get_feeds_with_missing_bounding_boxes_query( + db_session: Session, +) -> Query: + """ + Get GTFS feeds and datasets where the dataset is missing a bounding box. + + Args: + db_session: SQLAlchemy session + + Returns: + A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes + ordered by dataset and feed stable id. + """ + query = ( + db_session.query( + Gtfsfeed.stable_id, + Gtfsdataset.stable_id, + ) + .select_from(Gtfsfeed) + .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) + .filter(Gtfsdataset.bounding_box.is_(None)) + .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) + .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) + ) + + return query diff --git a/functions-python/tasks_executor/src/main.py b/functions-python/tasks_executor/src/main.py index 3accd026f..7a066ce5c 100644 --- a/functions-python/tasks_executor/src/main.py +++ b/functions-python/tasks_executor/src/main.py @@ -42,6 +42,10 @@ "description": "Rebuilds missing validation reports for GTFS datasets.", "handler": rebuild_missing_validation_reports_handler, }, + "identify_missing_bounding_boxes": { + "description": "Rebuilds missing bounding boxes for GTFS datasets that contain valid stops.txt files.", + "handler": identify_missing_bounding_boxes_handler, + }, } diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py new file mode 100644 index 000000000..23bf527c5 --- /dev/null +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -0,0 +1,52 @@ +import os +from sqlalchemy.orm import Session + +from shared.database.database import with_db_session +from shared.helpers.query_helper import get_feeds_with_missing_bounding_boxes_query +from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gtfsdataset + + +def rebuild_missing_bounding_boxes_handler(payload) -> dict: + ( + dry_run, + prod_env, + ) = get_parameters(payload) + + return rebuild_missing_bounding_boxes( + dry_run=dry_run, + prod_env=prod_env, + ) + + +@with_db_session +def rebuild_missing_bounding_boxes( + dry_run: bool = True, + prod_env: bool = False, + db_session: Session | None = None, +) -> dict: + query = get_feeds_with_missing_bounding_boxes_query(db_session) + feeds = query.all() + + if dry_run: + total_processed = len(feeds) + return { + "message": f"Dry run: {total_processed} feeds with missing bounding boxes found.", + "total_processed": total_processed, + } + else: + + + +def get_parameters(payload): + """ + Get parameters from the payload and environment variables. + + Args: + payload (dict): dictionary containing the payload data. + Returns: + dict: dict with: dry_run, prod_env parameters + """ + prod_env = os.getenv("ENV", "").lower() == "prod" + dry_run = payload.get("dry_run", True) + dry_run = dry_run if isinstance(dry_run, bool) else str(dry_run).lower() == "true" + return dry_run, prod_env From fac49fae860446bf96eeea4721444c2cb54dcd08 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Tue, 17 Jun 2025 16:17:30 -0400 Subject: [PATCH 02/19] sent a message to the 'reverse-geolocation' queue --- .../src/reverse_geolocation_batch.py | 1 + functions-python/tasks_executor/src/main.py | 5 +- .../rebuild_missing_bounding_boxes.py | 79 ++++++++++++++++++- .../test_rebuild_missing_bounding_boxes.py | 25 ++++++ 4 files changed, 107 insertions(+), 3 deletions(-) create mode 100644 functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py diff --git a/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py b/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py index 2cf864633..abb116e0a 100644 --- a/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py +++ b/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py @@ -72,6 +72,7 @@ def parse_request_parameters(request: flask.Request) -> Tuple[List[str], bool]: return country_codes, include_only_unprocessed +# mimic reverse_geolocation_batch function, format the data for the pub/sub message, add a different topic name def reverse_geolocation_batch(request: flask.Request) -> Tuple[str, int]: """Batch function to trigger reverse geolocation for feeds.""" try: diff --git a/functions-python/tasks_executor/src/main.py b/functions-python/tasks_executor/src/main.py index 7a066ce5c..31209c926 100644 --- a/functions-python/tasks_executor/src/main.py +++ b/functions-python/tasks_executor/src/main.py @@ -22,6 +22,9 @@ from tasks.validation_reports.rebuild_missing_validation_reports import ( rebuild_missing_validation_reports_handler, ) +from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import ( + rebuild_missing_bounding_boxes_handler, +) init_logger() @@ -44,7 +47,7 @@ }, "identify_missing_bounding_boxes": { "description": "Rebuilds missing bounding boxes for GTFS datasets that contain valid stops.txt files.", - "handler": identify_missing_bounding_boxes_handler, + "handler": rebuild_missing_bounding_boxes_handler, }, } diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py index 23bf527c5..61d944287 100644 --- a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -1,9 +1,12 @@ +import logging import os +from typing import Dict, List from sqlalchemy.orm import Session from shared.database.database import with_db_session +from shared.helpers.pub_sub import publish_messages from shared.helpers.query_helper import get_feeds_with_missing_bounding_boxes_query -from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gtfsdataset +from shared.database_gen.sqlacodegen_models import Gtfsfeed def rebuild_missing_bounding_boxes_handler(payload) -> dict: @@ -25,6 +28,8 @@ def rebuild_missing_bounding_boxes( db_session: Session | None = None, ) -> dict: query = get_feeds_with_missing_bounding_boxes_query(db_session) + logging.info("Filtering for unprocessed feeds.") + query = query.filter(~Gtfsfeed.feedlocationgrouppoints.any()) feeds = query.all() if dry_run: @@ -34,7 +39,77 @@ def rebuild_missing_bounding_boxes( "total_processed": total_processed, } else: - + # publish a message to a Pub/Sub topic for each feed + pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None) # todo: set new name + project_id = os.getenv("PROJECT_ID") + + logging.info("Publishing to topic: %s", pubsub_topic_name) + publish_messages(prepare_feeds_data(feeds), project_id, pubsub_topic_name) + + total_processed = len(feeds) + return { + "message": f"Successfully published {total_processed} feeds with missing bounding boxes.", + "total_processed": total_processed, + } + + +@with_db_session +def extract_country_codes_from_feeds( + feeds: List[Gtfsfeed], db_session: Session = None +) -> List[str]: + """ + Extract unique country codes from a list of feeds. + + Args: + feeds: List of Gtfsfeed objects + db_session: SQLAlchemy database session + + Returns: + List of unique country codes + """ + country_codes = set() + + for feed in feeds: + # Check if locations are already loaded, if not load them + if not feed.locations: + db_session.refresh(feed, ["locations"]) + + # Extract country codes from the feed's locations + for location in feed.locations: + if location.country_code: + country_codes.add(location.country_code) + + return list(country_codes) + + +def prepare_feeds_data(feeds: List[Gtfsfeed]) -> List[Dict]: + """ + Format feeds data for Pub/Sub messages. + + Args: + feeds: List of Gtfsfeed objects + + Returns: + List of dictionaries with feed data + """ + data = [] + + for feed in feeds: + # Get the latest dataset + if feed.gtfsdatasets and any(dataset.latest for dataset in feed.gtfsdatasets): + latest_dataset = next( + dataset for dataset in feed.gtfsdatasets if dataset.latest + ) + + data.append( + { + "stable_id": feed.stable_id, + "dataset_id": latest_dataset.stable_id, + "url": latest_dataset.hosted_url, + } + ) + + return data def get_parameters(payload): diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py new file mode 100644 index 000000000..40f22f6c8 --- /dev/null +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -0,0 +1,25 @@ +import unittest + +from pytest import Session +from shared.database.database import with_db_session +from tasks.bounding_boxes.test_rebuild_missing_bounding_boxes import ( + get_parameters, +) + + +class TestTasksExecutor(unittest.TestCase): + def test_get_parameters(self): + """ + Test the get_parameters function to ensure it correctly extracts parameters from the payload. + """ + payload = { + "dry_run": True, + } + + ( + dry_run, + prod_env, + ) = get_parameters(payload) + + self.assertTrue(dry_run) + self.assertFalse(prod_env) From 597a0134b83a6f409359c717c456c953b272a395 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Wed, 18 Jun 2025 11:05:08 -0400 Subject: [PATCH 03/19] added filter after param --- functions-python/helpers/query_helper.py | 3 + .../src/reverse_geolocation_batch.py | 1 - .../rebuild_missing_bounding_boxes.py | 85 +++++++++---------- 3 files changed, 45 insertions(+), 44 deletions(-) diff --git a/functions-python/helpers/query_helper.py b/functions-python/helpers/query_helper.py index 9782efd84..175697d14 100644 --- a/functions-python/helpers/query_helper.py +++ b/functions-python/helpers/query_helper.py @@ -196,6 +196,9 @@ def get_feeds_with_missing_bounding_boxes_query( .select_from(Gtfsfeed) .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) .filter(Gtfsdataset.bounding_box.is_(None)) + .filter( + ~Gtfsfeed.feedlocationgrouppoints.any() + ) # Only feeds with no location group points .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) ) diff --git a/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py b/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py index abb116e0a..2cf864633 100644 --- a/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py +++ b/functions-python/reverse_geolocation/src/reverse_geolocation_batch.py @@ -72,7 +72,6 @@ def parse_request_parameters(request: flask.Request) -> Tuple[List[str], bool]: return country_codes, include_only_unprocessed -# mimic reverse_geolocation_batch function, format the data for the pub/sub message, add a different topic name def reverse_geolocation_batch(request: flask.Request) -> Tuple[str, int]: """Batch function to trigger reverse geolocation for feeds.""" try: diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py index 61d944287..d639fcd88 100644 --- a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -6,36 +6,57 @@ from shared.database.database import with_db_session from shared.helpers.pub_sub import publish_messages from shared.helpers.query_helper import get_feeds_with_missing_bounding_boxes_query -from shared.database_gen.sqlacodegen_models import Gtfsfeed +from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gtfsdataset def rebuild_missing_bounding_boxes_handler(payload) -> dict: - ( - dry_run, - prod_env, - ) = get_parameters(payload) + (dry_run,) = get_parameters(payload) return rebuild_missing_bounding_boxes( dry_run=dry_run, - prod_env=prod_env, ) @with_db_session def rebuild_missing_bounding_boxes( dry_run: bool = True, - prod_env: bool = False, + after_date: str = None, db_session: Session | None = None, ) -> dict: + """ + Find GTFS feeds/datasets missing bounding boxes and either log or publish them for processing. + + Args: + dry_run (bool): If True, only logs the number of feeds found (no publishing). + after_date (str, optional): ISO date string (YYYY-MM-DD). Only datasets downloaded after this date are included. + db_session (Session, optional): SQLAlchemy session, injected by @with_db_session. + + Returns: + dict: Summary message and count of processed feeds. + """ + filter_after = None + if after_date: + try: + filter_after = datetime.fromisoformat(after_date) + except Exception: + logging.warning( + "Invalid after_date format, expected ISO format (YYYY-MM-DD)" + ) query = get_feeds_with_missing_bounding_boxes_query(db_session) - logging.info("Filtering for unprocessed feeds.") - query = query.filter(~Gtfsfeed.feedlocationgrouppoints.any()) + if filter_after: + query = query.filter(Gtfsdataset.downloaded_at >= filter_after) feeds = query.all() if dry_run: total_processed = len(feeds) + logging.info( + "Dry run mode: %s feeds with missing bounding boxes found, filtered after %s.", + total_processed, + after_date, + ) return { - "message": f"Dry run: {total_processed} feeds with missing bounding boxes found.", + "message": f"Dry run: {total_processed} feeds with missing bounding boxes found." + + (f" Filtered after: {filter_after}" if filter_after else ""), "total_processed": total_processed, } else: @@ -47,41 +68,19 @@ def rebuild_missing_bounding_boxes( publish_messages(prepare_feeds_data(feeds), project_id, pubsub_topic_name) total_processed = len(feeds) + logging.info( + "Published %s feeds with missing bounding boxes to Pub/Sub topic: %s, filtered after %s.", + total_processed, + pubsub_topic_name, + after_date, + ) return { - "message": f"Successfully published {total_processed} feeds with missing bounding boxes.", + "message": f"Successfully published {total_processed} feeds with missing bounding boxes." + + (f" Filtered after: {filter_after}" if filter_after else ""), "total_processed": total_processed, } -@with_db_session -def extract_country_codes_from_feeds( - feeds: List[Gtfsfeed], db_session: Session = None -) -> List[str]: - """ - Extract unique country codes from a list of feeds. - - Args: - feeds: List of Gtfsfeed objects - db_session: SQLAlchemy database session - - Returns: - List of unique country codes - """ - country_codes = set() - - for feed in feeds: - # Check if locations are already loaded, if not load them - if not feed.locations: - db_session.refresh(feed, ["locations"]) - - # Extract country codes from the feed's locations - for location in feed.locations: - if location.country_code: - country_codes.add(location.country_code) - - return list(country_codes) - - def prepare_feeds_data(feeds: List[Gtfsfeed]) -> List[Dict]: """ Format feeds data for Pub/Sub messages. @@ -119,9 +118,9 @@ def get_parameters(payload): Args: payload (dict): dictionary containing the payload data. Returns: - dict: dict with: dry_run, prod_env parameters + tuple: (dry_run, after_date) """ - prod_env = os.getenv("ENV", "").lower() == "prod" dry_run = payload.get("dry_run", True) dry_run = dry_run if isinstance(dry_run, bool) else str(dry_run).lower() == "true" - return dry_run, prod_env + after_date = payload.get("after_date", None) + return dry_run, after_date From 30c4e190e77d58a4871344e5eec2a6113a3cfde6 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Wed, 18 Jun 2025 11:21:40 -0400 Subject: [PATCH 04/19] fixed lint errors --- .../missing_bounding_boxes/rebuild_missing_bounding_boxes.py | 1 + .../tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py index d639fcd88..4e47611f8 100644 --- a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -7,6 +7,7 @@ from shared.helpers.pub_sub import publish_messages from shared.helpers.query_helper import get_feeds_with_missing_bounding_boxes_query from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gtfsdataset +from datetime import datetime def rebuild_missing_bounding_boxes_handler(payload) -> dict: diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py index 40f22f6c8..559aca479 100644 --- a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -1,7 +1,5 @@ import unittest -from pytest import Session -from shared.database.database import with_db_session from tasks.bounding_boxes.test_rebuild_missing_bounding_boxes import ( get_parameters, ) From 8346479387069033031bf680d5e2fb6e727362f6 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Thu, 19 Jun 2025 10:24:22 -0400 Subject: [PATCH 05/19] added test for after_date param --- .../test_rebuild_missing_bounding_boxes.py | 30 +++++++++++++------ 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py index 559aca479..3bbaf753e 100644 --- a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -1,8 +1,7 @@ import unittest +from datetime import datetime -from tasks.bounding_boxes.test_rebuild_missing_bounding_boxes import ( - get_parameters, -) +from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import get_parameters class TestTasksExecutor(unittest.TestCase): @@ -14,10 +13,23 @@ def test_get_parameters(self): "dry_run": True, } - ( - dry_run, - prod_env, - ) = get_parameters(payload) - + dry_run, after_date = get_parameters(payload) self.assertTrue(dry_run) - self.assertFalse(prod_env) + self.assertIsNone(after_date) + + def test_get_parameters_with_valid_after_date(self): + """ + Test get_parameters returns a valid ISO date string for after_date. + """ + payload = { + "dry_run": False, + "after_date": "2024-06-01", + } + + dry_run, after_date = get_parameters(payload) + self.assertFalse(dry_run) + # Check that after_date is a valid ISO date string + try: + datetime.fromisoformat(after_date) + except ValueError: + self.fail(f"after_date '{after_date}' is not a valid ISO date string") From c901cdddd3780f1370711fe8b54a238de0b319b0 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Thu, 19 Jun 2025 10:49:30 -0400 Subject: [PATCH 06/19] added google-cloud-pubsub in requirements.txt --- functions-python/tasks_executor/requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/functions-python/tasks_executor/requirements.txt b/functions-python/tasks_executor/requirements.txt index 835b45c27..9822923f2 100644 --- a/functions-python/tasks_executor/requirements.txt +++ b/functions-python/tasks_executor/requirements.txt @@ -16,6 +16,7 @@ geoalchemy2==0.14.7 # Google specific packages for this function google-cloud-workflows +google-cloud-pubsub flask # Configuration From 586f682711b7ed51d96f9b6874b8866fd7ef05ae Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Thu, 19 Jun 2025 11:28:16 -0400 Subject: [PATCH 07/19] added tests to increase branch coverage --- .../test_rebuild_missing_bounding_boxes.py | 87 +++++++++++++++---- 1 file changed, 70 insertions(+), 17 deletions(-) diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py index 3bbaf753e..9788d2fdc 100644 --- a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -1,35 +1,88 @@ import unittest +from unittest.mock import patch, MagicMock from datetime import datetime -from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import get_parameters +from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import ( + get_parameters, + rebuild_missing_bounding_boxes, +) class TestTasksExecutor(unittest.TestCase): def test_get_parameters(self): - """ - Test the get_parameters function to ensure it correctly extracts parameters from the payload. - """ - payload = { - "dry_run": True, - } - + payload = {"dry_run": True} dry_run, after_date = get_parameters(payload) self.assertTrue(dry_run) self.assertIsNone(after_date) def test_get_parameters_with_valid_after_date(self): - """ - Test get_parameters returns a valid ISO date string for after_date. - """ - payload = { - "dry_run": False, - "after_date": "2024-06-01", - } - + payload = {"dry_run": False, "after_date": "2024-06-01"} dry_run, after_date = get_parameters(payload) self.assertFalse(dry_run) - # Check that after_date is a valid ISO date string + self.assertEqual(after_date, "2024-06-01") + # Check ISO format try: datetime.fromisoformat(after_date) except ValueError: self.fail(f"after_date '{after_date}' is not a valid ISO date string") + + def test_get_parameters_with_string_bool(self): + payload = {"dry_run": "false", "after_date": None} + dry_run, after_date = get_parameters(payload) + self.assertFalse(dry_run) + self.assertIsNone(after_date) + + def test_get_parameters_missing_keys(self): + payload = {} + dry_run, after_date = get_parameters(payload) + self.assertTrue(dry_run) + self.assertIsNone(after_date) + + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + ) + def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query): + # Mock the query and its .all() method + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [ + ("feed1", "dataset1"), + ("feed2", "dataset2"), + ] + result = rebuild_missing_bounding_boxes( + dry_run=True, after_date=None, db_session=MagicMock() + ) + self.assertIn("Dry run", result["message"]) + self.assertEqual(result["total_processed"], 2) + + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.publish_messages" + ) + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + ) + def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish): + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [("feed1", "dataset1")] + mock_publish.return_value = None + result = rebuild_missing_bounding_boxes( + dry_run=False, after_date=None, db_session=MagicMock() + ) + self.assertIn("Successfully published", result["message"]) + self.assertEqual(result["total_processed"], 1) + + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + ) + def test_rebuild_missing_bounding_boxes_invalid_after_date(self, mock_query): + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [] + # Should log a warning and not raise + result = rebuild_missing_bounding_boxes( + dry_run=True, after_date="not-a-date", db_session=MagicMock() + ) + self.assertIn("Dry run", result["message"]) + self.assertEqual(result["total_processed"], 0) + + +if __name__ == "__main__": + unittest.main() From d16b020212d77669999d1c61aa43bd76fe8c4cd6 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Thu, 19 Jun 2025 12:31:13 -0400 Subject: [PATCH 08/19] fixed broken tests --- .../test_rebuild_missing_bounding_boxes.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py index 9788d2fdc..3aaa30b1d 100644 --- a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -61,9 +61,19 @@ def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query): "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" ) def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish): + # Mock Gtfsdataset and Gtfsfeed objects + mock_dataset = MagicMock() + mock_dataset.latest = True + mock_dataset.stable_id = "dataset1" + mock_dataset.hosted_url = "http://example.com/dataset1" + mock_feed = MagicMock() + mock_feed.stable_id = "feed1" + mock_feed.gtfsdatasets = [mock_dataset] + mock_query.return_value.filter.return_value = mock_query.return_value - mock_query.return_value.all.return_value = [("feed1", "dataset1")] + mock_query.return_value.all.return_value = [mock_feed] mock_publish.return_value = None + result = rebuild_missing_bounding_boxes( dry_run=False, after_date=None, db_session=MagicMock() ) From b8e321cf524811789d258007cd697492c90134af Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Thu, 19 Jun 2025 14:01:25 -0400 Subject: [PATCH 09/19] added missing params --- .../missing_bounding_boxes/rebuild_missing_bounding_boxes.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py index 4e47611f8..cdde9e450 100644 --- a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -11,10 +11,11 @@ def rebuild_missing_bounding_boxes_handler(payload) -> dict: - (dry_run,) = get_parameters(payload) + (dry_run, after_date) = get_parameters(payload) return rebuild_missing_bounding_boxes( dry_run=dry_run, + after_date=after_date, ) From c33565bb0e008e1bcb59747cbcba0c706dd12eae Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Thu, 19 Jun 2025 16:24:34 -0400 Subject: [PATCH 10/19] renamed task registry --- functions-python/tasks_executor/src/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/functions-python/tasks_executor/src/main.py b/functions-python/tasks_executor/src/main.py index 31209c926..694f2e27c 100644 --- a/functions-python/tasks_executor/src/main.py +++ b/functions-python/tasks_executor/src/main.py @@ -45,7 +45,7 @@ "description": "Rebuilds missing validation reports for GTFS datasets.", "handler": rebuild_missing_validation_reports_handler, }, - "identify_missing_bounding_boxes": { + "rebuild_missing_bounding_boxes": { "description": "Rebuilds missing bounding boxes for GTFS datasets that contain valid stops.txt files.", "handler": rebuild_missing_bounding_boxes_handler, }, From a3a03ad6a49618a86e066382b3b4e9549e07a47c Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Fri, 20 Jun 2025 10:24:14 -0400 Subject: [PATCH 11/19] slightly improved performance by adjusting SQL queries --- functions-python/helpers/query_helper.py | 28 +++++++++++++++++-- .../rebuild_missing_bounding_boxes.py | 17 +++++++---- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/functions-python/helpers/query_helper.py b/functions-python/helpers/query_helper.py index 175697d14..eb6a6e9c6 100644 --- a/functions-python/helpers/query_helper.py +++ b/functions-python/helpers/query_helper.py @@ -175,11 +175,11 @@ def get_datasets_with_missing_reports_query( return query -def get_feeds_with_missing_bounding_boxes_query( +def get_feeds_ids_with_missing_bounding_boxes_query( db_session: Session, ) -> Query: """ - Get GTFS feeds and datasets where the dataset is missing a bounding box. + Get GTFS feeds ids where the dataset is missing a bounding box. Args: db_session: SQLAlchemy session @@ -200,7 +200,29 @@ def get_feeds_with_missing_bounding_boxes_query( ~Gtfsfeed.feedlocationgrouppoints.any() ) # Only feeds with no location group points .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) - .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) ) return query + + +def get_feeds_with_missing_bounding_boxes_query( + db_session: Session, +) -> Query: + """ + Get GTFS feeds with datasets missing bounding boxes. + + Args: + db_session: SQLAlchemy session + + Returns: + A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes + ordered by feed stable id. + """ + query = ( + db_session.query(Gtfsfeed) + .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) + .filter(Gtfsdataset.bounding_box.is_(None)) + .filter(~Gtfsfeed.feedlocationgrouppoints.any()) + .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) + ) + return query diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py index cdde9e450..b2db3a067 100644 --- a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -5,8 +5,11 @@ from shared.database.database import with_db_session from shared.helpers.pub_sub import publish_messages -from shared.helpers.query_helper import get_feeds_with_missing_bounding_boxes_query -from shared.database_gen.sqlacodegen_models import Gtfsfeed, Gtfsdataset +from shared.helpers.query_helper import ( + get_feeds_ids_with_missing_bounding_boxes_query, + get_feeds_with_missing_bounding_boxes_query, +) +from shared.database_gen.sqlacodegen_models import Gtfsdataset from datetime import datetime @@ -44,7 +47,7 @@ def rebuild_missing_bounding_boxes( logging.warning( "Invalid after_date format, expected ISO format (YYYY-MM-DD)" ) - query = get_feeds_with_missing_bounding_boxes_query(db_session) + query = get_feeds_ids_with_missing_bounding_boxes_query(db_session) if filter_after: query = query.filter(Gtfsdataset.downloaded_at >= filter_after) feeds = query.all() @@ -63,11 +66,11 @@ def rebuild_missing_bounding_boxes( } else: # publish a message to a Pub/Sub topic for each feed - pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None) # todo: set new name + pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None) project_id = os.getenv("PROJECT_ID") logging.info("Publishing to topic: %s", pubsub_topic_name) - publish_messages(prepare_feeds_data(feeds), project_id, pubsub_topic_name) + publish_messages(prepare_feeds_data(db_session), project_id, pubsub_topic_name) total_processed = len(feeds) logging.info( @@ -83,7 +86,7 @@ def rebuild_missing_bounding_boxes( } -def prepare_feeds_data(feeds: List[Gtfsfeed]) -> List[Dict]: +def prepare_feeds_data(db_session: Session | None = None) -> List[Dict]: """ Format feeds data for Pub/Sub messages. @@ -94,6 +97,8 @@ def prepare_feeds_data(feeds: List[Gtfsfeed]) -> List[Dict]: List of dictionaries with feed data """ data = [] + query = get_feeds_with_missing_bounding_boxes_query(db_session) + feeds = query.all() for feed in feeds: # Get the latest dataset From 441484766393c8129c314043f00fbf8566caa65f Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Fri, 20 Jun 2025 11:03:44 -0400 Subject: [PATCH 12/19] reverted SQL queries --- functions-python/helpers/query_helper.py | 2 ++ functions-python/tasks_executor/README.md | 20 +++++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/functions-python/helpers/query_helper.py b/functions-python/helpers/query_helper.py index eb6a6e9c6..b73bca31d 100644 --- a/functions-python/helpers/query_helper.py +++ b/functions-python/helpers/query_helper.py @@ -200,6 +200,7 @@ def get_feeds_ids_with_missing_bounding_boxes_query( ~Gtfsfeed.feedlocationgrouppoints.any() ) # Only feeds with no location group points .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) + .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) ) return query @@ -224,5 +225,6 @@ def get_feeds_with_missing_bounding_boxes_query( .filter(Gtfsdataset.bounding_box.is_(None)) .filter(~Gtfsfeed.feedlocationgrouppoints.any()) .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) + .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) ) return query diff --git a/functions-python/tasks_executor/README.md b/functions-python/tasks_executor/README.md index f360f0616..6f9654bac 100644 --- a/functions-python/tasks_executor/README.md +++ b/functions-python/tasks_executor/README.md @@ -3,7 +3,9 @@ This directory contains Google Cloud Functions used as a single point of access to multiple _tasks_. ## Usage + The function receive the following payload: + ``` { "task": "string", # [required] Name of the task to execute @@ -12,6 +14,7 @@ The function receive the following payload: ``` Example: + ```json { "task": "rebuild_missing_validation_reports", @@ -21,11 +24,22 @@ Example: "filter_statuses": ["active", "inactive", "future"] } } +{ + "task": "rebuild_missing_bounding_boxes", + "payload": { + "dry_run": true, + "after_date": "2025-06-01" + } +} ``` + To get the list of supported tasks use: `` { - "name": "list_tasks", - "payload": {} +"name": "list_tasks", +"payload": {} } -````` + +``` + +``` From 17f9e7b109b92f90b296d309e291e302c4b0a019 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Fri, 20 Jun 2025 11:21:21 -0400 Subject: [PATCH 13/19] fixed broken tests --- .../test_rebuild_missing_bounding_boxes.py | 86 +++++++++++-------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py index 3aaa30b1d..77e19376b 100644 --- a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -39,46 +39,60 @@ def test_get_parameters_missing_keys(self): self.assertIsNone(after_date) @patch( - "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" - ) - def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query): - # Mock the query and its .all() method - mock_query.return_value.filter.return_value = mock_query.return_value - mock_query.return_value.all.return_value = [ - ("feed1", "dataset1"), - ("feed2", "dataset2"), - ] - result = rebuild_missing_bounding_boxes( - dry_run=True, after_date=None, db_session=MagicMock() - ) - self.assertIn("Dry run", result["message"]) - self.assertEqual(result["total_processed"], 2) + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" +) +def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query): + # Mock Gtfsdataset and Gtfsfeed objects + mock_dataset1 = MagicMock() + mock_dataset1.latest = True + mock_dataset1.stable_id = "dataset1" + mock_dataset1.hosted_url = "http://example.com/dataset1" + mock_feed1 = MagicMock() + mock_feed1.stable_id = "feed1" + mock_feed1.gtfsdatasets = [mock_dataset1] - @patch( - "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.publish_messages" - ) - @patch( - "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + mock_dataset2 = MagicMock() + mock_dataset2.latest = True + mock_dataset2.stable_id = "dataset2" + mock_dataset2.hosted_url = "http://example.com/dataset2" + mock_feed2 = MagicMock() + mock_feed2.stable_id = "feed2" + mock_feed2.gtfsdatasets = [mock_dataset2] + + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [mock_feed1, mock_feed2] + + result = rebuild_missing_bounding_boxes( + dry_run=True, after_date=None, db_session=MagicMock() ) - def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish): - # Mock Gtfsdataset and Gtfsfeed objects - mock_dataset = MagicMock() - mock_dataset.latest = True - mock_dataset.stable_id = "dataset1" - mock_dataset.hosted_url = "http://example.com/dataset1" - mock_feed = MagicMock() - mock_feed.stable_id = "feed1" - mock_feed.gtfsdatasets = [mock_dataset] + self.assertIn("Dry run", result["message"]) + self.assertEqual(result["total_processed"], 2) - mock_query.return_value.filter.return_value = mock_query.return_value - mock_query.return_value.all.return_value = [mock_feed] - mock_publish.return_value = None +@patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.publish_messages" +) +@patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" +) +def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish): + # Mock Gtfsdataset and Gtfsfeed objects + mock_dataset = MagicMock() + mock_dataset.latest = True + mock_dataset.stable_id = "dataset1" + mock_dataset.hosted_url = "http://example.com/dataset1" + mock_feed = MagicMock() + mock_feed.stable_id = "feed1" + mock_feed.gtfsdatasets = [mock_dataset] - result = rebuild_missing_bounding_boxes( - dry_run=False, after_date=None, db_session=MagicMock() - ) - self.assertIn("Successfully published", result["message"]) - self.assertEqual(result["total_processed"], 1) + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [mock_feed] + mock_publish.return_value = None + + result = rebuild_missing_bounding_boxes( + dry_run=False, after_date=None, db_session=MagicMock() + ) + self.assertIn("Successfully published", result["message"]) + self.assertEqual(result["total_processed"], 1) @patch( "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" From c1fa64cba6cd9d3c5950af7e9a08de75129982e9 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Fri, 20 Jun 2025 11:24:32 -0400 Subject: [PATCH 14/19] fixed lint errors --- .../test_rebuild_missing_bounding_boxes.py | 102 +++++++++--------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py index 77e19376b..7a6cfa147 100644 --- a/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/tests/tasks/bounding_boxes/test_rebuild_missing_bounding_boxes.py @@ -39,60 +39,60 @@ def test_get_parameters_missing_keys(self): self.assertIsNone(after_date) @patch( - "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" -) -def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query): - # Mock Gtfsdataset and Gtfsfeed objects - mock_dataset1 = MagicMock() - mock_dataset1.latest = True - mock_dataset1.stable_id = "dataset1" - mock_dataset1.hosted_url = "http://example.com/dataset1" - mock_feed1 = MagicMock() - mock_feed1.stable_id = "feed1" - mock_feed1.gtfsdatasets = [mock_dataset1] - - mock_dataset2 = MagicMock() - mock_dataset2.latest = True - mock_dataset2.stable_id = "dataset2" - mock_dataset2.hosted_url = "http://example.com/dataset2" - mock_feed2 = MagicMock() - mock_feed2.stable_id = "feed2" - mock_feed2.gtfsdatasets = [mock_dataset2] - - mock_query.return_value.filter.return_value = mock_query.return_value - mock_query.return_value.all.return_value = [mock_feed1, mock_feed2] - - result = rebuild_missing_bounding_boxes( - dry_run=True, after_date=None, db_session=MagicMock() + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" ) - self.assertIn("Dry run", result["message"]) - self.assertEqual(result["total_processed"], 2) + def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query): + # Mock Gtfsdataset and Gtfsfeed objects + mock_dataset1 = MagicMock() + mock_dataset1.latest = True + mock_dataset1.stable_id = "dataset1" + mock_dataset1.hosted_url = "http://example.com/dataset1" + mock_feed1 = MagicMock() + mock_feed1.stable_id = "feed1" + mock_feed1.gtfsdatasets = [mock_dataset1] + + mock_dataset2 = MagicMock() + mock_dataset2.latest = True + mock_dataset2.stable_id = "dataset2" + mock_dataset2.hosted_url = "http://example.com/dataset2" + mock_feed2 = MagicMock() + mock_feed2.stable_id = "feed2" + mock_feed2.gtfsdatasets = [mock_dataset2] -@patch( - "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.publish_messages" -) -@patch( - "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" -) -def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish): - # Mock Gtfsdataset and Gtfsfeed objects - mock_dataset = MagicMock() - mock_dataset.latest = True - mock_dataset.stable_id = "dataset1" - mock_dataset.hosted_url = "http://example.com/dataset1" - mock_feed = MagicMock() - mock_feed.stable_id = "feed1" - mock_feed.gtfsdatasets = [mock_dataset] - - mock_query.return_value.filter.return_value = mock_query.return_value - mock_query.return_value.all.return_value = [mock_feed] - mock_publish.return_value = None - - result = rebuild_missing_bounding_boxes( - dry_run=False, after_date=None, db_session=MagicMock() + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [mock_feed1, mock_feed2] + + result = rebuild_missing_bounding_boxes( + dry_run=True, after_date=None, db_session=MagicMock() + ) + self.assertIn("Dry run", result["message"]) + self.assertEqual(result["total_processed"], 2) + + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.publish_messages" ) - self.assertIn("Successfully published", result["message"]) - self.assertEqual(result["total_processed"], 1) + @patch( + "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" + ) + def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish): + # Mock Gtfsdataset and Gtfsfeed objects + mock_dataset = MagicMock() + mock_dataset.latest = True + mock_dataset.stable_id = "dataset1" + mock_dataset.hosted_url = "http://example.com/dataset1" + mock_feed = MagicMock() + mock_feed.stable_id = "feed1" + mock_feed.gtfsdatasets = [mock_dataset] + + mock_query.return_value.filter.return_value = mock_query.return_value + mock_query.return_value.all.return_value = [mock_feed] + mock_publish.return_value = None + + result = rebuild_missing_bounding_boxes( + dry_run=False, after_date=None, db_session=MagicMock() + ) + self.assertIn("Successfully published", result["message"]) + self.assertEqual(result["total_processed"], 1) @patch( "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query" From 7617ca7b045e3be640321ea53401d8d48241ac5b Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Fri, 20 Jun 2025 11:50:42 -0400 Subject: [PATCH 15/19] fixed total_processed error --- functions-python/helpers/query_helper.py | 58 +++++++++---------- .../rebuild_missing_bounding_boxes.py | 8 +-- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/functions-python/helpers/query_helper.py b/functions-python/helpers/query_helper.py index b73bca31d..9147729d0 100644 --- a/functions-python/helpers/query_helper.py +++ b/functions-python/helpers/query_helper.py @@ -175,35 +175,35 @@ def get_datasets_with_missing_reports_query( return query -def get_feeds_ids_with_missing_bounding_boxes_query( - db_session: Session, -) -> Query: - """ - Get GTFS feeds ids where the dataset is missing a bounding box. - - Args: - db_session: SQLAlchemy session - - Returns: - A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes - ordered by dataset and feed stable id. - """ - query = ( - db_session.query( - Gtfsfeed.stable_id, - Gtfsdataset.stable_id, - ) - .select_from(Gtfsfeed) - .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) - .filter(Gtfsdataset.bounding_box.is_(None)) - .filter( - ~Gtfsfeed.feedlocationgrouppoints.any() - ) # Only feeds with no location group points - .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) - .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) - ) - - return query +# def get_feeds_ids_with_missing_bounding_boxes_query( +# db_session: Session, +# ) -> Query: +# """ +# Get GTFS feeds ids where the dataset is missing a bounding box. + +# Args: +# db_session: SQLAlchemy session + +# Returns: +# A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes +# ordered by dataset and feed stable id. +# """ +# query = ( +# db_session.query( +# Gtfsfeed.stable_id, +# Gtfsdataset.stable_id, +# ) +# .select_from(Gtfsfeed) +# .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) +# .filter(Gtfsdataset.bounding_box.is_(None)) +# .filter( +# ~Gtfsfeed.feedlocationgrouppoints.any() +# ) # Only feeds with no location group points +# .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) +# .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) +# ) + +# return query def get_feeds_with_missing_bounding_boxes_query( diff --git a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py index b2db3a067..4ff0947d9 100644 --- a/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py +++ b/functions-python/tasks_executor/src/tasks/missing_bounding_boxes/rebuild_missing_bounding_boxes.py @@ -6,7 +6,6 @@ from shared.database.database import with_db_session from shared.helpers.pub_sub import publish_messages from shared.helpers.query_helper import ( - get_feeds_ids_with_missing_bounding_boxes_query, get_feeds_with_missing_bounding_boxes_query, ) from shared.database_gen.sqlacodegen_models import Gtfsdataset @@ -47,7 +46,7 @@ def rebuild_missing_bounding_boxes( logging.warning( "Invalid after_date format, expected ISO format (YYYY-MM-DD)" ) - query = get_feeds_ids_with_missing_bounding_boxes_query(db_session) + query = get_feeds_with_missing_bounding_boxes_query(db_session) if filter_after: query = query.filter(Gtfsdataset.downloaded_at >= filter_after) feeds = query.all() @@ -70,9 +69,10 @@ def rebuild_missing_bounding_boxes( project_id = os.getenv("PROJECT_ID") logging.info("Publishing to topic: %s", pubsub_topic_name) - publish_messages(prepare_feeds_data(db_session), project_id, pubsub_topic_name) + feeds_data = prepare_feeds_data(db_session) + publish_messages(feeds_data, project_id, pubsub_topic_name) - total_processed = len(feeds) + total_processed = len(feeds_data) logging.info( "Published %s feeds with missing bounding boxes to Pub/Sub topic: %s, filtered after %s.", total_processed, From 19de6f5787744449f75138505231ee7a85ff4545 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Fri, 20 Jun 2025 14:47:41 -0400 Subject: [PATCH 16/19] fixed query --- functions-python/helpers/query_helper.py | 32 +----------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/functions-python/helpers/query_helper.py b/functions-python/helpers/query_helper.py index 9147729d0..b9ab32c2f 100644 --- a/functions-python/helpers/query_helper.py +++ b/functions-python/helpers/query_helper.py @@ -175,37 +175,6 @@ def get_datasets_with_missing_reports_query( return query -# def get_feeds_ids_with_missing_bounding_boxes_query( -# db_session: Session, -# ) -> Query: -# """ -# Get GTFS feeds ids where the dataset is missing a bounding box. - -# Args: -# db_session: SQLAlchemy session - -# Returns: -# A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes -# ordered by dataset and feed stable id. -# """ -# query = ( -# db_session.query( -# Gtfsfeed.stable_id, -# Gtfsdataset.stable_id, -# ) -# .select_from(Gtfsfeed) -# .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) -# .filter(Gtfsdataset.bounding_box.is_(None)) -# .filter( -# ~Gtfsfeed.feedlocationgrouppoints.any() -# ) # Only feeds with no location group points -# .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) -# .order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id) -# ) - -# return query - - def get_feeds_with_missing_bounding_boxes_query( db_session: Session, ) -> Query: @@ -222,6 +191,7 @@ def get_feeds_with_missing_bounding_boxes_query( query = ( db_session.query(Gtfsfeed) .join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id) + .filter(Gtfsdataset.latest.is_(True)) .filter(Gtfsdataset.bounding_box.is_(None)) .filter(~Gtfsfeed.feedlocationgrouppoints.any()) .distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id) From 1c7164228c3b4f9455325afee760c49fe216e78b Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Mon, 23 Jun 2025 11:14:28 -0400 Subject: [PATCH 17/19] added pubsub topi name as ENV --- infra/functions-python/main.tf | 1 + 1 file changed, 1 insertion(+) diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf index 12e3fc5c1..a945d01a5 100644 --- a/infra/functions-python/main.tf +++ b/infra/functions-python/main.tf @@ -1221,6 +1221,7 @@ resource "google_cloudfunctions2_function" "tasks_executor" { environment_variables = { PROJECT_ID = var.project_id ENV = var.environment + PUBSUB_TOPIC_NAME = "rebuild-bounding-boxes-topic" } available_memory = local.function_tasks_executor_config.memory timeout_seconds = local.function_tasks_executor_config.timeout From bb1e4b0c6e1c3369f7fa0254653b265181b47f92 Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Mon, 23 Jun 2025 12:19:47 -0400 Subject: [PATCH 18/19] setup resources in terraform --- infra/functions-python/main.tf | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf index a945d01a5..ba78f60fb 100644 --- a/infra/functions-python/main.tf +++ b/infra/functions-python/main.tf @@ -1246,6 +1246,18 @@ resource "google_cloudfunctions2_function" "tasks_executor" { } } +# Create the Pub/Sub topic used for publishing messages about rebuilding missing bounding boxes +resource "google_pubsub_topic" "rebuild_missing_bounding_boxes" { + name = "rebuild-bounding-boxes-topic" +} + +# Grant the Cloud Functions service account permission to publish messages to the rebuild-bounding-boxes-topic Pub/Sub topic +resource "google_pubsub_topic_iam_member" "rebuild_missing_bounding_boxes_publisher" { + topic = google_pubsub_topic.rebuild_bounding_boxes.name + role = "roles/pubsub.publisher" + member = "serviceAccount:${google_service_account.functions_service_account.email}" +} + # IAM entry for all users to invoke the function resource "google_cloudfunctions2_function_iam_member" "tokens_invoker" { project = var.project_id From 7cede2d73cae4748fadc6523bc8b32bd224746ab Mon Sep 17 00:00:00 2001 From: Jingsi Lu Date: Mon, 23 Jun 2025 13:48:50 -0400 Subject: [PATCH 19/19] fixed typos --- infra/functions-python/main.tf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/infra/functions-python/main.tf b/infra/functions-python/main.tf index ba78f60fb..770546879 100644 --- a/infra/functions-python/main.tf +++ b/infra/functions-python/main.tf @@ -1253,7 +1253,7 @@ resource "google_pubsub_topic" "rebuild_missing_bounding_boxes" { # Grant the Cloud Functions service account permission to publish messages to the rebuild-bounding-boxes-topic Pub/Sub topic resource "google_pubsub_topic_iam_member" "rebuild_missing_bounding_boxes_publisher" { - topic = google_pubsub_topic.rebuild_bounding_boxes.name + topic = google_pubsub_topic.rebuild_missing_bounding_boxes.name role = "roles/pubsub.publisher" member = "serviceAccount:${google_service_account.functions_service_account.email}" }