Skip to content

Commit c115c0a

Browse files
Sandeep GottimukkalaSandeep Gottimukkala
authored andcommitted
feat(rest): use PlanStatus enum and shared_ptr for scan task types
Replace string-based plan_status fields in PlanTableScanResponse and FetchPlanningResultResponse with a PlanStatus enum defined in table_scan.h. Change file_scan_tasks and delete_files in BaseScanTaskResponse from value vectors to vector<shared_ptr<...>> for shared ownership semantics.
1 parent a970e07 commit c115c0a

9 files changed

Lines changed: 93 additions & 49 deletions

File tree

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ Status BaseScanTaskResponseFromJson(
113113
for (const auto& entry_json : delete_files_json) {
114114
ICEBERG_ASSIGN_OR_RAISE(auto delete_file,
115115
DataFileFromJson(entry_json, partition_specs_by_id, schema));
116-
response->delete_files.push_back(std::move(delete_file));
116+
response->delete_files.push_back(std::make_shared<DataFile>(std::move(delete_file)));
117117
}
118118

119119
// 3. file_scan_tasks
@@ -595,8 +595,9 @@ Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
595595
partition_specs_by_id,
596596
const Schema& schema) {
597597
PlanTableScanResponse response;
598-
ICEBERG_ASSIGN_OR_RAISE(response.plan_status,
598+
ICEBERG_ASSIGN_OR_RAISE(auto plan_status_str,
599599
GetJsonValue<std::string>(json, kPlanStatus));
600+
ICEBERG_ASSIGN_OR_RAISE(response.plan_status, PlanStatusFromString(plan_status_str));
600601
ICEBERG_ASSIGN_OR_RAISE(response.plan_id,
601602
GetJsonValueOrDefault<std::string>(json, kPlanId));
602603
ICEBERG_RETURN_UNEXPECTED(
@@ -611,8 +612,9 @@ Result<FetchPlanningResultResponse> FetchPlanningResultResponseFromJson(
611612
partition_specs_by_id,
612613
const Schema& schema) {
613614
FetchPlanningResultResponse response;
614-
ICEBERG_ASSIGN_OR_RAISE(response.plan_status,
615+
ICEBERG_ASSIGN_OR_RAISE(auto plan_status_str,
615616
GetJsonValue<std::string>(json, kPlanStatus));
617+
ICEBERG_ASSIGN_OR_RAISE(response.plan_status, PlanStatusFromString(plan_status_str));
616618
ICEBERG_RETURN_UNEXPECTED(
617619
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
618620
ICEBERG_RETURN_UNEXPECTED(response.Validate());

src/iceberg/catalog/rest/types.cc

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -131,33 +131,45 @@ bool BaseScanTaskResponse::operator==(const BaseScanTaskResponse& other) const {
131131
if (plan_tasks != other.plan_tasks) {
132132
return false;
133133
}
134-
if (delete_files != other.delete_files) {
134+
if (delete_files.size() != other.delete_files.size()) {
135135
return false;
136136
}
137+
for (size_t i = 0; i < delete_files.size(); ++i) {
138+
if (!delete_files[i] != !other.delete_files[i]) {
139+
return false;
140+
}
141+
if (delete_files[i] && *delete_files[i] != *other.delete_files[i]) {
142+
return false;
143+
}
144+
}
137145
if (file_scan_tasks.size() != other.file_scan_tasks.size()) {
138146
return false;
139147
}
140148
for (size_t i = 0; i < file_scan_tasks.size(); ++i) {
141149
const auto& a = file_scan_tasks[i];
142150
const auto& b = other.file_scan_tasks[i];
143-
if (!a.data_file() != !b.data_file()) {
151+
if (!a != !b) {
144152
return false;
145153
}
146-
if (a.data_file() && *a.data_file() != *b.data_file()) {
154+
if (!a) continue;
155+
if (!a->data_file() != !b->data_file()) {
147156
return false;
148157
}
149-
if (a.delete_files().size() != b.delete_files().size()) {
158+
if (a->data_file() && *a->data_file() != *b->data_file()) {
150159
return false;
151160
}
152-
for (size_t j = 0; j < a.delete_files().size(); ++j) {
153-
if (!a.delete_files()[j] != !b.delete_files()[j]) {
161+
if (a->delete_files().size() != b->delete_files().size()) {
162+
return false;
163+
}
164+
for (size_t j = 0; j < a->delete_files().size(); ++j) {
165+
if (!a->delete_files()[j] != !b->delete_files()[j]) {
154166
return false;
155167
}
156-
if (a.delete_files()[j] && *a.delete_files()[j] != *b.delete_files()[j]) {
168+
if (a->delete_files()[j] && *a->delete_files()[j] != *b->delete_files()[j]) {
157169
return false;
158170
}
159171
}
160-
if (a.residual_filter() != b.residual_filter()) {
172+
if (a->residual_filter() != b->residual_filter()) {
161173
return false;
162174
}
163175
}
@@ -220,22 +232,20 @@ Status PlanTableScanRequest::Validate() const {
220232
}
221233

222234
Status PlanTableScanResponse::Validate() const {
223-
if (plan_status.empty()) {
224-
return ValidationFailed("Invalid response: plan status must be defined");
225-
}
226-
if (plan_status == "submitted" && plan_id.empty()) {
235+
if (plan_status == PlanStatus::kSubmitted && plan_id.empty()) {
227236
return ValidationFailed(
228237
"Invalid response: plan id should be defined when status is 'submitted'");
229238
}
230-
if (plan_status == "cancelled") {
239+
if (plan_status == PlanStatus::kCancelled) {
231240
return ValidationFailed(
232241
"Invalid response: 'cancelled' is not a valid status for planTableScan");
233242
}
234-
if (plan_status != "completed" && (!plan_tasks.empty() || !file_scan_tasks.empty())) {
243+
if (plan_status != PlanStatus::kCompleted && (!plan_tasks.empty() || !file_scan_tasks.empty())) {
235244
return ValidationFailed(
236245
"Invalid response: tasks can only be defined when status is 'completed'");
237246
}
238-
if (!plan_id.empty() && plan_status != "submitted" && plan_status != "completed") {
247+
if (!plan_id.empty() && plan_status != PlanStatus::kSubmitted &&
248+
plan_status != PlanStatus::kCompleted) {
239249
return ValidationFailed(
240250
"Invalid response: plan id can only be defined when status is 'submitted' or "
241251
"'completed'");
@@ -249,10 +259,7 @@ Status PlanTableScanResponse::Validate() const {
249259
}
250260

251261
Status FetchPlanningResultResponse::Validate() const {
252-
if (plan_status.empty()) {
253-
return ValidationFailed("Invalid status: null");
254-
}
255-
if (plan_status != "completed" && (!plan_tasks.empty() || !file_scan_tasks.empty())) {
262+
if (plan_status != PlanStatus::kCompleted && (!plan_tasks.empty() || !file_scan_tasks.empty())) {
256263
return ValidationFailed(
257264
"Invalid response: tasks can only be returned in a 'completed' status");
258265
}

src/iceberg/catalog/rest/types.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <memory>
2424
#include <optional>
2525
#include <string>
26+
#include <string_view>
2627
#include <unordered_map>
2728
#include <vector>
2829

@@ -319,8 +320,8 @@ struct ICEBERG_REST_EXPORT PlanTableScanRequest {
319320
/// endpoints.
320321
struct ICEBERG_REST_EXPORT BaseScanTaskResponse {
321322
std::vector<std::string> plan_tasks;
322-
std::vector<FileScanTask> file_scan_tasks;
323-
std::vector<DataFile> delete_files;
323+
std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
324+
std::vector<std::shared_ptr<DataFile>> delete_files;
324325
// std::unordered_map<std::string, PartitionSpec> specsById;
325326

326327
Status Validate() const { return {}; };
@@ -331,7 +332,7 @@ struct ICEBERG_REST_EXPORT BaseScanTaskResponse {
331332
/// \brief Response from initiating a scan planning operation, including plan status and
332333
/// initial scan tasks.
333334
struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse {
334-
std::string plan_status;
335+
PlanStatus plan_status = PlanStatus::kCompleted;
335336
std::string plan_id;
336337
// TODO(sandeepg): Add credentials.
337338

@@ -343,7 +344,7 @@ struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse {
343344
/// \brief Response from polling an asynchronous scan plan, including current status and
344345
/// available scan tasks.
345346
struct ICEBERG_REST_EXPORT FetchPlanningResultResponse : BaseScanTaskResponse {
346-
std::string plan_status;
347+
PlanStatus plan_status = PlanStatus::kCompleted;
347348
// TODO(sandeepg): Add credentials.
348349

349350
Status Validate() const;

src/iceberg/json_serde.cc

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1894,15 +1894,16 @@ Result<DataFile> DataFileFromJson(
18941894
return df;
18951895
}
18961896

1897-
Result<std::vector<FileScanTask>> FileScanTasksFromJson(
1898-
const nlohmann::json& json, const std::vector<DataFile>& delete_files,
1897+
Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanTasksFromJson(
1898+
const nlohmann::json& json,
1899+
const std::vector<std::shared_ptr<DataFile>>& delete_files,
18991900
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partitionSpecById,
19001901
const Schema& schema) {
19011902
if (!json.is_array()) {
19021903
return JsonParseError("Cannot parse file scan tasks from non-array: {}",
19031904
SafeDumpJson(json));
19041905
}
1905-
std::vector<FileScanTask> file_scan_tasks;
1906+
std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
19061907
for (const auto& task_json : json) {
19071908
if (!task_json.is_object()) {
19081909
return JsonParseError("Cannot parse file scan task from a non-object: {}",
@@ -1925,7 +1926,7 @@ Result<std::vector<FileScanTask>> FileScanTasksFromJson(
19251926
"delete-file-references index {} is out of range (delete_files size: {})",
19261927
ref, delete_files.size());
19271928
}
1928-
task_delete_files.push_back(std::make_shared<DataFile>(delete_files[ref]));
1929+
task_delete_files.push_back(delete_files[ref]);
19291930
}
19301931
}
19311932

@@ -1936,9 +1937,10 @@ Result<std::vector<FileScanTask>> FileScanTasksFromJson(
19361937
ICEBERG_ASSIGN_OR_RAISE(residual_filter, ExpressionFromJson(filter_json));
19371938
}
19381939

1939-
file_scan_tasks.emplace_back(std::make_shared<DataFile>(std::move(data_file)),
1940-
std::move(task_delete_files),
1941-
std::move(residual_filter));
1940+
file_scan_tasks.push_back(
1941+
std::make_shared<FileScanTask>(std::make_shared<DataFile>(std::move(data_file)),
1942+
std::move(task_delete_files),
1943+
std::move(residual_filter)));
19421944
}
19431945
return file_scan_tasks;
19441946
}

src/iceberg/json_serde_internal.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,9 @@ ICEBERG_EXPORT Result<DataFile> DataFileFromJson(
447447
/// \param schema The table schema, used with partitionSpecById to resolve partition
448448
/// types.
449449
/// \return A vector of `FileScanTask` objects or an error if the conversion fails.
450-
ICEBERG_EXPORT Result<std::vector<FileScanTask>> FileScanTasksFromJson(
451-
const nlohmann::json& json, const std::vector<DataFile>& delete_files,
450+
ICEBERG_EXPORT Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanTasksFromJson(
451+
const nlohmann::json& json,
452+
const std::vector<std::shared_ptr<DataFile>>& delete_files,
452453
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partitionSpecById,
453454
const Schema& schema);
454455

src/iceberg/table_scan.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -765,4 +765,22 @@ IncrementalChangelogScan::PlanFiles(std::optional<int64_t> from_snapshot_id_excl
765765
return NotImplemented("IncrementalChangelogScan::PlanFiles is not implemented");
766766
}
767767

768+
std::string_view ToString(PlanStatus status) {
769+
switch (status) {
770+
case PlanStatus::kSubmitted: return "submitted";
771+
case PlanStatus::kCompleted: return "completed";
772+
case PlanStatus::kCancelled: return "cancelled";
773+
case PlanStatus::kFailed: return "failed";
774+
}
775+
return "unknown";
776+
}
777+
778+
Result<PlanStatus> PlanStatusFromString(std::string_view status_str) {
779+
if (status_str == "submitted") return PlanStatus::kSubmitted;
780+
if (status_str == "completed") return PlanStatus::kCompleted;
781+
if (status_str == "cancelled") return PlanStatus::kCancelled;
782+
if (status_str == "failed") return PlanStatus::kFailed;
783+
return JsonParseError("Unknown plan status: {}", status_str);
784+
}
785+
768786
} // namespace iceberg

src/iceberg/table_scan.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <memory>
2424
#include <optional>
2525
#include <string>
26+
#include <string_view>
2627
#include <unordered_map>
2728
#include <unordered_set>
2829
#include <vector>
@@ -35,6 +36,17 @@
3536

3637
namespace iceberg {
3738

39+
/// \brief Status of a server-side scan planning operation.
40+
enum class PlanStatus {
41+
kSubmitted,
42+
kCompleted,
43+
kCancelled,
44+
kFailed,
45+
};
46+
47+
ICEBERG_EXPORT std::string_view ToString(PlanStatus status);
48+
ICEBERG_EXPORT Result<PlanStatus> PlanStatusFromString(std::string_view status_str);
49+
3850
/// \brief An abstract scan task.
3951
class ICEBERG_EXPORT ScanTask {
4052
public:

src/iceberg/test/json_serde_test.cc

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -942,10 +942,10 @@ TEST(FileScanTasksFromJsonTest, SingleTaskNoDeleteFiles) {
942942
ASSERT_THAT(result, IsOk());
943943
ASSERT_EQ(result.value().size(), 1U);
944944
const auto& task = result.value()[0];
945-
ASSERT_NE(task.data_file(), nullptr);
946-
EXPECT_EQ(task.data_file()->file_path, "s3://bucket/data/file.parquet");
947-
EXPECT_TRUE(task.delete_files().empty());
948-
EXPECT_EQ(task.residual_filter(), nullptr);
945+
ASSERT_NE(task->data_file(), nullptr);
946+
EXPECT_EQ(task->data_file()->file_path, "s3://bucket/data/file.parquet");
947+
EXPECT_TRUE(task->delete_files().empty());
948+
EXPECT_EQ(task->residual_filter(), nullptr);
949949
}
950950

951951
TEST(FileScanTasksFromJsonTest, TaskWithDeleteFileReferences) {
@@ -967,12 +967,13 @@ TEST(FileScanTasksFromJsonTest, TaskWithDeleteFileReferences) {
967967
"delete-file-references": [0]
968968
}])"_json;
969969

970-
auto result = FileScanTasksFromJson(json, {delete_file}, {}, Schema({}, 0));
970+
auto result =
971+
FileScanTasksFromJson(json, {std::make_shared<DataFile>(delete_file)}, {}, Schema({}, 0));
971972
ASSERT_THAT(result, IsOk());
972973
ASSERT_EQ(result.value().size(), 1U);
973974
const auto& task = result.value()[0];
974-
ASSERT_EQ(task.delete_files().size(), 1U);
975-
EXPECT_EQ(task.delete_files()[0]->file_path, "s3://bucket/deletes/pos_delete.parquet");
975+
ASSERT_EQ(task->delete_files().size(), 1U);
976+
EXPECT_EQ(task->delete_files()[0]->file_path, "s3://bucket/deletes/pos_delete.parquet");
976977
}
977978

978979
TEST(FileScanTasksFromJsonTest, DeleteFileReferenceOutOfRange) {

src/iceberg/test/rest_json_serde_test.cc

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,7 +1395,7 @@ TEST(PlanTableScanResponseFromJsonTest, SubmittedStatusMissingOptionalFields) {
13951395
auto json = nlohmann::json::parse(R"({"status":"submitted","plan-id":"abc-123"})");
13961396
auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema());
13971397
ASSERT_THAT(result, IsOk());
1398-
EXPECT_EQ(result->plan_status, "submitted");
1398+
EXPECT_EQ(result->plan_status, PlanStatus::kSubmitted);
13991399
EXPECT_EQ(result->plan_id, "abc-123");
14001400
EXPECT_TRUE(result->plan_tasks.empty());
14011401
EXPECT_TRUE(result->file_scan_tasks.empty());
@@ -1408,7 +1408,7 @@ TEST(PlanTableScanResponseFromJsonTest, CompletedStatusWithPlanTasks) {
14081408
R"({"status":"completed","plan-id":"abc-123","plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})");
14091409
auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema());
14101410
ASSERT_THAT(result, IsOk());
1411-
EXPECT_EQ(result->plan_status, "completed");
1411+
EXPECT_EQ(result->plan_status, PlanStatus::kCompleted);
14121412
EXPECT_EQ(result->plan_id, "abc-123");
14131413
ASSERT_EQ(result->plan_tasks.size(), 2);
14141414
EXPECT_EQ(result->plan_tasks[0], "task-1");
@@ -1436,7 +1436,7 @@ TEST(FetchPlanningResultResponseFromJsonTest, SubmittedStatusNoTasks) {
14361436
auto json = nlohmann::json::parse(R"({"status":"submitted"})");
14371437
auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema());
14381438
ASSERT_THAT(result, IsOk());
1439-
EXPECT_EQ(result->plan_status, "submitted");
1439+
EXPECT_EQ(result->plan_status, PlanStatus::kSubmitted);
14401440
EXPECT_TRUE(result->plan_tasks.empty());
14411441
EXPECT_TRUE(result->file_scan_tasks.empty());
14421442
EXPECT_TRUE(result->delete_files.empty());
@@ -1447,7 +1447,7 @@ TEST(FetchPlanningResultResponseFromJsonTest, CompletedStatusWithPlanTasks) {
14471447
R"({"status":"completed","plan-tasks":["task-1"],"delete-files":[],"file-scan-tasks":[]})");
14481448
auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema());
14491449
ASSERT_THAT(result, IsOk());
1450-
EXPECT_EQ(result->plan_status, "completed");
1450+
EXPECT_EQ(result->plan_status, PlanStatus::kCompleted);
14511451
ASSERT_EQ(result->plan_tasks.size(), 1);
14521452
EXPECT_EQ(result->plan_tasks[0], "task-1");
14531453
}
@@ -1492,10 +1492,10 @@ TEST(FetchScanTasksResponseFromJsonTest, WithFileScanTasks) {
14921492
EXPECT_TRUE(result->plan_tasks.empty());
14931493
ASSERT_EQ(result->delete_files.size(), 1);
14941494
ASSERT_EQ(result->file_scan_tasks.size(), 1);
1495-
EXPECT_EQ(result->file_scan_tasks[0].data_file()->file_path,
1495+
EXPECT_EQ(result->file_scan_tasks[0]->data_file()->file_path,
14961496
"s3://bucket/data/file.parquet");
1497-
ASSERT_EQ(result->file_scan_tasks[0].delete_files().size(), 1);
1498-
EXPECT_EQ(result->file_scan_tasks[0].delete_files()[0]->file_path,
1497+
ASSERT_EQ(result->file_scan_tasks[0]->delete_files().size(), 1);
1498+
EXPECT_EQ(result->file_scan_tasks[0]->delete_files()[0]->file_path,
14991499
"s3://bucket/deletes/delete.parquet");
15001500
}
15011501

0 commit comments

Comments
 (0)