Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ jobs:
env:
# Disable debug info to speed up compilation and reduce artifact size
RUSTFLAGS: "-C debuginfo=0"
ICEBERG_TEST_S3TABLES_ENDPOINT: "http://localhost:9000"
run: |
if [ "${{ matrix.test-suite.name }}" = "default" ]; then
cargo nextest run ${{ matrix.test-suite.args }}
Expand Down
6 changes: 3 additions & 3 deletions crates/catalog/hms/tests/hms_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use iceberg_catalog_hms::{
HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED,
};
use iceberg_storage_opendal::OpenDalStorageFactory;
use iceberg_test_utils::{get_hms_endpoint, get_minio_endpoint, set_up};
use iceberg_test_utils::{get_hms_endpoint, get_s3_endpoint, set_up};
use tokio::time::sleep;
use tracing::info;

Expand All @@ -40,7 +40,7 @@ async fn get_catalog() -> HmsCatalog {
set_up();

let hms_endpoint = get_hms_endpoint();
let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

let props = HashMap::from([
(HMS_CATALOG_PROP_URI.to_string(), hms_endpoint),
Expand All @@ -52,7 +52,7 @@ async fn get_catalog() -> HmsCatalog {
HMS_CATALOG_PROP_WAREHOUSE.to_string(),
"s3a://warehouse/hive".to_string(),
),
(S3_ENDPOINT.to_string(), minio_endpoint),
(S3_ENDPOINT.to_string(), s3_endpoint),
(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
(S3_REGION.to_string(), "us-east-1".to_string()),
Expand Down
10 changes: 5 additions & 5 deletions crates/catalog/loader/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use iceberg_catalog_sql::{
};
use iceberg_storage_opendal::OpenDalStorageFactory;
use iceberg_test_utils::{
get_glue_endpoint, get_hms_endpoint, get_minio_endpoint, get_rest_catalog_endpoint, set_up,
get_glue_endpoint, get_hms_endpoint, get_rest_catalog_endpoint, get_s3_endpoint, set_up,
};
use sqlx::migrate::MigrateDatabase;
use tempfile::TempDir;
Expand Down Expand Up @@ -216,7 +216,7 @@ async fn rest_catalog() -> RestCatalog {

async fn glue_catalog() -> GlueCatalog {
let glue_endpoint = get_glue_endpoint();
let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

let props = HashMap::from([
(AWS_ACCESS_KEY_ID.to_string(), "my_access_id".to_string()),
Expand All @@ -225,7 +225,7 @@ async fn glue_catalog() -> GlueCatalog {
"my_secret_key".to_string(),
),
(AWS_REGION_NAME.to_string(), "us-east-1".to_string()),
(S3_ENDPOINT.to_string(), minio_endpoint),
(S3_ENDPOINT.to_string(), s3_endpoint),
(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
(S3_REGION.to_string(), "us-east-1".to_string()),
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn glue_catalog() -> GlueCatalog {

async fn hms_catalog() -> HmsCatalog {
let hms_endpoint = get_hms_endpoint();
let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

let props = HashMap::from([
(HMS_CATALOG_PROP_URI.to_string(), hms_endpoint),
Expand All @@ -276,7 +276,7 @@ async fn hms_catalog() -> HmsCatalog {
HMS_CATALOG_PROP_WAREHOUSE.to_string(),
"s3a://warehouse/hive".to_string(),
),
(S3_ENDPOINT.to_string(), minio_endpoint),
(S3_ENDPOINT.to_string(), s3_endpoint),
(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
(S3_REGION.to_string(), "us-east-1".to_string()),
Expand Down
81 changes: 73 additions & 8 deletions crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl Catalog for S3TablesCatalog {
match req.send().await {
Ok(_) => Ok(true),
Err(err) => {
if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
if is_not_found_sdk_error(&err) {
Ok(false)
} else {
Err(from_aws_sdk_error(err))
Expand Down Expand Up @@ -606,7 +606,7 @@ impl Catalog for S3TablesCatalog {
match req.send().await {
Ok(_) => Ok(true),
Err(err) => {
if err.as_service_error().map(|e| e.is_not_found_exception()) == Some(true) {
if is_not_found_sdk_error(&err) {
Ok(false)
} else {
Err(from_aws_sdk_error(err))
Expand Down Expand Up @@ -696,6 +696,35 @@ impl Catalog for S3TablesCatalog {
}
}

/// Check if an AWS SDK error represents a "not found" condition for a
/// namespace or table resource.
///
/// The typed `NotFoundException` from the real AWS S3Tables API is matched by
/// checking the error code. S3Tables-compatible backends (e.g. SeaweedFS) may
/// return different error codes such as `NoSuchNamespace` or `NoSuchTable`
/// which the SDK deserializes as `Unhandled` variants. This function checks the
/// error code metadata to handle both cases.
///
/// Importantly, this does NOT treat `NoSuchBucket` as "not found" -- a missing
/// table bucket is an infrastructure/configuration error that should propagate.
fn is_not_found_sdk_error<E>(err: &aws_sdk_s3tables::error::SdkError<E>) -> bool
where E: aws_sdk_s3tables::error::ProvideErrorMetadata {
match err {
aws_sdk_s3tables::error::SdkError::ServiceError(service_err) => {
matches!(
service_err.err().code(),
Some(
"NotFoundException"
| "NoSuchNamespace"
| "NoSuchTable"
| "ResourceNotFoundException"
)
)
}
_ => false,
}
}

/// Format AWS SDK error into iceberg error
pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>) -> Error
where T: std::fmt::Debug {
Expand All @@ -712,6 +741,7 @@ mod tests {

use super::*;

/// Load a catalog configured for real AWS S3Tables (requires TABLE_BUCKET_ARN).
async fn load_s3tables_catalog_from_env() -> Result<Option<S3TablesCatalog>> {
let table_bucket_arn = match std::env::var("TABLE_BUCKET_ARN").ok() {
Some(table_bucket_arn) => table_bucket_arn,
Expand All @@ -729,6 +759,41 @@ mod tests {
Ok(Some(S3TablesCatalog::new(config, None).await?))
}

/// Build an S3Tables catalog for integration testing.
///
/// Real AWS (configured through `TABLE_BUCKET_ARN`) takes precedence via
/// [`load_s3tables_catalog_from_env`]. Otherwise a local S3Tables-compatible
/// endpoint (e.g. SeaweedFS) is taken from `ICEBERG_TEST_S3TABLES_ENDPOINT`.
/// Returns `Ok(None)` when neither is configured so callers can skip tests.
async fn load_s3tables_catalog_for_test() -> Result<Option<S3TablesCatalog>> {
    // Real AWS configuration wins when available.
    if let Some(aws_catalog) = load_s3tables_catalog_from_env().await? {
        return Ok(Some(aws_catalog));
    }

    // No local endpoint configured -> nothing to test against.
    let Ok(endpoint) = std::env::var("ICEBERG_TEST_S3TABLES_ENDPOINT") else {
        return Ok(None);
    };

    // NOTE(review): at this point TABLE_BUCKET_ARN is normally unset (the
    // env-based loader above would have consumed it), so this usually falls
    // back to the placeholder ARN for the local backend.
    let bucket_arn = std::env::var("TABLE_BUCKET_ARN").unwrap_or_else(|_| {
        "arn:aws:s3tables:us-east-1:000000000000:bucket/iceberg-test".to_string()
    });

    // Static credentials matching the local test container setup.
    let props = HashMap::from([
        ("aws_access_key_id".to_string(), "admin".to_string()),
        ("aws_secret_access_key".to_string(), "password".to_string()),
        ("region_name".to_string(), "us-east-1".to_string()),
    ]);

    let config = S3TablesCatalogConfig {
        name: None,
        table_bucket_arn: bucket_arn,
        endpoint_url: Some(endpoint),
        client: None,
        props,
    };

    S3TablesCatalog::new(config, None).await.map(Some)
}

#[tokio::test]
async fn test_s3tables_list_namespace() {
let catalog = match load_s3tables_catalog_from_env().await {
Expand Down Expand Up @@ -776,7 +841,7 @@ mod tests {

#[tokio::test]
async fn test_s3tables_create_delete_namespace() {
let catalog = match load_s3tables_catalog_from_env().await {
let catalog = match load_s3tables_catalog_for_test().await {
Ok(Some(catalog)) => catalog,
Ok(None) => return,
Err(e) => panic!("Error loading catalog: {e}"),
Expand All @@ -794,7 +859,7 @@ mod tests {

#[tokio::test]
async fn test_s3tables_create_delete_table() {
let catalog = match load_s3tables_catalog_from_env().await {
let catalog = match load_s3tables_catalog_for_test().await {
Ok(Some(catalog)) => catalog,
Ok(None) => return,
Err(e) => panic!("Error loading catalog: {e}"),
Expand All @@ -821,23 +886,23 @@ mod tests {
namespace.clone(),
"test_s3tables_create_delete_table".to_string(),
);
catalog.purge_table(&table_ident).await.ok();
catalog.drop_namespace(&namespace).await.ok();
catalog.drop_table(&table_ident).await.ok();

catalog
.create_namespace(&namespace, HashMap::new())
.await
.unwrap();
catalog.create_table(&namespace, creation).await.unwrap();
assert!(catalog.table_exists(&table_ident).await.unwrap());
catalog.drop_table(&table_ident).await.unwrap();
catalog.purge_table(&table_ident).await.unwrap();
assert!(!catalog.table_exists(&table_ident).await.unwrap());
catalog.drop_namespace(&namespace).await.unwrap();
}

#[tokio::test]
async fn test_s3tables_update_table() {
let catalog = match load_s3tables_catalog_from_env().await {
let catalog = match load_s3tables_catalog_for_test().await {
Ok(Some(catalog)) => catalog,
Ok(None) => return,
Err(e) => panic!("Error loading catalog: {e}"),
Expand All @@ -849,7 +914,7 @@ mod tests {
TableIdent::new(namespace.clone(), "test_s3tables_update_table".to_string());

// Clean up any existing resources from previous test runs
catalog.drop_table(&table_ident).await.ok();
catalog.purge_table(&table_ident).await.ok();
catalog.drop_namespace(&namespace).await.ok();

// Create namespace and table
Expand Down
6 changes: 3 additions & 3 deletions crates/integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::OnceLock;

use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg_catalog_rest::REST_CATALOG_PROP_URI;
use iceberg_test_utils::{get_minio_endpoint, get_rest_catalog_endpoint, set_up};
use iceberg_test_utils::{get_rest_catalog_endpoint, get_s3_endpoint, set_up};

/// Global test fixture that uses environment-based configuration.
/// This assumes Docker containers are started externally (e.g., via `make docker-up`).
Expand All @@ -37,11 +37,11 @@ impl GlobalTestFixture {
set_up();

let rest_endpoint = get_rest_catalog_endpoint();
let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

let catalog_config = HashMap::from([
(REST_CATALOG_PROP_URI.to_string(), rest_endpoint),
(S3_ENDPOINT.to_string(), minio_endpoint),
(S3_ENDPOINT.to_string(), s3_endpoint),
(S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
(S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
(S3_REGION.to_string(), "us-east-1".to_string()),
Expand Down
20 changes: 10 additions & 10 deletions crates/storage/opendal/tests/file_io_s3_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,21 @@ mod tests {
FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory};
use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up};
use iceberg_test_utils::{get_s3_endpoint, normalize_test_name_with_parts, set_up};
use reqsign::{AwsCredential, AwsCredentialLoad};
use reqwest::Client;

async fn get_file_io() -> FileIO {
set_up();

let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 {
configured_scheme: "s3".to_string(),
customized_credential_load: None,
}))
.with_props(vec![
(S3_ENDPOINT, minio_endpoint),
(S3_ENDPOINT, s3_endpoint),
(S3_ACCESS_KEY_ID, "admin".to_string()),
(S3_SECRET_ACCESS_KEY, "password".to_string()),
(S3_REGION, "us-east-1".to_string()),
Expand Down Expand Up @@ -107,7 +107,7 @@ mod tests {
Self { credential }
}

fn new_minio() -> Self {
fn new_s3_test() -> Self {
Self::new(Some(AwsCredential {
access_key_id: "admin".to_string(),
secret_access_key: "password".to_string(),
Expand All @@ -127,7 +127,7 @@ mod tests {
#[test]
fn test_custom_aws_credential_loader_instantiation() {
// Test creating CustomAwsCredentialLoader with mock loader
let mock_loader = MockCredentialLoader::new_minio();
let mock_loader = MockCredentialLoader::new_s3_test();
let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader));

// Test that the loader can be used in FileIOBuilder with OpenDalStorageFactory
Expand All @@ -147,18 +147,18 @@ mod tests {
let _file_io = get_file_io().await;

// Create a mock credential loader
let mock_loader = MockCredentialLoader::new_minio();
let mock_loader = MockCredentialLoader::new_s3_test();
let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader));

let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

// Build FileIO with custom credential loader via OpenDalStorageFactory
let file_io_with_custom_creds = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 {
configured_scheme: "s3".to_string(),
customized_credential_load: Some(custom_loader),
}))
.with_props(vec![
(S3_ENDPOINT, minio_endpoint),
(S3_ENDPOINT, s3_endpoint),
(S3_REGION, "us-east-1".to_string()),
])
.build();
Expand All @@ -178,15 +178,15 @@ mod tests {
let mock_loader = MockCredentialLoader::new(None);
let custom_loader = CustomAwsCredentialLoader::new(Arc::new(mock_loader));

let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

// Build FileIO with custom credential loader via OpenDalStorageFactory
let file_io_with_custom_creds = FileIOBuilder::new(Arc::new(OpenDalStorageFactory::S3 {
configured_scheme: "s3".to_string(),
customized_credential_load: Some(custom_loader),
}))
.with_props(vec![
(S3_ENDPOINT, minio_endpoint),
(S3_ENDPOINT, s3_endpoint),
(S3_REGION, "us-east-1".to_string()),
])
.build();
Expand Down
16 changes: 8 additions & 8 deletions crates/storage/opendal/tests/resolving_storage_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ mod tests {
FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
};
use iceberg_storage_opendal::OpenDalResolvingStorageFactory;
use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up};
use iceberg_test_utils::{get_s3_endpoint, normalize_test_name_with_parts, set_up};

fn get_resolving_file_io() -> iceberg::io::FileIO {
set_up();

let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

FileIOBuilder::new(Arc::new(OpenDalResolvingStorageFactory::new()))
.with_props(vec![
(S3_ENDPOINT, minio_endpoint),
(S3_ENDPOINT, s3_endpoint),
(S3_ACCESS_KEY_ID, "admin".to_string()),
(S3_SECRET_ACCESS_KEY, "password".to_string()),
(S3_REGION, "us-east-1".to_string()),
Expand Down Expand Up @@ -260,10 +260,10 @@ mod tests {
use reqsign::{AwsCredential, AwsCredentialLoad};
use reqwest::Client;

struct MinioCredentialLoader;
struct S3TestCredentialLoader;

#[async_trait]
impl AwsCredentialLoad for MinioCredentialLoader {
impl AwsCredentialLoad for S3TestCredentialLoader {
async fn load_credential(
&self,
_client: Client,
Expand All @@ -278,15 +278,15 @@ mod tests {
}

set_up();
let minio_endpoint = get_minio_endpoint();
let s3_endpoint = get_s3_endpoint();

let factory = OpenDalResolvingStorageFactory::new().with_s3_credential_loader(
CustomAwsCredentialLoader::new(Arc::new(MinioCredentialLoader)),
CustomAwsCredentialLoader::new(Arc::new(S3TestCredentialLoader)),
);

let file_io = FileIOBuilder::new(Arc::new(factory))
.with_props(vec![
(S3_ENDPOINT, minio_endpoint),
(S3_ENDPOINT, s3_endpoint),
(S3_REGION, "us-east-1".to_string()),
])
.build();
Expand Down
Loading
Loading