Skip to content

Commit 5d37596

Browse files
committed
Upload files separately.
1 parent d13a22a commit 5d37596

1 file changed

Lines changed: 162 additions & 129 deletions

File tree

  • functions-python/batch_process_dataset/src

functions-python/batch_process_dataset/src/main.py

Lines changed: 162 additions & 129 deletions
Original file line number | Diff line number | Diff line change
@@ -146,65 +146,117 @@ def download_content(self, temporary_file_path, feed_id):
146146
is_zip = zipfile.is_zipfile(temporary_file_path)
147147
return file_hash, is_zip
148148

149-
def upload_files_to_storage(
149+
def upload_dataset_zip_to_storage(
150150
self,
151151
source_file_path,
152152
dataset_stable_id,
153-
extracted_files_path,
154153
public=True,
155-
skip_dataset_upload=False,
156154
):
157155
"""
158-
Uploads the dataset file and extracted files to GCP storage
156+
Uploads the dataset zip file to GCP storage as latest.zip and versioned zip.
159157
"""
160158
bucket = storage.Client().get_bucket(self.bucket_name)
159+
161160
target_paths = [
162161
f"{self.feed_stable_id}/latest.zip",
163162
f"{self.feed_stable_id}/{dataset_stable_id}/{dataset_stable_id}.zip",
164163
]
165164
blob = None
166-
if not skip_dataset_upload:
167-
for target_path in target_paths:
168-
blob = bucket.blob(target_path)
169-
blob.upload_from_filename(source_file_path)
170-
if public:
171-
blob.make_public()
172-
self.logger.info(f"Uploaded {blob.public_url}")
173-
174-
base_path, _ = os.path.splitext(source_file_path)
165+
for target_path in target_paths:
166+
blob = bucket.blob(target_path)
167+
blob.upload_from_filename(source_file_path)
168+
if public:
169+
blob.make_public()
170+
self.logger.info(f"Uploaded {blob.public_url}")
171+
return blob
172+
173+
def extract_and_upload_files_from_zip(
174+
self,
175+
zip_file_path: str,
176+
dataset_stable_id: str,
177+
public: bool = True,
178+
) -> List[Gtfsfile]:
179+
"""
180+
Extract files one at a time from a ZIP archive and upload each to GCS.
181+
This minimizes local disk usage by extracting and uploading one file at a time,
182+
then deleting the temporary extracted file before moving to the next.
183+
184+
:param zip_file_path: Path to the ZIP file
185+
:param dataset_stable_id: The dataset stable ID for the GCS path
186+
:param public: Whether to make the uploaded files public
187+
:return: List of Gtfsfile objects representing the extracted files
188+
"""
189+
if not zipfile.is_zipfile(zip_file_path):
190+
self.logger.error("The file %s is not a valid ZIP file.", zip_file_path)
191+
raise ValueError("File is not a valid ZIP file.")
192+
193+
bucket = storage.Client().get_bucket(self.bucket_name)
175194
extracted_files: List[Gtfsfile] = []
176-
if not extracted_files_path or not os.path.exists(extracted_files_path):
177-
self.logger.warning(
178-
"Extracted files path %s does not exist.", extracted_files_path
179-
)
180-
return blob, extracted_files
181-
self.logger.info("Processing extracted files from %s", extracted_files_path)
182-
for file_name in os.listdir(extracted_files_path):
183-
file_path = os.path.join(extracted_files_path, file_name)
184-
if os.path.isfile(file_path):
185-
file_blob = bucket.blob(
186-
f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{file_name}"
195+
working_dir = os.getenv("WORKING_DIR", "/tmp/in-memory")
196+
197+
with zipfile.ZipFile(zip_file_path, "r") as zf:
198+
for member in zf.infolist():
199+
# Skip directories
200+
if member.is_dir():
201+
continue
202+
203+
# Extract a single file to a temporary path.
204+
# Use a unique filename with feed_stable_id and dataset_stable_id prefix to avoid collisions
205+
# when multiple datasets are processed concurrently. Replace '/' with '_' to flatten any
206+
# subdirectory structure from the ZIP into a single working directory.
207+
temp_extracted_path = os.path.join(
208+
working_dir,
209+
f"{self.feed_stable_id}-{dataset_stable_id}-{member.filename.replace('/', '_')}",
187210
)
188-
file_blob.upload_from_filename(file_path)
189-
if public:
190-
file_blob.make_public()
211+
191212
self.logger.info(
192-
"Uploaded extracted file %s to %s", file_name, file_blob.public_url
213+
"Extracting %s to %s", member.filename, temp_extracted_path
193214
)
194-
extracted_files.append(
195-
Gtfsfile(
196-
id=str(uuid.uuid4()),
197-
file_name=file_name,
198-
file_size_bytes=os.path.getsize(file_path),
199-
hosted_url=file_blob.public_url if public else None,
200-
hash=get_hash_from_file(file_path),
215+
with zf.open(member, "r") as src, open(
216+
temp_extracted_path, "wb"
217+
) as dst:
218+
shutil.copyfileobj(src, dst)
219+
220+
# Upload this single file to GCS under extracted/
221+
if os.path.isfile(temp_extracted_path):
222+
target_path = f"{self.feed_stable_id}/{dataset_stable_id}/extracted/{member.filename}"
223+
file_blob = bucket.blob(target_path)
224+
file_blob.upload_from_filename(temp_extracted_path)
225+
if public:
226+
file_blob.make_public()
227+
self.logger.info(
228+
"Uploaded extracted file %s to %s",
229+
member.filename,
230+
file_blob.public_url,
231+
)
232+
233+
extracted_files.append(
234+
Gtfsfile(
235+
id=str(uuid.uuid4()),
236+
file_name=member.filename,
237+
file_size_bytes=os.path.getsize(temp_extracted_path),
238+
hosted_url=file_blob.public_url if public else None,
239+
hash=get_hash_from_file(temp_extracted_path),
240+
)
241+
)
242+
243+
# Remove the local temporary extracted file to free disk space
244+
try:
245+
if os.path.exists(temp_extracted_path):
246+
os.remove(temp_extracted_path)
247+
except Exception as cleanup_err:
248+
self.logger.warning(
249+
"Failed to remove temporary file %s: %s",
250+
temp_extracted_path,
251+
cleanup_err,
201252
)
202-
)
203-
return blob, extracted_files
204253

205-
def upload_dataset(self, feed_id, public=True) -> DatasetFile or None:
254+
return extracted_files
255+
256+
def transfer_dataset(self, feed_id, public=True) -> DatasetFile or None:
206257
"""
207-
Uploads a dataset to a GCP bucket as <feed_stable_id>/latest.zip and
258+
Transfer a dataset from the provider url to the local disk then upload to the GCP bucket as
259+
<feed_stable_id>/latest.zip and
208260
<feed_stable_id>/<feed_stable_id>-<upload_datetime>.zip
209261
if the dataset hash is different from the latest dataset stored
210262
:return: the file hash and the hosted url as a tuple or None if no upload is required
@@ -229,24 +281,31 @@ def upload_dataset(self, feed_id, public=True) -> DatasetFile or None:
229281
f"[{self.feed_stable_id}] Dataset has changed (hash {self.latest_hash}"
230282
f"-> {file_sha256_hash}). Uploading new version."
231283
)
232-
extracted_files_path = self.unzip_files(temp_file_path)
233-
self.logger.info(
234-
f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
235-
)
236284

237285
dataset_stable_id = self.create_dataset_stable_id(
238286
self.feed_stable_id, self.date
239287
)
240288
dataset_full_path = (
241289
f"{self.feed_stable_id}/{dataset_stable_id}/{dataset_stable_id}.zip"
242290
)
291+
292+
# Upload the zip file to GCS
293+
self.logger.info(
294+
f"Creating file {self.feed_stable_id}/latest.zip in bucket {self.bucket_name}"
295+
)
243296
self.logger.info(
244297
f"Creating file {dataset_full_path} in bucket {self.bucket_name}"
245298
)
246-
_, extracted_files = self.upload_files_to_storage(
299+
self.upload_dataset_zip_to_storage(
300+
temp_file_path,
301+
dataset_stable_id,
302+
public=public,
303+
)
304+
305+
# Extract and upload files one at a time to minimize disk usage
306+
extracted_files = self.extract_and_upload_files_from_zip(
247307
temp_file_path,
248308
dataset_stable_id,
249-
extracted_files_path,
250309
public=public,
251310
)
252311

@@ -266,6 +325,9 @@ def upload_dataset(self, feed_id, public=True) -> DatasetFile or None:
266325
f"[{self.feed_stable_id}] Datasets hash has not changed (hash {self.latest_hash} "
267326
f"-> {file_sha256_hash}). Not uploading it."
268327
)
328+
except Exception as e:
329+
self.logger.error(f"Error transferring dataset: {e}")
330+
raise e
269331
finally:
270332
if temp_file_path and os.path.exists(temp_file_path):
271333
os.remove(temp_file_path)
@@ -291,70 +353,12 @@ def process_from_bucket(self, db_session, public=True) -> Optional[DatasetFile]:
291353
os.getenv("DATASETS_BUCKET_NAME"), blob_file_path, temp_zip_path
292354
)
293355

294-
# Stream files from ZIP to GCS one by one to minimize disk usage
295-
bucket = storage.Client().get_bucket(self.bucket_name)
296-
extracted_files: List[Gtfsfile] = []
297-
working_dir = os.getenv("WORKING_DIR", "/in-memory")
298-
299-
if not zipfile.is_zipfile(temp_zip_path):
300-
self.logger.error(
301-
"The downloaded file %s is not a valid ZIP file.", temp_zip_path
302-
)
303-
raise ValueError("Downloaded dataset is not a valid ZIP file.")
304-
305-
with zipfile.ZipFile(temp_zip_path, "r") as zf:
306-
for member in zf.infolist():
307-
# Skip directories
308-
if member.is_dir():
309-
continue
310-
311-
# Extract a single file to a temporary path
312-
temp_extracted_path = os.path.join(
313-
working_dir,
314-
f"{self.feed_stable_id}-{self.dataset_stable_id}-{member.filename.replace('/', '_')}",
315-
)
316-
317-
self.logger.info(
318-
"Extracting %s to %s", member.filename, temp_extracted_path
319-
)
320-
with zf.open(member, "r") as src, open(
321-
temp_extracted_path, "wb"
322-
) as dst:
323-
shutil.copyfileobj(src, dst)
324-
325-
# Upload this single file to GCS under extracted/
326-
if os.path.isfile(temp_extracted_path):
327-
target_path = f"{self.feed_stable_id}/{self.dataset_stable_id}/extracted/{member.filename}"
328-
file_blob = bucket.blob(target_path)
329-
file_blob.upload_from_filename(temp_extracted_path)
330-
if public:
331-
file_blob.make_public()
332-
self.logger.info(
333-
"Uploaded extracted file %s to %s",
334-
member.filename,
335-
file_blob.public_url,
336-
)
337-
338-
extracted_files.append(
339-
Gtfsfile(
340-
id=str(uuid.uuid4()),
341-
file_name=member.filename,
342-
file_size_bytes=os.path.getsize(temp_extracted_path),
343-
hosted_url=file_blob.public_url if public else None,
344-
hash=get_hash_from_file(temp_extracted_path),
345-
)
346-
)
347-
348-
# Remove the local temporary extracted file to free disk space
349-
try:
350-
if os.path.exists(temp_extracted_path):
351-
os.remove(temp_extracted_path)
352-
except Exception as cleanup_err:
353-
self.logger.warning(
354-
"Failed to remove temporary file %s: %s",
355-
temp_extracted_path,
356-
cleanup_err,
357-
)
356+
# Extract and upload files one at a time to minimize disk usage
357+
extracted_files = self.extract_and_upload_files_from_zip(
358+
temp_zip_path,
359+
self.dataset_stable_id,
360+
public=public,
361+
)
358362

359363
dataset_file = DatasetFile(
360364
stable_id=self.dataset_stable_id,
@@ -388,23 +392,11 @@ def process_from_bucket(self, db_session, public=True) -> Optional[DatasetFile]:
388392
if temp_zip_path and os.path.exists(temp_zip_path):
389393
os.remove(temp_zip_path)
390394

391-
def unzip_files(self, temp_file_path):
392-
extracted_files_path = os.path.join(temp_file_path.split(".")[0], "extracted")
393-
self.logger.info(f"Unzipping files to {extracted_files_path}")
394-
# Create the directory for extracted files if it does not exist
395-
os.makedirs(extracted_files_path, exist_ok=True)
396-
with zipfile.ZipFile(temp_file_path, "r") as zip_ref:
397-
zip_ref.extractall(path=extracted_files_path)
398-
# List all files in the extracted directory
399-
extracted_files = os.listdir(extracted_files_path)
400-
self.logger.info(f"Extracted files: {extracted_files}")
401-
return extracted_files_path
402-
403395
def generate_temp_filename(self):
404396
"""
405397
Generates a temporary filename
406398
"""
407-
working_dir = os.getenv("WORKING_DIR", "/in-memory")
399+
working_dir = os.getenv("WORKING_DIR", "/tmp/in-memory")
408400
temporary_file_path = (
409401
f"{working_dir}/{self.feed_stable_id}-{random.randint(0, 1000000)}.zip"
410402
)
@@ -497,7 +489,7 @@ def process_from_producer_url(
497489
Process the dataset and store new version in GCP bucket if any changes are detected
498490
:return: the DatasetFile object created
499491
"""
500-
dataset_file = self.upload_dataset(feed_id)
492+
dataset_file = self.transfer_dataset(feed_id)
501493

502494
if dataset_file is None:
503495
self.logger.info(f"[{self.feed_stable_id}] No database update required.")
@@ -572,19 +564,22 @@ def process_dataset(cloud_event: CloudEvent):
572564
return
573565

574566
try:
575-
maximum_executions = os.getenv("MAXIMUM_EXECUTIONS", 1)
567+
try:
568+
maximum_executions = int(os.getenv("MAXIMUM_EXECUTIONS", "1"))
569+
except (ValueError, TypeError):
570+
maximum_executions = 1
576571
public_hosted_datasets_url = os.getenv("PUBLIC_HOSTED_DATASETS_URL")
577572
trace_service = None
578573
dataset_file: DatasetFile = None
579574
error_message = None
580-
# Extract data from message
581-
data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
582-
json_payload = json.loads(data)
583-
stable_id = json_payload["feed_stable_id"]
575+
# # Extract data from message
576+
# data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
577+
# json_payload = json.loads(data)
578+
# stable_id = json_payload["feed_stable_id"]
584579
logger = get_logger("process_dataset", stable_id)
585580
logger.info(f"JSON Payload: {json.dumps(json_payload)}")
586581

587-
execution_id = json_payload["execution_id"]
582+
# execution_id = json_payload["execution_id"]
588583
trace_service = DatasetTraceService()
589584
trace = trace_service.get_by_execution_and_stable_ids(execution_id, stable_id)
590585
logger.info(f"Dataset trace: {trace}")
@@ -653,3 +648,41 @@ def process_dataset(cloud_event: CloudEvent):
653648
"successfully completed" if not error_message else "Failed",
654649
)
655650
return "Completed." if error_message is None else error_message
651+
652+
653+
# @functions_framework.http
654+
def simulate(request) -> dict:
655+
"""HTTP endpoint to simulate a process_dataset call for testing."""
656+
# Hardcoded test values
657+
payload = {
658+
"execution_id": "manual-test-001",
659+
"producer_url": "",
660+
"feed_stable_id": "mta-test",
661+
"feed_id": "test-feed-id-123",
662+
"dataset_hash": "",
663+
"authentication_type": 0,
664+
"authentication_info_url": "",
665+
"api_key_parameter_name": "",
666+
}
667+
668+
# Create CloudEvent
669+
encoded_data = base64.b64encode(json.dumps(payload).encode()).decode()
670+
attributes = {
671+
"type": "google.cloud.pubsub.topic.v1.messagePublished",
672+
"source": "//pubsub.googleapis.com/test",
673+
"specversion": "1.0",
674+
}
675+
data = {"message": {"data": encoded_data}}
676+
cloud_event = CloudEvent(attributes, data)
677+
678+
# Call process_dataset
679+
process_dataset(cloud_event)
680+
return {"status": "completed"}
681+
682+
683+
def main(): # pragma: no cover
684+
simulate(None)
685+
686+
687+
if __name__ == "__main__":
688+
main()

0 commit comments

Comments
 (0)