Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions functions-python/helpers/query_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,28 @@ def get_datasets_with_missing_reports_query(
Gtfsdataset.stable_id, Gtfsfeed.stable_id
)
return query


def get_feeds_with_missing_bounding_boxes_query(
    db_session: Session,
) -> Query:
    """
    Build a query for GTFS feeds whose latest dataset has no bounding box.

    Only feeds with a latest dataset, a NULL bounding box, and no associated
    location group points are selected; results are de-duplicated and ordered
    by dataset then feed stable id.

    Args:
        db_session: SQLAlchemy session

    Returns:
        A SQLAlchemy query object for GTFS feeds with datasets missing bounding boxes
        ordered by feed stable id.
    """
    query = db_session.query(Gtfsfeed)
    query = query.join(Gtfsdataset, Gtfsdataset.feed_id == Gtfsfeed.id)
    # Multiple conditions in a single filter() are combined with AND.
    query = query.filter(
        Gtfsdataset.latest.is_(True),
        Gtfsdataset.bounding_box.is_(None),
        ~Gtfsfeed.feedlocationgrouppoints.any(),
    )
    query = query.distinct(Gtfsfeed.stable_id, Gtfsdataset.stable_id)
    return query.order_by(Gtfsdataset.stable_id, Gtfsfeed.stable_id)
20 changes: 17 additions & 3 deletions functions-python/tasks_executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
This directory contains Google Cloud Functions used as a single point of access to multiple _tasks_.

## Usage

The function receives the following payload:

```
{
"task": "string", # [required] Name of the task to execute
Expand All @@ -12,6 +14,7 @@ The function receive the following payload:
```

Example:

```json
{
"task": "rebuild_missing_validation_reports",
Expand All @@ -21,11 +24,22 @@ Example:
"filter_statuses": ["active", "inactive", "future"]
}
}
{
"task": "rebuild_missing_bounding_boxes",
"payload": {
"dry_run": true,
"after_date": "2025-06-01"
}
}
```

To get the list of supported tasks use:

```json
{
    "name": "list_tasks",
    "payload": {}
}
```
1 change: 1 addition & 0 deletions functions-python/tasks_executor/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ geoalchemy2==0.14.7

# Google specific packages for this function
google-cloud-workflows
google-cloud-pubsub
flask

# Configuration
Expand Down
7 changes: 7 additions & 0 deletions functions-python/tasks_executor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from tasks.validation_reports.rebuild_missing_validation_reports import (
rebuild_missing_validation_reports_handler,
)
from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import (
rebuild_missing_bounding_boxes_handler,
)


init_logger()
Expand All @@ -42,6 +45,10 @@
"description": "Rebuilds missing validation reports for GTFS datasets.",
"handler": rebuild_missing_validation_reports_handler,
},
"rebuild_missing_bounding_boxes": {
"description": "Rebuilds missing bounding boxes for GTFS datasets that contain valid stops.txt files.",
"handler": rebuild_missing_bounding_boxes_handler,
},
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import logging
import os
from typing import Dict, List
from sqlalchemy.orm import Session

from shared.database.database import with_db_session
from shared.helpers.pub_sub import publish_messages
from shared.helpers.query_helper import (
get_feeds_with_missing_bounding_boxes_query,
)
from shared.database_gen.sqlacodegen_models import Gtfsdataset
from datetime import datetime


def rebuild_missing_bounding_boxes_handler(payload) -> dict:
    """Task entry point: extract parameters from the payload and run the rebuild.

    Args:
        payload (dict): task payload with optional "dry_run" and "after_date" keys.

    Returns:
        dict: summary produced by rebuild_missing_bounding_boxes.
    """
    dry_run, after_date = get_parameters(payload)
    return rebuild_missing_bounding_boxes(dry_run=dry_run, after_date=after_date)


@with_db_session
def rebuild_missing_bounding_boxes(
    dry_run: bool = True,
    after_date: str = None,
    db_session: Session | None = None,
) -> dict:
    """
    Find GTFS feeds/datasets missing bounding boxes and either log or publish them for processing.

    Args:
        dry_run (bool): If True, only logs the number of feeds found (no publishing).
        after_date (str, optional): ISO date string (YYYY-MM-DD). Only datasets downloaded after this date are included.
        db_session (Session, optional): SQLAlchemy session, injected by @with_db_session.

    Returns:
        dict: Summary message and count of processed feeds.
    """
    filter_after = None
    if after_date:
        try:
            filter_after = datetime.fromisoformat(after_date)
        except (TypeError, ValueError):
            # An unparsable date is ignored (with a warning) rather than
            # aborting the whole task.
            logging.warning(
                "Invalid after_date format, expected ISO format (YYYY-MM-DD)"
            )
    query = get_feeds_with_missing_bounding_boxes_query(db_session)
    if filter_after:
        query = query.filter(Gtfsdataset.downloaded_at >= filter_after)
    feeds = query.all()

    message_suffix = f" Filtered after: {filter_after}" if filter_after else ""

    if dry_run:
        total_processed = len(feeds)
        logging.info(
            "Dry run mode: %s feeds with missing bounding boxes found, filtered after %s.",
            total_processed,
            after_date,
        )
        return {
            "message": f"Dry run: {total_processed} feeds with missing bounding boxes found."
            + message_suffix,
            "total_processed": total_processed,
        }

    # Publish one Pub/Sub message per feed. Build the payloads from the feeds
    # already fetched above so the after_date filter is honored — previously
    # this branch re-queried via prepare_feeds_data(db_session), which does not
    # apply the date filter, silently ignoring after_date when publishing.
    feeds_data = []
    for feed in feeds:
        latest_dataset = next(
            (dataset for dataset in feed.gtfsdatasets if dataset.latest), None
        )
        if latest_dataset is not None:
            feeds_data.append(
                {
                    "stable_id": feed.stable_id,
                    "dataset_id": latest_dataset.stable_id,
                    "url": latest_dataset.hosted_url,
                }
            )

    pubsub_topic_name = os.getenv("PUBSUB_TOPIC_NAME", None)
    project_id = os.getenv("PROJECT_ID")

    logging.info("Publishing to topic: %s", pubsub_topic_name)
    publish_messages(feeds_data, project_id, pubsub_topic_name)

    total_processed = len(feeds_data)
    logging.info(
        "Published %s feeds with missing bounding boxes to Pub/Sub topic: %s, filtered after %s.",
        total_processed,
        pubsub_topic_name,
        after_date,
    )
    return {
        "message": f"Successfully published {total_processed} feeds with missing bounding boxes."
        + message_suffix,
        "total_processed": total_processed,
    }


def prepare_feeds_data(db_session: Session | None = None) -> List[Dict]:
    """
    Build the Pub/Sub message payloads for feeds with missing bounding boxes.

    Args:
        db_session: SQLAlchemy session used to run the missing-bounding-boxes query.

    Returns:
        List of dicts with keys "stable_id", "dataset_id" and "url", one per
        feed that has a dataset flagged as latest.
    """
    # NOTE(review): this re-runs the query without any after_date filter —
    # callers that need date filtering should filter the feeds themselves.
    feeds = get_feeds_with_missing_bounding_boxes_query(db_session).all()

    data = []
    for feed in feeds:
        # Single pass: next() with a default replaces the any() + next() pair,
        # which scanned the dataset list twice.
        latest_dataset = next(
            (dataset for dataset in feed.gtfsdatasets if dataset.latest), None
        )
        if latest_dataset is not None:
            data.append(
                {
                    "stable_id": feed.stable_id,
                    "dataset_id": latest_dataset.stable_id,
                    "url": latest_dataset.hosted_url,
                }
            )

    return data


def get_parameters(payload):
    """
    Extract task parameters from the request payload.

    Args:
        payload (dict): dictionary containing the payload data.
    Returns:
        tuple: (dry_run, after_date)
    """
    raw_dry_run = payload.get("dry_run", True)
    if isinstance(raw_dry_run, bool):
        dry_run = raw_dry_run
    else:
        # Accept string-ish values; anything other than "true" (any case) is False.
        dry_run = str(raw_dry_run).lower() == "true"
    return dry_run, payload.get("after_date", None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import unittest
from unittest.mock import patch, MagicMock
from datetime import datetime

from tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes import (
get_parameters,
rebuild_missing_bounding_boxes,
)


class TestTasksExecutor(unittest.TestCase):
    """Unit tests for get_parameters and rebuild_missing_bounding_boxes."""

    @staticmethod
    def _build_feed(feed_stable_id, dataset_stable_id):
        """Return a fake Gtfsfeed carrying one dataset flagged as latest."""
        dataset = MagicMock()
        dataset.latest = True
        dataset.stable_id = dataset_stable_id
        dataset.hosted_url = "http://example.com/" + dataset_stable_id
        feed = MagicMock()
        feed.stable_id = feed_stable_id
        feed.gtfsdatasets = [dataset]
        return feed

    @staticmethod
    def _wire_query(mock_query, feeds):
        """Make the query mock chainable (filter() returns itself) and yield feeds."""
        mock_query.return_value.filter.return_value = mock_query.return_value
        mock_query.return_value.all.return_value = feeds

    def test_get_parameters(self):
        dry_run, after_date = get_parameters({"dry_run": True})
        self.assertTrue(dry_run)
        self.assertIsNone(after_date)

    def test_get_parameters_with_valid_after_date(self):
        dry_run, after_date = get_parameters(
            {"dry_run": False, "after_date": "2024-06-01"}
        )
        self.assertFalse(dry_run)
        self.assertEqual(after_date, "2024-06-01")
        # The returned value must parse as an ISO date string.
        try:
            datetime.fromisoformat(after_date)
        except ValueError:
            self.fail(f"after_date '{after_date}' is not a valid ISO date string")

    def test_get_parameters_with_string_bool(self):
        dry_run, after_date = get_parameters({"dry_run": "false", "after_date": None})
        self.assertFalse(dry_run)
        self.assertIsNone(after_date)

    def test_get_parameters_missing_keys(self):
        dry_run, after_date = get_parameters({})
        self.assertTrue(dry_run)
        self.assertIsNone(after_date)

    @patch(
        "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query"
    )
    def test_rebuild_missing_bounding_boxes_dry_run(self, mock_query):
        fake_feeds = [
            self._build_feed("feed1", "dataset1"),
            self._build_feed("feed2", "dataset2"),
        ]
        self._wire_query(mock_query, fake_feeds)

        result = rebuild_missing_bounding_boxes(
            dry_run=True, after_date=None, db_session=MagicMock()
        )

        self.assertIn("Dry run", result["message"])
        self.assertEqual(result["total_processed"], 2)

    @patch(
        "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.publish_messages"
    )
    @patch(
        "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query"
    )
    def test_rebuild_missing_bounding_boxes_publish(self, mock_query, mock_publish):
        self._wire_query(mock_query, [self._build_feed("feed1", "dataset1")])
        mock_publish.return_value = None

        result = rebuild_missing_bounding_boxes(
            dry_run=False, after_date=None, db_session=MagicMock()
        )

        self.assertIn("Successfully published", result["message"])
        self.assertEqual(result["total_processed"], 1)

    @patch(
        "tasks.missing_bounding_boxes.rebuild_missing_bounding_boxes.get_feeds_with_missing_bounding_boxes_query"
    )
    def test_rebuild_missing_bounding_boxes_invalid_after_date(self, mock_query):
        self._wire_query(mock_query, [])

        # A malformed date should only log a warning, never raise.
        result = rebuild_missing_bounding_boxes(
            dry_run=True, after_date="not-a-date", db_session=MagicMock()
        )

        self.assertIn("Dry run", result["message"])
        self.assertEqual(result["total_processed"], 0)


if __name__ == "__main__":
    unittest.main()
13 changes: 13 additions & 0 deletions infra/functions-python/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,7 @@ resource "google_cloudfunctions2_function" "tasks_executor" {
environment_variables = {
PROJECT_ID = var.project_id
ENV = var.environment
PUBSUB_TOPIC_NAME = "rebuild-bounding-boxes-topic"
}
available_memory = local.function_tasks_executor_config.memory
timeout_seconds = local.function_tasks_executor_config.timeout
Expand All @@ -1245,6 +1246,18 @@ resource "google_cloudfunctions2_function" "tasks_executor" {
}
}

# Create the Pub/Sub topic used for publishing messages about rebuilding missing bounding boxes
resource "google_pubsub_topic" "rebuild_missing_bounding_boxes" {
name = "rebuild-bounding-boxes-topic"
}

# Grant the Cloud Functions service account permission to publish messages to the rebuild-bounding-boxes-topic Pub/Sub topic
resource "google_pubsub_topic_iam_member" "rebuild_missing_bounding_boxes_publisher" {
topic = google_pubsub_topic.rebuild_missing_bounding_boxes.name
role = "roles/pubsub.publisher"
member = "serviceAccount:${google_service_account.functions_service_account.email}"
}

# IAM entry for all users to invoke the function
resource "google_cloudfunctions2_function_iam_member" "tokens_invoker" {
project = var.project_id
Expand Down