Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/iceberg/catalog/rest/http_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ void HttpClient::PrepareSession(
session_->SetUrl(cpr::Url{path});
session_->SetParameters(GetParameters(params));
session_->RemoveContent();
// clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
// to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
// manually enforce HTTP GET to clear it.
curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
Comment thread
HeartLinked marked this conversation as resolved.
auto final_headers = MergeHeaders(default_headers_, headers);
session_->SetHeader(final_headers);
}
Expand Down
62 changes: 62 additions & 0 deletions src/iceberg/catalog/rest/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "iceberg/partition_spec.h"
#include "iceberg/sort_order.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/util/json_util_internal.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -70,6 +72,10 @@ constexpr std::string_view kCode = "code";
constexpr std::string_view kStack = "stack";
constexpr std::string_view kError = "error";

// CommitTableRequest field constants
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
constexpr std::string_view kIdentifier = "identifier";
constexpr std::string_view kRequirements = "requirements";

} // namespace

nlohmann::json ToJson(const CatalogConfig& config) {
Expand Down Expand Up @@ -390,6 +396,60 @@ Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json
return request;
}

// CommitTableRequest serialization
nlohmann::json ToJson(const CommitTableRequest& request) {
nlohmann::json json;
if (!request.identifier.name.empty()) {
json[kIdentifier] = iceberg::ToJson(request.identifier);
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
}

nlohmann::json requirements_json = nlohmann::json::array();
for (const auto& req : request.requirements) {
requirements_json.push_back(iceberg::ToJson(*req));
}
json[kRequirements] = std::move(requirements_json);

nlohmann::json updates_json = nlohmann::json::array();
for (const auto& update : request.updates) {
updates_json.push_back(iceberg::ToJson(*update));
}
json[kUpdates] = std::move(updates_json);

return json;
}

Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& json) {
CommitTableRequest request;
if (json.contains(kIdentifier)) {
ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
GetJsonValue<nlohmann::json>(json, kIdentifier));
ICEBERG_ASSIGN_OR_RAISE(request.identifier, TableIdentifierFromJson(identifier_json));
}
// Note: requirements and updates deserialization would be complex
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
// and is not typically needed for the client side
ICEBERG_RETURN_UNEXPECTED(request.Validate());
return request;
}

// CommitTableResponse serialization
nlohmann::json ToJson(const CommitTableResponse& response) {
nlohmann::json json;
SetOptionalStringField(json, kMetadataLocation, response.metadata_location);
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
json[kMetadata] = ToJson(*response.metadata);
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
return json;
}

Result<CommitTableResponse> CommitTableResponseFromJson(const nlohmann::json& json) {
CommitTableResponse response;
ICEBERG_ASSIGN_OR_RAISE(response.metadata_location,
GetJsonValueOrDefault<std::string>(json, kMetadataLocation));
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
ICEBERG_ASSIGN_OR_RAISE(auto metadata_json,
GetJsonValue<nlohmann::json>(json, kMetadata));
ICEBERG_ASSIGN_OR_RAISE(response.metadata, TableMetadataFromJson(metadata_json));
ICEBERG_RETURN_UNEXPECTED(response.Validate());
return response;
}

#define ICEBERG_DEFINE_FROM_JSON(Model) \
template <> \
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
Expand All @@ -409,5 +469,7 @@ ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)

} // namespace iceberg::rest
2 changes: 2 additions & 0 deletions src/iceberg/catalog/rest/json_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
Comment thread
HeartLinked marked this conversation as resolved.
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)

#undef ICEBERG_DECLARE_JSON_SERDE

Expand Down
96 changes: 83 additions & 13 deletions src/iceberg/catalog/rest/rest_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,30 @@ Status RestCatalog::UpdateNamespaceProperties(
return {};
}

Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
[[maybe_unused]] const Namespace& ns) const {
return NotImplemented("Not implemented");
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
Comment thread
HeartLinked marked this conversation as resolved.
Outdated

ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
std::vector<TableIdentifier> result;
std::string next_token;
while (true) {
std::unordered_map<std::string, std::string> params;
if (!next_token.empty()) {
params[kQueryParamPageToken] = next_token;
}
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListTablesResponseFromJson(json));
result.insert(result.end(), list_response.identifiers.begin(),
list_response.identifiers.end());
if (list_response.next_page_token.empty()) {
return result;
}
next_token = list_response.next_page_token;
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
}
return result;
}

Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
Expand Down Expand Up @@ -280,10 +301,34 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
}

Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
return NotImplemented("Not implemented");
const TableIdentifier& identifier,
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));

// Build request with non-owning shared_ptr (using no-op deleter)
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
CommitTableRequest request{.identifier = identifier};
request.requirements.reserve(requirements.size());
for (const auto& req : requirements) {
request.requirements.emplace_back(req.get(), [](auto*) {});
}
request.updates.reserve(updates.size());
for (const auto& update : updates) {
request.updates.emplace_back(update.get(), [](auto*) {});
}

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto commit_response, CommitTableResponseFromJson(json));

return Table::Make(identifier, commit_response.metadata,
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
std::move(commit_response.metadata_location), file_io_,
shared_from_this());
}

Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
Expand Down Expand Up @@ -321,9 +366,17 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
}

Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
[[maybe_unused]] const TableIdentifier& to) {
return NotImplemented("Not implemented");
Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RenameTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Rename());

RenameTableRequest request{.source = from, .destination = to};
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));

return {};
}

Result<std::string> RestCatalog::LoadTableInternal(
Expand All @@ -350,9 +403,26 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
}

Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
[[maybe_unused]] const TableIdentifier& identifier,
[[maybe_unused]] const std::string& metadata_file_location) {
return NotImplemented("Not implemented");
const TableIdentifier& identifier, const std::string& metadata_file_location) {
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));

RegisterTableRequest request{
.name = identifier.name,
.metadata_location = metadata_file_location,
};

ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
ICEBERG_ASSIGN_OR_RAISE(
const auto response,
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));

ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
auto non_const_catalog = std::const_pointer_cast<RestCatalog>(shared_from_this());
return Table::Make(identifier, load_result.metadata,
std::move(load_result.metadata_location), file_io_,
non_const_catalog);
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
}

} // namespace iceberg::rest
28 changes: 28 additions & 0 deletions src/iceberg/catalog/rest/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,32 @@ bool LoadTableResult::operator==(const LoadTableResult& other) const {
return true;
}

bool CommitTableRequest::operator==(const CommitTableRequest& other) const {
if (identifier != other.identifier) {
return false;
}
if (requirements.size() != other.requirements.size()) {
return false;
}
if (updates.size() != other.updates.size()) {
return false;
}
// Note: Deep comparison of requirements and updates is not implemented
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
// as they contain polymorphic types. This is primarily for testing.
return true;
}

bool CommitTableResponse::operator==(const CommitTableResponse& other) const {
if (metadata_location != other.metadata_location) {
return false;
}
if (!metadata != !other.metadata) {
return false;
}
if (metadata && *metadata != *other.metadata) {
return false;
}
return true;
}

} // namespace iceberg::rest
31 changes: 31 additions & 0 deletions src/iceberg/catalog/rest/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#include "iceberg/result.h"
#include "iceberg/schema.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
#include "iceberg/type_fwd.h"
#include "iceberg/util/macros.h"

Expand Down Expand Up @@ -246,4 +248,33 @@ struct ICEBERG_REST_EXPORT ListTablesResponse {
bool operator==(const ListTablesResponse&) const = default;
};

/// \brief Request to commit changes to a table.
struct ICEBERG_REST_EXPORT CommitTableRequest {
TableIdentifier identifier;
std::vector<std::shared_ptr<TableRequirement>> requirements;
std::vector<std::shared_ptr<TableUpdate>> updates;
Comment thread
HeartLinked marked this conversation as resolved.
Outdated

/// \brief Validates the CommitTableRequest.
Status Validate() const { return {}; }

bool operator==(const CommitTableRequest& other) const;
};

/// \brief Response from committing changes to a table.
struct ICEBERG_REST_EXPORT CommitTableResponse {
std::string metadata_location;
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
std::shared_ptr<TableMetadata> metadata; // required
// TODO(Li Feiyang): Add std::shared_ptr<StorageCredential> storage_credential;
Comment thread
HeartLinked marked this conversation as resolved.
Outdated

/// \brief Validates the CommitTableResponse.
Status Validate() const {
if (!metadata) {
return Invalid("Invalid metadata: null");
Comment thread
HeartLinked marked this conversation as resolved.
Outdated
}
return {};
}

bool operator==(const CommitTableResponse& other) const;
};

} // namespace iceberg::rest
Loading
Loading