22import itertools
33import logging
44import subprocess
5+ import time
56from abc import ABC , abstractmethod
67from pathlib import Path
7- from typing import Annotated , Self
8+ from typing import Annotated , Callable , Self
89
910from pydantic import BaseModel , ConfigDict , Field , model_validator
1011
1112from posit_bakery .error import BakeryToolRuntimeError
1213from posit_bakery .image .image_target import ImageTarget , Tag
14+ from posit_bakery .retry import RetryPolicy , is_transient_error , retry_on_transient
1315from posit_bakery .util import find_bin
1416
1517log = logging .getLogger (__name__ )
@@ -200,6 +202,7 @@ class OrasIndexCreateWorkflow(BaseModel):
200202 image_target : Annotated [ImageTarget , Field (description = "Target this index represents." )]
201203 annotations : Annotated [dict [str , str ], Field (default_factory = dict )]
202204 plain_http : Annotated [bool , Field (default = False )]
205+ retry_policy : Annotated [RetryPolicy , Field (default_factory = RetryPolicy )]
203206
204207 @property
205208 def temp_index_tag (self ) -> str :
@@ -209,14 +212,22 @@ def temp_index_tag(self) -> str:
209212 )
210213
211214 def run (self , dry_run : bool = False ) -> OrasIndexCreateResult :
215+ # Retry transient registry errors: the per-platform source manifests
216+ # are pushed by digest from separate runners and may not yet be
217+ # readable here due to registry eventual consistency.
218+ cmd = OrasManifestIndexCreate (
219+ oras_bin = self .oras_bin ,
220+ sources = self .image_target .get_merge_sources (),
221+ destination = self .temp_index_tag ,
222+ annotations = self .annotations ,
223+ plain_http = self .plain_http ,
224+ )
212225 try :
213- OrasManifestIndexCreate (
214- oras_bin = self .oras_bin ,
215- sources = self .image_target .get_merge_sources (),
216- destination = self .temp_index_tag ,
217- annotations = self .annotations ,
218- plain_http = self .plain_http ,
219- ).run (dry_run = dry_run )
226+ retry_on_transient (
227+ lambda : cmd .run (dry_run = dry_run ),
228+ policy = self .retry_policy ,
229+ description = f"index-create for '{ self .image_target .uid } '" ,
230+ )
220231 return OrasIndexCreateResult (success = True , temp_ref = self .temp_index_tag )
221232 except BakeryToolRuntimeError as e :
222233 log .error (f"oras index-create failed: { e } " )
@@ -239,18 +250,26 @@ class OrasIndexCopyWorkflow(BaseModel):
239250 oras_bin : Annotated [str , Field (description = "Path to the oras binary." )]
240251 image_target : Annotated [ImageTarget , Field (description = "Target whose tags to fan out to." )]
241252 plain_http : Annotated [bool , Field (default = False )]
253+ retry_policy : Annotated [RetryPolicy , Field (default_factory = RetryPolicy )]
242254
243255 def run (self , source : str , dry_run : bool = False ) -> OrasIndexCopyResult :
244256 try :
245257 destinations = []
246258 for destination , tags in itertools .groupby (self .image_target .tags , lambda x : x .destination ):
247259 combined = destination + ":" + "," .join (t .suffix for t in tags )
248- OrasCopy (
260+ copy = OrasCopy (
249261 oras_bin = self .oras_bin ,
250262 source = source ,
251263 destination = combined ,
252264 plain_http = self .plain_http ,
253- ).run (dry_run = dry_run )
265+ )
266+ # Retry transient registry errors: the temp-registry source
267+ # index may still be propagating when the copy first reads it.
268+ retry_on_transient (
269+ lambda c = copy : c .run (dry_run = dry_run ),
270+ policy = self .retry_policy ,
271+ description = f"index-copy for '{ self .image_target .uid } ' -> { combined } " ,
272+ )
254273 destinations .append (combined )
255274 return OrasIndexCopyResult (success = True , destinations = destinations )
256275 except BakeryToolRuntimeError as e :
@@ -303,6 +322,108 @@ def run(self, dry_run: bool = False) -> OrasIndexVerifyResult:
303322 return OrasIndexVerifyResult (success = False , verified = verified , error = str (e ))
304323
305324
class OrasSourcesReadyResult(BaseModel):
    """Outcome reported by a pre-flight wait on source-digest availability.

    ``ready`` and ``missing`` together describe which source refs resolved
    and which did not; ``error`` carries the diagnostic text when the wait
    gave up.
    """

    # Overall verdict of the wait.
    success: Annotated[
        bool,
        Field(description="Whether every source digest became readable before the timeout."),
    ]
    # Refs that resolved; defaults to an empty list.
    ready: Annotated[
        list[str],
        Field(description="Source refs confirmed readable, in resolution order.", default_factory=list),
    ]
    # Refs that never resolved; defaults to an empty list.
    missing: Annotated[
        list[str],
        Field(description="Source refs still unreadable when the wait gave up.", default_factory=list),
    ]
    # Duration of the wait; defaults to 0.0.
    waited_seconds: Annotated[
        float,
        Field(description="Wall-clock seconds spent waiting.", default=0.0),
    ]
    # Human-readable failure detail; None when not set.
    error: Annotated[
        str | None,
        Field(description="Diagnostic message on timeout.", default=None),
    ]
337+
338+
class OrasWaitForSourcesWorkflow(BaseModel):
    """Poll source digests until they are all readable from the registry.

    Per-platform manifests are pushed *by digest* from separate build runners,
    and registries lacking read-after-write consistency (i.e. eventually
    consistent ones) — notably GHCR — may briefly 404 those digests when the
    publish runner first asks for them. This pre-flight turns "hope it has
    propagated" into condition-based waiting: each source is probed with
    ``oras manifest fetch --descriptor`` (a lightweight existence check) and
    removed from the pending set once it resolves. The wait succeeds as soon
    as every source resolves, and fails (reporting exactly which digests
    lagged) once ``timeout`` elapses.

    The pause between sweeps never exceeds the time left before ``timeout``,
    so the final sweep happens at the deadline rather than up to one full
    ``poll_interval`` after it.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    oras_bin: Annotated[str, Field(description="Path to the oras binary.")]
    sources: Annotated[list[str], Field(description="Source refs (registry refs, typically by-digest) to await.")]
    timeout: Annotated[float, Field(default=600.0, description="Maximum seconds to wait for all sources (10 min).")]
    poll_interval: Annotated[float, Field(default=5.0, description="Seconds between polling sweeps.")]
    plain_http: Annotated[bool, Field(default=False)]

    def _is_available(self, ref: str) -> bool:
        """Return whether ``ref`` can currently be fetched from the registry.

        :param ref: Source reference to probe.
        :return: True when the descriptor fetch succeeds, False when it fails
            with an error that ``is_transient_error`` classifies as transient.
        :raises BakeryToolRuntimeError: Re-raised unchanged when the failure
            is not transient, so waiting cannot mask a permanent error.
        """
        try:
            OrasManifestFetch(
                oras_bin=self.oras_bin,
                reference=ref,
                descriptor=True,
                plain_http=self.plain_http,
            ).run(dry_run=False)
            return True
        except BakeryToolRuntimeError as e:
            if is_transient_error(e):
                return False
            raise

    def run(
        self,
        dry_run: bool = False,
        *,
        sleep: Callable[[float], None] = time.sleep,
        now: Callable[[], float] = time.monotonic,
    ) -> OrasSourcesReadyResult:
        """Probe each source until all resolve or ``timeout`` elapses.

        :param dry_run: When True, report success without contacting the
            registry (nothing has been pushed to wait on).
        :param sleep: Sleep function, injectable for testing.
        :param now: Monotonic clock, injectable for testing.
        :return: A result listing the refs that resolved and, on timeout, the
            refs that did not together with a diagnostic message.
        :raises BakeryToolRuntimeError: If a probe fails with a non-transient
            error.
        """
        # De-duplicate while keeping first-seen order so each ref is probed
        # once per sweep.
        unique_sources = list(dict.fromkeys(self.sources))
        if dry_run or not unique_sources:
            return OrasSourcesReadyResult(success=True, ready=unique_sources)

        start = now()
        ready: list[str] = []
        pending = list(unique_sources)
        while True:
            still_pending: list[str] = []
            for ref in pending:
                if self._is_available(ref):
                    ready.append(ref)
                else:
                    still_pending.append(ref)
            pending = still_pending

            if not pending:
                return OrasSourcesReadyResult(success=True, ready=ready, waited_seconds=now() - start)

            elapsed = now() - start
            if elapsed >= self.timeout:
                return OrasSourcesReadyResult(
                    success=False,
                    ready=ready,
                    missing=pending,
                    waited_seconds=elapsed,
                    error=(
                        f"{len(pending)} source digest(s) still unreadable after {elapsed:.0f}s "
                        f"(timeout {self.timeout:.0f}s): {', '.join(pending)}"
                    ),
                )

            # Never sleep past the deadline: elapsed < timeout here, so the
            # remaining budget is positive and caps the pause before the
            # next (possibly final) sweep.
            delay = min(self.poll_interval, self.timeout - elapsed)
            log.info(
                f"Waiting on {len(pending)} source digest(s) to become readable "
                f"({elapsed:.0f}s/{self.timeout:.0f}s elapsed); retrying in {delay:.0f}s."
            )
            sleep(delay)
425+
426+
306427class OrasMergeWorkflowResult (BaseModel ):
307428 """Result of an ORAS merge workflow execution."""
308429
0 commit comments