@@ -185,7 +185,14 @@ impl<'a> SnapshotProducer<'a> {
185185
186186 for delete_file in & self . added_delete_files {
187187 match delete_file. content_type ( ) {
188- DataContentType :: PositionDeletes => { }
188+ DataContentType :: PositionDeletes => {
189+ if delete_file. equality_ids ( ) . is_some ( ) {
190+ return Err ( Error :: new (
191+ ErrorKind :: DataInvalid ,
192+ "Position delete files must not have equality_ids set" ,
193+ ) ) ;
194+ }
195+ }
189196 DataContentType :: EqualityDeletes => {
190197 let ids = delete_file. equality_ids ( ) . ok_or_else ( || {
191198 Error :: new (
@@ -208,6 +215,9 @@ impl<'a> SnapshotProducer<'a> {
208215 }
209216 }
210217
218+ // TODO: This validation is too strict for partition evolution scenarios where delete
219+ // files may reference older partition specs. Once manifest-per-spec is implemented,
220+ // relax this to check that the spec_id exists rather than matching the default.
211221 if self . table . metadata ( ) . default_partition_spec_id ( ) != delete_file. partition_spec_id {
212222 return Err ( Error :: new (
213223 ErrorKind :: DataInvalid ,
@@ -261,6 +271,21 @@ impl<'a> SnapshotProducer<'a> {
261271 let new_data_files = seen_data_files;
262272 let new_delete_files = seen_delete_files;
263273
274+ // Check for cross-type duplicates: same path cannot appear in both data and delete files
275+ let cross_type_duplicates: Vec < _ > = new_data_files
276+ . intersection ( & new_delete_files)
277+ . map ( |s| s. to_string ( ) )
278+ . collect ( ) ;
279+ if !cross_type_duplicates. is_empty ( ) {
280+ return Err ( Error :: new (
281+ ErrorKind :: DataInvalid ,
282+ format ! (
283+ "Cannot add the same file path as both a data file and a delete file: {}" ,
284+ cross_type_duplicates. join( ", " )
285+ ) ,
286+ ) ) ;
287+ }
288+
264289 let mut duplicate_data_files = Vec :: new ( ) ;
265290 let mut duplicate_delete_files = Vec :: new ( ) ;
266291
@@ -436,17 +461,11 @@ impl<'a> SnapshotProducer<'a> {
436461 ) ) ;
437462 }
438463
439- let snapshot_id = self . snapshot_id ;
440- let format_version = self . table . metadata ( ) . format_version ( ) ;
441464 let manifest_entries = added_delete_files. into_iter ( ) . map ( |delete_file| {
442- let builder = ManifestEntry :: builder ( )
465+ ManifestEntry :: builder ( )
443466 . status ( crate :: spec:: ManifestStatus :: Added )
444- . data_file ( delete_file) ;
445- if format_version == FormatVersion :: V1 {
446- builder. snapshot_id ( snapshot_id) . build ( )
447- } else {
448- builder. build ( )
449- }
467+ . data_file ( delete_file)
468+ . build ( )
450469 } ) ;
451470 let mut writer = self . new_manifest_writer ( ManifestContentType :: Deletes ) ?;
452471 for entry in manifest_entries {
@@ -677,7 +696,10 @@ mod tests {
677696 . unwrap ( )
678697 }
679698
680- fn make_equality_delete_file_with_ids ( spec_id : i32 , equality_ids : Option < Vec < i32 > > ) -> DataFile {
699+ fn make_equality_delete_file_with_ids (
700+ spec_id : i32 ,
701+ equality_ids : Option < Vec < i32 > > ,
702+ ) -> DataFile {
681703 DataFileBuilder :: default ( )
682704 . content ( DataContentType :: EqualityDeletes )
683705 . file_path ( "test/eq-delete-1.parquet" . to_string ( ) )
@@ -710,21 +732,18 @@ mod tests {
710732 let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
711733 let delete_file = make_position_delete_file ( spec_id) ;
712734
713- let producer = SnapshotProducer :: new (
714- & table,
715- Uuid :: now_v7 ( ) ,
716- None ,
717- HashMap :: new ( ) ,
718- vec ! [ ] ,
719- vec ! [ delete_file] ,
720- ) ;
735+ let producer =
736+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [
737+ delete_file,
738+ ] ) ;
721739
722740 let result = producer. validate_added_delete_files ( ) ;
723741 assert ! ( result. is_err( ) ) ;
724742 let err = result. unwrap_err ( ) ;
725- assert ! ( err
726- . message( )
727- . contains( "Delete files are not supported in format version 1" ) ) ;
743+ assert ! (
744+ err. message( )
745+ . contains( "Delete files are not supported in format version 1" )
746+ ) ;
728747 }
729748
730749 #[ test]
@@ -733,21 +752,18 @@ mod tests {
733752 let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
734753 let delete_file = make_equality_delete_file_with_ids ( spec_id, None ) ;
735754
736- let producer = SnapshotProducer :: new (
737- & table,
738- Uuid :: now_v7 ( ) ,
739- None ,
740- HashMap :: new ( ) ,
741- vec ! [ ] ,
742- vec ! [ delete_file] ,
743- ) ;
755+ let producer =
756+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [
757+ delete_file,
758+ ] ) ;
744759
745760 let result = producer. validate_added_delete_files ( ) ;
746761 assert ! ( result. is_err( ) ) ;
747762 let err = result. unwrap_err ( ) ;
748- assert ! ( err
749- . message( )
750- . contains( "Equality delete files must have equality_ids set" ) ) ;
763+ assert ! (
764+ err. message( )
765+ . contains( "Equality delete files must have equality_ids set" )
766+ ) ;
751767 }
752768
753769 #[ test]
@@ -756,21 +772,18 @@ mod tests {
756772 let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
757773 let delete_file = make_equality_delete_file_with_ids ( spec_id, Some ( vec ! [ ] ) ) ;
758774
759- let producer = SnapshotProducer :: new (
760- & table,
761- Uuid :: now_v7 ( ) ,
762- None ,
763- HashMap :: new ( ) ,
764- vec ! [ ] ,
765- vec ! [ delete_file] ,
766- ) ;
775+ let producer =
776+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [
777+ delete_file,
778+ ] ) ;
767779
768780 let result = producer. validate_added_delete_files ( ) ;
769781 assert ! ( result. is_err( ) ) ;
770782 let err = result. unwrap_err ( ) ;
771- assert ! ( err
772- . message( )
773- . contains( "Equality delete files must have equality_ids set" ) ) ;
783+ assert ! (
784+ err. message( )
785+ . contains( "Equality delete files must have equality_ids set" )
786+ ) ;
774787 }
775788
776789 #[ test]
@@ -779,21 +792,18 @@ mod tests {
779792 let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
780793 let data_file = make_data_file_as_delete ( spec_id) ;
781794
782- let producer = SnapshotProducer :: new (
783- & table,
784- Uuid :: now_v7 ( ) ,
785- None ,
786- HashMap :: new ( ) ,
787- vec ! [ ] ,
788- vec ! [ data_file] ,
789- ) ;
795+ let producer =
796+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [
797+ data_file,
798+ ] ) ;
790799
791800 let result = producer. validate_added_delete_files ( ) ;
792801 assert ! ( result. is_err( ) ) ;
793802 let err = result. unwrap_err ( ) ;
794- assert ! ( err
795- . message( )
796- . contains( "Data content type is not allowed for delete files" ) ) ;
803+ assert ! (
804+ err. message( )
805+ . contains( "Data content type is not allowed for delete files" )
806+ ) ;
797807 }
798808
799809 #[ test]
@@ -802,14 +812,10 @@ mod tests {
802812 let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
803813 let delete_file = make_position_delete_file ( spec_id) ;
804814
805- let producer = SnapshotProducer :: new (
806- & table,
807- Uuid :: now_v7 ( ) ,
808- None ,
809- HashMap :: new ( ) ,
810- vec ! [ ] ,
811- vec ! [ delete_file] ,
812- ) ;
815+ let producer =
816+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [
817+ delete_file,
818+ ] ) ;
813819
814820 let result = producer. validate_added_delete_files ( ) ;
815821 assert ! ( result. is_ok( ) ) ;
@@ -821,37 +827,110 @@ mod tests {
821827 let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
822828 let delete_file = make_equality_delete_file_with_ids ( spec_id, Some ( vec ! [ 1 ] ) ) ;
823829
824- let producer = SnapshotProducer :: new (
825- & table,
826- Uuid :: now_v7 ( ) ,
827- None ,
828- HashMap :: new ( ) ,
829- vec ! [ ] ,
830- vec ! [ delete_file] ,
831- ) ;
830+ let producer =
831+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [
832+ delete_file,
833+ ] ) ;
832834
833835 let result = producer. validate_added_delete_files ( ) ;
834836 assert ! ( result. is_ok( ) ) ;
835837 }
836838
837839 #[ test]
838- fn test_empty_delete_files_returns_error ( ) {
840+ fn test_write_delete_manifest_precondition_empty_files ( ) {
839841 let table = make_v2_minimal_table ( ) ;
840842
841- let mut producer = SnapshotProducer :: new (
843+ let mut producer =
844+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [ ] ) ;
845+
846+ let result = futures:: executor:: block_on ( producer. write_delete_manifest ( ) ) ;
847+ assert ! ( result. is_err( ) ) ;
848+ let err = result. unwrap_err ( ) ;
849+ assert ! (
850+ err. message( )
851+ . contains( "No added delete files found when write a delete manifest file" )
852+ ) ;
853+ }
854+
855+ fn make_data_file_with_path ( spec_id : i32 , path : & str ) -> DataFile {
856+ DataFileBuilder :: default ( )
857+ . content ( DataContentType :: Data )
858+ . file_path ( path. to_string ( ) )
859+ . file_format ( DataFileFormat :: Parquet )
860+ . file_size_in_bytes ( 100 )
861+ . record_count ( 10 )
862+ . partition_spec_id ( spec_id)
863+ . partition ( Struct :: from_iter ( [ Some ( Literal :: long ( 300 ) ) ] ) )
864+ . build ( )
865+ . unwrap ( )
866+ }
867+
868+ fn make_position_delete_file_with_path ( spec_id : i32 , path : & str ) -> DataFile {
869+ DataFileBuilder :: default ( )
870+ . content ( DataContentType :: PositionDeletes )
871+ . file_path ( path. to_string ( ) )
872+ . file_format ( DataFileFormat :: Parquet )
873+ . file_size_in_bytes ( 100 )
874+ . record_count ( 10 )
875+ . partition_spec_id ( spec_id)
876+ . partition ( Struct :: from_iter ( [ Some ( Literal :: long ( 300 ) ) ] ) )
877+ . build ( )
878+ . unwrap ( )
879+ }
880+
881+ #[ test]
882+ fn test_validate_cross_type_duplicate_files_rejected ( ) {
883+ let table = make_v2_minimal_table ( ) ;
884+ let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
885+ let shared_path = "test/shared-file.parquet" ;
886+ let data_file = make_data_file_with_path ( spec_id, shared_path) ;
887+ let delete_file = make_position_delete_file_with_path ( spec_id, shared_path) ;
888+
889+ let producer = SnapshotProducer :: new (
842890 & table,
843891 Uuid :: now_v7 ( ) ,
844892 None ,
845893 HashMap :: new ( ) ,
846- vec ! [ ] ,
847- vec ! [ ] ,
894+ vec ! [ data_file ] ,
895+ vec ! [ delete_file ] ,
848896 ) ;
849897
850- let result = futures:: executor:: block_on ( producer. write_delete_manifest ( ) ) ;
898+ let result = futures:: executor:: block_on ( producer. validate_duplicate_files ( ) ) ;
899+ assert ! ( result. is_err( ) ) ;
900+ let err = result. unwrap_err ( ) ;
901+ assert ! (
902+ err. message( )
903+ . contains( "Cannot add the same file path as both a data file and a delete file" )
904+ ) ;
905+ }
906+
907+ #[ test]
908+ fn test_validate_position_delete_rejects_equality_ids ( ) {
909+ let table = make_v2_minimal_table ( ) ;
910+ let spec_id = table. metadata ( ) . default_partition_spec_id ( ) ;
911+ let delete_file = DataFileBuilder :: default ( )
912+ . content ( DataContentType :: PositionDeletes )
913+ . file_path ( "test/pos-delete-with-eq-ids.parquet" . to_string ( ) )
914+ . file_format ( DataFileFormat :: Parquet )
915+ . file_size_in_bytes ( 100 )
916+ . record_count ( 10 )
917+ . partition_spec_id ( spec_id)
918+ . partition ( Struct :: from_iter ( [ Some ( Literal :: long ( 300 ) ) ] ) )
919+ . equality_ids ( Some ( vec ! [ 1 , 2 ] ) )
920+ . build ( )
921+ . unwrap ( ) ;
922+
923+ let producer =
924+ SnapshotProducer :: new ( & table, Uuid :: now_v7 ( ) , None , HashMap :: new ( ) , vec ! [ ] , vec ! [
925+ delete_file,
926+ ] ) ;
927+
928+ let result = producer. validate_added_delete_files ( ) ;
851929 assert ! ( result. is_err( ) ) ;
852930 let err = result. unwrap_err ( ) ;
853- assert ! ( err
854- . message( )
855- . contains( "No added delete files found when write a delete manifest file" ) ) ;
931+ assert ! (
932+ err. message( )
933+ . contains( "Position delete files must not have equality_ids set" )
934+ ) ;
856935 }
857936}
0 commit comments