Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-54502.misc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Capture ResourcePath transfer_from exception in copy_files_for_distribution
26 changes: 13 additions & 13 deletions python/lsst/ctrl/bps/panda/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
]

import binascii
import concurrent.futures
import json
import logging
import os
Expand Down Expand Up @@ -183,7 +182,9 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
Path on the edge node accessed storage,
including access protocol, bucket name to place files.
max_copy_workers : `int`
Maximum number of workers for copying files.
Maximum number of workers for copying files. Present for API
compatibility; worker selection is handled internally by
`ResourcePath.mtransfer`.

Raises
------
Expand All @@ -205,18 +206,17 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
folder_uri = file_distribution_uri.join(folder_name, forceDirectory=False)
files_to_copy[ResourcePath(local_pfn, forceDirectory=False)] = folder_uri

copy_executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_copy_workers)
future_file_copy = []
for src, trgt in files_to_copy.items():
_LOG.debug("Staging %s to %s", src, trgt)
# S3 clients explicitly instantiate here to overpass this
# https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
trgt.exists()
future_file_copy.append(copy_executor.submit(trgt.transfer_from, src, transfer="copy"))

for future in concurrent.futures.as_completed(future_file_copy):
if future.result() is not None:
raise RuntimeError("Error of placing files to the distribution point")
_LOG.info("Staging %s to %s", src, trgt)
results = ResourcePath.mtransfer("copy", files_to_copy.items(), do_raise=False)

for trgt, result in results.items():
if not result.success:
raise RuntimeError(
f"Error of placing file to the distribution point: {trgt}"
) from result.exception
if not trgt.exists():
raise RuntimeError(f"File was not copied to the distribution point: {trgt}")


def get_idds_client(config):
Expand Down
26 changes: 25 additions & 1 deletion tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@

"""Unit tests for ctrl_bps_panda utilities."""

import os
import tempfile
import unittest

from lsst.ctrl.bps import GenericWorkflowExec, GenericWorkflowJob
from lsst.ctrl.bps.panda.utils import _make_pseudo_filename
from lsst.ctrl.bps.panda.utils import _make_pseudo_filename, copy_files_for_distribution
from lsst.resources import ResourcePath


class TestPandaUtils(unittest.TestCase):
Expand All @@ -46,6 +49,27 @@ def testOKPseudoFilename(self):
name = _make_pseudo_filename({}, gwjob)
self.assertIn("j" * 15, name)

def testCopyFilesForDistribution(self):
    """Stage one plain file and one directory and check both arrive.

    A source tree holding ``payload.txt`` and ``inputs/nested.txt`` is
    built in a temporary directory, handed to
    `copy_files_for_distribution`, and the destination is then checked
    for ``payload.txt`` at its top level and ``nested.txt`` under an
    ``inputs`` sub-folder.
    """
    with tempfile.TemporaryDirectory() as source_root, tempfile.TemporaryDirectory() as target_root:
        # A regular file sitting directly in the source area.
        payload_path = os.path.join(source_root, "payload.txt")
        with open(payload_path, "w") as stream:
            stream.write("payload")

        # A directory entry with a single file inside it.
        inputs_dir = os.path.join(source_root, "inputs")
        os.mkdir(inputs_dir)
        with open(os.path.join(inputs_dir, "nested.txt"), "w") as stream:
            stream.write("nested")

        to_stage = {"single": payload_path, "directory": inputs_dir}
        destination = ResourcePath(target_root, forceDirectory=True)
        copy_files_for_distribution(to_stage, destination, max_copy_workers=2)

        # Checks must run while the temporary directories still exist.
        expected_paths = (
            os.path.join(target_root, "payload.txt"),
            os.path.join(target_root, "inputs", "nested.txt"),
        )
        for staged in expected_paths:
            self.assertTrue(os.path.exists(staged))


if __name__ == "__main__":
unittest.main()
Loading