Skip to content

Commit a970e07

Browse files
Sandeep GottimukkalaSandeep Gottimukkala
authored andcommitted
refactor(rest): move scan plan catalog impl out of this branch
Remove PlanTableScan, FetchPlanningResult, CancelPlanning, and FetchScanTasks from Catalog, RestCatalog, and their tests. This branch now contains only the REST models (types.h), ser/de, and error handlers for scan planning. The catalog-layer wiring will live in a separate branch.
1 parent 67a5602 commit a970e07

5 files changed

Lines changed: 0 additions & 237 deletions

File tree

src/iceberg/catalog.h

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,8 @@
2626
#include <unordered_set>
2727
#include <vector>
2828

29-
#include "iceberg/catalog/rest/types.h"
3029
#include "iceberg/result.h"
3130
#include "iceberg/table_identifier.h"
32-
#include "iceberg/table_scan.h"
3331
#include "iceberg/type_fwd.h"
3432

3533
namespace iceberg {
@@ -190,44 +188,6 @@ class ICEBERG_EXPORT Catalog {
190188
/// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists
191189
virtual Result<std::shared_ptr<Table>> RegisterTable(
192190
const TableIdentifier& identifier, const std::string& metadata_file_location) = 0;
193-
194-
/// \brief Initiate a scan planning operation for the given table.
195-
///
196-
/// \param table The table to scan.
197-
/// \param context The scan context containing snapshot, filter, and other options.
198-
/// \return A PlanTableScanResponse with the plan status and initial scan tasks.
199-
virtual Result<rest::PlanTableScanResponse> PlanTableScan(
200-
const Table& table, const internal::TableScanContext& context) {
201-
return NotImplemented("PlanTableScan is not supported by this catalog");
202-
}
203-
204-
/// \brief Fetch the current status and results of an asynchronous scan plan.
205-
///
206-
/// \param table The table being scanned.
207-
/// \param plan_id The plan ID returned by PlanTableScan.
208-
/// \return A FetchPlanningResultResponse with the current plan status and tasks.
209-
virtual Result<rest::FetchPlanningResultResponse> FetchPlanningResult(
210-
const Table& table, const std::string& plan_id) {
211-
return NotImplemented("FetchPlanningResult is not supported by this catalog");
212-
}
213-
214-
/// \brief Cancel an in-progress scan planning operation.
215-
///
216-
/// \param table The table being scanned.
217-
/// \param plan_id The plan ID returned by PlanTableScan.
218-
virtual Status CancelPlanning(const Table& table, const std::string& plan_id) {
219-
return NotImplemented("CancelPlanning is not supported by this catalog");
220-
}
221-
222-
/// \brief Fetch the scan tasks for a given plan task token.
223-
///
224-
/// \param table The table being scanned.
225-
/// \param plan_task The plan task token returned in a scan plan response.
226-
/// \return A FetchScanTasksResponse with the file scan tasks.
227-
virtual Result<rest::FetchScanTasksResponse> FetchScanTasks(
228-
const Table& table, const std::string& plan_task) {
229-
return NotImplemented("FetchScanTasks is not supported by this catalog");
230-
}
231191
};
232192

233193
} // namespace iceberg

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 0 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
#include "iceberg/sort_order.h"
4444
#include "iceberg/table.h"
4545
#include "iceberg/table_requirement.h"
46-
#include "iceberg/table_scan.h"
4746
#include "iceberg/table_update.h"
4847
#include "iceberg/transaction.h"
4948
#include "iceberg/util/macros.h"
@@ -63,8 +62,6 @@ std::unordered_set<Endpoint> GetDefaultEndpoints() {
6362
Endpoint::UpdateTable(), Endpoint::DeleteTable(),
6463
Endpoint::RenameTable(), Endpoint::RegisterTable(),
6564
Endpoint::ReportMetrics(), Endpoint::CommitTransaction(),
66-
Endpoint::PlanTableScan(), Endpoint::FetchPlanningResult(),
67-
Endpoint::CancelPlanning(), Endpoint::FetchScanTasks(),
6865
};
6966
}
7067

@@ -498,85 +495,4 @@ Result<std::shared_ptr<Table>> RestCatalog::RegisterTable(
498495
shared_from_this());
499496
}
500497

501-
Result<PlanTableScanResponse> RestCatalog::PlanTableScan(
502-
const Table& table, const internal::TableScanContext& context) {
503-
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::PlanTableScan());
504-
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Plan(table.name()));
505-
ICEBERG_ASSIGN_OR_RAISE(auto schema_ptr, table.schema());
506-
ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, table.specs());
507-
508-
PlanTableScanRequest request;
509-
if (context.snapshot_id.has_value()) {
510-
request.snapshot_id = context.snapshot_id;
511-
}
512-
request.select = context.selected_columns;
513-
request.filter = context.filter;
514-
request.case_sensitive = context.case_sensitive;
515-
request.use_snapshot_schema = false;
516-
if (context.from_snapshot_id.has_value() && context.to_snapshot_id.has_value()) {
517-
request.start_snapshot_id = context.from_snapshot_id.value();
518-
request.end_snapshot_id = context.to_snapshot_id.value();
519-
request.use_snapshot_schema = true;
520-
}
521-
if (context.min_rows_requested.has_value()) {
522-
request.min_rows_requested = context.min_rows_requested.value();
523-
}
524-
525-
ICEBERG_RETURN_UNEXPECTED(request.Validate());
526-
ICEBERG_ASSIGN_OR_RAISE(auto request_json, ToJson(request));
527-
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(request_json));
528-
ICEBERG_ASSIGN_OR_RAISE(
529-
const auto response,
530-
client_->Post(path, json_request, /*headers=*/{}, *PlanErrorHandler::Instance(),
531-
*catalog_session_));
532-
533-
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
534-
return PlanTableScanResponseFromJson(json, specs_ref.get(), *schema_ptr);
535-
}
536-
537-
Result<FetchPlanningResultResponse> RestCatalog::FetchPlanningResult(
538-
const Table& table, const std::string& plan_id) {
539-
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::FetchPlanningResult());
540-
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Plan(table.name(), plan_id));
541-
ICEBERG_ASSIGN_OR_RAISE(auto schema_ptr, table.schema());
542-
ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, table.specs());
543-
544-
ICEBERG_ASSIGN_OR_RAISE(
545-
const auto response,
546-
client_->Get(path, /*params=*/{}, /*headers=*/{}, *PlanErrorHandler::Instance(),
547-
*catalog_session_));
548-
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
549-
return FetchPlanningResultResponseFromJson(json, specs_ref.get(), *schema_ptr);
550-
}
551-
552-
Status RestCatalog::CancelPlanning(const Table& table, const std::string& plan_id) {
553-
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CancelPlanning());
554-
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->Plan(table.name(), plan_id));
555-
556-
ICEBERG_ASSIGN_OR_RAISE(
557-
const auto response,
558-
client_->Delete(path, /*params=*/{}, /*headers=*/{},
559-
*PlanErrorHandler::Instance(), *catalog_session_));
560-
return {};
561-
}
562-
563-
Result<FetchScanTasksResponse> RestCatalog::FetchScanTasks(const Table& table,
564-
const std::string& plan_task) {
565-
ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::FetchScanTasks());
566-
ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->FetchScanTasks(table.name()));
567-
ICEBERG_ASSIGN_OR_RAISE(auto schema_ptr, table.schema());
568-
ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, table.specs());
569-
570-
FetchScanTasksRequest request{.planTask = plan_task};
571-
ICEBERG_RETURN_UNEXPECTED(request.Validate());
572-
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
573-
ICEBERG_ASSIGN_OR_RAISE(
574-
const auto response,
575-
client_->Post(path, json_request, /*headers=*/{}, *PlanTaskErrorHandler::Instance(),
576-
*catalog_session_));
577-
578-
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));
579-
return FetchScanTasksResponseFromJson(json, specs_ref.get(), *schema_ptr);
580-
}
581-
582498
} // namespace iceberg::rest

src/iceberg/catalog/rest/rest_catalog.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,6 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog,
104104
const TableIdentifier& identifier,
105105
const std::string& metadata_file_location) override;
106106

107-
Result<PlanTableScanResponse> PlanTableScan(
108-
const Table& table, const internal::TableScanContext& context) override;
109-
Result<FetchPlanningResultResponse> FetchPlanningResult(
110-
const Table& table, const std::string& plan_id) override;
111-
Status CancelPlanning(const Table& table, const std::string& plan_id) override;
112-
Result<FetchScanTasksResponse> FetchScanTasks(const Table& table,
113-
const std::string& plan_task) override;
114-
115107
private:
116108
RestCatalog(RestCatalogProperties config, std::shared_ptr<FileIO> file_io,
117109
std::unique_ptr<HttpClient> client, std::unique_ptr<ResourcePaths> paths,

src/iceberg/test/endpoint_test.cc

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -145,31 +145,6 @@ TEST(EndpointTest, TransactionEndpoints) {
145145
EXPECT_EQ(commit_transaction.path(), "/v1/{prefix}/transactions/commit");
146146
}
147147

148-
// Test predefined scan planning endpoints
149-
TEST(EndpointTest, ScanPlanEndpoints) {
150-
auto plan_table_scan = Endpoint::PlanTableScan();
151-
EXPECT_EQ(plan_table_scan.method(), HttpMethod::kPost);
152-
EXPECT_EQ(plan_table_scan.path(),
153-
"/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan");
154-
EXPECT_EQ(plan_table_scan.ToString(),
155-
"POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan");
156-
157-
auto fetch_planning_result = Endpoint::FetchPlanningResult();
158-
EXPECT_EQ(fetch_planning_result.method(), HttpMethod::kGet);
159-
EXPECT_EQ(fetch_planning_result.path(),
160-
"/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}");
161-
162-
auto cancel_planning = Endpoint::CancelPlanning();
163-
EXPECT_EQ(cancel_planning.method(), HttpMethod::kDelete);
164-
EXPECT_EQ(cancel_planning.path(),
165-
"/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}");
166-
167-
auto fetch_scan_tasks = Endpoint::FetchScanTasks();
168-
EXPECT_EQ(fetch_scan_tasks.method(), HttpMethod::kPost);
169-
EXPECT_EQ(fetch_scan_tasks.path(),
170-
"/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks");
171-
}
172-
173148
// Test endpoint equality
174149
TEST(EndpointTest, Equality) {
175150
auto endpoint1 = Endpoint::Make(HttpMethod::kGet, "/path");
@@ -267,8 +242,6 @@ TEST(EndpointTest, StringRoundTrip) {
267242
Endpoint::ListNamespaces(), Endpoint::GetNamespaceProperties(),
268243
Endpoint::CreateNamespace(), Endpoint::LoadTable(),
269244
Endpoint::CreateTable(), Endpoint::DeleteTable(),
270-
Endpoint::PlanTableScan(), Endpoint::FetchPlanningResult(),
271-
Endpoint::CancelPlanning(), Endpoint::FetchScanTasks(),
272245
};
273246

274247
for (const auto& original : endpoints) {

src/iceberg/test/rest_catalog_integration_test.cc

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
#include "iceberg/table.h"
4747
#include "iceberg/table_identifier.h"
4848
#include "iceberg/table_requirement.h"
49-
#include "iceberg/table_scan.h"
5049
#include "iceberg/table_update.h"
5150
#include "iceberg/test/matchers.h"
5251
#include "iceberg/test/std_io.h"
@@ -449,83 +448,6 @@ TEST_F(RestCatalogIntegrationTest, StageCreateTable) {
449448
EXPECT_EQ(committed->metadata()->properties.configs().at("key1"), "value1");
450449
}
451450

452-
// -- Scan plan operations --
453-
454-
TEST_F(RestCatalogIntegrationTest, PlanTableScan) {
455-
Namespace ns{.levels = {"test_plan_table_scan"}};
456-
ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
457-
TableIdentifier table_id{.ns = ns, .name = "scan_table"};
458-
ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
459-
ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id));
460-
461-
internal::TableScanContext context;
462-
ICEBERG_UNWRAP_OR_FAIL(auto response, catalog->PlanTableScan(*table, context));
463-
// Empty table: no file scan tasks returned
464-
EXPECT_TRUE(response.file_scan_tasks.empty());
465-
}
466-
467-
TEST_F(RestCatalogIntegrationTest, PlanTableScanWithContext) {
468-
Namespace ns{.levels = {"test_plan_scan_context"}};
469-
ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
470-
TableIdentifier table_id{.ns = ns, .name = "context_scan_table"};
471-
ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
472-
ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id));
473-
474-
internal::TableScanContext context;
475-
context.selected_columns = {"id", "data"};
476-
context.case_sensitive = true;
477-
ICEBERG_UNWRAP_OR_FAIL(auto response, catalog->PlanTableScan(*table, context));
478-
EXPECT_TRUE(response.file_scan_tasks.empty());
479-
}
480-
481-
TEST_F(RestCatalogIntegrationTest, FetchPlanningResult) {
482-
Namespace ns{.levels = {"test_fetch_planning_result"}};
483-
ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
484-
TableIdentifier table_id{.ns = ns, .name = "fetch_plan_table"};
485-
ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
486-
ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id));
487-
488-
internal::TableScanContext context;
489-
ICEBERG_UNWRAP_OR_FAIL(auto plan_response, catalog->PlanTableScan(*table, context));
490-
491-
// NOTE: apache/iceberg-rest-fixture always responds synchronously (status="completed")
492-
// with a non-empty plan_id (e.g. "sync-<uuid>"). Sync plans are immediately discarded
493-
// server-side. Async paths are covered by unit tests.
494-
EXPECT_FALSE(plan_response.plan_id.empty());
495-
}
496-
497-
TEST_F(RestCatalogIntegrationTest, CancelPlanning) {
498-
Namespace ns{.levels = {"test_cancel_planning"}};
499-
ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
500-
TableIdentifier table_id{.ns = ns, .name = "cancel_plan_table"};
501-
ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
502-
ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id));
503-
504-
internal::TableScanContext context;
505-
ICEBERG_UNWRAP_OR_FAIL(auto plan_response, catalog->PlanTableScan(*table, context));
506-
507-
// NOTE: apache/iceberg-rest-fixture always responds synchronously — sync plan_id is
508-
// accepted for cancellation (idempotent). Async paths are covered by unit tests.
509-
ASSERT_FALSE(plan_response.plan_id.empty());
510-
ASSERT_THAT(catalog->CancelPlanning(*table, plan_response.plan_id), IsOk());
511-
}
512-
513-
TEST_F(RestCatalogIntegrationTest, FetchScanTasks) {
514-
Namespace ns{.levels = {"test_fetch_scan_tasks"}};
515-
ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns));
516-
TableIdentifier table_id{.ns = ns, .name = "fetch_tasks_table"};
517-
ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk());
518-
ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id));
519-
520-
internal::TableScanContext context;
521-
ICEBERG_UNWRAP_OR_FAIL(auto plan_response, catalog->PlanTableScan(*table, context));
522-
523-
// NOTE: apache/iceberg-rest-fixture always responds synchronously — plan_tasks is
524-
// always empty. FetchScanTasks is only relevant for async plans; async paths are
525-
// covered by unit tests.
526-
EXPECT_TRUE(plan_response.plan_tasks.empty());
527-
}
528-
529451
// -- Snapshot loading mode --
530452

531453
TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotModeAll) {

0 commit comments

Comments
 (0)