diff --git a/Cargo.lock b/Cargo.lock
index 5110d5a480..4d1dc28b76 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3468,14 +3468,19 @@ name = "iceberg-catalog-s3tables"
 version = "0.9.0"
 dependencies = [
  "anyhow",
+ "arrow-array",
+ "arrow-schema",
  "async-trait",
  "aws-config",
  "aws-sdk-s3tables",
+ "futures",
  "iceberg",
  "iceberg-storage-opendal",
  "iceberg_test_utils",
  "itertools 0.13.0",
+ "parquet",
  "tokio",
+ "uuid",
 ]
 
 [[package]]
diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml
index 2fe096fec9..dc7be3027f 100644
--- a/crates/catalog/s3tables/Cargo.toml
+++ b/crates/catalog/s3tables/Cargo.toml
@@ -39,6 +39,11 @@ iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] }
 
 [dev-dependencies]
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+futures = { workspace = true }
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 itertools = { workspace = true }
+parquet = { workspace = true }
 tokio = { workspace = true }
+uuid = { workspace = true }
diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs
index b88bd77d29..833d058a31 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -707,6 +707,7 @@ where T: std::fmt::Debug {
 
 #[cfg(test)]
 mod tests {
+    use futures::TryStreamExt;
     use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
     use iceberg::transaction::{ApplyTransactionAction, Transaction};
@@ -1175,4 +1176,108 @@ mod tests {
             assert_eq!(err.message(), "Catalog name cannot be empty");
         }
     }
+
+    /// Verify that an S3 Tables catalog can create a table, write data, load the same table, and read from it.
+    #[tokio::test]
+    async fn test_s3tables_create_table_write_load_table_read() {
+        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+        use iceberg::writer::file_writer::ParquetWriterBuilder;
+        use iceberg::writer::file_writer::location_generator::{
+            DefaultFileNameGenerator, DefaultLocationGenerator,
+        };
+        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(c)) => c,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {e}"),
+        };
+
+        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
+        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
+
+        let table_name = String::from("table");
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap();
+        let creation = TableCreation::builder()
+            .name(table_name.clone())
+            .schema(schema)
+            .build();
+
+        let table = catalog.create_table(&ns, creation).await.unwrap();
+
+        // Write one row.
+        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
+            table
+                .metadata()
+                .current_schema()
+                .as_ref()
+                .try_into()
+                .unwrap(),
+        );
+        let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
+            arrow_array::Int32Array::from(vec![42]),
+        )])
+        .unwrap();
+
+        // Locations are generated from the table metadata, which uses `s3://` URIs for Amazon S3 Tables.
+        let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+        let file_name_generator = DefaultFileNameGenerator::new(
+            "test".to_string(),
+            None,
+            iceberg::spec::DataFileFormat::Parquet,
+        );
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            parquet::file::properties::WriterProperties::default(),
+            table.metadata().current_schema().clone(),
+        );
+        let rw = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            table.file_io().clone(),
+            location_generator,
+            file_name_generator,
+        );
+        let mut writer = DataFileWriterBuilder::new(rw).build(None).await.unwrap();
+        writer.write(batch.clone()).await.unwrap();
+        let data_files = writer.close().await.unwrap();
+
+        let tx = Transaction::new(&table);
+        let tx = tx
+            .fast_append()
+            .add_data_files(data_files)
+            .apply(tx)
+            .unwrap();
+        tx.commit(&catalog).await.unwrap();
+
+        // Reload the table from the catalog and read the data back.
+        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
+        let reloaded = catalog.load_table(&table_ident).await.unwrap();
+        let batches: Vec<arrow_array::RecordBatch> = reloaded
+            .scan()
+            .select_all()
+            .build()
+            .expect("scan to be valid (snapshot exists, schema is OK)")
+            .to_arrow()
+            .await
+            .expect("scan tasks should be OK")
+            .try_collect()
+            .await
+            .expect("scan should complete successfully");
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(
+            batches[0], batch,
+            "read records should match records written earlier"
+        );
+
+        // Clean up.
+        catalog.purge_table(&table_ident).await.ok();
+        catalog.drop_namespace(&ns).await.ok();
+    }
 }