Skip to content

Commit f177530

Browse files
committed
feat(rest): implement list table and update table
1 parent d328d0a commit f177530

File tree

9 files changed

+591
-13
lines changed

9 files changed

+591
-13
lines changed

src/iceberg/catalog/rest/http_client.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ void HttpClient::PrepareSession(
140140
session_->SetUrl(cpr::Url{path});
141141
session_->SetParameters(GetParameters(params));
142142
session_->RemoveContent();
143+
// clear lingering POST mode state from prior requests. CURLOPT_POST is implicitly set
144+
// to 1 by POST requests, and this state is not reset by RemoveContent(), so we must
145+
// manually enforce HTTP GET to clear it.
146+
curl_easy_setopt(session_->GetCurlHolder()->handle, CURLOPT_HTTPGET, 1L);
143147
auto final_headers = MergeHeaders(default_headers_, headers);
144148
session_->SetHeader(final_headers);
145149
}

src/iceberg/catalog/rest/json_internal.cc

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
#include "iceberg/partition_spec.h"
3232
#include "iceberg/sort_order.h"
3333
#include "iceberg/table_identifier.h"
34+
#include "iceberg/table_requirement.h"
35+
#include "iceberg/table_update.h"
3436
#include "iceberg/util/json_util_internal.h"
3537
#include "iceberg/util/macros.h"
3638

@@ -70,6 +72,10 @@ constexpr std::string_view kCode = "code";
7072
constexpr std::string_view kStack = "stack";
7173
constexpr std::string_view kError = "error";
7274

75+
// CommitTableRequest field constants
76+
constexpr std::string_view kIdentifier = "identifier";
77+
constexpr std::string_view kRequirements = "requirements";
78+
7379
} // namespace
7480

7581
nlohmann::json ToJson(const CatalogConfig& config) {
@@ -390,6 +396,60 @@ Result<CreateTableRequest> CreateTableRequestFromJson(const nlohmann::json& json
390396
return request;
391397
}
392398

399+
// CommitTableRequest serialization
400+
nlohmann::json ToJson(const CommitTableRequest& request) {
401+
nlohmann::json json;
402+
if (!request.identifier.name.empty()) {
403+
json[kIdentifier] = iceberg::ToJson(request.identifier);
404+
}
405+
406+
nlohmann::json requirements_json = nlohmann::json::array();
407+
for (const auto& req : request.requirements) {
408+
requirements_json.push_back(iceberg::ToJson(*req));
409+
}
410+
json[kRequirements] = std::move(requirements_json);
411+
412+
nlohmann::json updates_json = nlohmann::json::array();
413+
for (const auto& update : request.updates) {
414+
updates_json.push_back(iceberg::ToJson(*update));
415+
}
416+
json[kUpdates] = std::move(updates_json);
417+
418+
return json;
419+
}
420+
421+
Result<CommitTableRequest> CommitTableRequestFromJson(const nlohmann::json& json) {
422+
CommitTableRequest request;
423+
if (json.contains(kIdentifier)) {
424+
ICEBERG_ASSIGN_OR_RAISE(auto identifier_json,
425+
GetJsonValue<nlohmann::json>(json, kIdentifier));
426+
ICEBERG_ASSIGN_OR_RAISE(request.identifier, TableIdentifierFromJson(identifier_json));
427+
}
428+
// Note: requirements and updates deserialization would be complex
429+
// and is not typically needed for the client side
430+
ICEBERG_RETURN_UNEXPECTED(request.Validate());
431+
return request;
432+
}
433+
434+
// CommitTableResponse serialization
435+
nlohmann::json ToJson(const CommitTableResponse& response) {
436+
nlohmann::json json;
437+
SetOptionalStringField(json, kMetadataLocation, response.metadata_location);
438+
json[kMetadata] = ToJson(*response.metadata);
439+
return json;
440+
}
441+
442+
Result<CommitTableResponse> CommitTableResponseFromJson(const nlohmann::json& json) {
443+
CommitTableResponse response;
444+
ICEBERG_ASSIGN_OR_RAISE(response.metadata_location,
445+
GetJsonValueOrDefault<std::string>(json, kMetadataLocation));
446+
ICEBERG_ASSIGN_OR_RAISE(auto metadata_json,
447+
GetJsonValue<nlohmann::json>(json, kMetadata));
448+
ICEBERG_ASSIGN_OR_RAISE(response.metadata, TableMetadataFromJson(metadata_json));
449+
ICEBERG_RETURN_UNEXPECTED(response.Validate());
450+
return response;
451+
}
452+
393453
#define ICEBERG_DEFINE_FROM_JSON(Model) \
394454
template <> \
395455
Result<Model> FromJson<Model>(const nlohmann::json& json) { \
@@ -409,5 +469,7 @@ ICEBERG_DEFINE_FROM_JSON(LoadTableResult)
409469
ICEBERG_DEFINE_FROM_JSON(RegisterTableRequest)
410470
ICEBERG_DEFINE_FROM_JSON(RenameTableRequest)
411471
ICEBERG_DEFINE_FROM_JSON(CreateTableRequest)
472+
ICEBERG_DEFINE_FROM_JSON(CommitTableRequest)
473+
ICEBERG_DEFINE_FROM_JSON(CommitTableResponse)
412474

413475
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ ICEBERG_DECLARE_JSON_SERDE(LoadTableResult)
5656
ICEBERG_DECLARE_JSON_SERDE(RegisterTableRequest)
5757
ICEBERG_DECLARE_JSON_SERDE(RenameTableRequest)
5858
ICEBERG_DECLARE_JSON_SERDE(CreateTableRequest)
59+
ICEBERG_DECLARE_JSON_SERDE(CommitTableRequest)
60+
ICEBERG_DECLARE_JSON_SERDE(CommitTableResponse)
5961

6062
#undef ICEBERG_DECLARE_JSON_SERDE
6163

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -244,9 +244,30 @@ Status RestCatalog::UpdateNamespaceProperties(
244244
return {};
245245
}
246246

247-
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(
248-
[[maybe_unused]] const Namespace& ns) const {
249-
return NotImplemented("Not implemented");
247+
Result<std::vector<TableIdentifier>> RestCatalog::ListTables(const Namespace& ns) const {
248+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
249+
250+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Tables(ns));
251+
std::vector<TableIdentifier> result;
252+
std::string next_token;
253+
while (true) {
254+
std::unordered_map<std::string, std::string> params;
255+
if (!next_token.empty()) {
256+
params[kQueryParamPageToken] = next_token;
257+
}
258+
ICEBERG_ASSIGN_OR_RAISE(
259+
const auto response,
260+
client_->Get(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
261+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
262+
ICEBERG_ASSIGN_OR_RAISE(auto list_response, ListTablesResponseFromJson(json));
263+
result.insert(result.end(), list_response.identifiers.begin(),
264+
list_response.identifiers.end());
265+
if (list_response.next_page_token.empty()) {
266+
return result;
267+
}
268+
next_token = list_response.next_page_token;
269+
}
270+
return result;
250271
}
251272

252273
Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
@@ -280,10 +301,34 @@ Result<std::shared_ptr<Table>> RestCatalog::CreateTable(
280301
}
281302

282303
Result<std::shared_ptr<Table>> RestCatalog::UpdateTable(
283-
[[maybe_unused]] const TableIdentifier& identifier,
284-
[[maybe_unused]] const std::vector<std::unique_ptr<TableRequirement>>& requirements,
285-
[[maybe_unused]] const std::vector<std::unique_ptr<TableUpdate>>& updates) {
286-
return NotImplemented("Not implemented");
304+
const TableIdentifier& identifier,
305+
const std::vector<std::unique_ptr<TableRequirement>>& requirements,
306+
const std::vector<std::unique_ptr<TableUpdate>>& updates) {
307+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CreateTable());
308+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
309+
310+
// Build request with non-owning shared_ptr (using no-op deleter)
311+
CommitTableRequest request{.identifier = identifier};
312+
request.requirements.reserve(requirements.size());
313+
for (const auto& req : requirements) {
314+
request.requirements.emplace_back(req.get(), [](auto*) {});
315+
}
316+
request.updates.reserve(updates.size());
317+
for (const auto& update : updates) {
318+
request.updates.emplace_back(update.get(), [](auto*) {});
319+
}
320+
321+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
322+
ICEBERG_ASSIGN_OR_RAISE(
323+
const auto response,
324+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
325+
326+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
327+
ICEBERG_ASSIGN_OR_RAISE(auto commit_response, CommitTableResponseFromJson(json));
328+
329+
return Table::Make(identifier, commit_response.metadata,
330+
std::move(commit_response.metadata_location), file_io_,
331+
shared_from_this());
287332
}
288333

289334
Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
@@ -321,9 +366,17 @@ Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
321366
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance()));
322367
}
323368

324-
Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
325-
[[maybe_unused]] const TableIdentifier& to) {
326-
return NotImplemented("Not implemented");
369+
Status RestCatalog::RenameTable(const TableIdentifier& from, const TableIdentifier& to) {
370+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RenameTable());
371+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Rename());
372+
373+
RenameTableRequest request{.source = from, .destination = to};
374+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
375+
ICEBERG_ASSIGN_OR_RAISE(
376+
const auto response,
377+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
378+
379+
return {};
327380
}
328381

329382
Result<std::string> RestCatalog::LoadTableInternal(
@@ -350,9 +403,26 @@ Result<std::shared_ptr<Table>> RestCatalog::LoadTable(const TableIdentifier& ide
350403
}
351404

352405
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
353-
[[maybe_unused]] const TableIdentifier& identifier,
354-
[[maybe_unused]] const std::string& metadata_file_location) {
355-
return NotImplemented("Not implemented");
406+
const TableIdentifier& identifier, const std::string& metadata_file_location) {
407+
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::RegisterTable());
408+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Register(identifier.ns));
409+
410+
RegisterTableRequest request{
411+
.name = identifier.name,
412+
.metadata_location = metadata_file_location,
413+
};
414+
415+
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
416+
ICEBERG_ASSIGN_OR_RAISE(
417+
const auto response,
418+
client_->Post(path, json_request, /*headers=*/{}, *TableErrorHandler::Instance()));
419+
420+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
421+
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
422+
auto non_const_catalog = std::const_pointer_cast<RestCatalog>(shared_from_this());
423+
return Table::Make(identifier, load_result.metadata,
424+
std::move(load_result.metadata_location), file_io_,
425+
non_const_catalog);
356426
}
357427

358428
} // namespace iceberg::rest

src/iceberg/catalog/rest/types.cc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,32 @@ bool LoadTableResult::operator==(const LoadTableResult& other) const {
6969
return true;
7070
}
7171

72+
bool CommitTableRequest::operator==(const CommitTableRequest& other) const {
73+
if (identifier != other.identifier) {
74+
return false;
75+
}
76+
if (requirements.size() != other.requirements.size()) {
77+
return false;
78+
}
79+
if (updates.size() != other.updates.size()) {
80+
return false;
81+
}
82+
// Note: Deep comparison of requirements and updates is not implemented
83+
// as they contain polymorphic types. This is primarily for testing.
84+
return true;
85+
}
86+
87+
bool CommitTableResponse::operator==(const CommitTableResponse& other) const {
88+
if (metadata_location != other.metadata_location) {
89+
return false;
90+
}
91+
if (!metadata != !other.metadata) {
92+
return false;
93+
}
94+
if (metadata && *metadata != *other.metadata) {
95+
return false;
96+
}
97+
return true;
98+
}
99+
72100
} // namespace iceberg::rest

src/iceberg/catalog/rest/types.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
#include "iceberg/result.h"
3030
#include "iceberg/schema.h"
3131
#include "iceberg/table_identifier.h"
32+
#include "iceberg/table_requirement.h"
33+
#include "iceberg/table_update.h"
3234
#include "iceberg/type_fwd.h"
3335
#include "iceberg/util/macros.h"
3436

@@ -246,4 +248,33 @@ struct ICEBERG_REST_EXPORT ListTablesResponse {
246248
bool operator==(const ListTablesResponse&) const = default;
247249
};
248250

251+
/// \brief Request to commit changes to a table.
252+
struct ICEBERG_REST_EXPORT CommitTableRequest {
253+
TableIdentifier identifier;
254+
std::vector<std::shared_ptr<TableRequirement>> requirements;
255+
std::vector<std::shared_ptr<TableUpdate>> updates;
256+
257+
/// \brief Validates the CommitTableRequest.
258+
Status Validate() const { return {}; }
259+
260+
bool operator==(const CommitTableRequest& other) const;
261+
};
262+
263+
/// \brief Response from committing changes to a table.
264+
struct ICEBERG_REST_EXPORT CommitTableResponse {
265+
std::string metadata_location;
266+
std::shared_ptr<TableMetadata> metadata; // required
267+
// TODO(Li Feiyang): Add std::shared_ptr<StorageCredential> storage_credential;
268+
269+
/// \brief Validates the CommitTableResponse.
270+
Status Validate() const {
271+
if (!metadata) {
272+
return Invalid("Invalid metadata: null");
273+
}
274+
return {};
275+
}
276+
277+
bool operator==(const CommitTableResponse& other) const;
278+
};
279+
249280
} // namespace iceberg::rest

0 commit comments

Comments
 (0)