diff --git a/src/webapp/gcsutil.py b/src/webapp/gcsutil.py
index 7248bdf1..70665ae9 100644
--- a/src/webapp/gcsutil.py
+++ b/src/webapp/gcsutil.py
@@ -1,8 +1,9 @@
 """Cloud storage related helper functions."""
 
 import datetime
-import io
 import logging
+import os
+import tempfile
 from typing import Any, Dict, List, Optional
 
 import pandas as pd
@@ -22,6 +23,49 @@
 SIGNED_URL_EXPIRY_MIN = 30
 
 
+def _unlink_if_exists(path: Optional[str]) -> None:
+    """Remove a file if path is set; ignore missing file or permission errors."""
+    if path is None:
+        return
+    try:
+        os.unlink(path)
+    except OSError:
+        pass
+
+
+def _download_blob_to_temp_csv_path(blob: Any, file_name: str) -> str:
+    """
+    Stream GCS blob to a private temp CSV path for validation.
+
+    On success the caller owns the returned path and must delete it.
+
+    Raises:
+        OSError: If download fails (after logging errno/context). Temp file is removed.
+        Exception: Any other download error propagates unchanged; the temp file
+            is removed first, since the caller never receives the path.
+    """
+    fd, csv_path = tempfile.mkstemp(suffix=".csv", prefix="validate_upload_")
+    os.close(fd)
+    try:
+        blob.download_to_filename(csv_path)
+    except OSError as e:
+        logger.error(
+            "GCS download_to_filename failed for %r temp_path=%r errno=%s strerror=%s",
+            file_name,
+            csv_path,
+            e.errno,
+            e.strerror,
+            exc_info=True,
+        )
+        _unlink_if_exists(csv_path)
+        raise
+    except BaseException:
+        # Non-OSError failures (e.g. GCS API errors) would otherwise leak the file.
+        _unlink_if_exists(csv_path)
+        raise
+    return csv_path
+
+
 def rename_file(
     bucket_name: str,
     file_name: str,
@@ -423,16 +467,17 @@ def _run_validation_and_get_normalized_df(
         institution_identifier: Optional[str],
     ) -> tuple[List[str], Any]:
         """Run validation on blob content; return inferred schema names and normalized DataFrame."""
+        local_csv_path: Optional[str] = None
         try:
-            with blob.open("r") as file:
-                result = validate_file_reader(
-                    file,
-                    allowed_schemas,
-                    base_schema,
-                    inst_schema,
-                    institution_id=institution_id,
-                    institution_identifier=institution_identifier,
-                )
+            local_csv_path = _download_blob_to_temp_csv_path(blob, file_name)
+            result = validate_file_reader(
+                local_csv_path,
+                allowed_schemas,
+                base_schema,
+                inst_schema,
+                institution_id=institution_id,
+                institution_identifier=institution_identifier,
+            )
             inferred_schema_names = [str(s) for s in result.get("schemas", [])]
             logging.debug(
                 "Validation successful for %s: %s", file_name, inferred_schema_names
@@ -447,20 +492,40 @@ def _run_validation_and_get_normalized_df(
             # Log any other error with context before re-raising (no silent failures).
             logging.exception("Validation failed for %s: %s", file_name, e)
             raise
+        finally:
+            _unlink_if_exists(local_csv_path)
 
     def _write_dataframe_to_gcs_as_csv(
         self, bucket: Any, blob_name: str, normalized_df: pd.DataFrame
     ) -> None:
         """Write a DataFrame to GCS as UTF-8 CSV. Used for validated/ output."""
-        csv_buffer = io.StringIO()
-        normalized_df.to_csv(
-            csv_buffer, index=False, encoding="utf-8", lineterminator="\n"
-        )
-        blob = bucket.blob(blob_name)
-        blob.upload_from_string(
-            csv_buffer.getvalue().encode("utf-8"),
-            content_type="text/csv; charset=utf-8",
-        )
+        fd, local_csv_path = tempfile.mkstemp(suffix=".csv", prefix="validated_out_")
+        os.close(fd)
+        try:
+            try:
+                normalized_df.to_csv(
+                    local_csv_path,
+                    index=False,
+                    encoding="utf-8",
+                    lineterminator="\n",
+                )
+            except OSError as e:
+                logger.error(
+                    "to_csv failed for validated blob %r temp_path=%r errno=%s strerror=%s",
+                    blob_name,
+                    local_csv_path,
+                    e.errno,
+                    e.strerror,
+                    exc_info=True,
+                )
+                raise
+            blob = bucket.blob(blob_name)
+            blob.upload_from_filename(
+                local_csv_path,
+                content_type="text/csv; charset=utf-8",
+            )
+        finally:
+            _unlink_if_exists(local_csv_path)
 
     def get_file_contents(self, bucket_name: str, file_name: str) -> Any:
         """Returns a file as a bytes object."""
diff --git a/src/webapp/gcsutil_test.py b/src/webapp/gcsutil_test.py
index 107a1846..df1d4ee6 100644
--- a/src/webapp/gcsutil_test.py
+++ b/src/webapp/gcsutil_test.py
@@ -1,6 +1,8 @@
 """Tests for gcsutil.StorageControl validation and normalized/raw archive flow."""
 
-import io
+import errno
+import os
+import tempfile
 from typing import Any
 from unittest.mock import MagicMock, patch
 
@@ -182,6 +184,14 @@ def blob_side_effect(name: str) -> Any:
     mock_client.bucket.return_value = mock_bucket
 
     small_df = pd.DataFrame({"col_a": [1, 2], "col_b": ["x", "y"]})
+    uploaded_chunks: list[bytes] = []
+
+    def capture_validated_upload(path: str, **kwargs: Any) -> None:
+        with open(path, "rb") as f:
+            uploaded_chunks.append(f.read())
+
+    mock_validated_blob.upload_from_filename.side_effect = capture_validated_upload
+
     control = StorageControl()
     with patch("src.webapp.gcsutil.storage.Client", return_value=mock_client):
         with patch.object(
@@ -201,13 +211,14 @@ def blob_side_effect(name: str) -> Any:
                     mock_unvalidated_blob, mock_bucket, "raw/cohort.csv"
                 )
     mock_unvalidated_blob.delete.assert_called_once()
-    # _write_dataframe_to_gcs_as_csv is called; it does bucket.blob(validated_blob_name).upload_from_string
+    # _write_dataframe_to_gcs_as_csv uploads from a temp file via upload_from_filename
     assert mock_bucket.blob.call_count >= 2
-    mock_validated_blob.upload_from_string.assert_called_once()
-    call_args = mock_validated_blob.upload_from_string.call_args
-    assert call_args.kwargs["content_type"] == "text/csv; charset=utf-8"
-    uploaded = call_args.args[0]
-    assert isinstance(uploaded, bytes)
+    mock_validated_blob.upload_from_filename.assert_called_once()
+    assert mock_validated_blob.upload_from_filename.call_args.kwargs[
+        "content_type"
+    ] == ("text/csv; charset=utf-8")
+    assert len(uploaded_chunks) == 1
+    uploaded = uploaded_chunks[0]
     assert b"col_a,col_b" in uploaded
     assert b"1,x" in uploaded
 
@@ -251,9 +262,12 @@ def test_validate_file_propagates_hard_validation_error() -> None:
 def test_run_validation_and_get_normalized_df_returns_names_and_df() -> None:
     """Returns (inferred_schema_names, normalized_df) when validation succeeds."""
     mock_blob = MagicMock()
-    mock_file = io.StringIO("foo_col,bar_col\n1,a\n2,b\n")
-    mock_blob.open.return_value.__enter__ = lambda self: mock_file
-    mock_blob.open.return_value.__exit__ = lambda self, *args: None
+
+    def download_to_path(path: str) -> None:
+        with open(path, "w", encoding="utf-8", newline="") as f:
+            f.write("foo_col,bar_col\n1,a\n2,b\n")
+
+    mock_blob.download_to_filename.side_effect = download_to_path
 
     control = StorageControl()
     with patch("src.webapp.gcsutil.validate_file_reader") as mock_validate:
@@ -281,9 +295,7 @@ def test_run_validation_and_get_normalized_df_propagates_hard_validation_error()
 ):
     """HardValidationError is re-raised without wrapping."""
     mock_blob = MagicMock()
-    mock_file = io.StringIO("bad")
-    mock_blob.open.return_value.__enter__ = lambda self: mock_file
-    mock_blob.open.return_value.__exit__ = lambda self, *args: None
+    mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()
 
     control = StorageControl()
     with patch(
@@ -298,9 +310,7 @@ def test_run_validation_and_get_normalized_df_propagates_hard_validation_error()
 def test_run_validation_and_get_normalized_df_propagates_value_error() -> None:
     """ValueError from validate_file_reader (e.g. encoding) is re-raised."""
     mock_blob = MagicMock()
-    mock_file = io.StringIO("data")
-    mock_blob.open.return_value.__enter__ = lambda self: mock_file
-    mock_blob.open.return_value.__exit__ = lambda self, *args: None
+    mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()
 
     control = StorageControl()
     with patch(
@@ -316,9 +326,7 @@ def test_run_validation_and_get_normalized_df_propagates_value_error() -> None:
 def test_run_validation_and_get_normalized_df_propagates_unicode_error() -> None:
     """UnicodeError from validate_file_reader (e.g. decode) is re-raised."""
     mock_blob = MagicMock()
-    mock_file = io.StringIO("data")
-    mock_blob.open.return_value.__enter__ = lambda self: mock_file
-    mock_blob.open.return_value.__exit__ = lambda self, *args: None
+    mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()
 
     control = StorageControl()
     with patch(
@@ -343,13 +351,147 @@ def test_write_dataframe_to_gcs_as_csv_uploads_utf8_csv() -> None:
     mock_bucket.blob.return_value = mock_blob
 
     df = pd.DataFrame({"A": [1, 2], "B": ["a", "b"]})
+
+    def assert_csv_at_path(path: str, **kwargs: Any) -> None:
+        with open(path, "r", encoding="utf-8") as f:
+            assert f.read().strip() == "A,B\n1,a\n2,b"
+
+    mock_blob.upload_from_filename.side_effect = assert_csv_at_path
+
     control = StorageControl()
     control._write_dataframe_to_gcs_as_csv(mock_bucket, "validated/out.csv", df)
 
     mock_bucket.blob.assert_called_once_with("validated/out.csv")
-    mock_blob.upload_from_string.assert_called_once()
-    call_args = mock_blob.upload_from_string.call_args
-    assert call_args.kwargs["content_type"] == "text/csv; charset=utf-8"
-    payload = call_args.args[0]
-    assert isinstance(payload, bytes)
-    assert payload.decode("utf-8").strip() == "A,B\n1,a\n2,b"
+    mock_blob.upload_from_filename.assert_called_once()
+    assert mock_blob.upload_from_filename.call_args.kwargs["content_type"] == (
+        "text/csv; charset=utf-8"
+    )
+
+
+def test_run_validation_download_oserror_unlinks_temp_and_skips_validate() -> None:
+    """If GCS download fails, temp file is removed and validate_file_reader is not run."""
+    fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_dl_oserr_")
+    mock_blob = MagicMock()
+    mock_blob.download_to_filename.side_effect = OSError(
+        errno.ENOSPC, "No space left on device"
+    )
+
+    control = StorageControl()
+    with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
+        with patch("src.webapp.gcsutil.validate_file_reader") as mock_validate:
+            with pytest.raises(OSError, match="No space left"):
+                control._run_validation_and_get_normalized_df(
+                    mock_blob,
+                    "school_course.csv",
+                    ["STUDENT"],
+                    {},
+                    None,
+                    "pdp",
+                    None,
+                )
+            mock_validate.assert_not_called()
+
+    assert not os.path.exists(real_path)
+    mock_blob.download_to_filename.assert_called_once_with(real_path)
+
+
+def test_run_validation_download_oserror_logs_errno() -> None:
+    """OSError from download_to_filename is logged with errno before re-raise."""
+    fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_dl_log_")
+    mock_blob = MagicMock()
+    mock_blob.download_to_filename.side_effect = OSError(
+        errno.ENOSPC, "No space left on device"
+    )
+
+    control = StorageControl()
+    with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
+        with patch("src.webapp.gcsutil.logger") as mock_logger:
+            with pytest.raises(OSError):
+                control._run_validation_and_get_normalized_df(
+                    mock_blob,
+                    "f.csv",
+                    ["STUDENT"],
+                    {},
+                    None,
+                    "pdp",
+                    None,
+                )
+            mock_logger.error.assert_called_once()
+            msg = mock_logger.error.call_args[0][0]
+            assert "download_to_filename failed" in msg
+            assert mock_logger.error.call_args[0][3] == errno.ENOSPC
+
+    assert not os.path.exists(real_path)
+
+
+def test_write_dataframe_to_csv_oserror_unlinks_temp() -> None:
+    """If to_csv fails (e.g. disk full), temp file is removed and upload is not attempted."""
+    fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_csv_oserr_")
+    mock_blob = MagicMock()
+    mock_bucket = MagicMock()
+    mock_bucket.blob.return_value = mock_blob
+
+    control = StorageControl()
+    with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
+        with patch.object(
+            pd.DataFrame,
+            "to_csv",
+            side_effect=OSError(errno.ENOSPC, "No space left on device"),
+        ):
+            with patch("src.webapp.gcsutil.logger") as mock_logger:
+                with pytest.raises(OSError, match="No space left"):
+                    control._write_dataframe_to_gcs_as_csv(
+                        mock_bucket,
+                        "validated/out.csv",
+                        pd.DataFrame({"a": [1]}),
+                    )
+                mock_logger.error.assert_called_once()
+                assert "to_csv failed" in mock_logger.error.call_args[0][0]
+                assert mock_logger.error.call_args[0][3] == errno.ENOSPC
+
+    assert not os.path.exists(real_path)
+    mock_blob.upload_from_filename.assert_not_called()
+
+
+def test_write_dataframe_upload_failure_still_unlinks_temp() -> None:
+    """If GCS upload fails after to_csv, the local temp file is still deleted."""
+    fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_upload_fail_")
+    mock_blob = MagicMock()
+    mock_blob.upload_from_filename.side_effect = RuntimeError("upload failed")
+    mock_bucket = MagicMock()
+    mock_bucket.blob.return_value = mock_blob
+
+    control = StorageControl()
+    with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
+        with pytest.raises(RuntimeError, match="upload failed"):
+            control._write_dataframe_to_gcs_as_csv(
+                mock_bucket,
+                "validated/out.csv",
+                pd.DataFrame({"x": [1]}),
+            )
+
+    assert not os.path.exists(real_path)
+
+
+def test_run_validation_download_non_oserror_unlinks_temp() -> None:
+    """Non-OSError download failures (e.g. GCS API errors) also remove the temp file."""
+    fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_dl_nonos_")
+    mock_blob = MagicMock()
+    mock_blob.download_to_filename.side_effect = RuntimeError("gcs unavailable")
+
+    control = StorageControl()
+    with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
+        with patch("src.webapp.gcsutil.validate_file_reader") as mock_validate:
+            with pytest.raises(RuntimeError, match="gcs unavailable"):
+                control._run_validation_and_get_normalized_df(
+                    mock_blob,
+                    "f.csv",
+                    ["STUDENT"],
+                    {},
+                    None,
+                    "pdp",
+                    None,
+                )
+            mock_validate.assert_not_called()
+
+    assert not os.path.exists(real_path)