@@ -12,9 +12,11 @@ use std::{cmp::Reverse, process::Command, thread::available_parallelism};
1212
1313use std:: { iter:: zip, sync:: Arc } ;
1414
15- use anyhow:: { bail , Context , Result } ;
15+ use anyhow:: { Context , Result , bail } ;
1616use async_compression:: tokio:: bufread:: { GzipDecoder , ZstdDecoder } ;
17- use containers_image_proxy:: { ImageProxy , ImageProxyConfig , OpenedImage } ;
17+ use containers_image_proxy:: {
18+ ConvertedLayerInfo , ImageProxy , ImageProxyConfig , OpenedImage , Transport ,
19+ } ;
1820use indicatif:: { MultiProgress , ProgressBar , ProgressStyle } ;
1921use oci_spec:: image:: { Descriptor , ImageConfiguration , ImageManifest , MediaType } ;
2022use rustix:: process:: geteuid;
@@ -25,7 +27,7 @@ use tokio::{
2527
2628use composefs:: { fsverity:: FsVerityHashValue , repository:: Repository } ;
2729
28- use crate :: { config_identifier, layer_identifier, tar:: split_async, ContentAndVerity } ;
30+ use crate :: { ContentAndVerity , config_identifier, layer_identifier, tar:: split_async} ;
2931
3032// Content type identifiers stored as ASCII in the splitstream file
3133pub ( crate ) const TAR_LAYER_CONTENT_TYPE : u64 = u64:: from_le_bytes ( * b"ocilayer" ) ;
@@ -36,6 +38,7 @@ struct ImageOp<ObjectID: FsVerityHashValue> {
3638 proxy : ImageProxy ,
3739 img : OpenedImage ,
3840 progress : MultiProgress ,
41+ transport : Transport ,
3942}
4043
4144impl < ObjectID : FsVerityHashValue > ImageOp < ObjectID > {
@@ -44,8 +47,11 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
4447 imgref : & str ,
4548 img_proxy_config : Option < ImageProxyConfig > ,
4649 ) -> Result < Self > {
50+ // Detect transport from image reference
51+ let transport = Transport :: try_from ( imgref) . context ( "Failed to get image transport" ) ?;
52+
4753 // See https://github.com/containers/skopeo/issues/2563
48- let skopeo_cmd = if imgref . starts_with ( "containers-storage:" ) && !geteuid ( ) . is_root ( ) {
54+ let skopeo_cmd = if transport == Transport :: ContainerStorage && !geteuid ( ) . is_root ( ) {
4955 let mut cmd = Command :: new ( "podman" ) ;
5056 cmd. args ( [ "unshare" , "skopeo" ] ) ;
5157 Some ( cmd)
@@ -86,10 +92,17 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
8692 proxy,
8793 img,
8894 progress,
95+ transport,
8996 } )
9097 }
9198
92- pub async fn ensure_layer ( & self , diff_id : & str , descriptor : & Descriptor ) -> Result < ObjectID > {
99+ pub async fn ensure_layer (
100+ & self ,
101+ diff_id : & str ,
102+ descriptor : & Descriptor ,
103+ uncompressed_layer_info : Arc < Option < Vec < ConvertedLayerInfo > > > ,
104+ layer_idx : usize ,
105+ ) -> Result < ObjectID > {
93106 // We need to use the per_manifest descriptor to download the compressed layer but it gets
94107 // stored in the repository via the per_config descriptor. Our return value is the
95108 // fsverity digest for the corresponding splitstream.
@@ -101,46 +114,62 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
101114 Ok ( layer_id)
102115 } else {
103116 // Otherwise, we need to fetch it...
104- let ( blob_reader, driver) = self . proxy . get_descriptor ( & self . img , descriptor) . await ?;
117+ let ( ( blob_reader, driver) , layer_size) = match self . transport {
118+ Transport :: ContainerStorage => {
119+ let layer = uncompressed_layer_info
120+ . as_ref ( )
121+ . as_ref ( )
122+ . ok_or_else ( || anyhow:: anyhow!( "Failed to get uncompressed layer info" ) ) ?;
123+
124+ let layer = layer. get ( layer_idx) . ok_or_else ( || {
125+ anyhow:: anyhow!(
126+ "Failed to get uncompressed layer info for layer index {layer_idx}"
127+ )
128+ } ) ?;
129+
130+ (
131+ self . proxy
132+ . get_blob ( & self . img , & layer. digest , layer. size )
133+ . await ?,
134+ layer. size ,
135+ )
136+ }
137+
138+ _ => (
139+ self . proxy
140+ . get_blob ( & self . img , descriptor. digest ( ) , descriptor. size ( ) )
141+ . await ?,
142+ descriptor. size ( ) ,
143+ ) ,
144+ } ;
105145
106146 // See https://github.com/containers/containers-image-proxy-rs/issues/71
107- let blob_reader = blob_reader. take ( descriptor . size ( ) ) ;
147+ let blob_reader = blob_reader. take ( layer_size ) ;
108148
109- let bar = self . progress . add ( ProgressBar :: new ( descriptor . size ( ) ) ) ;
149+ let bar = self . progress . add ( ProgressBar :: new ( layer_size ) ) ;
110150 bar. set_style ( ProgressStyle :: with_template ( "[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}" )
111151 . unwrap ( )
112152 . progress_chars ( "##-" ) ) ;
113153 let progress = bar. wrap_async_read ( blob_reader) ;
114154 self . progress . println ( format ! ( "Fetching layer {diff_id}" ) ) ?;
115155
116- let object_id = match descriptor. media_type ( ) {
117- MediaType :: ImageLayer => {
118- split_async (
119- BufReader :: new ( progress) ,
120- self . repo . clone ( ) ,
121- TAR_LAYER_CONTENT_TYPE ,
122- )
123- . await ?
124- }
125- MediaType :: ImageLayerGzip => {
126- split_async (
127- BufReader :: new ( GzipDecoder :: new ( BufReader :: new ( progress) ) ) ,
128- self . repo . clone ( ) ,
129- TAR_LAYER_CONTENT_TYPE ,
130- )
131- . await ?
132- }
133- MediaType :: ImageLayerZstd => {
134- split_async (
135- BufReader :: new ( ZstdDecoder :: new ( BufReader :: new ( progress) ) ) ,
136- self . repo . clone ( ) ,
137- TAR_LAYER_CONTENT_TYPE ,
138- )
139- . await ?
140- }
141- other => bail ! ( "Unsupported layer media type {other:?}" ) ,
156+ let reader: Box < dyn tokio:: io:: AsyncBufRead + Unpin + Send > = match self . transport {
157+ Transport :: ContainerStorage => Box :: new ( BufReader :: new ( progress) ) ,
158+
159+ _ => match descriptor. media_type ( ) {
160+ MediaType :: ImageLayer => Box :: new ( BufReader :: new ( progress) ) ,
161+ MediaType :: ImageLayerGzip => {
162+ Box :: new ( BufReader :: new ( GzipDecoder :: new ( BufReader :: new ( progress) ) ) )
163+ }
164+ MediaType :: ImageLayerZstd => {
165+ Box :: new ( BufReader :: new ( ZstdDecoder :: new ( BufReader :: new ( progress) ) ) )
166+ }
167+ other => bail ! ( "Unsupported layer media type {other:?}" ) ,
168+ } ,
142169 } ;
143170
171+ let object_id = split_async ( reader, self . repo . clone ( ) , TAR_LAYER_CONTENT_TYPE ) . await ?;
172+
144173 // skopeo is doing data checksums for us to make sure the content we received is equal
145174 // to the claimed diff_id. We trust it, but we need to check it by awaiting the driver.
146175 driver. await ?;
@@ -194,14 +223,32 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
194223 let threads = available_parallelism ( ) ?;
195224 let sem = Arc :: new ( Semaphore :: new ( threads. into ( ) ) ) ;
196225 let mut entries = vec ! [ ] ;
226+
227+ let uncompressed_layer_info = match self . transport {
228+ Transport :: ContainerStorage => self . proxy . get_layer_info ( & self . img ) . await ?,
229+ _ => None ,
230+ } ;
231+
232+ let uncompressed_layer_info = Arc :: new ( uncompressed_layer_info) ;
233+
197234 for ( mld, diff_id) in layers {
198235 let diff_id_ = diff_id. clone ( ) ;
199236 let self_ = Arc :: clone ( self ) ;
200237 let permit = Arc :: clone ( & sem) . acquire_owned ( ) . await ?;
201238 let descriptor = mld. clone ( ) ;
239+
240+ let layer_idx = manifest_layers
241+ . iter ( )
242+ . position ( |d| * d == descriptor)
243+ . ok_or_else ( || anyhow:: anyhow!( "Layer descriptor not found in manifest" ) ) ?;
244+
245+ let uncompressed_layer_info = Arc :: clone ( & uncompressed_layer_info) ;
246+
202247 let future = tokio:: spawn ( async move {
203248 let _permit = permit;
204- self_. ensure_layer ( & diff_id_, & descriptor) . await
249+ self_
250+ . ensure_layer ( & diff_id_, & descriptor, uncompressed_layer_info, layer_idx)
251+ . await
205252 } ) ;
206253 entries. push ( ( diff_id, future) ) ;
207254 }
0 commit comments