Skip to content

Commit 48c2739

Browse files
committed
feat(data): add MOR file scan task reader
Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for file scan tasks, covering both no-delete reader passthrough and merge-on-read filtering with position and equality deletes. Introduce reusable Arrow C Data utilities for stream wrapping and batch projection, including ProjectionContext caching and optional Arrow compute registration via arrow::RegisterAll. Move Arrow IO registration into arrow_register and remove FileScanTask::ToArrow. Add coverage for projection behavior across nanoarrow and Arrow compute paths, plus end-to-end FileScanTaskReader tests for projected reads, position deletes, equality deletes, dropped equality fields, and fully deleted batches.
1 parent c41c269 commit 48c2739

17 files changed

Lines changed: 1865 additions & 165 deletions

src/iceberg/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
1919
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
2020
set(ICEBERG_SOURCES
21+
arrow_c_data_util.cc
2122
arrow_c_data_guard_internal.cc
2223
catalog/memory/in_memory_catalog.cc
2324
delete_file_index.cc
@@ -164,6 +165,7 @@ set(ICEBERG_DATA_SOURCES
164165
data/delete_filter.cc
165166
data/delete_loader.cc
166167
data/equality_delete_writer.cc
168+
data/file_scan_task_reader.cc
167169
data/position_delete_writer.cc
168170
data/writer.cc
169171
deletes/position_delete_index.cc
@@ -220,9 +222,10 @@ add_subdirectory(util)
220222

221223
if(ICEBERG_BUILD_BUNDLE)
222224
set(ICEBERG_BUNDLE_SOURCES
225+
arrow/arrow_c_data_util.cc
223226
arrow/arrow_io.cc
224227
arrow/s3/arrow_s3_file_io.cc
225-
arrow/arrow_io_register.cc
228+
arrow/arrow_register.cc
226229
arrow/metadata_column_util.cc
227230
avro/avro_data_util.cc
228231
avro/avro_direct_decoder.cc
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <cstdint>
21+
#include <memory>
22+
#include <mutex>
23+
#include <span>
24+
#include <utility>
25+
#include <vector>
26+
27+
#include <arrow/array/array_primitive.h>
28+
#include <arrow/buffer.h>
29+
#include <arrow/c/bridge.h>
30+
#include <arrow/compute/api_vector.h>
31+
#include <arrow/record_batch.h>
32+
#include <nanoarrow/nanoarrow.h>
33+
34+
#include "iceberg/arrow/arrow_status_internal.h"
35+
#include "iceberg/arrow/nanoarrow_status_internal.h"
36+
#include "iceberg/arrow_c_data_guard_internal.h"
37+
#include "iceberg/arrow_c_data_util_internal.h"
38+
#include "iceberg/result.h"
39+
#include "iceberg/util/macros.h"
40+
41+
namespace iceberg {
42+
43+
namespace {
44+
45+
struct ArrowProjectBatchState {
46+
std::shared_ptr<::arrow::Schema> input_schema;
47+
std::shared_ptr<::arrow::Schema> output_schema;
48+
};
49+
50+
Result<std::shared_ptr<::arrow::Schema>> ImportArrowSchema(
51+
const ArrowSchema& arrow_schema) {
52+
ArrowSchema schema_copy;
53+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema, &schema_copy));
54+
internal::ArrowSchemaGuard schema_copy_guard(&schema_copy);
55+
56+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema, ::arrow::ImportSchema(&schema_copy));
57+
return schema;
58+
}
59+
60+
Result<std::shared_ptr<ArrowProjectBatchState>> GetArrowProjectBatchState(
61+
ProjectionContext& projection) {
62+
auto state =
63+
std::static_pointer_cast<ArrowProjectBatchState>(projection.project_batch_state());
64+
if (state != nullptr) {
65+
return state;
66+
}
67+
68+
ICEBERG_ASSIGN_OR_RAISE(auto input_schema,
69+
ImportArrowSchema(projection.input_arrow_schema()));
70+
ICEBERG_ASSIGN_OR_RAISE(auto output_schema,
71+
ImportArrowSchema(projection.output_arrow_schema()));
72+
73+
state = std::make_shared<ArrowProjectBatchState>(
74+
ArrowProjectBatchState{.input_schema = std::move(input_schema),
75+
.output_schema = std::move(output_schema)});
76+
projection.project_batch_state() = state;
77+
return state;
78+
}
79+
80+
Result<ArrowArray> ProjectBatchArrowCompute(ArrowArray* input_batch,
81+
std::span<const int32_t> row_indices,
82+
ProjectionContext& projection) {
83+
ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
84+
ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection));
85+
86+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
87+
auto input_record_batch,
88+
::arrow::ImportRecordBatch(input_batch, state->input_schema));
89+
90+
const int32_t empty_index = 0;
91+
const int32_t* row_indices_data =
92+
row_indices.empty() ? &empty_index : row_indices.data();
93+
auto index_array = std::make_shared<::arrow::Int32Array>(
94+
static_cast<int64_t>(row_indices.size()),
95+
::arrow::Buffer::Wrap(row_indices_data, row_indices.size()));
96+
97+
std::vector<std::shared_ptr<::arrow::Array>> output_columns;
98+
output_columns.reserve(projection.selected_field_indices().size());
99+
for (int input_index : projection.selected_field_indices()) {
100+
ICEBERG_PRECHECK(input_index >= 0 && input_index < input_record_batch->num_columns(),
101+
"Input field index {} out of range for batch with {} columns",
102+
input_index, input_record_batch->num_columns());
103+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
104+
auto taken_column,
105+
::arrow::compute::Take(*input_record_batch->column(input_index), *index_array));
106+
output_columns.push_back(std::move(taken_column));
107+
}
108+
109+
auto output_record_batch = ::arrow::RecordBatch::Make(
110+
state->output_schema, static_cast<int64_t>(row_indices.size()),
111+
std::move(output_columns));
112+
113+
ArrowArray output_array;
114+
ICEBERG_ARROW_RETURN_NOT_OK(
115+
::arrow::ExportRecordBatch(*output_record_batch, &output_array));
116+
internal::ArrowArrayGuard output_array_guard(&output_array);
117+
118+
return std::exchange(output_array, ArrowArray{});
119+
}
120+
121+
} // namespace
122+
123+
void RegisterArrowProjectBatch() {
124+
static std::once_flag flag;
125+
std::call_once(flag, []() {
126+
ProjectionContext::RegisterProjectBatchFunction(&ProjectBatchArrowCompute);
127+
});
128+
}
129+
130+
} // namespace iceberg
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,35 @@
1-
// Licensed to the Apache Software Foundation (ASF) under one
2-
// or more contributor license agreements. See the NOTICE file
3-
// distributed with this work for additional information
4-
// regarding copyright ownership. The ASF licenses this file
5-
// to you under the Apache License, Version 2.0 (the
6-
// "License"); you may not use this file except in compliance
7-
// with the License. You may obtain a copy of the License at
8-
//
9-
// http://www.apache.org/licenses/LICENSE-2.0
10-
//
11-
// Unless required by applicable law or agreed to in writing,
12-
// software distributed under the License is distributed on an
13-
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14-
// KIND, either express or implied. See the License for the
15-
// specific language governing permissions and limitations
16-
// under the License.
17-
18-
#include "iceberg/arrow/arrow_io_register.h"
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow/arrow_register.h"
1921

2022
#include <mutex>
2123
#include <string>
24+
#include <unordered_map>
2225

2326
#include "iceberg/arrow/arrow_io_util.h"
2427
#include "iceberg/file_io_registry.h"
2528

29+
namespace iceberg {
30+
void RegisterArrowProjectBatch();
31+
}
32+
2633
namespace iceberg::arrow {
2734

2835
namespace {
@@ -43,8 +50,6 @@ void RegisterS3FileIO() {
4350
#endif
4451
}
4552

46-
} // namespace
47-
4853
void EnsureArrowFileIOsRegistered() {
4954
static std::once_flag flag;
5055
std::call_once(flag, []() {
@@ -58,4 +63,11 @@ void EnsureArrowFileIOsRegistered() {
5863
return true;
5964
}();
6065

66+
} // namespace
67+
68+
void RegisterAll() {
69+
EnsureArrowFileIOsRegistered();
70+
::iceberg::RegisterArrowProjectBatch();
71+
}
72+
6173
} // namespace iceberg::arrow
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@
1919

2020
#pragma once
2121

22-
/// \file iceberg/arrow/arrow_io_register.h
23-
/// \brief Provide functions to register Arrow FileIO implementations.
22+
/// \file iceberg/arrow/arrow_register.h
23+
/// \brief Provide functions to register Arrow bundle integrations.
2424

2525
#include "iceberg/iceberg_bundle_export.h"
2626

2727
namespace iceberg::arrow {
2828

29-
/// \brief Register built-in Arrow FileIO implementations into the FileIORegistry.
29+
/// \brief Register Arrow FileIOs and Arrow-backed C Data utilities.
3030
///
3131
/// This operation is idempotent and safe to call multiple times.
32-
ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered();
32+
ICEBERG_BUNDLE_EXPORT void RegisterAll();
3333

3434
} // namespace iceberg::arrow

0 commit comments

Comments
 (0)