Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
e85f2bb
added refresh materialized view cloud function
qcdyx Jul 8, 2025
1366ec3
added missing google_cloudfunctions2_function resource for refresh_ma…
qcdyx Jul 8, 2025
d9e5938
cloud task for refresh materialized view
qcdyx Jul 9, 2025
9171a54
reorganized terraform resources
qcdyx Jul 10, 2025
f12efcf
fixed broken terraform build
qcdyx Jul 10, 2025
85c0ea5
fixed requirements.txt
qcdyx Jul 10, 2025
46d5474
updated code
qcdyx Jul 10, 2025
f7d6cbd
fixed main.tf
qcdyx Jul 10, 2025
3958575
fixed main.tf
qcdyx Jul 10, 2025
c01b6bd
applied bouncing mechanism
qcdyx Jul 11, 2025
d81591c
fixed test
qcdyx Jul 11, 2025
1619824
improved refresh mv function
qcdyx Jul 14, 2025
e1dffc0
used 30 minute bounce window
qcdyx Jul 15, 2025
fc0180c
added google-cloud-tasks into requirements.txt
qcdyx Jul 15, 2025
a810e06
updated code
qcdyx Jul 15, 2025
8a47536
reversed changes
qcdyx Jul 15, 2025
124b62c
direct call to refresh materialized view gcp function
qcdyx Jul 15, 2025
990cee3
included the FUNCTION_URL_REFRESH_MV environment variable in the reve…
qcdyx Jul 16, 2025
4b22d7f
ensured the service account has the required IAM permissions to invo…
qcdyx Jul 16, 2025
2d5dca5
renamed cloud function
qcdyx Jul 16, 2025
bc20148
added FUNCTION_SIGNATURE_TYPE
qcdyx Jul 16, 2025
1352619
changed to FUNCTION_SIGNATURE_TYPE = "http"
qcdyx Jul 16, 2025
4a9c0da
corrected FUNCTION_URL_REFRESH_MV
qcdyx Jul 17, 2025
6f67847
updated url
qcdyx Jul 17, 2025
36cc336
Processor code must use ID token
qcdyx Jul 17, 2025
f46d0c5
updated code
qcdyx Jul 17, 2025
165ef3d
updated google-auth
qcdyx Jul 17, 2025
a641952
upgraded code
qcdyx Jul 17, 2025
51cf489
upgraded google-auth to 2.29.0 and force a rebuild in GCP
qcdyx Jul 17, 2025
f58c14d
differentiated 2 requests libraries
qcdyx Jul 18, 2025
06e5a32
added authentication into refresh function
qcdyx Jul 18, 2025
b175dbc
used http call to the refresh function in other GCP functions
qcdyx Jul 21, 2025
d6007d6
removed refresh function and used tasks executor
qcdyx Jul 22, 2025
3b2c5e3
updated main.tf
qcdyx Jul 23, 2025
8008e59
removed terraform script for refresh function
qcdyx Jul 23, 2025
56be0b7
added cloud tasks in requirements.txt
qcdyx Jul 23, 2025
07c09aa
fixed terraform error
qcdyx Jul 23, 2025
1886c5d
set "trigger_http": true
qcdyx Jul 23, 2025
8498b69
put back the required zip file artifact for tasks-executor
qcdyx Jul 23, 2025
d0b09c6
fixed terraform build failure
qcdyx Jul 23, 2025
0b4929e
fixed terraform script
qcdyx Jul 23, 2025
1c7b9ea
added cloud tasks queue for refresh materialized view
qcdyx Jul 23, 2025
b5630e3
put back GOOGLE_FUNCTION_SOURCE
qcdyx Jul 23, 2025
4418946
reverted some changes
qcdyx Jul 23, 2025
011e57a
fixed import error
qcdyx Jul 23, 2025
68ea8c2
added the new imports inside the create_refresh_materialized_view_tas…
qcdyx Jul 23, 2025
dcf5d95
updated code to use create_refresh_materialized_view()
qcdyx Jul 24, 2025
1e0377d
updated test mocks
qcdyx Jul 24, 2025
f5df6b8
Merge branch 'main' into 1172-refresh-feedsearch-view-asynchronically
qcdyx Jul 24, 2025
806261b
fixed lint errors
qcdyx Jul 24, 2025
7939f34
fixed lint errors
qcdyx Jul 24, 2025
98df589
cleaned up code
qcdyx Jul 28, 2025
81efd16
fixed lint errors
qcdyx Jul 28, 2025
f24d84c
fixed TypeError: not all arguments converted during string formatting
qcdyx Jul 28, 2025
c5b04f4
added google-cloud-tasks in update_feed_status
qcdyx Jul 28, 2025
10b4af6
resolved PR comments
qcdyx Jul 29, 2025
b97f5ae
removed unnecessary terraform changes
qcdyx Jul 29, 2025
a580abf
Update functions-python/tasks_executor/src/main.py
qcdyx Jul 29, 2025
336eaf3
resolved PR comments
qcdyx Jul 29, 2025
c88d705
Merge branch '1172-refresh-feedsearch-view-asynchronically' of github…
qcdyx Jul 29, 2025
8ba561d
renamed queue to MATERIALIZED_VIEW_QUEUE
qcdyx Jul 29, 2025
1391702
added create_http_task_with_deduplication function
qcdyx Jul 29, 2025
c457c13
reverted previous changes
qcdyx Jul 30, 2025
e308d12
add test_refresh_materialized_view.py
qcdyx Jul 30, 2025
b4dde33
fixed test
qcdyx Jul 30, 2025
13d9f8f
fixed patchig
qcdyx Jul 30, 2025
bb94fc6
code refactoring
qcdyx Aug 5, 2025
2845dda
managed imports
qcdyx Aug 5, 2025
18d92ab
fixed broken tests
qcdyx Aug 5, 2025
eaeaf1c
Merge branch 'main' into 1172-refresh-feedsearch-view-asynchronically
qcdyx Aug 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions api/src/shared/common/gcp_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
import os
from google.cloud import tasks_v2
from google.protobuf.timestamp_pb2 import Timestamp


def create_refresh_materialized_view_task():
    """
    Enqueue a Cloud Task that asynchronously refreshes a materialized view.

    Deduplication: the task name is derived from a 30-minute "bounce"
    window — requests before minute :30 map to this hour's :30 slot, later
    ones to the next hour's :00 slot. Cloud Tasks rejects a second task
    with the same name (ALREADY_EXISTS), which is treated as success
    because the refresh is already scheduled for that window.

    Environment variables read: PROJECT_ID, LOCATION, MATERIALIZED_VIEW_QUEUE,
    GCP_REGION, ENVIRONMENT_NAME (and, via the helper, SERVICE_ACCOUNT_EMAIL).

    Returns:
        tuple[dict, int]: Response message and HTTP-style status code
        (200 on scheduled-or-already-scheduled, 500 on error).
    """
    from google.protobuf import timestamp_pb2
    from datetime import datetime, timedelta, timezone

    try:
        logging.info("Creating materialized view refresh task.")
        # Use UTC explicitly: Timestamp.FromDatetime treats naive datetimes
        # as UTC, so a naive local time would schedule the task (and name
        # the dedup bucket) at the wrong moment off-UTC.
        now = datetime.now(timezone.utc)

        # BOUNCE WINDOW: snap to the next :00 or :30 boundary.
        if now.minute < 30:
            bucket_time = now.replace(minute=30, second=0, microsecond=0)
        else:
            bucket_time = now.replace(
                minute=0, second=0, microsecond=0
            ) + timedelta(hours=1)

        timestamp_str = bucket_time.strftime("%Y-%m-%d-%H-%M")
        task_name = f"refresh-materialized-view-{timestamp_str}"

        # Convert the bucket time to a protobuf timestamp for schedule_time.
        proto_time = timestamp_pb2.Timestamp()
        proto_time.FromDatetime(bucket_time)

        # Cloud Tasks setup; configuration comes from the environment.
        client = tasks_v2.CloudTasksClient()
        project = os.getenv("PROJECT_ID")
        location = os.getenv("LOCATION")
        queue = os.getenv("MATERIALIZED_VIEW_QUEUE")
        url = (
            f"https://{os.getenv('GCP_REGION')}-"
            f"{os.getenv('PROJECT_ID')}.cloudfunctions.net/"
            f"tasks-executor-{os.getenv('ENVIRONMENT_NAME')}"
        )

        # A fully-qualified task path is required for name-based dedup.
        task_name = client.task_path(project, location, queue, task_name)

        # Enqueue the task.
        try:
            create_http_task_with_name(
                client=client,
                body=b"",
                url=url,
                project_id=project,
                gcp_region=location,
                queue_name=queue,
                task_name=task_name,
                task_time=proto_time,
                http_method=tasks_v2.HttpMethod.GET,
            )
            logging.info(f"Scheduled refresh materialized view task for {timestamp_str}")
            return {"message": f"Refresh task for {timestamp_str} scheduled."}, 200
        except Exception as e:
            if "ALREADY_EXISTS" in str(e):
                logging.info(f"Task already exists for {timestamp_str}, skipping.")
                # Fix: previously fell through and implicitly returned None;
                # a duplicate task means the refresh is already scheduled.
                return {
                    "message": f"Refresh task for {timestamp_str} already scheduled."
                }, 200
            # Fix: previously swallowed unexpected errors silently; re-raise
            # so the outer handler logs and returns a 500 response.
            raise

    except Exception as error:
        error_msg = f"Error enqueuing task: {error}"
        logging.error(error_msg)
        return {"error": error_msg}, 500


def create_http_task_with_name(
    client: "tasks_v2.CloudTasksClient",
    body: bytes,
    url: str,
    project_id: str,
    gcp_region: str,
    queue_name: str,
    task_name: str,
    task_time: Timestamp,
    http_method: "tasks_v2.HttpMethod",
):
    """Create a named, scheduled GCP Cloud Task targeting the given URL.

    The HTTP request is authenticated with an OIDC token for the service
    account in the SERVICE_ACCOUNT_EMAIL environment variable. Passing an
    explicit ``task_name`` lets Cloud Tasks deduplicate repeated creates.
    """
    oidc_token = tasks_v2.OidcToken(
        service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
    )

    http_request = tasks_v2.HttpRequest(
        url=url,
        http_method=http_method,
        oidc_token=oidc_token,
        body=body,
        headers={"Content-Type": "application/json"},
    )

    parent = client.queue_path(project_id, gcp_region, queue_name)
    client.create_task(
        parent=parent,
        task=tasks_v2.Task(
            name=task_name,
            schedule_time=task_time,
            http_request=http_request,
        ),
    )
1 change: 1 addition & 0 deletions api/src/shared/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sqlalchemy.orm import sessionmaker
import logging


from shared.common.logging_utils import get_env_logging_level


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ geoalchemy2==0.14.7
# Google specific packages for this function
cloudevents~=1.10.1
google-cloud-storage
google-cloud-tasks

# Configuration
python-dotenv==1.0.0
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
import os
import functions_framework

from shared.common.gcp_utils import create_refresh_materialized_view_task
from shared.helpers.logger import init_logger

from shared.database.database import with_db_session, refresh_materialized_view
from shared.database.database import with_db_session

from sqlalchemy.orm import joinedload, Session
from sqlalchemy import or_, func
Expand All @@ -17,13 +18,12 @@
from shared.database_gen.sqlacodegen_models import (
Gtfsdataset,
Validationreport,
t_feedsearch,
)

import requests
import json

from google.cloud import storage
import requests

env = os.getenv("ENV", "dev").lower()
bucket_name = f"mobilitydata-datasets-{env}"
Expand Down Expand Up @@ -146,7 +146,7 @@ def backfill_datasets(session: "Session"):
try:
changes_count = 0
session.commit()
refresh_materialized_view(session, t_feedsearch.name)
create_refresh_materialized_view_task()
logging.info(f"{changes_count} elements committed.")
except Exception as e:
logging.error("Error committing changes:", e)
Expand Down
59 changes: 41 additions & 18 deletions functions-python/batch_process_dataset/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Batch Process Dataset

Subscribed to the topic set in the `batch-datasets` function, `batch-process-dataset` is triggered for each message published. It handles the processing of each feed individually, ensuring data consistency and integrity. The function performs the following operations:

1. **Download Data**: It retrieves the feed data from the provided URL.
Expand All @@ -8,34 +9,56 @@ Subscribed to the topic set in the `batch-datasets` function, `batch-process-dat

The URL format for accessing these datasets is standardized as `<bucket-url>/<feed_stable_id>/<dataset_id>.zip`, ensuring a consistent and predictable path for data retrieval.


# Message format

The function expects a Pub/Sub message with the following format:

```json
{
"message": {
"data":
{
"execution_id": "execution_id",
"producer_url": "producer_url",
"feed_stable_id": "feed_stable_id",
"feed_id": "feed_id",
"dataset_id": "dataset_id",
"dataset_hash": "dataset_hash",
"authentication_type": "authentication_type",
"authentication_info_url": "authentication_info_url",
"api_key_parameter_name": "api_key_parameter_name"
}
}
{
"message": {
"data": {
"execution_id": "execution_id",
"producer_url": "producer_url",
"feed_stable_id": "feed_stable_id",
"feed_id": "feed_id",
"dataset_id": "dataset_id",
"dataset_hash": "dataset_hash",
"authentication_type": "authentication_type",
"authentication_info_url": "authentication_info_url",
"api_key_parameter_name": "api_key_parameter_name"
}
}
}
```

# Example

```json
{
"message": {
"data": {
"execution_id": "JLU_20250721A",
"producer_url": "http://api.511.org/transit/datafeeds?operator_id=CE",
"feed_stable_id": "mdb-2684",
"feed_id": "2f5d7b4e-bb9b-49ae-a011-b61d7d9b53ff",
"dataset_id": null,
"dataset_hash": null,
"authentication_type": "1",
"authentication_info_url": "https://511.org/open-data/token",
"api_key_parameter_name": "api_key"
}
}
}
```

# Function configuration

The function is configured using the following environment variables:

- `DATASETS_BUCKET_NAME`: The name of the bucket where the datasets are stored.
- `FEEDS_DATABASE_URL`: The URL of the feeds database.
- `MAXIMUM_EXECUTIONS`: [Optional] The maximum number of executions per dataset. This controls the number of times a dataset can be processed per execution id. By default, it is 1.


# Local development
The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.

The local development of this function follows the same steps as the other functions. Please refer to the [README.md](../README.md) file for more information.
1 change: 1 addition & 0 deletions functions-python/batch_process_dataset/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ google-api-core
google-cloud-firestore
google-cloud-datastore
google-cloud-bigquery
google-cloud-tasks
cloudevents~=1.10.1

# Configuration
Expand Down
35 changes: 18 additions & 17 deletions functions-python/batch_process_dataset/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
from google.cloud import storage
from sqlalchemy import func

from shared.database_gen.sqlacodegen_models import Gtfsdataset, t_feedsearch, Gtfsfile
from shared.common.gcp_utils import create_refresh_materialized_view_task
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfile

from shared.dataset_service.main import DatasetTraceService, DatasetTrace, Status
from shared.database.database import with_db_session, refresh_materialized_view
from shared.database.database import with_db_session
import logging

from shared.helpers.logger import init_logger, get_logger
Expand Down Expand Up @@ -233,9 +235,11 @@ def upload_dataset(self, public=True) -> DatasetFile or None:
file_sha256_hash=file_sha256_hash,
hosted_url=f"{self.public_hosted_datasets_url}/{dataset_full_path}",
extracted_files=extracted_files,
zipped_size=os.path.getsize(temp_file_path)
if os.path.exists(temp_file_path)
else None,
zipped_size=(
os.path.getsize(temp_file_path)
if os.path.exists(temp_file_path)
else None
),
)

self.logger.info(
Expand Down Expand Up @@ -298,15 +302,15 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
hash=dataset_file.file_sha256_hash,
downloaded_at=func.now(),
hosted_url=dataset_file.hosted_url,
gtfsfiles=dataset_file.extracted_files
if dataset_file.extracted_files
else [],
gtfsfiles=(
dataset_file.extracted_files if dataset_file.extracted_files else []
),
zipped_size_bytes=dataset_file.zipped_size,
unzipped_size_bytes=sum(
[ex.file_size_bytes for ex in dataset_file.extracted_files]
)
if dataset_file.extracted_files
else None,
unzipped_size_bytes=(
sum([ex.file_size_bytes for ex in dataset_file.extracted_files])
if dataset_file.extracted_files
else None
),
)
if latest_dataset:
latest_dataset.latest = False
Expand All @@ -315,10 +319,7 @@ def create_dataset(self, dataset_file: DatasetFile, db_session: Session):
db_session.commit()
self.logger.info(f"[{self.feed_stable_id}] Dataset created successfully.")

refresh_materialized_view(db_session, t_feedsearch.name)
self.logger.info(
f"[{self.feed_stable_id}] Materialized view refresh event triggered successfully."
)
create_refresh_materialized_view_task()
except Exception as e:
raise Exception(f"Error creating dataset: {e}")

Expand Down
10 changes: 3 additions & 7 deletions functions-python/helpers/feed_status.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
from datetime import datetime, timezone
from sqlalchemy import text
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed, t_feedsearch
from shared.database.database import refresh_materialized_view
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Feed
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from sqlalchemy.orm import Session
from shared.common.gcp_utils import create_refresh_materialized_view_task


# query to update the status of the feeds based on the service date range of the latest dataset
Expand Down Expand Up @@ -74,14 +74,10 @@ def get_filters(status: str):
raise Exception(f"Error updating feed statuses: {e}")

try:
session.commit()
refresh_materialized_view(session, t_feedsearch.name)
create_refresh_materialized_view_task()
logging.info("Feed Database changes for status committed.")
logging.info("Status Changes: %s", diff_counts)
session.close()
return diff_counts
except Exception as e:
logging.error("Error committing changes:", e)
session.rollback()
session.close()
raise Exception(f"Error creating dataset: {e}")
29 changes: 15 additions & 14 deletions functions-python/helpers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,20 +151,21 @@ def create_http_task(
gcp_region: str,
queue_name: str,
) -> None:
"""Creates a GCP Cloud Task."""
from shared.common.gcp_utils import create_http_task_with_name
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

task = tasks_v2.Task(
http_request=tasks_v2.HttpRequest(
url=url,
http_method=tasks_v2.HttpMethod.POST,
oidc_token=tasks_v2.OidcToken(
service_account_email=os.getenv("SERVICE_ACCOUNT_EMAIL")
),
body=body,
headers={"Content-Type": "application/json"},
)
)
client.create_task(
parent=client.queue_path(project_id, gcp_region, queue_name), task=task
proto_time = timestamp_pb2.Timestamp()
proto_time.GetCurrentTime()

create_http_task_with_name(
client=client,
body=body,
url=url,
project_id=project_id,
gcp_region=gcp_region,
queue_name=queue_name,
task_name="task_name",
task_time=proto_time,
http_method=tasks_v2.HttpMethod.POST,
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ geoalchemy2==0.14.7

# Google specific packages for this function
cloudevents~=1.10.1
google-cloud-tasks

# Configuration
python-dotenv==1.0.0
1 change: 1 addition & 0 deletions functions-python/reverse_geolocation/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ google-cloud-storage
google-cloud-tasks
google-cloud-datastore
google-cloud-pubsub
google-auth==2.29.0

# Additional packages for this function
pandas
Expand Down
Loading
Loading