Skip to content

Commit 11cce80

Browse files
committed
generify
1 parent 11c68d7 commit 11cce80

3 files changed

Lines changed: 105 additions & 70 deletions

File tree

crates/iceberg/src/scan/context.rs

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,23 @@ use crate::delete_file_index::DeleteFileIndex;
2424
use crate::expr::{Bind, BoundPredicate, Predicate};
2525
use crate::io::object_cache::ObjectCache;
2626
use crate::scan::{
27-
AppendSnapshotSet, BoundPredicates, ExpressionEvaluatorCache, FileScanTask,
28-
ManifestEvaluatorCache, PartitionFilterCache,
27+
BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache,
28+
PartitionFilterCache,
2929
};
3030
use crate::spec::{
31-
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, ManifestStatus, SchemaRef,
32-
SnapshotRef, TableMetadataRef,
31+
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
32+
TableMetadataRef,
3333
};
3434
use crate::{Error, ErrorKind, Result};
3535

36+
/// Filter applied to each [`ManifestFile`] before fetching it.
37+
/// Returns `true` to include the manifest, `false` to skip it.
38+
pub(crate) type ManifestFileFilter = Arc<dyn Fn(&ManifestFile) -> bool + Send + Sync>;
39+
40+
/// Filter applied to each manifest entry after loading a manifest.
41+
/// Returns `true` to include the entry, `false` to skip it.
42+
pub(crate) type ManifestEntryFilter = Arc<dyn Fn(&ManifestEntryRef) -> bool + Send + Sync>;
43+
3644
/// Wraps a [`ManifestFile`] alongside the objects that are needed
3745
/// to process it in a thread-safe manner
3846
pub(crate) struct ManifestFileContext {
@@ -47,7 +55,7 @@ pub(crate) struct ManifestFileContext {
4755
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
4856
delete_file_index: DeleteFileIndex,
4957
case_sensitive: bool,
50-
snapshot_range: Option<Arc<AppendSnapshotSet>>,
58+
entry_filter: Option<ManifestEntryFilter>,
5159
}
5260

5361
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -78,33 +86,16 @@ impl ManifestFileContext {
7886
expression_evaluator_cache,
7987
delete_file_index,
8088
case_sensitive,
81-
snapshot_range,
89+
entry_filter,
8290
} = self;
8391

8492
let manifest = object_cache.get_manifest(&manifest_file).await?;
8593

8694
for manifest_entry in manifest.entries() {
87-
// For incremental scans, filter entries to only include those:
88-
// 1. With status ADDED (not EXISTING or DELETED)
89-
// 2. With a snapshot_id that falls within the range
90-
if let Some(ref range) = snapshot_range {
91-
// Only include entries with status ADDED
92-
if manifest_entry.status() != ManifestStatus::Added {
95+
if let Some(ref filter) = entry_filter {
96+
if !filter(manifest_entry) {
9397
continue;
9498
}
95-
96-
// Only include entries from snapshots in the range
97-
match manifest_entry.snapshot_id() {
98-
Some(entry_snapshot_id) => {
99-
if !range.contains(entry_snapshot_id) {
100-
continue;
101-
}
102-
}
103-
None => {
104-
// Skip entries without a snapshot_id in incremental mode
105-
continue;
106-
}
107-
}
10899
}
109100

110101
let manifest_entry_context = ManifestEntryContext {
@@ -171,7 +162,6 @@ impl ManifestEntryContext {
171162

172163
/// PlanContext wraps a [`SnapshotRef`] alongside all the other
173164
/// objects that are required to perform a scan file plan.
174-
#[derive(Debug)]
175165
pub(crate) struct PlanContext {
176166
pub snapshot: SnapshotRef,
177167

@@ -186,7 +176,25 @@ pub(crate) struct PlanContext {
186176
pub partition_filter_cache: Arc<PartitionFilterCache>,
187177
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
188178
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
189-
pub snapshot_range: Option<Arc<AppendSnapshotSet>>,
179+
pub manifest_file_filter: Option<ManifestFileFilter>,
180+
pub manifest_entry_filter: Option<ManifestEntryFilter>,
181+
}
182+
183+
impl std::fmt::Debug for PlanContext {
184+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185+
f.debug_struct("PlanContext")
186+
.field("snapshot", &self.snapshot)
187+
.field("case_sensitive", &self.case_sensitive)
188+
.field(
189+
"manifest_file_filter",
190+
&self.manifest_file_filter.as_ref().map(|_| "..."),
191+
)
192+
.field(
193+
"manifest_entry_filter",
194+
&self.manifest_entry_filter.as_ref().map(|_| "..."),
195+
)
196+
.finish_non_exhaustive()
197+
}
190198
}
191199

192200
impl PlanContext {
@@ -240,17 +248,8 @@ impl PlanContext {
240248
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
241249
let mut filtered_mfcs = vec![];
242250
for manifest_file in manifest_files {
243-
// For incremental scans, skip manifests that can't contain relevant entries:
244-
// 1. Delete manifests — we only care about newly added data files.
245-
// 2. Data manifests whose added_snapshot_id is outside the scan range —
246-
// they can't contain entries added in the snapshots we care about.
247-
// (We still keep the entry-level filter because a manifest can contain
248-
// entries from multiple snapshots via manifest reuse.)
249-
if let Some(ref range) = self.snapshot_range {
250-
if manifest_file.content == ManifestContentType::Deletes {
251-
continue;
252-
}
253-
if !range.contains(manifest_file.added_snapshot_id) {
251+
if let Some(ref filter) = self.manifest_file_filter {
252+
if !filter(manifest_file) {
254253
continue;
255254
}
256255
}
@@ -324,7 +323,7 @@ impl PlanContext {
324323
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
325324
delete_file_index,
326325
case_sensitive: self.case_sensitive,
327-
snapshot_range: self.snapshot_range.clone(),
326+
entry_filter: self.manifest_entry_filter.clone(),
328327
}
329328
}
330329
}

crates/iceberg/src/scan/incremental.rs

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
//! Incremental append scan for reading only newly added data between snapshots.
1919
2020
use std::collections::HashSet;
21+
use std::sync::Arc;
2122

2223
use crate::expr::Predicate;
24+
use crate::scan::context::{ManifestEntryFilter, ManifestFileFilter};
2325
use crate::scan::{ScanConfig, TableScan, build_table_scan};
24-
use crate::spec::{Operation, TableMetadataRef};
26+
use crate::spec::{ManifestContentType, ManifestStatus, Operation, TableMetadataRef};
2527
use crate::table::Table;
2628
use crate::util::available_parallelism;
2729
use crate::util::snapshot::ancestors_between;
@@ -136,10 +138,30 @@ impl AppendSnapshotSet {
136138
Ok(Self { snapshot_ids })
137139
}
138140

139-
/// Check if a snapshot_id is within this range
141+
/// Check if a snapshot_id is within this set
140142
pub(crate) fn contains(&self, snapshot_id: i64) -> bool {
141143
self.snapshot_ids.contains(&snapshot_id)
142144
}
145+
146+
/// Create a manifest file filter that skips delete manifests and data
147+
/// manifests whose `added_snapshot_id` is outside this set.
148+
pub(crate) fn manifest_file_filter(self: &Arc<Self>) -> ManifestFileFilter {
149+
let set = self.clone();
150+
Arc::new(move |manifest_file| {
151+
manifest_file.content != ManifestContentType::Deletes
152+
&& set.contains(manifest_file.added_snapshot_id)
153+
})
154+
}
155+
156+
/// Create a manifest entry filter that includes only entries with
157+
/// status ADDED and a snapshot_id within this set.
158+
pub(crate) fn manifest_entry_filter(self: &Arc<Self>) -> ManifestEntryFilter {
159+
let set = self.clone();
160+
Arc::new(move |entry| {
161+
entry.status() == ManifestStatus::Added
162+
&& entry.snapshot_id().is_some_and(|id| set.contains(id))
163+
})
164+
}
143165
}
144166

145167
/// Builder to create an incremental append scan between two snapshots.
@@ -294,12 +316,12 @@ impl<'a> IncrementalAppendScanBuilder<'a> {
294316
}
295317
};
296318

297-
let snapshot_range = AppendSnapshotSet::build(
319+
let append_set = Arc::new(AppendSnapshotSet::build(
298320
&self.table.metadata_ref(),
299321
self.from_snapshot_id,
300322
to_snapshot.snapshot_id(),
301323
self.from_inclusive,
302-
)?;
324+
)?);
303325

304326
build_table_scan(
305327
ScanConfig {
@@ -315,7 +337,8 @@ impl<'a> IncrementalAppendScanBuilder<'a> {
315337
row_selection_enabled: self.row_selection_enabled,
316338
},
317339
to_snapshot,
318-
Some(snapshot_range),
340+
Some(append_set.manifest_file_filter()),
341+
Some(append_set.manifest_entry_filter()),
319342
)
320343
}
321344
}
@@ -324,6 +347,7 @@ impl<'a> IncrementalAppendScanBuilder<'a> {
324347
mod tests {
325348
use futures::TryStreamExt;
326349

350+
use super::AppendSnapshotSet;
327351
use crate::scan::tests::TableTestFixture;
328352

329353
#[test]
@@ -402,22 +426,27 @@ mod tests {
402426
let table = TableTestFixture::new().table;
403427
let current_snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
404428

429+
// Verify the scan builds successfully
405430
let result = table
406431
.incremental_append_scan_inclusive(current_snapshot_id)
407432
.to_snapshot(current_snapshot_id)
408433
.build();
409-
410434
assert!(
411435
result.is_ok(),
412436
"Inclusive scan of a single append snapshot should succeed"
413437
);
414438

415-
let scan = result.unwrap();
416-
let plan_context = scan.plan_context.as_ref().unwrap();
417-
let range = plan_context.snapshot_range.as_ref().unwrap();
439+
// Verify AppendSnapshotSet directly
440+
let set = AppendSnapshotSet::build(
441+
&table.metadata_ref(),
442+
current_snapshot_id,
443+
current_snapshot_id,
444+
true,
445+
)
446+
.unwrap();
418447
assert!(
419-
range.contains(current_snapshot_id),
420-
"Inclusive range should contain the from_snapshot"
448+
set.contains(current_snapshot_id),
449+
"Inclusive set should contain the from_snapshot"
421450
);
422451
}
423452

@@ -427,22 +456,27 @@ mod tests {
427456
let table = TableTestFixture::new().table;
428457
let current_snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
429458

459+
// Verify the scan builds successfully
430460
let result = table
431461
.incremental_append_scan(current_snapshot_id)
432462
.to_snapshot(current_snapshot_id)
433463
.build();
434-
435464
assert!(
436465
result.is_ok(),
437466
"Exclusive scan from=to should succeed with empty range"
438467
);
439468

440-
let scan = result.unwrap();
441-
let plan_context = scan.plan_context.as_ref().unwrap();
442-
let range = plan_context.snapshot_range.as_ref().unwrap();
469+
// Verify AppendSnapshotSet directly
470+
let set = AppendSnapshotSet::build(
471+
&table.metadata_ref(),
472+
current_snapshot_id,
473+
current_snapshot_id,
474+
false,
475+
)
476+
.unwrap();
443477
assert!(
444-
!range.contains(current_snapshot_id),
445-
"Exclusive range should not contain the from_snapshot"
478+
!set.contains(current_snapshot_id),
479+
"Exclusive set should not contain the from_snapshot"
446480
);
447481
}
448482

@@ -484,20 +518,20 @@ mod tests {
484518
"Range of only append snapshots should succeed"
485519
);
486520

487-
let scan = result.unwrap();
488-
let range = scan
489-
.plan_context
490-
.as_ref()
491-
.unwrap()
492-
.snapshot_range
493-
.as_ref()
494-
.unwrap();
521+
// Verify AppendSnapshotSet directly
522+
let set = AppendSnapshotSet::build(
523+
&table.metadata_ref(),
524+
3051729675574597004,
525+
3056729675574597004,
526+
false,
527+
)
528+
.unwrap();
495529
assert!(
496-
!range.contains(3051729675574597004),
530+
!set.contains(3051729675574597004),
497531
"from_snapshot should be excluded"
498532
);
499-
assert!(range.contains(3055729675574597004), "S2 should be in range");
500-
assert!(range.contains(3056729675574597004), "S3 should be in range");
533+
assert!(set.contains(3055729675574597004), "S2 should be in range");
534+
assert!(set.contains(3056729675574597004), "S3 should be in range");
501535
}
502536

503537
#[tokio::test]

crates/iceberg/src/scan/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use arrow_array::RecordBatch;
3030
use futures::channel::mpsc::{Sender, channel};
3131
use futures::stream::BoxStream;
3232
use futures::{SinkExt, StreamExt, TryStreamExt};
33-
pub(crate) use incremental::AppendSnapshotSet;
3433
pub use incremental::IncrementalAppendScanBuilder;
3534
pub use task::*;
3635

@@ -69,7 +68,8 @@ pub(crate) struct ScanConfig<'a> {
6968
pub(crate) fn build_table_scan(
7069
config: ScanConfig<'_>,
7170
snapshot: SnapshotRef,
72-
snapshot_range: Option<AppendSnapshotSet>,
71+
manifest_file_filter: Option<ManifestFileFilter>,
72+
manifest_entry_filter: Option<ManifestEntryFilter>,
7373
) -> Result<TableScan> {
7474
let schema = snapshot.schema(config.table.metadata())?;
7575

@@ -144,7 +144,8 @@ pub(crate) fn build_table_scan(
144144
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
145145
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
146146
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
147-
snapshot_range: snapshot_range.map(Arc::new),
147+
manifest_file_filter,
148+
manifest_entry_filter,
148149
};
149150

150151
Ok(TableScan {
@@ -345,6 +346,7 @@ impl<'a> TableScanBuilder<'a> {
345346
},
346347
snapshot,
347348
None,
349+
None,
348350
)
349351
}
350352
}

0 commit comments

Comments (0)