Skip to content

Commit f7ae56f

Browse files
committed
fix: S3Downloader - improve concurrent behavior
1 parent 5336f47 commit f7ae56f

2 files changed

Lines changed: 37 additions & 7 deletions

File tree

integrations/amazon_bedrock/src/haystack_integrations/components/downloaders/s3/s3_downloader.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# SPDX-License-Identifier: Apache-2.0
44

55
import os
6+
import uuid
67
from collections.abc import Callable
78
from concurrent.futures import ThreadPoolExecutor
89
from pathlib import Path
@@ -200,14 +201,20 @@ def _download_file(self, document: Document) -> Document | None:
200201

201202
file_path = self.file_root_path / Path(file_name)
202203

203-
if file_path.is_file():
204-
# set access and modification time to now without redownloading the file
205-
file_path.touch()
206-
207-
else:
204+
# if the file exists, avoid downloading it and just update the timestamp
205+
try:
206+
os.utime(file_path, None)
207+
except FileNotFoundError:
208208
s3_key = self.s3_key_generation_function(document) if self.s3_key_generation_function else file_name
209-
# we know that _storage is not None after warm_up() is called, but mypy does not know that
210-
self._storage.download(key=s3_key, local_file_path=file_path) # type: ignore[union-attr]
209+
# download to a temp path to prevent other downloaders running concurrently from seeing a partially-written file
210+
tmp_path = file_path.with_name(f"{file_path.name}.tmp-{uuid.uuid4().hex}")
211+
try:
212+
# we know that _storage is not None after warm_up() is called, but mypy does not know that
213+
self._storage.download(key=s3_key, local_file_path=tmp_path) # type: ignore[union-attr]
214+
os.replace(tmp_path, file_path)
215+
except BaseException:
216+
tmp_path.unlink(missing_ok=True)
217+
raise
211218

212219
document.meta["file_path"] = str(file_path)
213220
return document

integrations/amazon_bedrock/tests/test_s3_downloader.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,29 @@ def test_run_returns_empty_when_all_filtered(self, tmp_path, mock_s3_storage, mo
308308
assert out["documents"] == []
309309
mock_s3_storage.download.assert_not_called()
310310

311+
def test_download_writes_to_temp_path_then_renames(self, tmp_path, mock_boto3_session):
312+
d = S3Downloader(file_root_path=str(tmp_path))
313+
314+
final_path = tmp_path / "test.pdf"
315+
captured_paths = []
316+
317+
def fake_download(key, local_file_path: Path):
318+
captured_paths.append(Path(local_file_path))
319+
assert not final_path.exists(), "final path must not exist while download is in progress"
320+
Path(local_file_path).write_bytes(b"complete content")
321+
322+
mock_storage = MagicMock(spec=S3Storage)
323+
mock_storage.download.side_effect = fake_download
324+
d._storage = mock_storage
325+
326+
d.run(documents=[Document(meta={"file_name": "test.pdf"})])
327+
328+
assert len(captured_paths) == 1
329+
assert captured_paths[0] != final_path
330+
assert captured_paths[0].name.startswith("test.pdf.tmp-")
331+
assert final_path.exists()
332+
assert final_path.read_bytes() == b"complete content"
333+
311334
def test_cleanup_cache_evicts_old_files(self, tmp_path, mock_s3_storage, mock_boto3_session):
312335
d = S3Downloader(file_root_path=str(tmp_path), max_cache_size=1)
313336
d._storage = mock_s3_storage

0 commit comments

Comments
 (0)