@@ -460,8 +460,11 @@ impl MultiPartitionShuffleRepartitioner {
460460 let row = row_idx as usize ;
461461 let src_offset = row * byte_width;
462462 let is_valid = nulls. is_none_or ( |n| n. is_valid ( row) ) ;
463- self . partition_buffers [ p]
464- . append_fixed ( col_idx, & values[ src_offset..src_offset + byte_width] , is_valid) ;
463+ self . partition_buffers [ p] . append_fixed (
464+ col_idx,
465+ & values[ src_offset..src_offset + byte_width] ,
466+ is_valid,
467+ ) ;
465468 }
466469 }
467470 } else if is_variable {
@@ -480,8 +483,11 @@ impl MultiPartitionShuffleRepartitioner {
480483 let val_start = offsets_slice[ row] as usize ;
481484 let val_end = offsets_slice[ row + 1 ] as usize ;
482485 let is_valid = nulls. is_none_or ( |n| n. is_valid ( row) ) ;
483- self . partition_buffers [ p]
484- . append_variable ( col_idx, & values_slice[ val_start..val_end] , is_valid) ;
486+ self . partition_buffers [ p] . append_variable (
487+ col_idx,
488+ & values_slice[ val_start..val_end] ,
489+ is_valid,
490+ ) ;
485491 }
486492 }
487493 } else if is_large_variable {
@@ -500,8 +506,11 @@ impl MultiPartitionShuffleRepartitioner {
500506 let val_start = offsets_slice[ row] as usize ;
501507 let val_end = offsets_slice[ row + 1 ] as usize ;
502508 let is_valid = nulls. is_none_or ( |n| n. is_valid ( row) ) ;
503- self . partition_buffers [ p]
504- . append_large_variable ( col_idx, & values_slice[ val_start..val_end] , is_valid) ;
509+ self . partition_buffers [ p] . append_large_variable (
510+ col_idx,
511+ & values_slice[ val_start..val_end] ,
512+ is_valid,
513+ ) ;
505514 }
506515 }
507516 } else if is_boolean {
@@ -516,8 +525,11 @@ impl MultiPartitionShuffleRepartitioner {
516525 for & row_idx in row_indices {
517526 let row = row_idx as usize ;
518527 let is_valid = nulls. is_none_or ( |n| n. is_valid ( row) ) ;
519- self . partition_buffers [ p]
520- . append_bool ( col_idx, bool_array. value ( row) , is_valid) ;
528+ self . partition_buffers [ p] . append_bool (
529+ col_idx,
530+ bool_array. value ( row) ,
531+ is_valid,
532+ ) ;
521533 }
522534 }
523535 } else {
@@ -530,14 +542,15 @@ impl MultiPartitionShuffleRepartitioner {
530542 }
531543 let row_indices = & partition_row_indices[ start..end] ;
532544 for & row_idx in row_indices {
533- self . partition_buffers [ p]
534- . append_fallback_index ( col_idx, row_idx) ;
545+ self . partition_buffers [ p] . append_fallback_index ( col_idx, row_idx) ;
535546 }
536547 }
537548 }
538549 }
539550
540- self . metrics . scatter_time . add_duration ( scatter_start. elapsed ( ) ) ;
551+ self . metrics
552+ . scatter_time
553+ . add_duration ( scatter_start. elapsed ( ) ) ;
541554
542555 // Update row counts from partition_starts (O(num_partitions), not O(num_rows))
543556 for p in 0 ..num_partitions {
@@ -705,10 +718,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
705718 & self . metrics . encode_time ,
706719 & self . metrics . write_time ,
707720 ) ?;
708- buf_batch_writer. flush (
709- & self . metrics . encode_time ,
710- & self . metrics . write_time ,
711- ) ?;
721+ buf_batch_writer. flush ( & self . metrics . encode_time , & self . metrics . write_time ) ?;
712722 }
713723 }
714724
0 commit comments