Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ca2e5c2
feat: GTFS validator orchestrator — generic task tracker + extended r…
davidgamez Mar 31, 2026
a0d1fbf
fix: correct test_task_execution_tracker failures
davidgamez Mar 31, 2026
d26de11
fix lint
davidgamez Mar 31, 2026
3f778bc
fix lint
davidgamez Mar 31, 2026
9f1bb41
refactor: make bypass_db_update explicit in rebuild_missing_validatio…
davidgamez Mar 31, 2026
ba7a93b
split parameters
davidgamez Mar 31, 2026
b7038bd
refactor: make filter_after_in_days optional with no default
davidgamez Mar 31, 2026
6d962a9
feat: add dispatch_complete indicator to validation run status
davidgamez Mar 31, 2026
bdc97fd
docs: update validation reports README with new params and pre-releas…
davidgamez Mar 31, 2026
bfac479
docs: fix BigQuery ingestion trigger command
davidgamez Mar 31, 2026
a1cd68a
feat: add filter_op_statuses param to rebuild_missing_validation_reports
davidgamez Mar 31, 2026
fba0131
docs: add filter_op_statuses to payload example in README
davidgamez Mar 31, 2026
f26a95b
perf: apply limit before GCS blob check in rebuild_missing_validation…
davidgamez Mar 31, 2026
84fd922
perf: stop GCS blob checks early once limit is reached
davidgamez Mar 31, 2026
75feb89
fix: use 'metadata' column name in on_conflict_do_update set_ dict
davidgamez Mar 31, 2026
2a88475
fix converter permissions
davidgamez Mar 31, 2026
22e0feb
Revert "fix converter permissions"
davidgamez Mar 31, 2026
7aa011c
replace get_bucket with bucket that requires less permissions
davidgamez Apr 1, 2026
5b633e6
fix unit tests
davidgamez Apr 2, 2026
05714e3
feat: replace get_validation_run_status with generic self-scheduling …
davidgamez Apr 2, 2026
e62ef12
refactor: use Cloud Tasks retry (503) instead of self-scheduling in s…
davidgamez Apr 2, 2026
d7e9a2a
apply lint
davidgamez Apr 2, 2026
ad20ec7
refactor: rename _task_run_id to task_run_id (public attribute)
davidgamez Apr 2, 2026
c364cc8
feat: add get_task_run_status read-only task
davidgamez Apr 2, 2026
5d5a0d9
fix lint
davidgamez Apr 2, 2026
71891d0
fix: delay initial sync_task_run_status fire by 10 minutes
davidgamez Apr 2, 2026
d69932f
feat: add total_already_tracked to dry run response
davidgamez Apr 7, 2026
f9ff073
fix import
davidgamez Apr 7, 2026
c46450a
chore: remove update_validation_report — superseded by rebuild_missin…
davidgamez Apr 7, 2026
0bc17f8
update docs
davidgamez Apr 7, 2026
e6d1085
Merge branch 'main' into feat/gtfs-validators-orchestrator
davidgamez Apr 7, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
458 changes: 458 additions & 0 deletions functions-python/helpers/task_execution/task_execution_tracker.py

Large diffs are not rendered by default.

187 changes: 187 additions & 0 deletions functions-python/helpers/tests/test_task_execution_tracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#
# MobilityData 2026
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
import uuid
from datetime import datetime, timezone
from unittest.mock import MagicMock

from task_execution.task_execution_tracker import (
TaskExecutionTracker,
STATUS_IN_PROGRESS,
STATUS_TRIGGERED,
STATUS_COMPLETED,
STATUS_FAILED,
)


def _make_tracker(task_name="test_task", run_id="v1.0"):
    """Build a TaskExecutionTracker wired to a MagicMock DB session.

    Returns a (tracker, session) pair so tests can both drive the tracker
    and inspect the calls made against the mocked session.
    """
    mock_session = MagicMock()
    return (
        TaskExecutionTracker(
            task_name=task_name,
            run_id=run_id,
            db_session=mock_session,
        ),
        mock_session,
    )


class TestTaskExecutionTrackerStartRun(unittest.TestCase):
    """Tests for TaskExecutionTracker.start_run (upsert + task_run_id caching)."""

    @staticmethod
    def _stub_start_run_result(session, run_uuid):
        """Make session.execute(...).scalar_one() return *run_uuid*."""
        execute_result = MagicMock()
        execute_result.scalar_one.return_value = run_uuid
        session.execute.return_value = execute_result

    def test_start_run_upserts_task_run(self):
        tracker, session = _make_tracker()
        run_uuid = uuid.uuid4()
        self._stub_start_run_result(session, run_uuid)

        result = tracker.start_run(total_count=100, params={"env": "staging"})

        self.assertEqual(result, run_uuid)
        self.assertEqual(tracker.task_run_id, run_uuid)
        session.execute.assert_called_once()
        session.flush.assert_called_once()

    # Renamed from "test_start_run_cachestask_run_id": typo, missing underscore.
    def test_start_run_caches_task_run_id(self):
        tracker, session = _make_tracker()
        run_uuid = uuid.uuid4()
        self._stub_start_run_result(session, run_uuid)

        tracker.start_run(total_count=10)
        tracker.start_run(total_count=20)  # second call should reuse the cached id

        self.assertEqual(tracker.task_run_id, run_uuid)


class TestTaskExecutionTrackerIsTriggered(unittest.TestCase):
    """Tests for TaskExecutionTracker.is_triggered existence checks."""

    @staticmethod
    def _stub_first_result(session, row):
        """Point the chained query(...).filter().filter().first() at *row*."""
        chain = session.query.return_value.filter.return_value.filter.return_value
        chain.first.return_value = row

    def test_returns_true_when_triggered_row_exists(self):
        tracker, session = _make_tracker()
        self._stub_first_result(session, MagicMock())

        self.assertTrue(tracker.is_triggered("ds-123"))

    def test_returns_false_when_no_row(self):
        tracker, session = _make_tracker()
        self._stub_first_result(session, None)

        self.assertFalse(tracker.is_triggered("ds-999"))

    def test_handles_none_entity_id(self):
        tracker, session = _make_tracker()
        self._stub_first_result(session, None)

        self.assertFalse(tracker.is_triggered(None))


class TestTaskExecutionTrackerMarkTriggered(unittest.TestCase):
    """Tests for TaskExecutionTracker.mark_triggered execution-log inserts."""

    def setUp(self):
        # Fresh tracker per test with a pre-assigned run id, mirroring the
        # state after start_run() has already been called.
        self.tracker, self.session = _make_tracker()
        self.tracker.task_run_id = uuid.uuid4()

    def test_mark_triggered_inserts_execution_log(self):
        self.tracker.mark_triggered("ds-1", execution_ref="projects/x/executions/abc")

        self.session.execute.assert_called_once()
        self.session.flush.assert_called_once()

    def test_mark_triggered_with_metadata(self):
        self.tracker.mark_triggered("ds-1", metadata={"feed_id": "f-1"})

        self.session.execute.assert_called_once()


class TestTaskExecutionTrackerMarkCompleted(unittest.TestCase):
    """Tests for TaskExecutionTracker.mark_completed status transitions."""

    def test_mark_completed_updates_status(self):
        tracker, session = _make_tracker()
        filtered_query = MagicMock()
        session.query.return_value.filter.return_value.filter.return_value = (
            filtered_query
        )

        tracker.mark_completed("ds-1")

        filtered_query.update.assert_called_once()
        (values,), _kwargs = filtered_query.update.call_args
        self.assertEqual(values["status"], STATUS_COMPLETED)
        self.assertIn("completed_at", values)


class TestTaskExecutionTrackerMarkFailed(unittest.TestCase):
    """Tests for TaskExecutionTracker.mark_failed error recording."""

    def test_mark_failed_sets_error_message(self):
        tracker, session = _make_tracker()
        filtered_query = MagicMock()
        session.query.return_value.filter.return_value.filter.return_value = (
            filtered_query
        )

        tracker.mark_failed("ds-1", error_message="Workflow timed out")

        filtered_query.update.assert_called_once()
        (values,), _kwargs = filtered_query.update.call_args
        self.assertEqual(values["status"], STATUS_FAILED)
        self.assertEqual(values["error_message"], "Workflow timed out")


class TestTaskExecutionTrackerGetSummary(unittest.TestCase):
    """Tests for TaskExecutionTracker.get_summary aggregation."""

    @staticmethod
    def _build_task_run(status=STATUS_IN_PROGRESS, total_count=10):
        """Return a mock task_run row with the given status and total_count."""
        task_run = MagicMock()
        task_run.status = status
        task_run.total_count = total_count
        task_run.created_at = datetime.now(timezone.utc)
        return task_run

    def test_returns_none_summary_when_no_run(self):
        tracker, session = _make_tracker()
        query_chain = session.query.return_value.filter.return_value
        query_chain.first.return_value = None
        query_chain.all.return_value = []

        summary = tracker.get_summary()

        self.assertIsNone(summary["run_status"])
        self.assertEqual(summary["triggered"], 0)
        self.assertEqual(summary["completed"], 0)

    def test_counts_by_status(self):
        tracker, session = _make_tracker()
        task_run = self._build_task_run(total_count=5)
        log_rows = [
            MagicMock(status=s)
            for s in (
                STATUS_TRIGGERED,
                STATUS_TRIGGERED,
                STATUS_COMPLETED,
                STATUS_FAILED,
            )
        ]

        def fake_query(*_args):
            # Each query() call gets a chain that yields the run row for
            # .first() and the execution-log rows for .all().
            chain = MagicMock()
            chain.filter.return_value.first.return_value = task_run
            chain.filter.return_value.all.return_value = log_rows
            return chain

        session.query.side_effect = fake_query

        summary = tracker.get_summary()

        self.assertEqual(summary["triggered"], 2)
        self.assertEqual(summary["completed"], 1)
        self.assertEqual(summary["failed"], 1)
        # pending = total_count (5) minus the 4 logged entries
        self.assertEqual(summary["pending"], 1)
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,18 @@ def execute_workflows(
validator_endpoint=None,
bypass_db_update=False,
reports_bucket_name=None,
tracker=None,
):
"""
Execute the workflow for the latest datasets that need their validation report to be updated
Execute the workflow for the latest datasets that need their validation report to be updated.

:param latest_datasets: List of tuples containing the feed stable id and dataset stable id
:param validator_endpoint: The URL of the validator
:param bypass_db_update: Whether to bypass the database update
:param reports_bucket_name: The name of the bucket where the reports are stored
:param tracker: Optional TaskExecutionTracker for idempotent execution tracking.
When provided, datasets already in triggered/completed state are skipped
and newly triggered datasets are recorded.
:return: List of dataset stable ids for which the workflow was executed
"""
project_id = f"mobility-feeds-{env}"
Expand All @@ -64,6 +69,9 @@ def execute_workflows(
count = 0
logging.info(f"Executing workflow for {len(latest_datasets)} datasets")
for feed_id, dataset_id in latest_datasets:
if tracker and tracker.is_triggered(dataset_id):
logging.info(f"Skipping already triggered dataset {feed_id}/{dataset_id}")
continue
try:
input_data = {
"data": {
Expand All @@ -83,12 +91,21 @@ def execute_workflows(
if reports_bucket_name:
input_data["data"]["reports_bucket_name"] = reports_bucket_name
logging.info(f"Executing workflow for {feed_id}/{dataset_id}")
execute_workflow(project_id, input_data=input_data)
execution = execute_workflow(project_id, input_data=input_data)
execution_triggered_datasets.append(dataset_id)
if tracker:
tracker.mark_triggered(
entity_id=dataset_id,
execution_ref=execution.name,
metadata={"feed_id": feed_id},
bypass_db_update=bypass_db_update,
)
except Exception as e:
logging.error(
f"Error while executing workflow for {feed_id}/{dataset_id}: {e}"
)
if tracker:
tracker.mark_failed(entity_id=dataset_id, error_message=str(e))
count += 1
logging.info(f"Triggered workflow execution for {count} datasets")
if count % batch_size == 0:
Expand Down
17 changes: 17 additions & 0 deletions functions-python/process_validation_report/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from shared.helpers.logger import init_logger
from shared.helpers.transform import get_nested_value
from shared.helpers.feed_status import update_feed_statuses_query
from shared.helpers.task_execution.task_execution_tracker import TaskExecutionTracker

init_logger()

Expand Down Expand Up @@ -286,6 +287,22 @@ def create_validation_report_entities(
return str(error), 200

update_feed_statuses_query(db_session, [feed_stable_id])

# Update execution tracker regardless of bypass_db_update, so monitoring
# works for both pre-release and post-release validation runs.
try:
tracker = TaskExecutionTracker(
task_name="gtfs_validation",
run_id=version,
db_session=db_session,
)
tracker.mark_completed(dataset_stable_id)
db_session.commit()
except Exception as tracker_error:
logging.warning(
"Could not update task execution tracker: %s", tracker_error
)

result = f"Created {len(entities)} entities."
logging.info(result)
return result, 200
Expand Down
16 changes: 15 additions & 1 deletion functions-python/tasks_executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,26 @@ Examples:
"task": "rebuild_missing_validation_reports",
"payload": {
"dry_run": true,
"filter_after_in_days": 14,
"bypass_db_update": true,
"filter_after_in_days": null,
"force_update": false,
"validator_endpoint": "https://stg-gtfs-validator-web-mbzoxaljzq-ue.a.run.app",
"limit": 1,
"filter_statuses": ["active", "inactive", "future"]
}
}
```

```json
{
"task": "get_validation_run_status",
"payload": {
"task_name": "gtfs_validation",
"run_id": "7.1.1-SNAPSHOT"
}
}
```

```json
{
"task": "rebuild_missing_bounding_boxes",
Expand Down
31 changes: 31 additions & 0 deletions functions-python/tasks_executor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import functions_framework

from shared.helpers.logger import init_logger
from shared.helpers.task_execution.task_execution_tracker import TaskInProgressError
from tasks.data_import.transitfeeds.sync_transitfeeds import sync_transitfeeds_handler
from tasks.data_import.transportdatagouv.import_tdg_feeds import import_tdg_handler
from tasks.data_import.transportdatagouv.update_tdg_redirects import (
Expand All @@ -39,6 +40,12 @@
from tasks.validation_reports.rebuild_missing_validation_reports import (
rebuild_missing_validation_reports_handler,
)
from tasks.sync_task_run_status import (
sync_task_run_status_handler,
)
from tasks.get_task_run_status import (
get_task_run_status_handler,
)
from tasks.visualization_files.rebuild_missing_visualization_files import (
rebuild_missing_visualization_files_handler,
)
Expand Down Expand Up @@ -71,6 +78,25 @@
"description": "Rebuilds missing validation reports for GTFS datasets.",
"handler": rebuild_missing_validation_reports_handler,
},
"get_task_run_status": {
"description": (
"Read-only snapshot of a task_run tracked by TaskExecutionTracker. "
"Returns current DB state (triggered/completed/failed/pending counts) "
"without triggering any GCP Workflows polling or status transitions. "
"Required: task_name, run_id."
),
"handler": get_task_run_status_handler,
},
"sync_task_run_status": {
"description": (
"Generic self-scheduling monitor for any task_run. "
"Polls GCP Workflows for triggered entries, updates statuses, "
"marks the task_run completed when all done, and re-schedules "
"itself every 10 minutes until complete. "
"Required: task_name, run_id."
),
"handler": sync_task_run_status_handler,
},
"rebuild_missing_bounding_boxes": {
"description": "Rebuilds missing bounding boxes for GTFS datasets that contain valid stops.txt files.",
"handler": rebuild_missing_bounding_boxes_handler,
Expand Down Expand Up @@ -195,5 +221,10 @@ def tasks_executor(request: flask.Request) -> flask.Response:

# Default JSON response
return flask.make_response(flask.jsonify(result), 200)
except TaskInProgressError as error:
# Signal Cloud Tasks to retry — the run is not yet complete
return flask.make_response(
flask.jsonify({"status": "in_progress", "detail": str(error)}), 503
)
except Exception as error:
return flask.make_response(flask.jsonify({"error": str(error)}), 500)
Loading
Loading