Skip to content

Commit 3d2537c

Browse files
authored
feat(data): add MOR file scan task reader (#657)
Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for file scan tasks, covering both no-delete reader passthrough and merge-on-read filtering with position and equality deletes. Introduce reusable Arrow C Data utilities for stream wrapping and batch projection, including ProjectionContext caching and optional Arrow compute registration via arrow::RegisterAll. Move Arrow IO registration into arrow_register and remove FileScanTask::ToArrow. Add coverage for projection behavior across nanoarrow and Arrow compute paths, plus end-to-end FileScanTaskReader tests for projected reads, position deletes, equality deletes, dropped equality fields, and fully deleted batches.
1 parent e97a8e8 commit 3d2537c

18 files changed

Lines changed: 1877 additions & 166 deletions

src/iceberg/CMakeLists.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
1919
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
2020
set(ICEBERG_SOURCES
21+
arrow_c_data_util.cc
2122
arrow_c_data_guard_internal.cc
2223
catalog/memory/in_memory_catalog.cc
2324
delete_file_index.cc
@@ -166,6 +167,7 @@ set(ICEBERG_DATA_SOURCES
166167
data/delete_filter.cc
167168
data/delete_loader.cc
168169
data/equality_delete_writer.cc
170+
data/file_scan_task_reader.cc
169171
data/position_delete_writer.cc
170172
data/writer.cc
171173
deletes/position_delete_index.cc
@@ -222,9 +224,10 @@ add_subdirectory(util)
222224

223225
if(ICEBERG_BUILD_BUNDLE)
224226
set(ICEBERG_BUNDLE_SOURCES
227+
arrow/arrow_c_data_util.cc
225228
arrow/arrow_io.cc
226229
arrow/s3/arrow_s3_file_io.cc
227-
arrow/arrow_io_register.cc
230+
arrow/arrow_register.cc
228231
arrow/metadata_column_util.cc
229232
avro/avro_data_util.cc
230233
avro/avro_direct_decoder.cc
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include <cstdint>
21+
#include <memory>
22+
#include <mutex>
23+
#include <span>
24+
#include <utility>
25+
#include <vector>
26+
27+
#include <arrow/array/array_primitive.h>
28+
#include <arrow/buffer.h>
29+
#include <arrow/c/bridge.h>
30+
#include <arrow/compute/api_vector.h>
31+
#include <arrow/record_batch.h>
32+
#include <nanoarrow/nanoarrow.h>
33+
34+
#include "iceberg/arrow/arrow_status_internal.h"
35+
#include "iceberg/arrow/nanoarrow_status_internal.h"
36+
#include "iceberg/arrow_c_data_guard_internal.h"
37+
#include "iceberg/arrow_c_data_util_internal.h"
38+
#include "iceberg/result.h"
39+
#include "iceberg/util/macros.h"
40+
41+
namespace iceberg {
42+
43+
namespace {
44+
45+
struct ArrowProjectBatchState {
46+
std::shared_ptr<::arrow::Schema> input_schema;
47+
std::shared_ptr<::arrow::Schema> output_schema;
48+
};
49+
50+
Result<std::shared_ptr<::arrow::Schema>> ImportArrowSchema(
51+
const ArrowSchema& arrow_schema) {
52+
ArrowSchema schema_copy;
53+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema, &schema_copy));
54+
internal::ArrowSchemaGuard schema_copy_guard(&schema_copy);
55+
56+
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema, ::arrow::ImportSchema(&schema_copy));
57+
return schema;
58+
}
59+
60+
Result<std::shared_ptr<ArrowProjectBatchState>> GetArrowProjectBatchState(
61+
ProjectionContext& projection) {
62+
auto state =
63+
std::static_pointer_cast<ArrowProjectBatchState>(projection.project_batch_state());
64+
if (state != nullptr) {
65+
return state;
66+
}
67+
68+
ICEBERG_ASSIGN_OR_RAISE(auto input_schema,
69+
ImportArrowSchema(projection.input_arrow_schema()));
70+
ICEBERG_ASSIGN_OR_RAISE(auto output_schema,
71+
ImportArrowSchema(projection.output_arrow_schema()));
72+
73+
state = std::make_shared<ArrowProjectBatchState>(
74+
ArrowProjectBatchState{.input_schema = std::move(input_schema),
75+
.output_schema = std::move(output_schema)});
76+
projection.project_batch_state() = state;
77+
return state;
78+
}
79+
80+
Result<ArrowArray> ProjectBatchArrowCompute(ArrowArray* input_batch,
81+
std::span<const int32_t> row_indices,
82+
ProjectionContext& projection) {
83+
ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
84+
ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection));
85+
86+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
87+
auto input_record_batch,
88+
::arrow::ImportRecordBatch(input_batch, state->input_schema));
89+
90+
const int32_t empty_index = 0;
91+
// Buffer::Wrap needs a valid pointer even when the zero-length buffer is never read.
92+
const int32_t* row_indices_data =
93+
row_indices.empty() ? &empty_index : row_indices.data();
94+
auto index_array = std::make_shared<::arrow::Int32Array>(
95+
static_cast<int64_t>(row_indices.size()),
96+
::arrow::Buffer::Wrap(row_indices_data, row_indices.size()));
97+
98+
std::vector<std::shared_ptr<::arrow::Array>> output_columns;
99+
output_columns.reserve(projection.selected_field_indices().size());
100+
for (int32_t input_index : projection.selected_field_indices()) {
101+
ICEBERG_PRECHECK(input_index >= 0 && input_index < input_record_batch->num_columns(),
102+
"Input field index {} out of range for batch with {} columns",
103+
input_index, input_record_batch->num_columns());
104+
ICEBERG_ARROW_ASSIGN_OR_RETURN(
105+
auto taken_column,
106+
::arrow::compute::Take(*input_record_batch->column(input_index), *index_array));
107+
output_columns.push_back(std::move(taken_column));
108+
}
109+
110+
auto output_record_batch = ::arrow::RecordBatch::Make(
111+
state->output_schema, static_cast<int64_t>(row_indices.size()),
112+
std::move(output_columns));
113+
114+
ArrowArray output_array;
115+
ICEBERG_ARROW_RETURN_NOT_OK(
116+
::arrow::ExportRecordBatch(*output_record_batch, &output_array));
117+
internal::ArrowArrayGuard output_array_guard(&output_array);
118+
119+
return std::exchange(output_array, ArrowArray{});
120+
}
121+
122+
} // namespace
123+
124+
void RegisterArrowProjectBatch() {
125+
static std::once_flag flag;
126+
std::call_once(flag, []() {
127+
ProjectionContext::RegisterProjectBatchFunction(&ProjectBatchArrowCompute);
128+
});
129+
}
130+
131+
} // namespace iceberg
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,35 @@
1-
// Licensed to the Apache Software Foundation (ASF) under one
2-
// or more contributor license agreements. See the NOTICE file
3-
// distributed with this work for additional information
4-
// regarding copyright ownership. The ASF licenses this file
5-
// to you under the Apache License, Version 2.0 (the
6-
// "License"); you may not use this file except in compliance
7-
// with the License. You may obtain a copy of the License at
8-
//
9-
// http://www.apache.org/licenses/LICENSE-2.0
10-
//
11-
// Unless required by applicable law or agreed to in writing,
12-
// software distributed under the License is distributed on an
13-
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14-
// KIND, either express or implied. See the License for the
15-
// specific language governing permissions and limitations
16-
// under the License.
17-
18-
#include "iceberg/arrow/arrow_io_register.h"
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
#include "iceberg/arrow/arrow_register.h"
1921

2022
#include <mutex>
2123
#include <string>
24+
#include <unordered_map>
2225

2326
#include "iceberg/arrow/arrow_io_util.h"
2427
#include "iceberg/file_io_registry.h"
2528

29+
namespace iceberg {
30+
void RegisterArrowProjectBatch();
31+
}
32+
2633
namespace iceberg::arrow {
2734

2835
namespace {
@@ -43,8 +50,6 @@ void RegisterS3FileIO() {
4350
#endif
4451
}
4552

46-
} // namespace
47-
4853
void EnsureArrowFileIOsRegistered() {
4954
static std::once_flag flag;
5055
std::call_once(flag, []() {
@@ -58,4 +63,11 @@ void EnsureArrowFileIOsRegistered() {
5863
return true;
5964
}();
6065

66+
} // namespace
67+
68+
void RegisterAll() {
69+
EnsureArrowFileIOsRegistered();
70+
::iceberg::RegisterArrowProjectBatch();
71+
}
72+
6173
} // namespace iceberg::arrow
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@
1919

2020
#pragma once
2121

22-
/// \file iceberg/arrow/arrow_io_register.h
23-
/// \brief Provide functions to register Arrow FileIO implementations.
22+
/// \file iceberg/arrow/arrow_register.h
23+
/// \brief Provide functions to register Arrow bundle integrations.
2424

2525
#include "iceberg/iceberg_bundle_export.h"
2626

2727
namespace iceberg::arrow {
2828

29-
/// \brief Register built-in Arrow FileIO implementations into the FileIORegistry.
29+
/// \brief Register Arrow FileIOs and Arrow-backed C Data utilities.
3030
///
3131
/// This operation is idempotent and safe to call multiple times.
32-
ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered();
32+
ICEBERG_BUNDLE_EXPORT void RegisterAll();
3333

3434
} // namespace iceberg::arrow

0 commit comments

Comments
 (0)