Skip to content

Commit fa3dec9

Browse files
committed
check distributed files availability after transfer
1 parent 01a8ea7 commit fa3dec9

1 file changed

Lines changed: 13 additions & 12 deletions

File tree

python/lsst/ctrl/bps/panda/utils.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -205,18 +205,19 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
205205
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
206206
files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri
207207

208-
copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
209-
future_file_copy = []
210-
for src, trgt in files_to_copy.items():
211-
_LOG.debug("Staging %s to %s", src, trgt)
212-
# S3 clients explicitly instantiate here to overpass this
213-
# https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
214-
trgt.exists()
215-
future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy"))
216-
217-
for future in concurrent.futures.as_completed(future_file_copy):
218-
if future.result() is not None:
219-
raise RuntimeError("Error of placing files to the distribution point")
208+
with concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers) as copy_executor:
209+
future_file_copy = {}
210+
for src, trgt in files_to_copy.items():
211+
_LOG.debug("Staging %s to %s", src, trgt)
212+
future = copy_executor.submit(trgt.transfer_from, src, transfer="copy")
213+
future_file_copy[future] = trgt
214+
215+
for future in concurrent.futures.as_completed(future_file_copy):
216+
trgt = future_file_copy[future]
217+
if future.result() is not None:
218+
raise RuntimeError(f"Error of placing file to the distribution point: {trgt}")
219+
if not trgt.exists():
220+
raise RuntimeError(f"File was not copied to the distribution point: {trgt}")
220221

221222

222223
def get_idds_client(config):

0 commit comments

Comments
 (0)