Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/iceberg/arrow/arrow_fs_file_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,25 @@ Status ArrowFileSystemFileIO::WriteFile(const std::string& file_location,
return {};
}

Result<std::shared_ptr<::arrow::io::RandomAccessFile>>
ArrowFileSystemFileIO::OpenInputFile(const std::string& file_location,
std::optional<size_t> length) {
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
::arrow::fs::FileInfo file_info(path, ::arrow::fs::FileType::File);
if (length.has_value()) {
file_info.set_size(length.value());
}
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenInputFile(file_info));
return file;
}

Result<std::shared_ptr<::arrow::io::OutputStream>>
ArrowFileSystemFileIO::OpenOutputStream(const std::string& file_location) {
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, arrow_fs_->OpenOutputStream(path));
return file;
}

/// \brief Delete a file at the given location.
Status ArrowFileSystemFileIO::DeleteFile(const std::string& file_location) {
ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
Expand Down
23 changes: 23 additions & 0 deletions src/iceberg/arrow/arrow_fs_file_io_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
#pragma once

#include <memory>
#include <optional>

#include <arrow/filesystem/filesystem.h>
#include <arrow/io/interfaces.h>

#include "iceberg/file_io.h"
#include "iceberg/iceberg_bundle_export.h"
Expand Down Expand Up @@ -52,6 +54,27 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public FileIO {
/// \brief Delete a file at the given location.
Status DeleteFile(const std::string& file_location) override;

/// \brief Open an input file for reading, resolving any URI scheme in the path.
///
/// This method resolves URI schemes (e.g., "s3://bucket/key" -> "bucket/key")
/// before opening the file. Use this instead of calling fs()->OpenInputFile()
/// directly to ensure URI resolution is applied consistently.
///
/// \param file_location The file location, which may contain a URI scheme.
/// \param length Optional file size hint for the Arrow filesystem.
Result<std::shared_ptr<::arrow::io::RandomAccessFile>> OpenInputFile(
const std::string& file_location, std::optional<size_t> length = std::nullopt);

/// \brief Open an output stream for writing, resolving any URI scheme in the path.
///
/// This method resolves URI schemes (e.g., "s3://bucket/key" -> "bucket/key")
/// before opening the stream. Use this instead of calling fs()->OpenOutputStream()
/// directly to ensure URI resolution is applied consistently.
///
/// \param file_location The file location, which may contain a URI scheme.
Result<std::shared_ptr<::arrow::io::OutputStream>> OpenOutputStream(
const std::string& file_location);

/// \brief Get the Arrow file system.
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return arrow_fs_; }

Expand Down
8 changes: 1 addition & 7 deletions src/iceberg/avro/avro_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#include <arrow/array/builder_base.h>
#include <arrow/c/bridge.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/type.h>
Expand Down Expand Up @@ -51,13 +50,8 @@ namespace {

Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions& options,
int64_t buffer_size) {
::arrow::fs::FileInfo file_info(options.path, ::arrow::fs::FileType::File);
if (options.length) {
file_info.set_size(options.length.value());
}

auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io->fs()->OpenInputFile(file_info));
ICEBERG_ASSIGN_OR_RAISE(auto file, io->OpenInputFile(options.path, options.length));
return std::make_unique<AvroInputStream>(file, buffer_size);
}

Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/avro/avro_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace {
Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions& options,
int64_t buffer_size) {
auto io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(options.io);
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto output, io->fs()->OpenOutputStream(options.path));
ICEBERG_ASSIGN_OR_RAISE(auto output, io->OpenOutputStream(options.path));
return std::make_unique<AvroOutputStream>(output, buffer_size);
}

Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/catalog/rest/constant.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ inline const std::string kMimeTypeFormUrlEncoded = "application/x-www-form-urlen
inline const std::string kUserAgentPrefix = "iceberg-cpp/";
inline const std::string kUserAgent = "iceberg-cpp/" ICEBERG_VERSION_STRING;

inline const std::string kHeaderAccessDelegation = "X-Iceberg-Access-Delegation";
inline const std::string kAccessDelegationVendedCredentials = "vended-credentials";

inline const std::string kQueryParamParent = "parent";
inline const std::string kQueryParamPageToken = "page_token";

Expand Down
34 changes: 34 additions & 0 deletions src/iceberg/catalog/rest/json_serde.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ constexpr std::string_view kStack = "stack";
constexpr std::string_view kError = "error";
constexpr std::string_view kIdentifier = "identifier";
constexpr std::string_view kRequirements = "requirements";
constexpr std::string_view kStorageCredentials = "storage-credentials";
constexpr std::string_view kPrefix = "prefix";
constexpr std::string_view kAccessToken = "access_token";
constexpr std::string_view kTokenType = "token_type";
constexpr std::string_view kExpiresIn = "expires_in";
Expand Down Expand Up @@ -217,12 +219,35 @@ Result<RenameTableRequest> RenameTableRequestFromJson(const nlohmann::json& json
return request;
}

// StorageCredential serialization
nlohmann::json ToJson(const StorageCredential& cred) {
nlohmann::json json;
json[kPrefix] = cred.prefix;
json[kConfig] = cred.config;
return json;
}

Result<StorageCredential> StorageCredentialFromJson(const nlohmann::json& json) {
StorageCredential cred;
ICEBERG_ASSIGN_OR_RAISE(cred.prefix, GetJsonValue<std::string>(json, kPrefix));
ICEBERG_ASSIGN_OR_RAISE(cred.config,
GetJsonValueOrDefault<decltype(cred.config)>(json, kConfig));
return cred;
}

// LoadTableResult (used by CreateTableResponse, LoadTableResponse)
nlohmann::json ToJson(const LoadTableResult& result) {
nlohmann::json json;
SetOptionalStringField(json, kMetadataLocation, result.metadata_location);
json[kMetadata] = ToJson(*result.metadata);
SetContainerField(json, kConfig, result.config);
if (!result.storage_credentials.empty()) {
nlohmann::json creds_json = nlohmann::json::array();
for (const auto& cred : result.storage_credentials) {
creds_json.push_back(ToJson(cred));
}
json[kStorageCredentials] = std::move(creds_json);
}
return json;
}

Expand All @@ -235,6 +260,14 @@ Result<LoadTableResult> LoadTableResultFromJson(const nlohmann::json& json) {
ICEBERG_ASSIGN_OR_RAISE(result.metadata, TableMetadataFromJson(metadata_json));
ICEBERG_ASSIGN_OR_RAISE(result.config,
GetJsonValueOrDefault<decltype(result.config)>(json, kConfig));
if (json.contains(kStorageCredentials)) {
ICEBERG_ASSIGN_OR_RAISE(auto creds_json,
GetJsonValue<nlohmann::json>(json, kStorageCredentials));
for (const auto& cred_json : creds_json) {
ICEBERG_ASSIGN_OR_RAISE(auto cred, StorageCredentialFromJson(cred_json));
result.storage_credentials.push_back(std::move(cred));
}
}
ICEBERG_RETURN_UNEXPECTED(result.Validate());
return result;
}
Expand Down Expand Up @@ -528,5 +561,6 @@ ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)
ICEBERG_DEFINE_FROM_JSON(OAuthTokenResponse)
ICEBERG_DEFINE_FROM_JSON(StorageCredential)

} // namespace iceberg::rest
1 change: 1 addition & 0 deletions src/iceberg/catalog/rest/json_serde_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse)
ICEBERG_DECLARE_JSON_SERDE(StorageCredential)

#undef ICEBERG_DECLARE_JSON_SERDE

Expand Down
33 changes: 26 additions & 7 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ Result<std::shared_ptr<RestCatalog>> RestCatalog::Make(
// Get snapshot loading mode
ICEBERG_ASSIGN_OR_RAISE(auto snapshot_mode, final_config.SnapshotLoadingMode());

auto client = std::make_unique<HttpClient>(final_config.ExtractHeaders());
auto default_headers = final_config.ExtractHeaders();
default_headers.emplace(kHeaderAccessDelegation, kAccessDelegationVendedCredentials);
auto client = std::make_unique<HttpClient>(std::move(default_headers));
ICEBERG_ASSIGN_OR_RAISE(auto catalog_session,
auth_manager->CatalogSession(*client, final_config.configs()));

Expand Down Expand Up @@ -361,8 +363,13 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
ICEBERG_ASSIGN_OR_RAISE(auto result,
CreateTableInternal(identifier, schema, spec, order, location,
properties, /*stage_create=*/false));
ICEBERG_ASSIGN_OR_RAISE(
auto table_io,
ResolveTableFileIO(file_io_, config_.configs(),
config_.Get(RestCatalogProperties::kWarehouse), result));
return Table::Make(identifier, std::move(result.metadata),
std::move(result.metadata_location), file_io_, shared_from_this());
std::move(result.metadata_location), std::move(table_io),
shared_from_this());
}

Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
Expand Down Expand Up @@ -404,10 +411,14 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
ICEBERG_ASSIGN_OR_RAISE(auto result,
CreateTableInternal(identifier, schema, spec, order, location,
properties, /*stage_create=*/true));
ICEBERG_ASSIGN_OR_RAISE(
auto table_io,
ResolveTableFileIO(file_io_, config_.configs(),
config_.Get(RestCatalogProperties::kWarehouse), result));
ICEBERG_ASSIGN_OR_RAISE(auto staged_table,
StagedTable::Make(identifier, std::move(result.metadata),
std::move(result.metadata_location), file_io_,
shared_from_this()));
std::move(result.metadata_location),
std::move(table_io), shared_from_this()));
return Transaction::Make(std::move(staged_table), TransactionKind::kCreate);
}

Expand Down Expand Up @@ -474,9 +485,13 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
ICEBERG_ASSIGN_OR_RAISE(const auto body, LoadTableInternal(identifier));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(body));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
/// FIXME: support per-table FileIO creation

ICEBERG_ASSIGN_OR_RAISE(
auto table_io,
ResolveTableFileIO(file_io_, config_.configs(),
config_.Get(RestCatalogProperties::kWarehouse), load_result));
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
std::move(load_result.metadata_location), std::move(table_io),
shared_from_this());
}

Expand All @@ -498,8 +513,12 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
ICEBERG_ASSIGN_OR_RAISE(
auto table_io,
ResolveTableFileIO(file_io_, config_.configs(),
config_.Get(RestCatalogProperties::kWarehouse), load_result));
return Table::Make(identifier, std::move(load_result.metadata),
std::move(load_result.metadata_location), file_io_,
std::move(load_result.metadata_location), std::move(table_io),
shared_from_this());
}

Expand Down
80 changes: 75 additions & 5 deletions src/iceberg/catalog/rest/rest_file_io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "iceberg/catalog/rest/rest_file_io.h"

#include <string>
#include <unordered_map>

#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/file_io_registry.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -69,12 +71,13 @@ Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& c

if (io_impl.empty()) {
if (warehouse.empty()) {
return InvalidArgument(R"("{}" or "{}" property is required to create FileIO)",
RestCatalogProperties::kIOImpl.key(),
RestCatalogProperties::kWarehouse.key());
// No io-impl or warehouse configured. Fall back to a local FileIO as a
// default — enabling per-table ResolveTableFileIO (vending).
io_impl = std::string(FileIORegistry::kArrowLocalFileIO);
} else {
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
io_impl = std::string(BuiltinFileIOName(detected_kind));
}
ICEBERG_ASSIGN_OR_RAISE(const auto detected_kind, DetectBuiltinFileIO(warehouse));
io_impl = std::string(BuiltinFileIOName(detected_kind));
}

if (!warehouse.empty() && IsBuiltinImpl(io_impl)) {
Expand All @@ -92,4 +95,71 @@ Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(const RestCatalogProperties& c
return FileIORegistry::Load(io_impl, config.configs());
}

namespace {

const StorageCredential* ResolveStorageCredential(
const std::vector<StorageCredential>& credentials, std::string_view location) {
const StorageCredential* best = nullptr;
for (const auto& cred : credentials) {
if (location.starts_with(cred.prefix)) {
if (!best || cred.prefix.size() > best->prefix.size()) {
best = &cred;
}
}
}
return best;
}

std::unordered_map<std::string, std::string> MergeTableProperties(
const std::unordered_map<std::string, std::string>& catalog_props,
const std::unordered_map<std::string, std::string>& table_config,
const std::unordered_map<std::string, std::string>& credential_config) {
auto merged = catalog_props;
for (const auto& [k, v] : table_config) {
merged[k] = v;
}
for (const auto& [k, v] : credential_config) {
merged[k] = v;
}
return merged;
}

} // namespace

Result<std::shared_ptr<FileIO>> ResolveTableFileIO(
const std::shared_ptr<FileIO>& catalog_io,
const std::unordered_map<std::string, std::string>& catalog_props,
const std::string& warehouse, const LoadTableResult& result) {
if (result.config.empty() && result.storage_credentials.empty()) {
return catalog_io;
}

// Merge order: catalog props < table config < storage credentials (highest priority).
const StorageCredential* cred = nullptr;
if (!result.metadata_location.empty()) {
cred = ResolveStorageCredential(result.storage_credentials, result.metadata_location);
}
static const std::unordered_map<std::string, std::string> kEmpty;
auto merged =
MergeTableProperties(catalog_props, result.config, cred ? cred->config : kEmpty);

// Detect FileIO type: explicit io-impl > warehouse scheme > metadata_location scheme.
std::string io_impl;
if (auto it = merged.find(std::string(RestCatalogProperties::kIOImpl.key()));
it != merged.end()) {
io_impl = it->second;
} else if (!warehouse.empty()) {
ICEBERG_ASSIGN_OR_RAISE(auto kind, DetectBuiltinFileIO(warehouse));
io_impl = std::string(BuiltinFileIOName(kind));
} else if (!result.metadata_location.empty()) {
ICEBERG_ASSIGN_OR_RAISE(auto kind, DetectBuiltinFileIO(result.metadata_location));
io_impl = std::string(BuiltinFileIOName(kind));
} else {
return catalog_io;
}

ICEBERG_ASSIGN_OR_RAISE(auto table_io, FileIORegistry::Load(io_impl, merged));
return std::shared_ptr<FileIO>(std::move(table_io));
}

} // namespace iceberg::rest
14 changes: 14 additions & 0 deletions src/iceberg/catalog/rest/rest_file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@

#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>

#include "iceberg/catalog/rest/catalog_properties.h"
#include "iceberg/catalog/rest/iceberg_rest_export.h"
#include "iceberg/catalog/rest/types.h"
#include "iceberg/file_io.h"
#include "iceberg/file_io_registry.h"
#include "iceberg/result.h"
Expand All @@ -44,4 +47,15 @@ ICEBERG_REST_EXPORT std::string_view BuiltinFileIOName(BuiltinFileIOKind kind);
ICEBERG_REST_EXPORT Result<std::unique_ptr<FileIO>> MakeCatalogFileIO(
const RestCatalogProperties& config);

/// \brief Resolve a per-table FileIO from a LoadTableResult.
///
/// Merges catalog properties, table config, and the best-matching storage
/// credential (longest prefix match on metadata_location), then creates a
/// per-table FileIO via FileIORegistry. Falls back to \p catalog_io when
/// no per-table config is present or the FileIO type cannot be detected.
ICEBERG_REST_EXPORT Result<std::shared_ptr<FileIO>> ResolveTableFileIO(
const std::shared_ptr<FileIO>& catalog_io,
const std::unordered_map<std::string, std::string>& catalog_props,
const std::string& warehouse, const LoadTableResult& result);

} // namespace iceberg::rest
Loading
Loading