Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/shared/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def refresh_materialized_view(session: "Session", view_name: str) -> bool:
session.execute(text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name}"))
return True
except Exception as error:
logging.error(f"Error raised while refreshing view: {error}")
logging.error("Error raised while refreshing view: %s", error)
return False


Expand Down
36 changes: 21 additions & 15 deletions functions-python/backfill_dataset_service_date_range/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import functions_framework

from shared.helpers.logger import Logger
from shared.helpers.logger import init_logger

from shared.database.database import with_db_session, refresh_materialized_view

Expand All @@ -28,7 +28,7 @@
env = os.getenv("ENV", "dev").lower()
bucket_name = f"mobilitydata-datasets-{env}"

logging.basicConfig(level=logging.INFO)
init_logger()


def is_version_gte(target_version: str, version_field):
Expand Down Expand Up @@ -66,10 +66,10 @@ def backfill_datasets(session: "Session"):
)
).all()

logging.info(f"Found {len(datasets)} datasets to process.")
logging.info("Found %s datasets to process.", len(datasets))

for dataset in datasets:
logging.info(f"Processing gtfsdataset ID {dataset.stable_id}")
logging.info("Processing gtfsdataset ID %s", dataset.stable_id)
gtfsdataset_id = dataset.stable_id
feed_stable_id = "-".join(gtfsdataset_id.split("-")[0:2])
# Get the latest validation report for the dataset
Expand All @@ -81,7 +81,8 @@ def backfill_datasets(session: "Session"):

if not latest_validation_report:
logging.info(
f"Skipping gtfsdataset ID {gtfsdataset_id}: no validation reports found."
"Skipping gtfsdataset ID %s: no validation reports found.",
gtfsdataset_id,
)
continue

Expand All @@ -90,7 +91,7 @@ def backfill_datasets(session: "Session"):
try:
# Download the JSON report
blob_url = f"{feed_stable_id}/{gtfsdataset_id}/report_{latest_validation_report.validator_version}.json"
logging.info("Blob URL: " + blob_url)
logging.info("Blob URL: %s", blob_url)
dataset_blob = bucket.blob(blob_url)
if not dataset_blob.exists():
logging.info("Blob not found, downloading from URL")
Expand All @@ -102,7 +103,9 @@ def backfill_datasets(session: "Session"):
logging.info("Blob found, downloading from blob")
json_data = json.loads(dataset_blob.download_as_string())
except Exception as e:
logging.error(f"Error downloading blob: {e} trying json report url")
logging.error(
"Error downloading blob trying json report url: %s", e
)
response = requests.get(json_report_url)
response.raise_for_status()
json_data = response.json()
Expand Down Expand Up @@ -133,7 +136,9 @@ def backfill_datasets(session: "Session"):

formatted_dates = f"{utc_service_start_date:%Y-%m-%d %H:%M} - {utc_service_end_date:%Y-%m-%d %H:%M}"
logging.info(
f"Updated gtfsdataset ID {gtfsdataset_id} with value: {formatted_dates}"
                "Updated gtfsdataset ID %s with value: %s",
gtfsdataset_id,
formatted_dates,
)
total_changes_count += 1
changes_count += 1
Expand All @@ -151,32 +156,31 @@ def backfill_datasets(session: "Session"):

except requests.RequestException as e:
logging.error(
f"Error downloading JSON for gtfsdataset ID {gtfsdataset_id}: {e}"
"Error downloading JSON for gtfsdataset ID %s: %s", gtfsdataset_id, e
)
except json.JSONDecodeError as e:
logging.error(
f"Error parsing JSON for gtfsdataset ID {gtfsdataset_id}: {e}"
"Error parsing JSON for gtfsdataset ID %s: %s", gtfsdataset_id, e
)
except Exception as e:
logging.error(f"Error processing gtfsdataset ID {gtfsdataset_id}: {e}")
logging.error("Error processing gtfsdataset ID %s: %s", gtfsdataset_id, e)

try:
session.commit()
logging.info("Database changes committed.")
session.close()
return total_changes_count
except Exception as e:
logging.error("Error committing changes:", e)
logging.error("Error committing changes: %s", e)
session.rollback()
session.close()
raise Exception(f"Error creating dataset: {e}")
        raise Exception(f"Error creating dataset: {e}")


@functions_framework.http
@with_db_session
def backfill_dataset_service_date_range(_, db_session: Session):
"""Fills gtfs dataset service date range from the latest validation report."""
Logger.init_logger()
change_count = 0
try:
logging.info("Database session started.")
Expand All @@ -186,4 +190,6 @@ def backfill_dataset_service_date_range(_, db_session: Session):
        logging.error("Error setting the datasets service date range values: %s", error)
return f"Error setting the datasets service date range values: {error}", 500

return f"Script executed successfully. {change_count} datasets updated", 200
result = f"Script executed successfully. {change_count} datasets updated"
logging.info(result)
return result, 200
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,9 @@ def test_backfill_datasets_service_date_range_swap(mock_get, mock_storage_client
mock_session.commit.assert_called_once()


@patch("logging.error", autospec=True)
@patch("google.cloud.storage.Client", autospec=True)
@patch("requests.get")
def test_backfill_datasets_error_commit(mock_get, mock_storage_client, mock_logger):
def test_backfill_datasets_error_commit(mock_get, mock_storage_client):
# Mock the storage client and bucket
mock_bucket = MagicMock()
mock_client_instance = mock_storage_client.return_value
Expand Down Expand Up @@ -412,9 +411,8 @@ def test_backfill_datasets_fail_to_get_validation_report(mock_get, mock_storage_
mock_session.commit.assert_called_once()


@patch("main.Logger", autospec=True)
@patch("main.backfill_datasets")
def test_backfill_dataset_service_date_range(mock_backfill_datasets, mock_logger):
def test_backfill_dataset_service_date_range(mock_backfill_datasets):
mock_backfill_datasets.return_value = 5

with patch.dict(os.environ, {"FEEDS_DATABASE_URL": default_db_url}):
Expand All @@ -425,11 +423,8 @@ def test_backfill_dataset_service_date_range(mock_backfill_datasets, mock_logger
assert status_code == 200


@patch("main.Logger", autospec=True)
@patch("main.backfill_datasets")
def test_backfill_dataset_service_date_range_error_raised(
mock_backfill_datasets, mock_logger
):
def test_backfill_dataset_service_date_range_error_raised(mock_backfill_datasets):
mock_backfill_datasets.side_effect = Exception("Mocked exception")

with patch.dict(os.environ, {"FEEDS_DATABASE_URL": default_db_url}):
Expand Down
2 changes: 1 addition & 1 deletion functions-python/helpers/feed_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def get_filters(status: str):
.update({Feed.status: status}, synchronize_session=False)
)
except Exception as e:
logging.error(f"Error updating feed statuses: {e}")
logging.error("Error updating feed statuses: %s", e)
raise Exception(f"Error updating feed statuses: {e}")

try:
Expand Down
12 changes: 9 additions & 3 deletions functions-python/tasks_executor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ This directory contains Google Cloud Functions used as a single point of access
## Usage
The function receives the following payload:
```
{
   "task": "string",   # [required] Name of the task to execute
   "payload": { }      # [optional] Payload to pass to the task
}
```

Example:
```json
{
"task": "rebuild_missing_validation_reports",
"payload": {
"dry_run": true,
"filter_after_in_days": 14,
"filter_statuses": ["active", "inactive", "future"]
Expand Down
4 changes: 3 additions & 1 deletion functions-python/tasks_executor/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import flask
import functions_framework

from shared.helpers.logger import init_logger
from tasks.validation_reports.rebuild_missing_validation_reports import (
rebuild_missing_validation_reports_handler,
)


init_logger()
LIST_COMMAND: Final[str] = "list"
tasks = {
"list_tasks": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
This task generates the missing reports in the GTFS datasets.
The reports are generated using the _gtfs_validator_ GCP workflow.

## Task ID
Use task ID: `rebuild_missing_validation_reports`

## Usage
The function receives the following payload:
```
Expand All @@ -13,13 +16,13 @@ The function receive the following payload:
}
```
Example:
``
```
{
"dry_run": true,
"filter_after_in_days": 14,
"filter_statuses": ["active", "inactive", "future"]
}
`````
```

# GCP environment variables
The function uses the following environment variables:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
from shared.helpers.query_helper import get_datasets_with_missing_reports_query
from shared.helpers.validation_report.validation_report_update import execute_workflows

logging.basicConfig(level=logging.INFO)

QUERY_LIMIT: Final[int] = 100


Expand Down Expand Up @@ -130,7 +128,7 @@ def rebuild_missing_validation_reports(
if dry_run
else "Rebuild missing validation reports task executed successfully."
)
return {
result = {
"message": message,
"total_processed": total_processed,
"params": {
Expand All @@ -141,6 +139,8 @@ def rebuild_missing_validation_reports(
"validator_endpoint": validator_endpoint,
},
}
logging.info(result)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the log give a bit more info, like what is this object it's printing?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The record contains a "message" field used by GCP as a descriptor:
Screenshot 2025-05-27 at 1 57 45 PM

return result


def get_parameters(payload):
Expand Down
5 changes: 2 additions & 3 deletions functions-python/update_feed_status/src/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import logging
import functions_framework
from shared.helpers.logger import Logger
from shared.helpers.logger import init_logger
from shared.helpers.feed_status import update_feed_statuses_query
from shared.database.database import with_db_session

logging.basicConfig(level=logging.INFO)
init_logger()


@with_db_session
@functions_framework.http
def update_feed_status(_, db_session):
    """Updates the Feed status based on the latest dataset service date range."""
Logger.init_logger()
try:
logging.info("Database session started.")
diff_counts = update_feed_statuses_query(db_session, [])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ def test_update_feed_status_failed_query():
assert str(e) == "Error updating feed statuses: Mocked exception"


@patch("main.Logger", autospec=True)
@patch("main.update_feed_statuses_query")
def test_updated_feed_status(mock_update_query, mock_logger):
def test_updated_feed_status(mock_update_query):
return_value = {"active": 5}
mock_update_query.return_value = return_value

Expand All @@ -122,9 +121,8 @@ def test_updated_feed_status(mock_update_query, mock_logger):
assert status_code == 200


@patch("main.Logger", autospec=True)
@patch("main.update_feed_statuses_query")
def test_updated_feed_status_error_raised(mock_update_query, mock_logger):
def test_updated_feed_status_error_raised(mock_update_query):
mock_update_query.side_effect = Exception("Mocked exception")

with patch.dict(os.environ, {"FEEDS_DATABASE_URL": default_db_url}):
Expand Down
Loading