Skip to content

Commit 86714b9

Browse files
Sandeep GottimukkalaSandeep Gottimukkala
authored andcommitted
Address comments
1 parent a95f341 commit 86714b9

5 files changed

Lines changed: 67 additions & 35 deletions

File tree

src/iceberg/catalog/rest/error_handlers.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,4 +214,24 @@ Status ScanPlanErrorHandler::Accept(const ErrorResponse& error) const {
214214
return DefaultErrorHandler::Accept(error);
215215
}
216216

217+
const std::shared_ptr<PlanTaskErrorHandler>& PlanTaskErrorHandler::Instance() {
218+
static const std::shared_ptr<PlanTaskErrorHandler> instance{new PlanTaskErrorHandler()};
219+
return instance;
220+
}
221+
222+
Status PlanTaskErrorHandler::Accept(const ErrorResponse& error) const {
223+
switch (error.code) {
224+
case 404:
225+
if (error.type == kNoSuchNamespaceException) {
226+
return NoSuchNamespace(error.message);
227+
}
228+
if (error.type == kNoSuchTableException) {
229+
return NoSuchTable(error.message);
230+
}
231+
return NoSuchPlanTask(error.message);
232+
}
233+
234+
return DefaultErrorHandler::Accept(error);
235+
}
236+
217237
} // namespace iceberg::rest

src/iceberg/catalog/rest/error_handlers.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,15 @@ class ICEBERG_REST_EXPORT ScanPlanErrorHandler final : public DefaultErrorHandle
138138
constexpr ScanPlanErrorHandler() = default;
139139
};
140140

141+
/// \brief Fetch scan tasks operation error handler.
142+
class ICEBERG_REST_EXPORT PlanTaskErrorHandler final : public DefaultErrorHandler {
143+
public:
144+
static const std::shared_ptr<PlanTaskErrorHandler>& Instance();
145+
146+
Status Accept(const ErrorResponse& error) const override;
147+
148+
private:
149+
constexpr PlanTaskErrorHandler() = default;
150+
};
151+
141152
} // namespace iceberg::rest

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,40 @@ constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema";
9393
constexpr std::string_view kStartSnapshotId = "start-snapshot-id";
9494
constexpr std::string_view kEndSnapshotId = "end-snapshot-id";
9595
constexpr std::string_view kStatsFields = "stats-fields";
96-
constexpr std::string_view kMinRowsRequired = "min-rows-required";
96+
constexpr std::string_view kMinRowsRequested = "min-rows-requested";
9797
constexpr std::string_view kPlanTask = "plan-task";
9898

99+
Status BaseScanTaskResponseFromJson(
100+
const nlohmann::json& json, BaseScanTaskResponse* response,
101+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
102+
partition_specs_by_id,
103+
const Schema& schema) {
104+
// 1. plan_tasks
105+
ICEBERG_ASSIGN_OR_RAISE(
106+
response->plan_tasks,
107+
GetJsonValueOrDefault<std::vector<std::string>>(json, kPlanTasks));
108+
109+
// 2. delete_files
110+
ICEBERG_ASSIGN_OR_RAISE(
111+
auto delete_files_json,
112+
GetJsonValueOrDefault<nlohmann::json>(json, kDeleteFiles, nlohmann::json::array()));
113+
for (const auto& entry_json : delete_files_json) {
114+
ICEBERG_ASSIGN_OR_RAISE(auto delete_file,
115+
DataFileFromJson(entry_json, partition_specs_by_id, schema));
116+
response->delete_files.push_back(std::move(delete_file));
117+
}
118+
119+
// 3. file_scan_tasks
120+
ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json,
121+
GetJsonValueOrDefault<nlohmann::json>(json, kFileScanTasks,
122+
nlohmann::json::array()));
123+
ICEBERG_ASSIGN_OR_RAISE(
124+
response->file_scan_tasks,
125+
FileScanTasksFromJson(file_scan_tasks_json, response->delete_files,
126+
partition_specs_by_id, schema));
127+
return {};
128+
}
129+
99130
} // namespace
100131

101132
nlohmann::json ToJson(const CatalogConfig& config) {
@@ -547,7 +578,7 @@ Result<nlohmann::json> ToJson(const PlanTableScanRequest& request) {
547578
json[kStatsFields] = request.statsFields;
548579
}
549580
if (request.min_rows_required.has_value()) {
550-
json[kMinRowsRequired] = request.min_rows_required.value();
581+
json[kMinRowsRequested] = request.min_rows_required.value();
551582
}
552583
return json;
553584
}
@@ -558,37 +589,6 @@ nlohmann::json ToJson(const FetchScanTasksRequest& request) {
558589
return json;
559590
}
560591

561-
Status BaseScanTaskResponseFromJson(
562-
const nlohmann::json& json, BaseScanTaskResponse* response,
563-
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
564-
partition_specs_by_id,
565-
const Schema& schema) {
566-
// 1. plan_tasks
567-
ICEBERG_ASSIGN_OR_RAISE(
568-
response->plan_tasks,
569-
GetJsonValueOrDefault<std::vector<std::string>>(json, kPlanTasks));
570-
571-
// 2. delete_files
572-
ICEBERG_ASSIGN_OR_RAISE(
573-
auto delete_files_json,
574-
GetJsonValueOrDefault<nlohmann::json>(json, kDeleteFiles, nlohmann::json::array()));
575-
for (const auto& entry_json : delete_files_json) {
576-
ICEBERG_ASSIGN_OR_RAISE(auto delete_file,
577-
DataFileFromJson(entry_json, partition_specs_by_id, schema));
578-
response->delete_files.push_back(std::move(delete_file));
579-
}
580-
581-
// 3. file_scan_tasks
582-
ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json,
583-
GetJsonValueOrDefault<nlohmann::json>(json, kFileScanTasks,
584-
nlohmann::json::array()));
585-
ICEBERG_ASSIGN_OR_RAISE(
586-
response->file_scan_tasks,
587-
FileScanTasksFromJson(file_scan_tasks_json, response->delete_files,
588-
partition_specs_by_id, schema));
589-
return {};
590-
}
591-
592592
Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
593593
const nlohmann::json& json,
594594
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&

src/iceberg/catalog/rest/resource_paths.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ Result<std::string> ResourcePaths::ScanPlan(const TableIdentifier& ident,
113113
const std::string& plan_id) const {
114114
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns));
115115
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name));
116+
ICEBERG_ASSIGN_OR_RAISE(std::string encoded_plan_id, EncodeString(plan_id));
116117
return std::format("{}/v1/{}namespaces/{}/tables/{}/plan/{}", base_uri_, prefix_,
117-
encoded_namespace, encoded_table_name, plan_id);
118+
encoded_namespace, encoded_table_name, encoded_plan_id);
118119
}
119120

120121
Result<std::string> ResourcePaths::ScanTask(const TableIdentifier& ident) const {

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -572,7 +572,7 @@ Result<FetchScanTasksResponse> RestCatalog::FetchScanTasks(const Table& table,
572572
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
573573
ICEBERG_ASSIGN_OR_RAISE(
574574
const auto response,
575-
client_->Post(path, json_request, /*headers=*/{}, *ScanPlanErrorHandler::Instance(),
575+
client_->Post(path, json_request, /*headers=*/{}, *PlanTaskErrorHandler::Instance(),
576576
*catalog_session_));
577577

578578
ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body()));

0 commit comments

Comments
 (0)