Skip to content

Commit ab6c8d4

Browse files
Sandeep GottimukkalaSandeep Gottimukkala
authored and committed
Comments
1 parent 6aa73bf commit ab6c8d4

File tree

4 files changed

+156
-17
lines changed

4 files changed

+156
-17
lines changed

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -563,22 +563,13 @@ Status BaseScanTaskResponseFromJson(
563563
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_specs_by_id,
564564
const Schema& schema) {
565565
// 1. plan_tasks
566-
ICEBERG_ASSIGN_OR_RAISE(auto plan_tasks,
567-
GetJsonValue<nlohmann::json>(json, kPlanTasks));
568-
if (!plan_tasks.is_array()) {
569-
return JsonParseError("Cannot parse plan tasks from non-array: {}",
570-
SafeDumpJson(plan_tasks));
571-
}
572566
ICEBERG_ASSIGN_OR_RAISE(response->plan_tasks,
573-
GetTypedJsonValue<std::vector<std::string>>(plan_tasks));
567+
GetJsonValueOrDefault<std::vector<std::string>>(json, kPlanTasks));
574568

575569
// 2. delete_files
576570
ICEBERG_ASSIGN_OR_RAISE(auto delete_files_json,
577-
GetJsonValue<nlohmann::json>(json, kDeleteFiles));
578-
if (!delete_files_json.is_array()) {
579-
return JsonParseError("Cannot parse delete files from non-array: {}",
580-
SafeDumpJson(delete_files_json));
581-
}
571+
GetJsonValueOrDefault<nlohmann::json>(json, kDeleteFiles,
572+
nlohmann::json::array()));
582573
for (const auto& entry_json : delete_files_json) {
583574
ICEBERG_ASSIGN_OR_RAISE(auto delete_file,
584575
DataFileFromJson(entry_json, partition_specs_by_id, schema));
@@ -587,7 +578,8 @@ Status BaseScanTaskResponseFromJson(
587578

588579
// 3. file_scan_tasks
589580
ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json,
590-
GetJsonValue<nlohmann::json>(json, kFileScanTasks));
581+
GetJsonValueOrDefault<nlohmann::json>(json, kFileScanTasks,
582+
nlohmann::json::array()));
591583
ICEBERG_ASSIGN_OR_RAISE(
592584
response->file_scan_tasks,
593585
FileScanTasksFromJson(file_scan_tasks_json, response->delete_files,
@@ -602,7 +594,8 @@ Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
602594
PlanTableScanResponse response;
603595
ICEBERG_ASSIGN_OR_RAISE(response.plan_status,
604596
GetJsonValue<std::string>(json, kPlanStatus));
605-
ICEBERG_ASSIGN_OR_RAISE(response.plan_id, GetJsonValue<std::string>(json, kPlanId));
597+
ICEBERG_ASSIGN_OR_RAISE(response.plan_id,
598+
GetJsonValueOrDefault<std::string>(json, kPlanId));
606599
ICEBERG_RETURN_UNEXPECTED(
607600
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
608601
ICEBERG_RETURN_UNEXPECTED(response.Validate());

src/iceberg/catalog/rest/rest_catalog.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,15 +512,17 @@ Result<PlanTableScanResponse> RestCatalog::PlanTableScan(
512512
request.select = context.selected_columns;
513513
request.filter = context.filter;
514514
request.case_sensitive = context.case_sensitive;
515-
request.use_snapshot_schema = false; // TODO
515+
request.use_snapshot_schema = false;
516516
if (context.from_snapshot_id.has_value() && context.to_snapshot_id.has_value()) {
517517
request.start_snapshot_id = context.from_snapshot_id.value();
518518
request.end_snapshot_id = context.to_snapshot_id.value();
519+
request.use_snapshot_schema = true;
519520
}
520521
if (context.min_rows_requested.has_value()) {
521522
request.min_rows_required = context.min_rows_requested.value();
522523
}
523524

525+
ICEBERG_RETURN_UNEXPECTED(request.Validate());
524526
ICEBERG_ASSIGN_OR_RAISE(auto request_json, ToJson(request));
525527
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(request_json));
526528
ICEBERG_ASSIGN_OR_RAISE(
@@ -566,6 +568,7 @@ Result<FetchScanTasksResponse> RestCatalog::FetchScanTasks(
566568
ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, table.specs());
567569

568570
FetchScanTasksRequest request{.planTask = plan_task};
571+
ICEBERG_RETURN_UNEXPECTED(request.Validate());
569572
ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request)));
570573
ICEBERG_ASSIGN_OR_RAISE(
571574
const auto response,

src/iceberg/catalog/rest/types.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,12 +298,13 @@ struct ICEBERG_REST_EXPORT OAuthTokenResponse {
298298
bool operator==(const OAuthTokenResponse&) const = default;
299299
};
300300

301+
/// \brief Request to initiate a server-side scan planning operation.
301302
struct ICEBERG_REST_EXPORT PlanTableScanRequest {
302303
std::optional<int64_t> snapshot_id;
303304
std::vector<std::string> select;
304305
std::shared_ptr<Expression> filter;
305-
bool case_sensitive;
306-
bool use_snapshot_schema;
306+
bool case_sensitive = true;
307+
bool use_snapshot_schema = false;
307308
std::optional<int64_t> start_snapshot_id;
308309
std::optional<int64_t> end_snapshot_id;
309310
std::vector<std::string> statsFields;
@@ -314,6 +315,8 @@ struct ICEBERG_REST_EXPORT PlanTableScanRequest {
314315
bool operator==(const PlanTableScanRequest&) const;
315316
};
316317

318+
/// \brief Base response containing scan tasks and delete files returned by scan plan
319+
/// endpoints.
317320
struct ICEBERG_REST_EXPORT BaseScanTaskResponse {
318321
std::vector<std::string> plan_tasks;
319322
std::vector<FileScanTask> file_scan_tasks;
@@ -325,6 +328,8 @@ struct ICEBERG_REST_EXPORT BaseScanTaskResponse {
325328
bool operator==(const BaseScanTaskResponse&) const;
326329
};
327330

331+
/// \brief Response from initiating a scan planning operation, including plan status and
332+
/// initial scan tasks.
328333
struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse {
329334
std::string plan_status;
330335
std::string plan_id;
@@ -335,6 +340,8 @@ struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse {
335340
bool operator==(const PlanTableScanResponse&) const;
336341
};
337342

343+
/// \brief Response from polling an asynchronous scan plan, including current status and
344+
/// available scan tasks.
338345
struct ICEBERG_REST_EXPORT FetchPlanningResultResponse : BaseScanTaskResponse {
339346
PlanStatus plan_status;
340347
// TODO: Add credentials.
@@ -344,6 +351,7 @@ struct ICEBERG_REST_EXPORT FetchPlanningResultResponse : BaseScanTaskResponse {
344351
bool operator==(const FetchPlanningResultResponse&) const;
345352
};
346353

354+
/// \brief Request to fetch the scan tasks for a given plan task token.
347355
struct ICEBERG_REST_EXPORT FetchScanTasksRequest {
348356
std::string planTask;
349357

@@ -352,6 +360,7 @@ struct ICEBERG_REST_EXPORT FetchScanTasksRequest {
352360
bool operator==(const FetchScanTasksRequest&) const;
353361
};
354362

363+
/// \brief Response containing the file scan tasks for a given plan task token.
355364
struct ICEBERG_REST_EXPORT FetchScanTasksResponse : BaseScanTaskResponse {
356365
Status Validate() const;
357366

src/iceberg/test/rest_json_serde_test.cc

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "iceberg/catalog/rest/json_serde_internal.h"
2727
#include "iceberg/catalog/rest/types.h"
2828
#include "iceberg/partition_spec.h"
29+
#include "iceberg/schema.h"
2930
#include "iceberg/result.h"
3031
#include "iceberg/sort_order.h"
3132
#include "iceberg/table_identifier.h"
@@ -1380,4 +1381,137 @@ INSTANTIATE_TEST_SUITE_P(
13801381
return info.param.test_name;
13811382
});
13821383

1384+
// Helper: empty schema and specs for scan response tests that don't need partition parsing.
1385+
/// \brief Test helper: returns a Schema constructed from an empty initializer list and 0.
/// NOTE(review): the second argument is presumably the schema id -- confirm against
/// the Schema constructor declared in iceberg/schema.h.
static Schema EmptySchema() { return Schema({}, 0); }
1386+
/// \brief Test helper: returns an empty map from int32_t keys to shared PartitionSpec
/// pointers, passed as the partition-spec argument of the *FromJson calls below.
static std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> EmptySpecs() { return {}; }
1387+
1388+
// --- PlanTableScanResponse ---
1389+
1390+
TEST(PlanTableScanResponseFromJsonTest, SubmittedStatusMissingOptionalFields) {
1391+
// "submitted" response: only status and plan-id, no tasks
1392+
auto json = nlohmann::json::parse(
1393+
R"({"status":"submitted","plan-id":"abc-123"})");
1394+
auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema());
1395+
ASSERT_THAT(result, IsOk());
1396+
EXPECT_EQ(result->plan_status, "submitted");
1397+
EXPECT_EQ(result->plan_id, "abc-123");
1398+
EXPECT_TRUE(result->plan_tasks.empty());
1399+
EXPECT_TRUE(result->file_scan_tasks.empty());
1400+
EXPECT_TRUE(result->delete_files.empty());
1401+
}
1402+
1403+
TEST(PlanTableScanResponseFromJsonTest, CompletedStatusWithPlanTasks) {
1404+
// "completed" response with plan-tasks and explicitly empty delete-files / file-scan-tasks arrays
1405+
auto json = nlohmann::json::parse(
1406+
R"({"status":"completed","plan-id":"abc-123","plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})");
1407+
auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema());
1408+
ASSERT_THAT(result, IsOk());
1409+
EXPECT_EQ(result->plan_status, "completed");
1410+
EXPECT_EQ(result->plan_id, "abc-123");
1411+
ASSERT_EQ(result->plan_tasks.size(), 2);
1412+
EXPECT_EQ(result->plan_tasks[0], "task-1");
1413+
EXPECT_EQ(result->plan_tasks[1], "task-2");
1414+
}
1415+
1416+
TEST(PlanTableScanResponseFromJsonTest, MissingRequiredStatus) {
1417+
auto json = nlohmann::json::parse(R"({"plan-id":"abc-123"})");
1418+
auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema());
1419+
ASSERT_THAT(result, IsError(ErrorKind::kJsonParseError));
1420+
EXPECT_THAT(result, HasErrorMessage("Missing 'status'"));
1421+
}
1422+
1423+
TEST(PlanTableScanResponseFromJsonTest, MissingPlanIdDefaultsToEmptyForFailedStatus) {
1424+
// plan-id is optional for non-submitted/completed statuses
1425+
auto json = nlohmann::json::parse(R"({"status":"failed"})");
1426+
auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema());
1427+
ASSERT_THAT(result, IsOk());
1428+
EXPECT_TRUE(result->plan_id.empty());
1429+
}
1430+
1431+
// --- FetchPlanningResultResponse ---
1432+
1433+
TEST(FetchPlanningResultResponseFromJsonTest, SubmittedStatusNoTasks) {
1434+
auto json = nlohmann::json::parse(R"({"status":"submitted"})");
1435+
auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema());
1436+
ASSERT_THAT(result, IsOk());
1437+
EXPECT_EQ(result->plan_status.ToString(), "submitted");
1438+
EXPECT_TRUE(result->plan_tasks.empty());
1439+
EXPECT_TRUE(result->file_scan_tasks.empty());
1440+
EXPECT_TRUE(result->delete_files.empty());
1441+
}
1442+
1443+
TEST(FetchPlanningResultResponseFromJsonTest, CompletedStatusWithPlanTasks) {
1444+
auto json = nlohmann::json::parse(
1445+
R"({"status":"completed","plan-tasks":["task-1"],"delete-files":[],"file-scan-tasks":[]})");
1446+
auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema());
1447+
ASSERT_THAT(result, IsOk());
1448+
EXPECT_EQ(result->plan_status.ToString(), "completed");
1449+
ASSERT_EQ(result->plan_tasks.size(), 1);
1450+
EXPECT_EQ(result->plan_tasks[0], "task-1");
1451+
}
1452+
1453+
TEST(FetchPlanningResultResponseFromJsonTest, MissingRequiredStatus) {
1454+
auto json = nlohmann::json::parse(R"({})");
1455+
auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema());
1456+
ASSERT_THAT(result, IsError(ErrorKind::kJsonParseError));
1457+
EXPECT_THAT(result, HasErrorMessage("Missing 'status'"));
1458+
}
1459+
1460+
// --- FetchScanTasksResponse ---
1461+
1462+
TEST(FetchScanTasksResponseFromJsonTest, WithFileScanTasks) {
1463+
// One file scan task with a data file and one delete file referenced by index.
1464+
auto json = nlohmann::json::parse(R"({
1465+
"plan-tasks": [],
1466+
"delete-files": [
1467+
{
1468+
"content": "POSITION_DELETES",
1469+
"file-path": "s3://bucket/deletes/delete.parquet",
1470+
"file-format": "PARQUET",
1471+
"file-size-in-bytes": 512,
1472+
"record-count": 5
1473+
}
1474+
],
1475+
"file-scan-tasks": [
1476+
{
1477+
"data-file": {
1478+
"content": "DATA",
1479+
"file-path": "s3://bucket/data/file.parquet",
1480+
"file-format": "PARQUET",
1481+
"file-size-in-bytes": 12345,
1482+
"record-count": 100
1483+
},
1484+
"delete-file-references": [0]
1485+
}
1486+
]
1487+
})");
1488+
auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema());
1489+
ASSERT_THAT(result, IsOk());
1490+
EXPECT_TRUE(result->plan_tasks.empty());
1491+
ASSERT_EQ(result->delete_files.size(), 1);
1492+
ASSERT_EQ(result->file_scan_tasks.size(), 1);
1493+
EXPECT_EQ(result->file_scan_tasks[0].data_file()->file_path,
1494+
"s3://bucket/data/file.parquet");
1495+
ASSERT_EQ(result->file_scan_tasks[0].delete_files().size(), 1);
1496+
EXPECT_EQ(result->file_scan_tasks[0].delete_files()[0]->file_path,
1497+
"s3://bucket/deletes/delete.parquet");
1498+
}
1499+
1500+
TEST(FetchScanTasksResponseFromJsonTest, WithPlanTasksOnly) {
1501+
auto json = nlohmann::json::parse(
1502+
R"({"plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})");
1503+
auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema());
1504+
ASSERT_THAT(result, IsOk());
1505+
ASSERT_EQ(result->plan_tasks.size(), 2);
1506+
EXPECT_EQ(result->plan_tasks[0], "task-1");
1507+
EXPECT_TRUE(result->file_scan_tasks.empty());
1508+
}
1509+
1510+
TEST(FetchScanTasksResponseFromJsonTest, AllFieldsMissing) {
1511+
// Both plan-tasks and file-scan-tasks absent → Validate() should fail
1512+
auto json = nlohmann::json::parse(R"({})");
1513+
auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema());
1514+
ASSERT_THAT(result, IsError(ErrorKind::kValidationFailed));
1515+
}
1516+
13831517
} // namespace iceberg::rest

0 commit comments

Comments
 (0)