Skip to content

Commit 8eba633

Browse files
Merge pull request #224 from datakind/hotfix/validation-upload-memory-staging
2 parents f3e431d + a7a2532 commit 8eba633

2 files changed

Lines changed: 219 additions & 44 deletions

File tree

src/webapp/gcsutil.py

Lines changed: 76 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
"""Cloud storage related helper functions."""
22

33
import datetime
4-
import io
54
import logging
5+
import os
6+
import tempfile
67
from typing import Any, Dict, List, Optional
78

89
import pandas as pd
@@ -22,6 +23,41 @@
2223
SIGNED_URL_EXPIRY_MIN = 30
2324

2425

26+
def _unlink_if_exists(path: Optional[str]) -> None:
27+
"""Remove a file if path is set; ignore missing file or permission errors."""
28+
if path is None:
29+
return
30+
try:
31+
os.unlink(path)
32+
except OSError:
33+
pass
34+
35+
36+
def _download_blob_to_temp_csv_path(blob: Any, file_name: str) -> str:
37+
"""
38+
Stream GCS blob to a private temp CSV path for validation.
39+
40+
Raises:
41+
OSError: If download fails (after logging errno/context). Temp file is removed.
42+
"""
43+
fd, csv_path = tempfile.mkstemp(suffix=".csv", prefix="validate_upload_")
44+
os.close(fd)
45+
try:
46+
blob.download_to_filename(csv_path)
47+
except OSError as e:
48+
logger.error(
49+
"GCS download_to_filename failed for %r temp_path=%r errno=%s strerror=%s",
50+
file_name,
51+
csv_path,
52+
e.errno,
53+
e.strerror,
54+
exc_info=True,
55+
)
56+
_unlink_if_exists(csv_path)
57+
raise
58+
return csv_path
59+
60+
2561
def rename_file(
2662
bucket_name: str,
2763
file_name: str,
@@ -423,16 +459,17 @@ def _run_validation_and_get_normalized_df(
423459
institution_identifier: Optional[str],
424460
) -> tuple[List[str], Any]:
425461
"""Run validation on blob content; return inferred schema names and normalized DataFrame."""
462+
local_csv_path: Optional[str] = None
426463
try:
427-
with blob.open("r") as file:
428-
result = validate_file_reader(
429-
file,
430-
allowed_schemas,
431-
base_schema,
432-
inst_schema,
433-
institution_id=institution_id,
434-
institution_identifier=institution_identifier,
435-
)
464+
local_csv_path = _download_blob_to_temp_csv_path(blob, file_name)
465+
result = validate_file_reader(
466+
local_csv_path,
467+
allowed_schemas,
468+
base_schema,
469+
inst_schema,
470+
institution_id=institution_id,
471+
institution_identifier=institution_identifier,
472+
)
436473
inferred_schema_names = [str(s) for s in result.get("schemas", [])]
437474
logging.debug(
438475
"Validation successful for %s: %s", file_name, inferred_schema_names
@@ -447,20 +484,40 @@ def _run_validation_and_get_normalized_df(
447484
# Log any other error with context before re-raising (no silent failures).
448485
logging.exception("Validation failed for %s: %s", file_name, e)
449486
raise
487+
finally:
488+
_unlink_if_exists(local_csv_path)
450489

451490
def _write_dataframe_to_gcs_as_csv(
452491
self, bucket: Any, blob_name: str, normalized_df: pd.DataFrame
453492
) -> None:
454493
"""Write a DataFrame to GCS as UTF-8 CSV. Used for validated/ output."""
455-
csv_buffer = io.StringIO()
456-
normalized_df.to_csv(
457-
csv_buffer, index=False, encoding="utf-8", lineterminator="\n"
458-
)
459-
blob = bucket.blob(blob_name)
460-
blob.upload_from_string(
461-
csv_buffer.getvalue().encode("utf-8"),
462-
content_type="text/csv; charset=utf-8",
463-
)
494+
fd, local_csv_path = tempfile.mkstemp(suffix=".csv", prefix="validated_out_")
495+
os.close(fd)
496+
try:
497+
try:
498+
normalized_df.to_csv(
499+
local_csv_path,
500+
index=False,
501+
encoding="utf-8",
502+
lineterminator="\n",
503+
)
504+
except OSError as e:
505+
logger.error(
506+
"to_csv failed for validated blob %r temp_path=%r errno=%s strerror=%s",
507+
blob_name,
508+
local_csv_path,
509+
e.errno,
510+
e.strerror,
511+
exc_info=True,
512+
)
513+
raise
514+
blob = bucket.blob(blob_name)
515+
blob.upload_from_filename(
516+
local_csv_path,
517+
content_type="text/csv; charset=utf-8",
518+
)
519+
finally:
520+
_unlink_if_exists(local_csv_path)
464521

465522
def get_file_contents(self, bucket_name: str, file_name: str) -> Any:
466523
"""Returns a file as a bytes object."""

src/webapp/gcsutil_test.py

Lines changed: 143 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""Tests for gcsutil.StorageControl validation and normalized/raw archive flow."""
22

3-
import io
3+
import errno
4+
import os
5+
import tempfile
46
from typing import Any
57
from unittest.mock import MagicMock, patch
68

@@ -182,6 +184,14 @@ def blob_side_effect(name: str) -> Any:
182184
mock_client.bucket.return_value = mock_bucket
183185

184186
small_df = pd.DataFrame({"col_a": [1, 2], "col_b": ["x", "y"]})
187+
uploaded_chunks: list[bytes] = []
188+
189+
def capture_validated_upload(path: str, **kwargs: Any) -> None:
190+
with open(path, "rb") as f:
191+
uploaded_chunks.append(f.read())
192+
193+
mock_validated_blob.upload_from_filename.side_effect = capture_validated_upload
194+
185195
control = StorageControl()
186196
with patch("src.webapp.gcsutil.storage.Client", return_value=mock_client):
187197
with patch.object(
@@ -201,13 +211,14 @@ def blob_side_effect(name: str) -> Any:
201211
mock_unvalidated_blob, mock_bucket, "raw/cohort.csv"
202212
)
203213
mock_unvalidated_blob.delete.assert_called_once()
204-
# _write_dataframe_to_gcs_as_csv is called; it does bucket.blob(validated_blob_name).upload_from_string
214+
# _write_dataframe_to_gcs_as_csv uploads from a temp file via upload_from_filename
205215
assert mock_bucket.blob.call_count >= 2
206-
mock_validated_blob.upload_from_string.assert_called_once()
207-
call_args = mock_validated_blob.upload_from_string.call_args
208-
assert call_args.kwargs["content_type"] == "text/csv; charset=utf-8"
209-
uploaded = call_args.args[0]
210-
assert isinstance(uploaded, bytes)
216+
mock_validated_blob.upload_from_filename.assert_called_once()
217+
assert mock_validated_blob.upload_from_filename.call_args.kwargs[
218+
"content_type"
219+
] == ("text/csv; charset=utf-8")
220+
assert len(uploaded_chunks) == 1
221+
uploaded = uploaded_chunks[0]
211222
assert b"col_a,col_b" in uploaded
212223
assert b"1,x" in uploaded
213224

@@ -251,9 +262,12 @@ def test_validate_file_propagates_hard_validation_error() -> None:
251262
def test_run_validation_and_get_normalized_df_returns_names_and_df() -> None:
252263
"""Returns (inferred_schema_names, normalized_df) when validation succeeds."""
253264
mock_blob = MagicMock()
254-
mock_file = io.StringIO("foo_col,bar_col\n1,a\n2,b\n")
255-
mock_blob.open.return_value.__enter__ = lambda self: mock_file
256-
mock_blob.open.return_value.__exit__ = lambda self, *args: None
265+
266+
def download_to_path(path: str) -> None:
267+
with open(path, "w", encoding="utf-8", newline="") as f:
268+
f.write("foo_col,bar_col\n1,a\n2,b\n")
269+
270+
mock_blob.download_to_filename.side_effect = download_to_path
257271

258272
control = StorageControl()
259273
with patch("src.webapp.gcsutil.validate_file_reader") as mock_validate:
@@ -281,9 +295,7 @@ def test_run_validation_and_get_normalized_df_propagates_hard_validation_error()
281295
):
282296
"""HardValidationError is re-raised without wrapping."""
283297
mock_blob = MagicMock()
284-
mock_file = io.StringIO("bad")
285-
mock_blob.open.return_value.__enter__ = lambda self: mock_file
286-
mock_blob.open.return_value.__exit__ = lambda self, *args: None
298+
mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()
287299

288300
control = StorageControl()
289301
with patch(
@@ -298,9 +310,7 @@ def test_run_validation_and_get_normalized_df_propagates_hard_validation_error()
298310
def test_run_validation_and_get_normalized_df_propagates_value_error() -> None:
299311
"""ValueError from validate_file_reader (e.g. encoding) is re-raised."""
300312
mock_blob = MagicMock()
301-
mock_file = io.StringIO("data")
302-
mock_blob.open.return_value.__enter__ = lambda self: mock_file
303-
mock_blob.open.return_value.__exit__ = lambda self, *args: None
313+
mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()
304314

305315
control = StorageControl()
306316
with patch(
@@ -316,9 +326,7 @@ def test_run_validation_and_get_normalized_df_propagates_value_error() -> None:
316326
def test_run_validation_and_get_normalized_df_propagates_unicode_error() -> None:
317327
"""UnicodeError from validate_file_reader (e.g. decode) is re-raised."""
318328
mock_blob = MagicMock()
319-
mock_file = io.StringIO("data")
320-
mock_blob.open.return_value.__enter__ = lambda self: mock_file
321-
mock_blob.open.return_value.__exit__ = lambda self, *args: None
329+
mock_blob.download_to_filename.side_effect = lambda p: open(p, "w").close()
322330

323331
control = StorageControl()
324332
with patch(
@@ -343,13 +351,123 @@ def test_write_dataframe_to_gcs_as_csv_uploads_utf8_csv() -> None:
343351
mock_bucket.blob.return_value = mock_blob
344352

345353
df = pd.DataFrame({"A": [1, 2], "B": ["a", "b"]})
354+
355+
def assert_csv_at_path(path: str, **kwargs: Any) -> None:
356+
with open(path, "r", encoding="utf-8") as f:
357+
assert f.read().strip() == "A,B\n1,a\n2,b"
358+
359+
mock_blob.upload_from_filename.side_effect = assert_csv_at_path
360+
346361
control = StorageControl()
347362
control._write_dataframe_to_gcs_as_csv(mock_bucket, "validated/out.csv", df)
348363

349364
mock_bucket.blob.assert_called_once_with("validated/out.csv")
350-
mock_blob.upload_from_string.assert_called_once()
351-
call_args = mock_blob.upload_from_string.call_args
352-
assert call_args.kwargs["content_type"] == "text/csv; charset=utf-8"
353-
payload = call_args.args[0]
354-
assert isinstance(payload, bytes)
355-
assert payload.decode("utf-8").strip() == "A,B\n1,a\n2,b"
365+
mock_blob.upload_from_filename.assert_called_once()
366+
assert mock_blob.upload_from_filename.call_args.kwargs["content_type"] == (
367+
"text/csv; charset=utf-8"
368+
)
369+
370+
371+
def test_run_validation_download_oserror_unlinks_temp_and_skips_validate() -> None:
372+
"""If GCS download fails, temp file is removed and validate_file_reader is not run."""
373+
fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_dl_oserr_")
374+
mock_blob = MagicMock()
375+
mock_blob.download_to_filename.side_effect = OSError(
376+
errno.ENOSPC, "No space left on device"
377+
)
378+
379+
control = StorageControl()
380+
with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
381+
with patch("src.webapp.gcsutil.validate_file_reader") as mock_validate:
382+
with pytest.raises(OSError, match="No space left"):
383+
control._run_validation_and_get_normalized_df(
384+
mock_blob,
385+
"school_course.csv",
386+
["STUDENT"],
387+
{},
388+
None,
389+
"pdp",
390+
None,
391+
)
392+
mock_validate.assert_not_called()
393+
394+
assert not os.path.exists(real_path)
395+
mock_blob.download_to_filename.assert_called_once_with(real_path)
396+
397+
398+
def test_run_validation_download_oserror_logs_errno() -> None:
399+
"""OSError from download_to_filename is logged with errno before re-raise."""
400+
fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_dl_log_")
401+
mock_blob = MagicMock()
402+
mock_blob.download_to_filename.side_effect = OSError(
403+
errno.ENOSPC, "No space left on device"
404+
)
405+
406+
control = StorageControl()
407+
with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
408+
with patch("src.webapp.gcsutil.logger") as mock_logger:
409+
with pytest.raises(OSError):
410+
control._run_validation_and_get_normalized_df(
411+
mock_blob,
412+
"f.csv",
413+
["STUDENT"],
414+
{},
415+
None,
416+
"pdp",
417+
None,
418+
)
419+
mock_logger.error.assert_called_once()
420+
msg = mock_logger.error.call_args[0][0]
421+
assert "download_to_filename failed" in msg
422+
assert mock_logger.error.call_args[0][3] == errno.ENOSPC
423+
424+
assert not os.path.exists(real_path)
425+
426+
427+
def test_write_dataframe_to_csv_oserror_unlinks_temp() -> None:
428+
"""If to_csv fails (e.g. disk full), temp file is removed and upload is not attempted."""
429+
fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_csv_oserr_")
430+
mock_blob = MagicMock()
431+
mock_bucket = MagicMock()
432+
mock_bucket.blob.return_value = mock_blob
433+
434+
control = StorageControl()
435+
with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
436+
with patch.object(
437+
pd.DataFrame,
438+
"to_csv",
439+
side_effect=OSError(errno.ENOSPC, "No space left on device"),
440+
):
441+
with patch("src.webapp.gcsutil.logger") as mock_logger:
442+
with pytest.raises(OSError, match="No space left"):
443+
control._write_dataframe_to_gcs_as_csv(
444+
mock_bucket,
445+
"validated/out.csv",
446+
pd.DataFrame({"a": [1]}),
447+
)
448+
mock_logger.error.assert_called_once()
449+
assert "to_csv failed" in mock_logger.error.call_args[0][0]
450+
assert mock_logger.error.call_args[0][3] == errno.ENOSPC
451+
452+
assert not os.path.exists(real_path)
453+
mock_blob.upload_from_filename.assert_not_called()
454+
455+
456+
def test_write_dataframe_upload_failure_still_unlinks_temp() -> None:
457+
"""If GCS upload fails after to_csv, the local temp file is still deleted."""
458+
fd, real_path = tempfile.mkstemp(suffix=".csv", prefix="test_upload_fail_")
459+
mock_blob = MagicMock()
460+
mock_blob.upload_from_filename.side_effect = RuntimeError("upload failed")
461+
mock_bucket = MagicMock()
462+
mock_bucket.blob.return_value = mock_blob
463+
464+
control = StorageControl()
465+
with patch("src.webapp.gcsutil.tempfile.mkstemp", return_value=(fd, real_path)):
466+
with pytest.raises(RuntimeError, match="upload failed"):
467+
control._write_dataframe_to_gcs_as_csv(
468+
mock_bucket,
469+
"validated/out.csv",
470+
pd.DataFrame({"x": [1]}),
471+
)
472+
473+
assert not os.path.exists(real_path)

0 commit comments

Comments
 (0)