Skip to content

Commit f49f654

Browse files
committed
feat(rest): implement table operations part1
1 parent f43d24b commit f49f654

File tree

9 files changed

+200
-50
lines changed

9 files changed

+200
-50
lines changed

src/iceberg/catalog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,8 @@ class ICEBERG_EXPORT Catalog {
179179
/// \param identifier a table identifier
180180
/// \return instance of Table implementation referred to by identifier or
181181
/// ErrorKind::kNoSuchTable if the table does not exist
182-
virtual Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) = 0;
182+
virtual Result<std::shared_ptr<Table>> LoadTable(
183+
const TableIdentifier& identifier) const = 0;
183184

184185
/// \brief Register a table with the catalog if it does not exist
185186
///

src/iceberg/catalog/memory/in_memory_catalog.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ Status InMemoryCatalog::RenameTable(const TableIdentifier& from,
466466
}
467467

468468
Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
469-
const TableIdentifier& identifier) {
469+
const TableIdentifier& identifier) const {
470470
if (!file_io_) [[unlikely]] {
471471
return InvalidArgument("file_io is not set for catalog {}", catalog_name_);
472472
}
@@ -480,8 +480,9 @@ Result<std::shared_ptr<Table>> InMemoryCatalog::LoadTable(
480480

481481
ICEBERG_ASSIGN_OR_RAISE(auto metadata,
482482
TableMetadataUtil::Read(*file_io_, metadata_location));
483+
auto non_const_catalog = std::const_pointer_cast<InMemoryCatalog>(shared_from_this());
483484
return Table::Make(identifier, std::move(metadata), std::move(metadata_location),
484-
file_io_, shared_from_this());
485+
file_io_, non_const_catalog);
485486
}
486487

487488
Result<std::shared_ptr<Table>> InMemoryCatalog::RegisterTable(

src/iceberg/catalog/memory/in_memory_catalog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ class ICEBERG_EXPORT InMemoryCatalog
9393

9494
Status RenameTable(const TableIdentifier& from, const TableIdentifier& to) override;
9595

96-
Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
96+
Result<std::shared_ptr<Table>> LoadTable(
97+
const TableIdentifier& identifier) const override;
9798

9899
Result<std::shared_ptr<Table>> RegisterTable(
99100
const TableIdentifier& identifier,

src/iceberg/catalog/rest/http_client.cc

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,8 @@ Status HandleFailureResponse(const cpr::Response& response,
135135
} // namespace
136136

137137
void HttpClient::PrepareSession(
138-
const std::string& path,
139-
const std::unordered_map<std::string, std::string>& request_headers,
140-
const std::unordered_map<std::string, std::string>& params) {
138+
const std::string& path, const std::unordered_map<std::string, std::string>& params,
139+
const std::unordered_map<std::string, std::string>& request_headers) {
141140
session_->SetUrl(cpr::Url{path});
142141
session_->SetParameters(GetParameters(params));
143142
session_->RemoveContent();
@@ -164,7 +163,7 @@ Result<HttpResponse> HttpClient::Get(
164163
cpr::Response response;
165164
{
166165
std::lock_guard guard(session_mutex_);
167-
PrepareSession(path, headers, params);
166+
PrepareSession(path, params, headers);
168167
response = session_->Get();
169168
}
170169

@@ -181,7 +180,7 @@ Result<HttpResponse> HttpClient::Post(
181180
cpr::Response response;
182181
{
183182
std::lock_guard guard(session_mutex_);
184-
PrepareSession(path, headers);
183+
PrepareSession(path, /*params=*/{}, headers);
185184
session_->SetBody(cpr::Body{body});
186185
response = session_->Post();
187186
}
@@ -206,7 +205,7 @@ Result<HttpResponse> HttpClient::PostForm(
206205
auto form_headers = headers;
207206
form_headers[kHeaderContentType] = kMimeTypeFormUrlEncoded;
208207

209-
PrepareSession(path, form_headers);
208+
PrepareSession(path, /*params=*/{}, form_headers);
210209
std::vector<cpr::Pair> pair_list;
211210
pair_list.reserve(form_data.size());
212211
for (const auto& [key, val] : form_data) {
@@ -229,7 +228,7 @@ Result<HttpResponse> HttpClient::Head(
229228
cpr::Response response;
230229
{
231230
std::lock_guard guard(session_mutex_);
232-
PrepareSession(path, headers);
231+
PrepareSession(path, /*params=*/{}, headers);
233232
response = session_->Head();
234233
}
235234

@@ -240,12 +239,13 @@ Result<HttpResponse> HttpClient::Head(
240239
}
241240

242241
Result<HttpResponse> HttpClient::Delete(
243-
const std::string& path, const std::unordered_map<std::string, std::string>& headers,
242+
const std::string& path, const std::unordered_map<std::string, std::string>& params,
243+
const std::unordered_map<std::string, std::string>& headers,
244244
const ErrorHandler& error_handler) {
245245
cpr::Response response;
246246
{
247247
std::lock_guard guard(session_mutex_);
248-
PrepareSession(path, headers);
248+
PrepareSession(path, params, headers);
249249
response = session_->Delete();
250250
}
251251

src/iceberg/catalog/rest/http_client.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,14 @@ class ICEBERG_REST_EXPORT HttpClient {
104104

105105
/// \brief Sends a DELETE request.
106106
Result<HttpResponse> Delete(const std::string& path,
107+
const std::unordered_map<std::string, std::string>& params,
107108
const std::unordered_map<std::string, std::string>& headers,
108109
const ErrorHandler& error_handler);
109110

110111
private:
111-
void PrepareSession(const std::string& path,
112-
const std::unordered_map<std::string, std::string>& request_headers,
113-
const std::unordered_map<std::string, std::string>& params = {});
112+
void PrepareSession(
113+
const std::string& path, const std::unordered_map<std::string, std::string>& params,
114+
const std::unordered_map<std::string, std::string>& request_headers);
114115

115116
std::unordered_map<std::string, std::string> default_headers_;
116117

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,10 @@ Status RestCatalog::DropNamespace(const Namespace& ns) {
189189
ICEBERG_RETURN_UNEXPECTED(
190190
CheckEndpoint(supported_endpoints_, Endpoint::DropNamespace()));
191191
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
192-
ICEBERG_ASSIGN_OR_RAISE(
193-
const auto response,
194-
client_->Delete(path, /*headers=*/{}, *DropNamespaceErrorHandler::Instance()));
192+
193+
ICEBERG_ASSIGN_OR_RAISE(const auto response,
194+
client_->Delete(path, /*params=*/{}, /*headers=*/{},
195+
*DropNamespaceErrorHandler::Instance()));
195196
return {};
196197
}
197198

@@ -204,17 +205,16 @@ Result<bool> RestCatalog::NamespaceExists(const Namespace& ns) const {
204205
return false;
205206
}
206207
ICEBERG_RETURN_UNEXPECTED(result);
207-
// GET succeeded, namespace exists
208+
// GetNamespaceProperties succeeded, namespace exists
208209
return true;
209210
}
210211

211212
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Namespace_(ns));
212213
auto response_or_error =
213214
client_->Head(path, /*headers=*/{}, *NamespaceErrorHandler::Instance());
214215
if (!response_or_error.has_value()) {
215-
const auto& error = response_or_error.error();
216216
// catch NoSuchNamespaceException/404 and return false
217-
if (error.kind == ErrorKind::kNoSuchNamespace) {
217+
if (response_or_error.error().kind == ErrorKind::kNoSuchNamespace) {
218218
return false;
219219
}
220220
ICEBERG_RETURN_UNEXPECTED(response_or_error);
@@ -294,14 +294,44 @@ Result<std::shared_ptr<Transaction>> RestCatalog::StageCreateTable(
294294
return NotImplemented("Not implemented");
295295
}
296296

297-
Status RestCatalog::DropTable([[maybe_unused]] const TableIdentifier& identifier,
298-
[[maybe_unused]] bool purge) {
299-
return NotImplemented("Not implemented");
297+
Status RestCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
298+
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::DeleteTable()));
299+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
300+
301+
std::unordered_map<std::string, std::string> params;
302+
if (purge) {
303+
params["purgeRequested"] = "true";
304+
}
305+
ICEBERG_ASSIGN_OR_RAISE(
306+
const auto response,
307+
client_->Delete(path, params, /*headers=*/{}, *TableErrorHandler::Instance()));
308+
return {};
300309
}
301310

302-
Result<bool> RestCatalog::TableExists(
303-
[[maybe_unused]] const TableIdentifier& identifier) const {
304-
return NotImplemented("Not implemented");
311+
Result<bool> RestCatalog::TableExists(const TableIdentifier& identifier) const {
312+
auto check = CheckEndpoint(supported_endpoints_, Endpoint::TableExists());
313+
if (!check.has_value()) {
314+
// Fall back to LoadTable endpoint (GET)
315+
auto result = LoadTable(identifier);
316+
if (!result.has_value() && result.error().kind == ErrorKind::kNoSuchTable) {
317+
return false;
318+
}
319+
ICEBERG_RETURN_UNEXPECTED(result);
320+
// LoadTable succeeded, table exists
321+
return true;
322+
}
323+
324+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
325+
auto response_or_error =
326+
client_->Head(path, /*headers=*/{}, *TableErrorHandler::Instance());
327+
if (!response_or_error.has_value()) {
328+
// catch NoSuchTableException/404 and return false
329+
if (response_or_error.error().kind == ErrorKind::kNoSuchTable) {
330+
return false;
331+
}
332+
ICEBERG_RETURN_UNEXPECTED(response_or_error);
333+
}
334+
return true;
305335
}
306336

307337
Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
@@ -310,8 +340,22 @@ Status RestCatalog::RenameTable([[maybe_unused]] const TableIdentifier& from,
310340
}
311341

312342
Result<std::shared_ptr<Table>> RestCatalog::LoadTable(
313-
[[maybe_unused]] const TableIdentifier& identifier) {
314-
return NotImplemented("Not implemented");
343+
const TableIdentifier& identifier) const {
344+
ICEBERG_RETURN_UNEXPECTED(CheckEndpoint(supported_endpoints_, Endpoint::LoadTable()));
345+
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Table(identifier));
346+
347+
ICEBERG_ASSIGN_OR_RAISE(
348+
const auto response,
349+
client_->Get(path, /*params=*/{}, /*headers=*/{}, *TableErrorHandler::Instance()));
350+
351+
// TODO(Feiyang Li): support load metadata table
352+
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
353+
ICEBERG_ASSIGN_OR_RAISE(auto load_result, LoadTableResultFromJson(json));
354+
// Cast away const since Table needs non-const Catalog pointer for mutations
355+
auto non_const_catalog = std::const_pointer_cast<RestCatalog>(shared_from_this());
356+
return Table::Make(identifier, load_result.metadata,
357+
std::move(load_result.metadata_location), file_io_,
358+
non_const_catalog);
315359
}
316360

317361
Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
9797

9898
Status DropTable(const TableIdentifier& identifier, bool purge) override;
9999

100-
Result<std::shared_ptr<Table>> LoadTable(const TableIdentifier& identifier) override;
100+
Result<std::shared_ptr<Table>> LoadTable(
101+
const TableIdentifier& identifier) const override;
101102

102103
Result<std::shared_ptr<Table>> RegisterTable(
103104
const TableIdentifier& identifier,

src/iceberg/test/mock_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class MockCatalog : public Catalog {
8181
(override));
8282

8383
MOCK_METHOD((Result<std::shared_ptr<Table>>), LoadTable, (const TableIdentifier&),
84-
(override));
84+
(const, override));
8585

8686
MOCK_METHOD((Result<std::shared_ptr<Table>>), RegisterTable,
8787
(const TableIdentifier&, const std::string&), (override));

0 commit comments

Comments
 (0)