diff --git a/src/iceberg/catalog.h b/src/iceberg/catalog.h index 81c9ddd11..18e76e4f5 100644 --- a/src/iceberg/catalog.h +++ b/src/iceberg/catalog.h @@ -26,8 +26,10 @@ #include #include +#include "iceberg/catalog/rest/types.h" #include "iceberg/result.h" #include "iceberg/table_identifier.h" +#include "iceberg/table_scan.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -188,6 +190,44 @@ class ICEBERG_EXPORT Catalog { /// \return a Table instance or ErrorKind::kAlreadyExists if the table already exists virtual Result> RegisterTable( const TableIdentifier& identifier, const std::string& metadata_file_location) = 0; + + /// \brief Initiate a scan planning operation for the given table. + /// + /// \param table The table to scan. + /// \param context The scan context containing snapshot, filter, and other options. + /// \return A PlanTableScanResponse with the plan status and initial scan tasks. + virtual Result PlanTableScan( + const Table& table, const internal::TableScanContext& context) { + return NotImplemented("PlanTableScan is not supported by this catalog"); + } + + /// \brief Fetch the current status and results of an asynchronous scan plan. + /// + /// \param table The table being scanned. + /// \param plan_id The plan ID returned by PlanTableScan. + /// \return A FetchPlanningResultResponse with the current plan status and tasks. + virtual Result FetchPlanningResult( + const Table& table, const std::string& plan_id) { + return NotImplemented("FetchPlanningResult is not supported by this catalog"); + } + + /// \brief Cancel an in-progress scan planning operation. + /// + /// \param table The table being scanned. + /// \param plan_id The plan ID returned by PlanTableScan. + virtual Status CancelPlanning(const Table& table, const std::string& plan_id) { + return NotImplemented("CancelPlanning is not supported by this catalog"); + } + + /// \brief Fetch the scan tasks for a given plan task token. + /// + /// \param table The table being scanned. 
+ /// \param plan_task The plan task token returned in a scan plan response. + /// \return A FetchScanTasksResponse with the file scan tasks. + virtual Result FetchScanTasks( + const Table& table, const std::string& plan_task) { + return NotImplemented("FetchScanTasks is not supported by this catalog"); + } }; } // namespace iceberg diff --git a/src/iceberg/catalog/rest/endpoint.h b/src/iceberg/catalog/rest/endpoint.h index 7382955ce..fdcd2108e 100644 --- a/src/iceberg/catalog/rest/endpoint.h +++ b/src/iceberg/catalog/rest/endpoint.h @@ -128,6 +128,26 @@ class ICEBERG_REST_EXPORT Endpoint { return {HttpMethod::kPost, "/v1/{prefix}/transactions/commit"}; } + // Scan planning endpoints + static Endpoint PlanTableScan() { + return {HttpMethod::kPost, "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"}; + } + + static Endpoint FetchPlanningResult() { + return {HttpMethod::kGet, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"}; + } + + static Endpoint CancelPlanning() { + return {HttpMethod::kDelete, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"}; + } + + static Endpoint FetchScanTasks() { + return {HttpMethod::kPost, + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"}; + } + private: Endpoint(HttpMethod method, std::string_view path) : method_(method), path_(path) {} diff --git a/src/iceberg/catalog/rest/error_handlers.cc b/src/iceberg/catalog/rest/error_handlers.cc index f3e5b8fb3..f1aa6d4cf 100644 --- a/src/iceberg/catalog/rest/error_handlers.cc +++ b/src/iceberg/catalog/rest/error_handlers.cc @@ -30,6 +30,9 @@ namespace { constexpr std::string_view kIllegalArgumentException = "IllegalArgumentException"; constexpr std::string_view kNoSuchNamespaceException = "NoSuchNamespaceException"; constexpr std::string_view kNamespaceNotEmptyException = "NamespaceNotEmptyException"; +constexpr std::string_view kNoSuchTableException = "NoSuchTableException"; +constexpr std::string_view kNoSuchPlanIdException = 
"NoSuchPlanIdException"; +constexpr std::string_view kNoSuchPlanTaskException = "NoSuchPlanTaskException"; } // namespace @@ -183,4 +186,32 @@ Status ViewCommitErrorHandler::Accept(const ErrorResponse& error) const { return DefaultErrorHandler::Accept(error); } +const std::shared_ptr& ScanPlanErrorHandler::Instance() { + static const std::shared_ptr instance{new ScanPlanErrorHandler()}; + return instance; +} + +Status ScanPlanErrorHandler::Accept(const ErrorResponse& error) const { + switch (error.code) { + case 404: + if (error.type == kNoSuchNamespaceException) { + return NoSuchNamespace(error.message); + } + if (error.type == kNoSuchTableException) { + return NoSuchTable(error.message); + } + if (error.type == kNoSuchPlanIdException) { + return NoSuchPlanId(error.message); + } + if (error.type == kNoSuchPlanTaskException) { + return NoSuchPlanTask(error.message); + } + return NotFound(error.message); + case 406: + return NotSupported(error.message); + } + + return DefaultErrorHandler::Accept(error); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/error_handlers.h b/src/iceberg/catalog/rest/error_handlers.h index eae2c9b7f..7c0823384 100644 --- a/src/iceberg/catalog/rest/error_handlers.h +++ b/src/iceberg/catalog/rest/error_handlers.h @@ -127,4 +127,15 @@ class ICEBERG_REST_EXPORT ViewCommitErrorHandler final : public DefaultErrorHand constexpr ViewCommitErrorHandler() = default; }; +/// \brief Scan plan operation error handler. 
+class ICEBERG_REST_EXPORT ScanPlanErrorHandler final : public DefaultErrorHandler { + public: + static const std::shared_ptr& Instance(); + + Status Accept(const ErrorResponse& error) const override; + + private: + constexpr ScanPlanErrorHandler() = default; +}; + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/json_serde.cc b/src/iceberg/catalog/rest/json_serde.cc index eebdc1969..c2d4ccf4b 100644 --- a/src/iceberg/catalog/rest/json_serde.cc +++ b/src/iceberg/catalog/rest/json_serde.cc @@ -26,8 +26,10 @@ #include "iceberg/catalog/rest/json_serde_internal.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/expression/json_serde_internal.h" #include "iceberg/json_serde_internal.h" #include "iceberg/partition_spec.h" +#include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_identifier.h" #include "iceberg/table_requirement.h" @@ -78,6 +80,21 @@ constexpr std::string_view kExpiresIn = "expires_in"; constexpr std::string_view kIssuedTokenType = "issued_token_type"; constexpr std::string_view kRefreshToken = "refresh_token"; constexpr std::string_view kOAuthScope = "scope"; +constexpr std::string_view kPlanStatus = "status"; +constexpr std::string_view kPlanId = "plan-id"; +constexpr std::string_view kPlanTasks = "plan-tasks"; +constexpr std::string_view kFileScanTasks = "file-scan-tasks"; +constexpr std::string_view kDeleteFiles = "delete-files"; +constexpr std::string_view kSnapshotId = "snapshot-id"; +constexpr std::string_view kSelect = "select"; +constexpr std::string_view kFilter = "filter"; +constexpr std::string_view kCaseSensitive = "case-sensitive"; +constexpr std::string_view kUseSnapshotSchema = "use-snapshot-schema"; +constexpr std::string_view kStartSnapshotId = "start-snapshot-id"; +constexpr std::string_view kEndSnapshotId = "end-snapshot-id"; +constexpr std::string_view kStatsFields = "stats-fields"; +constexpr std::string_view kMinRowsRequired = "min-rows-required"; +constexpr std::string_view 
kPlanTask = "plan-task"; } // namespace @@ -506,6 +523,114 @@ Result OAuthTokenResponseFromJson(const nlohmann::json& json return response; } +Result ToJson(const PlanTableScanRequest& request) { + nlohmann::json json; + if (request.snapshot_id.has_value()) { + json[kSnapshotId] = request.snapshot_id.value(); + } + if (!request.select.empty()) { + json[kSelect] = request.select; + } + if (request.filter) { + ICEBERG_ASSIGN_OR_RAISE(auto filter_json, iceberg::ToJson(*request.filter)); + json[kFilter] = std::move(filter_json); + } + json[kCaseSensitive] = request.case_sensitive; + json[kUseSnapshotSchema] = request.use_snapshot_schema; + if (request.start_snapshot_id.has_value()) { + json[kStartSnapshotId] = request.start_snapshot_id.value(); + } + if (request.end_snapshot_id.has_value()) { + json[kEndSnapshotId] = request.end_snapshot_id.value(); + } + if (!request.statsFields.empty()) { + json[kStatsFields] = request.statsFields; + } + if (request.min_rows_required.has_value()) { + json[kMinRowsRequired] = request.min_rows_required.value(); + } + return json; +} + +nlohmann::json ToJson(const FetchScanTasksRequest& request) { + nlohmann::json json; + json[kPlanTask] = request.planTask; + return json; +} + +Status BaseScanTaskResponseFromJson( + const nlohmann::json& json, BaseScanTaskResponse* response, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + // 1. plan_tasks + ICEBERG_ASSIGN_OR_RAISE( + response->plan_tasks, + GetJsonValueOrDefault>(json, kPlanTasks)); + + // 2. delete_files + ICEBERG_ASSIGN_OR_RAISE( + auto delete_files_json, + GetJsonValueOrDefault(json, kDeleteFiles, nlohmann::json::array())); + for (const auto& entry_json : delete_files_json) { + ICEBERG_ASSIGN_OR_RAISE(auto delete_file, + DataFileFromJson(entry_json, partition_specs_by_id, schema)); + response->delete_files.push_back(std::move(delete_file)); + } + + // 3. 
file_scan_tasks + ICEBERG_ASSIGN_OR_RAISE(auto file_scan_tasks_json, + GetJsonValueOrDefault(json, kFileScanTasks, + nlohmann::json::array())); + ICEBERG_ASSIGN_OR_RAISE( + response->file_scan_tasks, + FileScanTasksFromJson(file_scan_tasks_json, response->delete_files, + partition_specs_by_id, schema)); + return {}; +} + +Result PlanTableScanResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + PlanTableScanResponse response; + ICEBERG_ASSIGN_OR_RAISE(response.plan_status, + GetJsonValue(json, kPlanStatus)); + ICEBERG_ASSIGN_OR_RAISE(response.plan_id, + GetJsonValueOrDefault(json, kPlanId)); + ICEBERG_RETURN_UNEXPECTED( + BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result FetchPlanningResultResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + FetchPlanningResultResponse response; + ICEBERG_ASSIGN_OR_RAISE(response.plan_status, + GetJsonValue(json, kPlanStatus)); + ICEBERG_RETURN_UNEXPECTED( + BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + +Result FetchScanTasksResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema) { + FetchScanTasksResponse response; + ICEBERG_RETURN_UNEXPECTED( + BaseScanTaskResponseFromJson(json, &response, partition_specs_by_id, schema)); + ICEBERG_RETURN_UNEXPECTED(response.Validate()); + return response; +} + #define ICEBERG_DEFINE_FROM_JSON(Model) \ template <> \ Result FromJson(const nlohmann::json& json) { \ diff --git a/src/iceberg/catalog/rest/json_serde_internal.h b/src/iceberg/catalog/rest/json_serde_internal.h index 820e077d7..cdd2dab4d 100644 --- a/src/iceberg/catalog/rest/json_serde_internal.h +++ 
b/src/iceberg/catalog/rest/json_serde_internal.h @@ -19,11 +19,15 @@ #pragma once +#include + #include #include "iceberg/catalog/rest/iceberg_rest_export.h" #include "iceberg/catalog/rest/types.h" +#include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/schema.h" /// \file iceberg/catalog/rest/json_serde_internal.h /// JSON serialization and deserialization for Iceberg REST Catalog API types. @@ -62,4 +66,26 @@ ICEBERG_DECLARE_JSON_SERDE(OAuthTokenResponse) #undef ICEBERG_DECLARE_JSON_SERDE +ICEBERG_REST_EXPORT Result PlanTableScanResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result +FetchPlanningResultResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result FetchScanTasksResponseFromJson( + const nlohmann::json& json, + const std::unordered_map>& + partition_specs_by_id, + const Schema& schema); + +ICEBERG_REST_EXPORT Result ToJson(const PlanTableScanRequest& request); +ICEBERG_REST_EXPORT nlohmann::json ToJson(const FetchScanTasksRequest& request); + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/resource_paths.cc b/src/iceberg/catalog/rest/resource_paths.cc index f86a75ec0..4e6660b57 100644 --- a/src/iceberg/catalog/rest/resource_paths.cc +++ b/src/iceberg/catalog/rest/resource_paths.cc @@ -102,4 +102,26 @@ Result ResourcePaths::CommitTransaction() const { return std::format("{}/v1/{}transactions/commit", base_uri_, prefix_); } +Result ResourcePaths::ScanPlan(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/plan", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + +Result 
ResourcePaths::ScanPlan(const TableIdentifier& ident, + const std::string& plan_id) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/plan/{}", base_uri_, prefix_, + encoded_namespace, encoded_table_name, plan_id); +} + +Result ResourcePaths::ScanTask(const TableIdentifier& ident) const { + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_namespace, EncodeNamespace(ident.ns)); + ICEBERG_ASSIGN_OR_RAISE(std::string encoded_table_name, EncodeString(ident.name)); + return std::format("{}/v1/{}namespaces/{}/tables/{}/tasks", base_uri_, prefix_, + encoded_namespace, encoded_table_name); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/resource_paths.h b/src/iceberg/catalog/rest/resource_paths.h index 1b502aaa7..15afb35dc 100644 --- a/src/iceberg/catalog/rest/resource_paths.h +++ b/src/iceberg/catalog/rest/resource_paths.h @@ -81,6 +81,19 @@ class ICEBERG_REST_EXPORT ResourcePaths { /// \brief Get the /v1/{prefix}/transactions/commit endpoint path. Result CommitTransaction() const; + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan endpoint + /// path. + Result ScanPlan(const TableIdentifier& ident) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan_id} + /// endpoint path. + Result ScanPlan(const TableIdentifier& ident, + const std::string& plan_id) const; + + /// \brief Get the /v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks endpoint + /// path. 
+ Result ScanTask(const TableIdentifier& ident) const; + private: ResourcePaths(std::string base_uri, const std::string& prefix); diff --git a/src/iceberg/catalog/rest/rest_catalog.cc b/src/iceberg/catalog/rest/rest_catalog.cc index 40e112db7..38738024d 100644 --- a/src/iceberg/catalog/rest/rest_catalog.cc +++ b/src/iceberg/catalog/rest/rest_catalog.cc @@ -43,6 +43,7 @@ #include "iceberg/sort_order.h" #include "iceberg/table.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" #include "iceberg/transaction.h" #include "iceberg/util/macros.h" @@ -62,6 +63,8 @@ std::unordered_set GetDefaultEndpoints() { Endpoint::UpdateTable(), Endpoint::DeleteTable(), Endpoint::RenameTable(), Endpoint::RegisterTable(), Endpoint::ReportMetrics(), Endpoint::CommitTransaction(), + Endpoint::PlanTableScan(), Endpoint::FetchPlanningResult(), + Endpoint::CancelPlanning(), Endpoint::FetchScanTasks(), }; } @@ -495,4 +498,85 @@ Result> RestCatalog::RegisterTable( shared_from_this()); } +Result RestCatalog::PlanTableScan( + const Table& table, const internal::TableScanContext& context) { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::PlanTableScan()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->ScanPlan(table.name())); + ICEBERG_ASSIGN_OR_RAISE(auto schema_ptr, table.schema()); + ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, table.specs()); + + PlanTableScanRequest request; + if (context.snapshot_id.has_value()) { + request.snapshot_id = context.snapshot_id; + } + request.select = context.selected_columns; + request.filter = context.filter; + request.case_sensitive = context.case_sensitive; + request.use_snapshot_schema = false; + if (context.from_snapshot_id.has_value() && context.to_snapshot_id.has_value()) { + request.start_snapshot_id = context.from_snapshot_id.value(); + request.end_snapshot_id = context.to_snapshot_id.value(); + request.use_snapshot_schema = true; + } + if (context.min_rows_requested.has_value()) { + 
request.min_rows_required = context.min_rows_requested.value(); + } + + ICEBERG_RETURN_UNEXPECTED(request.Validate()); + ICEBERG_ASSIGN_OR_RAISE(auto request_json, ToJson(request)); + ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(request_json)); + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Post(path, json_request, /*headers=*/{}, *ScanPlanErrorHandler::Instance(), + *catalog_session_)); + + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + return PlanTableScanResponseFromJson(json, specs_ref.get(), *schema_ptr); +} + +Result RestCatalog::FetchPlanningResult( + const Table& table, const std::string& plan_id) { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::FetchPlanningResult()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->ScanPlan(table.name(), plan_id)); + ICEBERG_ASSIGN_OR_RAISE(auto schema_ptr, table.schema()); + ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, table.specs()); + + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Get(path, /*params=*/{}, /*headers=*/{}, *ScanPlanErrorHandler::Instance(), + *catalog_session_)); + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + return FetchPlanningResultResponseFromJson(json, specs_ref.get(), *schema_ptr); +} + +Status RestCatalog::CancelPlanning(const Table& table, const std::string& plan_id) { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::CancelPlanning()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->ScanPlan(table.name(), plan_id)); + + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Delete(path, /*params=*/{}, /*headers=*/{}, + *ScanPlanErrorHandler::Instance(), *catalog_session_)); + return {}; +} + +Result RestCatalog::FetchScanTasks(const Table& table, + const std::string& plan_task) { + ICEBERG_ENDPOINT_CHECK(supported_endpoints_, Endpoint::FetchScanTasks()); + ICEBERG_ASSIGN_OR_RAISE(auto path, paths_->ScanTask(table.name())); + ICEBERG_ASSIGN_OR_RAISE(auto schema_ptr, table.schema()); + 
ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, table.specs()); + + FetchScanTasksRequest request{.planTask = plan_task}; + ICEBERG_RETURN_UNEXPECTED(request.Validate()); + ICEBERG_ASSIGN_OR_RAISE(auto json_request, ToJsonString(ToJson(request))); + ICEBERG_ASSIGN_OR_RAISE( + const auto response, + client_->Post(path, json_request, /*headers=*/{}, *ScanPlanErrorHandler::Instance(), + *catalog_session_)); + + ICEBERG_ASSIGN_OR_RAISE(auto json, FromJsonString(response.body())); + return FetchScanTasksResponseFromJson(json, specs_ref.get(), *schema_ptr); +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/rest_catalog.h b/src/iceberg/catalog/rest/rest_catalog.h index 38230a5e2..d4a86e1ec 100644 --- a/src/iceberg/catalog/rest/rest_catalog.h +++ b/src/iceberg/catalog/rest/rest_catalog.h @@ -104,6 +104,14 @@ class ICEBERG_REST_EXPORT RestCatalog : public Catalog, const TableIdentifier& identifier, const std::string& metadata_file_location) override; + Result PlanTableScan( + const Table& table, const internal::TableScanContext& context) override; + Result FetchPlanningResult( + const Table& table, const std::string& plan_id) override; + Status CancelPlanning(const Table& table, const std::string& plan_id) override; + Result FetchScanTasks(const Table& table, + const std::string& plan_task) override; + private: RestCatalog(RestCatalogProperties config, std::shared_ptr file_io, std::unique_ptr client, std::unique_ptr paths, diff --git a/src/iceberg/catalog/rest/types.cc b/src/iceberg/catalog/rest/types.cc index 3abfb1406..8dcfadd92 100644 --- a/src/iceberg/catalog/rest/types.cc +++ b/src/iceberg/catalog/rest/types.cc @@ -118,6 +118,70 @@ bool CommitTableResponse::operator==(const CommitTableResponse& other) const { return true; } +bool PlanTableScanRequest::operator==(const PlanTableScanRequest& other) const { + return snapshot_id == other.snapshot_id && select == other.select && + filter == other.filter && case_sensitive == other.case_sensitive && + 
use_snapshot_schema == other.use_snapshot_schema && + start_snapshot_id == other.start_snapshot_id && + end_snapshot_id == other.end_snapshot_id && statsFields == other.statsFields && + min_rows_required == other.min_rows_required; +} + +bool BaseScanTaskResponse::operator==(const BaseScanTaskResponse& other) const { + if (plan_tasks != other.plan_tasks) { + return false; + } + if (delete_files != other.delete_files) { + return false; + } + if (file_scan_tasks.size() != other.file_scan_tasks.size()) { + return false; + } + for (size_t i = 0; i < file_scan_tasks.size(); ++i) { + const auto& a = file_scan_tasks[i]; + const auto& b = other.file_scan_tasks[i]; + if (!a.data_file() != !b.data_file()) { + return false; + } + if (a.data_file() && *a.data_file() != *b.data_file()) { + return false; + } + if (a.delete_files().size() != b.delete_files().size()) { + return false; + } + for (size_t j = 0; j < a.delete_files().size(); ++j) { + if (!a.delete_files()[j] != !b.delete_files()[j]) { + return false; + } + if (a.delete_files()[j] && *a.delete_files()[j] != *b.delete_files()[j]) { + return false; + } + } + if (a.residual_filter() != b.residual_filter()) { + return false; + } + } + return true; +} + +bool PlanTableScanResponse::operator==(const PlanTableScanResponse& other) const { + return BaseScanTaskResponse::operator==(other) && plan_status == other.plan_status && + plan_id == other.plan_id; +} + +bool FetchPlanningResultResponse::operator==( + const FetchPlanningResultResponse& other) const { + return BaseScanTaskResponse::operator==(other) && plan_status == other.plan_status; +} + +bool FetchScanTasksRequest::operator==(const FetchScanTasksRequest& other) const { + return planTask == other.planTask; +} + +bool FetchScanTasksResponse::operator==(const FetchScanTasksResponse& other) const { + return BaseScanTaskResponse::operator==(other); +} + Status OAuthTokenResponse::Validate() const { if (access_token.empty()) { return ValidationFailed("OAuth2 token response 
missing required 'access_token'"); @@ -135,4 +199,89 @@ Status OAuthTokenResponse::Validate() const { return {}; } +Status PlanTableScanRequest::Validate() const { + if (snapshot_id.has_value()) { + if (start_snapshot_id.has_value() || end_snapshot_id.has_value()) { + return ValidationFailed( + "Invalid scan: cannot provide both snapshotId and " + "startSnapshotId/endSnapshotId"); + } + } + if (start_snapshot_id.has_value() || end_snapshot_id.has_value()) { + if (!start_snapshot_id.has_value() || !end_snapshot_id.has_value()) { + return ValidationFailed( + "Invalid incremental scan: startSnapshotId and endSnapshotId are required"); + } + } + if (min_rows_required.has_value() && min_rows_required.value() < 0) { + return ValidationFailed("Invalid scan: minRowsRequired is negative"); + } + return {}; +} + +Status PlanTableScanResponse::Validate() const { + if (plan_status.empty()) { + return ValidationFailed("Invalid response: plan status must be defined"); + } + if (plan_status == "submitted" && plan_id.empty()) { + return ValidationFailed( + "Invalid response: plan id should be defined when status is 'submitted'"); + } + if (plan_status == "cancelled") { + return ValidationFailed( + "Invalid response: 'cancelled' is not a valid status for planTableScan"); + } + if (plan_status != "completed" && (!plan_tasks.empty() || !file_scan_tasks.empty())) { + return ValidationFailed( + "Invalid response: tasks can only be defined when status is 'completed'"); + } + if (!plan_id.empty() && plan_status != "submitted" && plan_status != "completed") { + return ValidationFailed( + "Invalid response: plan id can only be defined when status is 'submitted' or " + "'completed'"); + } + if (file_scan_tasks.empty() && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + return {}; +} + +Status FetchPlanningResultResponse::Validate() const { + if (plan_status.empty()) { + return 
ValidationFailed("Invalid status: null"); + } + if (plan_status != "completed" && (!plan_tasks.empty() || !file_scan_tasks.empty())) { + return ValidationFailed( + "Invalid response: tasks can only be returned in a 'completed' status"); + } + if (file_scan_tasks.empty() && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + return {}; +} + +Status FetchScanTasksRequest::Validate() const { + if (planTask.empty()) { + return ValidationFailed("Invalid planTask: null"); + } + return {}; +} + +Status FetchScanTasksResponse::Validate() const { + if (file_scan_tasks.empty() && !delete_files.empty()) { + return ValidationFailed( + "Invalid response: deleteFiles should only be returned with fileScanTasks that " + "reference them"); + } + if (plan_tasks.empty() && file_scan_tasks.empty()) { + return ValidationFailed( + "Invalid response: planTasks and fileScanTasks cannot both be null"); + } + return {}; +} + } // namespace iceberg::rest diff --git a/src/iceberg/catalog/rest/types.h b/src/iceberg/catalog/rest/types.h index 6495a6517..e49791b77 100644 --- a/src/iceberg/catalog/rest/types.h +++ b/src/iceberg/catalog/rest/types.h @@ -28,9 +28,12 @@ #include "iceberg/catalog/rest/endpoint.h" #include "iceberg/catalog/rest/iceberg_rest_export.h" +#include "iceberg/expression/expression.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/table_identifier.h" +#include "iceberg/table_scan.h" #include "iceberg/type_fwd.h" #include "iceberg/util/macros.h" @@ -295,4 +298,73 @@ struct ICEBERG_REST_EXPORT OAuthTokenResponse { bool operator==(const OAuthTokenResponse&) const = default; }; +/// \brief Request to initiate a server-side scan planning operation. 
+struct ICEBERG_REST_EXPORT PlanTableScanRequest { + std::optional snapshot_id; + std::vector select; + std::shared_ptr filter; + bool case_sensitive = true; + bool use_snapshot_schema = false; + std::optional start_snapshot_id; + std::optional end_snapshot_id; + std::vector statsFields; + std::optional min_rows_required; + + Status Validate() const; + + bool operator==(const PlanTableScanRequest&) const; +}; + +/// \brief Base response containing scan tasks and delete files returned by scan plan +/// endpoints. +struct ICEBERG_REST_EXPORT BaseScanTaskResponse { + std::vector plan_tasks; + std::vector file_scan_tasks; + std::vector delete_files; + // std::unordered_map specsById; + + Status Validate() const { return {}; }; + + bool operator==(const BaseScanTaskResponse&) const; +}; + +/// \brief Response from initiating a scan planning operation, including plan status and +/// initial scan tasks. +struct ICEBERG_REST_EXPORT PlanTableScanResponse : BaseScanTaskResponse { + std::string plan_status; + std::string plan_id; + // TODO(sandeepg): Add credentials. + + Status Validate() const; + + bool operator==(const PlanTableScanResponse&) const; +}; + +/// \brief Response from polling an asynchronous scan plan, including current status and +/// available scan tasks. +struct ICEBERG_REST_EXPORT FetchPlanningResultResponse : BaseScanTaskResponse { + std::string plan_status; + // TODO(sandeepg): Add credentials. + + Status Validate() const; + + bool operator==(const FetchPlanningResultResponse&) const; +}; + +/// \brief Request to fetch the scan tasks for a given plan task token. +struct ICEBERG_REST_EXPORT FetchScanTasksRequest { + std::string planTask; + + Status Validate() const; + + bool operator==(const FetchScanTasksRequest&) const; +}; + +/// \brief Response containing the file scan tasks for a given plan task token. 
+struct ICEBERG_REST_EXPORT FetchScanTasksResponse : BaseScanTaskResponse { + Status Validate() const; + + bool operator==(const FetchScanTasksResponse&) const; +}; + } // namespace iceberg::rest diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index 2d8c22255..f7b782b49 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -27,7 +27,10 @@ #include #include "iceberg/constants.h" +#include "iceberg/expression/json_serde_internal.h" +#include "iceberg/file_format.h" #include "iceberg/json_serde_internal.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/name_mapping.h" #include "iceberg/partition_field.h" #include "iceberg/partition_spec.h" @@ -40,6 +43,7 @@ #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" #include "iceberg/transform.h" #include "iceberg/type.h" @@ -225,6 +229,30 @@ constexpr std::string_view kRequirementAssertDefaultSortOrderID = constexpr std::string_view kLastAssignedFieldId = "last-assigned-field-id"; constexpr std::string_view kLastAssignedPartitionId = "last-assigned-partition-id"; +// FileScanTask / DataFile constants (kSpecId, kSortOrderId, and kFileSizeInBytes +// are already defined above) +constexpr std::string_view kDataFile = "data-file"; +constexpr std::string_view kDeleteFileReferences = "delete-file-references"; +constexpr std::string_view kResidualFilter = "residual-filter"; +constexpr std::string_view kContent = "content"; +constexpr std::string_view kFilePath = "file-path"; +constexpr std::string_view kFileFormat = "file-format"; +constexpr std::string_view kPartition = "partition"; +constexpr std::string_view kRecordCount = "record-count"; +constexpr std::string_view kColumnSizes = "column-sizes"; +constexpr std::string_view kValueCounts = "value-counts"; +constexpr std::string_view kNullValueCounts = "null-value-counts"; +constexpr std::string_view 
kNanValueCounts = "nan-value-counts"; +constexpr std::string_view kLowerBounds = "lower-bounds"; +constexpr std::string_view kUpperBounds = "upper-bounds"; +constexpr std::string_view kKeyMetadata = "key-metadata"; +constexpr std::string_view kSplitOffsets = "split-offsets"; +constexpr std::string_view kEqualityIds = "equality-ids"; +constexpr std::string_view kFirstRowId = "first-row-id"; +constexpr std::string_view kReferencedDataFile = "referenced-data-file"; +constexpr std::string_view kContentOffset = "content-offset"; +constexpr std::string_view kContentSizeInBytes = "content-size-in-bytes"; + } // namespace nlohmann::json ToJson(const SortField& sort_field) { @@ -1723,4 +1751,196 @@ Result> TableRequirementFromJson( return JsonParseError("Unknown table requirement type: {}", type); } +Result DataFileFromJson( + const nlohmann::json& json, + const std::unordered_map>& partitionSpecById, + const Schema& schema) { + if (!json.is_object()) { + return JsonParseError("DataFile must be a JSON object: {}", SafeDumpJson(json)); + } + DataFile df; + + ICEBERG_ASSIGN_OR_RAISE(auto content_str, GetJsonValue(json, kContent)); + if (content_str == ToString(DataFile::Content::kData)) { + df.content = DataFile::Content::kData; + } else if (content_str == ToString(DataFile::Content::kPositionDeletes)) { + df.content = DataFile::Content::kPositionDeletes; + } else if (content_str == ToString(DataFile::Content::kEqualityDeletes)) { + df.content = DataFile::Content::kEqualityDeletes; + } else { + return JsonParseError("Unknown data file content: {}", content_str); + } + + ICEBERG_ASSIGN_OR_RAISE(df.file_path, GetJsonValue(json, kFilePath)); + ICEBERG_ASSIGN_OR_RAISE(auto format_str, GetJsonValue(json, kFileFormat)); + ICEBERG_ASSIGN_OR_RAISE(df.file_format, FileFormatTypeFromString(format_str)); + + if (json.contains(kSpecId) && !json.at(kSpecId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto spec_id, GetJsonValue(json, kSpecId)); + df.partition_spec_id = spec_id; + } + + if 
(json.contains(kPartition)) { + ICEBERG_ASSIGN_OR_RAISE(auto partition_vals, + GetJsonValue(json, kPartition)); + if (!partition_vals.is_array()) { + return JsonParseError("PartitionValues must be a JSON array: {}", + SafeDumpJson(partition_vals)); + } + std::vector literals; + auto it = partitionSpecById.find(df.partition_spec_id.value_or(-1)); + if (it == partitionSpecById.end()) { + return JsonParseError("Invalid partition spec id: {}", + df.partition_spec_id.value_or(-1)); + } + ICEBERG_ASSIGN_OR_RAISE(auto struct_type, it->second->PartitionType(schema)); + auto fields = struct_type->fields(); + if (partition_vals.size() != fields.size()) { + return JsonParseError("Invalid partition data size: expected = {}, actual = {}", + fields.size(), partition_vals.size()); + } + for (size_t pos = 0; pos < fields.size(); ++pos) { + ICEBERG_ASSIGN_OR_RAISE( + auto literal, LiteralFromJson(partition_vals[pos], fields[pos].type().get())); + literals.push_back(std::move(literal)); + } + df.partition = PartitionValues(std::move(literals)); + } + + ICEBERG_ASSIGN_OR_RAISE(df.record_count, GetJsonValue(json, kRecordCount)); + ICEBERG_ASSIGN_OR_RAISE(df.file_size_in_bytes, + GetJsonValue(json, kFileSizeInBytes)); + + // Parse CountMap: {"keys": [int, ...], "values": [long, ...]} + auto parse_int_map = [&](std::string_view key, + std::map& target) -> Status { + if (!json.contains(key) || json.at(key).is_null()) { + return {}; + } + ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue(json, key)); + ICEBERG_ASSIGN_OR_RAISE(auto keys, + GetTypedJsonValue>(map_json.at("keys"))); + ICEBERG_ASSIGN_OR_RAISE( + auto values, GetTypedJsonValue>(map_json.at("values"))); + if (keys.size() != values.size()) { + return JsonParseError("'{}' map keys and values have different lengths", key); + } + for (size_t i = 0; i < keys.size(); ++i) { + target[keys[i]] = values[i]; + } + return {}; + }; + + ICEBERG_RETURN_UNEXPECTED(parse_int_map(kColumnSizes, df.column_sizes)); + 
ICEBERG_RETURN_UNEXPECTED(parse_int_map(kValueCounts, df.value_counts)); + ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNullValueCounts, df.null_value_counts)); + ICEBERG_RETURN_UNEXPECTED(parse_int_map(kNanValueCounts, df.nan_value_counts)); + + // Parse BinaryMap: {"keys": [int, ...], "values": [...]} + auto parse_binary_map = [&](std::string_view key, + std::map>& target) -> Status { + if (!json.contains(key) || json.at(key).is_null()) { + return {}; + } + ICEBERG_ASSIGN_OR_RAISE(auto map_json, GetJsonValue(json, key)); + ICEBERG_ASSIGN_OR_RAISE(auto keys, + GetJsonValue>(map_json, "keys")); + ICEBERG_ASSIGN_OR_RAISE( + auto values, GetJsonValue>>(map_json, "values")); + if (keys.size() != values.size()) { + return JsonParseError("'{}' binary map keys and values have different lengths", + key); + } + for (size_t i = 0; i < keys.size(); ++i) { + target[keys[i]] = values[i]; + } + return {}; + }; + + ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kLowerBounds, df.lower_bounds)); + ICEBERG_RETURN_UNEXPECTED(parse_binary_map(kUpperBounds, df.upper_bounds)); + + if (json.contains(kKeyMetadata) && !json.at(kKeyMetadata).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.key_metadata, + GetJsonValue>(json, kKeyMetadata)); + } + if (json.contains(kSplitOffsets) && !json.at(kSplitOffsets).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.split_offsets, + GetJsonValue>(json, kSplitOffsets)); + } + if (json.contains(kEqualityIds) && !json.at(kEqualityIds).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.equality_ids, + GetJsonValue>(json, kEqualityIds)); + } + if (json.contains(kSortOrderId) && !json.at(kSortOrderId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.sort_order_id, GetJsonValue(json, kSortOrderId)); + } + if (json.contains(kFirstRowId) && !json.at(kFirstRowId).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.first_row_id, GetJsonValue(json, kFirstRowId)); + } + if (json.contains(kReferencedDataFile) && !json.at(kReferencedDataFile).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.referenced_data_file, + 
GetJsonValue(json, kReferencedDataFile)); + } + if (json.contains(kContentOffset) && !json.at(kContentOffset).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.content_offset, + GetJsonValue(json, kContentOffset)); + } + if (json.contains(kContentSizeInBytes) && !json.at(kContentSizeInBytes).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(df.content_size_in_bytes, + GetJsonValue(json, kContentSizeInBytes)); + } + + return df; +} + +Result> FileScanTasksFromJson( + const nlohmann::json& json, const std::vector& delete_files, + const std::unordered_map>& partitionSpecById, + const Schema& schema) { + if (!json.is_array()) { + return JsonParseError("Cannot parse file scan tasks from non-array: {}", + SafeDumpJson(json)); + } + std::vector file_scan_tasks; + for (const auto& task_json : json) { + if (!task_json.is_object()) { + return JsonParseError("Cannot parse file scan task from a non-object: {}", + SafeDumpJson(task_json)); + } + + ICEBERG_ASSIGN_OR_RAISE(auto data_file_json, + GetJsonValue(task_json, kDataFile)); + ICEBERG_ASSIGN_OR_RAISE(auto data_file, + DataFileFromJson(data_file_json, partitionSpecById, schema)); + + std::vector> task_delete_files; + if (task_json.contains(kDeleteFileReferences) && + !task_json.at(kDeleteFileReferences).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto refs, GetJsonValue>( + task_json, kDeleteFileReferences)); + for (int32_t ref : refs) { + if (ref < 0 || static_cast(ref) >= delete_files.size()) { + return JsonParseError( + "delete-file-references index {} is out of range (delete_files size: {})", + ref, delete_files.size()); + } + task_delete_files.push_back(std::make_shared(delete_files[ref])); + } + } + + std::shared_ptr residual_filter; + if (task_json.contains(kResidualFilter) && !task_json.at(kResidualFilter).is_null()) { + ICEBERG_ASSIGN_OR_RAISE(auto filter_json, + GetJsonValue(task_json, kResidualFilter)); + ICEBERG_ASSIGN_OR_RAISE(residual_filter, ExpressionFromJson(filter_json)); + } + + 
file_scan_tasks.emplace_back(std::make_shared(std::move(data_file)), + std::move(task_delete_files), + std::move(residual_filter)); + } + return file_scan_tasks; +} + } // namespace iceberg diff --git a/src/iceberg/json_serde_internal.h b/src/iceberg/json_serde_internal.h index 8699e3dd1..410d92ae2 100644 --- a/src/iceberg/json_serde_internal.h +++ b/src/iceberg/json_serde_internal.h @@ -28,9 +28,13 @@ #include +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/schema.h" #include "iceberg/statistics_file.h" #include "iceberg/table_metadata.h" +#include "iceberg/table_scan.h" #include "iceberg/type_fwd.h" namespace iceberg { @@ -413,4 +417,39 @@ ICEBERG_EXPORT nlohmann::json ToJson(const TableRequirement& requirement); ICEBERG_EXPORT Result> TableRequirementFromJson( const nlohmann::json& json); +/// \brief Deserializes a JSON object into a `DataFile` object. +/// +/// Parses a DataFile from the REST Catalog JSON format. Maps (column-sizes, +/// value-counts, etc.) use the CountMap/BinaryMap parallel arrays format. +/// Binary fields (lower-bounds, upper-bounds, key-metadata) are base64-encoded. +/// +/// \param json The JSON object representing a `DataFile`. +/// \param partitionSpecById Map from spec ID to PartitionSpec for type-aware partition +/// parsing. +/// \param schema The table schema, used with partitionSpecById to resolve partition +/// types. +/// \return A `DataFile` object or an error if the conversion fails. +ICEBERG_EXPORT Result DataFileFromJson(const nlohmann::json& json); + +ICEBERG_EXPORT Result DataFileFromJson( + const nlohmann::json& json, + const std::unordered_map>& partitionSpecById, + const Schema& schema); + +/// \brief Deserializes a JSON array of file scan tasks. +/// +/// Each task may reference delete files by index into \p delete_files. +/// +/// \param json The JSON array of file scan task objects. 
+/// \param delete_files Delete files indexed by the tasks' delete-file-references. +/// \param partitionSpecById Map from spec ID to PartitionSpec for type-aware partition +/// parsing. +/// \param schema The table schema, used with partitionSpecById to resolve partition +/// types. +/// \return A vector of `FileScanTask` objects or an error if the conversion fails. +ICEBERG_EXPORT Result> FileScanTasksFromJson( + const nlohmann::json& json, const std::vector& delete_files, + const std::unordered_map>& partitionSpecById, + const Schema& schema); + } // namespace iceberg diff --git a/src/iceberg/result.h b/src/iceberg/result.h index 765508705..98246303d 100644 --- a/src/iceberg/result.h +++ b/src/iceberg/result.h @@ -48,6 +48,8 @@ enum class ErrorKind { kJsonParseError, kNamespaceNotEmpty, kNoSuchNamespace, + kNoSuchPlanId, + kNoSuchPlanTask, kNoSuchTable, kNoSuchView, kNotAllowed, @@ -111,6 +113,8 @@ DEFINE_ERROR_FUNCTION(IOError) DEFINE_ERROR_FUNCTION(JsonParseError) DEFINE_ERROR_FUNCTION(NamespaceNotEmpty) DEFINE_ERROR_FUNCTION(NoSuchNamespace) +DEFINE_ERROR_FUNCTION(NoSuchPlanId) +DEFINE_ERROR_FUNCTION(NoSuchPlanTask) DEFINE_ERROR_FUNCTION(NoSuchTable) DEFINE_ERROR_FUNCTION(NoSuchView) DEFINE_ERROR_FUNCTION(NotAllowed) diff --git a/src/iceberg/test/endpoint_test.cc b/src/iceberg/test/endpoint_test.cc index fcdc92a78..69cc56895 100644 --- a/src/iceberg/test/endpoint_test.cc +++ b/src/iceberg/test/endpoint_test.cc @@ -145,6 +145,31 @@ TEST(EndpointTest, TransactionEndpoints) { EXPECT_EQ(commit_transaction.path(), "/v1/{prefix}/transactions/commit"); } +// Test predefined scan planning endpoints +TEST(EndpointTest, ScanPlanEndpoints) { + auto plan_table_scan = Endpoint::PlanTableScan(); + EXPECT_EQ(plan_table_scan.method(), HttpMethod::kPost); + EXPECT_EQ(plan_table_scan.path(), + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"); + EXPECT_EQ(plan_table_scan.ToString(), + "POST /v1/{prefix}/namespaces/{namespace}/tables/{table}/plan"); + + auto 
fetch_planning_result = Endpoint::FetchPlanningResult(); + EXPECT_EQ(fetch_planning_result.method(), HttpMethod::kGet); + EXPECT_EQ(fetch_planning_result.path(), + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"); + + auto cancel_planning = Endpoint::CancelPlanning(); + EXPECT_EQ(cancel_planning.method(), HttpMethod::kDelete); + EXPECT_EQ(cancel_planning.path(), + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/plan/{plan-id}"); + + auto fetch_scan_tasks = Endpoint::FetchScanTasks(); + EXPECT_EQ(fetch_scan_tasks.method(), HttpMethod::kPost); + EXPECT_EQ(fetch_scan_tasks.path(), + "/v1/{prefix}/namespaces/{namespace}/tables/{table}/tasks"); +} + // Test endpoint equality TEST(EndpointTest, Equality) { auto endpoint1 = Endpoint::Make(HttpMethod::kGet, "/path"); @@ -242,6 +267,8 @@ TEST(EndpointTest, StringRoundTrip) { Endpoint::ListNamespaces(), Endpoint::GetNamespaceProperties(), Endpoint::CreateNamespace(), Endpoint::LoadTable(), Endpoint::CreateTable(), Endpoint::DeleteTable(), + Endpoint::PlanTableScan(), Endpoint::FetchPlanningResult(), + Endpoint::CancelPlanning(), Endpoint::FetchScanTasks(), }; for (const auto& original : endpoints) { diff --git a/src/iceberg/test/json_serde_test.cc b/src/iceberg/test/json_serde_test.cc index f019375d3..6979a02f2 100644 --- a/src/iceberg/test/json_serde_test.cc +++ b/src/iceberg/test/json_serde_test.cc @@ -23,7 +23,9 @@ #include #include +#include "iceberg/file_format.h" #include "iceberg/json_serde_internal.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/name_mapping.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" @@ -32,6 +34,7 @@ #include "iceberg/sort_order.h" #include "iceberg/statistics_file.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/transform.h" @@ -772,4 +775,228 @@ TEST(TableRequirementJsonTest, TableRequirementUnknownType) { 
EXPECT_THAT(result, HasErrorMessage("Unknown table requirement type")); } +// ---- DataFileFromJson tests ---- + +TEST(DataFileFromJsonTest, RequiredFieldsOnly) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kData); + EXPECT_EQ(df.file_path, "s3://bucket/data/file.parquet"); + EXPECT_EQ(df.file_format, FileFormatType::kParquet); + EXPECT_EQ(df.file_size_in_bytes, 12345); + EXPECT_EQ(df.record_count, 100); + EXPECT_TRUE(df.column_sizes.empty()); + EXPECT_FALSE(df.sort_order_id.has_value()); + EXPECT_FALSE(df.partition_spec_id.has_value()); +} + +TEST(DataFileFromJsonTest, LowercaseFormat) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.avro", + "file-format": "avro", + "file-size-in-bytes": 500, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result.value().content, DataFile::Content::kData); + EXPECT_EQ(result.value().file_format, FileFormatType::kAvro); +} + +TEST(DataFileFromJsonTest, WithOptionalFields) { + auto json = R"({ + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "spec-id": 1, + "file-size-in-bytes": 12345, + "record-count": 100, + "column-sizes": {"keys": [1, 2], "values": [1000, 2000]}, + "value-counts": {"keys": [1, 2], "values": [100, 100]}, + "null-value-counts": {"keys": [1], "values": [0]}, + "nan-value-counts": {"keys": [2], "values": [5]}, + "split-offsets": [0, 4096], + "sort-order-id": 0 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.partition_spec_id, 1); + 
ASSERT_EQ(df.column_sizes.size(), 2U); + EXPECT_EQ(df.column_sizes.at(1), 1000); + EXPECT_EQ(df.column_sizes.at(2), 2000); + ASSERT_EQ(df.value_counts.size(), 2U); + EXPECT_EQ(df.value_counts.at(1), 100); + ASSERT_EQ(df.null_value_counts.size(), 1U); + EXPECT_EQ(df.null_value_counts.at(1), 0); + ASSERT_EQ(df.nan_value_counts.size(), 1U); + EXPECT_EQ(df.nan_value_counts.at(2), 5); + ASSERT_EQ(df.split_offsets.size(), 2U); + EXPECT_EQ(df.split_offsets[0], 0); + EXPECT_EQ(df.split_offsets[1], 4096); + EXPECT_EQ(df.sort_order_id, 0); +} + +TEST(DataFileFromJsonTest, EqualityDeleteFile) { + auto json = R"({ + "content": "equality_deletes", + "file-path": "s3://bucket/deletes/eq_delete.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 5000, + "record-count": 50, + "equality-ids": [1, 2] + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kEqualityDeletes); + ASSERT_EQ(df.equality_ids.size(), 2U); + EXPECT_EQ(df.equality_ids[0], 1); + EXPECT_EQ(df.equality_ids[1], 2); +} + +TEST(DataFileFromJsonTest, PositionDeleteFileWithReferencedDataFile) { + auto json = R"({ + "content": "position_deletes", + "file-path": "s3://bucket/deletes/pos_delete.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 3000, + "record-count": 20, + "referenced-data-file": "s3://bucket/data/file.parquet" + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + const auto& df = result.value(); + EXPECT_EQ(df.content, DataFile::Content::kPositionDeletes); + ASSERT_TRUE(df.referenced_data_file.has_value()); + EXPECT_EQ(df.referenced_data_file.value(), "s3://bucket/data/file.parquet"); +} + +TEST(DataFileFromJsonTest, InvalidContentType) { + auto json = R"({ + "content": "UNKNOWN", + "file-path": "s3://bucket/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + 
})"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Unknown data file content")); +} + +TEST(DataFileFromJsonTest, MissingRequiredField) { + // Missing "file-path" + auto json = R"({ + "content": "data", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + })"_json; + + auto result = DataFileFromJson(json, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); +} + +TEST(DataFileFromJsonTest, NotAnObject) { + auto result = DataFileFromJson(nlohmann::json::array(), {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("DataFile must be a JSON object")); +} + +// ---- FileScanTasksFromJson tests ---- + +TEST(FileScanTasksFromJsonTest, EmptyArray) { + auto result = FileScanTasksFromJson(nlohmann::json::array(), {}, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result.value().empty()); +} + +TEST(FileScanTasksFromJsonTest, SingleTaskNoDeleteFiles) { + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + } + }])"_json; + + auto result = FileScanTasksFromJson(json, {}, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result.value().size(), 1U); + const auto& task = result.value()[0]; + ASSERT_NE(task.data_file(), nullptr); + EXPECT_EQ(task.data_file()->file_path, "s3://bucket/data/file.parquet"); + EXPECT_TRUE(task.delete_files().empty()); + EXPECT_EQ(task.residual_filter(), nullptr); +} + +TEST(FileScanTasksFromJsonTest, TaskWithDeleteFileReferences) { + DataFile delete_file; + delete_file.content = DataFile::Content::kPositionDeletes; + delete_file.file_path = "s3://bucket/deletes/pos_delete.parquet"; + delete_file.file_format = FileFormatType::kParquet; + 
delete_file.file_size_in_bytes = 1000; + delete_file.record_count = 5; + + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + }])"_json; + + auto result = FileScanTasksFromJson(json, {delete_file}, {}, Schema({}, 0)); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result.value().size(), 1U); + const auto& task = result.value()[0]; + ASSERT_EQ(task.delete_files().size(), 1U); + EXPECT_EQ(task.delete_files()[0]->file_path, "s3://bucket/deletes/pos_delete.parquet"); +} + +TEST(FileScanTasksFromJsonTest, DeleteFileReferenceOutOfRange) { + auto json = R"([{ + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 100, + "record-count": 10 + }, + "delete-file-references": [5] + }])"_json; + + // No delete files provided, so index 5 is out of range + auto result = FileScanTasksFromJson(json, {}, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("out of range")); +} + +TEST(FileScanTasksFromJsonTest, NotAnArray) { + auto result = FileScanTasksFromJson(nlohmann::json::object(), {}, {}, Schema({}, 0)); + EXPECT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("non-array")); +} + } // namespace iceberg diff --git a/src/iceberg/test/rest_catalog_integration_test.cc b/src/iceberg/test/rest_catalog_integration_test.cc index b364ffd36..7f03932c1 100644 --- a/src/iceberg/test/rest_catalog_integration_test.cc +++ b/src/iceberg/test/rest_catalog_integration_test.cc @@ -46,6 +46,7 @@ #include "iceberg/table.h" #include "iceberg/table_identifier.h" #include "iceberg/table_requirement.h" +#include "iceberg/table_scan.h" #include "iceberg/table_update.h" #include "iceberg/test/matchers.h" #include "iceberg/test/std_io.h" @@ -448,6 
+449,83 @@ TEST_F(RestCatalogIntegrationTest, StageCreateTable) { EXPECT_EQ(committed->metadata()->properties.configs().at("key1"), "value1"); } +// -- Scan plan operations -- + +TEST_F(RestCatalogIntegrationTest, PlanTableScan) { + Namespace ns{.levels = {"test_plan_table_scan"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + TableIdentifier table_id{.ns = ns, .name = "scan_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id)); + + internal::TableScanContext context; + ICEBERG_UNWRAP_OR_FAIL(auto response, catalog->PlanTableScan(*table, context)); + // Empty table: no file scan tasks returned + EXPECT_TRUE(response.file_scan_tasks.empty()); +} + +TEST_F(RestCatalogIntegrationTest, PlanTableScanWithContext) { + Namespace ns{.levels = {"test_plan_scan_context"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + TableIdentifier table_id{.ns = ns, .name = "context_scan_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id)); + + internal::TableScanContext context; + context.selected_columns = {"id", "data"}; + context.case_sensitive = true; + ICEBERG_UNWRAP_OR_FAIL(auto response, catalog->PlanTableScan(*table, context)); + EXPECT_TRUE(response.file_scan_tasks.empty()); +} + +TEST_F(RestCatalogIntegrationTest, FetchPlanningResult) { + Namespace ns{.levels = {"test_fetch_planning_result"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + TableIdentifier table_id{.ns = ns, .name = "fetch_plan_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id)); + + internal::TableScanContext context; + ICEBERG_UNWRAP_OR_FAIL(auto plan_response, catalog->PlanTableScan(*table, context)); + + // NOTE: apache/iceberg-rest-fixture always responds synchronously 
(status="completed") + // with a non-empty plan_id (e.g. "sync-"). Sync plans are immediately discarded + // server-side. Async paths are covered by unit tests. + EXPECT_FALSE(plan_response.plan_id.empty()); +} + +TEST_F(RestCatalogIntegrationTest, CancelPlanning) { + Namespace ns{.levels = {"test_cancel_planning"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + TableIdentifier table_id{.ns = ns, .name = "cancel_plan_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id)); + + internal::TableScanContext context; + ICEBERG_UNWRAP_OR_FAIL(auto plan_response, catalog->PlanTableScan(*table, context)); + + // NOTE: apache/iceberg-rest-fixture always responds synchronously — sync plan_id is + // accepted for cancellation (idempotent). Async paths are covered by unit tests. + ASSERT_FALSE(plan_response.plan_id.empty()); + ASSERT_THAT(catalog->CancelPlanning(*table, plan_response.plan_id), IsOk()); +} + +TEST_F(RestCatalogIntegrationTest, FetchScanTasks) { + Namespace ns{.levels = {"test_fetch_scan_tasks"}}; + ICEBERG_UNWRAP_OR_FAIL(auto catalog, CreateCatalogAndNamespace(ns)); + TableIdentifier table_id{.ns = ns, .name = "fetch_tasks_table"}; + ASSERT_THAT(CreateDefaultTable(catalog, table_id), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto table, catalog->LoadTable(table_id)); + + internal::TableScanContext context; + ICEBERG_UNWRAP_OR_FAIL(auto plan_response, catalog->PlanTableScan(*table, context)); + + // NOTE: apache/iceberg-rest-fixture always responds synchronously — plan_tasks is + // always empty. FetchScanTasks is only relevant for async plans; async paths are + // covered by unit tests. 
+ EXPECT_TRUE(plan_response.plan_tasks.empty()); +} + // -- Snapshot loading mode -- TEST_F(RestCatalogIntegrationTest, LoadTableWithSnapshotModeAll) { diff --git a/src/iceberg/test/rest_json_serde_test.cc b/src/iceberg/test/rest_json_serde_test.cc index 9da052e6a..e000f26b9 100644 --- a/src/iceberg/test/rest_json_serde_test.cc +++ b/src/iceberg/test/rest_json_serde_test.cc @@ -27,6 +27,7 @@ #include "iceberg/catalog/rest/types.h" #include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/schema.h" #include "iceberg/sort_order.h" #include "iceberg/table_identifier.h" #include "iceberg/table_metadata.h" @@ -1380,4 +1381,139 @@ INSTANTIATE_TEST_SUITE_P( return info.param.test_name; }); +// Helper: empty schema and specs for scan response tests that don't need partition +// parsing. +static Schema EmptySchema() { return Schema({}, 0); } +static std::unordered_map> EmptySpecs() { + return {}; +} + +// --- PlanTableScanResponse --- + +TEST(PlanTableScanResponseFromJsonTest, SubmittedStatusMissingOptionalFields) { + // "submitted" response: only status and plan-id, no tasks + auto json = nlohmann::json::parse(R"({"status":"submitted","plan-id":"abc-123"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, "submitted"); + EXPECT_EQ(result->plan_id, "abc-123"); + EXPECT_TRUE(result->plan_tasks.empty()); + EXPECT_TRUE(result->file_scan_tasks.empty()); + EXPECT_TRUE(result->delete_files.empty()); +} + +TEST(PlanTableScanResponseFromJsonTest, CompletedStatusWithPlanTasks) { + // "completed" response with plan-tasks but no file-scan-tasks + auto json = nlohmann::json::parse( + R"({"status":"completed","plan-id":"abc-123","plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, "completed"); + 
EXPECT_EQ(result->plan_id, "abc-123"); + ASSERT_EQ(result->plan_tasks.size(), 2); + EXPECT_EQ(result->plan_tasks[0], "task-1"); + EXPECT_EQ(result->plan_tasks[1], "task-2"); +} + +TEST(PlanTableScanResponseFromJsonTest, MissingRequiredStatus) { + auto json = nlohmann::json::parse(R"({"plan-id":"abc-123"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'status'")); +} + +TEST(PlanTableScanResponseFromJsonTest, MissingPlanIdDefaultsToEmptyForFailedStatus) { + // plan-id is optional for non-submitted/completed statuses + auto json = nlohmann::json::parse(R"({"status":"failed"})"); + auto result = PlanTableScanResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result->plan_id.empty()); +} + +// --- FetchPlanningResultResponse --- + +TEST(FetchPlanningResultResponseFromJsonTest, SubmittedStatusNoTasks) { + auto json = nlohmann::json::parse(R"({"status":"submitted"})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, "submitted"); + EXPECT_TRUE(result->plan_tasks.empty()); + EXPECT_TRUE(result->file_scan_tasks.empty()); + EXPECT_TRUE(result->delete_files.empty()); +} + +TEST(FetchPlanningResultResponseFromJsonTest, CompletedStatusWithPlanTasks) { + auto json = nlohmann::json::parse( + R"({"status":"completed","plan-tasks":["task-1"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_EQ(result->plan_status, "completed"); + ASSERT_EQ(result->plan_tasks.size(), 1); + EXPECT_EQ(result->plan_tasks[0], "task-1"); +} + +TEST(FetchPlanningResultResponseFromJsonTest, MissingRequiredStatus) { + auto json = nlohmann::json::parse(R"({})"); + auto result = 
FetchPlanningResultResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(ErrorKind::kJsonParseError)); + EXPECT_THAT(result, HasErrorMessage("Missing 'status'")); +} + +// --- FetchScanTasksResponse --- + +TEST(FetchScanTasksResponseFromJsonTest, WithFileScanTasks) { + // One file scan task with a data file and one delete file referenced by index. + auto json = nlohmann::json::parse(R"({ + "plan-tasks": [], + "delete-files": [ + { + "content": "position_deletes", + "file-path": "s3://bucket/deletes/delete.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 512, + "record-count": 5 + } + ], + "file-scan-tasks": [ + { + "data-file": { + "content": "data", + "file-path": "s3://bucket/data/file.parquet", + "file-format": "PARQUET", + "file-size-in-bytes": 12345, + "record-count": 100 + }, + "delete-file-references": [0] + } + ] + })"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + EXPECT_TRUE(result->plan_tasks.empty()); + ASSERT_EQ(result->delete_files.size(), 1); + ASSERT_EQ(result->file_scan_tasks.size(), 1); + EXPECT_EQ(result->file_scan_tasks[0].data_file()->file_path, + "s3://bucket/data/file.parquet"); + ASSERT_EQ(result->file_scan_tasks[0].delete_files().size(), 1); + EXPECT_EQ(result->file_scan_tasks[0].delete_files()[0]->file_path, + "s3://bucket/deletes/delete.parquet"); +} + +TEST(FetchScanTasksResponseFromJsonTest, WithPlanTasksOnly) { + auto json = nlohmann::json::parse( + R"({"plan-tasks":["task-1","task-2"],"delete-files":[],"file-scan-tasks":[]})"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsOk()); + ASSERT_EQ(result->plan_tasks.size(), 2); + EXPECT_EQ(result->plan_tasks[0], "task-1"); + EXPECT_TRUE(result->file_scan_tasks.empty()); +} + +TEST(FetchScanTasksResponseFromJsonTest, AllFieldsMissing) { + // Both plan-tasks and file-scan-tasks absent → Validate() should fail + auto json 
= nlohmann::json::parse(R"({})"); + auto result = FetchScanTasksResponseFromJson(json, EmptySpecs(), EmptySchema()); + ASSERT_THAT(result, IsError(ErrorKind::kValidationFailed)); +} + } // namespace iceberg::rest