2323SIGNED_URL_EXPIRY_MIN = 30
2424
2525
26+ def _unlink_if_exists (path : Optional [str ]) -> None :
27+ """Remove a file if path is set; ignore missing file or permission errors."""
28+ if path is None :
29+ return
30+ try :
31+ os .unlink (path )
32+ except OSError :
33+ pass
34+
35+
36+ def _download_blob_to_temp_csv_path (blob : Any , file_name : str ) -> str :
37+ """
38+ Stream GCS blob to a private temp CSV path for validation.
39+
40+ Raises:
41+ OSError: If download fails (after logging errno/context). Temp file is removed.
42+ """
43+ fd , csv_path = tempfile .mkstemp (suffix = ".csv" , prefix = "validate_upload_" )
44+ os .close (fd )
45+ try :
46+ blob .download_to_filename (csv_path )
47+ except OSError as e :
48+ logger .error (
49+ "GCS download_to_filename failed for %r temp_path=%r errno=%s strerror=%s" ,
50+ file_name ,
51+ csv_path ,
52+ e .errno ,
53+ e .strerror ,
54+ exc_info = True ,
55+ )
56+ _unlink_if_exists (csv_path )
57+ raise
58+ return csv_path
59+
60+
2661def rename_file (
2762 bucket_name : str ,
2863 file_name : str ,
@@ -424,24 +459,11 @@ def _run_validation_and_get_normalized_df(
424459 institution_identifier : Optional [str ],
425460 ) -> tuple [List [str ], Any ]:
426461 """Run validation on blob content; return inferred schema names and normalized DataFrame."""
427- tmp_path : Optional [str ] = None
462+ local_csv_path : Optional [str ] = None
428463 try :
429- fd , tmp_path = tempfile .mkstemp (suffix = ".csv" , prefix = "validate_upload_" )
430- os .close (fd )
431- try :
432- blob .download_to_filename (tmp_path )
433- except OSError as e :
434- logger .error (
435- "GCS download_to_filename failed for %r temp_path=%r errno=%s strerror=%s" ,
436- file_name ,
437- tmp_path ,
438- e .errno ,
439- e .strerror ,
440- exc_info = True ,
441- )
442- raise
464+ local_csv_path = _download_blob_to_temp_csv_path (blob , file_name )
443465 result = validate_file_reader (
444- tmp_path ,
466+ local_csv_path ,
445467 allowed_schemas ,
446468 base_schema ,
447469 inst_schema ,
@@ -463,22 +485,18 @@ def _run_validation_and_get_normalized_df(
463485 logging .exception ("Validation failed for %s: %s" , file_name , e )
464486 raise
465487 finally :
466- if tmp_path is not None :
467- try :
468- os .unlink (tmp_path )
469- except OSError :
470- pass
488+ _unlink_if_exists (local_csv_path )
471489
472490 def _write_dataframe_to_gcs_as_csv (
473491 self , bucket : Any , blob_name : str , normalized_df : pd .DataFrame
474492 ) -> None :
475493 """Write a DataFrame to GCS as UTF-8 CSV. Used for validated/ output."""
476- fd , tmp_path = tempfile .mkstemp (suffix = ".csv" , prefix = "validated_out_" )
494+ fd , local_csv_path = tempfile .mkstemp (suffix = ".csv" , prefix = "validated_out_" )
477495 os .close (fd )
478496 try :
479497 try :
480498 normalized_df .to_csv (
481- tmp_path ,
499+ local_csv_path ,
482500 index = False ,
483501 encoding = "utf-8" ,
484502 lineterminator = "\n " ,
@@ -487,22 +505,19 @@ def _write_dataframe_to_gcs_as_csv(
487505 logger .error (
488506 "to_csv failed for validated blob %r temp_path=%r errno=%s strerror=%s" ,
489507 blob_name ,
490- tmp_path ,
508+ local_csv_path ,
491509 e .errno ,
492510 e .strerror ,
493511 exc_info = True ,
494512 )
495513 raise
496514 blob = bucket .blob (blob_name )
497515 blob .upload_from_filename (
498- tmp_path ,
516+ local_csv_path ,
499517 content_type = "text/csv; charset=utf-8" ,
500518 )
501519 finally :
502- try :
503- os .unlink (tmp_path )
504- except OSError :
505- pass
520+ _unlink_if_exists (local_csv_path )
506521
507522 def get_file_contents (self , bucket_name : str , file_name : str ) -> Any :
508523 """Returns a file as a bytes object."""
0 commit comments