3939]
4040
4141import binascii
42- import concurrent .futures
4342import json
4443import logging
4544import os
@@ -183,7 +182,9 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
183182 Path on the edge node accessed storage,
184183 including access protocol, bucket name to place files.
185184 max_copy_workers : `int`
186- Maximum number of workers for copying files.
185+ Maximum number of workers for copying files. Present for API
186+ compatibility; worker selection is handled internally by
187+ `ResourcePath.mtransfer`.
187188
188189 Raises
189190 ------
@@ -205,18 +206,17 @@ def copy_files_for_distribution(files_to_stage, file_distribution_uri, max_copy_
205206 folder_uri = file_distribution_uri .join (folder_name , forceDirectory = False )
206207 files_to_copy [ResourcePath (local_pfn , forceDirectory = False )] = folder_uri
207208
208- copy_executor = concurrent .futures .ThreadPoolExecutor (max_workers = max_copy_workers )
209- future_file_copy = []
210209 for src , trgt in files_to_copy .items ():
211- _LOG .debug ("Staging %s to %s" , src , trgt )
212- # S3 clients explicitly instantiate here to overpass this
213- # https://stackoverflow.com/questions/52820971/is-boto3-client-thread-safe
214- trgt .exists ()
215- future_file_copy .append (copy_executor .submit (trgt .transfer_from , src , transfer = "copy" ))
216-
217- for future in concurrent .futures .as_completed (future_file_copy ):
218- if future .result () is not None :
219- raise RuntimeError ("Error of placing files to the distribution point" )
210+ _LOG .info ("Staging %s to %s" , src , trgt )
211+ results = ResourcePath .mtransfer ("copy" , files_to_copy .items (), do_raise = False )
212+
213+ for trgt , result in results .items ():
214+ if not result .success :
215+ raise RuntimeError (
216+ f"Error of placing file to the distribution point: { trgt } "
217+ ) from result .exception
218+ if not trgt .exists ():
219+ raise RuntimeError (f"File was not copied to the distribution point: { trgt } " )
220220
221221
222222def get_idds_client (config ):
0 commit comments