Skip to content

Commit 2ca9bb2

Browse files
Sandeep Gottimukkala
authored and committed
feat(rest): add ToJson for DataFile and scan task responses with roundtrip tests
Add ToJson(DataFile) and ToJson for PlanTableScanResponse, FetchPlanningResultResponse, and FetchScanTasksResponse to support both roundtrip test coverage and server-side use cases. The response serializers use an indexed delete-files structure matching the REST Catalog wire format.
1 parent ae43c16 commit 2ca9bb2

3 files changed

Lines changed: 245 additions & 0 deletions

File tree

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
* under the License.
1818
*/
1919

20+
#include <map>
2021
#include <memory>
2122
#include <string>
23+
#include <unordered_map>
2224
#include <utility>
2325
#include <vector>
2426

@@ -314,8 +316,129 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanTasksFromJson(
314316
return file_scan_tasks;
315317
}
316318

319+
nlohmann::json ToJson(const DataFile& df) {
320+
nlohmann::json json;
321+
json[kContent] = ToString(df.content);
322+
json[kFilePath] = df.file_path;
323+
json[kFileFormat] = ToString(df.file_format);
324+
325+
if (df.partition_spec_id.has_value()) {
326+
json[kSpecId] = df.partition_spec_id.value();
327+
}
328+
329+
json[kRecordCount] = df.record_count;
330+
json[kFileSizeInBytes] = df.file_size_in_bytes;
331+
332+
auto write_int_map = [&](std::string_view key,
333+
const std::map<int32_t, int64_t>& m) {
334+
if (!m.empty()) {
335+
std::vector<int32_t> keys;
336+
std::vector<int64_t> values;
337+
for (const auto& [k, v] : m) {
338+
keys.push_back(k);
339+
values.push_back(v);
340+
}
341+
json[key] = {{"keys", std::move(keys)}, {"values", std::move(values)}};
342+
}
343+
};
344+
345+
write_int_map(kColumnSizes, df.column_sizes);
346+
write_int_map(kValueCounts, df.value_counts);
347+
write_int_map(kNullValueCounts, df.null_value_counts);
348+
write_int_map(kNanValueCounts, df.nan_value_counts);
349+
350+
auto write_binary_map = [&](std::string_view key,
351+
const std::map<int32_t, std::vector<uint8_t>>& m) {
352+
if (!m.empty()) {
353+
std::vector<int32_t> keys;
354+
std::vector<std::vector<uint8_t>> values;
355+
for (const auto& [k, v] : m) {
356+
keys.push_back(k);
357+
values.push_back(v);
358+
}
359+
json[key] = {{"keys", std::move(keys)}, {"values", std::move(values)}};
360+
}
361+
};
362+
363+
write_binary_map(kLowerBounds, df.lower_bounds);
364+
write_binary_map(kUpperBounds, df.upper_bounds);
365+
366+
if (!df.key_metadata.empty()) {
367+
json[kKeyMetadata] = df.key_metadata;
368+
}
369+
if (!df.split_offsets.empty()) {
370+
json[kSplitOffsets] = df.split_offsets;
371+
}
372+
if (!df.equality_ids.empty()) {
373+
json[kEqualityIds] = df.equality_ids;
374+
}
375+
if (df.sort_order_id.has_value()) {
376+
json[kSortOrderId] = df.sort_order_id.value();
377+
}
378+
if (df.first_row_id.has_value()) {
379+
json[kFirstRowId] = df.first_row_id.value();
380+
}
381+
if (df.referenced_data_file.has_value()) {
382+
json[kReferencedDataFile] = df.referenced_data_file.value();
383+
}
384+
if (df.content_offset.has_value()) {
385+
json[kContentOffset] = df.content_offset.value();
386+
}
387+
if (df.content_size_in_bytes.has_value()) {
388+
json[kContentSizeInBytes] = df.content_size_in_bytes.value();
389+
}
390+
391+
return json;
392+
}
393+
317394
namespace {
318395

396+
/// Serializes the fields shared by the scan-planning responses.
///
/// Written to the returned object:
///  - kPlanTasks, through SetContainerField;
///  - kDeleteFiles, the non-null entries of `response.delete_files`, only when
///    at least one was emitted;
///  - kFileScanTasks, one object per non-null task, only when at least one was
///    emitted. Each task object carries kDataFile (when the task has one) and
///    kDeleteFileReferences (when at least one reference could be resolved).
///
/// Values in kDeleteFileReferences are positions in the emitted kDeleteFiles
/// array. A task's delete file is resolved by pointer identity against
/// `response.delete_files`.
nlohmann::json BaseScanTaskResponseToJson(const BaseScanTaskResponse& response) {
  nlohmann::json json;

  SetContainerField(json, kPlanTasks, response.plan_tasks);

  // Build the delete-files array together with a pointer -> position map used to
  // resolve each task's references.
  std::unordered_map<const DataFile*, int32_t> delete_file_index;
  nlohmann::json delete_files_json = nlohmann::json::array();
  for (const auto& delete_file : response.delete_files) {
    if (!delete_file) {
      continue;
    }
    // The position must be the slot in the *emitted* array, not the slot in
    // response.delete_files: null entries are skipped above, so the two diverge
    // after the first null and references would point at the wrong file.
    delete_file_index[delete_file.get()] =
        static_cast<int32_t>(delete_files_json.size());
    delete_files_json.push_back(ToJson(*delete_file));
  }
  if (!delete_files_json.empty()) {
    json[kDeleteFiles] = std::move(delete_files_json);
  }

  nlohmann::json tasks_json = nlohmann::json::array();
  for (const auto& task : response.file_scan_tasks) {
    if (!task) {
      continue;
    }
    nlohmann::json task_json;
    if (task->data_file()) {
      task_json[kDataFile] = ToJson(*task->data_file());
    }

    std::vector<int32_t> refs;
    for (const auto& df : task->delete_files()) {
      // NOTE(review): a delete file that is not one of the pointers held in
      // response.delete_files is dropped here without any signal; confirm that
      // callers always share the same pointers between the two lists.
      if (auto it = delete_file_index.find(df.get()); it != delete_file_index.end()) {
        refs.push_back(it->second);
      }
    }
    if (!refs.empty()) {
      task_json[kDeleteFileReferences] = std::move(refs);
    }

    tasks_json.push_back(std::move(task_json));
  }
  if (!tasks_json.empty()) {
    json[kFileScanTasks] = std::move(tasks_json);
  }

  return json;
}
441+
319442
Status BaseScanTaskResponseFromJson(
320443
const nlohmann::json& json, BaseScanTaskResponse* response,
321444
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>&
@@ -853,6 +976,25 @@ Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson(
853976
return response;
854977
}
855978

979+
/// Serializes a PlanTableScanResponse: the shared scan-task fields, the plan
/// status, and the plan id when it is non-empty.
nlohmann::json ToJson(const PlanTableScanResponse& response) {
  auto out = BaseScanTaskResponseToJson(response);
  out[kPlanStatus] = ToString(response.plan_status);
  if (const auto& plan_id = response.plan_id; !plan_id.empty()) {
    out[kPlanId] = plan_id;
  }
  return out;
}
987+
988+
/// Serializes a FetchPlanningResultResponse: the shared scan-task fields plus
/// the plan status.
nlohmann::json ToJson(const FetchPlanningResultResponse& response) {
  auto out = BaseScanTaskResponseToJson(response);
  out[kPlanStatus] = ToString(response.plan_status);
  return out;
}
993+
994+
/// Serializes a FetchScanTasksResponse, which carries only the shared
/// scan-task fields.
nlohmann::json ToJson(const FetchScanTasksResponse& response) {
  nlohmann::json out = BaseScanTaskResponseToJson(response);
  return out;
}
997+
856998
#define ICEBERG_DEFINE_FROM_JSON(Model) \
857999
template <> \
8581000
Result<Model> FromJson<Model>(const nlohmann::json& json) { \

src/iceberg/catalog/rest/json_serde_internal.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ ICEBERG_REST_EXPORT Result<FetchScanTasksResponse> FetchScanTasksResponseFromJso
9292
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const PlanTableScanRequest& request);
9393
ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request);
9494

95+
/// Serializes a DataFile to JSON. Content, file path, file format, record count
/// and file size are always written; all other fields are written only when
/// they hold a value or are non-empty.
ICEBERG_REST_EXPORT nlohmann::json ToJson(const DataFile& df);
96+
9597
ICEBERG_REST_EXPORT Result<DataFile> DataFileFromJson(
9698
const nlohmann::json& json,
9799
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_spec_by_id,
@@ -103,4 +105,8 @@ ICEBERG_REST_EXPORT Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanT
103105
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_spec_by_id,
104106
const Schema& schema);
105107

108+
/// Serializes a PlanTableScanResponse: plan tasks, delete files, file scan
/// tasks, the plan status, and the plan id when it is non-empty.
ICEBERG_REST_EXPORT nlohmann::json ToJson(const PlanTableScanResponse& response);
109+
/// Serializes a FetchPlanningResultResponse: plan tasks, delete files, file
/// scan tasks, and the plan status.
ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchPlanningResultResponse& response);
110+
/// Serializes a FetchScanTasksResponse: plan tasks, delete files, and file
/// scan tasks.
ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksResponse& response);
111+
106112
} // namespace iceberg::rest

src/iceberg/test/rest_json_serde_test.cc

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1742,4 +1742,101 @@ TEST(FileScanTasksFromJsonTest, NotAnArray) {
17421742
EXPECT_THAT(result, HasErrorMessage("non-array"));
17431743
}
17441744

1745+
// --- Roundtrip tests ---
1746+
1747+
// A DataFile that sets only content, path, format, size and record count must
// compare equal after ToJson -> DataFileFromJson.
TEST(DataFileRoundtripTest, RequiredFieldsOnly) {
  DataFile original;
  original.content = DataFile::Content::kData;
  original.file_path = "s3://bucket/data/file.parquet";
  original.file_format = FileFormatType::kParquet;
  original.file_size_in_bytes = 12345;
  original.record_count = 100;

  auto serialized = ToJson(original);
  auto parsed = DataFileFromJson(serialized, {}, Schema({}, 0));

  ASSERT_THAT(parsed, IsOk());
  EXPECT_EQ(parsed.value(), original);
}
1760+
1761+
// A position-delete DataFile that also sets the spec id, the four per-column
// count maps, split offsets, sort order id and referenced data file must
// compare equal after ToJson -> DataFileFromJson.
TEST(DataFileRoundtripTest, WithOptionalFields) {
  DataFile original;

  // Always-written fields.
  original.content = DataFile::Content::kPositionDeletes;
  original.file_path = "s3://bucket/deletes/pos.parquet";
  original.file_format = FileFormatType::kParquet;
  original.file_size_in_bytes = 5000;
  original.record_count = 50;

  // Optional fields under test.
  original.partition_spec_id = 1;
  original.column_sizes = {{1, 1000}, {2, 2000}};
  original.value_counts = {{1, 100}, {2, 100}};
  original.null_value_counts = {{1, 0}};
  original.nan_value_counts = {{2, 5}};
  original.split_offsets = {0, 4096};
  original.sort_order_id = 0;
  original.referenced_data_file = "s3://bucket/data/file.parquet";

  auto serialized = ToJson(original);
  auto parsed = DataFileFromJson(serialized, {}, Schema({}, 0));

  ASSERT_THAT(parsed, IsOk());
  EXPECT_EQ(parsed.value(), original);
}
1782+
1783+
// Parses a response holding one delete file and one file scan task that
// references it, serializes the parsed value, parses it again, and expects both
// parsed values to compare equal.
TEST(FetchScanTasksResponseRoundtripTest, WithFileScanTasksAndDeleteFiles) {
  auto wire_json = nlohmann::json::parse(R"({
    "plan-tasks": [],
    "delete-files": [
      {
        "content": "position_deletes",
        "file-path": "s3://bucket/deletes/delete.parquet",
        "file-format": "PARQUET",
        "file-size-in-bytes": 512,
        "record-count": 5
      }
    ],
    "file-scan-tasks": [
      {
        "data-file": {
          "content": "data",
          "file-path": "s3://bucket/data/file.parquet",
          "file-format": "PARQUET",
          "file-size-in-bytes": 12345,
          "record-count": 100
        },
        "delete-file-references": [0]
      }
    ]
  })");

  auto first_pass = FetchScanTasksResponseFromJson(wire_json, EmptySpecs(), EmptySchema());
  ASSERT_THAT(first_pass, IsOk());

  auto reserialized = ToJson(*first_pass);
  auto second_pass =
      FetchScanTasksResponseFromJson(reserialized, EmptySpecs(), EmptySchema());
  ASSERT_THAT(second_pass, IsOk());

  EXPECT_EQ(*first_pass, *second_pass);
}
1817+
1818+
// A "submitted" response carrying a plan id must parse to an equal value after
// being serialized with ToJson and parsed again.
TEST(PlanTableScanResponseRoundtripTest, SubmittedStatus) {
  auto wire_json =
      nlohmann::json::parse(R"({"status": "submitted", "plan-id": "abc-123"})");

  auto first_pass = PlanTableScanResponseFromJson(wire_json, EmptySpecs(), EmptySchema());
  ASSERT_THAT(first_pass, IsOk());

  auto reserialized = ToJson(*first_pass);
  auto second_pass =
      PlanTableScanResponseFromJson(reserialized, EmptySpecs(), EmptySchema());
  ASSERT_THAT(second_pass, IsOk());

  EXPECT_EQ(*first_pass, *second_pass);
}
1828+
1829+
// A "completed" response listing two plan tasks must parse to an equal value
// after being serialized with ToJson and parsed again.
TEST(FetchPlanningResultResponseRoundtripTest, CompletedWithPlanTasks) {
  auto wire_json = nlohmann::json::parse(
      R"({"status": "completed", "plan-tasks": ["task-1", "task-2"]})");

  auto first_pass =
      FetchPlanningResultResponseFromJson(wire_json, EmptySpecs(), EmptySchema());
  ASSERT_THAT(first_pass, IsOk());

  auto reserialized = ToJson(*first_pass);
  auto second_pass =
      FetchPlanningResultResponseFromJson(reserialized, EmptySpecs(), EmptySchema());
  ASSERT_THAT(second_pass, IsOk());

  EXPECT_EQ(*first_pass, *second_pass);
}
1841+
17451842
} // namespace iceberg::rest

0 commit comments

Comments
 (0)