@@ -951,145 +951,6 @@ impl Operation {
951951 }
952952}
953953
954- #[ cfg( test) ]
955- mod tests {
956- use super :: * ;
957- use futures:: executor:: block_on;
958- use iceberg_rust_spec:: spec:: schema:: SchemaBuilder ;
959- use iceberg_rust_spec:: spec:: table_metadata:: TableMetadataBuilder ;
960- use iceberg_rust_spec:: spec:: types:: { PrimitiveType , StructField , Type } ;
961- use object_store:: memory:: InMemory ;
962-
963- fn sample_metadata (
964- snapshot_defs : & [ ( i64 , i64 ) ] ,
965- current_snapshot : Option < i64 > ,
966- refs : & [ ( & str , i64 ) ] ,
967- ) -> TableMetadata {
968- let schema = SchemaBuilder :: default ( )
969- . with_schema_id ( 0 )
970- . with_struct_field ( StructField {
971- id : 1 ,
972- name : "id" . to_string ( ) ,
973- required : true ,
974- field_type : Type :: Primitive ( PrimitiveType :: Long ) ,
975- doc : None ,
976- } )
977- . build ( )
978- . unwrap ( ) ;
979-
980- let snapshots = snapshot_defs
981- . iter ( )
982- . enumerate ( )
983- . map ( |( idx, ( snapshot_id, timestamp) ) | {
984- let snapshot = SnapshotBuilder :: default ( )
985- . with_snapshot_id ( * snapshot_id)
986- . with_sequence_number ( ( idx + 1 ) as i64 )
987- . with_timestamp_ms ( * timestamp)
988- . with_manifest_list ( format ! ( "manifest-{snapshot_id}.avro" ) )
989- . with_summary ( Summary {
990- operation : SnapshotOperation :: Append ,
991- other : HashMap :: new ( ) ,
992- } )
993- . with_schema_id ( 0 )
994- . build ( )
995- . unwrap ( ) ;
996- ( * snapshot_id, snapshot)
997- } )
998- . collect :: < HashMap < _ , _ > > ( ) ;
999-
1000- let refs = refs
1001- . iter ( )
1002- . map ( |( name, snapshot_id) | {
1003- (
1004- ( * name) . to_string ( ) ,
1005- SnapshotReference {
1006- snapshot_id : * snapshot_id,
1007- retention : SnapshotRetention :: default ( ) ,
1008- } ,
1009- )
1010- } )
1011- . collect :: < HashMap < _ , _ > > ( ) ;
1012-
1013- TableMetadataBuilder :: default ( )
1014- . location ( "s3://tests/table" . to_owned ( ) )
1015- . current_schema_id ( 0 )
1016- . schemas ( HashMap :: from_iter ( vec ! [ ( 0 , schema) ] ) )
1017- . snapshots ( snapshots)
1018- . current_snapshot_id ( current_snapshot)
1019- . last_sequence_number ( snapshot_defs. len ( ) as i64 )
1020- . refs ( refs)
1021- . build ( )
1022- . unwrap ( )
1023- }
1024-
1025- fn execute_operation (
1026- metadata : & TableMetadata ,
1027- older_than : Option < i64 > ,
1028- retain_last : Option < usize > ,
1029- retain_refs : bool ,
1030- dry_run : bool ,
1031- ) -> Result < Vec < TableUpdate > , Error > {
1032- let op = Operation :: ExpireSnapshots {
1033- older_than,
1034- retain_last,
1035- _clean_orphan_files : false ,
1036- retain_ref_snapshots : retain_refs,
1037- dry_run,
1038- } ;
1039- let store = Arc :: new ( InMemory :: new ( ) ) ;
1040- block_on ( op. execute ( metadata, store) ) . map ( |( _, updates) | updates)
1041- }
1042-
1043- fn collect_snapshot_ids ( updates : & [ TableUpdate ] ) -> Vec < i64 > {
1044- updates
1045- . iter ( )
1046- . flat_map ( |update| match update {
1047- TableUpdate :: RemoveSnapshots { snapshot_ids } => snapshot_ids. clone ( ) ,
1048- _ => Vec :: new ( ) ,
1049- } )
1050- . collect ( )
1051- }
1052-
1053- #[ test]
1054- fn snapshot_expiration_requires_policy ( ) {
1055- let metadata = sample_metadata ( & [ ( 1 , 1_000 ) ] , Some ( 1 ) , & [ ] ) ;
1056- let result = execute_operation ( & metadata, None , None , true , false ) ;
1057- assert ! ( matches!( result, Err ( Error :: InvalidFormat ( _) ) ) ) ;
1058- }
1059-
1060- #[ test]
1061- fn snapshot_expiration_applies_time_and_count_filters ( ) {
1062- let metadata = sample_metadata (
1063- & [ ( 1 , 1_000 ) , ( 2 , 2_000 ) , ( 3 , 3_000 ) , ( 4 , 4_000 ) ] ,
1064- Some ( 4 ) ,
1065- & [ ] ,
1066- ) ;
1067- let updates = execute_operation ( & metadata, Some ( 2_500 ) , Some ( 2 ) , true , false ) . unwrap ( ) ;
1068- let mut expired = collect_snapshot_ids ( & updates) ;
1069- expired. sort ( ) ;
1070- assert_eq ! ( expired, vec![ 1 , 2 ] ) ;
1071- }
1072-
1073- #[ test]
1074- fn snapshot_expiration_preserves_current_and_refs ( ) {
1075- let metadata = sample_metadata (
1076- & [ ( 10 , 1_000 ) , ( 20 , 2_000 ) , ( 30 , 3_000 ) ] ,
1077- Some ( 30 ) ,
1078- & [ ( "branch" , 20 ) ] ,
1079- ) ;
1080- let updates = execute_operation ( & metadata, Some ( 1_500 ) , None , true , false ) . unwrap ( ) ;
1081- // Snapshot 10 is the only candidate because 20 is referenced and 30 is current.
1082- assert_eq ! ( collect_snapshot_ids( & updates) , vec![ 10 ] ) ;
1083- }
1084-
1085- #[ test]
1086- fn snapshot_expiration_supports_dry_run ( ) {
1087- let metadata = sample_metadata ( & [ ( 1 , 1_000 ) , ( 2 , 900 ) ] , Some ( 1 ) , & [ ] ) ;
1088- let updates = execute_operation ( & metadata, Some ( 950 ) , None , true , true ) . unwrap ( ) ;
1089- assert ! ( updates. is_empty( ) ) ;
1090- }
1091- }
1092-
1093954pub fn bounding_partition_values < ' a > (
1094955 mut iter : impl Iterator < Item = & ' a DataFile > ,
1095956 partition_column_names : & SmallVec < [ & str ; 4 ] > ,
@@ -1309,3 +1170,142 @@ pub fn compute_n_splits(
13091170 x => x. ilog2 ( ) + 1 ,
13101171 }
13111172}
1173+
1174+ #[ cfg( test) ]
1175+ mod tests {
1176+ use super :: * ;
1177+ use futures:: executor:: block_on;
1178+ use iceberg_rust_spec:: spec:: schema:: SchemaBuilder ;
1179+ use iceberg_rust_spec:: spec:: table_metadata:: TableMetadataBuilder ;
1180+ use iceberg_rust_spec:: spec:: types:: { PrimitiveType , StructField , Type } ;
1181+ use object_store:: memory:: InMemory ;
1182+
1183+ fn sample_metadata (
1184+ snapshot_defs : & [ ( i64 , i64 ) ] ,
1185+ current_snapshot : Option < i64 > ,
1186+ refs : & [ ( & str , i64 ) ] ,
1187+ ) -> TableMetadata {
1188+ let schema = SchemaBuilder :: default ( )
1189+ . with_schema_id ( 0 )
1190+ . with_struct_field ( StructField {
1191+ id : 1 ,
1192+ name : "id" . to_string ( ) ,
1193+ required : true ,
1194+ field_type : Type :: Primitive ( PrimitiveType :: Long ) ,
1195+ doc : None ,
1196+ } )
1197+ . build ( )
1198+ . unwrap ( ) ;
1199+
1200+ let snapshots = snapshot_defs
1201+ . iter ( )
1202+ . enumerate ( )
1203+ . map ( |( idx, ( snapshot_id, timestamp) ) | {
1204+ let snapshot = SnapshotBuilder :: default ( )
1205+ . with_snapshot_id ( * snapshot_id)
1206+ . with_sequence_number ( ( idx + 1 ) as i64 )
1207+ . with_timestamp_ms ( * timestamp)
1208+ . with_manifest_list ( format ! ( "manifest-{snapshot_id}.avro" ) )
1209+ . with_summary ( Summary {
1210+ operation : SnapshotOperation :: Append ,
1211+ other : HashMap :: new ( ) ,
1212+ } )
1213+ . with_schema_id ( 0 )
1214+ . build ( )
1215+ . unwrap ( ) ;
1216+ ( * snapshot_id, snapshot)
1217+ } )
1218+ . collect :: < HashMap < _ , _ > > ( ) ;
1219+
1220+ let refs = refs
1221+ . iter ( )
1222+ . map ( |( name, snapshot_id) | {
1223+ (
1224+ ( * name) . to_string ( ) ,
1225+ SnapshotReference {
1226+ snapshot_id : * snapshot_id,
1227+ retention : SnapshotRetention :: default ( ) ,
1228+ } ,
1229+ )
1230+ } )
1231+ . collect :: < HashMap < _ , _ > > ( ) ;
1232+
1233+ TableMetadataBuilder :: default ( )
1234+ . location ( "s3://tests/table" . to_owned ( ) )
1235+ . current_schema_id ( 0 )
1236+ . schemas ( HashMap :: from_iter ( vec ! [ ( 0 , schema) ] ) )
1237+ . snapshots ( snapshots)
1238+ . current_snapshot_id ( current_snapshot)
1239+ . last_sequence_number ( snapshot_defs. len ( ) as i64 )
1240+ . refs ( refs)
1241+ . build ( )
1242+ . unwrap ( )
1243+ }
1244+
1245+ fn execute_operation (
1246+ metadata : & TableMetadata ,
1247+ older_than : Option < i64 > ,
1248+ retain_last : Option < usize > ,
1249+ retain_refs : bool ,
1250+ dry_run : bool ,
1251+ ) -> Result < Vec < TableUpdate > , Error > {
1252+ let op = Operation :: ExpireSnapshots {
1253+ older_than,
1254+ retain_last,
1255+ _clean_orphan_files : false ,
1256+ retain_ref_snapshots : retain_refs,
1257+ dry_run,
1258+ } ;
1259+ let store = Arc :: new ( InMemory :: new ( ) ) ;
1260+ block_on ( op. execute ( metadata, store) ) . map ( |( _, updates) | updates)
1261+ }
1262+
1263+ fn collect_snapshot_ids ( updates : & [ TableUpdate ] ) -> Vec < i64 > {
1264+ updates
1265+ . iter ( )
1266+ . flat_map ( |update| match update {
1267+ TableUpdate :: RemoveSnapshots { snapshot_ids } => snapshot_ids. clone ( ) ,
1268+ _ => Vec :: new ( ) ,
1269+ } )
1270+ . collect ( )
1271+ }
1272+
1273+ #[ test]
1274+ fn snapshot_expiration_requires_policy ( ) {
1275+ let metadata = sample_metadata ( & [ ( 1 , 1_000 ) ] , Some ( 1 ) , & [ ] ) ;
1276+ let result = execute_operation ( & metadata, None , None , true , false ) ;
1277+ assert ! ( matches!( result, Err ( Error :: InvalidFormat ( _) ) ) ) ;
1278+ }
1279+
1280+ #[ test]
1281+ fn snapshot_expiration_applies_time_and_count_filters ( ) {
1282+ let metadata = sample_metadata (
1283+ & [ ( 1 , 1_000 ) , ( 2 , 2_000 ) , ( 3 , 3_000 ) , ( 4 , 4_000 ) ] ,
1284+ Some ( 4 ) ,
1285+ & [ ] ,
1286+ ) ;
1287+ let updates = execute_operation ( & metadata, Some ( 2_500 ) , Some ( 2 ) , true , false ) . unwrap ( ) ;
1288+ let mut expired = collect_snapshot_ids ( & updates) ;
1289+ expired. sort ( ) ;
1290+ assert_eq ! ( expired, vec![ 1 , 2 ] ) ;
1291+ }
1292+
1293+ #[ test]
1294+ fn snapshot_expiration_preserves_current_and_refs ( ) {
1295+ let metadata = sample_metadata (
1296+ & [ ( 10 , 1_000 ) , ( 20 , 2_000 ) , ( 30 , 3_000 ) ] ,
1297+ Some ( 30 ) ,
1298+ & [ ( "branch" , 20 ) ] ,
1299+ ) ;
1300+ let updates = execute_operation ( & metadata, Some ( 1_500 ) , None , true , false ) . unwrap ( ) ;
1301+ // Snapshot 10 is the only candidate because 20 is referenced and 30 is current.
1302+ assert_eq ! ( collect_snapshot_ids( & updates) , vec![ 10 ] ) ;
1303+ }
1304+
1305+ #[ test]
1306+ fn snapshot_expiration_supports_dry_run ( ) {
1307+ let metadata = sample_metadata ( & [ ( 1 , 1_000 ) , ( 2 , 900 ) ] , Some ( 1 ) , & [ ] ) ;
1308+ let updates = execute_operation ( & metadata, Some ( 950 ) , None , true , true ) . unwrap ( ) ;
1309+ assert ! ( updates. is_empty( ) ) ;
1310+ }
1311+ }
0 commit comments