Skip to content

Commit e884cb4

Browse files
committed
Fix REST scan JSON serde parity
1 parent 8f9c36c commit e884cb4

13 files changed

Lines changed: 1074 additions & 402 deletions

File tree

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 324 additions & 221 deletions
Large diffs are not rendered by default.

src/iceberg/catalog/rest/json_serde_internal.h

Lines changed: 30 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,8 @@
2727

2828
#include "iceberg/catalog/rest/iceberg_rest_export.h"
2929
#include "iceberg/catalog/rest/types.h"
30-
#include "iceberg/manifest/manifest_entry.h"
31-
#include "iceberg/partition_spec.h"
3230
#include "iceberg/result.h"
33-
#include "iceberg/schema.h"
34-
#include "iceberg/table_scan.h"
31+
#include "iceberg/type_fwd.h"
3532

3633
/// \file iceberg/catalog/rest/json_serde_internal.h
3734
/// JSON serialization and deserialization for Iceberg REST Catalog API types.
@@ -75,24 +72,52 @@ ICEBERG_REST_EXPORT Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
7572
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
7673
partition_specs_by_id,
7774
const Schema& schema);
75+
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(
76+
const PlanTableScanResponse& response,
77+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
78+
partition_specs_by_id,
79+
const Schema& schema);
7880

7981
ICEBERG_REST_EXPORT Result<FetchPlanningResultResponse>
8082
FetchPlanningResultResponseFromJson(
8183
const nlohmann::json& json,
8284
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
8385
partition_specs_by_id,
8486
const Schema& schema);
87+
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(
88+
const FetchPlanningResultResponse& response,
89+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
90+
partition_specs_by_id,
91+
const Schema& schema);
8592

8693
ICEBERG_REST_EXPORT Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson(
8794
const nlohmann::json& json,
8895
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
8996
partition_specs_by_id,
9097
const Schema& schema);
98+
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(
99+
const FetchScanTasksResponse& response,
100+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
101+
partition_specs_by_id,
102+
const Schema& schema);
91103

104+
ICEBERG_REST_EXPORT Result<PlanTableScanRequest> PlanTableScanRequestFromJson(
105+
const nlohmann::json& json);
106+
template <>
107+
ICEBERG_REST_EXPORT Result<PlanTableScanRequest> FromJson(const nlohmann::json& json);
92108
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const PlanTableScanRequest& request);
109+
110+
ICEBERG_REST_EXPORT Result<FetchScanTasksRequest> FetchScanTasksRequestFromJson(
111+
const nlohmann::json& json);
112+
template <>
113+
ICEBERG_REST_EXPORT Result<FetchScanTasksRequest> FromJson(const nlohmann::json& json);
93114
ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request);
94115

95-
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const DataFile& df);
116+
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(
117+
const DataFile& df,
118+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
119+
partition_specs_by_id,
120+
const Schema& schema);
96121

97122
ICEBERG_REST_EXPORT Result<DataFile> DataFileFromJson(
98123
const nlohmann::json& json,
@@ -107,9 +132,4 @@ FileScanTasksFromJson(const nlohmann::json& json,
107132
partition_spec_by_id,
108133
const Schema& schema);
109134

110-
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const PlanTableScanResponse& response);
111-
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(
112-
const FetchPlanningResultResponse& response);
113-
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const FetchScanTasksResponse& response);
114-
115135
} // namespace iceberg::rest

src/iceberg/catalog/rest/types.cc

Lines changed: 100 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,42 @@
2020
#include "iceberg/catalog/rest/types.h"
2121

2222
#include <algorithm>
23+
#include <optional>
2324

25+
#include "iceberg/expression/expression.h"
26+
#include "iceberg/manifest/manifest_entry.h"
2427
#include "iceberg/partition_spec.h"
2528
#include "iceberg/schema.h"
2629
#include "iceberg/sort_order.h"
2730
#include "iceberg/table_metadata.h"
2831
#include "iceberg/table_requirement.h"
32+
#include "iceberg/table_scan.h"
2933
#include "iceberg/table_update.h"
3034

3135
namespace iceberg::rest {
3236

37+
std::string_view ToString(PlanStatus status) {
38+
switch (status) {
39+
case PlanStatus::kSubmitted:
40+
return "submitted";
41+
case PlanStatus::kCompleted:
42+
return "completed";
43+
case PlanStatus::kCancelled:
44+
return "cancelled";
45+
case PlanStatus::kFailed:
46+
return "failed";
47+
}
48+
return "unknown";
49+
}
50+
51+
Result<PlanStatus> PlanStatusFromString(std::string_view status_str) {
52+
if (status_str == "submitted") return PlanStatus::kSubmitted;
53+
if (status_str == "completed") return PlanStatus::kCompleted;
54+
if (status_str == "cancelled") return PlanStatus::kCancelled;
55+
if (status_str == "failed") return PlanStatus::kFailed;
56+
return JsonParseError("Unknown plan status: {}", status_str);
57+
}
58+
3359
bool CreateTableRequest::operator==(const CreateTableRequest& other) const {
3460
if (name != other.name || location != other.location ||
3561
stage_create != other.stage_create || properties != other.properties) {
@@ -118,9 +144,22 @@ bool CommitTableResponse::operator==(const CommitTableResponse& other) const {
118144
return true;
119145
}
120146

147+
namespace {
148+
149+
bool ExpressionPtrEqual(const std::shared_ptr<Expression>& lhs,
150+
const std::shared_ptr<Expression>& rhs) {
151+
if (lhs == rhs) {
152+
return true;
153+
}
154+
return lhs && rhs && lhs->Equals(*rhs);
155+
}
156+
157+
} // namespace
158+
121159
bool PlanTableScanRequest::operator==(const PlanTableScanRequest& other) const {
122160
return snapshot_id == other.snapshot_id && select == other.select &&
123-
filter == other.filter && case_sensitive == other.case_sensitive &&
161+
ExpressionPtrEqual(filter, other.filter) &&
162+
case_sensitive == other.case_sensitive &&
124163
use_snapshot_schema == other.use_snapshot_schema &&
125164
start_snapshot_id == other.start_snapshot_id &&
126165
end_snapshot_id == other.end_snapshot_id && stats_fields == other.stats_fields &&
@@ -129,86 +168,81 @@ bool PlanTableScanRequest::operator==(const PlanTableScanRequest& other) const {
129168

130169
namespace {
131170

132-
bool ScanTaskFieldsEqual(
133-
const std::vector<std::string>& plan_tasks_a,
134-
const std::vector<std::string>& plan_tasks_b,
135-
const std::vector<std::shared_ptr<DataFile>>& delete_files_a,
136-
const std::vector<std::shared_ptr<DataFile>>& delete_files_b,
137-
const std::vector<std::shared_ptr<FileScanTask>>& file_scan_tasks_a,
138-
const std::vector<std::shared_ptr<FileScanTask>>& file_scan_tasks_b) {
139-
if (plan_tasks_a != plan_tasks_b) {
140-
return false;
171+
template <typename T>
172+
bool SharedPtrEqual(const std::shared_ptr<T>& lhs, const std::shared_ptr<T>& rhs) {
173+
if (lhs == rhs) {
174+
return true;
141175
}
142-
if (delete_files_a.size() != delete_files_b.size()) {
143-
return false;
144-
}
145-
for (size_t i = 0; i < delete_files_a.size(); ++i) {
146-
if (!delete_files_a[i] != !delete_files_b[i]) {
147-
return false;
148-
}
149-
if (delete_files_a[i] && *delete_files_a[i] != *delete_files_b[i]) {
150-
return false;
151-
}
176+
return lhs && rhs && *lhs == *rhs;
177+
}
178+
179+
template <typename T>
180+
bool SharedPtrVectorEqual(const std::vector<std::shared_ptr<T>>& lhs,
181+
const std::vector<std::shared_ptr<T>>& rhs) {
182+
return std::ranges::equal(lhs, rhs, SharedPtrEqual<T>);
183+
}
184+
185+
bool FileScanTaskEqual(const std::shared_ptr<FileScanTask>& lhs,
186+
const std::shared_ptr<FileScanTask>& rhs) {
187+
if (lhs == rhs) {
188+
return true;
152189
}
153-
if (file_scan_tasks_a.size() != file_scan_tasks_b.size()) {
190+
if (!lhs || !rhs) {
154191
return false;
155192
}
156-
for (size_t i = 0; i < file_scan_tasks_a.size(); ++i) {
157-
const auto& a = file_scan_tasks_a[i];
158-
const auto& b = file_scan_tasks_b[i];
159-
if (!a != !b) {
160-
return false;
161-
}
162-
if (!a) continue;
163-
if (!a->data_file() != !b->data_file()) {
164-
return false;
165-
}
166-
if (a->data_file() && *a->data_file() != *b->data_file()) {
167-
return false;
168-
}
169-
if (a->delete_files().size() != b->delete_files().size()) {
170-
return false;
171-
}
172-
for (size_t j = 0; j < a->delete_files().size(); ++j) {
173-
if (!a->delete_files()[j] != !b->delete_files()[j]) {
174-
return false;
175-
}
176-
if (a->delete_files()[j] && *a->delete_files()[j] != *b->delete_files()[j]) {
177-
return false;
178-
}
179-
}
180-
if (a->residual_filter() != b->residual_filter()) {
181-
return false;
182-
}
193+
return SharedPtrEqual(lhs->data_file(), rhs->data_file()) &&
194+
SharedPtrVectorEqual(lhs->delete_files(), rhs->delete_files()) &&
195+
ExpressionPtrEqual(lhs->residual_filter(), rhs->residual_filter());
196+
}
197+
198+
template <typename T>
199+
bool OptionalSharedPtrVectorEqual(
200+
const std::optional<std::vector<std::shared_ptr<T>>>& lhs,
201+
const std::optional<std::vector<std::shared_ptr<T>>>& rhs,
202+
bool (*eq)(const std::shared_ptr<T>&, const std::shared_ptr<T>&)) {
203+
if (lhs.has_value() != rhs.has_value()) {
204+
return false;
183205
}
184-
return true;
206+
return !lhs.has_value() || std::ranges::equal(*lhs, *rhs, eq);
207+
}
208+
209+
template <typename Response>
210+
bool ScanTaskFieldsEqual(const Response& lhs, const Response& rhs) {
211+
return lhs.plan_tasks == rhs.plan_tasks &&
212+
SharedPtrVectorEqual(lhs.delete_files, rhs.delete_files) &&
213+
OptionalSharedPtrVectorEqual(lhs.file_scan_tasks, rhs.file_scan_tasks,
214+
FileScanTaskEqual);
215+
}
216+
217+
template <typename Response>
218+
bool HasTaskFields(const Response& response) {
219+
return response.plan_tasks.has_value() || response.file_scan_tasks.has_value();
220+
}
221+
222+
template <typename Response>
223+
bool HasNonEmptyFileScanTasks(const Response& response) {
224+
return response.file_scan_tasks.has_value() && !response.file_scan_tasks->empty();
185225
}
186226

187227
} // namespace
188228

189229
bool PlanTableScanResponse::operator==(const PlanTableScanResponse& other) const {
190-
return ScanTaskFieldsEqual(plan_tasks, other.plan_tasks, delete_files,
191-
other.delete_files, file_scan_tasks,
192-
other.file_scan_tasks) &&
193-
plan_status == other.plan_status && plan_id == other.plan_id &&
194-
error == other.error;
230+
return ScanTaskFieldsEqual(*this, other) && plan_status == other.plan_status &&
231+
plan_id == other.plan_id && error == other.error;
195232
}
196233

197234
bool FetchPlanningResultResponse::operator==(
198235
const FetchPlanningResultResponse& other) const {
199-
return ScanTaskFieldsEqual(plan_tasks, other.plan_tasks, delete_files,
200-
other.delete_files, file_scan_tasks,
201-
other.file_scan_tasks) &&
202-
plan_status == other.plan_status && error == other.error;
236+
return ScanTaskFieldsEqual(*this, other) && plan_status == other.plan_status &&
237+
error == other.error;
203238
}
204239

205240
bool FetchScanTasksRequest::operator==(const FetchScanTasksRequest& other) const {
206241
return planTask == other.planTask;
207242
}
208243

209244
bool FetchScanTasksResponse::operator==(const FetchScanTasksResponse& other) const {
210-
return ScanTaskFieldsEqual(plan_tasks, other.plan_tasks, delete_files,
211-
other.delete_files, file_scan_tasks, other.file_scan_tasks);
245+
return ScanTaskFieldsEqual(*this, other);
212246
}
213247

214248
Status OAuthTokenResponse::Validate() const {
@@ -257,8 +291,7 @@ Status PlanTableScanResponse::Validate() const {
257291
return ValidationFailed(
258292
"Invalid response: 'cancelled' is not a valid status for planTableScan");
259293
}
260-
if (plan_status != PlanStatus::kCompleted &&
261-
(!plan_tasks.empty() || !file_scan_tasks.empty())) {
294+
if (plan_status != PlanStatus::kCompleted && HasTaskFields(*this)) {
262295
return ValidationFailed(
263296
"Invalid response: tasks can only be defined when status is 'completed'");
264297
}
@@ -268,7 +301,7 @@ Status PlanTableScanResponse::Validate() const {
268301
"Invalid response: plan id can only be defined when status is 'submitted' or "
269302
"'completed'");
270303
}
271-
if (file_scan_tasks.empty() && !delete_files.empty()) {
304+
if (!HasNonEmptyFileScanTasks(*this) && !delete_files.empty()) {
272305
return ValidationFailed(
273306
"Invalid response: deleteFiles should only be returned with fileScanTasks that "
274307
"reference them");
@@ -285,12 +318,11 @@ Status PlanTableScanResponse::Validate() const {
285318
}
286319

287320
Status FetchPlanningResultResponse::Validate() const {
288-
if (plan_status != PlanStatus::kCompleted &&
289-
(!plan_tasks.empty() || !file_scan_tasks.empty())) {
321+
if (plan_status != PlanStatus::kCompleted && HasTaskFields(*this)) {
290322
return ValidationFailed(
291323
"Invalid response: tasks can only be returned in a 'completed' status");
292324
}
293-
if (file_scan_tasks.empty() && !delete_files.empty()) {
325+
if (!HasNonEmptyFileScanTasks(*this) && !delete_files.empty()) {
294326
return ValidationFailed(
295327
"Invalid response: deleteFiles should only be returned with fileScanTasks that "
296328
"reference them");
@@ -314,12 +346,12 @@ Status FetchScanTasksRequest::Validate() const {
314346
}
315347

316348
Status FetchScanTasksResponse::Validate() const {
317-
if (file_scan_tasks.empty() && !delete_files.empty()) {
349+
if (!HasNonEmptyFileScanTasks(*this) && !delete_files.empty()) {
318350
return ValidationFailed(
319351
"Invalid response: deleteFiles should only be returned with fileScanTasks that "
320352
"reference them");
321353
}
322-
if (plan_tasks.empty() && file_scan_tasks.empty()) {
354+
if (!HasTaskFields(*this)) {
323355
return ValidationFailed(
324356
"Invalid response: planTasks and fileScanTask cannot both be null");
325357
}

src/iceberg/catalog/rest/types.h

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,8 @@
2929

3030
#include "iceberg/catalog/rest/endpoint.h"
3131
#include "iceberg/catalog/rest/iceberg_rest_export.h"
32-
#include "iceberg/expression/expression.h"
33-
#include "iceberg/manifest/manifest_entry.h"
3432
#include "iceberg/result.h"
35-
#include "iceberg/schema.h"
3633
#include "iceberg/table_identifier.h"
37-
#include "iceberg/table_scan.h"
3834
#include "iceberg/type_fwd.h"
3935
#include "iceberg/util/macros.h"
4036

@@ -43,6 +39,17 @@
4339

4440
namespace iceberg::rest {
4541

42+
/// \brief Status of a REST server-side scan planning operation.
43+
enum class PlanStatus {
44+
kSubmitted,
45+
kCompleted,
46+
kCancelled,
47+
kFailed,
48+
};
49+
50+
ICEBERG_REST_EXPORT std::string_view ToString(PlanStatus status);
51+
ICEBERG_REST_EXPORT Result<PlanStatus> PlanStatusFromString(std::string_view status_str);
52+
4653
/// \brief Server-provided configuration for the catalog.
4754
struct ICEBERG_REST_EXPORT CatalogConfig {
4855
std::unordered_map<std::string, std::string> defaults; // required
@@ -319,8 +326,8 @@ struct ICEBERG_REST_EXPORT PlanTableScanRequest {
319326
/// \brief Response from initiating a scan planning operation, including plan status and
320327
/// initial scan tasks.
321328
struct ICEBERG_REST_EXPORT PlanTableScanResponse {
322-
std::vector<std::string> plan_tasks;
323-
std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
329+
std::optional<std::vector<std::string>> plan_tasks;
330+
std::optional<std::vector<std::shared_ptr<FileScanTask>>> file_scan_tasks;
324331
std::vector<std::shared_ptr<DataFile>> delete_files;
325332
PlanStatus plan_status = PlanStatus::kCompleted;
326333
std::string plan_id;
@@ -335,8 +342,8 @@ struct ICEBERG_REST_EXPORT PlanTableScanResponse {
335342
/// \brief Response from polling an asynchronous scan plan, including current status and
336343
/// available scan tasks.
337344
struct ICEBERG_REST_EXPORT FetchPlanningResultResponse {
338-
std::vector<std::string> plan_tasks;
339-
std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
345+
std::optional<std::vector<std::string>> plan_tasks;
346+
std::optional<std::vector<std::shared_ptr<FileScanTask>>> file_scan_tasks;
340347
std::vector<std::shared_ptr<DataFile>> delete_files;
341348
PlanStatus plan_status = PlanStatus::kCompleted;
342349
std::optional<ErrorResponse> error;
@@ -358,8 +365,8 @@ struct ICEBERG_REST_EXPORT FetchScanTasksRequest {
358365

359366
/// \brief Response containing the file scan tasks for a given plan task token.
360367
struct ICEBERG_REST_EXPORT FetchScanTasksResponse {
361-
std::vector<std::string> plan_tasks;
362-
std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
368+
std::optional<std::vector<std::string>> plan_tasks;
369+
std::optional<std::vector<std::shared_ptr<FileScanTask>>> file_scan_tasks;
363370
std::vector<std::shared_ptr<DataFile>> delete_files;
364371

365372
Status Validate() const;

0 commit comments

Comments
 (0)