Commit 9e2ce84

tests(catalog): add s3tables end-to-end test to verify we can create and load tables and read and write their data
Signed-off-by: Daniel Carl Jones <djonesoa@amazon.com>
1 parent 626de2e · commit 9e2ce84

3 files changed: 115 additions & 0 deletions

File tree

  Cargo.lock
  crates/catalog/s3tables/Cargo.toml
  crates/catalog/s3tables/src/catalog.rs

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default.

crates/catalog/s3tables/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -39,6 +39,11 @@ iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] }
 
 
 [dev-dependencies]
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+futures = { workspace = true }
 iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
 itertools = { workspace = true }
+parquet = { workspace = true }
 tokio = { workspace = true }
+uuid = { workspace = true }

crates/catalog/s3tables/src/catalog.rs

Lines changed: 105 additions & 0 deletions
@@ -707,6 +707,7 @@ where T: std::fmt::Debug {
 
 #[cfg(test)]
 mod tests {
+    use futures::TryStreamExt;
     use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
     use iceberg::transaction::{ApplyTransactionAction, Transaction};
 
@@ -1175,4 +1176,108 @@ mod tests {
             assert_eq!(err.message(), "Catalog name cannot be empty");
         }
     }
+
+    /// Verify that an S3 Tables catalog can create a table, write data, load the same table, and read from it.
+    #[tokio::test]
+    async fn test_s3tables_create_table_write_load_table_read() {
+        use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+        use iceberg::writer::file_writer::ParquetWriterBuilder;
+        use iceberg::writer::file_writer::location_generator::{
+            DefaultFileNameGenerator, DefaultLocationGenerator,
+        };
+        use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+        use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+
+        let catalog = match load_s3tables_catalog_from_env().await {
+            Ok(Some(c)) => c,
+            Ok(None) => return,
+            Err(e) => panic!("Error loading catalog: {e}"),
+        };
+
+        let ns = NamespaceIdent::new(format!("test_rw_{}", uuid::Uuid::new_v4().simple()));
+        catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
+
+        let table_name = String::from("table");
+
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+            ])
+            .build()
+            .unwrap();
+        let creation = TableCreation::builder()
+            .name(table_name.clone())
+            .schema(schema)
+            .build();
+
+        let table = catalog.create_table(&ns, creation).await.unwrap();
+
+        // Write one row.
+        let arrow_schema: Arc<arrow_schema::Schema> = Arc::new(
+            table
+                .metadata()
+                .current_schema()
+                .as_ref()
+                .try_into()
+                .unwrap(),
+        );
+        let batch = arrow_array::RecordBatch::try_new(arrow_schema, vec![Arc::new(
+            arrow_array::Int32Array::from(vec![42]),
+        )])
+        .unwrap();
+
+        // Locations are generated from the table metadata, which uses `s3://` locations for Amazon S3 Tables.
+        let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+        let file_name_generator = DefaultFileNameGenerator::new(
+            "test".to_string(),
+            None,
+            iceberg::spec::DataFileFormat::Parquet,
+        );
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            parquet::file::properties::WriterProperties::default(),
+            table.metadata().current_schema().clone(),
+        );
+        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
+            parquet_writer_builder,
+            table.file_io().clone(),
+            location_generator,
+            file_name_generator,
+        );
+        let mut writer = DataFileWriterBuilder::new(rolling_writer_builder)
+            .build(None)
+            .await
+            .unwrap();
+        writer.write(batch.clone()).await.unwrap();
+        let data_files = writer.close().await.unwrap();
+
+        let tx = Transaction::new(&table);
+        let tx = tx
+            .fast_append()
+            .add_data_files(data_files)
+            .apply(tx)
+            .unwrap();
+        tx.commit(&catalog).await.unwrap();
+
+        // Reload from the catalog and read back.
+        let table_ident = TableIdent::new(ns.clone(), table_name.clone());
+        let reloaded = catalog.load_table(&table_ident).await.unwrap();
+        let batches: Vec<arrow_array::RecordBatch> = reloaded
+            .scan()
+            .select_all()
+            .build()
+            .expect("scan to be valid (snapshot exists, schema is OK)")
+            .to_arrow()
+            .await
+            .expect("scan tasks should be OK")
+            .try_collect()
+            .await
+            .expect("scan should complete successfully");
+
+        assert_eq!(batches.len(), 1);
+        assert_eq!(
+            batches[0], batch,
+            "read records should match records written earlier"
+        );
+
+        // Clean up.
+        catalog.purge_table(&table_ident).await.ok();
+        catalog.drop_namespace(&ns).await.ok();
+    }
 }
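
The test gates on `load_s3tables_catalog_from_env`, a pre-existing helper that is not part of this diff; per the match at the top of the test, it evidently returns `Ok(None)` when no test bucket is configured so the test can skip rather than fail. For orientation only, a minimal sketch of what such a helper could look like follows; the `S3TABLES_TEST_BUCKET_ARN` variable name and the exact `S3TablesCatalogConfig` fields are assumptions for illustration, not taken from this commit.

use std::collections::HashMap;
use std::env;

use iceberg::Result;
use iceberg_catalog_s3tables::{S3TablesCatalog, S3TablesCatalogConfig};

/// Hypothetical sketch of the helper the test relies on. Returns `Ok(None)`
/// when no test bucket is configured so callers can skip instead of failing.
async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
    // Assumed env var name; the real helper may read different configuration.
    let Ok(table_bucket_arn) = env::var("S3TABLES_TEST_BUCKET_ARN") else {
        return Ok(None);
    };
    let config = S3TablesCatalogConfig::builder()
        .table_bucket_arn(table_bucket_arn)
        .properties(HashMap::new())
        .build();
    S3TablesCatalog::new(config).await.map(Some)
}

Gating on the environment this way keeps the end-to-end test opt-in: CI runs or developers without an S3 Tables bucket hit the `Ok(None)` arm and return early instead of erroring.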
