Skip to content

Commit ae43c16

Browse files
Sandeep GottimukkalaSandeep Gottimukkala
authored andcommitted
refactor: move DataFileFromJson and FileScanTasksFromJson to catalog/rest
These parsers implement REST Catalog scan response wire format and have no callers outside the REST catalog module, mirroring Java TableScanResponseParser. Remove from top-level json_serde layer, declare in catalog/rest/json_serde_internal.h, and move tests to rest_json_serde_test.cc.
1 parent 6edd361 commit ae43c16

6 files changed

Lines changed: 461 additions & 458 deletions

File tree

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "iceberg/catalog/rest/json_serde_internal.h"
2828
#include "iceberg/catalog/rest/types.h"
2929
#include "iceberg/expression/json_serde_internal.h"
30+
#include "iceberg/file_format.h"
3031
#include "iceberg/json_serde_internal.h"
3132
#include "iceberg/partition_spec.h"
3233
#include "iceberg/schema.h"
@@ -95,6 +96,225 @@ constexpr std::string_view kEndSnapshotId = "end-snapshot-id";
9596
constexpr std::string_view kStatsFields = "stats-fields";
9697
constexpr std::string_view kMinRowsRequested = "min-rows-requested";
9798
constexpr std::string_view kPlanTask = "plan-task";
99+
constexpr std::string_view kContent = "content";
100+
constexpr std::string_view kFilePath = "file-path";
101+
constexpr std::string_view kFileFormat = "file-format";
102+
constexpr std::string_view kSpecId = "spec-id";
103+
constexpr std::string_view kPartition = "partition";
104+
constexpr std::string_view kRecordCount = "record-count";
105+
constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes";
106+
constexpr std::string_view kColumnSizes = "column-sizes";
107+
constexpr std::string_view kValueCounts = "value-counts";
108+
constexpr std::string_view kNullValueCounts = "null-value-counts";
109+
constexpr std::string_view kNanValueCounts = "nan-value-counts";
110+
constexpr std::string_view kLowerBounds = "lower-bounds";
111+
constexpr std::string_view kUpperBounds = "upper-bounds";
112+
constexpr std::string_view kKeyMetadata = "key-metadata";
113+
constexpr std::string_view kSplitOffsets = "split-offsets";
114+
constexpr std::string_view kEqualityIds = "equality-ids";
115+
constexpr std::string_view kSortOrderId = "sort-order-id";
116+
constexpr std::string_view kFirstRowId = "first-row-id";
117+
constexpr std::string_view kReferencedDataFile = "referenced-data-file";
118+
constexpr std::string_view kContentOffset = "content-offset";
119+
constexpr std::string_view kContentSizeInBytes = "content-size-in-bytes";
120+
constexpr std::string_view kDataFile = "data-file";
121+
constexpr std::string_view kDeleteFileReferences = "delete-file-references";
122+
constexpr std::string_view kResidualFilter = "residual-filter";
123+
124+
} // namespace
125+
126+
Result<DataFile> DataFileFromJson(
127+
const nlohmann::json& json,
128+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_spec_by_id,
129+
const Schema& schema) {
130+
if (!json.is_object()) {
131+
return JsonParseError("DataFile must be a JSON object: {}", SafeDumpJson(json));
132+
}
133+
DataFile df;
134+
135+
ICEBERG_ASSIGN_OR_RAISE(auto content_str, GetJsonValue<std::string>(json, kContent));
136+
if (content_str == ToString(DataFile::Content::kData)) {
137+
df.content = DataFile::Content::kData;
138+
} else if (content_str == ToString(DataFile::Content::kPositionDeletes)) {
139+
df.content = DataFile::Content::kPositionDeletes;
140+
} else if (content_str == ToString(DataFile::Content::kEqualityDeletes)) {
141+
df.content = DataFile::Content::kEqualityDeletes;
142+
} else {
143+
return JsonParseError("Unknown data file content: {}", content_str);
144+
}
145+
146+
ICEBERG_ASSIGN_OR_RAISE(df.file_path, GetJsonValue<std::string>(json, kFilePath));
147+
ICEBERG_ASSIGN_OR_RAISE(auto format_str, GetJsonValue<std::string>(json, kFileFormat));
148+
ICEBERG_ASSIGN_OR_RAISE(df.file_format, FileFormatTypeFromString(format_str));
149+
150+
if (json.contains(kSpecId) && !json.at(kSpecId).is_null()) {
151+
ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue<int32_t>(json, kSpecId));
152+
df.partition_spec_id = spec_id;
153+
}
154+
155+
if (json.contains(kPartition)) {
156+
ICEBERG_ASSIGN_OR_RAISE(auto partition_vals,
157+
GetJsonValue<nlohmann::json>(json, kPartition));
158+
if (!partition_vals.is_array()) {
159+
return JsonParseError("PartitionValues must be a JSON array: {}",
160+
SafeDumpJson(partition_vals));
161+
}
162+
std::vector<Literal> literals;
163+
auto it = partition_spec_by_id.find(df.partition_spec_id.value_or(-1));
164+
if (it == partition_spec_by_id.end()) {
165+
return JsonParseError("Invalid partition spec id: {}",
166+
df.partition_spec_id.value_or(-1));
167+
}
168+
ICEBERG_ASSIGN_OR_RAISE(auto struct_type, it->second->PartitionType(schema));
169+
auto fields = struct_type->fields();
170+
if (partition_vals.size() != fields.size()) {
171+
return JsonParseError("Invalid partition data size: expected = {}, actual = {}",
172+
fields.size(), partition_vals.size());
173+
}
174+
for (size_t pos = 0; pos < fields.size(); ++pos) {
175+
ICEBERG_ASSIGN_OR_RAISE(
176+
auto literal, LiteralFromJson(partition_vals[pos], fields[pos].type().get()));
177+
literals.push_back(std::move(literal));
178+
}
179+
df.partition = PartitionValues(std::move(literals));
180+
}
181+
182+
ICEBERG_ASSIGN_OR_RAISE(df.record_count, GetJsonValue<int64_t>(json, kRecordCount));
183+
ICEBERG_ASSIGN_OR_RAISE(df.file_size_in_bytes,
184+
GetJsonValue<int64_t>(json, kFileSizeInBytes));
185+
186+
auto parse_int_map = [&](std::string_view key,
187+
std::map<int32_t, int64_t>& target) -> Status {
188+
if (!json.contains(key) || json.at(key).is_null()) {
189+
return {};
190+
}
191+
ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue<nlohmann::json>(json, key));
192+
ICEBERG_ASSIGN_OR_RAISE(auto keys,
193+
GetTypedJsonValue<std::vector<int32_t>>(map_json.at("keys")));
194+
ICEBERG_ASSIGN_OR_RAISE(
195+
auto values, GetTypedJsonValue<std::vector<int64_t>>(map_json.at("values")));
196+
if (keys.size() != values.size()) {
197+
return JsonParseError("'{}' map keys and values have different lengths", key);
198+
}
199+
for (size_t i = 0; i < keys.size(); ++i) {
200+
target[keys[i]] = values[i];
201+
}
202+
return {};
203+
};
204+
205+
ICEBERG_RETURN_UNEXPECTED(parse_int_map(kColumnSizes, df.column_sizes));
206+
ICEBERG_RETURN_UNEXPECTED(parse_int_map(kValueCounts, df.value_counts));
207+
ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNullValueCounts, df.null_value_counts));
208+
ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNanValueCounts, df.nan_value_counts));
209+
210+
auto parse_binary_map = [&](std::string_view key,
211+
std::map<int32_t, std::vector<uint8_t>>& target) -> Status {
212+
if (!json.contains(key) || json.at(key).is_null()) {
213+
return {};
214+
}
215+
ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue<nlohmann::json>(json, key));
216+
ICEBERG_ASSIGN_OR_RAISE(auto keys,
217+
GetJsonValue<std::vector<int32_t>>(map_json, "keys"));
218+
ICEBERG_ASSIGN_OR_RAISE(
219+
auto values, GetJsonValue<std::vector<std::vector<uint8_t>>>(map_json, "values"));
220+
if (keys.size() != values.size()) {
221+
return JsonParseError("'{}' binary map keys and values have different lengths", key);
222+
}
223+
for (size_t i = 0; i < keys.size(); ++i) {
224+
target[keys[i]] = values[i];
225+
}
226+
return {};
227+
};
228+
229+
ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kLowerBounds, df.lower_bounds));
230+
ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kUpperBounds, df.upper_bounds));
231+
232+
if (json.contains(kKeyMetadata) && !json.at(kKeyMetadata).is_null()) {
233+
ICEBERG_ASSIGN_OR_RAISE(df.key_metadata,
234+
GetJsonValue<std::vector<uint8_t>>(json, kKeyMetadata));
235+
}
236+
if (json.contains(kSplitOffsets) && !json.at(kSplitOffsets).is_null()) {
237+
ICEBERG_ASSIGN_OR_RAISE(df.split_offsets,
238+
GetJsonValue<std::vector<int64_t>>(json, kSplitOffsets));
239+
}
240+
if (json.contains(kEqualityIds) && !json.at(kEqualityIds).is_null()) {
241+
ICEBERG_ASSIGN_OR_RAISE(df.equality_ids,
242+
GetJsonValue<std::vector<int32_t>>(json, kEqualityIds));
243+
}
244+
if (json.contains(kSortOrderId) && !json.at(kSortOrderId).is_null()) {
245+
ICEBERG_ASSIGN_OR_RAISE(df.sort_order_id, GetJsonValue<int32_t>(json, kSortOrderId));
246+
}
247+
if (json.contains(kFirstRowId) && !json.at(kFirstRowId).is_null()) {
248+
ICEBERG_ASSIGN_OR_RAISE(df.first_row_id, GetJsonValue<int64_t>(json, kFirstRowId));
249+
}
250+
if (json.contains(kReferencedDataFile) && !json.at(kReferencedDataFile).is_null()) {
251+
ICEBERG_ASSIGN_OR_RAISE(df.referenced_data_file,
252+
GetJsonValue<std::string>(json, kReferencedDataFile));
253+
}
254+
if (json.contains(kContentOffset) && !json.at(kContentOffset).is_null()) {
255+
ICEBERG_ASSIGN_OR_RAISE(df.content_offset,
256+
GetJsonValue<int64_t>(json, kContentOffset));
257+
}
258+
if (json.contains(kContentSizeInBytes) && !json.at(kContentSizeInBytes).is_null()) {
259+
ICEBERG_ASSIGN_OR_RAISE(df.content_size_in_bytes,
260+
GetJsonValue<int64_t>(json, kContentSizeInBytes));
261+
}
262+
263+
return df;
264+
}
265+
266+
Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanTasksFromJson(
267+
const nlohmann::json& json,
268+
const std::vector<std::shared_ptr<DataFile>>& delete_files,
269+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_spec_by_id,
270+
const Schema& schema) {
271+
if (!json.is_array()) {
272+
return JsonParseError("Cannot parse file scan tasks from non-array: {}",
273+
SafeDumpJson(json));
274+
}
275+
std::vector<std::shared_ptr<FileScanTask>> file_scan_tasks;
276+
for (const auto& task_json : json) {
277+
if (!task_json.is_object()) {
278+
return JsonParseError("Cannot parse file scan task from a non-object: {}",
279+
SafeDumpJson(task_json));
280+
}
281+
282+
ICEBERG_ASSIGN_OR_RAISE(auto data_file_json,
283+
GetJsonValue<nlohmann::json>(task_json, kDataFile));
284+
ICEBERG_ASSIGN_OR_RAISE(auto data_file,
285+
DataFileFromJson(data_file_json, partition_spec_by_id, schema));
286+
287+
std::vector<std::shared_ptr<DataFile>> task_delete_files;
288+
if (task_json.contains(kDeleteFileReferences) &&
289+
!task_json.at(kDeleteFileReferences).is_null()) {
290+
ICEBERG_ASSIGN_OR_RAISE(auto refs, GetJsonValue<std::vector<int32_t>>(
291+
task_json, kDeleteFileReferences));
292+
for (int32_t ref : refs) {
293+
if (ref < 0 || static_cast<size_t>(ref) >= delete_files.size()) {
294+
return JsonParseError(
295+
"delete-file-references index {} is out of range (delete_files size: {})",
296+
ref, delete_files.size());
297+
}
298+
task_delete_files.push_back(delete_files[ref]);
299+
}
300+
}
301+
302+
std::shared_ptr<Expression> residual_filter;
303+
if (task_json.contains(kResidualFilter) && !task_json.at(kResidualFilter).is_null()) {
304+
ICEBERG_ASSIGN_OR_RAISE(auto filter_json,
305+
GetJsonValue<nlohmann::json>(task_json, kResidualFilter));
306+
ICEBERG_ASSIGN_OR_RAISE(residual_filter, ExpressionFromJson(filter_json));
307+
}
308+
309+
file_scan_tasks.push_back(
310+
std::make_shared<FileScanTask>(std::make_shared<DataFile>(std::move(data_file)),
311+
std::move(task_delete_files),
312+
std::move(residual_filter)));
313+
}
314+
return file_scan_tasks;
315+
}
316+
317+
namespace {
98318

99319
Status BaseScanTaskResponseFromJson(
100320
const nlohmann::json& json, BaseScanTaskResponse* response,

src/iceberg/catalog/rest/json_serde_internal.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,19 @@
1919

2020
#pragma once
2121

22+
#include <memory>
2223
#include <unordered_map>
24+
#include <vector>
2325

2426
#include <nlohmann/json_fwd.hpp>
2527

2628
#include "iceberg/catalog/rest/iceberg_rest_export.h"
2729
#include "iceberg/catalog/rest/types.h"
30+
#include "iceberg/manifest/manifest_entry.h"
2831
#include "iceberg/partition_spec.h"
2932
#include "iceberg/result.h"
3033
#include "iceberg/schema.h"
34+
#include "iceberg/table_scan.h"
3135

3236
/// \file iceberg/catalog/rest/json_serde_internal.h
3337
/// JSON serialization and deserialization for Iceberg REST Catalog API types.
@@ -88,4 +92,15 @@ ICEBERG_REST_EXPORT Result<FetchScanTasksResponse> FetchScanTasksResponseFromJso
8892
ICEBERG_REST_EXPORT Result<nlohmann::json> ToJson(const PlanTableScanRequest& request);
8993
ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request);
9094

95+
ICEBERG_REST_EXPORT Result<DataFile> DataFileFromJson(
96+
const nlohmann::json& json,
97+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_spec_by_id,
98+
const Schema& schema);
99+
100+
ICEBERG_REST_EXPORT Result<std::vector<std::shared_ptr<FileScanTask>>> FileScanTasksFromJson(
101+
const nlohmann::json& json,
102+
const std::vector<std::shared_ptr<DataFile>>& delete_files,
103+
const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>& partition_spec_by_id,
104+
const Schema& schema);
105+
91106
} // namespace iceberg::rest

0 commit comments

Comments
 (0)