diff --git a/xet_data/src/processing/file_download_session.rs b/xet_data/src/processing/file_download_session.rs index 8a8c70014..793ec6fc9 100644 --- a/xet_data/src/processing/file_download_session.rs +++ b/xet_data/src/processing/file_download_session.rs @@ -143,8 +143,14 @@ impl FileDownloadSession { /// /// This path does not acquire the session-level file download semaphore. #[instrument(skip_all, name = "FileDownloadSession::download_stream", fields(hash = file_info.hash()))] - pub fn download_stream(&self, file_info: &XetFileInfo, tracking_id: Ulid) -> Result { - let reconstructor = self.setup_reconstructor(file_info, None, tracking_id, None, None)?; + pub fn download_stream( + &self, + file_info: &XetFileInfo, + source_range: Option>, + tracking_id: Ulid, + ) -> Result { + let file_range = source_range.map(|r| FileRange::new(r.start, r.end)); + let reconstructor = self.setup_reconstructor(file_info, file_range, tracking_id, None, None)?; Ok(reconstructor.reconstruct_to_stream()) } @@ -437,7 +443,7 @@ mod tests { let config = TranslatorConfig::local_config(&cas_path).unwrap(); let session = FileDownloadSession::new(config.into(), None).await.unwrap(); - let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); let mut collected = Vec::new(); while let Some(chunk) = stream.next().await.unwrap() { @@ -464,7 +470,7 @@ mod tests { let config = TranslatorConfig::local_config(&cas_path).unwrap(); let session = FileDownloadSession::new(config.into(), None).await.unwrap(); - let stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); let collected = tokio::task::spawn_blocking(move || { let mut stream = stream; @@ -497,7 +503,7 @@ mod tests { let config = TranslatorConfig::local_config(&cas_path).unwrap(); let session = FileDownloadSession::new(config.into(), None).await.unwrap(); - let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); // Drain all data while stream.next().await.unwrap().is_some() {} @@ -527,8 +533,8 @@ mod tests { let config = TranslatorConfig::local_config(&cas_path).unwrap(); let session = FileDownloadSession::new(config.into(), None).await.unwrap(); - let mut stream_a = session.download_stream(&xfi_a, Ulid::new()).unwrap(); - let mut stream_b = session.download_stream(&xfi_b, Ulid::new()).unwrap(); + let mut stream_a = session.download_stream(&xfi_a, None, Ulid::new()).unwrap(); + let mut stream_b = session.download_stream(&xfi_b, None, Ulid::new()).unwrap(); let task_a = tokio::spawn(async move { let mut buf = Vec::new(); @@ -571,7 +577,7 @@ mod tests { let session = FileDownloadSession::new(config.into(), None).await.unwrap(); // Create and drop a stream without ever reading from it. - let stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); drop(stream); tokio::task::yield_now().await; @@ -600,7 +606,7 @@ mod tests { // Repeatedly create, start, optionally read, and drop streams. for i in 0..5u32 { - let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); if i % 3 == 0 { let _ = stream.next().await; } @@ -632,7 +638,7 @@ mod tests { let session = FileDownloadSession::new(config.into(), None).await.unwrap(); // Read one chunk via blocking next() in a spawn_blocking, then drop. - let stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); tokio::task::spawn_blocking(move || { let mut stream = stream; @@ -668,7 +674,7 @@ mod tests { let config = TranslatorConfig::local_config(&cas_path).unwrap(); let session = FileDownloadSession::new(config.into(), None).await.unwrap(); - let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); stream.cancel(); assert!(stream.next().await.unwrap().is_none()); assert!(stream.next().await.unwrap().is_none()); @@ -691,7 +697,7 @@ mod tests { let config = TranslatorConfig::local_config(&cas_path).unwrap(); let session = FileDownloadSession::new(config.into(), None).await.unwrap(); - let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap(); + let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap(); let _ = stream.next().await.unwrap(); stream.cancel(); assert!(stream.next().await.unwrap().is_none()); diff --git a/xet_data/src/processing/test_utils.rs b/xet_data/src/processing/test_utils.rs index 6ce51cff3..e6057f53b 100644 --- a/xet_data/src/processing/test_utils.rs +++ b/xet_data/src/processing/test_utils.rs @@ -309,7 +309,7 @@ impl HydrateDehydrateTest { let out_filename = self.dest_dir.join(entry.file_name()); let xf: XetFileInfo = serde_json::from_reader(File::open(entry.path()).unwrap()).unwrap(); - let mut stream = session.download_stream(&xf, Ulid::new()).unwrap(); + let mut stream = session.download_stream(&xf, None, Ulid::new()).unwrap(); let mut file = File::create(&out_filename).unwrap(); while let Some(chunk) = stream.next().await.unwrap() {