diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
index ae97534d83..4b67cf9d34 100644
--- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -943,6 +943,7 @@ mod tests {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
 
         // Load the deletes - should handle both types without error
diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs
index 6369938ce2..dfebd0adef 100644
--- a/crates/iceberg/src/arrow/delete_filter.rs
+++ b/crates/iceberg/src/arrow/delete_filter.rs
@@ -428,6 +428,7 @@ pub(crate) mod tests {
                 partition_spec: None,
                 name_mapping: None,
                 case_sensitive: false,
+                split_offsets: None,
             },
             FileScanTask {
                 file_size_in_bytes: 0,
@@ -444,6 +445,7 @@ pub(crate) mod tests {
                 partition_spec: None,
                 name_mapping: None,
                 case_sensitive: false,
+                split_offsets: None,
             },
         ];
@@ -501,6 +503,7 @@ pub(crate) mod tests {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: true,
+            split_offsets: None,
        };
 
         let filter = DeleteFilter::default();
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 488f41cf20..388ae1df3e 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -2333,6 +2333,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -2656,6 +2657,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
 
         // Task 2: read the second and third row groups
@@ -2674,6 +2676,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
 
         let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2805,6 +2808,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -2979,6 +2983,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3199,6 +3204,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3412,6 +3418,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3519,6 +3526,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -3620,6 +3628,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -3710,6 +3719,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -3814,6 +3824,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -3947,6 +3958,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -4047,6 +4059,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -4160,6 +4173,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -4254,6 +4268,7 @@ message schema {
                 partition_spec: None,
                 name_mapping: None,
                 case_sensitive: false,
+                split_offsets: None,
             }),
             Ok(FileScanTask {
                 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
@@ -4272,6 +4287,7 @@ message schema {
                 partition_spec: None,
                 name_mapping: None,
                 case_sensitive: false,
+                split_offsets: None,
             }),
             Ok(FileScanTask {
                 file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
@@ -4290,6 +4306,7 @@ message schema {
                 partition_spec: None,
                 name_mapping: None,
                 case_sensitive: false,
+                split_offsets: None,
             }),
         ];
@@ -4472,6 +4489,7 @@ message schema {
             partition_spec: Some(partition_spec),
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -4888,6 +4906,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         })]
         .into_iter(),
     )) as FileScanTaskStream;
@@ -4956,6 +4975,7 @@ message schema {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
 
         let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
diff --git a/crates/iceberg/src/scan/bin_packing.rs b/crates/iceberg/src/scan/bin_packing.rs
new file mode 100644
index 0000000000..748745128a
--- /dev/null
+++ b/crates/iceberg/src/scan/bin_packing.rs
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! First-fit-decreasing bin packing with lookback.
+//!
+//! Used by [`TableScan::plan_tasks()`] to combine split scan tasks into
+//! balanced groups whose total weight is roughly `target_weight`.
+
+use std::collections::VecDeque;
+
+struct Bin<T> {
+    items: Vec<T>,
+    weight: u64,
+}
+
+/// Bin-pack `items` into groups whose total weight is roughly `target_weight`.
+///
+/// Uses a first-fit-decreasing strategy with a sliding `lookback` window of
+/// open bins, matching the algorithm in Java Iceberg's `BinPacking.java`.
+///
+/// Items heavier than `target_weight` are placed in their own bin.
+pub(crate) fn bin_pack<T, F>(
+    mut items: Vec<T>,
+    target_weight: u64,
+    lookback: usize,
+    weight_fn: F,
+) -> Vec<Vec<T>>
+where
+    F: Fn(&T) -> u64,
+{
+    if items.is_empty() {
+        return vec![];
+    }
+
+    let lookback = lookback.max(1);
+
+    // Compute weights and sort descending (heaviest first)
+    let mut weighted: Vec<(T, u64)> = items
+        .drain(..)
+        .map(|item| {
+            let w = weight_fn(&item);
+            (item, w)
+        })
+        .collect();
+    weighted.sort_by(|a, b| b.1.cmp(&a.1));
+
+    let mut result: Vec<Vec<T>> = Vec::new();
+    let mut open_bins: VecDeque<Bin<T>> = VecDeque::new();
+
+    for (item, weight) in weighted {
+        // Try to fit into an existing open bin
+        let fit_idx = open_bins
+            .iter()
+            .position(|bin| bin.weight + weight <= target_weight);
+
+        if let Some(idx) = fit_idx {
+            open_bins[idx].weight += weight;
+            open_bins[idx].items.push(item);
+        } else {
+            // Evict the largest bin if we've exceeded lookback
+            if open_bins.len() >= lookback {
+                let max_idx = open_bins
+                    .iter()
+                    .enumerate()
+                    .max_by_key(|(_, b)| b.weight)
+                    .map(|(i, _)| i)
+                    .unwrap();
+                let evicted = open_bins.remove(max_idx).unwrap();
+                result.push(evicted.items);
+            }
+
+            open_bins.push_back(Bin {
+                items: vec![item],
+                weight,
+            });
+        }
+    }
+
+    // Flush remaining bins
+    for bin in open_bins {
+        result.push(bin.items);
+    }
+
+    result
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_empty_input() {
+        let items: Vec<u64> = vec![];
+        let result = bin_pack(items, 100, 10, |&x| x);
+        assert!(result.is_empty());
+    }
+
+    #[test]
+    fn test_single_item_fits() {
+        let result = bin_pack(vec![50u64], 100, 10, |&x| x);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], vec![50]);
+    }
+
+    #[test]
+    fn test_single_oversized_item() {
+        let result = bin_pack(vec![200u64], 100, 10, |&x| x);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0], vec![200]);
+    }
+
+    #[test]
+    fn test_multiple_small_items_pack_together() {
+        let result = bin_pack(vec![30u64, 20, 10, 25, 15], 100, 10, |&x| x);
+        // Total weight = 100, fits in one bin
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].iter().sum::<u64>(), 100);
+    }
+
+    #[test]
+    fn test_items_split_into_multiple_bins() {
+        let result = bin_pack(vec![60u64, 60, 60], 100, 10, |&x| x);
+        // Each 60 can pair with at most one other 60 (120 > 100), so need at least 2 bins
+        // With first-fit-decreasing: first 60 -> bin1, second 60 -> bin2, third 60 -> bin3
+        // (none can combine since 60+60=120 > 100)
+        assert_eq!(result.len(), 3);
+    }
+
+    #[test]
+    fn test_bin_packing_balances_load() {
+        // 4 items: 50, 40, 30, 20 with target 70
+        let result = bin_pack(vec![50u64, 40, 30, 20], 70, 10, |&x| x);
+        // Sorted descending: 50, 40, 30, 20
+        // 50 -> bin1(50), 40 -> bin2(40), 30 -> bin2(70), 20 -> bin1(70)
+        assert_eq!(result.len(), 2);
+        for bin in &result {
+            let sum: u64 = bin.iter().sum();
+            assert!(sum <= 70, "Bin weight {sum} exceeds target 70");
+        }
+    }
+
+    #[test]
+    fn test_lookback_limits_open_bins() {
+        // With lookback=1, only one bin is kept open at a time
+        let result = bin_pack(vec![10u64, 10, 10, 10], 100, 1, |&x| x);
+        // All items are same weight (10). With lookback=1:
+        // item1(10)->bin1(10), item2(10)->bin1(20), item3(10)->bin1(30), item4(10)->bin1(40)
+        // They all fit, so 1 bin
+        assert_eq!(result.len(), 1);
+    }
+
+    #[test]
+    fn test_lookback_causes_suboptimal_packing() {
+        // With lookback=1, a tight fit may be missed
+        // Items: 80, 70, 30, 20 with target=100, lookback=1
+        let result = bin_pack(vec![80u64, 70, 30, 20], 100, 1, |&x| x);
+        // Sorted: 80, 70, 30, 20
+        // 80->bin1(80). lookback=1, only bin1 open.
+        // 70 doesn't fit in bin1(80+70=150>100). Evict bin1([80]). 70->bin2(70).
+        // 30 fits in bin2(70+30=100). bin2(100).
+        // 20 doesn't fit in bin2(100+20=120>100). Evict bin2([70,30]). 20->bin3(20).
+        assert_eq!(result.len(), 3);
+    }
+
+    #[test]
+    fn test_custom_weight_function() {
+        // Weight function that doubles the value
+        let result = bin_pack(vec![30u64, 30, 30], 100, 10, |&x| x * 2);
+        // Effective weights: 60, 60, 60
+        // 60+60=120 > 100, so each in its own bin
+        assert_eq!(result.len(), 3);
+    }
+}
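
For reviewers, a minimal in-crate sketch of how `bin_pack` groups items (it is `pub(crate)`, so this only compiles from within the crate); the sizes and 128 MB target are illustrative, not part of the diff:

```rust
// Weights here are file lengths in bytes; the target is the 128 MB default.
let file_lengths: Vec<u64> = vec![200 << 20, 90 << 20, 60 << 20, 30 << 20];
let groups = bin_pack(file_lengths, 128 << 20, 10, |&len| len);

// Sorted descending: 200, 90, 60, 30 (MB). The 200 MB item exceeds the
// target, so it gets a bin of its own; 90 + 30 = 120 MB share one bin, and
// 60 MB sits alone because pairing it with any open bin would overflow.
assert_eq!(groups.len(), 3);
```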
diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs
index aa28ffd5a2..3a38d31847 100644
--- a/crates/iceberg/src/scan/context.rs
+++ b/crates/iceberg/src/scan/context.rs
@@ -140,6 +140,11 @@ impl ManifestEntryContext {
             // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default"
             name_mapping: None,
             case_sensitive: self.case_sensitive,
+            split_offsets: self
+                .manifest_entry
+                .data_file()
+                .split_offsets()
+                .map(|s| s.to_vec()),
         })
     }
 }
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 4a1e27bdc1..15f31f91dd 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -17,10 +17,12 @@
 
 //! Table scan api.
 
+mod bin_packing;
 mod cache;
 use cache::*;
 mod context;
 use context::*;
+mod split;
 mod task;
 
 use std::sync::Arc;
@@ -38,7 +40,9 @@ use crate::expr::{Bind, BoundPredicate, Predicate};
 use crate::io::FileIO;
 use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name};
 use crate::runtime::spawn;
-use crate::spec::{DataContentType, SnapshotRef};
+use crate::scan::bin_packing::bin_pack;
+use crate::scan::split::{merge_adjacent_tasks, split_scan_task};
+use crate::spec::{DataContentType, SnapshotRef, TableProperties};
 use crate::table::Table;
 use crate::util::available_parallelism;
 use crate::{Error, ErrorKind, Result};
@@ -60,6 +64,9 @@ pub struct TableScanBuilder<'a> {
     concurrency_limit_manifest_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    target_split_size: Option<u64>,
+    split_lookback: Option<usize>,
+    split_open_file_cost: Option<u64>,
 }
 
 impl<'a> TableScanBuilder<'a> {
@@ -78,6 +85,9 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            target_split_size: None,
+            split_lookback: None,
+            split_open_file_cost: None,
         }
     }
@@ -184,8 +194,49 @@ impl<'a> TableScanBuilder<'a> {
         self
     }
 
+    /// Sets the target split size in bytes for `plan_tasks()`.
+    /// Overrides the table property `read.split.target-size`.
+    pub fn with_target_split_size(mut self, size: u64) -> Self {
+        self.target_split_size = Some(size);
+        self
+    }
+
+    /// Sets the bin-packing lookback for `plan_tasks()`.
+    /// Overrides the table property `read.split.planning-lookback`.
+    pub fn with_split_lookback(mut self, lookback: usize) -> Self {
+        self.split_lookback = Some(lookback);
+        self
+    }
+
+    /// Sets the open file cost in bytes for `plan_tasks()`.
+    /// Overrides the table property `read.split.open-file-cost`.
+    pub fn with_split_open_file_cost(mut self, cost: u64) -> Self {
+        self.split_open_file_cost = Some(cost);
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
+        let props = self.table.metadata().properties();
+        let target_split_size = self.target_split_size.unwrap_or_else(|| {
+            props
+                .get(TableProperties::PROPERTY_SPLIT_SIZE)
+                .and_then(|v| v.parse().ok())
+                .unwrap_or(TableProperties::PROPERTY_SPLIT_SIZE_DEFAULT)
+        });
+        let split_lookback = self.split_lookback.unwrap_or_else(|| {
+            props
+                .get(TableProperties::PROPERTY_SPLIT_LOOKBACK)
+                .and_then(|v| v.parse().ok())
+                .unwrap_or(TableProperties::PROPERTY_SPLIT_LOOKBACK_DEFAULT)
+        });
+        let split_open_file_cost = self.split_open_file_cost.unwrap_or_else(|| {
+            props
+                .get(TableProperties::PROPERTY_SPLIT_OPEN_FILE_COST)
+                .and_then(|v| v.parse().ok())
+                .unwrap_or(TableProperties::PROPERTY_SPLIT_OPEN_FILE_COST_DEFAULT)
+        });
+
         let snapshot = match self.snapshot_id {
             Some(snapshot_id) => self
                 .table
@@ -210,6 +261,9 @@ impl<'a> TableScanBuilder<'a> {
                     concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                     row_group_filtering_enabled: self.row_group_filtering_enabled,
                     row_selection_enabled: self.row_selection_enabled,
+                    target_split_size,
+                    split_lookback,
+                    split_open_file_cost,
                 });
             };
             current_snapshot_id.clone()
@@ -303,6 +357,9 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            target_split_size,
+            split_lookback,
+            split_open_file_cost,
         })
     }
 }
@@ -331,9 +388,58 @@ pub struct TableScan {
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+
+    target_split_size: u64,
+    split_lookback: usize,
+    split_open_file_cost: u64,
 }
 
 impl TableScan {
+    /// Returns a stream of [`CombinedScanTask`]s.
+    ///
+    /// This builds on [`plan_files()`](Self::plan_files) by additionally:
+    /// 1. Splitting large files into smaller tasks based on `target_split_size`
+    /// 2. Bin-packing small tasks into balanced groups for parallel execution
+    ///
+    /// Use this method when you need evenly-sized work units, e.g. for
+    /// distributing scan work across multiple threads or nodes.
+    pub async fn plan_tasks(&self) -> Result<CombinedScanTaskStream> {
+        let file_tasks: Vec<FileScanTask> = self.plan_files().await?.try_collect().await?;
+
+        let split_tasks: Vec<FileScanTask> = file_tasks
+            .into_iter()
+            .flat_map(|task| split_scan_task(task, self.target_split_size))
+            .collect();
+
+        let open_file_cost = self.split_open_file_cost;
+        let weight_fn = |task: &FileScanTask| -> u64 {
+            let content_size = task.length
+                + task
+                    .deletes
+                    .iter()
+                    .map(|d| d.file_size_in_bytes)
+                    .sum::<u64>();
+            let open_cost = (1 + task.deletes.len() as u64) * open_file_cost;
+            content_size.max(open_cost)
+        };
+
+        let bins = bin_pack(
+            split_tasks,
+            self.target_split_size,
+            self.split_lookback,
+            weight_fn,
+        );
+
+        let combined: Vec<CombinedScanTask> = bins
+            .into_iter()
+            .map(|group| CombinedScanTask::new(merge_adjacent_tasks(group)))
+            .collect();
+
+        Ok(Box::pin(futures::stream::iter(
+            combined.into_iter().map(Ok),
+        )))
+    }
+
     /// Returns a stream of [`FileScanTask`]s.
     pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
         let Some(plan_context) = self.plan_context.as_ref() else {
@@ -1820,6 +1926,7 @@ pub mod tests {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
         test_fn(task);
 
@@ -1839,6 +1946,7 @@ pub mod tests {
             partition_spec: None,
             name_mapping: None,
             case_sensitive: false,
+            split_offsets: None,
         };
         test_fn(task);
     }
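
Putting the pieces together: a sketch of driving the new `plan_tasks()` API from user code, assuming an already-loaded `Table`; the `describe_plan` helper and the 64 MB override are illustrative, not part of this change:

```rust
use futures::TryStreamExt;
use iceberg::table::Table;

async fn describe_plan(table: &Table) -> iceberg::Result<()> {
    let scan = table
        .scan()
        .select_all()
        .with_target_split_size(64 * 1024 * 1024) // overrides read.split.target-size
        .build()?;

    // Each CombinedScanTask is one roughly 64 MB unit of work.
    let mut combined = scan.plan_tasks().await?;
    while let Some(group) = combined.try_next().await? {
        println!(
            "group: {} file ranges, ~{} bytes",
            group.files_count(),
            group.estimated_bytes()
        );
    }
    Ok(())
}
```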
diff --git a/crates/iceberg/src/scan/split.rs b/crates/iceberg/src/scan/split.rs
new file mode 100644
index 0000000000..fe7974bada
--- /dev/null
+++ b/crates/iceberg/src/scan/split.rs
@@ -0,0 +1,321 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! File splitting and adjacent-task merging for scan planning.
+
+use crate::scan::FileScanTask;
+use crate::spec::DataFileFormat;
+
+/// Split a [`FileScanTask`] into smaller tasks based on `target_split_size`.
+///
+/// For Parquet files with `split_offsets`, splits at row group boundaries.
+/// For other formats or missing offsets, splits at fixed byte intervals.
+/// Tasks already at or below `target_split_size` are returned unchanged.
+pub(crate) fn split_scan_task(task: FileScanTask, target_split_size: u64) -> Vec<FileScanTask> {
+    if task.length <= target_split_size {
+        return vec![task];
+    }
+
+    if task.data_file_format == DataFileFormat::Parquet
+        && let Some(ref offsets) = task.split_offsets
+        && !offsets.is_empty()
+    {
+        return split_by_offsets(&task, offsets, target_split_size);
+    }
+
+    split_fixed_size(&task, target_split_size)
+}
+
+/// Split at row group boundaries defined by `split_offsets`.
+fn split_by_offsets(
+    task: &FileScanTask,
+    offsets: &[i64],
+    target_split_size: u64,
+) -> Vec<FileScanTask> {
+    let file_end = task.start + task.length;
+
+    // Filter offsets to those within our byte range and collect boundaries
+    let mut boundaries: Vec<u64> = offsets
+        .iter()
+        .map(|&o| o as u64)
+        .filter(|&o| o >= task.start && o < file_end)
+        .collect();
+
+    // Ensure the range start is included as the first boundary
+    if boundaries.is_empty() || boundaries[0] != task.start {
+        boundaries.insert(0, task.start);
+    }
+
+    let mut result = Vec::new();
+    let mut group_start = boundaries[0];
+    let mut group_size: u64 = 0;
+
+    for i in 1..boundaries.len() {
+        // The loop starts at 1, so the previous boundary always exists.
+        let segment_size = boundaries[i] - boundaries[i - 1];
+        group_size += segment_size;
+
+        if group_size >= target_split_size {
+            result.push(make_split(task, group_start, boundaries[i] - group_start));
+            group_start = boundaries[i];
+            group_size = 0;
+        }
+    }
+
+    // Final segment: from group_start to end of task
+    let final_length = file_end - group_start;
+    if final_length > 0 {
+        result.push(make_split(task, group_start, final_length));
+    }
+
+    if result.is_empty() {
+        result.push(make_split(task, task.start, task.length));
+    }
+
+    result
+}
+
+/// Split at fixed byte intervals.
+fn split_fixed_size(task: &FileScanTask, target_split_size: u64) -> Vec<FileScanTask> {
+    let mut result = Vec::new();
+    let end = task.start + task.length;
+    let mut offset = task.start;
+
+    while offset < end {
+        let length = (end - offset).min(target_split_size);
+        result.push(make_split(task, offset, length));
+        offset += length;
+    }
+
+    result
+}
+
+/// Create a new `FileScanTask` covering a sub-range of the original.
+fn make_split(task: &FileScanTask, start: u64, length: u64) -> FileScanTask {
+    FileScanTask {
+        file_size_in_bytes: task.file_size_in_bytes,
+        start,
+        length,
+        // Record count is unknown for partial file reads
+        record_count: if start == task.start && length == task.length {
+            task.record_count
+        } else {
+            None
+        },
+        data_file_path: task.data_file_path.clone(),
+        data_file_format: task.data_file_format,
+        schema: task.schema.clone(),
+        project_field_ids: task.project_field_ids.clone(),
+        predicate: task.predicate.clone(),
+        deletes: task.deletes.clone(),
+        partition: task.partition.clone(),
+        partition_spec: task.partition_spec.clone(),
+        name_mapping: task.name_mapping.clone(),
+        case_sensitive: task.case_sensitive,
+        split_offsets: None, // Splits don't need offsets anymore
+    }
+}
+
+/// Merge adjacent splits from the same file within a group of tasks.
+///
+/// Two tasks are merged if they have the same `data_file_path` and their
+/// byte ranges are contiguous (`task_a.start + task_a.length == task_b.start`).
+pub(crate) fn merge_adjacent_tasks(mut tasks: Vec<FileScanTask>) -> Vec<FileScanTask> {
+    if tasks.len() <= 1 {
+        return tasks;
+    }
+
+    // Sort by file path then by start offset for deterministic merging
+    tasks.sort_by(|a, b| {
+        a.data_file_path
+            .cmp(&b.data_file_path)
+            .then(a.start.cmp(&b.start))
+    });
+
+    let mut result: Vec<FileScanTask> = Vec::with_capacity(tasks.len());
+    let mut iter = tasks.into_iter();
+    let mut current = iter.next().unwrap();
+
+    for next in iter {
+        if current.data_file_path == next.data_file_path
+            && current.start + current.length == next.start
+        {
+            // Merge: extend current to cover both ranges
+            current.length += next.length;
+            // Merged record count is unknown
+            current.record_count = None;
+        } else {
+            result.push(current);
+            current = next;
+        }
+    }
+    result.push(current);
+
+    result
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::spec::{DataFileFormat, Schema, SchemaRef};
+
+    fn test_schema() -> SchemaRef {
+        Arc::new(Schema::builder().build().unwrap())
+    }
+
+    fn make_task(start: u64, length: u64, format: DataFileFormat) -> FileScanTask {
+        FileScanTask {
+            file_size_in_bytes: start + length,
+            start,
+            length,
+            record_count: Some(1000),
+            data_file_path: "s3://bucket/data/file.parquet".to_string(),
+            data_file_format: format,
+            schema: test_schema(),
+            project_field_ids: vec![],
+            predicate: None,
+            deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: true,
+            split_offsets: None,
+        }
+    }
+
+    fn make_parquet_task_with_offsets(start: u64, length: u64, offsets: Vec<i64>) -> FileScanTask {
+        let mut task = make_task(start, length, DataFileFormat::Parquet);
+        task.split_offsets = Some(offsets);
+        task
+    }
+
+    #[test]
+    fn test_task_smaller_than_target_not_split() {
+        let task = make_task(0, 50, DataFileFormat::Parquet);
+        let result = split_scan_task(task, 100);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].start, 0);
+        assert_eq!(result[0].length, 50);
+        assert_eq!(result[0].record_count, Some(1000));
+    }
+
+    #[test]
+    fn test_task_equal_to_target_not_split() {
+        let task = make_task(0, 100, DataFileFormat::Parquet);
+        let result = split_scan_task(task, 100);
+        assert_eq!(result.len(), 1);
+    }
+
+    #[test]
+    fn test_fixed_size_split() {
+        let task = make_task(0, 250, DataFileFormat::Avro);
+        let result = split_scan_task(task, 100);
+        assert_eq!(result.len(), 3);
+        assert_eq!((result[0].start, result[0].length), (0, 100));
+        assert_eq!((result[1].start, result[1].length), (100, 100));
+        assert_eq!((result[2].start, result[2].length), (200, 50));
+        // Splits don't have record counts
+        assert_eq!(result[0].record_count, None);
+    }
+
+    #[test]
+    fn test_parquet_without_offsets_uses_fixed_split() {
+        let task = make_task(0, 250, DataFileFormat::Parquet);
+        // No split_offsets set
+        let result = split_scan_task(task, 100);
+        assert_eq!(result.len(), 3);
+    }
+
+    #[test]
+    fn test_parquet_offset_aware_split() {
+        // File: 300 bytes, row groups at 0, 100, 200
+        let task = make_parquet_task_with_offsets(0, 300, vec![0, 100, 200]);
+        let result = split_scan_task(task, 150);
+        // Groups: [0..200) = 200 bytes (>=150, split), [200..300) = 100 bytes
+        assert_eq!(result.len(), 2);
+        assert_eq!((result[0].start, result[0].length), (0, 200));
+        assert_eq!((result[1].start, result[1].length), (200, 100));
+    }
+
+    #[test]
+    fn test_parquet_offset_aware_split_single_large_group() {
+        // File: 500 bytes, but only one row group
+        let task = make_parquet_task_with_offsets(0, 500, vec![0]);
+        let result = split_scan_task(task, 100);
+        // Can't split at row group boundaries (only one), so entire file is one task
+        assert_eq!(result.len(), 1);
+        assert_eq!((result[0].start, result[0].length), (0, 500));
+    }
+
+    #[test]
+    fn test_split_with_nonzero_start() {
+        let task = make_task(100, 250, DataFileFormat::Avro);
+        let result = split_scan_task(task, 100);
+        assert_eq!(result.len(), 3);
+        assert_eq!((result[0].start, result[0].length), (100, 100));
+        assert_eq!((result[1].start, result[1].length), (200, 100));
+        assert_eq!((result[2].start, result[2].length), (300, 50));
+    }
+
+    #[test]
+    fn test_merge_adjacent_contiguous() {
+        let tasks = vec![
+            make_task(0, 100, DataFileFormat::Parquet),
+            make_task(100, 100, DataFileFormat::Parquet),
+        ];
+        let result = merge_adjacent_tasks(tasks);
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].start, 0);
+        assert_eq!(result[0].length, 200);
+    }
+
+    #[test]
+    fn test_merge_non_contiguous_not_merged() {
+        let tasks = vec![
+            make_task(0, 100, DataFileFormat::Parquet),
+            make_task(200, 100, DataFileFormat::Parquet),
+        ];
+        let result = merge_adjacent_tasks(tasks);
+        assert_eq!(result.len(), 2);
+    }
+
+    #[test]
+    fn test_merge_different_files_not_merged() {
+        let mut task1 = make_task(0, 100, DataFileFormat::Parquet);
+        task1.data_file_path = "file1.parquet".to_string();
+        let mut task2 = make_task(100, 100, DataFileFormat::Parquet);
+        task2.data_file_path = "file2.parquet".to_string();
+
+        let result = merge_adjacent_tasks(vec![task1, task2]);
+        assert_eq!(result.len(), 2);
+    }
+
+    #[test]
+    fn test_merge_empty_and_single() {
+        assert!(merge_adjacent_tasks(vec![]).is_empty());
+
+        let single = vec![make_task(0, 100, DataFileFormat::Parquet)];
+        let result = merge_adjacent_tasks(single);
+        assert_eq!(result.len(), 1);
+    }
+}
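
The split and merge halves of this module are inverses for contiguous ranges of one file; a unit-test-style sketch using the `make_task` helper above (fixed-size splitting applies since the format is Avro) — a suggested addition, not part of the diff:

```rust
#[test]
fn split_then_merge_roundtrip() {
    // Split a 300-byte Avro task into 100-byte ranges, then merge them back.
    let task = make_task(0, 300, DataFileFormat::Avro);
    let splits = split_scan_task(task, 100);
    assert_eq!(splits.len(), 3);

    // Contiguous splits of one file collapse back into a single range; the
    // merged record_count is None because counts don't survive splitting.
    let merged = merge_adjacent_tasks(splits);
    assert_eq!(merged.len(), 1);
    assert_eq!((merged[0].start, merged[0].length), (0, 300));
    assert_eq!(merged[0].record_count, None);
}
```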
diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs
index 67615c351e..95e5dfb9ee 100644
--- a/crates/iceberg/src/scan/task.rs
+++ b/crates/iceberg/src/scan/task.rs
@@ -110,6 +110,14 @@ pub struct FileScanTask {
 
     /// Whether this scan task should treat column names as case-sensitive when binding predicates.
     pub case_sensitive: bool,
+
+    /// Row group byte offsets from the DataFile, used for offset-aware splitting
+    /// of Parquet files during scan planning.
+    #[serde(default)]
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(serialize_with = "serialize_not_implemented")]
+    #[serde(deserialize_with = "deserialize_not_implemented")]
+    pub split_offsets: Option<Vec<i64>>,
 }
 
 impl FileScanTask {
@@ -175,3 +183,42 @@ pub struct FileScanTaskDeleteFile {
     /// equality ids for equality deletes (null for anything other than equality-deletes)
     pub equality_ids: Option<Vec<i32>>,
 }
+
+/// A stream of [`CombinedScanTask`]s.
+pub type CombinedScanTaskStream = BoxStream<'static, Result<CombinedScanTask>>;
+
+/// A group of [`FileScanTask`]s to be processed together by a single reader.
+///
+/// Created by [`TableScan::plan_tasks()`] which splits large files and bin-packs
+/// the results into balanced groups for parallel execution.
+#[derive(Debug, Clone)]
+pub struct CombinedScanTask {
+    tasks: Vec<FileScanTask>,
+}
+
+impl CombinedScanTask {
+    /// Creates a new `CombinedScanTask` from a list of `FileScanTask`s.
+    pub fn new(tasks: Vec<FileScanTask>) -> Self {
+        Self { tasks }
+    }
+
+    /// Returns the constituent `FileScanTask`s.
+    pub fn tasks(&self) -> &[FileScanTask] {
+        &self.tasks
+    }
+
+    /// Consumes self and returns the constituent `FileScanTask`s.
+    pub fn into_tasks(self) -> Vec<FileScanTask> {
+        self.tasks
+    }
+
+    /// Returns the total estimated size of all tasks in bytes.
+    pub fn estimated_bytes(&self) -> u64 {
+        self.tasks.iter().map(|t| t.length).sum()
+    }
+
+    /// Returns the number of files in this combined task.
+    pub fn files_count(&self) -> usize {
+        self.tasks.len()
+    }
+}
diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs
index a3d4e7fdaa..469b209157 100644
--- a/crates/iceberg/src/spec/table_properties.rs
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -226,6 +226,22 @@ impl TableProperties {
     pub const PROPERTY_GC_ENABLED: &str = "gc.enabled";
     /// Default value for gc.enabled
     pub const PROPERTY_GC_ENABLED_DEFAULT: bool = true;
+
+    /// Target size in bytes for split scan tasks during scan planning.
+    pub const PROPERTY_SPLIT_SIZE: &str = "read.split.target-size";
+    /// Default target split size: 128 MB
+    pub const PROPERTY_SPLIT_SIZE_DEFAULT: u64 = 128 * 1024 * 1024;
+
+    /// Number of bins to consider when bin-packing scan tasks.
+    pub const PROPERTY_SPLIT_LOOKBACK: &str = "read.split.planning-lookback";
+    /// Default split lookback: 10
+    pub const PROPERTY_SPLIT_LOOKBACK_DEFAULT: usize = 10;
+
+    /// Minimum cost in bytes assigned to opening a file, used as a weight floor
+    /// in the bin-packing algorithm to avoid creating tasks with many tiny files.
+    pub const PROPERTY_SPLIT_OPEN_FILE_COST: &str = "read.split.open-file-cost";
+    /// Default open file cost: 4 MB
+    pub const PROPERTY_SPLIT_OPEN_FILE_COST_DEFAULT: u64 = 4 * 1024 * 1024;
 }
 
 impl TryFrom<&HashMap<String, String>> for TableProperties {
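
One subtlety worth calling out: `plan_tasks()` weighs each split as the larger of its real bytes and a per-file open cost, so a bin cannot silently accumulate hundreds of tiny files. A worked example with the 4 MB default (the concrete numbers are illustrative):

```rust
let open_file_cost: u64 = 4 * 1024 * 1024; // read.split.open-file-cost default

// A 1 KiB data split with a single 512-byte delete file attached.
let content_size: u64 = 1024 + 512;
let open_cost = (1 + 1) * open_file_cost; // one data file + one delete file

// The task's weight is the max of real bytes and the floor: 8 MiB here.
assert_eq!(content_size.max(open_cost), 8 * 1024 * 1024);
```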
diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs
index 234ab26470..a1ac2b06a3 100644
--- a/crates/integrations/datafusion/src/physical_plan/scan.rs
+++ b/crates/integrations/datafusion/src/physical_plan/scan.rs
@@ -18,7 +18,6 @@
 use std::any::Any;
 use std::pin::Pin;
 use std::sync::Arc;
-use std::vec;
 
 use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef;
@@ -30,7 +29,10 @@ use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{DisplayAs, ExecutionPlan, Partitioning, PlanProperties};
 use datafusion::prelude::Expr;
 use futures::{Stream, TryStreamExt};
+use iceberg::arrow::ArrowReaderBuilder;
 use iceberg::expr::Predicate;
+use iceberg::io::FileIO;
+use iceberg::scan::CombinedScanTask;
 use iceberg::table::Table;
 
 use super::expr_to_predicate::convert_filters_to_predicate;
@@ -53,10 +55,18 @@ pub struct IcebergTableScan {
     predicates: Option<Predicate>,
     /// Optional limit on the number of rows to return
     limit: Option<usize>,
+    /// Pre-planned combined scan tasks, one per DataFusion partition.
+    /// When `None`, falls back to single-partition planning via `plan_files()`.
+    combined_tasks: Option<Arc<Vec<CombinedScanTask>>>,
+    /// FileIO for reading data files.
+    file_io: FileIO,
 }
 
 impl IcebergTableScan {
-    /// Creates a new [`IcebergTableScan`] object.
+    /// Creates a new [`IcebergTableScan`] without eagerly planning tasks.
+    ///
+    /// This produces a single-partition scan. Call [`plan`] after construction
+    /// to enable multi-partition execution via `plan_tasks()`.
     pub(crate) fn new(
         table: Table,
         snapshot_id: Option<i64>,
@@ -69,9 +79,10 @@ impl IcebergTableScan {
             None => schema.clone(),
             Some(projection) => Arc::new(schema.project(projection).unwrap()),
         };
-        let plan_properties = Self::compute_properties(output_schema.clone());
+        let plan_properties = Self::compute_properties(output_schema, 1);
         let projection = get_column_names(schema.clone(), projection);
         let predicates = convert_filters_to_predicate(filters);
+        let file_io = table.file_io().clone();
 
         Self {
             table,
@@ -80,9 +91,44 @@ impl IcebergTableScan {
             projection,
             predicates,
             limit,
+            combined_tasks: None,
+            file_io,
         }
     }
 
+    /// Eagerly plans scan tasks via `plan_tasks()`, enabling multi-partition
+    /// parallel execution in DataFusion.
+    ///
+    /// If planning fails (e.g. manifests are unreachable), returns `self`
+    /// unchanged in single-partition mode. Errors will surface at `execute()` time.
+    pub(crate) async fn plan(mut self) -> Self {
+        let combined_tasks = self.try_plan_tasks().await;
+        if let Ok(tasks) = combined_tasks {
+            let num_partitions = tasks.len().max(1);
+            self.plan_properties = Self::compute_properties(self.schema(), num_partitions);
+            self.combined_tasks = Some(Arc::new(tasks));
+        }
+        self
+    }
+
+    async fn try_plan_tasks(&self) -> Result<Vec<CombinedScanTask>, iceberg::Error> {
+        let scan_builder = match self.snapshot_id {
+            Some(snapshot_id) => self.table.scan().snapshot_id(snapshot_id),
+            None => self.table.scan(),
+        };
+        let mut scan_builder = match &self.projection {
+            Some(column_names) => scan_builder.select(column_names.clone()),
+            None => scan_builder.select_all(),
+        };
+        if let Some(ref pred) = self.predicates {
+            scan_builder = scan_builder.with_filter(pred.clone());
+        }
+        let table_scan = scan_builder.build()?;
+        let combined_tasks: Vec<CombinedScanTask> =
+            table_scan.plan_tasks().await?.try_collect().await?;
+        Ok(combined_tasks)
+    }
+
     pub fn table(&self) -> &Table {
         &self.table
     }
@@ -103,14 +149,10 @@ impl IcebergTableScan {
         self.limit
     }
 
-    /// Computes [`PlanProperties`] used in query optimization.
-    fn compute_properties(schema: ArrowSchemaRef) -> Arc<PlanProperties> {
-        // TODO:
-        // This is more or less a placeholder, to be replaced
-        // once we support output-partitioning
+    fn compute_properties(schema: ArrowSchemaRef, num_partitions: usize) -> Arc<PlanProperties> {
         Arc::new(PlanProperties::new(
             EquivalenceProperties::new(schema),
-            Partitioning::UnknownPartitioning(1),
+            Partitioning::UnknownPartitioning(num_partitions),
             EmissionType::Incremental,
             Boundedness::Bounded,
         ))
@@ -143,9 +185,41 @@ impl ExecutionPlan for IcebergTableScan {
 
     fn execute(
         &self,
-        _partition: usize,
+        partition: usize,
         _context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
+        let schema = self.schema();
+
+        // If we have pre-planned tasks, use them for partition-aware execution
+        if let Some(ref combined_tasks) = self.combined_tasks {
+            let Some(combined_task) = combined_tasks.get(partition) else {
+                return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::empty(),
+                )));
+            };
+
+            let tasks = combined_task.tasks().to_vec();
+            let file_io = self.file_io.clone();
+
+            let fut = async move {
+                let task_stream = Box::pin(futures::stream::iter(tasks.into_iter().map(Ok)));
+                let reader = ArrowReaderBuilder::new(file_io).build();
+                let stream = reader
+                    .read(task_stream)
+                    .map_err(to_datafusion_error)?
+                    .map_err(to_datafusion_error);
+                Ok::<_, datafusion::error::DataFusionError>(stream)
+            };
+
+            let stream = futures::stream::once(fut).try_flatten();
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                schema,
+                apply_limit(stream, self.limit),
+            )));
+        }
+
+        // Fallback: single-partition mode using plan_files() + to_arrow()
         let fut = get_batch_stream(
             self.table.clone(),
             self.snapshot_id,
@@ -154,33 +228,37 @@ impl ExecutionPlan for IcebergTableScan {
         );
         let stream = futures::stream::once(fut).try_flatten();
 
-        // Apply limit if specified
-        let limited_stream: Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> =
-            if let Some(limit) = self.limit {
-                let mut remaining = limit;
-                Box::pin(stream.try_filter_map(move |batch| {
-                    futures::future::ready(if remaining == 0 {
-                        Ok(None)
-                    } else if batch.num_rows() <= remaining {
-                        remaining -= batch.num_rows();
-                        Ok(Some(batch))
-                    } else {
-                        let limited_batch = batch.slice(0, remaining);
-                        remaining = 0;
-                        Ok(Some(limited_batch))
-                    })
-                }))
-            } else {
-                Box::pin(stream)
-            };
-
         Ok(Box::pin(RecordBatchStreamAdapter::new(
-            self.schema(),
-            limited_stream,
+            schema,
+            apply_limit(stream, self.limit),
         )))
     }
 }
 
+/// Apply an optional row limit to a stream of record batches.
+fn apply_limit(
+    stream: impl Stream<Item = DFResult<RecordBatch>> + Send + 'static,
+    limit: Option<usize>,
+) -> Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>> {
+    if let Some(limit) = limit {
+        let mut remaining = limit;
+        Box::pin(stream.try_filter_map(move |batch| {
+            futures::future::ready(if remaining == 0 {
+                Ok(None)
+            } else if batch.num_rows() <= remaining {
+                remaining -= batch.num_rows();
+                Ok(Some(batch))
+            } else {
+                let limited_batch = batch.slice(0, remaining);
+                remaining = 0;
+                Ok(Some(limited_batch))
+            })
+        }))
+    } else {
+        Box::pin(stream)
+    }
+}
+
 impl DisplayAs for IcebergTableScan {
     fn fmt_as(
         &self,
@@ -195,7 +273,7 @@ impl DisplayAs for IcebergTableScan {
                 .map_or(String::new(), |v| v.join(",")),
             self.predicates
                 .clone()
-                .map_or(String::from(""), |p| format!("{p}"))
+                .map_or(String::from(""), |p| format!("{p}")),
         )
     }
 }
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 75b7988d8d..14368cf47a 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -137,14 +137,17 @@ impl TableProvider for IcebergTableProvider {
             .map_err(to_datafusion_error)?;
 
         // Create scan with fresh metadata (always use current snapshot)
-        Ok(Arc::new(IcebergTableScan::new(
+        let scan = IcebergTableScan::new(
             table,
             None, // Always use current snapshot for catalog-backed provider
             self.schema.clone(),
             projection,
             filters,
             limit,
-        )))
+        )
+        .plan()
+        .await;
+        Ok(Arc::new(scan))
     }
 
     fn supports_filters_pushdown(
@@ -315,14 +318,17 @@ impl TableProvider for IcebergStaticTableProvider {
         limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
         // Use cached table (no refresh)
-        Ok(Arc::new(IcebergTableScan::new(
+        let scan = IcebergTableScan::new(
             self.table.clone(),
             self.snapshot_id,
             self.schema.clone(),
             projection,
             filters,
             limit,
-        )))
+        )
+        .plan()
+        .await;
+        Ok(Arc::new(scan))
     }
 
     fn supports_filters_pushdown(