33import hashlib
44import json
55import logging
6+ from collections import defaultdict
67from urllib .parse import urljoin , urlparse , urlunparse
78
89import aiohttp
@@ -62,7 +63,7 @@ def __init__(self, remote, signed_only):
6263 self .manifest_list_dcs = []
6364 self .manifest_dcs = []
6465 self .signature_dcs = []
65- self ._synced_digests = set ( )
66+ self ._synced_digests = defaultdict ( list )
6667 self ._full_tag_list = []
6768 self ._cosign_tags = []
6869
@@ -78,20 +79,26 @@ async def _download_manifest_data(self, manifest_url):
7879
7980 return content_data , raw_text_data , response
8081
81- async def _check_for_existing_manifest (self , download_tag ):
82- response = await download_tag
82+ async def _check_for_existing_manifest (self , head_manifest_task ):
83+ response = await head_manifest_task
8384
8485 digest = response .headers .get ("docker-content-digest" )
86+ url = response .url
87+ original_reference = url .split ("/" )[- 1 ]
8588
8689 if manifest := await Manifest .objects .filter (
8790 digest = digest , pulp_domain = get_domain ()
8891 ).afirst ():
8992 raw_text_data = manifest .data
9093 content_data = json .loads (raw_text_data )
9194 else :
92- content_data , raw_text_data , response = await self ._download_manifest_data (response .url )
95+ if not original_reference .startswith ("sha256:" ):
96+ # Fetch the tag with its digest
97+ url = url .rsplit (original_reference , 1 )[0 ] + digest
9398
94- return content_data , raw_text_data , response
99+ content_data , raw_text_data , response = await self ._download_manifest_data (url )
100+
101+ return content_data , raw_text_data , response , original_reference
95102
96103 async def run (self ):
97104 """
@@ -105,31 +112,33 @@ async def run(self):
105112 repo_name = self .remote .namespaced_upstream_name
106113 tag_list_url = "/v2/{name}/tags/list" .format (name = repo_name )
107114 self ._full_tag_list = await self .get_paginated_tag_list (tag_list_url , repo_name )
108- self ._cosign_tags = filter_resources (
109- self ._full_tag_list , ["sha256-*" ], self .remote .exclude_tags
110- )
111- if self .remote .include_tags or self .remote .exclude_tags :
115+ includes = self .remote .includes or []
116+ excludes = self .remote .excludes or []
117+
118+ digest_includes = [i for i in includes if i .startswith ("sha256:" )]
119+ self ._cosign_tags = filter_resources (self ._full_tag_list , ["sha256-*" ], excludes )
120+
121+ if includes or excludes :
112122 # Split sync into two parts, first all non-cosign tags, then cosign tags
113- exclude_tags_and_cosign = (self .remote .exclude_tags or []) + ["sha256-*" ]
114- tag_list = filter_resources (
115- self ._full_tag_list , self .remote .include_tags , exclude_tags_and_cosign
116- )
123+ exclude_and_cosign = excludes + ["sha256-*" ]
124+ filtered_tags = filter_resources (self ._full_tag_list , includes , exclude_and_cosign )
125+ manifest_list = filtered_tags + digest_includes
117126 else :
118- tag_list = self ._full_tag_list
127+ manifest_list = self ._full_tag_list
119128 await pb .aincrement ()
120129
121- await self ._process_tags ( tag_list , signature_source )
130+ await self ._process_manifests ( manifest_list , signature_source , "Processing Manifests" )
122131
123- if self . remote . include_tags or self . remote . exclude_tags :
124- # Process cosign companion tags after all non-cosign tags are synced
132+ if includes or excludes :
133+ # Process cosign companion tags after all primary content is synced
125134 companion_tags = self ._find_cosign_companion_tags ()
126135 if companion_tags :
127136 log .info (
128137 "Syncing %d cosign companion tag(s) for filtered images" ,
129138 len (companion_tags ),
130139 )
131- await self ._process_tags (
132- companion_tags , signature_source , msg = "Processing Cosign Companion Tags"
140+ await self ._process_manifests (
141+ companion_tags , signature_source , "Processing Cosign Companion Tags"
133142 )
134143
135144 def _find_cosign_companion_tags (self ):
@@ -143,54 +152,53 @@ def _find_cosign_companion_tags(self):
143152 companion_tags .append (tag )
144153 return companion_tags
145154
146- async def _process_tags (self , tag_list , signature_source , msg = "Processing Tags" ):
147- """Download and process a batch of tags , creating declarative content objects."""
155+ async def _process_manifests (self , manifests , signature_source , msg ):
156+ """Download and process a batch of manifests , creating declarative content objects."""
148157 BATCH_SIZE = 500
149158 to_download = []
150159
151- for tag_name in tag_list :
152- relative_url = "/v2/{name}/manifests/{tag}" .format (
153- name = self .remote .namespaced_upstream_name , tag = tag_name
154- )
155- tag_url = urljoin (self .remote .url , relative_url )
156- downloader = self .remote .get_downloader (url = tag_url )
160+ for reference in manifests :
161+ relative_url = f"/v2/{ self .remote .namespaced_upstream_name } /manifests/{ reference } "
162+ manifest_url = urljoin (self .remote .url , relative_url )
163+ downloader = self .remote .get_downloader (url = manifest_url )
157164 to_download .append (
158165 downloader .run (extra_data = {"headers" : V2_ACCEPT_HEADERS , "http_method" : "head" })
159166 )
160167
161168 async with ProgressReport (
162169 message = msg ,
163- code = "sync.processing.tag " ,
164- total = len (tag_list ),
165- ) as pb_parsed_tags :
170+ code = "sync.processing.manifest " ,
171+ total = len (manifests ),
172+ ) as pb_parsed_manifests :
166173 to_download_artifact = [
167- self ._check_for_existing_manifest (download_tag )
168- for download_tag in asyncio .as_completed (to_download )
174+ self ._check_for_existing_manifest (download_manifest )
175+ for download_manifest in asyncio .as_completed (to_download )
169176 ]
170177
171178 for artifact in asyncio .as_completed (to_download_artifact ):
172- content_data , raw_text_data , response = await artifact
179+ content_data , raw_text_data , response , manifest_ref = await artifact
173180
174181 digest = calculate_digest (raw_text_data )
175- tag_name = response . url . split ( "/" )[ - 1 ]
182+ is_tag = not manifest_ref . startswith ( "sha256:" )
176183 media_type = determine_media_type (content_data , response )
177184
178185 if self .signed_only and not signature_source :
179186 if not (
180- self ._is_cosign_companion_tag (tag_name , media_type , content_data )
187+ self ._is_cosign_companion_tag (manifest_ref , media_type , content_data )
181188 or await self ._has_cosign_signature (digest )
182189 ):
183190 log .info (
184191 "The unsigned image {digest} can't be synced "
185192 "due to a requirement to sync signed content "
186193 "only." .format (digest = digest )
187194 )
188- await pb_parsed_tags .aincrement ()
195+ await pb_parsed_manifests .aincrement ()
189196 continue
190197
191198 validate_manifest (content_data , media_type , digest )
192199
193- tag_dc = DeclarativeContent (Tag (name = tag_name ))
200+ if is_tag :
201+ tag_dc = DeclarativeContent (Tag (name = manifest_ref ))
194202
195203 if media_type in (MEDIA_TYPE .MANIFEST_LIST , MEDIA_TYPE .INDEX_OCI ):
196204 list_dc = self .create_manifest_list (
@@ -214,7 +222,7 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
214222 "The whole manifest list is skipped." .format (
215223 img_digest = man_dc .content .digest ,
216224 ml_digest = list_dc .content .digest ,
217- tag = tag_name ,
225+ tag = manifest_ref ,
218226 )
219227 )
220228 break
@@ -224,20 +232,23 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
224232 else :
225233 # Manifest indices can be signed too. It is not mandatory.
226234 # If signature is available mirror it.
227- self ._synced_digests . add ( digest )
235+ self ._synced_digests [ digest ]. append ( manifest_ref )
228236 if signature_source is not None :
229237 list_sig_dcs = await self .create_signatures (list_dc , signature_source )
230238 if list_sig_dcs :
231239 self .signature_dcs .extend (list_sig_dcs )
232- tag_dc .extra_data ["tagged_manifest_dc" ] = list_dc
233240 for listed_manifest in list_dc .extra_data ["listed_manifests" ]:
234- self ._synced_digests .add (listed_manifest ["manifest_dc" ].content .digest )
241+ self ._synced_digests [
242+ listed_manifest ["manifest_dc" ].content .digest
243+ ].append (manifest_ref )
235244 await self .handle_blobs (
236245 listed_manifest ["manifest_dc" ], listed_manifest ["content_data" ]
237246 )
238247 self .manifest_dcs .append (listed_manifest ["manifest_dc" ])
239248 self .manifest_list_dcs .append (list_dc )
240- self .tag_dcs .append (tag_dc )
249+ if is_tag :
250+ tag_dc .extra_data ["tagged_manifest_dc" ] = list_dc
251+ self .tag_dcs .append (tag_dc )
241252
242253 else :
243254 # Simple tagged manifest
@@ -249,14 +260,15 @@ async def _process_tags(self, tag_list, signature_source, msg="Processing Tags")
249260 if self .signed_only and not man_sig_dcs :
250261 continue
251262 self .signature_dcs .extend (man_sig_dcs )
252- self ._synced_digests .add (digest )
253- tag_dc .extra_data ["tagged_manifest_dc" ] = man_dc
263+ self ._synced_digests [digest ].append (manifest_ref )
254264 await self .handle_blobs (man_dc , content_data )
255- self .tag_dcs .append (tag_dc )
265+ if is_tag :
266+ tag_dc .extra_data ["tagged_manifest_dc" ] = man_dc
267+ self .tag_dcs .append (tag_dc )
256268 self .manifest_dcs .append (man_dc )
257269
258270 # Count the skipped tasks as parsed too.
259- await pb_parsed_tags .aincrement ()
271+ await pb_parsed_manifests .aincrement ()
260272
261273 # Flush the queues to prevent overly excessive memory usage.
262274 # This will cap the number of in flight high level objects to about BATCH_SIZE.
0 commit comments