77
88import aiohttp
99from asgiref .sync import sync_to_async
10+ from collections import defaultdict
1011
1112from pulpcore .plugin .models import Artifact , ProgressReport , Remote
1213from pulpcore .plugin .stages import ContentSaver , DeclarativeArtifact , DeclarativeContent , Stage
@@ -62,7 +63,7 @@ def __init__(self, remote, signed_only):
6263 self .manifest_list_dcs = []
6364 self .manifest_dcs = []
6465 self .signature_dcs = []
65- self ._synced_digests = set ( )
66+ self ._synced_digests = defaultdict ( list )
6667 self ._full_tag_list = []
6768 self ._cosign_tags = []
6869
@@ -78,20 +79,26 @@ async def _download_manifest_data(self, manifest_url):
7879
7980 return content_data , raw_text_data , response
8081
81- async def _check_for_existing_manifest (self , download_tag ):
82- response = await download_tag
82+ async def _check_for_existing_manifest (self , head_manifest_task ):
83+ response = await head_manifest_task
8384
8485 digest = response .headers .get ("docker-content-digest" )
86+ url = response .url
87+ original_reference = url .split ("/" )[- 1 ]
8588
8689 if manifest := await Manifest .objects .filter (
8790 digest = digest , pulp_domain = get_domain ()
8891 ).afirst ():
8992 raw_text_data = manifest .data
9093 content_data = json .loads (raw_text_data )
9194 else :
92- content_data , raw_text_data , response = await self ._download_manifest_data (response .url )
95+ if not original_reference .startswith ("sha256:" ):
96+ # Fetch the tag with its digest
97+ url = url .rsplit (original_reference , 1 )[0 ] + digest
9398
94- return content_data , raw_text_data , response
99+ content_data , raw_text_data , response = await self ._download_manifest_data (url )
100+
101+ return content_data , raw_text_data , response , original_reference
95102
96103 async def run (self ):
97104 """
@@ -105,31 +112,38 @@ async def run(self):
105112 repo_name = self .remote .namespaced_upstream_name
106113 tag_list_url = "/v2/{name}/tags/list" .format (name = repo_name )
107114 self ._full_tag_list = await self .get_paginated_tag_list (tag_list_url , repo_name )
108- self ._cosign_tags = filter_resources (
109- self ._full_tag_list , ["sha256-*" ], self .remote .exclude_tags
110- )
111- if self .remote .include_tags or self .remote .exclude_tags :
115+ includes = self .remote .includes or []
116+ excludes = self .remote .excludes or []
117+
118+ tag_includes = [i for i in includes if not i .startswith ("sha256:" )]
119+ digest_includes = [i for i in includes if i .startswith ("sha256:" )]
120+ tag_excludes = [e for e in excludes if not e .startswith ("sha256:" )]
121+
122+ self ._cosign_tags = filter_resources (self ._full_tag_list , ["sha256-*" ], tag_excludes )
123+
124+ if includes or excludes :
112125 # Split sync into two parts, first all non-cosign tags, then cosign tags
113- exclude_tags_and_cosign = ( self . remote . exclude_tags or []) + ["sha256-*" ]
114- tag_list = filter_resources (
115- self ._full_tag_list , self . remote . include_tags , exclude_tags_and_cosign
126+ exclude_tags_and_cosign = tag_excludes + ["sha256-*" ]
127+ filtered_tags = filter_resources (
128+ self ._full_tag_list , tag_includes , exclude_tags_and_cosign
116129 )
130+ manifest_list = filtered_tags + digest_includes
117131 else :
118- tag_list = self ._full_tag_list
132+ manifest_list = self ._full_tag_list
119133 await pb .aincrement ()
120134
121- await self ._process_tags ( tag_list , signature_source )
135+ await self ._process_manifests ( manifest_list , signature_source , "Processing Manifests" )
122136
123- if self . remote . include_tags or self . remote . exclude_tags :
124- # Process cosign companion tags after all non-cosign tags are synced
137+ if includes or excludes :
138+ # Process cosign companion tags after all primary content is synced
125139 companion_tags = self ._find_cosign_companion_tags ()
126140 if companion_tags :
127141 log .info (
128142 "Syncing %d cosign companion tag(s) for filtered images" ,
129143 len (companion_tags ),
130144 )
131- await self ._process_tags (
132- companion_tags , signature_source , msg = "Processing Cosign Companion Tags"
145+ await self ._process_manifests (
146+ companion_tags , signature_source , "Processing Cosign Companion Tags"
133147 )
134148
135149 def _find_cosign_companion_tags (self ):
@@ -143,54 +157,53 @@ def _find_cosign_companion_tags(self):
143157 companion_tags .append (tag )
144158 return companion_tags
145159
146- async def _process_tags (self , tag_list , signature_source , msg = "Processing Tags" ):
147- """Download and process a batch of tags , creating declarative content objects."""
160+ async def _process_manifests (self , manifests , signature_source , msg ):
161+ """Download and process a batch of manifests , creating declarative content objects."""
148162 BATCH_SIZE = 500
149163 to_download = []
150164
151- for tag_name in tag_list :
152- relative_url = "/v2/{name}/manifests/{tag}" .format (
153- name = self .remote .namespaced_upstream_name , tag = tag_name
154- )
155- tag_url = urljoin (self .remote .url , relative_url )
156- downloader = self .remote .get_downloader (url = tag_url )
165+ for reference in manifests :
166+ relative_url = f"/v2/{ self .remote .namespaced_upstream_name } /manifests/{ reference } "
167+ manifest_url = urljoin (self .remote .url , relative_url )
168+ downloader = self .remote .get_downloader (url = manifest_url )
157169 to_download .append (
158170 downloader .run (extra_data = {"headers" : V2_ACCEPT_HEADERS , "http_method" : "head" })
159171 )
160172
161173 async with ProgressReport (
162174 message = msg ,
163- code = "sync.processing.tag " ,
164- total = len (tag_list ),
165- ) as pb_parsed_tags :
175+ code = "sync.processing.manifest " ,
176+ total = len (manifests ),
177+ ) as pb_parsed_manifests :
166178 to_download_artifact = [
167- self ._check_for_existing_manifest (download_tag )
168- for download_tag in asyncio .as_completed (to_download )
179+ self ._check_for_existing_manifest (download_manifest )
180+ for download_manifest in asyncio .as_completed (to_download )
169181 ]
170182
171183 for artifact in asyncio .as_completed (to_download_artifact ):
172- content_data , raw_text_data , response = await artifact
184+ content_data , raw_text_data , response , manifest_ref = await artifact
173185
174186 digest = calculate_digest (raw_text_data )
175- tag_name = response . url . split ( "/" )[ - 1 ]
187+ is_tag = not manifest_ref . startswith ( "sha256:" )
176188 media_type = determine_media_type (content_data , response )
177189
178190 if self .signed_only and not signature_source :
179191 if not (
180- self ._is_cosign_companion_tag (tag_name , media_type , content_data )
192+ self ._is_cosign_companion_tag (manifest_ref , media_type , content_data )
181193 or await self ._has_cosign_signature (digest )
182194 ):
183195 log .info (
184196 "The unsigned image {digest} can't be synced "
185197 "due to a requirement to sync signed content "
186198 "only." .format (digest = digest )
187199 )
188- await pb_parsed_tags .aincrement ()
200+ await pb_parsed_manifests .aincrement ()
189201 continue
190202
191203 validate_manifest (content_data , media_type , digest )
192204
193- tag_dc = DeclarativeContent (Tag (name = tag_name ))
205+ if is_tag :
206+ tag_dc = DeclarativeContent (Tag (name = manifest_ref ))
194207
195208 if media_type in (MEDIA_TYPE .MANIFEST_LIST , MEDIA_TYPE .INDEX_OCI ):
196209 list_dc = self .create_manifest_list (
@@ -214,7 +227,7 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
214227 "The whole manifest list is skipped." .format (
215228 img_digest = man_dc .content .digest ,
216229 ml_digest = list_dc .content .digest ,
217- tag = tag_name ,
230+ tag = manifest_ref ,
218231 )
219232 )
220233 break
@@ -224,20 +237,23 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
224237 else :
225238 # Manifest indices can be signed too. It is not mandatory.
226239 # If signature is available mirror it.
227- self ._synced_digests . add ( digest )
240+ self ._synced_digests [ digest ]. append ( manifest_ref )
228241 if signature_source is not None :
229242 list_sig_dcs = await self .create_signatures (list_dc , signature_source )
230243 if list_sig_dcs :
231244 self .signature_dcs .extend (list_sig_dcs )
232- tag_dc .extra_data ["tagged_manifest_dc" ] = list_dc
233245 for listed_manifest in list_dc .extra_data ["listed_manifests" ]:
234- self ._synced_digests .add (listed_manifest ["manifest_dc" ].content .digest )
246+ self ._synced_digests [
247+ listed_manifest ["manifest_dc" ].content .digest
248+ ].append (manifest_ref )
235249 await self .handle_blobs (
236250 listed_manifest ["manifest_dc" ], listed_manifest ["content_data" ]
237251 )
238252 self .manifest_dcs .append (listed_manifest ["manifest_dc" ])
239253 self .manifest_list_dcs .append (list_dc )
240- self .tag_dcs .append (tag_dc )
254+ if is_tag :
255+ tag_dc .extra_data ["tagged_manifest_dc" ] = list_dc
256+ self .tag_dcs .append (tag_dc )
241257
242258 else :
243259 # Simple tagged manifest
@@ -249,14 +265,15 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
249265 if self .signed_only and not man_sig_dcs :
250266 continue
251267 self .signature_dcs .extend (man_sig_dcs )
252- self ._synced_digests .add (digest )
253- tag_dc .extra_data ["tagged_manifest_dc" ] = man_dc
268+ self ._synced_digests [digest ].append (manifest_ref )
254269 await self .handle_blobs (man_dc , content_data )
255- self .tag_dcs .append (tag_dc )
270+ if is_tag :
271+ tag_dc .extra_data ["tagged_manifest_dc" ] = man_dc
272+ self .tag_dcs .append (tag_dc )
256273 self .manifest_dcs .append (man_dc )
257274
258275 # Count the skipped tasks as parsed too.
259- await pb_parsed_tags .aincrement ()
276+ await pb_parsed_manifests .aincrement ()
260277
261278 # Flush the queues to prevent overly excessive memory usage.
262279 # This will cap the number of in flight high level objects to about BATCH_SIZE.
0 commit comments