From 1d10f6b855f42478d20b76ad34c3a27e0a0f58d8 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Mon, 30 Mar 2026 14:31:15 +0300 Subject: [PATCH 1/3] Implement futures unordered when checking --- crates/iceberg/src/transaction/snapshot.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..8c6c5f1501 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,6 +19,8 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; +use futures::TryStreamExt; +use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; @@ -175,10 +177,15 @@ impl<'a> SnapshotProducer<'a> { let manifest_list = current_snapshot .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry - .load_manifest(self.table.file_io()) - .await?; + + let file_io = self.table.file_io(); + let mut manifest_futures: FuturesUnordered<_> = manifest_list + .entries() + .iter() + .map(|entry| entry.load_manifest(file_io)) + .collect(); + + while let Some(manifest) = manifest_futures.try_next().await? { for entry in manifest.entries() { let file_path = entry.file_path(); if new_files.contains(file_path) && entry.is_alive() { From 65da6db877da228882f9936761d70d6f927e615e Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 31 Mar 2026 13:52:42 +0300 Subject: [PATCH 2/3] Change to task per fetch --- crates/iceberg/src/transaction/snapshot.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8c6c5f1501..7ff421a3e2 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -24,6 +24,7 @@ use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; +use crate::runtime::spawn; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, @@ -178,14 +179,16 @@ impl<'a> SnapshotProducer<'a> { .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - let file_io = self.table.file_io(); - let mut manifest_futures: FuturesUnordered<_> = manifest_list - .entries() - .iter() - .map(|entry| entry.load_manifest(file_io)) + let mut manifest_tasks: FuturesUnordered<_> = manifest_list + .consume_entries() + .into_iter() + .map(|entry| { + let file_io = self.table.file_io().clone(); + spawn(async move { entry.load_manifest(&file_io).await }) + }) .collect(); - while let Some(manifest) = manifest_futures.try_next().await? { + while let Some(manifest) = manifest_tasks.try_next().await? { for entry in manifest.entries() { let file_path = entry.file_path(); if new_files.contains(file_path) && entry.is_alive() { From b9d544fc2bce9b0a036d530b32a66c03e6434ffc Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Wed, 1 Apr 2026 11:31:07 +0300 Subject: [PATCH 3/3] Move to buffered and threads --- crates/iceberg/src/transaction/snapshot.rs | 27 +++++++++++----------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 7ff421a3e2..988bbf3ebc 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,8 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; -use futures::TryStreamExt; -use futures::stream::FuturesUnordered; +use futures::{StreamExt, TryStreamExt}; use uuid::Uuid; use crate::error::Result; @@ -179,23 +178,23 @@ impl<'a> SnapshotProducer<'a> { .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - let mut manifest_tasks: FuturesUnordered<_> = manifest_list - .consume_entries() - .into_iter() + let entries: Vec<_> = manifest_list.consume_entries().into_iter().collect(); + futures::stream::iter(entries) .map(|entry| { let file_io = self.table.file_io().clone(); spawn(async move { entry.load_manifest(&file_io).await }) }) - .collect(); - - while let Some(manifest) = manifest_tasks.try_next().await? { - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); + .buffer_unordered(32) + .try_for_each(|manifest| { + for entry in manifest.entries() { + let file_path = entry.file_path(); + if new_files.contains(file_path) && entry.is_alive() { + referenced_files.push(file_path.to_string()); + } } - } - } + std::future::ready(Ok(())) + }) + .await?; } if !referenced_files.is_empty() {