Skip to content

Commit e88d444

Browse files
committed
generify
1 parent 11c68d7 commit e88d444

3 files changed

Lines changed: 108 additions & 70 deletions

File tree

crates/iceberg/src/scan/context.rs

Lines changed: 40 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,25 @@ use crate::delete_file_index::DeleteFileIndex;
2424
use crate::expr::{Bind, BoundPredicate, Predicate};
2525
use crate::io::object_cache::ObjectCache;
2626
use crate::scan::{
27-
AppendSnapshotSet, BoundPredicates, ExpressionEvaluatorCache, FileScanTask,
28-
ManifestEvaluatorCache, PartitionFilterCache,
27+
BoundPredicates, ExpressionEvaluatorCache, FileScanTask, ManifestEvaluatorCache,
28+
PartitionFilterCache,
2929
};
3030
use crate::spec::{
31-
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, ManifestStatus, SchemaRef,
32-
SnapshotRef, TableMetadataRef,
31+
ManifestContentType, ManifestEntryRef, ManifestFile, ManifestList, SchemaRef, SnapshotRef,
32+
TableMetadataRef,
3333
};
3434
use crate::{Error, ErrorKind, Result};
3535

36+
/// Filter applied to each [`ManifestFile`] before fetching it.
37+
/// Returns `true` to include the manifest, `false` to skip it.
38+
pub(crate) type ManifestFileFilter =
39+
Arc<dyn Fn(&ManifestFile) -> bool + Send + Sync>;
40+
41+
/// Filter applied to each manifest entry after loading a manifest.
42+
/// Returns `true` to include the entry, `false` to skip it.
43+
pub(crate) type ManifestEntryFilter =
44+
Arc<dyn Fn(&ManifestEntryRef) -> bool + Send + Sync>;
45+
3646
/// Wraps a [`ManifestFile`] alongside the objects that are needed
3747
/// to process it in a thread-safe manner
3848
pub(crate) struct ManifestFileContext {
@@ -47,7 +57,7 @@ pub(crate) struct ManifestFileContext {
4757
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
4858
delete_file_index: DeleteFileIndex,
4959
case_sensitive: bool,
50-
snapshot_range: Option<Arc<AppendSnapshotSet>>,
60+
entry_filter: Option<ManifestEntryFilter>,
5161
}
5262

5363
/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -78,33 +88,16 @@ impl ManifestFileContext {
7888
expression_evaluator_cache,
7989
delete_file_index,
8090
case_sensitive,
81-
snapshot_range,
91+
entry_filter,
8292
} = self;
8393

8494
let manifest = object_cache.get_manifest(&manifest_file).await?;
8595

8696
for manifest_entry in manifest.entries() {
87-
// For incremental scans, filter entries to only include those:
88-
// 1. With status ADDED (not EXISTING or DELETED)
89-
// 2. With a snapshot_id that falls within the range
90-
if let Some(ref range) = snapshot_range {
91-
// Only include entries with status ADDED
92-
if manifest_entry.status() != ManifestStatus::Added {
97+
if let Some(ref filter) = entry_filter {
98+
if !filter(manifest_entry) {
9399
continue;
94100
}
95-
96-
// Only include entries from snapshots in the range
97-
match manifest_entry.snapshot_id() {
98-
Some(entry_snapshot_id) => {
99-
if !range.contains(entry_snapshot_id) {
100-
continue;
101-
}
102-
}
103-
None => {
104-
// Skip entries without a snapshot_id in incremental mode
105-
continue;
106-
}
107-
}
108101
}
109102

110103
let manifest_entry_context = ManifestEntryContext {
@@ -171,7 +164,6 @@ impl ManifestEntryContext {
171164

172165
/// PlanContext wraps a [`SnapshotRef`] alongside all the other
173166
/// objects that are required to perform a scan file plan.
174-
#[derive(Debug)]
175167
pub(crate) struct PlanContext {
176168
pub snapshot: SnapshotRef,
177169

@@ -186,7 +178,25 @@ pub(crate) struct PlanContext {
186178
pub partition_filter_cache: Arc<PartitionFilterCache>,
187179
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
188180
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
189-
pub snapshot_range: Option<Arc<AppendSnapshotSet>>,
181+
pub manifest_file_filter: Option<ManifestFileFilter>,
182+
pub manifest_entry_filter: Option<ManifestEntryFilter>,
183+
}
184+
185+
impl std::fmt::Debug for PlanContext {
186+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
187+
f.debug_struct("PlanContext")
188+
.field("snapshot", &self.snapshot)
189+
.field("case_sensitive", &self.case_sensitive)
190+
.field(
191+
"manifest_file_filter",
192+
&self.manifest_file_filter.as_ref().map(|_| "..."),
193+
)
194+
.field(
195+
"manifest_entry_filter",
196+
&self.manifest_entry_filter.as_ref().map(|_| "..."),
197+
)
198+
.finish_non_exhaustive()
199+
}
190200
}
191201

192202
impl PlanContext {
@@ -240,17 +250,8 @@ impl PlanContext {
240250
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
241251
let mut filtered_mfcs = vec![];
242252
for manifest_file in manifest_files {
243-
// For incremental scans, skip manifests that can't contain relevant entries:
244-
// 1. Delete manifests — we only care about newly added data files.
245-
// 2. Data manifests whose added_snapshot_id is outside the scan range —
246-
// they can't contain entries added in the snapshots we care about.
247-
// (We still keep the entry-level filter because a manifest can contain
248-
// entries from multiple snapshots via manifest reuse.)
249-
if let Some(ref range) = self.snapshot_range {
250-
if manifest_file.content == ManifestContentType::Deletes {
251-
continue;
252-
}
253-
if !range.contains(manifest_file.added_snapshot_id) {
253+
if let Some(ref filter) = self.manifest_file_filter {
254+
if !filter(manifest_file) {
254255
continue;
255256
}
256257
}
@@ -324,7 +325,7 @@ impl PlanContext {
324325
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
325326
delete_file_index,
326327
case_sensitive: self.case_sensitive,
327-
snapshot_range: self.snapshot_range.clone(),
328+
entry_filter: self.manifest_entry_filter.clone(),
328329
}
329330
}
330331
}

crates/iceberg/src/scan/incremental.rs

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
//! Incremental append scan for reading only newly added data between snapshots.
1919
2020
use std::collections::HashSet;
21+
use std::sync::Arc;
2122

2223
use crate::expr::Predicate;
24+
use crate::scan::context::{ManifestEntryFilter, ManifestFileFilter};
2325
use crate::scan::{ScanConfig, TableScan, build_table_scan};
24-
use crate::spec::{Operation, TableMetadataRef};
26+
use crate::spec::{ManifestContentType, ManifestStatus, Operation, TableMetadataRef};
2527
use crate::table::Table;
2628
use crate::util::available_parallelism;
2729
use crate::util::snapshot::ancestors_between;
@@ -136,10 +138,32 @@ impl AppendSnapshotSet {
136138
Ok(Self { snapshot_ids })
137139
}
138140

139-
/// Check if a snapshot_id is within this range
141+
/// Check if a snapshot_id is within this set
140142
pub(crate) fn contains(&self, snapshot_id: i64) -> bool {
141143
self.snapshot_ids.contains(&snapshot_id)
142144
}
145+
146+
/// Create a manifest file filter that skips delete manifests and data
147+
/// manifests whose `added_snapshot_id` is outside this set.
148+
pub(crate) fn manifest_file_filter(self: &Arc<Self>) -> ManifestFileFilter {
149+
let set = self.clone();
150+
Arc::new(move |manifest_file| {
151+
manifest_file.content != ManifestContentType::Deletes
152+
&& set.contains(manifest_file.added_snapshot_id)
153+
})
154+
}
155+
156+
/// Create a manifest entry filter that includes only entries with
157+
/// status ADDED and a snapshot_id within this set.
158+
pub(crate) fn manifest_entry_filter(self: &Arc<Self>) -> ManifestEntryFilter {
159+
let set = self.clone();
160+
Arc::new(move |entry| {
161+
entry.status() == ManifestStatus::Added
162+
&& entry
163+
.snapshot_id()
164+
.is_some_and(|id| set.contains(id))
165+
})
166+
}
143167
}
144168

145169
/// Builder to create an incremental append scan between two snapshots.
@@ -294,12 +318,12 @@ impl<'a> IncrementalAppendScanBuilder<'a> {
294318
}
295319
};
296320

297-
let snapshot_range = AppendSnapshotSet::build(
321+
let append_set = Arc::new(AppendSnapshotSet::build(
298322
&self.table.metadata_ref(),
299323
self.from_snapshot_id,
300324
to_snapshot.snapshot_id(),
301325
self.from_inclusive,
302-
)?;
326+
)?);
303327

304328
build_table_scan(
305329
ScanConfig {
@@ -315,7 +339,8 @@ impl<'a> IncrementalAppendScanBuilder<'a> {
315339
row_selection_enabled: self.row_selection_enabled,
316340
},
317341
to_snapshot,
318-
Some(snapshot_range),
342+
Some(append_set.manifest_file_filter()),
343+
Some(append_set.manifest_entry_filter()),
319344
)
320345
}
321346
}
@@ -402,22 +427,27 @@ mod tests {
402427
let table = TableTestFixture::new().table;
403428
let current_snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
404429

430+
// Verify the scan builds successfully
405431
let result = table
406432
.incremental_append_scan_inclusive(current_snapshot_id)
407433
.to_snapshot(current_snapshot_id)
408434
.build();
409-
410435
assert!(
411436
result.is_ok(),
412437
"Inclusive scan of a single append snapshot should succeed"
413438
);
414439

415-
let scan = result.unwrap();
416-
let plan_context = scan.plan_context.as_ref().unwrap();
417-
let range = plan_context.snapshot_range.as_ref().unwrap();
440+
// Verify AppendSnapshotSet directly
441+
let set = AppendSnapshotSet::build(
442+
&table.metadata_ref(),
443+
current_snapshot_id,
444+
current_snapshot_id,
445+
true,
446+
)
447+
.unwrap();
418448
assert!(
419-
range.contains(current_snapshot_id),
420-
"Inclusive range should contain the from_snapshot"
449+
set.contains(current_snapshot_id),
450+
"Inclusive set should contain the from_snapshot"
421451
);
422452
}
423453

@@ -427,22 +457,27 @@ mod tests {
427457
let table = TableTestFixture::new().table;
428458
let current_snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
429459

460+
// Verify the scan builds successfully
430461
let result = table
431462
.incremental_append_scan(current_snapshot_id)
432463
.to_snapshot(current_snapshot_id)
433464
.build();
434-
435465
assert!(
436466
result.is_ok(),
437467
"Exclusive scan from=to should succeed with empty range"
438468
);
439469

440-
let scan = result.unwrap();
441-
let plan_context = scan.plan_context.as_ref().unwrap();
442-
let range = plan_context.snapshot_range.as_ref().unwrap();
470+
// Verify AppendSnapshotSet directly
471+
let set = AppendSnapshotSet::build(
472+
&table.metadata_ref(),
473+
current_snapshot_id,
474+
current_snapshot_id,
475+
false,
476+
)
477+
.unwrap();
443478
assert!(
444-
!range.contains(current_snapshot_id),
445-
"Exclusive range should not contain the from_snapshot"
479+
!set.contains(current_snapshot_id),
480+
"Exclusive set should not contain the from_snapshot"
446481
);
447482
}
448483

@@ -484,20 +519,20 @@ mod tests {
484519
"Range of only append snapshots should succeed"
485520
);
486521

487-
let scan = result.unwrap();
488-
let range = scan
489-
.plan_context
490-
.as_ref()
491-
.unwrap()
492-
.snapshot_range
493-
.as_ref()
494-
.unwrap();
522+
// Verify AppendSnapshotSet directly
523+
let set = AppendSnapshotSet::build(
524+
&table.metadata_ref(),
525+
3051729675574597004,
526+
3056729675574597004,
527+
false,
528+
)
529+
.unwrap();
495530
assert!(
496-
!range.contains(3051729675574597004),
531+
!set.contains(3051729675574597004),
497532
"from_snapshot should be excluded"
498533
);
499-
assert!(range.contains(3055729675574597004), "S2 should be in range");
500-
assert!(range.contains(3056729675574597004), "S3 should be in range");
534+
assert!(set.contains(3055729675574597004), "S2 should be in range");
535+
assert!(set.contains(3056729675574597004), "S3 should be in range");
501536
}
502537

503538
#[tokio::test]

crates/iceberg/src/scan/mod.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use arrow_array::RecordBatch;
3030
use futures::channel::mpsc::{Sender, channel};
3131
use futures::stream::BoxStream;
3232
use futures::{SinkExt, StreamExt, TryStreamExt};
33-
pub(crate) use incremental::AppendSnapshotSet;
3433
pub use incremental::IncrementalAppendScanBuilder;
3534
pub use task::*;
3635

@@ -69,7 +68,8 @@ pub(crate) struct ScanConfig<'a> {
6968
pub(crate) fn build_table_scan(
7069
config: ScanConfig<'_>,
7170
snapshot: SnapshotRef,
72-
snapshot_range: Option<AppendSnapshotSet>,
71+
manifest_file_filter: Option<ManifestFileFilter>,
72+
manifest_entry_filter: Option<ManifestEntryFilter>,
7373
) -> Result<TableScan> {
7474
let schema = snapshot.schema(config.table.metadata())?;
7575

@@ -144,7 +144,8 @@ pub(crate) fn build_table_scan(
144144
partition_filter_cache: Arc::new(PartitionFilterCache::new()),
145145
manifest_evaluator_cache: Arc::new(ManifestEvaluatorCache::new()),
146146
expression_evaluator_cache: Arc::new(ExpressionEvaluatorCache::new()),
147-
snapshot_range: snapshot_range.map(Arc::new),
147+
manifest_file_filter,
148+
manifest_entry_filter,
148149
};
149150

150151
Ok(TableScan {
@@ -345,6 +346,7 @@ impl<'a> TableScanBuilder<'a> {
345346
},
346347
snapshot,
347348
None,
349+
None,
348350
)
349351
}
350352
}

0 commit comments

Comments
 (0)