Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 76 additions & 19 deletions src/webapp/gcsutil.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Cloud storage related helper functions."""

import datetime
import io
import logging
import os
import tempfile
from typing import Any, Dict, List, Optional

import pandas as pd
Expand All @@ -22,6 +23,41 @@
SIGNED_URL_EXPIRY_MIN = 30


def _unlink_if_exists(path: Optional[str]) -> None:
    """Best-effort removal of the file at ``path``.

    A ``None`` path is a no-op. Any ``OSError`` raised by the removal
    (missing file, permissions, ...) is swallowed, so the helper can be
    called from cleanup paths without extra error handling.
    """
    if path is not None:
        try:
            os.unlink(path)
        except OSError:
            # Deliberately ignored: cleanup is best-effort.
            return


def _download_blob_to_temp_csv_path(blob: Any, file_name: str) -> str:
    """
    Stream GCS blob to a private temp CSV path for validation.

    Args:
        blob: Object exposing ``download_to_filename(path)`` (presumably a
            google-cloud-storage ``Blob`` - confirm against callers).
        file_name: Blob/file name, used only as context in the error log.

    Returns:
        Path of the temp file holding the downloaded content. The caller owns
        the file and is responsible for deleting it.

    Raises:
        OSError: If download fails with an OS-level error (after logging
            errno/context).
        BaseException: Any other error raised by the download is re-raised
            unchanged.

        In every failure case the temp file is removed before the exception
        propagates.
    """
    fd, csv_path = tempfile.mkstemp(suffix=".csv", prefix="validate_upload_")
    # Only the reserved path is needed; the download is handed the path, not
    # the descriptor.
    os.close(fd)
    try:
        blob.download_to_filename(csv_path)
    except BaseException as e:
        if isinstance(e, OSError):
            logger.error(
                "GCS download_to_filename failed for %r temp_path=%r errno=%s strerror=%s",
                file_name,
                csv_path,
                e.errno,
                e.strerror,
                exc_info=True,
            )
        # When this function raises, the caller never receives csv_path, so
        # this is the only place the temp file can be cleaned up. Doing it for
        # every exception type (not just OSError) prevents leaking the file on
        # API, network, or timeout errors.
        _unlink_if_exists(csv_path)
        raise
    return csv_path


def rename_file(
bucket_name: str,
file_name: str,
Expand Down Expand Up @@ -423,16 +459,17 @@ def _run_validation_and_get_normalized_df(
institution_identifier: Optional[str],
) -> tuple[List[str], Any]:
"""Run validation on blob content; return inferred schema names and normalized DataFrame."""
local_csv_path: Optional[str] = None
try:
with blob.open("r") as file:
result = validate_file_reader(
file,
allowed_schemas,
base_schema,
inst_schema,
institution_id=institution_id,
institution_identifier=institution_identifier,
)
local_csv_path = _download_blob_to_temp_csv_path(blob, file_name)
result = validate_file_reader(
local_csv_path,
allowed_schemas,
base_schema,
inst_schema,
institution_id=institution_id,
institution_identifier=institution_identifier,
)
inferred_schema_names = [str(s) for s in result.get("schemas", [])]
logging.debug(
"Validation successful for %s: %s", file_name, inferred_schema_names
Expand All @@ -447,20 +484,40 @@ def _run_validation_and_get_normalized_df(
# Log any other error with context before re-raising (no silent failures).
logging.exception("Validation failed for %s: %s", file_name, e)
raise
finally:
_unlink_if_exists(local_csv_path)

def _write_dataframe_to_gcs_as_csv(
    self, bucket: Any, blob_name: str, normalized_df: pd.DataFrame
) -> None:
    """Write a DataFrame to GCS as UTF-8 CSV. Used for validated/ output.

    The frame is serialized to a local temp file and uploaded from that file,
    rather than being built as one in-memory string. The temp file is always
    removed afterwards, whether serialization or upload succeeds or fails.

    Args:
        bucket: Object exposing ``blob(name)`` whose result exposes
            ``upload_from_filename(path, content_type=...)``.
        blob_name: Destination object name inside the bucket.
        normalized_df: DataFrame to serialize (no index column, "\\n" line
            endings).

    Raises:
        OSError: If writing the temp CSV fails (logged with errno/context
            before re-raising).
        Exception: Any error raised by the upload propagates unchanged.
    """
    fd, local_csv_path = tempfile.mkstemp(suffix=".csv", prefix="validated_out_")
    # Only the reserved path is needed; to_csv and the upload are both handed
    # the path, not the descriptor.
    os.close(fd)
    try:
        # Inner try keeps the "to_csv failed" log scoped to serialization so
        # an OSError from the upload is not mislabeled.
        try:
            normalized_df.to_csv(
                local_csv_path,
                index=False,
                encoding="utf-8",
                lineterminator="\n",
            )
        except OSError as e:
            logger.error(
                "to_csv failed for validated blob %r temp_path=%r errno=%s strerror=%s",
                blob_name,
                local_csv_path,
                e.errno,
                e.strerror,
                exc_info=True,
            )
            raise
        blob = bucket.blob(blob_name)
        blob.upload_from_filename(
            local_csv_path,
            content_type="text/csv; charset=utf-8",
        )
    finally:
        _unlink_if_exists(local_csv_path)

def get_file_contents(self, bucket_name: str, file_name: str) -> Any:
"""Returns a file as a bytes object."""
Expand Down
168 changes: 143 additions & 25 deletions src/webapp/gcsutil_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Tests for gcsutil.StorageControl validation and normalized/raw archive flow."""

import io
import errno
import os
import tempfile
from typing import Any
from unittest.mock import MagicMock, patch

Expand Down Expand Up @@ -182,6 +184,14 @@ def blob_side_effect(name: str) -> Any:
mock_client.bucket.return_value = mock_bucket

small_df = pd.DataFrame({"col_a": [1, 2], "col_b": ["x", "y"]})
uploaded_chunks: list[bytes] = []

def capture_validated_upload(path: str, **kwargs: Any) -> None:
with open(path, "rb") as f:
uploaded_chunks.append(f.read())

mock_validated_blob.upload_from_filename.side_effect = capture_validated_upload

control = StorageControl()
with patch("src.webapp.gcsutil.storage.Client", return_value=mock_client):
with patch.object(
Expand All @@ -201,13 +211,14 @@ def blob_side_effect(name: str) -> Any:
mock_unvalidated_blob, mock_bucket, "raw/cohort.csv"
)
mock_unvalidated_blob.delete.assert_called_once()
# _write_dataframe_to_gcs_as_csv is called; it does bucket.blob(validated_blob_name).upload_from_string
# _write_dataframe_to_gcs_as_csv uploads from a temp file via upload_from_filename
assert mock_bucket.blob.call_count >= 2
mock_validated_blob.upload_from_string.assert_called_once()
call_args = mock_validated_blob.upload_from_string.call_args
assert call_args.kwargs["content_type"] == "text/csv; charset=utf-8"
uploaded = call_args.args[0]
assert isinstance(uploaded, bytes)
mock_validated_blob.upload_from_filename.assert_called_once()
assert mock_validated_blob.upload_from_filename.call_args.kwargs[
"content_type"
] == ("text/csv; charset=utf-8")
assert len(uploaded_chunks) == 1
uploaded = uploaded_chunks[0]
assert b"col_a,col_b" in uploaded
assert b"1,x" in uploaded

Expand Down Expand Up @@ -251,9 +262,12 @@ def test_validate_file_propagates_hard_validation_error() -> None:
def test_run_validation_and_get_normalized_df_returns_names_and_df() -> None:
"""Returns (inferred_schema_names, normalized_df) when validation succeeds."""
mock_blob = MagicMock()
mock_file = io.StringIO("foo_col,bar_col\n1,a\n2,b\n")
mock_blob.open.return_value.__enter__ = lambda self: mock_file
mock_blob.open.return_value.__exit__ = lambda self, *args: None

def download_to_path(path: str) -> None:
with open(path, "w", encoding="utf-8", newline="") as f:
f.write("foo_col,bar_col\n1,a\n2,b\n")

mock_blob.download_to_filename.side_effect = download_to_path

control = StorageControl()
with patch("src.webapp.gcsutil.validate_file_reader") as mock_validate:
Expand Down Expand Up @@ -281,9 +295,7 @@ def test_run_validation_and_get_normalized_df_propagates_hard_validation_error()
):
"""HardValidationError is re-raised without wrapping."""
mock_blob = MagicMock()
mock_file = io.StringIO("bad")
mock_blob.open.return_value.__enter__ = lambda self: mock_file
mock_blob.open.return_value.__exit__ = lambda self, *args: None
mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()

control = StorageControl()
with patch(
Expand All @@ -298,9 +310,7 @@ def test_run_validation_and_get_normalized_df_propagates_hard_validation_error()
def test_run_validation_and_get_normalized_df_propagates_value_error() -> None:
"""ValueError from validate_file_reader (e.g. encoding) is re-raised."""
mock_blob = MagicMock()
mock_file = io.StringIO("data")
mock_blob.open.return_value.__enter__ = lambda self: mock_file
mock_blob.open.return_value.__exit__ = lambda self, *args: None
mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()

control = StorageControl()
with patch(
Expand All @@ -316,9 +326,7 @@ def test_run_validation_and_get_normalized_df_propagates_value_error() -> None:
def test_run_validation_and_get_normalized_df_propagates_unicode_error() -> None:
"""UnicodeError from validate_file_reader (e.g. decode) is re-raised."""
mock_blob = MagicMock()
mock_file = io.StringIO("data")
mock_blob.open.return_value.__enter__ = lambda self: mock_file
mock_blob.open.return_value.__exit__ = lambda self, *args: None
mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()

control = StorageControl()
with patch(
Expand All @@ -343,13 +351,123 @@ def test_write_dataframe_to_gcs_as_csv_uploads_utf8_csv() -> None:
mock_bucket.blob.return_value = mock_blob

df = pd.DataFrame({"A": [1, 2], "B": ["a", "b"]})

def assert_csv_at_path(path: str, **kwargs: Any) -> None:
with open(path, "r", encoding="utf-8") as f:
assert f.read().strip() == "A,B\n1,a\n2,b"

mock_blob.upload_from_filename.side_effect = assert_csv_at_path

control = StorageControl()
control._write_dataframe_to_gcs_as_csv(mock_bucket, "validated/out.csv", df)

mock_bucket.blob.assert_called_once_with("validated/out.csv")
mock_blob.upload_from_string.assert_called_once()
call_args = mock_blob.upload_from_string.call_args
assert call_args.kwargs["content_type"] == "text/csv; charset=utf-8"
payload = call_args.args[0]
assert isinstance(payload, bytes)
assert payload.decode("utf-8").strip() == "A,B\n1,a\n2,b"
mock_blob.upload_from_filename.assert_called_once()
assert mock_blob.upload_from_filename.call_args.kwargs["content_type"] == (
"text/csv; charset=utf-8"
)


def test_run_validation_download_oserror_unlinks_temp_and_skips_validate() -> None:
    """A failing GCS download deletes the temp file and never reaches validate_file_reader."""
    temp_fd, temp_path = tempfile.mkstemp(suffix=".csv", prefix="test_dl_oserr_")
    failing_blob = MagicMock()
    failing_blob.download_to_filename.side_effect = OSError(
        errno.ENOSPC, "No space left on device"
    )

    storage_control = StorageControl()
    # Hand the code under test a known temp path so its cleanup can be checked.
    mkstemp_patch = patch(
        "src.webapp.gcsutil.tempfile.mkstemp", return_value=(temp_fd, temp_path)
    )
    validate_patch = patch("src.webapp.gcsutil.validate_file_reader")
    with mkstemp_patch, validate_patch as validate_mock:
        with pytest.raises(OSError, match="No space left"):
            storage_control._run_validation_and_get_normalized_df(
                failing_blob,
                "school_course.csv",
                ["STUDENT"],
                {},
                None,
                "pdp",
                None,
            )
        validate_mock.assert_not_called()

    assert not os.path.exists(temp_path)
    failing_blob.download_to_filename.assert_called_once_with(temp_path)


def test_run_validation_download_oserror_logs_errno() -> None:
    """An OSError from download_to_filename is logged (with its errno) before it is re-raised."""
    temp_fd, temp_path = tempfile.mkstemp(suffix=".csv", prefix="test_dl_log_")
    failing_blob = MagicMock()
    failing_blob.download_to_filename.side_effect = OSError(
        errno.ENOSPC, "No space left on device"
    )

    storage_control = StorageControl()
    mkstemp_patch = patch(
        "src.webapp.gcsutil.tempfile.mkstemp", return_value=(temp_fd, temp_path)
    )
    with mkstemp_patch, patch("src.webapp.gcsutil.logger") as logger_mock:
        with pytest.raises(OSError):
            storage_control._run_validation_and_get_normalized_df(
                failing_blob,
                "f.csv",
                ["STUDENT"],
                {},
                None,
                "pdp",
                None,
            )
        logger_mock.error.assert_called_once()
        # Positional log args: (format, file_name, temp_path, errno, strerror).
        log_args = logger_mock.error.call_args[0]
        assert "download_to_filename failed" in log_args[0]
        assert log_args[3] == errno.ENOSPC

    assert not os.path.exists(temp_path)


def test_write_dataframe_to_csv_oserror_unlinks_temp() -> None:
    """A to_csv failure (e.g. disk full) is logged, removes the temp file, and skips the upload."""
    temp_fd, temp_path = tempfile.mkstemp(suffix=".csv", prefix="test_csv_oserr_")
    target_blob = MagicMock()
    bucket_mock = MagicMock()
    bucket_mock.blob.return_value = target_blob

    storage_control = StorageControl()
    mkstemp_patch = patch(
        "src.webapp.gcsutil.tempfile.mkstemp", return_value=(temp_fd, temp_path)
    )
    to_csv_patch = patch.object(
        pd.DataFrame,
        "to_csv",
        side_effect=OSError(errno.ENOSPC, "No space left on device"),
    )
    with mkstemp_patch, to_csv_patch:
        with patch("src.webapp.gcsutil.logger") as logger_mock:
            with pytest.raises(OSError, match="No space left"):
                storage_control._write_dataframe_to_gcs_as_csv(
                    bucket_mock,
                    "validated/out.csv",
                    pd.DataFrame({"a": [1]}),
                )
            logger_mock.error.assert_called_once()
            # Positional log args: (format, blob_name, temp_path, errno, strerror).
            log_args = logger_mock.error.call_args[0]
            assert "to_csv failed" in log_args[0]
            assert log_args[3] == errno.ENOSPC

    assert not os.path.exists(temp_path)
    target_blob.upload_from_filename.assert_not_called()


def test_write_dataframe_upload_failure_still_unlinks_temp() -> None:
    """The local temp file is deleted even when the GCS upload raises after to_csv succeeded."""
    temp_fd, temp_path = tempfile.mkstemp(suffix=".csv", prefix="test_upload_fail_")
    failing_blob = MagicMock()
    failing_blob.upload_from_filename.side_effect = RuntimeError("upload failed")
    bucket_mock = MagicMock()
    bucket_mock.blob.return_value = failing_blob

    storage_control = StorageControl()
    mkstemp_patch = patch(
        "src.webapp.gcsutil.tempfile.mkstemp", return_value=(temp_fd, temp_path)
    )
    with mkstemp_patch, pytest.raises(RuntimeError, match="upload failed"):
        storage_control._write_dataframe_to_gcs_as_csv(
            bucket_mock,
            "validated/out.csv",
            pd.DataFrame({"x": [1]}),
        )

    assert not os.path.exists(temp_path)
Loading