Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
366 changes: 174 additions & 192 deletions Cargo.lock

Large diffs are not rendered by default.

47 changes: 44 additions & 3 deletions crates/iceberg/src/scan/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ use crate::spec::{
};
use crate::{Error, ErrorKind, Result};

/// Predicate applied to each [`ManifestFile`] before its manifest is fetched.
///
/// Returns `true` to include the manifest, `false` to skip it.
///
/// The closure is held behind an [`Arc`] and is required to be `Send + Sync`,
/// so a single filter can be cloned cheaply and shared across threads.
pub(crate) type ManifestFileFilter = Arc<dyn Fn(&ManifestFile) -> bool + Send + Sync>;

/// Predicate applied to each manifest entry after its manifest has been loaded.
///
/// Returns `true` to include the entry, `false` to skip it.
///
/// The closure is held behind an [`Arc`] and is required to be `Send + Sync`,
/// so a single filter can be cloned cheaply and shared across threads.
pub(crate) type ManifestEntryFilter = Arc<dyn Fn(&ManifestEntryRef) -> bool + Send + Sync>;

Comment on lines +36 to +43
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a nice way to inject these filters, and it should extend to other future scans.

/// Wraps a [`ManifestFile`] alongside the objects that are needed
/// to process it in a thread-safe manner
pub(crate) struct ManifestFileContext {
Expand All @@ -47,6 +55,7 @@ pub(crate) struct ManifestFileContext {
expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
delete_file_index: DeleteFileIndex,
case_sensitive: bool,
entry_filter: Option<ManifestEntryFilter>,
}

/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
Expand Down Expand Up @@ -76,12 +85,19 @@ impl ManifestFileContext {
mut sender,
expression_evaluator_cache,
delete_file_index,
..
case_sensitive,
entry_filter,
} = self;

let manifest = object_cache.get_manifest(&manifest_file).await?;

for manifest_entry in manifest.entries() {
if let Some(ref filter) = entry_filter
&& !filter(manifest_entry)
{
continue;
}

let manifest_entry_context = ManifestEntryContext {
// TODO: refactor to avoid the expensive ManifestEntry clone
manifest_entry: manifest_entry.clone(),
Expand All @@ -91,7 +107,7 @@ impl ManifestFileContext {
bound_predicates: bound_predicates.clone(),
snapshot_schema: snapshot_schema.clone(),
delete_file_index: delete_file_index.clone(),
case_sensitive: self.case_sensitive,
case_sensitive,
};

sender
Expand Down Expand Up @@ -146,7 +162,6 @@ impl ManifestEntryContext {

/// PlanContext wraps a [`SnapshotRef`] alongside all the other
/// objects that are required to perform a scan file plan.
#[derive(Debug)]
pub(crate) struct PlanContext {
pub snapshot: SnapshotRef,

Expand All @@ -161,6 +176,25 @@ pub(crate) struct PlanContext {
pub partition_filter_cache: Arc<PartitionFilterCache>,
pub manifest_evaluator_cache: Arc<ManifestEvaluatorCache>,
pub expression_evaluator_cache: Arc<ExpressionEvaluatorCache>,
pub manifest_file_filter: Option<ManifestFileFilter>,
pub manifest_entry_filter: Option<ManifestEntryFilter>,
}

impl std::fmt::Debug for PlanContext {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`ManifestFileFilter` and `ManifestEntryFilter` can't derive `Debug`, so `Debug` is implemented manually here.

/// Writes a diagnostic representation of this `PlanContext`.
///
/// Only `snapshot`, `case_sensitive` and the two optional filters are
/// listed; the output is closed with `finish_non_exhaustive`, which marks
/// the struct as having further fields that are not shown.
///
/// The filters are `dyn Fn` trait objects with no `Debug` bound, so each one
/// is rendered as `Some("...")` when set and `None` when absent.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // Placeholder text standing in for a closure that cannot be printed.
    let file_filter = self.manifest_file_filter.as_ref().map(|_| "...");
    let entry_filter = self.manifest_entry_filter.as_ref().map(|_| "...");

    let mut out = f.debug_struct("PlanContext");
    out.field("snapshot", &self.snapshot);
    out.field("case_sensitive", &self.case_sensitive);
    out.field("manifest_file_filter", &file_filter);
    out.field("manifest_entry_filter", &entry_filter);
    out.finish_non_exhaustive()
}
}

impl PlanContext {
Expand Down Expand Up @@ -214,6 +248,12 @@ impl PlanContext {
// TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
let mut filtered_mfcs = vec![];
for manifest_file in manifest_files {
if let Some(ref filter) = self.manifest_file_filter
&& !filter(manifest_file)
{
continue;
}

let tx = if manifest_file.content == ManifestContentType::Deletes {
delete_file_tx.clone()
} else {
Expand Down Expand Up @@ -283,6 +323,7 @@ impl PlanContext {
expression_evaluator_cache: self.expression_evaluator_cache.clone(),
delete_file_index,
case_sensitive: self.case_sensitive,
entry_filter: self.manifest_entry_filter.clone(),
}
}
}
Loading
Loading