@@ -707,6 +707,7 @@ where T: std::fmt::Debug {
 
 #[cfg(test)]
 mod tests {
+    use futures::TryStreamExt;
     use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
     use iceberg::transaction::{ApplyTransactionAction, Transaction};
 
@@ -1175,4 +1176,97 @@ mod tests {
             assert_eq!(err.message(), "Catalog name cannot be empty");
         }
     }
+
+    /// Verify that an S3 Tables catalog can create a table, write data, load the
+    /// same table, and read from it.
+    #[tokio::test]
+    async fn test_s3tables_create_table_write_load_table_read() {
+        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+        use iceberg::writer::file_writer::ParquetWriterBuilder;
+        use iceberg::writer::file_writer::location_generator::{
+            DefaultFileNameGenerator, DefaultLocationGenerator,
+        };
+        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
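+
+        // Skip when the environment does not configure an S3 Tables catalog;
+        // only a real loading failure should fail the test.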
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(c)) => c,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {e}"),
+        };
+
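+        // The namespace name is suffixed with a UUID so repeated test runs don't collide.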
+        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
+        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
+
+        let table_name = String::from("table");
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap();
+        let creation = TableCreation::builder()
+            .name(table_name.clone())
+            .schema(schema)
+            .build();
+
+        let table = catalog.create_table(&ns, creation).await.unwrap();
+
+        // Write one row.
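+        // Build the Arrow schema from the table's current Iceberg schema so the
+        // record batch matches the table layout.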
+        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
+            table.metadata().current_schema().as_ref().try_into().unwrap(),
+        );
+        let batch = arrow_array::RecordBatch::try_new(
+            arrow_schema,
+            vec![Arc::new(arrow_array::Int32Array::from(vec![42]))],
+        )
+        .unwrap();
+
+        // Locations are generated from the table metadata, which uses the `s3://`
+        // scheme for Amazon S3 Tables.
+        let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+        let file_name_generator = DefaultFileNameGenerator::new(
+            "test".to_string(),
+            None,
+            iceberg::spec::DataFileFormat::Parquet,
+        );
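+        // Writer stack: the Parquet writer serializes batches, the rolling writer
+        // splits output files at the default target size, and the data file writer
+        // on top returns the `DataFile` metadata needed for the commit below.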
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            parquet::file::properties::WriterProperties::default(),
+            table.metadata().current_schema().clone(),
+        );
+        let rw = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            table.file_io().clone(),
+            location_generator,
+            file_name_generator,
+        );
+        let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
+        writer.write(batch.clone()).await.unwrap();
+        let data_files = writer.close().await.unwrap();
+
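+        // Fast-append the new data files: the commit adds a snapshot without
+        // rewriting existing manifests.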
+        let tx = Transaction::new(&table);
+        let tx = tx.fast_append().add_data_files(data_files).apply(tx).unwrap();
+        tx.commit(&catalog).await.unwrap();
+
+        // Reload from catalog and read back.
+        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
+        let reloaded = catalog.load_table(&table_ident).await.unwrap();
+        let batches: Vec<arrow_array::RecordBatch> = reloaded
+            .scan()
+            .select_all()
+            .build()
+            .expect("scan to be valid (snapshot exists, schema is OK)")
+            .to_arrow()
+            .await
+            .expect("scan tasks should be OK")
+            .try_collect()
+            .await
+            .expect("scan should complete successfully");
+
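+        // One row written to one data file should come back as exactly one batch.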
+        assert_eq!(batches.len(), 1);
+        assert_eq!(
+            batches[0], batch,
+            "read records should match records written earlier"
+        );
+
+        // Clean up.
+        catalog.purge_table(&table_ident).await.ok();
+        catalog.drop_namespace(&ns).await.ok();
+    }
 }