Skip to content

Commit ef346c6

Browse files
fvaleye and DrakeLin authored
perf: add skip_stats option to skip reading file statistics (delta-io#1738)
## What changes are proposed in this pull request?

Adds a `skip_stats` option to `ScanBuilder` that skips reading the stats column from checkpoint parquet files using column projection. Useful when the caller handles data skipping externally or doesn't need file statistics (e.g., a higher layer like [delta-rs](https://github.com/delta-io/delta-rs)).

### Changes:

- **Add** `with_skip_stats(bool)` to `ScanBuilder`
- **Add** `CHECKPOINT_READ_SCHEMA_NO_STATS` schema without stats column
- **Transform** outputs null for stats when `skip_stats=true`
- **Disable** `data_skipping_filter` when stats are skipped

## Public API Changes

**New public API, non-breaking change:**

- `ScanBuilder::with_skip_stats(bool)`: opt-in method to skip reading file statistics

## How was this change tested?

**Unit tests:**

- Added `test_scan_action_iter_with_skip_stats` to verify stats are null when `skip_stats` is enabled

### Benchmark with synthetic table:

I did a quick local test with these table characteristics:

- 50,000 files in a single parquet checkpoint
- 20 columns per file
- Stats JSON per file: ~1,000 bytes
- Checkpoint file size: ~20 MB

**Results (5 iterations, average):**

| Mode | Time | Improvement |
| :--- | :--- | :--- |
| **With stats** | 37 ms | - |
| **Skip stats** | 8 ms | **76%** |

_The improvement comes from parquet column projection, which avoids I/O and JSON deserialization of the stats column._

---------

Co-authored-by: Drake Lin <drakelin18@gmail.com>
1 parent 757a927 commit ef346c6

5 files changed

Lines changed: 242 additions & 26 deletions

File tree

kernel/src/parallel/parallel_phase.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ mod tests {
203203
state_info,
204204
checkpoint_info,
205205
seen_file_keys,
206+
false,
206207
)
207208
}
208209

kernel/src/scan/log_replay.rs

Lines changed: 105 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ struct InternalScanState {
4141
physical_stats_schema: Option<SchemaRef>,
4242
/// Logical stats schema for the file statistics.
4343
logical_stats_schema: Option<SchemaRef>,
44+
#[serde(default)]
45+
skip_stats: bool,
4446
}
4547

4648
/// Serializable processor state for distributed processing. This can be serialized using the
@@ -111,6 +113,8 @@ pub struct ScanLogReplayProcessor {
111113
/// far in the log. This is used to filter out files with Remove actions as
112114
/// well as duplicate entries in the log.
113115
seen_file_keys: HashSet<FileActionKey>,
116+
/// Skip reading file statistics.
117+
skip_stats: bool,
114118
}
115119

116120
impl ScanLogReplayProcessor {
@@ -128,8 +132,15 @@ impl ScanLogReplayProcessor {
128132
engine: &dyn Engine,
129133
state_info: Arc<StateInfo>,
130134
checkpoint_info: CheckpointReadInfo,
135+
skip_stats: bool,
131136
) -> DeltaResult<Self> {
132-
Self::new_with_seen_files(engine, state_info, checkpoint_info, Default::default())
137+
Self::new_with_seen_files(
138+
engine,
139+
state_info,
140+
checkpoint_info,
141+
Default::default(),
142+
skip_stats,
143+
)
133144
}
134145

135146
/// Create new [`ScanLogReplayProcessor`] with pre-populated seen_file_keys.
@@ -142,31 +153,42 @@ impl ScanLogReplayProcessor {
142153
/// - `state_info`: StateInfo containing schemas, transforms, and predicates
143154
/// - `checkpoint_info`: Information about checkpoint reading for stats optimization
144155
/// - `seen_file_keys`: Pre-computed set of file action keys that have been seen
156+
/// - `skip_stats`: Skip reading file statistics
145157
pub(crate) fn new_with_seen_files(
146158
engine: &dyn Engine,
147159
state_info: Arc<StateInfo>,
148160
checkpoint_info: CheckpointReadInfo,
149161
seen_file_keys: HashSet<FileActionKey>,
162+
skip_stats: bool,
150163
) -> DeltaResult<Self> {
151164
let CheckpointReadInfo {
152165
has_stats_parsed,
153166
checkpoint_read_schema,
154167
} = checkpoint_info;
155168

156-
let output_schema =
157-
scan_row_schema_with_stats_parsed(state_info.physical_stats_schema.clone());
158-
159169
// Extract the physical predicate for data skipping and partition filtering.
160170
// DataSkippingFilter expects Option<(PredicateRef, SchemaRef)>.
161171
let physical_predicate = match &state_info.physical_predicate {
162172
PhysicalPredicate::Some(predicate, schema) => Some((predicate.clone(), schema.clone())),
163173
_ => None,
164174
};
165175

176+
// When skip_stats is enabled, use None for stats schema to produce null stats output
177+
let stats_schema_for_transform = if skip_stats {
178+
None
179+
} else {
180+
state_info.physical_stats_schema.clone()
181+
};
182+
183+
let output_schema = scan_row_schema_with_stats_parsed(stats_schema_for_transform.clone());
184+
166185
// Create data skipping filter that reads stats_parsed from the transformed batch.
167186
// This avoids double JSON parsing - the transform parses JSON once, then data skipping
168187
// reads the already-parsed stats_parsed column from the transform output.
169-
let data_skipping_filter =
188+
// Disabled when skip_stats is enabled.
189+
let data_skipping_filter = if skip_stats {
190+
None
191+
} else {
170192
state_info
171193
.physical_stats_schema
172194
.as_ref()
@@ -179,25 +201,27 @@ impl ScanLogReplayProcessor {
179201
output_schema.clone(),
180202
column_expr_ref!("stats_parsed"),
181203
)
182-
});
204+
})
205+
};
183206

184207
Ok(Self {
185208
partition_filter: physical_predicate.as_ref().map(|(p, _)| p.clone()),
186209
data_skipping_filter,
187210
// Log transform: always parse JSON (no stats_parsed in JSON commit files)
188211
log_transform: engine.evaluation_handler().new_expression_evaluator(
189212
checkpoint_read_schema.clone(),
190-
get_add_transform_expr(state_info.physical_stats_schema.clone(), false),
213+
get_add_transform_expr(stats_schema_for_transform.clone(), false, skip_stats),
191214
output_schema.clone().into(),
192215
)?,
193216
// Checkpoint transform: read stats_parsed directly when available, otherwise parse JSON
194217
checkpoint_transform: engine.evaluation_handler().new_expression_evaluator(
195218
checkpoint_read_schema,
196-
get_add_transform_expr(state_info.physical_stats_schema.clone(), has_stats_parsed),
219+
get_add_transform_expr(stats_schema_for_transform, has_stats_parsed, skip_stats),
197220
output_schema.into(),
198221
)?,
199222
seen_file_keys,
200223
state_info,
224+
skip_stats,
201225
})
202226
}
203227

@@ -245,6 +269,7 @@ impl ScanLogReplayProcessor {
245269
column_mapping_mode,
246270
physical_stats_schema,
247271
logical_stats_schema,
272+
skip_stats: self.skip_stats,
248273
};
249274
let internal_state_blob = serde_json::to_vec(&internal_state)
250275
.map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?;
@@ -308,6 +333,7 @@ impl ScanLogReplayProcessor {
308333
state_info,
309334
state.checkpoint_info,
310335
state.seen_file_keys,
336+
internal_state.skip_stats,
311337
)?;
312338

313339
Ok(Arc::new(processor))
@@ -542,19 +568,27 @@ fn scan_row_schema_with_stats_parsed(stats_schema: Option<SchemaRef>) -> SchemaR
542568
/// - `physical_stats_schema`: Schema for parsing stats from JSON and for output (physical column
543569
/// names), or None if stats should not be included in output.
544570
/// - `has_stats_parsed`: Whether checkpoint has pre-parsed stats_parsed column.
571+
/// - `skip_stats`: When true, replaces the stats column with a null literal, avoiding reads of the
572+
/// raw stats JSON string from checkpoint parquet files.
545573
///
546574
/// The transform includes `stats_parsed` only when `physical_stats_schema` is Some.
547575
/// Stats are output using physical column names. Engines can use `Scan::logical_stats_schema()`
548576
/// to map physical names back to logical names when column mapping is enabled.
549577
fn get_add_transform_expr(
550578
physical_stats_schema: Option<SchemaRef>,
551579
has_stats_parsed: bool,
580+
skip_stats: bool,
552581
) -> ExpressionRef {
582+
let stats_expr = if skip_stats {
583+
Arc::new(Expression::Literal(Scalar::Null(DataType::STRING)))
584+
} else {
585+
column_expr_ref!("add.stats")
586+
};
553587
let mut fields = vec![
554588
column_expr_ref!("add.path"),
555589
column_expr_ref!("add.size"),
556590
column_expr_ref!("add.modificationTime"),
557-
column_expr_ref!("add.stats"),
591+
stats_expr,
558592
column_expr_ref!("add.deletionVector"),
559593
Arc::new(Expression::Struct(vec![
560594
column_expr_ref!("add.partitionValues"),
@@ -756,16 +790,20 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
756790
/// that is selected in the returned `engine_data` _must_ be processed to complete the scan.
757791
/// Non-selected rows _must_ be ignored.
758792
///
793+
/// When `skip_stats` is true, file statistics are not read from checkpoint parquet files and
794+
/// data skipping is disabled.
795+
///
759796
/// Note: The iterator of [`ActionsBatch`]s ('action_iter' parameter) must be sorted by the order of
760797
/// the actions in the log from most recent to least recent.
761798
pub(crate) fn scan_action_iter(
762799
engine: &dyn Engine,
763800
action_iter: impl Iterator<Item = DeltaResult<ActionsBatch>>,
764801
state_info: Arc<StateInfo>,
765802
checkpoint_info: CheckpointReadInfo,
803+
skip_stats: bool,
766804
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadata>>> {
767805
Ok(
768-
ScanLogReplayProcessor::new(engine, state_info, checkpoint_info)?
806+
ScanLogReplayProcessor::new(engine, state_info, checkpoint_info, skip_stats)?
769807
.process_actions_iter(action_iter),
770808
)
771809
}
@@ -914,6 +952,7 @@ mod tests {
914952
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
915953
state_info,
916954
test_checkpoint_info(),
955+
false,
917956
)
918957
.unwrap();
919958
for res in iter {
@@ -941,6 +980,7 @@ mod tests {
941980
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
942981
Arc::new(state_info),
943982
test_checkpoint_info(),
983+
false,
944984
)
945985
.unwrap();
946986

@@ -1025,6 +1065,7 @@ mod tests {
10251065
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
10261066
Arc::new(state_info),
10271067
test_checkpoint_info(),
1068+
false,
10281069
)
10291070
.unwrap();
10301071

@@ -1069,6 +1110,7 @@ mod tests {
10691110
&engine,
10701111
Arc::new(get_simple_state_info(schema.clone(), vec![]).unwrap()),
10711112
checkpoint_info.clone(),
1113+
false,
10721114
)
10731115
.unwrap();
10741116

@@ -1136,9 +1178,13 @@ mod tests {
11361178
_ => panic!("Expected predicate"),
11371179
};
11381180
let checkpoint_info = test_checkpoint_info();
1139-
let processor =
1140-
ScanLogReplayProcessor::new(&engine, state_info.clone(), checkpoint_info.clone())
1141-
.unwrap();
1181+
let processor = ScanLogReplayProcessor::new(
1182+
&engine,
1183+
state_info.clone(),
1184+
checkpoint_info.clone(),
1185+
false,
1186+
)
1187+
.unwrap();
11421188
let deserialized = ScanLogReplayProcessor::from_serializable_state(
11431189
&engine,
11441190
processor.into_serializable_state(checkpoint_info).unwrap(),
@@ -1189,9 +1235,13 @@ mod tests {
11891235
let original_transform = state_info.transform_spec.clone();
11901236
assert!(original_transform.is_some());
11911237
let checkpoint_info = test_checkpoint_info();
1192-
let processor =
1193-
ScanLogReplayProcessor::new(&engine, state_info.clone(), checkpoint_info.clone())
1194-
.unwrap();
1238+
let processor = ScanLogReplayProcessor::new(
1239+
&engine,
1240+
state_info.clone(),
1241+
checkpoint_info.clone(),
1242+
false,
1243+
)
1244+
.unwrap();
11951245
let deserialized = ScanLogReplayProcessor::from_serializable_state(
11961246
&engine,
11971247
processor.into_serializable_state(checkpoint_info).unwrap(),
@@ -1225,7 +1275,8 @@ mod tests {
12251275
});
12261276
let checkpoint_info = test_checkpoint_info();
12271277
let processor =
1228-
ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone()).unwrap();
1278+
ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone(), false)
1279+
.unwrap();
12291280
let deserialized = ScanLogReplayProcessor::from_serializable_state(
12301281
&engine,
12311282
processor.into_serializable_state(checkpoint_info).unwrap(),
@@ -1255,7 +1306,8 @@ mod tests {
12551306
logical_stats_schema: None,
12561307
});
12571308
let processor =
1258-
ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone()).unwrap();
1309+
ScanLogReplayProcessor::new(&engine, state_info, checkpoint_info.clone(), false)
1310+
.unwrap();
12591311
let serialized = processor.into_serializable_state(checkpoint_info).unwrap();
12601312
assert!(serialized.predicate.is_none());
12611313
let deserialized =
@@ -1296,6 +1348,7 @@ mod tests {
12961348
column_mapping_mode: ColumnMappingMode::None,
12971349
physical_stats_schema: None,
12981350
logical_stats_schema: None,
1351+
skip_stats: false,
12991352
};
13001353
let predicate = Arc::new(crate::expressions::Predicate::column(["id"]));
13011354
let invalid_blob = serde_json::to_vec(&invalid_internal_state).unwrap();
@@ -1327,6 +1380,7 @@ mod tests {
13271380
column_mapping_mode: ColumnMappingMode::None,
13281381
physical_stats_schema: None,
13291382
logical_stats_schema: None,
1383+
skip_stats: false,
13301384
};
13311385
let blob = serde_json::to_string(&invalid_internal_state).unwrap();
13321386
let mut obj: serde_json::Value = serde_json::from_str(&blob).unwrap();
@@ -1374,4 +1428,37 @@ mod tests {
13741428
let result = serde_json::to_string(&state);
13751429
assert_result_error_with_message(result, "Cannot serialize an Opaque Predicate");
13761430
}
1431+
1432+
#[test]
1433+
fn test_scan_action_iter_with_skip_stats() {
1434+
let batch = vec![add_batch_simple(get_commit_schema().clone())];
1435+
let schema: SchemaRef = Arc::new(StructType::new_unchecked([
1436+
StructField::new("value", DataType::INTEGER, true),
1437+
StructField::new("date", DataType::DATE, true),
1438+
]));
1439+
let state_info = get_simple_state_info(schema, vec!["date".to_string()]).unwrap();
1440+
1441+
let iter = scan_action_iter(
1442+
&SyncEngine::new(),
1443+
batch
1444+
.into_iter()
1445+
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
1446+
Arc::new(state_info),
1447+
test_checkpoint_info(),
1448+
true,
1449+
)
1450+
.unwrap();
1451+
1452+
let mut found_add = false;
1453+
for res in iter {
1454+
let scan_metadata = res.unwrap();
1455+
scan_metadata
1456+
.visit_scan_files((), |_: &mut (), scan_file: ScanFile| {
1457+
assert!(scan_file.stats.is_none());
1458+
})
1459+
.unwrap();
1460+
found_add = true;
1461+
}
1462+
assert!(found_add);
1463+
}
13771464
}

0 commit comments

Comments
 (0)