Skip to content

Commit 9b46ccd

Browse files
committed
check distributed files availability after transfer
1 parent 01a8ea7 commit 9b46ccd

2 files changed

Lines changed: 37 additions & 13 deletions

File tree

python/lsst/ctrl/bps/panda/utils.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
]
4040

4141
import binascii
42-
import concurrent.futures
4342
import json
4443
import logging
4544
import os
@@ -183,7 +182,9 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
183182
Path on the edge node accessed storage,
184183
including access protocol, bucket name to place files.
185184
max_copy_workers : `int`
186-
Maximum number of workers for copying files.
185+
Maximum number of workers for copying files. Present for API
186+
compatibility; worker selection is handled internally by
187+
`ResourcePath.mtransfer`.
187188
188189
Raises
189190
------
@@ -205,18 +206,17 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
205206
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
206207
files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri
207208

208-
copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
209-
future_file_copy = []
210209
for src, trgt in files_to_copy.items():
211210
_LOG.debug("Staging %s to %s", src, trgt)
212-
# S3 clients explicitly instantiate here to overpass this
213-
# https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
214-
trgt.exists()
215-
future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy"))
216-
217-
for future in concurrent.futures.as_completed(future_file_copy):
218-
if future.result() is not None:
219-
raise RuntimeError("Error of placing files to the distribution point")
211+
results = ResourcePath.mtransfer("copy", files_to_copy.items(), do_raise=False)
212+
213+
for trgt, result in results.items():
214+
if not result.success:
215+
raise RuntimeError(
216+
f"Error of placing file to the distribution point: {trgt}"
217+
) from result.exception
218+
if not trgt.exists():
219+
raise RuntimeError(f"File was not copied to the distribution point: {trgt}")
220220

221221

222222
def get_idds_client(config):

tests/test_utils.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@
2727

2828
"""Unit tests for ctrl_bps_panda utilities."""
2929

30+
import os
31+
import tempfile
3032
import unittest
3133

3234
from lsst.ctrl.bps import GenericWorkflowExec, GenericWorkflowJob
33-
from lsst.ctrl.bps.panda.utils import _make_pseudo_filename
35+
from lsst.ctrl.bps.panda.utils import _make_pseudo_filename, copy_files_for_distribution
36+
from lsst.resources import ResourcePath
3437

3538

3639
class TestPandaUtils(unittest.TestCase):
@@ -46,6 +49,27 @@ def testOKPseudoFilename(self):
4649
name = _make_pseudo_filename({}, gwjob)
4750
self.assertIn("j" * 15, name)
4851

52+
def testCopyFilesForDistribution(self):
53+
with tempfile.TemporaryDirectory() as src_tmpdir, tempfile.TemporaryDirectory() as dest_tmpdir:
54+
single_file = os.path.join(src_tmpdir, "payload.txt")
55+
with open(single_file, "w") as handle:
56+
handle.write("payload")
57+
58+
directory = os.path.join(src_tmpdir, "inputs")
59+
os.mkdir(directory)
60+
nested_file = os.path.join(directory, "nested.txt")
61+
with open(nested_file, "w") as handle:
62+
handle.write("nested")
63+
64+
copy_files_for_distribution(
65+
{"single": single_file, "directory": directory},
66+
ResourcePath(dest_tmpdir, forceDirectory=True),
67+
max_copy_workers=2,
68+
)
69+
70+
self.assertTrue(os.path.exists(os.path.join(dest_tmpdir, "payload.txt")))
71+
self.assertTrue(os.path.exists(os.path.join(dest_tmpdir, "inputs", "nested.txt")))
72+
4973

5074
if __name__ == "__main__":
5175
unittest.main()

0 commit comments

Comments
 (0)