Skip to content

Commit c671e8c

Browse files
committed
pr comments
1 parent f1f331f commit c671e8c

3 files changed

Lines changed: 38 additions & 26 deletions

File tree

functions-python/helpers/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import hashlib
1717
import logging
1818
import os
19+
import ssl
1920

2021
import requests
2122
import urllib3
@@ -98,6 +99,7 @@ def download_and_get_hash(
9899
api_key_parameter_name=None,
99100
credentials=None,
100101
logger=None,
102+
trusted_certs=False, # If True, treat the host as trusted: skip TLS certificate and hostname verification
101103
):
102104
"""
103105
Downloads the content of a URL and stores it in a file and returns the hash of the file
@@ -128,6 +130,10 @@ def download_and_get_hash(
128130
if authentication_type == 2 and api_key_parameter_name and credentials:
129131
headers[api_key_parameter_name] = credentials
130132

133+
if trusted_certs:
134+
ctx.check_hostname = False
135+
ctx.verify_mode = ssl.CERT_NONE
136+
131137
with urllib3.PoolManager(ssl_context=ctx) as http:
132138
with http.request(
133139
"GET", url, preload_content=False, headers=headers, redirect=True

functions-python/tasks_executor/src/tasks/dataset_files/rebuild_missing_dataset_files.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
1+
import json
12
import logging
23
import os
3-
import shutil
4-
import ssl
54
import tempfile
6-
import urllib.request
5+
import traceback
76
import uuid
87
import zipfile
98

109
from google.cloud import storage
10+
from sqlalchemy.orm import joinedload
1111

1212
from shared.database.database import with_db_session
1313
from shared.database_gen.sqlacodegen_models import Gtfsdataset, Gtfsfile
14-
from shared.helpers.utils import get_hash_from_file
15-
16-
# Disable SSL verification — trusted internal sources only
17-
ssl._create_default_https_context = ssl._create_unverified_context
14+
from shared.helpers.utils import get_hash_from_file, download_and_get_hash
1815

1916

2017
def rebuild_missing_dataset_files_handler(payload) -> dict:
@@ -56,6 +53,7 @@ def get_datasets_with_missing_files_query(db_session, after_date, latest_only):
5653
| Gtfsdataset.unzipped_size_bytes.is_(None)
5754
| ~Gtfsdataset.gtfsfiles.any()
5855
)
56+
.options(joinedload(Gtfsdataset.feed))
5957
)
6058

6159
if after_date:
@@ -67,29 +65,29 @@ def get_datasets_with_missing_files_query(db_session, after_date, latest_only):
6765
return query
6866

6967

70-
def download_to_file(url: str, local_path: str):
71-
"""
72-
Downloads a file from URL and writes it to local disk.
73-
"""
74-
with urllib.request.urlopen(url) as response, open(local_path, "wb") as out_file:
75-
shutil.copyfileobj(response, out_file)
76-
77-
78-
def process_dataset(dataset: Gtfsdataset):
68+
def process_dataset(dataset: Gtfsdataset, credentials=None):
7969
"""
8070
Downloads, extracts, uploads, and indexes files for a GTFS dataset.
8171
8272
Args:
8373
dataset (Gtfsdataset): The dataset to process.
74+
credentials (str): Optional credentials for the dataset's feed, forwarded to download_and_get_hash for the download request.
8475
"""
8576
hosted_url = dataset.hosted_url
8677
stable_id = dataset.stable_id
8778
logging.info("Processing dataset %s with URL %s", stable_id, hosted_url)
88-
bucket_name = os.environ["DATASETS_BUCKET_NAME"]
79+
bucket_name = os.getenv("DATASETS_BUCKET_NAME")
8980

9081
with tempfile.TemporaryDirectory() as tmp_dir:
9182
zip_path = os.path.join(tmp_dir, "dataset.zip")
92-
download_to_file(hosted_url, zip_path)
83+
download_and_get_hash(
84+
hosted_url,
85+
zip_path,
86+
authentication_type=dataset.feed.authentication_type,
87+
api_key_parameter_name=dataset.feed.api_key_parameter_name,
88+
credentials=credentials,
89+
trusted_certs=True,
90+
)
9391
dataset.zipped_size_bytes = os.path.getsize(zip_path)
9492

9593
with zipfile.ZipFile(zip_path, "r") as zip_ref:
@@ -110,7 +108,7 @@ def process_dataset(dataset: Gtfsdataset):
110108
file_path = os.path.join(root, file_name)
111109
# Only store files in GCS for latest datasets
112110
if dataset.latest:
113-
logging.info("Storing latest dataset files")
111+
logging.info("Storing latest dataset file %s", file_name)
114112
blob_path = f"{'-'.join(stable_id.split('-')[:2])}/{stable_id}/extracted/{file_name}"
115113
blob = bucket.blob(blob_path)
116114
blob.upload_from_filename(file_path)
@@ -182,10 +180,18 @@ def rebuild_missing_dataset_files(
182180
total_processed = 0
183181
count = 0
184182
batch_count = 5
183+
credentials = json.loads(os.getenv("FEEDS_CREDENTIALS", "{}"))
185184
logging.info("Starting to process datasets with missing files...")
186185

187186
for dataset in datasets.all():
188-
process_dataset(dataset)
187+
try:
188+
process_dataset(
189+
dataset, credentials=credentials.get(dataset.feed.stable_id)
190+
)
191+
except Exception:
192+
logging.error("Error processing dataset %s:", dataset.stable_id)
193+
traceback.print_exc()
194+
continue
189195
count += 1
190196
total_processed += 1
191197

functions-python/tasks_executor/tests/tasks/dataset_files/test_rebuild_missing_dataset_files.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,14 @@ def test_rebuild_missing_dataset_files_processing(
9393
self.assertTrue(process_mock.called or response["total_processed"] == 0)
9494

9595
@patch("builtins.open", new_callable=mock_open)
96-
@patch("tasks.dataset_files.rebuild_missing_dataset_files.urllib.request.urlopen")
96+
@patch("tasks.dataset_files.rebuild_missing_dataset_files.download_and_get_hash")
9797
@patch("tasks.dataset_files.rebuild_missing_dataset_files.storage.Client")
9898
@patch("tasks.dataset_files.rebuild_missing_dataset_files.zipfile.ZipFile")
9999
@patch(
100100
"tasks.dataset_files.rebuild_missing_dataset_files.tempfile.TemporaryDirectory"
101101
)
102102
def test_process_dataset_latest(
103-
self, tmpdir_mock, zipfile_mock, storage_mock, urlopen_mock, open_mock
103+
self, tmpdir_mock, zipfile_mock, storage_mock, download_mock, open_mock
104104
):
105105
dataset = MagicMock()
106106
dataset.hosted_url = "http://example.com"
@@ -112,7 +112,7 @@ def test_process_dataset_latest(
112112

113113
mock_response = MagicMock()
114114
mock_response.read = MagicMock(side_effect=[b"zip content", b""])
115-
urlopen_mock.return_value.__enter__.return_value = mock_response
115+
download_mock.return_value.__enter__.return_value = mock_response
116116

117117
mock_file = MagicMock()
118118
mock_file.read.side_effect = [b"chunk1", b"chunk2", b""] # ends properly
@@ -137,14 +137,14 @@ def test_process_dataset_latest(
137137
mock_blob.make_public.assert_called_once()
138138

139139
@patch("builtins.open", new_callable=mock_open)
140-
@patch("tasks.dataset_files.rebuild_missing_dataset_files.urllib.request.urlopen")
140+
@patch("tasks.dataset_files.rebuild_missing_dataset_files.download_and_get_hash")
141141
@patch("tasks.dataset_files.rebuild_missing_dataset_files.storage.Client")
142142
@patch("tasks.dataset_files.rebuild_missing_dataset_files.zipfile.ZipFile")
143143
@patch(
144144
"tasks.dataset_files.rebuild_missing_dataset_files.tempfile.TemporaryDirectory"
145145
)
146146
def test_process_dataset_not_latest(
147-
self, tmpdir_mock, zipfile_mock, storage_mock, urlopen_mock, open_mock
147+
self, tmpdir_mock, zipfile_mock, storage_mock, download_mock, open_mock
148148
):
149149
dataset = MagicMock()
150150
dataset.hosted_url = "http://example.com"
@@ -156,7 +156,7 @@ def test_process_dataset_not_latest(
156156

157157
mock_response = MagicMock()
158158
mock_response.read = MagicMock(side_effect=[b"zip content", b""])
159-
urlopen_mock.return_value.__enter__.return_value = mock_response
159+
download_mock.return_value.__enter__.return_value = mock_response
160160

161161
mock_file = MagicMock()
162162
mock_file.read.side_effect = [b"chunk1", b"chunk2", b""] # ends properly

0 commit comments

Comments
 (0)