@@ -230,38 +230,42 @@ impl ParquetMergePolicy for ConstWriteAmplificationParquetMergePolicy {
230230 }
231231
232232 // Separate mature splits — don't touch them.
233+ let mut group_by_level: HashMap < u32 , Vec < ParquetSplitMetadata > > = HashMap :: new ( ) ;
233234 let mut mature_splits = Vec :: new ( ) ;
234- let mut young_splits = Vec :: new ( ) ;
235235 for split in splits. drain ( ..) {
236236 if self . is_split_mature ( & split) {
237237 mature_splits. push ( split) ;
238238 } else {
239- young_splits. push ( split) ;
239+ group_by_level
240+ . entry ( split. num_merge_ops )
241+ . or_default ( )
242+ . push ( split) ;
240243 }
241244 }
242245 splits. extend ( mature_splits) ;
243246
244- // Sort youngest/smallest first. If we limit the number of finalize
245- // merges, we focus on the young/small ones for maximum compaction.
246- sort_splits_newest_first ( & mut young_splits) ;
247-
248247 let min_merge_factor = FINALIZE_MIN_MERGE_FACTOR . min ( self . config . max_merge_factor ) ;
249248 let merge_factor_range = min_merge_factor..=self . config . max_merge_factor ;
250249
250+ // Within each level, sort youngest/smallest first. If we limit the
251+ // number of finalize merges, we focus on the young/small ones for
252+ // maximum compaction.
251253 let mut merge_operations = Vec :: new ( ) ;
252- while merge_operations. len ( ) < self . config . max_finalize_merge_operations {
253- if let Some ( op) =
254- self . single_merge_operation ( & mut young_splits, merge_factor_range. clone ( ) )
255- {
256- merge_operations. push ( op) ;
257- } else {
258- break ;
254+ for level_splits in group_by_level. values_mut ( ) {
255+ sort_splits_newest_first ( level_splits) ;
256+ while merge_operations. len ( ) < self . config . max_finalize_merge_operations {
257+ if let Some ( op) =
258+ self . single_merge_operation ( level_splits, merge_factor_range. clone ( ) )
259+ {
260+ merge_operations. push ( op) ;
261+ } else {
262+ break ;
263+ }
259264 }
265+ // Un-merged splits at this level go back.
266+ splits. append ( level_splits) ;
260267 }
261268
262- // Un-merged young splits go back.
263- splits. extend ( young_splits) ;
264-
265269 let num_splits_per_op: Vec < usize > =
266270 merge_operations. iter ( ) . map ( |op| op. splits . len ( ) ) . collect ( ) ;
267271 let bytes_per_op: Vec < u64 > = merge_operations
@@ -695,6 +699,34 @@ mod tests {
695699 assert ! ( splits. iter( ) . all( |s| s. num_merge_ops >= 3 ) ) ;
696700 }
697701
#[test]
fn test_finalize_respects_mc_level_invariant() {
    // Regression guard for MC-LEVEL: finalize_operations() used to ignore
    // num_merge_ops when picking merge candidates, so a single finalize
    // merge could combine splits from different levels. The merged output
    // would then be stamped with max(num_merge_ops) + 1, prematurely
    // maturing the lower-level data.
    let policy = test_policy(); // merge_factor=3, max_merge_ops=3

    // Input: two splits at level 0 and a single split at level 1.
    let mut splits = vec![
        make_split("l0_a", 1_000_000, 0, now()),
        make_split("l0_b", 1_000_000, 0, now()),
        make_split("l1_a", 1_000_000, 1, now()),
    ];

    let ops = policy.finalize_operations(&mut splits);

    // Every planned operation must draw its splits from exactly one level.
    for merge_op in ops.iter() {
        let levels = merge_op
            .splits
            .iter()
            .map(|split| split.num_merge_ops)
            .collect::<HashSet<u32>>();
        assert_eq!(
            levels.len(),
            1,
            "finalize produced a merge mixing levels: {:?}",
            levels
        );
    }
}
729+
698730 // ── Property Tests ──────────────────────────────────────────────
699731
700732 prop_compose ! {
@@ -823,6 +855,60 @@ mod tests {
823855 }
824856 }
825857
858+ proptest ! {
859+ #[ test]
860+ fn proptest_finalize_respects_mc_level(
861+ mut splits in prop:: collection:: vec( parquet_split_strategy( ) , 0 ..80 )
862+ ) {
863+ let policy = test_policy( ) ;
864+ let original_count = splits. len( ) ;
865+ let original_total_bytes: u64 = splits. iter( ) . map( |s| s. size_bytes) . sum( ) ;
866+
867+ let ops = policy. finalize_operations( & mut splits) ;
868+
869+ // MC-CONSERVE for finalize.
870+ let ops_bytes: u64 = ops. iter( )
871+ . flat_map( |op| op. splits. iter( ) )
872+ . map( |s| s. size_bytes)
873+ . sum( ) ;
874+ let remaining_bytes: u64 = splits. iter( ) . map( |s| s. size_bytes) . sum( ) ;
875+ prop_assert_eq!(
876+ ops_bytes + remaining_bytes,
877+ original_total_bytes,
878+ "finalize byte conservation violated"
879+ ) ;
880+
881+ let ops_count: usize = ops. iter( ) . map( |op| op. splits. len( ) ) . sum( ) ;
882+ prop_assert_eq!(
883+ ops_count + splits. len( ) ,
884+ original_count,
885+ "finalize split count conservation violated"
886+ ) ;
887+
888+ for op in & ops {
889+ prop_assert!(
890+ op. splits. len( ) >= 2 ,
891+ "finalize merge op must have >= 2 splits"
892+ ) ;
893+
894+ // MC-LEVEL: all splits in op have same num_merge_ops.
895+ let levels: HashSet <u32 > = op. splits. iter( ) . map( |s| s. num_merge_ops) . collect( ) ;
896+ prop_assert_eq!(
897+ levels. len( ) , 1 ,
898+ "finalize mixed levels: {:?}" , levels
899+ ) ;
900+
901+ // MC-WA: no mature splits.
902+ for split in & op. splits {
903+ prop_assert!(
904+ !policy. is_split_mature( split) ,
905+ "mature split in finalize merge"
906+ ) ;
907+ }
908+ }
909+ }
910+ }
911+
826912 // ── Simulation Test ─────────────────────────────────────────────
827913
828914 #[ test]
0 commit comments