Skip to content

Commit ea4e7ee

Browse files
kaka11chen authored and hubgeter committed
[refactoring](multi-catalog)data_lake_reader_refactoring.
1 parent 0f7a1c3 commit ea4e7ee

File tree

89 files changed

+5719
-4524
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

89 files changed

+5719
-4524
lines changed

be/src/exec/scan/file_scanner.cpp

Lines changed: 543 additions & 500 deletions
Large diffs are not rendered by default.

be/src/exec/scan/file_scanner.h

Lines changed: 35 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,9 @@ class FileScanner : public Scanner {
9090
: Scanner(state, profile),
9191
_params(params),
9292
_col_name_to_slot_id(colname_to_slot_id),
93-
_real_tuple_desc(tuple_desc) {};
93+
_real_tuple_desc(tuple_desc) {
94+
_configure_file_scan_handlers();
95+
};
9496

9597
Status read_lines_from_range(const TFileRangeDesc& range, const std::list<int64_t>& row_ids,
9698
Block* result_block, const ExternalFileMappingInfo& external_info,
@@ -107,6 +109,9 @@ class FileScanner : public Scanner {
107109

108110
Status _get_next_reader();
109111

112+
// Build a ReaderInitContext with shared fields from FileScanner members.
113+
void _fill_base_init_context(ReaderInitContext* ctx);
114+
110115
// TODO: cast input block columns type to string.
111116
// Cast hook for source blocks. No conversion is implemented yet, so the block
// is left untouched and success is reported unconditionally.
Status _cast_src_block(Block* block) {
    return Status::OK();
}
112117

@@ -128,10 +133,10 @@ class FileScanner : public Scanner {
128133
std::vector<SlotDescriptor*> _file_slot_descs;
129134
// col names from _file_slot_descs
130135
std::vector<std::string> _file_col_names;
136+
// Unified column descriptors for init_reader (includes file, partition, missing, synthesized cols)
137+
std::vector<ColumnDescriptor> _column_descs;
131138

132-
// Partition source slot descriptors
133-
std::vector<SlotDescriptor*> _partition_slot_descs;
134-
// Partition slot id to index in _partition_slot_descs
139+
// Partition slot id to partition key index (for matching columns_from_path)
135140
std::unordered_map<SlotId, int> _partition_slot_index_map;
136141
// created from param.expr_of_dest_slot
137142
// For query, it saves default value expr of all dest columns, or nullptr for NULL.
@@ -152,8 +157,6 @@ class FileScanner : public Scanner {
152157
// Get from GenericReader, save the existing columns in file to their type.
153158
std::unordered_map<std::string, DataTypePtr> _slot_lower_name_to_col_type;
154159
// Get from GenericReader, save columns that required by scan but not exist in file.
155-
// These columns will be filled by default value or null.
156-
std::unordered_set<std::string> _missing_cols;
157160

158161
// The col lowercase name of source file to type of source file.
159162
std::map<std::string, DataTypePtr> _source_file_col_name_types;
@@ -192,7 +195,6 @@ class FileScanner : public Scanner {
192195
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
193196
_partition_col_descs;
194197
std::unordered_map<std::string, bool> _partition_value_is_null;
195-
std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
196198

197199
// idx of skip_bitmap_col in _input_tuple_desc
198200
int32_t _skip_bitmap_col_idx {-1};
@@ -230,34 +232,41 @@ class FileScanner : public Scanner {
230232
// otherwise, point to _output_tuple_desc
231233
const TupleDescriptor* _real_tuple_desc = nullptr;
232234

233-
std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr,
234-
-1};
235-
bool _need_iceberg_rowid_column = false;
236-
int _iceberg_rowid_column_pos = -1;
237-
// for iceberg row lineage
238-
RowLineageColumns _row_lineage_columns;
239235
int64_t _last_bytes_read_from_local = 0;
240236
int64_t _last_bytes_read_from_remote = 0;
241237

238+
Status (FileScanner::*_init_src_block_handler)(Block* block) = nullptr;
239+
Status (FileScanner::*_process_src_block_after_read_handler)(Block* block) = nullptr;
240+
bool (FileScanner::*_should_push_down_predicates_handler)(
241+
TFileFormatType::type format_type) const = nullptr;
242+
bool (FileScanner::*_should_enable_condition_cache_handler)() const = nullptr;
243+
242244
// Condition cache for external tables
243245
uint64_t _condition_cache_digest = 0;
244246
segment_v2::ConditionCache::ExternalCacheKey _condition_cache_key;
245247
std::shared_ptr<std::vector<bool>> _condition_cache;
246248
std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
247249
int64_t _condition_cache_hit_count = 0;
248250

251+
void _configure_file_scan_handlers();
252+
249253
Status _init_expr_ctxes();
250254
Status _init_src_block(Block* block);
251-
Status _check_output_block_types();
252-
Status _cast_to_input_block(Block* block);
255+
Status _init_src_block_for_load(Block* block);
256+
Status _init_src_block_for_query(Block* block);
257+
Status _process_src_block_after_read(Block* block);
258+
Status _process_src_block_after_read_for_load(Block* block);
259+
Status _process_src_block_after_read_for_query(Block* block);
253260
Status _fill_columns_from_path(size_t rows);
254261
Status _fill_missing_columns(size_t rows);
262+
Status _check_output_block_types();
263+
Status _cast_to_input_block(Block* block);
255264
Status _pre_filter_src_block();
256265
Status _convert_to_output_block(Block* block);
257266
Status _truncate_char_or_varchar_columns(Block* block);
258267
void _truncate_char_or_varchar_column(Block* block, int idx, int len);
259268
Status _generate_partition_columns();
260-
Status _generate_missing_columns();
269+
261270
bool _check_partition_prune_expr(const VExprSPtr& expr);
262271
void _init_runtime_filter_partition_prune_ctxs();
263272
void _init_runtime_filter_partition_prune_block();
@@ -267,11 +276,11 @@ class FileScanner : public Scanner {
267276
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
268277
Status _generate_truncate_columns(bool need_to_get_parsed_schema);
269278
Status _set_fill_or_truncate_columns(bool need_to_get_parsed_schema);
270-
Status _init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
271-
FileMetaCache* file_meta_cache_ptr);
272-
Status _init_parquet_reader(std::unique_ptr<ParquetReader>&& parquet_reader,
273-
FileMetaCache* file_meta_cache_ptr);
274-
Status _create_row_id_column_iterator();
279+
Status _init_orc_reader(FileMetaCache* file_meta_cache_ptr,
280+
std::unique_ptr<OrcReader> orc_reader = nullptr);
281+
Status _init_parquet_reader(FileMetaCache* file_meta_cache_ptr,
282+
std::unique_ptr<ParquetReader> parquet_reader = nullptr);
283+
std::shared_ptr<segment_v2::RowIdColumnIteratorV2> _create_row_id_column_iterator();
275284

276285
TFileFormatType::type _get_current_format_type() {
277286
// for compatibility, if format_type is not set in range, use the format type of params
@@ -291,6 +300,11 @@ class FileScanner : public Scanner {
291300
}
292301

293302
bool _should_enable_condition_cache();
303+
bool _should_enable_condition_cache_for_load() const;
304+
bool _should_enable_condition_cache_for_query() const;
305+
bool _should_push_down_predicates(TFileFormatType::type format_type) const;
306+
bool _should_push_down_predicates_for_load(TFileFormatType::type format_type) const;
307+
bool _should_push_down_predicates_for_query(TFileFormatType::type format_type) const;
294308
void _init_reader_condition_cache();
295309
void _finalize_reader_condition_cache();
296310

be/src/exec/sort/heap_sorter.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, RuntimeState* state, in
3737

3838
Status HeapSorter::append_block(Block* block) {
3939
auto tmp_block = std::make_shared<Block>(block->clone_empty());
40+
// std::cout <<"block=>\n" << block->dump_data() << std::endl;
41+
// std::cout<< "tmp_block=>\n" << tmp_block->dump_data() << std::endl;
4042
if (!_have_runtime_predicate && _queue.is_valid() && _queue_row_num >= _heap_size) {
4143
RETURN_IF_ERROR(_prepare_sort_columns(*block, *tmp_block, false));
4244
if (_materialize_sort_exprs) {

be/src/format/arrow/arrow_stream_reader.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ Status ArrowStreamReader::init_reader() {
6565
return Status::OK();
6666
}
6767

68-
Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
68+
Status ArrowStreamReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
6969
bool has_next = false;
7070
RETURN_IF_ERROR(_pip_stream->HasNext(&has_next));
7171
if (!has_next) {
@@ -126,8 +126,8 @@ Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool*
126126
return Status::OK();
127127
}
128128

129-
Status ArrowStreamReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
130-
std::unordered_set<std::string>* missing_cols) {
129+
Status ArrowStreamReader::_get_columns_impl(
130+
std::unordered_map<std::string, DataTypePtr>* name_to_type) {
131131
for (const auto& slot : _file_slot_descs) {
132132
name_to_type->emplace(slot->col_name(), slot->type());
133133
}

be/src/format/arrow/arrow_stream_reader.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ class ArrowStreamReader : public GenericReader {
5555

5656
Status init_reader();
5757

58-
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
58+
Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override;
5959

60-
Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
61-
std::unordered_set<std::string>* missing_cols) override;
60+
Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override;
61+
62+
protected:
63+
// Context-based init hook: the context argument is ignored and the work is
// delegated to the parameterless init_reader().
Status _do_init_reader(ReaderInitContext* /*ctx*/) override {
    return init_reader();
}
6264

6365
private:
6466
RuntimeState* _state;

be/src/format/column_descriptor.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <string>
21+
22+
#include "exprs/vexpr_fwd.h"
23+
24+
namespace doris {
25+
class SlotDescriptor;
26+
27+
/// Column categories for table format reading.
28+
///
29+
/// Each column requested by the query is classified into one of these categories.
30+
/// The category determines how the column's value is obtained:
31+
/// - REGULAR: Read directly from the data file (Parquet/ORC).
32+
/// If the column is absent from a file (schema evolution),
33+
/// its default_expr is used to produce a default value.
34+
/// - PARTITION_KEY: Filled from partition metadata (e.g. Hive path partitions).
35+
/// - SYNTHESIZED: Never in the data file; fully computed at runtime
36+
/// (e.g. Doris V2 __DORIS_ICEBERG_ROWID_COL__).
37+
/// - GENERATED: May or may not exist in the data file. If present but null,
38+
/// the value is backfilled at runtime (e.g. Iceberg V3 _row_id).
39+
enum class ColumnCategory {
40+
// Read directly from the data file; if the file lacks the column, its default_expr supplies the value.
REGULAR,
41+
// Filled from partition metadata rather than from file contents.
PARTITION_KEY,
42+
// Never stored in the data file; fully computed at runtime.
SYNTHESIZED,
43+
// May or may not exist in the data file; if present but null, the value is backfilled at runtime.
GENERATED,
44+
};
45+
46+
/// Describes a column requested by the query, along with its category.
47+
struct ColumnDescriptor {
48+
// Name of the requested column. NOTE(review): case convention (original vs. lowercase) is not visible here — confirm.
std::string name;
49+
// Slot descriptor associated with this column; nullptr until set.
// NOTE(review): held as a raw pointer, presumably non-owning — confirm lifetime against the caller.
const SlotDescriptor* slot_desc = nullptr;
50+
// How the column's value is obtained; defaults to REGULAR.
ColumnCategory category = ColumnCategory::REGULAR;
51+
/// Default value expression when this column is missing from the data file.
52+
/// nullptr means fill with NULL. Built once per table scan in FileScanner.
53+
VExprContextSPtr default_expr;
54+
};
55+
56+
} // namespace doris

be/src/format/count_reader.h

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#pragma once
19+
20+
#include <cstddef>
21+
#include <cstdint>
22+
#include <memory>
23+
24+
#include "common/status.h"
25+
#include "core/block/block.h"
26+
#include "format/generic_reader.h"
27+
28+
namespace doris {
29+
#include "common/compile_check_begin.h"
30+
31+
/// A lightweight reader that emits row counts without reading any actual data.
32+
/// Used as a decorator to replace the real reader when COUNT(*) push down is active.
33+
///
34+
/// Instead of duplicating the COUNT short-circuit logic in every format reader
35+
/// (ORC, Parquet, etc.), FileScanner creates a CountReader after the real reader
36+
/// is initialized and the total row count is known. The CountReader then serves
37+
/// all subsequent get_next_block calls by simply resizing columns.
38+
///
39+
/// This cleanly separates the "how many rows" concern from the actual data reading,
40+
/// eliminating duplicated COUNT blocks across format readers.
41+
class CountReader : public GenericReader {
42+
public:
43+
/// @param total_rows Total number of rows to emit (post-filter).
44+
/// @param batch_size Maximum rows per batch.
45+
/// @param inner_reader The original reader, kept alive for profile collection
46+
/// and lifecycle management. Ownership is transferred.
47+
CountReader(int64_t total_rows, size_t batch_size,
48+
std::unique_ptr<GenericReader> inner_reader = nullptr)
49+
: _remaining_rows(total_rows),
50+
_batch_size(batch_size),
51+
_inner_reader(std::move(inner_reader)) {
52+
set_push_down_agg_type(TPushAggOp::type::COUNT);
53+
}
54+
55+
~CountReader() override = default;
56+
57+
Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override {
58+
auto rows = std::min(_remaining_rows, static_cast<int64_t>(_batch_size));
59+
_remaining_rows -= rows;
60+
61+
auto mutate_columns = block->mutate_columns();
62+
for (auto& col : mutate_columns) {
63+
col->resize(rows);
64+
}
65+
block->set_columns(std::move(mutate_columns));
66+
67+
*read_rows = rows;
68+
*eof = (_remaining_rows == 0);
69+
return Status::OK();
70+
}
71+
72+
/// CountReader counts rows by definition.
73+
bool count_read_rows() override { return true; }
74+
75+
/// Delegate to inner reader if available, otherwise return our total.
76+
int64_t get_total_rows() const override {
77+
return _inner_reader ? _inner_reader->get_total_rows() : _initial_total_rows();
78+
}
79+
80+
Status close() override {
81+
if (_inner_reader) {
82+
return _inner_reader->close();
83+
}
84+
return Status::OK();
85+
}
86+
87+
/// Access the inner reader for profile collection or other lifecycle needs.
88+
GenericReader* inner_reader() const { return _inner_reader.get(); }
89+
90+
protected:
91+
void _collect_profile_before_close() override {
92+
if (_inner_reader) {
93+
_inner_reader->collect_profile_before_close();
94+
}
95+
}
96+
97+
private:
98+
int64_t _initial_total_rows() const { return _remaining_rows; }
99+
100+
int64_t _remaining_rows;
101+
size_t _batch_size;
102+
std::unique_ptr<GenericReader> _inner_reader;
103+
};
104+
105+
#include "common/compile_check_end.h"
106+
} // namespace doris

0 commit comments

Comments
 (0)