Skip to content

Commit 94cc048

Browse files
committed
df
1 parent 53a835b commit 94cc048

7 files changed

Lines changed: 167 additions & 73 deletions

File tree

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -943,7 +943,7 @@ mod tests {
943943
partition_spec: None,
944944
name_mapping: None,
945945
case_sensitive: false,
946-
split_offsets: None,
946+
split_offsets: None,
947947
};
948948

949949
// Load the deletes - should handle both types without error

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ pub(crate) mod tests {
428428
partition_spec: None,
429429
name_mapping: None,
430430
case_sensitive: false,
431-
split_offsets: None,
431+
split_offsets: None,
432432
},
433433
FileScanTask {
434434
file_size_in_bytes: 0,
@@ -445,7 +445,7 @@ pub(crate) mod tests {
445445
partition_spec: None,
446446
name_mapping: None,
447447
case_sensitive: false,
448-
split_offsets: None,
448+
split_offsets: None,
449449
},
450450
];
451451

@@ -503,7 +503,7 @@ pub(crate) mod tests {
503503
partition_spec: None,
504504
name_mapping: None,
505505
case_sensitive: true,
506-
split_offsets: None,
506+
split_offsets: None,
507507
};
508508

509509
let filter = DeleteFilter::default();

crates/iceberg/src/arrow/reader.rs

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2333,7 +2333,7 @@ message schema {
23332333
partition_spec: None,
23342334
name_mapping: None,
23352335
case_sensitive: false,
2336-
split_offsets: None,
2336+
split_offsets: None,
23372337
})]
23382338
.into_iter(),
23392339
)) as FileScanTaskStream;
@@ -2657,7 +2657,7 @@ message schema {
26572657
partition_spec: None,
26582658
name_mapping: None,
26592659
case_sensitive: false,
2660-
split_offsets: None,
2660+
split_offsets: None,
26612661
};
26622662

26632663
// Task 2: read the second and third row groups
@@ -2676,7 +2676,7 @@ message schema {
26762676
partition_spec: None,
26772677
name_mapping: None,
26782678
case_sensitive: false,
2679-
split_offsets: None,
2679+
split_offsets: None,
26802680
};
26812681

26822682
let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream;
@@ -2808,7 +2808,7 @@ message schema {
28082808
partition_spec: None,
28092809
name_mapping: None,
28102810
case_sensitive: false,
2811-
split_offsets: None,
2811+
split_offsets: None,
28122812
})]
28132813
.into_iter(),
28142814
)) as FileScanTaskStream;
@@ -2983,7 +2983,7 @@ message schema {
29832983
partition_spec: None,
29842984
name_mapping: None,
29852985
case_sensitive: false,
2986-
split_offsets: None,
2986+
split_offsets: None,
29872987
};
29882988

29892989
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3204,7 +3204,7 @@ message schema {
32043204
partition_spec: None,
32053205
name_mapping: None,
32063206
case_sensitive: false,
3207-
split_offsets: None,
3207+
split_offsets: None,
32083208
};
32093209

32103210
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3418,7 +3418,7 @@ message schema {
34183418
partition_spec: None,
34193419
name_mapping: None,
34203420
case_sensitive: false,
3421-
split_offsets: None,
3421+
split_offsets: None,
34223422
};
34233423

34243424
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
@@ -3526,7 +3526,7 @@ message schema {
35263526
partition_spec: None,
35273527
name_mapping: None,
35283528
case_sensitive: false,
3529-
split_offsets: None,
3529+
split_offsets: None,
35303530
})]
35313531
.into_iter(),
35323532
)) as FileScanTaskStream;
@@ -3628,7 +3628,7 @@ message schema {
36283628
partition_spec: None,
36293629
name_mapping: None,
36303630
case_sensitive: false,
3631-
split_offsets: None,
3631+
split_offsets: None,
36323632
})]
36333633
.into_iter(),
36343634
)) as FileScanTaskStream;
@@ -3719,7 +3719,7 @@ message schema {
37193719
partition_spec: None,
37203720
name_mapping: None,
37213721
case_sensitive: false,
3722-
split_offsets: None,
3722+
split_offsets: None,
37233723
})]
37243724
.into_iter(),
37253725
)) as FileScanTaskStream;
@@ -3824,7 +3824,7 @@ message schema {
38243824
partition_spec: None,
38253825
name_mapping: None,
38263826
case_sensitive: false,
3827-
split_offsets: None,
3827+
split_offsets: None,
38283828
})]
38293829
.into_iter(),
38303830
)) as FileScanTaskStream;
@@ -3958,7 +3958,7 @@ message schema {
39583958
partition_spec: None,
39593959
name_mapping: None,
39603960
case_sensitive: false,
3961-
split_offsets: None,
3961+
split_offsets: None,
39623962
})]
39633963
.into_iter(),
39643964
)) as FileScanTaskStream;
@@ -4059,7 +4059,7 @@ message schema {
40594059
partition_spec: None,
40604060
name_mapping: None,
40614061
case_sensitive: false,
4062-
split_offsets: None,
4062+
split_offsets: None,
40634063
})]
40644064
.into_iter(),
40654065
)) as FileScanTaskStream;
@@ -4173,7 +4173,7 @@ message schema {
41734173
partition_spec: None,
41744174
name_mapping: None,
41754175
case_sensitive: false,
4176-
split_offsets: None,
4176+
split_offsets: None,
41774177
})]
41784178
.into_iter(),
41794179
)) as FileScanTaskStream;
@@ -4268,7 +4268,7 @@ message schema {
42684268
partition_spec: None,
42694269
name_mapping: None,
42704270
case_sensitive: false,
4271-
split_offsets: None,
4271+
split_offsets: None,
42724272
}),
42734273
Ok(FileScanTask {
42744274
file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_1.parquet"))
@@ -4287,7 +4287,7 @@ message schema {
42874287
partition_spec: None,
42884288
name_mapping: None,
42894289
case_sensitive: false,
4290-
split_offsets: None,
4290+
split_offsets: None,
42914291
}),
42924292
Ok(FileScanTask {
42934293
file_size_in_bytes: std::fs::metadata(format!("{table_location}/file_2.parquet"))
@@ -4306,7 +4306,7 @@ message schema {
43064306
partition_spec: None,
43074307
name_mapping: None,
43084308
case_sensitive: false,
4309-
split_offsets: None,
4309+
split_offsets: None,
43104310
}),
43114311
];
43124312

@@ -4489,7 +4489,7 @@ message schema {
44894489
partition_spec: Some(partition_spec),
44904490
name_mapping: None,
44914491
case_sensitive: false,
4492-
split_offsets: None,
4492+
split_offsets: None,
44934493
})]
44944494
.into_iter(),
44954495
)) as FileScanTaskStream;
@@ -4906,7 +4906,7 @@ message schema {
49064906
partition_spec: None,
49074907
name_mapping: None,
49084908
case_sensitive: false,
4909-
split_offsets: None,
4909+
split_offsets: None,
49104910
})]
49114911
.into_iter(),
49124912
)) as FileScanTaskStream;
@@ -4975,7 +4975,7 @@ message schema {
49754975
partition_spec: None,
49764976
name_mapping: None,
49774977
case_sensitive: false,
4978-
split_offsets: None,
4978+
split_offsets: None,
49794979
};
49804980

49814981
let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;

crates/iceberg/src/scan/mod.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,7 @@ impl TableScan {
404404
/// Use this method when you need evenly-sized work units, e.g. for
405405
/// distributing scan work across multiple threads or nodes.
406406
pub async fn plan_tasks(&self) -> Result<CombinedScanTaskStream> {
407-
let file_tasks: Vec<FileScanTask> =
408-
self.plan_files().await?.try_collect().await?;
407+
let file_tasks: Vec<FileScanTask> = self.plan_files().await?.try_collect().await?;
409408

410409
let split_tasks: Vec<FileScanTask> = file_tasks
411410
.into_iter()
@@ -414,8 +413,12 @@ impl TableScan {
414413

415414
let open_file_cost = self.split_open_file_cost;
416415
let weight_fn = |task: &FileScanTask| -> u64 {
417-
let content_size =
418-
task.length + task.deletes.iter().map(|d| d.file_size_in_bytes).sum::<u64>();
416+
let content_size = task.length
417+
+ task
418+
.deletes
419+
.iter()
420+
.map(|d| d.file_size_in_bytes)
421+
.sum::<u64>();
419422
let open_cost = (1 + task.deletes.len() as u64) * open_file_cost;
420423
content_size.max(open_cost)
421424
};
@@ -432,7 +435,9 @@ impl TableScan {
432435
.map(|group| CombinedScanTask::new(merge_adjacent_tasks(group)))
433436
.collect();
434437

435-
Ok(Box::pin(futures::stream::iter(combined.into_iter().map(Ok))))
438+
Ok(Box::pin(futures::stream::iter(
439+
combined.into_iter().map(Ok),
440+
)))
436441
}
437442

438443
/// Returns a stream of [`FileScanTask`]s.

crates/iceberg/src/scan/split.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,12 @@ fn split_by_offsets(
6565
let mut group_size: u64 = 0;
6666

6767
for i in 1..boundaries.len() {
68-
let segment_size = boundaries[i] - (if i > 0 { boundaries[i - 1] } else { group_start });
68+
let segment_size = boundaries[i]
69+
- (if i > 0 {
70+
boundaries[i - 1]
71+
} else {
72+
group_start
73+
});
6974
group_size += segment_size;
7075

7176
if group_size >= target_split_size {
@@ -139,7 +144,11 @@ pub(crate) fn merge_adjacent_tasks(mut tasks: Vec<FileScanTask>) -> Vec<FileScan
139144
}
140145

141146
// Sort by file path then by start offset for deterministic merging
142-
tasks.sort_by(|a, b| a.data_file_path.cmp(&b.data_file_path).then(a.start.cmp(&b.start)));
147+
tasks.sort_by(|a, b| {
148+
a.data_file_path
149+
.cmp(&b.data_file_path)
150+
.then(a.start.cmp(&b.start))
151+
});
143152

144153
let mut result: Vec<FileScanTask> = Vec::with_capacity(tasks.len());
145154
let mut iter = tasks.into_iter();
@@ -194,11 +203,7 @@ mod tests {
194203
}
195204
}
196205

197-
fn make_parquet_task_with_offsets(
198-
start: u64,
199-
length: u64,
200-
offsets: Vec<i64>,
201-
) -> FileScanTask {
206+
fn make_parquet_task_with_offsets(start: u64, length: u64, offsets: Vec<i64>) -> FileScanTask {
202207
let mut task = make_task(start, length, DataFileFormat::Parquet);
203208
task.split_offsets = Some(offsets);
204209
task

0 commit comments

Comments
 (0)