Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions xet_data/src/processing/file_download_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,14 @@ impl FileDownloadSession {
///
/// This path does not acquire the session-level file download semaphore.
#[instrument(skip_all, name = "FileDownloadSession::download_stream", fields(hash = file_info.hash()))]
pub fn download_stream(&self, file_info: &XetFileInfo, tracking_id: Ulid) -> Result<DownloadStream> {
let reconstructor = self.setup_reconstructor(file_info, None, tracking_id, None, None)?;
pub fn download_stream(
&self,
file_info: &XetFileInfo,
source_range: Option<Range<u64>>,
tracking_id: Ulid,
) -> Result<DownloadStream> {
let file_range = source_range.map(|r| FileRange::new(r.start, r.end));
let reconstructor = self.setup_reconstructor(file_info, file_range, tracking_id, None, None)?;
Ok(reconstructor.reconstruct_to_stream())
}

Expand Down Expand Up @@ -437,7 +443,7 @@ mod tests {
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();

let mut collected = Vec::new();
while let Some(chunk) = stream.next().await.unwrap() {
Expand All @@ -464,7 +470,7 @@ mod tests {
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

let stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();

let collected = tokio::task::spawn_blocking(move || {
let mut stream = stream;
Expand Down Expand Up @@ -497,7 +503,7 @@ mod tests {
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();

// Drain all data
while stream.next().await.unwrap().is_some() {}
Expand Down Expand Up @@ -527,8 +533,8 @@ mod tests {
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

let mut stream_a = session.download_stream(&xfi_a, Ulid::new()).unwrap();
let mut stream_b = session.download_stream(&xfi_b, Ulid::new()).unwrap();
let mut stream_a = session.download_stream(&xfi_a, None, Ulid::new()).unwrap();
let mut stream_b = session.download_stream(&xfi_b, None, Ulid::new()).unwrap();

let task_a = tokio::spawn(async move {
let mut buf = Vec::new();
Expand Down Expand Up @@ -571,7 +577,7 @@ mod tests {
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

// Create and drop a stream without ever reading from it.
let stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();
drop(stream);
tokio::task::yield_now().await;

Expand Down Expand Up @@ -600,7 +606,7 @@ mod tests {

// Repeatedly create, start, optionally read, and drop streams.
for i in 0..5u32 {
let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();
if i % 3 == 0 {
let _ = stream.next().await;
}
Expand Down Expand Up @@ -632,7 +638,7 @@ mod tests {
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

// Read one chunk via blocking next() in a spawn_blocking, then drop.
let stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();

tokio::task::spawn_blocking(move || {
let mut stream = stream;
Expand Down Expand Up @@ -668,7 +674,7 @@ mod tests {
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();
stream.cancel();
assert!(stream.next().await.unwrap().is_none());
assert!(stream.next().await.unwrap().is_none());
Expand All @@ -691,7 +697,7 @@ mod tests {
let config = TranslatorConfig::local_config(&cas_path).unwrap();
let session = FileDownloadSession::new(config.into(), None).await.unwrap();

let mut stream = session.download_stream(&xfi, Ulid::new()).unwrap();
let mut stream = session.download_stream(&xfi, None, Ulid::new()).unwrap();
let _ = stream.next().await.unwrap();
stream.cancel();
assert!(stream.next().await.unwrap().is_none());
Expand Down
2 changes: 1 addition & 1 deletion xet_data/src/processing/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl HydrateDehydrateTest {
let out_filename = self.dest_dir.join(entry.file_name());

let xf: XetFileInfo = serde_json::from_reader(File::open(entry.path()).unwrap()).unwrap();
let mut stream = session.download_stream(&xf, Ulid::new()).unwrap();
let mut stream = session.download_stream(&xf, None, Ulid::new()).unwrap();

let mut file = File::create(&out_filename).unwrap();
while let Some(chunk) = stream.next().await.unwrap() {
Expand Down
Loading