Skip to content

Commit 6aa73bf

Browse files
Sandeep Gottimukkala
authored and committed
Comments and stuff
1 parent 4c3ce8d commit 6aa73bf

File tree

6 files changed

+85
-38
lines changed

6 files changed

+85
-38
lines changed

src/iceberg/catalog.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,17 +191,39 @@ class ICEBERG_EXPORT Catalog {
191191
virtual Result<std::shared_ptr<Table>> RegisterTable(
192192
const TableIdentifier& identifier, const std::string& metadata_file_location) = 0;
193193

194+
/// \brief Initiate a scan planning operation for the given table.
195+
///
196+
/// \param table The table to scan.
197+
/// \param context The scan context containing snapshot, filter, and other options.
198+
/// \return A PlanTableScanResponse with the plan status and initial scan tasks.
///
/// \note This base-class default performs no planning: it ignores both arguments and
/// returns NotImplemented with the message "PlanTableScan is not supported by this
/// catalog". The method is virtual, so a catalog implementation can override it.
194199
virtual Result<rest::PlanTableScanResponse> PlanTableScan(
195200
const Table& table, const internal::TableScanContext& context) {
196201
return NotImplemented("PlanTableScan is not supported by this catalog");
197202
}
203+
204+
/// \brief Fetch the current status and results of an asynchronous scan plan.
205+
///
206+
/// \param table The table being scanned.
207+
/// \param plan_id The plan ID returned by PlanTableScan.
208+
/// \return A FetchPlanningResultResponse with the current plan status and tasks.
///
/// \note This base-class default fetches nothing: it ignores both arguments and returns
/// NotImplemented with the message "FetchPlanningResult is not supported by this
/// catalog". The method is virtual, so a catalog implementation can override it.
198209
virtual Result<rest::FetchPlanningResultResponse> FetchPlanningResult(
199210
const Table& table, const std::string& plan_id) {
200211
return NotImplemented("FetchPlanningResult is not supported by this catalog");
201212
}
213+
214+
/// \brief Cancel an in-progress scan planning operation.
215+
///
216+
/// \param table The table being scanned.
217+
/// \param plan_id The plan ID returned by PlanTableScan.
/// \return A Status for the cancel request. This base-class default ignores both
/// arguments and returns NotImplemented with the message "CancelPlanning is not
/// supported by this catalog"; the method is virtual, so a catalog implementation can
/// override it.
202218
virtual Status CancelPlanning(const Table& table, const std::string& plan_id) {
203219
return NotImplemented("CancelPlanning is not supported by this catalog");
204220
}
221+
222+
/// \brief Fetch the scan tasks for a given plan task token.
223+
///
224+
/// \param table The table being scanned.
225+
/// \param plan_task The plan task token returned in a scan plan response.
226+
/// \return A FetchScanTasksResponse with the file scan tasks.
205227
virtual Result<rest::FetchScanTasksResponse> FetchScanTasks(
206228
const Table& table, const std::string& plan_task) {
207229
return NotImplemented("FetchScanTasks is not supported by this catalog");

src/iceberg/catalog/rest/error_handlers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ class ICEBERG_REST_EXPORT ViewCommitErrorHandler final : public DefaultErrorHand
127127
constexpr ViewCommitErrorHandler() = default;
128128
};
129129

130+
/// \brief Scan plan operation error handler.
130131
class ICEBERG_REST_EXPORT ScanPlanErrorHandler final : public DefaultErrorHandler {
131132
public:
132133
static const std::shared_ptr<ScanPlanErrorHandler>& Instance();

src/iceberg/catalog/rest/json_serde.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,7 @@ Result<PlanTableScanResponse> PlanTableScanResponseFromJson(
605605
ICEBERG_ASSIGN_OR_RAISE(response.plan_id, GetJsonValue<std::string>(json, kPlanId));
606606
ICEBERG_RETURN_UNEXPECTED(
607607
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
608+
ICEBERG_RETURN_UNEXPECTED(response.Validate());
608609
return response;
609610
}
610611

@@ -618,6 +619,7 @@ Result<FetchPlanningResultResponse> FetchPlanningResultResponseFromJson(
618619
response.plan_status = PlanStatus(PlanStatus::FromString(status_str));
619620
ICEBERG_RETURN_UNEXPECTED(
620621
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
622+
ICEBERG_RETURN_UNEXPECTED(response.Validate());
621623
return response;
622624
}
623625

@@ -628,6 +630,7 @@ Result<FetchScanTasksResponse> FetchScanTasksResponseFromJson(
628630
FetchScanTasksResponse response;
629631
ICEBERG_RETURN_UNEXPECTED(
630632
BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema));
633+
ICEBERG_RETURN_UNEXPECTED(response.Validate());
631634
return response;
632635
}
633636

src/iceberg/catalog/rest/resource_paths.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,14 @@ class ICEBERG_REST_EXPORT ResourcePaths {
8181
/// \brief Get the /v1/{prefix}/transactions/commit endpoint path.
8282
Result<std::string> CommitTransaction() const;
8383

84+
/// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan endpoint path.
8485
Result<std::string> ScanPlan(const TableIdentifier& ident) const;
86+
87+
/// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan_id}
88+
/// endpoint path.
8589
Result<std::string> ScanPlan(const TableIdentifier& ident, const std::string& plan_id) const;
90+
91+
/// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks endpoint path.
8692
Result<std::string> ScanTask(const TableIdentifier& ident) const;
8793

8894
private:

src/iceberg/catalog/rest/types.cc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,52 @@ bool CommitTableResponse::operator==(const CommitTableResponse& other) const {
118118
return true;
119119
}
120120

121+
/// \brief Member-wise equality for PlanTableScanRequest.
///
/// Returns true only when all of snapshot_id, select, filter, case_sensitive,
/// use_snapshot_schema, start_snapshot_id, end_snapshot_id, statsFields and
/// min_rows_required compare equal with their own operator==. The && chain
/// short-circuits, so comparison stops at the first member that differs.
///
/// NOTE(review): `statsFields` is camelCase while every other member compared here is
/// snake_case; renaming it would be an interface change, so it is only flagged.
/// NOTE(review): the member types are not visible from here. If `filter` is a
/// pointer-like handle, `==` compares identity rather than contents -- confirm.
bool PlanTableScanRequest::operator==(const PlanTableScanRequest& other) const {
122+
return snapshot_id == other.snapshot_id && select == other.select &&
123+
filter == other.filter && case_sensitive == other.case_sensitive &&
124+
use_snapshot_schema == other.use_snapshot_schema &&
125+
start_snapshot_id == other.start_snapshot_id &&
126+
end_snapshot_id == other.end_snapshot_id && statsFields == other.statsFields &&
127+
min_rows_required == other.min_rows_required;
128+
}
129+
130+
/// \brief Equality for the fields shared by the scan-task responses.
///
/// Compares plan_tasks and delete_files directly, then walks file_scan_tasks
/// position by position (order matters). For each pair of tasks it compares the
/// pointed-to data file, the pointed-to delete files, and the residual filter.
/// Returns false at the first difference and true if none is found.
bool BaseScanTaskResponse::operator==(const BaseScanTaskResponse& other) const {
131+
if (plan_tasks != other.plan_tasks) return false;
132+
// NOTE(review): the response-level delete_files is compared with the container's own
// operator!=, whereas the per-task delete files below are dereferenced before
// comparing. If the elements are pointer-like this is an identity comparison -- confirm.
if (delete_files != other.delete_files) return false;
133+
// Sizes must match before the index loop so other.file_scan_tasks[i] is in range.
if (file_scan_tasks.size() != other.file_scan_tasks.size()) return false;
134+
for (size_t i = 0; i < file_scan_tasks.size(); ++i) {
135+
const auto& a = file_scan_tasks[i];
136+
const auto& b = other.file_scan_tasks[i];
137+
// Unequal if exactly one of the two tasks has no data file.
if (!a.data_file() != !b.data_file()) return false;
138+
// Both are set or both are unset here; when set, compare the pointed-to objects.
if (a.data_file() && *a.data_file() != *b.data_file()) return false;
139+
if (a.delete_files().size() != b.delete_files().size()) return false;
140+
for (size_t j = 0; j < a.delete_files().size(); ++j) {
141+
// Same null-ness check, then pointee comparison, for each delete file.
if (!a.delete_files()[j] != !b.delete_files()[j]) return false;
142+
if (a.delete_files()[j] && *a.delete_files()[j] != *b.delete_files()[j]) return false;
143+
}
144+
// NOTE(review): residual_filter() is compared without dereferencing, unlike
// data_file() above. If it returns a pointer-like handle this is an identity
// comparison -- confirm that is intended.
if (a.residual_filter() != b.residual_filter()) return false;
145+
}
146+
return true;
147+
}
148+
149+
/// \brief Equality for PlanTableScanResponse.
///
/// True when the BaseScanTaskResponse part compares equal (checked first) and both
/// plan_status and plan_id compare equal with their own operator==.
bool PlanTableScanResponse::operator==(const PlanTableScanResponse& other) const {
150+
return BaseScanTaskResponse::operator==(other) && plan_status == other.plan_status &&
151+
plan_id == other.plan_id;
152+
}
153+
154+
/// \brief Equality for FetchPlanningResultResponse.
///
/// True when the BaseScanTaskResponse part compares equal (checked first) and the two
/// plan_status values produce the same ToString() result.
///
/// NOTE(review): PlanTableScanResponse::operator== in this file compares plan_status
/// with `==` directly, while this one goes through ToString(). If both members have the
/// same type, the two operators could use the same form -- confirm.
bool FetchPlanningResultResponse::operator==(const FetchPlanningResultResponse& other) const {
155+
return BaseScanTaskResponse::operator==(other) &&
156+
plan_status.ToString() == other.plan_status.ToString();
157+
}
158+
159+
/// \brief Equality for FetchScanTasksRequest: true when planTask compares equal.
///
/// planTask is the only member compared here.
bool FetchScanTasksRequest::operator==(const FetchScanTasksRequest& other) const {
160+
return planTask == other.planTask;
161+
}
162+
163+
/// \brief Equality for FetchScanTasksResponse.
///
/// Delegates entirely to BaseScanTaskResponse::operator==; no additional members are
/// compared here.
bool FetchScanTasksResponse::operator==(const FetchScanTasksResponse& other) const {
164+
return BaseScanTaskResponse::operator==(other);
165+
}
166+
121167
Status OAuthTokenResponse::Validate() const {
122168
if (access_token.empty()) {
123169
return ValidationFailed("OAuth2 token response missing required 'access_token'");

src/iceberg/json_serde.cc

Lines changed: 7 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -253,35 +253,6 @@ constexpr std::string_view kReferencedDataFile = "referenced-data-file";
253253
constexpr std::string_view kContentOffset = "content-offset";
254254
constexpr std::string_view kContentSizeInBytes = "content-size-in-bytes";
255255

256-
// Decode a base64-encoded string to raw bytes.
//
// Only the alphabet A-Z, a-z, 0-9, '+' and '/' is recognised. Decoding stops at the
// first '=' and any other character is silently skipped, so malformed input is never
// reported to the caller. Leftover bits that do not fill a whole byte are dropped.
257-
std::vector<uint8_t> Base64Decode(std::string_view encoded) {
258-
// 256-entry lookup built once on first call: symbol -> 6-bit value, -1 for every
// byte that is not part of the alphabet.
static const std::array<int, 256> kDecodeTable = [] {
259-
std::array<int, 256> table;
260-
table.fill(-1);
261-
for (int i = 0; i < 26; i++) table[static_cast<unsigned char>('A') + i] = i;
262-
for (int i = 0; i < 26; i++) table[static_cast<unsigned char>('a') + i] = 26 + i;
263-
for (int i = 0; i < 10; i++) table[static_cast<unsigned char>('0') + i] = 52 + i;
264-
table[static_cast<unsigned char>('+')] = 62;
265-
table[static_cast<unsigned char>('/')] = 63;
266-
return table;
267-
}();
268-
269-
std::vector<uint8_t> decoded;
270-
// Four input symbols yield at most three output bytes.
decoded.reserve(encoded.size() * 3 / 4);
271-
// `val` accumulates decoded bits; `bits` is (pending bit count - 8), so a value
// >= 0 means at least one whole byte is ready to be emitted.
// NOTE(review): `val` is never masked and grows by 6 bits per accepted symbol. With a
// 32-bit int the signed value overflows after about six symbols; only the low bits are
// read back, but an unsigned or masked accumulator would avoid the overflow -- confirm.
int val = 0, bits = -8;
272-
for (unsigned char c : encoded) {
273-
// '=' ends decoding; anything after it is ignored.
if (c == '=') break;
274-
const int d = kDecodeTable[c];
275-
// Not in the alphabet: skip without reporting an error.
if (d == -1) continue;
276-
val = (val << 6) + d;
277-
bits += 6;
278-
if (bits >= 0) {
279-
decoded.push_back(static_cast<uint8_t>((val >> bits) & 0xFF));
280-
bits -= 8;
281-
}
282-
}
283-
return decoded;
284-
}
285256

286257
} // namespace
287258

@@ -1866,21 +1837,20 @@ Result<DataFile> DataFileFromJson(
18661837
ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNullValueCounts, df.null_value_counts));
18671838
ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNanValueCounts, df.nan_value_counts));
18681839

1869-
// Parse BinaryMap: {"keys": [int, ...], "values": [base64string, ...]}
1840+
// Parse BinaryMap: {"keys": [int, ...], "values": [base64 binary, ...]}
18701841
auto parse_binary_map = [&](std::string_view key,
18711842
std::map<int32_t, std::vector<uint8_t>>& target) -> Status {
18721843
if (!json.contains(key) || json.at(key).is_null()) return {};
18731844
ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue<nlohmann::json>(json, key));
18741845
ICEBERG_ASSIGN_OR_RAISE(auto keys,
1875-
GetTypedJsonValue<std::vector<int32_t>>(map_json.at("keys")));
1876-
ICEBERG_ASSIGN_OR_RAISE(
1877-
auto values,
1878-
GetTypedJsonValue<std::vector<std::string>>(map_json.at("values")));
1846+
GetJsonValue<std::vector<int32_t>>(map_json, "keys"));
1847+
ICEBERG_ASSIGN_OR_RAISE(auto values,
1848+
GetJsonValue<std::vector<std::vector<uint8_t>>>(map_json, "values"));
18791849
if (keys.size() != values.size()) {
18801850
return JsonParseError("'{}' binary map keys and values have different lengths", key);
18811851
}
18821852
for (size_t i = 0; i < keys.size(); ++i) {
1883-
target[keys[i]] = Base64Decode(values[i]);
1853+
target[keys[i]] = values[i];
18841854
}
18851855
return {};
18861856
};
@@ -1889,9 +1859,8 @@ Result<DataFile> DataFileFromJson(
18891859
ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kUpperBounds, df.upper_bounds));
18901860

18911861
if (json.contains(kKeyMetadata) && !json.at(kKeyMetadata).is_null()) {
1892-
ICEBERG_ASSIGN_OR_RAISE(auto key_meta_str,
1893-
GetJsonValue<std::string>(json, kKeyMetadata));
1894-
df.key_metadata = Base64Decode(key_meta_str);
1862+
ICEBERG_ASSIGN_OR_RAISE(df.key_metadata,
1863+
GetJsonValue<std::vector<uint8_t>>(json, kKeyMetadata));
18951864
}
18961865
if (json.contains(kSplitOffsets) && !json.at(kSplitOffsets).is_null()) {
18971866
ICEBERG_ASSIGN_OR_RAISE(df.split_offsets,

0 commit comments

Comments
 (0)