Skip to content

Commit da02282

Browse files
committed
Added a way to process a specific dataset to rebuild_missing_dataset_files + corrected a bug.
1 parent 3457d0f commit da02282

2 files changed

Lines changed: 39 additions & 14 deletions

File tree

functions-python/tasks_executor/src/tasks/dataset_files/README.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ The function accepts the following payload:
1919
{
2020
"dry_run": true, // [optional] If true, do not upload or modify the database (default: true)
2121
"after_date": "YYYY-MM-DD", // [optional] Only include datasets downloaded after this ISO date
22-
"latest_only": true // [optional] If true, only process the latest version of each dataset (default: true)
22+
"latest_only": true, // [optional] If true, only process the latest version of each dataset (default: true)
23+
"dataset_id": id // [optional] If provided, only process the specified dataset. It will supersede the after_date and latest_only parameters.
2324
}
2425
```
2526

@@ -32,7 +33,13 @@ The function accepts the following payload:
3233
"latest_only": true
3334
}
3435
```
35-
36+
or
37+
```json
38+
{
39+
"dry_run": false,
40+
"dataset_id": "mdb-1147-202407031702"
41+
}
42+
```
3643
---
3744

3845
## What It Does
@@ -52,6 +59,9 @@ For each GTFS dataset with missing file information (missing zipped/unzipped siz
5259
7. Computes SHA256 hashes for each file
5360
8. Stores metadata in the `Gtfsfile` table for later use
5461

62+
If the `dataset_id` parameter is provided, the process is a bit simplified. It does not download the dataset as it is
63+
assumed the dataset is already present in the bucket. The rest of the processing is the same.
64+
5565
---
5666

5767
## GCP Environment Variables

functions-python/tasks_executor/src/tasks/dataset_files/rebuild_missing_dataset_files.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@ def rebuild_missing_dataset_files_handler(payload) -> dict:
2323
dry_run = payload.get("dry_run", True)
2424
after_date = payload.get("after_date", None)
2525
latest_only = payload.get("latest_only", True)
26+
dataset_id = payload.get("dataset_id", None)
2627

2728
return rebuild_missing_dataset_files(
28-
dry_run=dry_run, after_date=after_date, latest_only=latest_only
29+
dry_run=dry_run,
30+
after_date=after_date,
31+
latest_only=latest_only,
32+
dataset_id=dataset_id,
2933
)
3034

3135

@@ -67,6 +71,7 @@ def rebuild_missing_dataset_files(
6771
dry_run: bool = True,
6872
after_date: str = None,
6973
latest_only: bool = True,
74+
dataset_id: str = None,
7075
) -> dict:
7176
"""
7277
Processes GTFS datasets missing extracted files and updates database.
@@ -76,13 +81,22 @@ def rebuild_missing_dataset_files(
7681
dry_run (bool): If True, only logs how many would be processed.
7782
after_date (str): Only consider datasets downloaded after this ISO date.
7883
latest_only (bool): Whether to include only latest datasets.
84+
dataset_id (str | None): If provided, only process the dataset with this stable id.
7985
8086
Returns:
8187
dict: Result summary.
8288
"""
83-
datasets = get_datasets_with_missing_files_query(
84-
db_session, after_date=after_date, latest_only=latest_only
85-
)
89+
90+
if dataset_id:
91+
datasets = (
92+
db_session.query(Gtfsdataset)
93+
.filter(Gtfsdataset.stable_id == dataset_id)
94+
.options(joinedload(Gtfsdataset.feed))
95+
)
96+
else:
97+
datasets = get_datasets_with_missing_files_query(
98+
db_session, after_date=after_date, latest_only=latest_only
99+
)
86100

87101
if dry_run:
88102
total = datasets.count()
@@ -102,6 +116,9 @@ def rebuild_missing_dataset_files(
102116
logging.info("Starting to process datasets with missing files...")
103117
execution_id = f"task-executor-uuid-{uuid.uuid4()}"
104118
messages = []
119+
all_datasets_count = datasets.count()
120+
topic = (os.getenv("DATASET_PROCESSING_TOPIC_NAME"),)
121+
105122
for dataset in datasets.all():
106123
try:
107124
message = {
@@ -124,16 +141,13 @@ def rebuild_missing_dataset_files(
124141
count += 1
125142
total_processed += 1
126143

127-
if count % batch_count == 0:
128-
publish_messages(
129-
messages,
130-
os.getenv("PROJECT_ID"),
131-
os.getenv("DATASET_PROCESSING_TOPIC_NAME"),
132-
)
144+
if count % batch_count == 0 or all_datasets_count == count:
145+
publish_messages(messages, os.getenv("PROJECT_ID"), topic)
133146
messages = []
134147
logging.info(
135-
"Published message for %d datasets. Total processed: %d",
136-
batch_count,
148+
"Published message to topic %s for %d datasets. Total processed: %d",
149+
topic,
150+
batch_count if count % batch_count == 0 else all_datasets_count - count,
137151
total_processed,
138152
)
139153

@@ -147,6 +161,7 @@ def rebuild_missing_dataset_files(
147161
"after_date": after_date,
148162
"latest_only": latest_only,
149163
"datasets_bucket_name": os.environ.get("DATASETS_BUCKET_NAME"),
164+
"dataset_id": dataset_id,
150165
},
151166
}
152167
logging.info("Task summary: %s", result)

0 commit comments

Comments
 (0)