Skip to content

Commit bcaf921

Browse files
authored
feat: read parsed-stats from checkpoint (delta-io#1638)
## 🥞 Stacked PR Use this [link](https://github.com/delta-io/delta-kernel-rs/pull/1638/files) to review incremental changes. - [**stack/has_compatible_parsed_stats**](delta-io#1638) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1638/files)] - [stack/output-stat-columns-all](delta-io#1720) [[Files changed](https://github.com/delta-io/delta-kernel-rs/pull/1720/files/6f727fd4d2fe1bd7733f98063c83c8624b74370a..71054a500a40f59762098d1dea7073bf161d3570)] --------- ## What changes are proposed in this pull request? This PR adds infrastructure to detect when checkpoints have compatible pre-parsed statistics (stats_parsed) that can be used for data skipping without JSON parsing. Added CheckpointReadInfo struct containing: - has_stats_parsed: bool - whether checkpoint has compatible pre-parsed stats - checkpoint_read_schema: SchemaRef - schema used to read checkpoint files ## How was this change tested? New and existing unit tests
1 parent 4eb3bef commit bcaf921

9 files changed

Lines changed: 467 additions & 90 deletions

File tree

kernel/src/log_segment.rs

Lines changed: 193 additions & 50 deletions
Large diffs are not rendered by default.

kernel/src/log_segment/tests.rs

Lines changed: 207 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,8 +1170,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schem
11701170
None,
11711171
None,
11721172
)?;
1173-
let mut iter =
1174-
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
1173+
let checkpoint_result = log_segment.create_checkpoint_stream(
1174+
&engine,
1175+
v2_checkpoint_read_schema.clone(),
1176+
None,
1177+
None,
1178+
)?;
1179+
let mut iter = checkpoint_result.actions;
11751180

11761181
// Assert that the first batch returned is from reading checkpoint file 1
11771182
let ActionsBatch {
@@ -1235,8 +1240,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_
12351240
None,
12361241
None,
12371242
)?;
1238-
let mut iter =
1239-
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
1243+
let checkpoint_result = log_segment.create_checkpoint_stream(
1244+
&engine,
1245+
v2_checkpoint_read_schema.clone(),
1246+
None,
1247+
None,
1248+
)?;
1249+
let mut iter = checkpoint_result.actions;
12401250

12411251
// Assert the correctness of batches returned
12421252
for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() {
@@ -1295,8 +1305,13 @@ async fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_si
12951305
None,
12961306
None,
12971307
)?;
1298-
let mut iter =
1299-
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
1308+
let checkpoint_result = log_segment.create_checkpoint_stream(
1309+
&engine,
1310+
v2_checkpoint_read_schema.clone(),
1311+
None,
1312+
None,
1313+
)?;
1314+
let mut iter = checkpoint_result.actions;
13001315

13011316
// Assert that the first batch returned is from reading checkpoint file 1
13021317
let ActionsBatch {
@@ -1344,8 +1359,9 @@ async fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidec
13441359
None,
13451360
None,
13461361
)?;
1347-
let mut iter =
1348-
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None)?;
1362+
let checkpoint_result =
1363+
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None, None)?;
1364+
let mut iter = checkpoint_result.actions;
13491365

13501366
// Assert that the first batch returned is from reading checkpoint file 1
13511367
let ActionsBatch {
@@ -1431,8 +1447,13 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar
14311447
None,
14321448
None,
14331449
)?;
1434-
let mut iter =
1435-
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
1450+
let checkpoint_result = log_segment.create_checkpoint_stream(
1451+
&engine,
1452+
v2_checkpoint_read_schema.clone(),
1453+
None,
1454+
None,
1455+
)?;
1456+
let mut iter = checkpoint_result.actions;
14361457

14371458
// Assert that the first batch returned is from reading checkpoint file 1
14381459
let ActionsBatch {
@@ -2653,6 +2674,18 @@ fn create_checkpoint_schema_with_stats_parsed(min_values_fields: Vec<StructField
26532674
StructType::new_unchecked([StructField::nullable("add", add_schema)])
26542675
}
26552676

2677+
// Helper to create a stats_schema with proper structure (numRecords, minValues, maxValues)
2678+
fn create_stats_schema(column_fields: Vec<StructField>) -> StructType {
2679+
StructType::new_unchecked([
2680+
StructField::nullable("numRecords", DataType::LONG),
2681+
StructField::nullable(
2682+
"minValues",
2683+
StructType::new_unchecked(column_fields.clone()),
2684+
),
2685+
StructField::nullable("maxValues", StructType::new_unchecked(column_fields)),
2686+
])
2687+
}
2688+
26562689
// Helper to create a checkpoint schema without stats_parsed
26572690
fn create_checkpoint_schema_without_stats_parsed() -> StructType {
26582691
use crate::schema::StructType;
@@ -2675,15 +2708,15 @@ fn test_schema_has_compatible_stats_parsed_basic() {
26752708
)]);
26762709

26772710
// Exact type match should work
2678-
let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
2711+
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
26792712
assert!(LogSegment::schema_has_compatible_stats_parsed(
26802713
&checkpoint_schema,
26812714
&stats_schema
26822715
));
26832716

26842717
// Type widening (int -> long) should work
26852718
let stats_schema_widened =
2686-
StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]);
2719+
create_stats_schema(vec![StructField::nullable("id", DataType::LONG)]);
26872720
assert!(LogSegment::schema_has_compatible_stats_parsed(
26882721
&checkpoint_schema,
26892722
&stats_schema_widened
@@ -2703,17 +2736,18 @@ fn test_schema_has_compatible_stats_parsed_basic() {
27032736

27042737
#[test]
27052738
fn test_schema_has_compatible_stats_parsed_missing_column_ok() {
2706-
// Checkpoint has "id" column, stats schema needs "other" column (missing in checkpoint is OK)
2739+
// Checkpoint has "id" column, stats schema needs "other" column
2740+
// Missing column is acceptable - it will return null when accessed
27072741
let checkpoint_schema =
27082742
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
27092743
"id",
27102744
DataType::INTEGER,
27112745
)]);
27122746

2713-
let stats_schema =
2714-
StructType::new_unchecked([StructField::nullable("other", DataType::INTEGER)]);
2747+
let stats_schema = create_stats_schema(vec![StructField::nullable("other", DataType::INTEGER)]);
27152748

2716-
// Missing column in checkpoint is OK - it will just return NULL
2749+
// Missing column in checkpoint is OK - it will return null when accessed,
2750+
// which is acceptable for data skipping (just means we can't skip based on that column)
27172751
assert!(LogSegment::schema_has_compatible_stats_parsed(
27182752
&checkpoint_schema,
27192753
&stats_schema
@@ -2728,7 +2762,7 @@ fn test_schema_has_compatible_stats_parsed_extra_column_ok() {
27282762
StructField::nullable("extra", DataType::STRING),
27292763
]);
27302764

2731-
let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
2765+
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
27322766

27332767
assert!(LogSegment::schema_has_compatible_stats_parsed(
27342768
&checkpoint_schema,
@@ -2741,7 +2775,7 @@ fn test_schema_has_compatible_stats_parsed_no_stats_parsed() {
27412775
// Checkpoint schema without stats_parsed field
27422776
let checkpoint_schema = create_checkpoint_schema_without_stats_parsed();
27432777

2744-
let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
2778+
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
27452779

27462780
assert!(!LogSegment::schema_has_compatible_stats_parsed(
27472781
&checkpoint_schema,
@@ -2758,7 +2792,7 @@ fn test_schema_has_compatible_stats_parsed_empty_stats_schema() {
27582792
DataType::INTEGER,
27592793
)]);
27602794

2761-
let stats_schema = StructType::new_unchecked([]);
2795+
let stats_schema = create_stats_schema(vec![]);
27622796

27632797
// If no columns are needed for data skipping, any stats_parsed is compatible
27642798
assert!(LogSegment::schema_has_compatible_stats_parsed(
@@ -2776,7 +2810,7 @@ fn test_schema_has_compatible_stats_parsed_multiple_columns() {
27762810
]);
27772811

27782812
// First column matches, second is incompatible
2779-
let stats_schema = StructType::new_unchecked([
2813+
let stats_schema = create_stats_schema(vec![
27802814
StructField::nullable("good_col", DataType::LONG),
27812815
StructField::nullable("bad_col", DataType::INTEGER),
27822816
]);
@@ -2802,7 +2836,7 @@ fn test_schema_has_compatible_stats_parsed_missing_min_max_values() {
28022836

28032837
let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);
28042838

2805-
let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
2839+
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
28062840

28072841
// Should return true - missing minValues/maxValues is handled gracefully with continue
28082842
assert!(LogSegment::schema_has_compatible_stats_parsed(
@@ -2828,7 +2862,7 @@ fn test_schema_has_compatible_stats_parsed_min_values_not_struct() {
28282862

28292863
let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);
28302864

2831-
let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
2865+
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
28322866

28332867
// Should return false - minValues/maxValues must be Struct types
28342868
assert!(!LogSegment::schema_has_compatible_stats_parsed(
@@ -2837,6 +2871,157 @@ fn test_schema_has_compatible_stats_parsed_min_values_not_struct() {
28372871
));
28382872
}
28392873

2874+
#[test]
2875+
fn test_schema_has_compatible_stats_parsed_nested_struct() {
2876+
// Create a nested struct: user: { name: string, age: integer }
2877+
let user_struct = StructType::new_unchecked([
2878+
StructField::nullable("name", DataType::STRING),
2879+
StructField::nullable("age", DataType::INTEGER),
2880+
]);
2881+
2882+
let checkpoint_schema =
2883+
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
2884+
"user",
2885+
user_struct.clone(),
2886+
)]);
2887+
2888+
// Exact match should work
2889+
let stats_schema = create_stats_schema(vec![StructField::nullable("user", user_struct)]);
2890+
assert!(LogSegment::schema_has_compatible_stats_parsed(
2891+
&checkpoint_schema,
2892+
&stats_schema
2893+
));
2894+
}
2895+
2896+
#[test]
2897+
fn test_schema_has_compatible_stats_parsed_nested_struct_with_extra_fields() {
2898+
// Checkpoint has extra nested fields not needed by stats schema
2899+
let checkpoint_user = StructType::new_unchecked([
2900+
StructField::nullable("name", DataType::STRING),
2901+
StructField::nullable("age", DataType::INTEGER),
2902+
StructField::nullable("extra", DataType::STRING), // extra field
2903+
]);
2904+
2905+
let checkpoint_schema =
2906+
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
2907+
"user",
2908+
checkpoint_user,
2909+
)]);
2910+
2911+
// Stats schema only needs a subset of fields
2912+
let stats_user = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
2913+
2914+
let stats_schema = create_stats_schema(vec![StructField::nullable("user", stats_user)]);
2915+
2916+
// Extra fields in checkpoint nested struct should be OK
2917+
assert!(LogSegment::schema_has_compatible_stats_parsed(
2918+
&checkpoint_schema,
2919+
&stats_schema
2920+
));
2921+
}
2922+
2923+
#[test]
2924+
fn test_schema_has_compatible_stats_parsed_nested_struct_missing_field_ok() {
2925+
// Checkpoint is missing a nested field that stats schema needs
2926+
let checkpoint_user =
2927+
StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
2928+
2929+
let checkpoint_schema =
2930+
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
2931+
"user",
2932+
checkpoint_user,
2933+
)]);
2934+
2935+
// Stats schema needs more fields than checkpoint has
2936+
let stats_user = StructType::new_unchecked([
2937+
StructField::nullable("name", DataType::STRING),
2938+
StructField::nullable("age", DataType::INTEGER), // missing in checkpoint
2939+
]);
2940+
2941+
let stats_schema = create_stats_schema(vec![StructField::nullable("user", stats_user)]);
2942+
2943+
// Missing nested field is OK - it will return null when accessed
2944+
assert!(LogSegment::schema_has_compatible_stats_parsed(
2945+
&checkpoint_schema,
2946+
&stats_schema
2947+
));
2948+
}
2949+
2950+
#[test]
2951+
fn test_schema_has_compatible_stats_parsed_nested_struct_type_mismatch() {
2952+
// Checkpoint has incompatible type in nested field
2953+
let checkpoint_user = StructType::new_unchecked([
2954+
StructField::nullable("name", DataType::INTEGER), // wrong type!
2955+
]);
2956+
2957+
let checkpoint_schema =
2958+
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
2959+
"user",
2960+
checkpoint_user,
2961+
)]);
2962+
2963+
let stats_user = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
2964+
2965+
let stats_schema = create_stats_schema(vec![StructField::nullable("user", stats_user)]);
2966+
2967+
// Type mismatch in nested field should fail
2968+
assert!(!LogSegment::schema_has_compatible_stats_parsed(
2969+
&checkpoint_schema,
2970+
&stats_schema
2971+
));
2972+
}
2973+
2974+
#[test]
2975+
fn test_schema_has_compatible_stats_parsed_deeply_nested() {
2976+
// Deeply nested: company: { department: { team: { name: string } } }
2977+
let team = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
2978+
let department = StructType::new_unchecked([StructField::nullable("team", team.clone())]);
2979+
let company = StructType::new_unchecked([StructField::nullable("department", department)]);
2980+
2981+
let checkpoint_schema =
2982+
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
2983+
"company",
2984+
company.clone(),
2985+
)]);
2986+
2987+
let stats_schema = create_stats_schema(vec![StructField::nullable("company", company)]);
2988+
2989+
assert!(LogSegment::schema_has_compatible_stats_parsed(
2990+
&checkpoint_schema,
2991+
&stats_schema
2992+
));
2993+
}
2994+
2995+
#[test]
2996+
fn test_schema_has_compatible_stats_parsed_deeply_nested_type_mismatch() {
2997+
// Type mismatch deep in nested structure
2998+
let checkpoint_team =
2999+
StructType::new_unchecked([StructField::nullable("name", DataType::INTEGER)]); // wrong!
3000+
let checkpoint_dept =
3001+
StructType::new_unchecked([StructField::nullable("team", checkpoint_team)]);
3002+
let checkpoint_company =
3003+
StructType::new_unchecked([StructField::nullable("department", checkpoint_dept)]);
3004+
3005+
let checkpoint_schema =
3006+
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
3007+
"company",
3008+
checkpoint_company,
3009+
)]);
3010+
3011+
let stats_team = StructType::new_unchecked([StructField::nullable("name", DataType::STRING)]);
3012+
let stats_dept = StructType::new_unchecked([StructField::nullable("team", stats_team)]);
3013+
let stats_company =
3014+
StructType::new_unchecked([StructField::nullable("department", stats_dept)]);
3015+
3016+
let stats_schema = create_stats_schema(vec![StructField::nullable("company", stats_company)]);
3017+
3018+
// Type mismatch deep in hierarchy should be detected
3019+
assert!(!LogSegment::schema_has_compatible_stats_parsed(
3020+
&checkpoint_schema,
3021+
&stats_schema
3022+
));
3023+
}
3024+
28403025
// ============================================================================
28413026
// new_with_commit tests
28423027
// ============================================================================

kernel/src/scan/data_skipping/stats_schema/mod.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
mod column_filter;
44

55
use std::borrow::Cow;
6+
use std::sync::Arc;
67

78
use crate::{
89
schema::{
9-
ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaTransform,
10-
StructField, StructType,
10+
ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaRef,
11+
SchemaTransform, StructField, StructType,
1112
},
1213
table_properties::TableProperties,
1314
DeltaResult,
@@ -151,6 +152,28 @@ pub(crate) fn stats_column_names(
151152
columns
152153
}
153154

155+
/// Creates a stats schema from a referenced schema (columns from predicate).
156+
/// Returns schema: `{ numRecords, nullCount, minValues, maxValues }`
157+
///
158+
/// This is used to build the schema for parsing JSON stats and for reading stats_parsed
159+
/// from checkpoints.
160+
pub(crate) fn build_stats_schema(referenced_schema: &StructType) -> Option<SchemaRef> {
161+
let stats_schema = NullableStatsTransform
162+
.transform_struct(referenced_schema)?
163+
.into_owned();
164+
165+
let nullcount_schema = NullCountStatsTransform
166+
.transform_struct(&stats_schema)?
167+
.into_owned();
168+
169+
Some(Arc::new(StructType::new_unchecked([
170+
StructField::nullable("numRecords", DataType::LONG),
171+
StructField::nullable("nullCount", nullcount_schema),
172+
StructField::nullable("minValues", stats_schema.clone()),
173+
StructField::nullable("maxValues", stats_schema),
174+
])))
175+
}
176+
154177
/// Transforms a schema to make all fields nullable.
155178
/// Used for stats schemas where stats may not be available for all columns.
156179
pub(crate) struct NullableStatsTransform;

0 commit comments

Comments
 (0)