diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc b/src/iceberg/arrow/arrow_fs_file_io.cc index 769fcfb13..b1515d379 100644 --- a/src/iceberg/arrow/arrow_fs_file_io.cc +++ b/src/iceberg/arrow/arrow_fs_file_io.cc @@ -79,6 +79,25 @@ Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location, return {}; } +Result> +ArrowFileSystemFileIO::OpenInputFile(const std::string& file_location, + std::optional length) { + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + ::arrow::fs::FileInfo file_info(path, ::arrow::fs::FileType::File); + if (length.has_value()) { + file_info.set_size(length.value()); + } + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info)); + return file; +} + +Result> +ArrowFileSystemFileIO::OpenOutputStream(const std::string& file_location) { + ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path)); + return file; +} + /// \brief Delete a file at the given location. Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) { ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location)); diff --git a/src/iceberg/arrow/arrow_fs_file_io_internal.h b/src/iceberg/arrow/arrow_fs_file_io_internal.h index 92a991501..c25c3339d 100644 --- a/src/iceberg/arrow/arrow_fs_file_io_internal.h +++ b/src/iceberg/arrow/arrow_fs_file_io_internal.h @@ -20,8 +20,10 @@ #pragma once #include +#include #include +#include #include "iceberg/file_io.h" #include "iceberg/iceberg_bundle_export.h" @@ -52,6 +54,27 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO { /// \brief Delete a file at the given location. Status DeleteFile(const std::string& file_location) override; + /// \brief Open an input file for reading, resolving any URI scheme in the path. + /// + /// This method resolves URI schemes (e.g., "s3://bucket/key" -> "bucket/key") + /// before opening the file. Use this instead of calling fs()->OpenInputFile() + /// directly to ensure URI resolution is applied consistently. + /// + /// \param file_location The file location, which may contain a URI scheme. + /// \param length Optional file size hint for the Arrow filesystem. + Result> OpenInputFile( + const std::string& file_location, std::optional length = std::nullopt); + + /// \brief Open an output stream for writing, resolving any URI scheme in the path. + /// + /// This method resolves URI schemes (e.g., "s3://bucket/key" -> "bucket/key") + /// before opening the stream. Use this instead of calling fs()->OpenOutputStream() + /// directly to ensure URI resolution is applied consistently. + /// + /// \param file_location The file location, which may contain a URI scheme. + Result> OpenOutputStream( + const std::string& file_location); + /// \brief Get the Arrow file system. const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; } diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc index f4985d9ac..51de2a2be 100644 --- a/src/iceberg/avro/avro_reader.cc +++ b/src/iceberg/avro/avro_reader.cc @@ -23,7 +23,6 @@ #include #include -#include #include #include #include @@ -51,13 +50,8 @@ namespace { Result> CreateInputStream(const ReaderOptions& options, int64_t buffer_size) { - ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File); - if (options.length) { - file_info.set_size(options.length.value()); - } - auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info)); + ICEBERG_ASSIGN_OR_RAISE(auto file, io->OpenInputFile(options.path, options.length)); return std::make_unique(file, buffer_size); } diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc index 32ce3f634..7987d67b2 100644 --- a/src/iceberg/avro/avro_writer.cc +++ b/src/iceberg/avro/avro_writer.cc @@ -50,7 +50,7 @@ namespace { Result> CreateOutputStream(const WriterOptions& options, int64_t buffer_size) { auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path)); + ICEBERG_ASSIGN_OR_RAISE(auto output, io->OpenOutputStream(options.path)); return std::make_unique(output, buffer_size); } diff --git a/src/iceberg/catalog/rest/constant.h b/src/iceberg/catalog/rest/constant.h index 0a6e8d0c0..ed0bdf9a2 100644 --- a/src/iceberg/catalog/rest/constant.h +++ b/src/iceberg/catalog/rest/constant.h @@ -38,6 +38,9 @@ inline const std::string kMimeTypeFormUrlEncoded = "application/x-www-form-urlen inline const std::string kUserAgentPrefix = "iceberg-cpp/"; inline const std::string kUserAgent = "iceberg-cpp/" ICEBERG_VERSION_STRING; +inline const std::string kHeaderAccessDelegation = "X-Iceberg-Access-Delegation"; +inline const std::string kAccessDelegationVendedCredentials = "vended-credentials"; + inline const std::string kQueryParamParent = "parent"; inline const std::string kQueryParamPageToken = "page_token"; diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index eebdc1969..d5caf00ae 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -72,6 +72,8 @@ constexpr std::string_view kStack = "stack"; constexpr std::string_view kError = "error"; constexpr std::string_view kIdentifier = "identifier"; constexpr std::string_view kRequirements = "requirements"; +constexpr std::string_view kStorageCredentials = "storage-credentials"; +constexpr std::string_view kPrefix = "prefix"; constexpr std::string_view kAccessToken = "access_token"; constexpr std::string_view kTokenType = "token_type"; constexpr std::string_view kExpiresIn = "expires_in"; @@ -217,12 +219,35 @@ Result RenameTableRequestFromJson(const nlohmann::json& json return request; } +// StorageCredential serialization +nlohmann::json ToJson(const StorageCredential& cred) { + nlohmann::json json; + json[kPrefix] = cred.prefix; + json[kConfig] = cred.config; + return json; +} + +Result StorageCredentialFromJson(const nlohmann::json& json) { + StorageCredential cred; + ICEBERG_ASSIGN_OR_RAISE(cred.prefix, GetJsonValue(json, kPrefix)); + ICEBERG_ASSIGN_OR_RAISE(cred.config, + GetJsonValueOrDefault(json, kConfig)); + return cred; +} + // LoadTableResult (used by CreateTableResponse, LoadTableResponse) nlohmann::json ToJson(const LoadTableResult& result) { nlohmann::json json; SetOptionalStringField(json, kMetadataLocation, result.metadata_location); json[kMetadata] = ToJson(*result.metadata); SetContainerField(json, kConfig, result.config); + if (!result.storage_credentials.empty()) { + nlohmann::json creds_json = nlohmann::json::array(); + for (const auto& cred : result.storage_credentials) { + creds_json.push_back(ToJson(cred)); + } + json[kStorageCredentials] = std::move(creds_json); + } return json; } @@ -235,6 +260,14 @@ Result LoadTableResultFromJson(const nlohmann::json& json) { ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json)); ICEBERG_ASSIGN_OR_RAISE(result.config, GetJsonValueOrDefault(json, kConfig)); + if (json.contains(kStorageCredentials)) { + ICEBERG_ASSIGN_OR_RAISE(auto creds_json, + GetJsonValue(json, kStorageCredentials)); + for (const auto& cred_json : creds_json) { + ICEBERG_ASSIGN_OR_RAISE(auto cred, StorageCredentialFromJson(cred_json)); + result.storage_credentials.push_back(std::move(cred)); + } + } ICEBERG_RETURN_UNEXPECTED(result.Validate()); return result; } @@ -528,5 +561,6 @@ ICEBERG_DEFINE_FROM_JSON(CreateTableRequest) ICEBERG_DEFINE_FROM_JSON(CommitTableRequest) ICEBERG_DEFINE_FROM_JSON(CommitTableResponse) ICEBERG_DEFINE_FROM_JSON(OAuthTokenResponse) +ICEBERG_DEFINE_FROM_JSON(StorageCredential) } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/json_serde_internal.h b/src/iceberg/catalog/rest/json_serde_internal.h index 820e077d7..ca87e9594 100644 --- a/src/iceberg/catalog/rest/json_serde_internal.h +++ b/src/iceberg/catalog/rest/json_serde_internal.h @@ -59,6 +59,7 @@ ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest) ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest) ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse) ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse) +ICEBERG_DECLARE_JSON_SERDE(StorageCredential) #undef ICEBERG_DECLARE_JSON_SERDE diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index a0267adcb..2e9b8ddb1 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -166,7 +166,9 @@ Result> RestCatalog::Make( // Get snapshot loading mode ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode()); - auto client = std::make_unique(final_config.ExtractHeaders()); + auto default_headers = final_config.ExtractHeaders(); + default_headers.emplace(kHeaderAccessDelegation, kAccessDelegationVendedCredentials); + auto client = std::make_unique(std::move(default_headers)); ICEBERG_ASSIGN_OR_RAISE(auto catalog_session, auth_manager->CatalogSession(*client, final_config.configs())); @@ -361,8 +363,13 @@ Result> RestCatalog::CreateTable( ICEBERG_ASSIGN_OR_RAISE(auto result, CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/false)); + ICEBERG_ASSIGN_OR_RAISE( + auto table_io, + ResolveTableFileIO(file_io_, config_.configs(), + config_.Get(RestCatalogProperties::kWarehouse), result)); return Table::Make(identifier, std::move(result.metadata), - std::move(result.metadata_location), file_io_, shared_from_this()); + std::move(result.metadata_location), std::move(table_io), + shared_from_this()); } Result> RestCatalog::UpdateTable( @@ -404,10 +411,14 @@ Result> RestCatalog::StageCreateTable( ICEBERG_ASSIGN_OR_RAISE(auto result, CreateTableInternal(identifier, schema, spec, order, location, properties, /*stage_create=*/true)); + ICEBERG_ASSIGN_OR_RAISE( + auto table_io, + ResolveTableFileIO(file_io_, config_.configs(), + config_.Get(RestCatalogProperties::kWarehouse), result)); ICEBERG_ASSIGN_OR_RAISE(auto staged_table, StagedTable::Make(identifier, std::move(result.metadata), - std::move(result.metadata_location), file_io_, - shared_from_this())); + std::move(result.metadata_location), + std::move(table_io), shared_from_this())); return Transaction::Make(std::move(staged_table), TransactionKind::kCreate); } @@ -474,9 +485,13 @@ Result> RestCatalog::LoadTable(const TableIdentifier& ide ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier)); ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body)); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); - /// FIXME: support per-table FileIO creation + + ICEBERG_ASSIGN_OR_RAISE( + auto table_io, + ResolveTableFileIO(file_io_, config_.configs(), + config_.Get(RestCatalogProperties::kWarehouse), load_result)); return Table::Make(identifier, std::move(load_result.metadata), - std::move(load_result.metadata_location), file_io_, + std::move(load_result.metadata_location), std::move(table_io), shared_from_this()); } @@ -498,8 +513,12 @@ Result> RestCatalog::RegisterTable( ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json)); + ICEBERG_ASSIGN_OR_RAISE( + auto table_io, + ResolveTableFileIO(file_io_, config_.configs(), + config_.Get(RestCatalogProperties::kWarehouse), load_result)); return Table::Make(identifier, std::move(load_result.metadata), - std::move(load_result.metadata_location), file_io_, + std::move(load_result.metadata_location), std::move(table_io), shared_from_this()); } diff --git a/src/iceberg/catalog/rest/rest_file_io.cc b/src/iceberg/catalog/rest/rest_file_io.cc index f08a03353..a2cc0bbb6 100644 --- a/src/iceberg/catalog/rest/rest_file_io.cc +++ b/src/iceberg/catalog/rest/rest_file_io.cc @@ -20,7 +20,9 @@ #include "iceberg/catalog/rest/rest_file_io.h" #include +#include +#include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/file_io_registry.h" #include "iceberg/util/macros.h" @@ -69,12 +71,13 @@ Result> MakeCatalogFileIO(const RestCatalogProperties& c if (io_impl.empty()) { if (warehouse.empty()) { - return InvalidArgument(R"("{}" or "{}" property is required to create FileIO)", - RestCatalogProperties::kIOImpl.key(), - RestCatalogProperties::kWarehouse.key()); + // No io-impl or warehouse configured. Fall back to a local FileIO as a + // default — enabling per-table ResolveTableFileIO (vending). + io_impl = std::string(FileIORegistry::kArrowLocalFileIO); + } else { + ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse)); + io_impl = std::string(BuiltinFileIOName(detected_kind)); } - ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse)); - io_impl = std::string(BuiltinFileIOName(detected_kind)); } if (!warehouse.empty() && IsBuiltinImpl(io_impl)) { @@ -92,4 +95,71 @@ Result> MakeCatalogFileIO(const RestCatalogProperties& c return FileIORegistry::Load(io_impl, config.configs()); } +namespace { + +const StorageCredential* ResolveStorageCredential( + const std::vector& credentials, std::string_view location) { + const StorageCredential* best = nullptr; + for (const auto& cred : credentials) { + if (location.starts_with(cred.prefix)) { + if (!best || cred.prefix.size() > best->prefix.size()) { + best = &cred; + } + } + } + return best; +} + +std::unordered_map MergeTableProperties( + const std::unordered_map& catalog_props, + const std::unordered_map& table_config, + const std::unordered_map& credential_config) { + auto merged = catalog_props; + for (const auto& [k, v] : table_config) { + merged[k] = v; + } + for (const auto& [k, v] : credential_config) { + merged[k] = v; + } + return merged; +} + +} // namespace + +Result> ResolveTableFileIO( + const std::shared_ptr& catalog_io, + const std::unordered_map& catalog_props, + const std::string& warehouse, const LoadTableResult& result) { + if (result.config.empty() && result.storage_credentials.empty()) { + return catalog_io; + } + + // Merge order: catalog props < table config < storage credentials (highest priority). + const StorageCredential* cred = nullptr; + if (!result.metadata_location.empty()) { + cred = ResolveStorageCredential(result.storage_credentials, result.metadata_location); + } + static const std::unordered_map kEmpty; + auto merged = + MergeTableProperties(catalog_props, result.config, cred ? cred->config : kEmpty); + + // Detect FileIO type: explicit io-impl > warehouse scheme > metadata_location scheme. + std::string io_impl; + if (auto it = merged.find(std::string(RestCatalogProperties::kIOImpl.key())); + it != merged.end()) { + io_impl = it->second; + } else if (!warehouse.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto kind, DetectBuiltinFileIO(warehouse)); + io_impl = std::string(BuiltinFileIOName(kind)); + } else if (!result.metadata_location.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto kind, DetectBuiltinFileIO(result.metadata_location)); + io_impl = std::string(BuiltinFileIOName(kind)); + } else { + return catalog_io; + } + + ICEBERG_ASSIGN_OR_RAISE(auto table_io, FileIORegistry::Load(io_impl, merged)); + return std::shared_ptr(std::move(table_io)); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_file_io.h b/src/iceberg/catalog/rest/rest_file_io.h index 68482521a..9b7d71768 100644 --- a/src/iceberg/catalog/rest/rest_file_io.h +++ b/src/iceberg/catalog/rest/rest_file_io.h @@ -21,10 +21,13 @@ #include #include +#include #include +#include #include "iceberg/catalog/rest/catalog_properties.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/catalog/rest/types.h" #include "iceberg/file_io.h" #include "iceberg/file_io_registry.h" #include "iceberg/result.h" @@ -44,4 +47,15 @@ ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind); ICEBERG_REST_EXPORT Result> MakeCatalogFileIO( const RestCatalogProperties& config); +/// \brief Resolve a per-table FileIO from a LoadTableResult. +/// +/// Merges catalog properties, table config, and the best-matching storage +/// credential (longest prefix match on metadata_location), then creates a +/// per-table FileIO via FileIORegistry. Falls back to \p catalog_io when +/// no per-table config is present or the FileIO type cannot be detected. +ICEBERG_REST_EXPORT Result> ResolveTableFileIO( + const std::shared_ptr& catalog_io, + const std::unordered_map& catalog_props, + const std::string& warehouse, const LoadTableResult& result); + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 6495a6517..8a916bcdb 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -169,12 +169,25 @@ struct ICEBERG_REST_EXPORT CreateTableRequest { /// \brief An opaque token that allows clients to make use of pagination for list APIs. using PageToken = std::string; +/// \brief A storage credential returned by the REST catalog server. +/// +/// Each credential has a prefix (a location prefix like "s3://bucket/path") and +/// a config map containing cloud-provider-specific properties (e.g., temporary +/// access keys). Clients resolve the best credential via longest prefix match +/// against the table's metadata location. +struct ICEBERG_REST_EXPORT StorageCredential { + std::string prefix; + std::unordered_map config; + + bool operator==(const StorageCredential&) const = default; +}; + /// \brief Result body for table create/load/register APIs. struct ICEBERG_REST_EXPORT LoadTableResult { std::string metadata_location; std::shared_ptr metadata; // required std::unordered_map config; - // TODO(Li Feiyang): Add std::shared_ptr storage_credential; + std::vector storage_credentials; /// \brief Validates the LoadTableResult. Status Validate() const { diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h index 259da7556..6a9d7c710 100644 --- a/src/iceberg/file_io.h +++ b/src/iceberg/file_io.h @@ -22,6 +22,7 @@ #include #include #include +#include #include "iceberg/iceberg_export.h" #include "iceberg/result.h" @@ -42,6 +43,14 @@ class ICEBERG_EXPORT FileIO { FileIO() = default; virtual ~FileIO() = default; + /// \brief Returns the configuration properties used to initialize this FileIO. + /// + /// Engines that need to configure their own storage access (e.g., for credential + /// vending) can read these properties to obtain the resolved credentials. + const std::unordered_map& properties() const { + return properties_; + } + /// \brief Read the content of the file at the given location. /// /// \param file_location The location of the file to read. @@ -73,6 +82,10 @@ class ICEBERG_EXPORT FileIO { virtual Status DeleteFile(const std::string& file_location) { return NotImplemented("DeleteFile not implemented"); } + + private: + friend class FileIORegistry; + std::unordered_map properties_; }; } // namespace iceberg diff --git a/src/iceberg/file_io_registry.cc b/src/iceberg/file_io_registry.cc index ffba8677a..0609d172b 100644 --- a/src/iceberg/file_io_registry.cc +++ b/src/iceberg/file_io_registry.cc @@ -20,6 +20,8 @@ #include #include +#include "iceberg/util/macros.h" + namespace iceberg { namespace { @@ -55,7 +57,9 @@ Result> FileIORegistry::Load( } factory = it->second; } - return factory(properties); + ICEBERG_ASSIGN_OR_RAISE(auto io, factory(properties)); + io->properties_ = properties; + return io; } } // namespace iceberg diff --git a/src/iceberg/iceberg-config.cmake.in b/src/iceberg/iceberg-config.cmake.in index 787fadcc6..b3f07dad6 100644 --- a/src/iceberg/iceberg-config.cmake.in +++ b/src/iceberg/iceberg-config.cmake.in @@ -32,6 +32,7 @@ @PACKAGE_INIT@ set(ICEBERG_BUILD_STATIC "@ICEBERG_BUILD_STATIC@") +set(ICEBERG_S3 "@ICEBERG_S3@") set(ICEBERG_SYSTEM_DEPENDENCIES "@ICEBERG_SYSTEM_DEPENDENCIES@") include(CMakeFindDependencyMacro) @@ -93,6 +94,35 @@ if(TARGET iceberg::arrow_static) "${arrow_lib_dir}/${CMAKE_STATIC_LIBRARY_PREFIX}arrow_bundled_dependencies${CMAKE_STATIC_LIBRARY_SUFFIX}" ) endforeach() + + # When ICEBERG_S3 is enabled, Arrow's bundled static archive includes the + # AWS SDK C libraries (aws-c-common, etc.) which depend on platform-specific + # system libraries. Without these, downstream consumers get unresolved + # symbols at link time. + # Reference: Arrow's ArrowConfig.cmake.in handles this identically. + # https://github.com/apache/arrow/blob/main/cpp/src/arrow/ArrowConfig.cmake.in + if(ICEBERG_S3) + if(APPLE) + find_library(CORE_FOUNDATION CoreFoundation) + target_link_libraries(Arrow::arrow_bundled_dependencies + INTERFACE ${CORE_FOUNDATION}) + find_library(SECURITY Security) + target_link_libraries(Arrow::arrow_bundled_dependencies + INTERFACE ${SECURITY}) + find_library(NETWORK Network) + target_link_libraries(Arrow::arrow_bundled_dependencies INTERFACE ${NETWORK}) + elseif(WIN32) + target_link_libraries(Arrow::arrow_bundled_dependencies + INTERFACE "winhttp.lib" + "bcrypt.lib" + "wininet.lib" + "userenv.lib" + "version.lib" + "ncrypt.lib" + "Secur32.lib" + "Shlwapi.lib") + endif() + endif() endif() if(TARGET iceberg::parquet_static) diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc index 2fbf8e0a6..cd6f1372b 100644 --- a/src/iceberg/parquet/parquet_reader.cc +++ b/src/iceberg/parquet/parquet_reader.cc @@ -50,13 +50,8 @@ namespace { Result> OpenInputStream( const ReaderOptions& options) { - ::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File); - if (options.length) { - file_info.set_size(options.length.value()); - } - auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto input, io->fs()->OpenInputFile(file_info)); + ICEBERG_ASSIGN_OR_RAISE(auto input, io->OpenInputFile(options.path, options.length)); return input; } diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index a68e9e61e..847b2b22f 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -42,7 +42,7 @@ namespace { Result> OpenOutputStream( const WriterOptions& options) { auto io = internal::checked_pointer_cast(options.io); - ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path)); + ICEBERG_ASSIGN_OR_RAISE(auto output, io->OpenOutputStream(options.path)); return output; } diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index ed2ede707..cad2e4195 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -32,7 +32,7 @@ #include "iceberg/snapshot.h" #include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" -#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/snapshot_util.h" #include "iceberg/util/timepoint.h" #include "iceberg/util/type_util.h" diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index eb26b2bea..bcee03c9a 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -232,6 +232,7 @@ if(ICEBERG_BUILD_REST) add_rest_iceberg_test(rest_catalog_test SOURCES auth_manager_test.cc + credential_vending_test.cc endpoint_test.cc rest_file_io_test.cc rest_json_serde_test.cc diff --git a/src/iceberg/test/arrow_fs_file_io_test.cc b/src/iceberg/test/arrow_fs_file_io_test.cc index eacda2f75..499d396fe 100644 --- a/src/iceberg/test/arrow_fs_file_io_test.cc +++ b/src/iceberg/test/arrow_fs_file_io_test.cc @@ -17,7 +17,12 @@ * under the License. */ +#include + +#include #include +#include +#include #include #include "iceberg/arrow/arrow_fs_file_io_internal.h" @@ -64,4 +69,57 @@ TEST_F(LocalFileIOTest, DeleteFile) { EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file")); } +// Test OpenInputFile and OpenOutputStream with URI scheme resolution using +// MockFileSystem. +class OpenFileURITest : public ::testing::Test { + protected: + void SetUp() override { + auto mock_fs = std::make_shared<::arrow::fs::internal::MockFileSystem>( + std::chrono::system_clock::now()); + mock_fs_ = mock_fs.get(); + file_io_ = + std::make_unique(std::move(mock_fs)); + } + + ::arrow::fs::internal::MockFileSystem* mock_fs_; + std::unique_ptr file_io_; +}; + +TEST_F(OpenFileURITest, RoundTripWithURIScheme) { + // Write via OpenOutputStream with a URI scheme — the scheme should be stripped. + const std::string data = "round trip data"; + { + auto out = file_io_->OpenOutputStream("mock:///data.bin"); + ASSERT_THAT(out, IsOk()); + ASSERT_TRUE((*out)->Write(data.data(), data.size()).ok()); + ASSERT_TRUE((*out)->Close().ok()); + } + + // Read back via OpenInputFile with the same URI scheme. + { + auto in = file_io_->OpenInputFile("mock:///data.bin"); + ASSERT_THAT(in, IsOk()); + auto buf_result = (*in)->Read(data.size()); + ASSERT_TRUE(buf_result.ok()) << buf_result.status().ToString(); + auto buf = *buf_result; + EXPECT_EQ(std::string(reinterpret_cast(buf->data()), buf->size()), data); + } + + // Also readable via plain path (proves the URI was stripped for storage). + auto read = file_io_->ReadFile("data.bin", std::nullopt); + ASSERT_THAT(read, IsOk()); + EXPECT_THAT(read, HasValue(::testing::Eq(data))); +} + +TEST_F(OpenFileURITest, PlainPathPassesThrough) { + auto out = file_io_->OpenOutputStream("file.txt"); + ASSERT_THAT(out, IsOk()); + ASSERT_TRUE((*out)->Write("plain", 5).ok()); + ASSERT_TRUE((*out)->Close().ok()); + + auto read = file_io_->ReadFile("file.txt", std::nullopt); + ASSERT_THAT(read, IsOk()); + EXPECT_THAT(read, HasValue(::testing::Eq("plain"))); +} + } // namespace iceberg diff --git a/src/iceberg/test/arrow_s3_file_io_test.cc b/src/iceberg/test/arrow_s3_file_io_test.cc index d890ad10e..4bcb20ed1 100644 --- a/src/iceberg/test/arrow_s3_file_io_test.cc +++ b/src/iceberg/test/arrow_s3_file_io_test.cc @@ -23,10 +23,12 @@ #include #include +#include #include #include #include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/arrow/arrow_fs_file_io_internal.h" #include "iceberg/arrow/s3/s3_properties.h" #include "iceberg/test/matchers.h" @@ -181,4 +183,38 @@ TEST_F(ArrowS3FileIOTest, MakeS3FileIOWithTimeouts) { ASSERT_THAT(io_res, IsOk()); } +TEST_F(ArrowS3FileIOTest, OpenInputFileAndOutputStreamWithS3URI) { + RequireIntegrationEnv(); + auto io_res = MakeS3FileIO(PropertiesFromEnv()); + ASSERT_THAT(io_res, IsOk()); + // Cast to ArrowFileSystemFileIO to access OpenInputFile/OpenOutputStream. + auto* fs_io = dynamic_cast(io_res->get()); + ASSERT_NE(fs_io, nullptr); + + auto object_uri = ObjectUri("iceberg_open_file_uri_test.txt"); + const std::string data = "open file URI round trip"; + + // Write via OpenOutputStream with an s3:// URI. + { + auto out = fs_io->OpenOutputStream(object_uri); + ASSERT_THAT(out, IsOk()); + ASSERT_TRUE((*out)->Write(data.data(), data.size()).ok()); + ASSERT_TRUE((*out)->Close().ok()); + } + + // Read back via OpenInputFile with the same s3:// URI. + { + auto in = fs_io->OpenInputFile(object_uri); + ASSERT_THAT(in, IsOk()); + auto buf_result = (*in)->Read(data.size()); + ASSERT_TRUE(buf_result.ok()) << buf_result.status().ToString(); + auto buf = *buf_result; + EXPECT_EQ(std::string(reinterpret_cast(buf->data()), buf->size()), data); + } + + // Clean up. + auto del = fs_io->DeleteFile(object_uri); + EXPECT_THAT(del, IsOk()); +} + } // namespace iceberg::arrow diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index 82da97ea3..98563c928 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -917,7 +917,7 @@ TEST_P(AvroWriterTest, MultipleAvroBlocks) { // Use raw avro-cpp reader to count blocks by tracking previousSync() changes auto mock_io = internal::checked_pointer_cast(file_io_); - auto input = mock_io->fs()->OpenInputFile(temp_avro_file_).ValueOrDie(); + auto input = mock_io->OpenInputFile(temp_avro_file_).value(); auto input_stream = std::make_unique(std::move(input), 1024 * 1024); ::avro::DataFileReader<::avro::GenericDatum> avro_reader(std::move(input_stream)); ::avro::GenericDatum datum(avro_reader.dataSchema()); diff --git a/src/iceberg/test/credential_vending_test.cc b/src/iceberg/test/credential_vending_test.cc new file mode 100644 index 000000000..fb298363c --- /dev/null +++ b/src/iceberg/test/credential_vending_test.cc @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/// \file credential_vending_test.cc +/// \brief Tests for credential vending: ResolveTableFileIO, FileIO detection, +/// and FileIO registry integration. + +#include +#include +#include + +#include +#include + +#include "iceberg/catalog/rest/rest_file_io.h" +#include "iceberg/catalog/rest/types.h" +#include "iceberg/file_io.h" +#include "iceberg/file_io_registry.h" +#include "iceberg/table_metadata.h" +#include "iceberg/test/matchers.h" + +namespace iceberg { + +// --------------------------------------------------------------------------- +// DetectBuiltinFileIO tests +// --------------------------------------------------------------------------- + +TEST(CredentialVendingFileIOTest, DetectS3SchemeReturnsArrowS3) { + auto result = rest::DetectBuiltinFileIO("s3://bucket/path"); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(*result, rest::BuiltinFileIOKind::kArrowS3); +} + +TEST(CredentialVendingFileIOTest, DetectLocalPathReturnsArrowLocal) { + auto result = rest::DetectBuiltinFileIO("/tmp/warehouse"); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(*result, rest::BuiltinFileIOKind::kArrowLocal); +} + +TEST(CredentialVendingFileIOTest, DetectFileSchemeReturnsArrowLocal) { + auto result = rest::DetectBuiltinFileIO("file:///tmp/warehouse"); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(*result, rest::BuiltinFileIOKind::kArrowLocal); +} + +TEST(CredentialVendingFileIOTest, DetectUnsupportedSchemeReturnsError) { + auto result = rest::DetectBuiltinFileIO("gs://bucket/warehouse"); + EXPECT_THAT(result, IsError(ErrorKind::kNotSupported)); +} + +TEST(CredentialVendingFileIOTest, BuiltinFileIONameForS3) { + EXPECT_EQ(rest::BuiltinFileIOName(rest::BuiltinFileIOKind::kArrowS3), + FileIORegistry::kArrowS3FileIO); +} + +TEST(CredentialVendingFileIOTest, BuiltinFileIONameForLocal) { + EXPECT_EQ(rest::BuiltinFileIOName(rest::BuiltinFileIOKind::kArrowLocal), + FileIORegistry::kArrowLocalFileIO); +} + +// --------------------------------------------------------------------------- +// ResolveTableFileIO tests +// --------------------------------------------------------------------------- + +namespace { + +constexpr std::string_view kTestFileIOImpl = "test.credential-vending.TestFileIO"; + +class DummyFileIO : public FileIO { + public: + Result ReadFile(const std::string&, std::optional) override { + return std::string("dummy"); + } + Status WriteFile(const std::string&, std::string_view) override { return {}; } + Status DeleteFile(const std::string&) override { return {}; } +}; + +rest::LoadTableResult MakeLoadResult( + std::string metadata_location, + std::unordered_map config = {}, + std::vector creds = {}) { + return rest::LoadTableResult{ + .metadata_location = std::move(metadata_location), + .metadata = std::make_shared(TableMetadata{.format_version = 2}), + .config = std::move(config), + .storage_credentials = std::move(creds), + }; +} + +class ResolveTableFileIOTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + FileIORegistry::Register(std::string(kTestFileIOImpl), + [](const std::unordered_map&) + -> Result> { + return std::make_unique(); + }); + } + + std::shared_ptr catalog_io_ = std::make_shared(); + std::unordered_map catalog_props_ = { + {"io-impl", std::string(kTestFileIOImpl)}, + {"s3.region", "us-east-1"}, + }; + std::string warehouse_ = "file:///tmp/warehouse"; +}; + +} // namespace + +TEST_F(ResolveTableFileIOTest, NoConfigReturnsCatalogIO) { + auto result = rest::ResolveTableFileIO(catalog_io_, catalog_props_, warehouse_, + MakeLoadResult("s3://bucket/meta.json")); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value().get(), catalog_io_.get()); +} + +TEST_F(ResolveTableFileIOTest, TableConfigMergedIntoProperties) { + auto result = rest::ResolveTableFileIO( + catalog_io_, catalog_props_, warehouse_, + MakeLoadResult("s3://bucket/meta.json", {{"s3.access-key-id", "TABLE_KEY"}})); + ASSERT_THAT(result, IsOk()); + const auto& props = result.value()->properties(); + EXPECT_EQ(props.at("s3.access-key-id"), "TABLE_KEY"); + EXPECT_EQ(props.at("s3.region"), "us-east-1"); +} + +TEST_F(ResolveTableFileIOTest, StorageCredentialsOverrideTableConfig) { + auto result = rest::ResolveTableFileIO( + catalog_io_, catalog_props_, warehouse_, + MakeLoadResult("s3://bucket/warehouse/db/table/meta.json", + {{"s3.access-key-id", "TABLE_KEY"}}, + {{.prefix = "s3://bucket/warehouse/", + .config = {{"s3.access-key-id", "CRED_KEY"}, + {"s3.session-token", "CRED_TOKEN"}}}})); + ASSERT_THAT(result, IsOk()); + const auto& props = result.value()->properties(); + EXPECT_EQ(props.at("s3.access-key-id"), "CRED_KEY"); + EXPECT_EQ(props.at("s3.session-token"), "CRED_TOKEN"); + EXPECT_EQ(props.at("s3.region"), "us-east-1"); +} + +TEST_F(ResolveTableFileIOTest, LongestPrefixMatchWins) { + auto result = rest::ResolveTableFileIO( + catalog_io_, catalog_props_, warehouse_, + MakeLoadResult( + "s3://bucket/warehouse/db/table/meta.json", {}, + {{.prefix = "s3://bucket/", .config = {{"s3.access-key-id", "SHORT"}}}, + {.prefix = "s3://bucket/warehouse/db/", + .config = {{"s3.access-key-id", "LONG"}}}, + {.prefix = "s3://other/", .config = {{"s3.access-key-id", "WRONG"}}}})); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value()->properties().at("s3.access-key-id"), "LONG"); +} + +TEST_F(ResolveTableFileIOTest, NoMatchingCredentialIgnored) { + auto result = rest::ResolveTableFileIO( + catalog_io_, catalog_props_, warehouse_, + MakeLoadResult( + "s3://my-bucket/meta.json", {}, + {{.prefix = "s3://other-bucket/", .config = {{"s3.access-key-id", "WRONG"}}}})); + ASSERT_THAT(result, IsOk()); + EXPECT_FALSE(result.value()->properties().contains("s3.access-key-id")); +} + +TEST_F(ResolveTableFileIOTest, EmptyMetadataLocationIgnoresCredentials) { + auto result = rest::ResolveTableFileIO( + catalog_io_, catalog_props_, warehouse_, + MakeLoadResult("", // empty metadata location + {{"s3.access-key-id", "TABLE_KEY"}}, + {{.prefix = "", .config = {{"s3.access-key-id", "WRONG"}}}})); + ASSERT_THAT(result, IsOk()); + // Table config should be merged, but the credential should NOT match + EXPECT_EQ(result.value()->properties().at("s3.access-key-id"), "TABLE_KEY"); +} + +TEST_F(ResolveTableFileIOTest, FileIOPropertiesPopulated) { + auto result = rest::ResolveTableFileIO( + catalog_io_, catalog_props_, warehouse_, + MakeLoadResult("s3://bucket/meta.json", + {{"s3.access-key-id", "KEY"}, {"s3.secret-access-key", "SECRET"}})); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value()->properties().at("s3.access-key-id"), "KEY"); + EXPECT_EQ(result.value()->properties().at("s3.secret-access-key"), "SECRET"); +} + +TEST_F(ResolveTableFileIOTest, TableConfigOverridesIOImpl) { + // Register a second FileIO impl to verify io-impl override from table config + const std::string alt_impl = "test.credential-vending.AltFileIO"; + FileIORegistry::Register( + alt_impl, + [](const std::unordered_map&) + -> Result> { return std::make_unique(); }); + + auto result = rest::ResolveTableFileIO( + catalog_io_, catalog_props_, warehouse_, + MakeLoadResult("s3://bucket/meta.json", + {{"io-impl", alt_impl}, {"s3.access-key-id", "KEY"}})); + ASSERT_THAT(result, IsOk()); + // Should use the alt impl and still have the merged properties + EXPECT_NE(result.value().get(), catalog_io_.get()); + EXPECT_EQ(result.value()->properties().at("io-impl"), alt_impl); + EXPECT_EQ(result.value()->properties().at("s3.access-key-id"), "KEY"); +} + +TEST_F(ResolveTableFileIOTest, DetectsFileIOFromWarehouseScheme) { + // No io-impl in config, but warehouse is s3:// — should detect S3 FileIO + const std::string s3_impl = std::string(FileIORegistry::kArrowS3FileIO); + FileIORegistry::Register( + s3_impl, + [](const std::unordered_map&) + -> Result> { return std::make_unique(); }); + + std::unordered_map no_io_impl_props = { + {"s3.region", "us-east-1"}, + }; + auto result = + rest::ResolveTableFileIO(catalog_io_, no_io_impl_props, "s3://warehouse-bucket/", + MakeLoadResult("s3://warehouse-bucket/db/meta.json", + {{"s3.access-key-id", "KEY"}})); + ASSERT_THAT(result, IsOk()); + EXPECT_NE(result.value().get(), catalog_io_.get()); + EXPECT_EQ(result.value()->properties().at("s3.access-key-id"), "KEY"); +} + +TEST_F(ResolveTableFileIOTest, DetectsFileIOFromMetadataLocationWhenNoWarehouse) { + // When no warehouse or io-impl, detect from metadata_location scheme (like Python) + const std::string s3_impl = std::string(FileIORegistry::kArrowS3FileIO); + FileIORegistry::Register( + s3_impl, + [](const std::unordered_map&) + -> Result> { return std::make_unique(); }); + + std::unordered_map no_io_impl_props = { + {"s3.region", "us-east-1"}, + }; + auto result = rest::ResolveTableFileIO( + catalog_io_, no_io_impl_props, "", // empty warehouse + MakeLoadResult("s3://bucket/meta.json", {{"s3.access-key-id", "KEY"}})); + ASSERT_THAT(result, IsOk()); + // Should create a new S3 FileIO, not fall back to catalog_io + EXPECT_NE(result.value().get(), catalog_io_.get()); + EXPECT_EQ(result.value()->properties().at("s3.access-key-id"), "KEY"); +} + +TEST_F(ResolveTableFileIOTest, FallsToCatalogIOWhenNothingDetectable) { + std::unordered_map no_io_impl_props = { + {"s3.region", "us-east-1"}, + }; + auto result = rest::ResolveTableFileIO( + catalog_io_, no_io_impl_props, "", // empty warehouse + MakeLoadResult("", {{"s3.access-key-id", "KEY"}})); // empty metadata_location + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value().get(), catalog_io_.get()); +} + +// --------------------------------------------------------------------------- +// FileIORegistry tests +// --------------------------------------------------------------------------- + +TEST(CredentialVendingFileIOTest, RegistryLoadUnknownImplReturnsNotFound) { + auto result = FileIORegistry::Load("nonexistent.impl", {}); + EXPECT_THAT(result, IsError(ErrorKind::kNotFound)); +} + +TEST(CredentialVendingFileIOTest, RegistryPopulatesFileIOProperties) { + const std::string impl_name = "test.credential-vending.PropsFileIO"; + FileIORegistry::Register( + impl_name, + [](const std::unordered_map&) + -> Result> { return std::make_unique(); }); + + std::unordered_map test_props = { + {"s3.access-key-id", "AKIA_TEST"}, + {"s3.secret-access-key", "SECRET_TEST"}, + }; + + auto result = FileIORegistry::Load(impl_name, test_props); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value()->properties().at("s3.access-key-id"), "AKIA_TEST"); + EXPECT_EQ(result.value()->properties().at("s3.secret-access-key"), "SECRET_TEST"); +} + +} // namespace iceberg diff --git a/src/iceberg/test/file_scan_task_test.cc b/src/iceberg/test/file_scan_task_test.cc index ba0c41b37..abf37b61d 100644 --- a/src/iceberg/test/file_scan_task_test.cc +++ b/src/iceberg/test/file_scan_task_test.cc @@ -69,7 +69,7 @@ class FileScanTaskTest : public TempFileTestBase { .ValueOrDie(); auto io = internal::checked_cast(*file_io_); - auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + auto outfile = io.OpenOutputStream(temp_parquet_file_).value(); ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), outfile, chunk_size) @@ -85,7 +85,7 @@ class FileScanTaskTest : public TempFileTestBase { auto empty_table = ::arrow::Table::FromRecordBatches(arrow_schema, {}).ValueOrDie(); auto io = internal::checked_cast(*file_io_); - auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + auto outfile = io.OpenOutputStream(temp_parquet_file_).value(); ASSERT_TRUE(::parquet::arrow::WriteTable(*empty_table, ::arrow::default_memory_pool(), outfile, 1024) .ok()); diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 0d983db58..a6eebc0cb 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -173,7 +173,7 @@ class ParquetReaderTest : public ::testing::Test { .ValueOrDie(); auto io = internal::checked_cast(*file_io_); - auto outfile = io.fs()->OpenOutputStream(temp_parquet_file_).ValueOrDie(); + auto outfile = io.OpenOutputStream(temp_parquet_file_).value(); ASSERT_TRUE(::parquet::arrow::WriteTable(*table, ::arrow::default_memory_pool(), outfile, /*chunk_size=*/2) @@ -278,7 +278,7 @@ TEST_F(ParquetReaderTest, ReadSplit) { // Read split offsets auto io = internal::checked_cast(*file_io_); - auto input_stream = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie(); + auto input_stream = io.OpenInputFile(temp_parquet_file_).value(); auto metadata = ::parquet::ReadMetaData(input_stream); std::vector split_offsets; for (int i = 0; i < metadata->num_row_groups(); ++i) { diff --git a/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml b/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml index 0a5c37ecb..1bef42892 100644 --- a/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml +++ b/src/iceberg/test/resources/iceberg-rest-fixture/docker-compose.yml @@ -22,5 +22,8 @@ services: - CATALOG_CATALOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory - CATALOG_WAREHOUSE=file:///tmp/iceberg_warehouse + - CATALOG_INCLUDE__CREDENTIALS=true + - CATALOG_S3_ACCESS__KEY__ID=dummy-access-key + - CATALOG_S3_SECRET__ACCESS__KEY=dummy-secret-key ports: - "8181:8181" diff --git a/src/iceberg/test/rest_catalog_integration_test.cc b/src/iceberg/test/rest_catalog_integration_test.cc index 3de7e722a..b1c5f9f96 100644 --- a/src/iceberg/test/rest_catalog_integration_test.cc +++ b/src/iceberg/test/rest_catalog_integration_test.cc @@ -486,4 +486,66 @@ TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotModeRefs) { EXPECT_FALSE(loaded->metadata()->schemas.empty()); } +// -- Credential vending -- + +TEST_F(RestCatalogIntegrationTest, LoadTableVendsCredentials) { + Namespace ns{.levels = {"test_load_vend"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "vend_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto loaded, catalog->LoadTable(table_id)); + const auto& props = loaded->io()->properties(); + // Values must match the dummy credentials configured in docker-compose.yml + EXPECT_EQ(props.at("s3.access-key-id"), "dummy-access-key"); + EXPECT_EQ(props.at("s3.secret-access-key"), "dummy-secret-key"); +} + +TEST_F(RestCatalogIntegrationTest, CreateTableVendsCredentials) { + Namespace ns{.levels = {"test_create_vend"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "created_vend"}; + ICEBERG_UNWRAP_OR_FAIL(auto table, CreateDefaultTable(catalog, table_id)); + + const auto& props = table->io()->properties(); + EXPECT_EQ(props.at("s3.access-key-id"), "dummy-access-key"); + EXPECT_EQ(props.at("s3.secret-access-key"), "dummy-secret-key"); +} + +TEST_F(RestCatalogIntegrationTest, RegisterTableVendsCredentials) { + Namespace ns{.levels = {"test_register_vend"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier orig_id{.ns = ns, .name = "orig_table"}; + ICEBERG_UNWRAP_OR_FAIL(auto orig, CreateDefaultTable(catalog, orig_id)); + std::string metadata_location(orig->metadata_file_location()); + + ASSERT_THAT(catalog->DropTable(orig_id, /*purge=*/false), IsOk()); + + TableIdentifier new_id{.ns = ns, .name = "registered_vend"}; + ICEBERG_UNWRAP_OR_FAIL(auto registered, + catalog->RegisterTable(new_id, metadata_location)); + + const auto& props = registered->io()->properties(); + EXPECT_EQ(props.at("s3.access-key-id"), "dummy-access-key"); + EXPECT_EQ(props.at("s3.secret-access-key"), "dummy-secret-key"); +} + +TEST_F(RestCatalogIntegrationTest, StageCreateTableVendsCredentials) { + Namespace ns{.levels = {"test_stage_vend"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + + TableIdentifier table_id{.ns = ns, .name = "staged_vend"}; + ICEBERG_UNWRAP_OR_FAIL( + auto txn, + catalog->StageCreateTable(table_id, DefaultSchema(), PartitionSpec::Unpartitioned(), + SortOrder::Unsorted(), "", {})); + + const auto& props = txn->table()->io()->properties(); + EXPECT_EQ(props.at("s3.access-key-id"), "dummy-access-key"); + EXPECT_EQ(props.at("s3.secret-access-key"), "dummy-secret-key"); +} + } // namespace iceberg::rest diff --git a/src/iceberg/test/rest_file_io_test.cc b/src/iceberg/test/rest_file_io_test.cc index e9131da3f..3f899468e 100644 --- a/src/iceberg/test/rest_file_io_test.cc +++ b/src/iceberg/test/rest_file_io_test.cc @@ -59,9 +59,20 @@ TEST(RestFileIOTest, DetectBuiltinKindRejectsUnsupportedScheme) { EXPECT_THAT(result, HasErrorMessage("not supported for automatic FileIO resolution")); } -TEST(RestFileIOTest, MakeCatalogFileIOMissingImplAndWarehouse) { +TEST(RestFileIOTest, MakeCatalogFileIOFallsBackToLocalWithoutImplOrWarehouse) { + bool local_factory_called = false; + FileIORegistry::Register( + std::string(FileIORegistry::kArrowLocalFileIO), + [&local_factory_called]( + const std::unordered_map& /*properties*/) + -> Result> { + local_factory_called = true; + return std::make_unique(); + }); + auto result = MakeCatalogFileIO(RestCatalogProperties::default_properties()); - EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(local_factory_called); } TEST(RestFileIOTest, MakeCatalogFileIORejectsIncompatibleWarehouse) { diff --git a/src/iceberg/test/rest_json_serde_test.cc b/src/iceberg/test/rest_json_serde_test.cc index 9da052e6a..bb1567eb4 100644 --- a/src/iceberg/test/rest_json_serde_test.cc +++ b/src/iceberg/test/rest_json_serde_test.cc @@ -1107,7 +1107,17 @@ INSTANTIATE_TEST_SUITE_P( .model = {.metadata_location = "s3://bucket/metadata/v1.json", .metadata = MakeSimpleTableMetadata(), .config = {{"warehouse", "s3://bucket/warehouse"}, - {"foo", "bar"}}}}), + {"foo", "bar"}}}}, + // With storage-credentials + LoadTableResultParam{ + .test_name = "WithStorageCredentials", + .expected_json_str = + R"({"metadata":{"current-schema-id":1,"current-snapshot-id":null,"default-sort-order-id":0,"default-spec-id":0,"format-version":2,"last-column-id":1,"last-partition-id":0,"last-sequence-number":0,"last-updated-ms":0,"location":"s3://bucket/test","metadata-log":[],"partition-specs":[{"fields":[],"spec-id":0}],"partition-statistics":[],"properties":{},"refs":{},"schemas":[{"fields":[{"id":1,"name":"id","required":true,"type":"int"}],"schema-id":1,"type":"struct"}],"snapshot-log":[],"snapshots":[],"sort-orders":[{"fields":[],"order-id":0}],"statistics":[],"table-uuid":"test-uuid-1234"},"storage-credentials":[{"config":{"s3.access-key-id":"AKIA1234","s3.secret-access-key":"secret"},"prefix":"s3://bucket/"}]})", + .model = {.metadata = MakeSimpleTableMetadata(), + .storage_credentials = + {{.prefix = "s3://bucket/", + .config = {{"s3.access-key-id", "AKIA1234"}, + {"s3.secret-access-key", "secret"}}}}}}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); @@ -1136,7 +1146,16 @@ INSTANTIATE_TEST_SUITE_P( .json_str = R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"config":{"warehouse":"s3://bucket/warehouse"}})", .expected_model = {.metadata = MakeSimpleTableMetadata(), - .config = {{"warehouse", "s3://bucket/warehouse"}}}}), + .config = {{"warehouse", "s3://bucket/warehouse"}}}}, + // With storage-credentials + LoadTableResultDeserializeParam{ + .test_name = "WithStorageCredentials", + .json_str = + R"({"metadata":{"format-version":2,"table-uuid":"test-uuid-1234","location":"s3://bucket/test","last-sequence-number":0,"last-updated-ms":0,"last-column-id":1,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"properties":{}},"storage-credentials":[{"prefix":"s3://bucket/","config":{"s3.access-key-id":"AKIA1234"}}]})", + .expected_model = {.metadata = MakeSimpleTableMetadata(), + .storage_credentials = {{.prefix = "s3://bucket/", + .config = {{"s3.access-key-id", + "AKIA1234"}}}}}}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); @@ -1175,7 +1194,13 @@ INSTANTIATE_TEST_SUITE_P( LoadTableResultInvalidParam{ .test_name = "InvalidMetadataContent", .invalid_json_str = R"({"metadata":{"format-version":"invalid"}})", - .expected_error_message = "type must be number, but is string"}), + .expected_error_message = "type must be number, but is string"}, + // Storage credential missing required prefix field + LoadTableResultInvalidParam{ + .test_name = "StorageCredentialMissingPrefix", + .invalid_json_str = + R"({"storage-credentials":[{"config":{"key":"val"}}],"metadata":{"format-version":2,"table-uuid":"test","location":"s3://test","last-sequence-number":0,"last-column-id":1,"last-updated-ms":0,"schemas":[{"type":"struct","schema-id":1,"fields":[{"id":1,"name":"id","type":"int","required":true}]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[]}],"default-spec-id":0,"last-partition-id":0,"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0}})", + .expected_error_message = "Missing 'prefix'"}), [](const ::testing::TestParamInfo& info) { return info.param.test_name; }); diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc index a47b403da..7b7cf8c00 100644 --- a/src/iceberg/test/snapshot_util_test.cc +++ b/src/iceberg/test/snapshot_util_test.cc @@ -17,6 +17,8 @@ * under the License. */ +#include "iceberg/util/snapshot_util.h" + #include #include #include @@ -33,7 +35,6 @@ #include "iceberg/test/matchers.h" #include "iceberg/test/mock_catalog.h" #include "iceberg/test/mock_io.h" -#include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" namespace iceberg { diff --git a/src/iceberg/update/expire_snapshots.cc b/src/iceberg/update/expire_snapshots.cc index 722ae7a42..780befb4e 100644 --- a/src/iceberg/update/expire_snapshots.cc +++ b/src/iceberg/update/expire_snapshots.cc @@ -33,7 +33,7 @@ #include "iceberg/transaction.h" #include "iceberg/util/error_collector.h" #include "iceberg/util/macros.h" -#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/snapshot_util.h" namespace iceberg { diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc index 79662890b..0054c9d32 100644 --- a/src/iceberg/update/set_snapshot.cc +++ b/src/iceberg/update/set_snapshot.cc @@ -28,7 +28,7 @@ #include "iceberg/transaction.h" #include "iceberg/util/error_collector.h" #include "iceberg/util/macros.h" -#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/snapshot_util.h" #include "iceberg/util/timepoint.h" namespace iceberg { diff --git a/src/iceberg/update/snapshot_update.cc b/src/iceberg/update/snapshot_update.cc index 3e5792667..cf982cb51 100644 --- a/src/iceberg/update/snapshot_update.cc +++ b/src/iceberg/update/snapshot_update.cc @@ -33,7 +33,7 @@ #include "iceberg/table.h" #include "iceberg/transaction.h" #include "iceberg/util/macros.h" -#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/snapshot_util.h" #include "iceberg/util/string_util.h" #include "iceberg/util/uuid.h" diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc index 908962ecd..4348d91f9 100644 --- a/src/iceberg/update/update_snapshot_reference.cc +++ b/src/iceberg/update/update_snapshot_reference.cc @@ -28,7 +28,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/transaction.h" #include "iceberg/util/macros.h" -#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/snapshot_util.h" namespace iceberg { diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index 49019408b..dbd4572e8 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -17,6 +17,8 @@ * under the License. */ +#include "iceberg/util/snapshot_util.h" + #include #include "iceberg/schema.h" @@ -24,7 +26,6 @@ #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/util/macros.h" -#include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" #include "iceberg/util/uuid.h" diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util.h similarity index 100% rename from src/iceberg/util/snapshot_util_internal.h rename to src/iceberg/util/snapshot_util.h