@@ -258,12 +258,14 @@ def _get_none_subscribers(self, fileinfo):
258258 return [
259259 ReplaceMetadataDirectiveSubscriber (),
260260 ReplaceTaggingDirectiveSubscriber (),
261+ ExcludeAnnotationDirectiveSubscriber (),
261262 ]
262263
263264 def _get_metadata_directive_subscribers (self , fileinfo ):
264265 return [
265266 self ._create_metadata_directive_props_subscriber (fileinfo ),
266267 ReplaceTaggingDirectiveSubscriber (),
268+ ExcludeAnnotationDirectiveSubscriber (),
267269 ]
268270
269271 def _get_default_subscribers (self , fileinfo ):
@@ -275,6 +277,19 @@ def _get_default_subscribers(self, fileinfo):
275277 self ._cli_params ,
276278 source_client = fileinfo .source_client ,
277279 ),
280+ ExcludeAnnotationDirectiveSubscriber (),
281+ ]
282+
283+ def _get_all_subscribers (self , fileinfo ):
284+ return [
285+ self ._create_metadata_directive_props_subscriber (fileinfo ),
286+ SetTagsSubscriber (
287+ self ._client ,
288+ self ._transfer_config ,
289+ self ._cli_params ,
290+ source_client = fileinfo .source_client ,
291+ ),
292+ self ._create_annotations_subscriber (fileinfo ),
278293 ]
279294
280295 def _create_metadata_directive_props_subscriber (self , fileinfo ):
@@ -289,20 +304,43 @@ def _create_metadata_directive_props_subscriber(self, fileinfo):
289304 )
290305 return SetMetadataDirectivePropsSubscriber (** subscriber_kwargs )
291306
307+ def _create_annotations_subscriber (self , fileinfo ):
308+ kwargs = {
309+ 'client' : self ._client ,
310+ 'transfer_config' : self ._transfer_config ,
311+ 'cli_params' : self ._cli_params ,
312+ 'source_client' : fileinfo .source_client ,
313+ }
314+ if not self ._cli_params .get ('dir_op' ):
315+ kwargs ['head_object_response' ] = (
316+ fileinfo .associated_response_data
317+ )
318+ return SetAnnotationsSubscriber (** kwargs )
319+
292320
293- class ReplaceDirectiveSubscriber (BaseSubscriber ):
321+ class SetDirectiveSubscriber (BaseSubscriber ):
294322 _DIRECTIVE_PARAM = ''
323+ _DIRECTIVE_VALUE = ''
295324
296325 def on_queued (self , future , ** kwargs ):
297- future .meta .call_args .extra_args [self ._DIRECTIVE_PARAM ] = 'REPLACE'
326+ future .meta .call_args .extra_args [self ._DIRECTIVE_PARAM ] = (
327+ self ._DIRECTIVE_VALUE
328+ )
298329
299330
300- class ReplaceMetadataDirectiveSubscriber (ReplaceDirectiveSubscriber ):
331+ class ReplaceMetadataDirectiveSubscriber (SetDirectiveSubscriber ):
301332 _DIRECTIVE_PARAM = 'MetadataDirective'
333+ _DIRECTIVE_VALUE = 'REPLACE'
302334
303335
304- class ReplaceTaggingDirectiveSubscriber (ReplaceDirectiveSubscriber ):
336+ class ReplaceTaggingDirectiveSubscriber (SetDirectiveSubscriber ):
305337 _DIRECTIVE_PARAM = 'TaggingDirective'
338+ _DIRECTIVE_VALUE = 'REPLACE'
339+
340+
341+ class ExcludeAnnotationDirectiveSubscriber (SetDirectiveSubscriber ):
342+ _DIRECTIVE_PARAM = 'AnnotationDirective'
343+ _DIRECTIVE_VALUE = 'EXCLUDE'
306344
307345
308346class SetMetadataDirectivePropsSubscriber (BaseSubscriber ):
@@ -442,3 +480,137 @@ def _serialize_to_header_value(self, tags):
442480
443481 def _is_multipart_copy (self , future ):
444482 return future .meta .size >= self ._transfer_config .multipart_threshold
483+
484+
485+ class AnnotationCopyError (Exception ):
486+ def __init__ (self , bucket , key , succeeded , failed ):
487+ succeeded_names = ', ' .join (succeeded ) or '(none)'
488+ failed_descriptions = '; ' .join (
489+ f'{ name } : { error } ' for name , error in failed
490+ )
491+ super ().__init__ (
492+ f'Failed to copy all annotations to s3://{ bucket } /{ key } . '
493+ f'The object was copied successfully and was not deleted. '
494+ f'Annotations written: { succeeded_names } . '
495+ f'Annotations that failed: { failed_descriptions } .'
496+ )
497+
498+
499+ class SetAnnotationsSubscriber (OnDoneFilteredSubscriber ):
500+ _ANNOTATIONS_CONTEXT_KEY = 'CopySourceAnnotations'
501+
502+ def __init__ (
503+ self , client , transfer_config , cli_params , source_client ,
504+ head_object_response = None ,
505+ ):
506+ self ._client = client
507+ self ._transfer_config = transfer_config
508+ self ._cli_params = cli_params
509+ self ._source_client = source_client
510+ self ._head_object_response = head_object_response
511+
512+ def on_queued (self , future , ** kwargs ):
513+ # Annotations only need to be copied for multipart copies. Single-part
514+ # copies carry them over server-side.
515+ if not self ._is_multipart_copy (future ):
516+ return
517+ bucket , key = self ._get_bucket_key_from_copy_source (future )
518+ source_version_id = self ._get_source_version_id ()
519+ annotation_names = self ._list_annotation_names (
520+ bucket , key , source_version_id
521+ )
522+ if not annotation_names :
523+ return
524+ annotations = {}
525+ for name in annotation_names :
526+ annotations [name ] = self ._get_annotation (
527+ bucket , key , name , source_version_id
528+ )
529+ future .meta .user_context [self ._ANNOTATIONS_CONTEXT_KEY ] = annotations
530+
531+ def _on_success (self , future ):
532+ annotations = future .meta .user_context .get (
533+ self ._ANNOTATIONS_CONTEXT_KEY
534+ )
535+ if not annotations :
536+ return
537+ bucket = future .meta .call_args .bucket
538+ key = future .meta .call_args .key
539+ dest_etag , dest_version_id = self ._get_dest_object_identity (future )
540+ succeeded = []
541+ failed = []
542+ for name , payload in annotations .items ():
543+ try :
544+ self ._put_annotation (
545+ bucket , key , name , payload , dest_etag , dest_version_id
546+ )
547+ succeeded .append (name )
548+ except Exception as e :
549+ failed .append ((name , str (e )))
550+ if failed :
551+ future .set_exception (
552+ AnnotationCopyError (bucket , key , succeeded , failed )
553+ )
554+
555+ def _get_dest_object_identity (self , future ):
556+ response = future .result ()
557+ return response .get ('ETag' ), response .get ('VersionId' )
558+
559+ def _get_source_version_id (self ):
560+ if self ._head_object_response is not None :
561+ return self ._head_object_response .get ('VersionId' )
562+ return None
563+
564+ def _list_annotation_names (self , bucket , key , version_id = None ):
565+ extra_args = {}
566+ utils .RequestParamsMapper .map_list_object_annotations_params (
567+ extra_args , self ._cli_params
568+ )
569+ if version_id is not None :
570+ extra_args ['VersionId' ] = version_id
571+ paginator = self ._source_client .get_paginator (
572+ 'list_object_annotations'
573+ )
574+ names = []
575+ for page in paginator .paginate (Bucket = bucket , Key = key , ** extra_args ):
576+ for annotation in page .get ('Annotations' , []):
577+ names .append (annotation ['AnnotationName' ])
578+ return names
579+
580+ def _get_annotation (self , bucket , key , name , version_id = None ):
581+ extra_args = {}
582+ utils .RequestParamsMapper .map_get_object_annotation_params (
583+ extra_args , self ._cli_params
584+ )
585+ if version_id is not None :
586+ extra_args ['VersionId' ] = version_id
587+ response = self ._source_client .get_object_annotation (
588+ Bucket = bucket , Key = key , AnnotationName = name , ** extra_args
589+ )
590+ return response ['AnnotationPayload' ].read ()
591+
592+ def _put_annotation (
593+ self , bucket , key , name , payload , dest_etag , dest_version_id
594+ ):
595+ extra_args = {}
596+ utils .RequestParamsMapper .map_put_object_annotation_params (
597+ extra_args , self ._cli_params
598+ )
599+ if dest_etag is not None :
600+ extra_args ['ObjectIfMatch' ] = dest_etag
601+ if dest_version_id is not None :
602+ extra_args ['VersionId' ] = dest_version_id
603+ self ._client .put_object_annotation (
604+ Bucket = bucket ,
605+ Key = key ,
606+ AnnotationName = name ,
607+ AnnotationPayload = payload ,
608+ ** extra_args ,
609+ )
610+
611+ def _get_bucket_key_from_copy_source (self , future ):
612+ copy_source = future .meta .call_args .copy_source
613+ return copy_source ['Bucket' ], copy_source ['Key' ]
614+
615+ def _is_multipart_copy (self , future ):
616+ return future .meta .size >= self ._transfer_config .multipart_threshold
0 commit comments