Skip to content

Commit af16607

Browse files
committed
[refactor](be) Clean up new parquet page skip filter
### What problem does this PR solve? Issue Number: close #xxx Related PR: #64214 Problem Summary: The new parquet data page filter setup was implemented as an inline closure inside RecordReader creation, which made the page ordinal tracking, profile updates and page skip plan lookup hard to read. This refactors the filter into a small DataPageSkipFilter helper, centralizes page skip plan lookup/filter installation, and documents the important page-index invariants around data-page ordinals, non-repeated leaves, and double-skip accounting. The remaining touched reader files are formatting-only changes from the project clang-format script. ### Release note None ### Check List (For Author) - Test: Unit Test - Pending: NewParquetReaderTest.* - Behavior changed: No - Does this need documentation: No
1 parent 353fcbd commit af16607

10 files changed

Lines changed: 109 additions & 75 deletions

File tree

be/src/format/new_parquet/parquet_statistics.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,8 @@ bool build_page_skip_plan_for_leaf(
10321032
int64_t row_group_rows, ParquetPageSkipPlan* page_skip_plan) {
10331033
DORIS_CHECK(page_skip_plan != nullptr);
10341034
*page_skip_plan = ParquetPageSkipPlan {};
1035+
// OffsetIndex first_row_index is row-based only for non-repeated leaves. LIST/MAP/repeated
1036+
// leaves need repetition-level-aware range mapping and are intentionally left out for now.
10351037
if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
10361038
column_schema.descriptor == nullptr || column_schema.leaf_column_id < 0 ||
10371039
column_schema.descriptor->max_repetition_level() != 0) {

be/src/format/new_parquet/reader/column_reader.cpp

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <cstddef>
2626
#include <cstdint>
2727
#include <exception>
28+
#include <map>
2829
#include <memory>
2930
#include <string>
3031
#include <utility>
@@ -47,6 +48,63 @@
4748
namespace doris::parquet {
4849
namespace {
4950

51+
class DataPageSkipFilter {
52+
public:
53+
DataPageSkipFilter(const ParquetPageSkipPlan* page_skip_plan,
54+
ParquetPageSkipProfile page_skip_profile)
55+
: _page_skip_plan(page_skip_plan), _page_skip_profile(page_skip_profile) {
56+
DORIS_CHECK(_page_skip_plan != nullptr);
57+
}
58+
59+
bool operator()(const ::parquet::DataPageStats&) {
60+
// Arrow invokes this callback once for each DATA_PAGE/DATA_PAGE_V2 and never for
61+
// dictionary pages, so this ordinal matches Parquet OffsetIndex page locations.
62+
const size_t page_idx = _next_data_page_idx++;
63+
const bool skip = _page_skip_plan->should_skip_page(page_idx);
64+
if (!skip) {
65+
return false;
66+
}
67+
update_skip_profile(page_idx);
68+
return true;
69+
}
70+
71+
private:
72+
void update_skip_profile(size_t page_idx) const {
73+
if (_page_skip_profile.skipped_pages != nullptr) {
74+
COUNTER_UPDATE(_page_skip_profile.skipped_pages, 1);
75+
}
76+
if (_page_skip_profile.skipped_bytes != nullptr) {
77+
COUNTER_UPDATE(_page_skip_profile.skipped_bytes,
78+
_page_skip_plan->skipped_page_compressed_size(page_idx));
79+
}
80+
}
81+
82+
const ParquetPageSkipPlan* _page_skip_plan = nullptr;
83+
ParquetPageSkipProfile _page_skip_profile;
84+
size_t _next_data_page_idx = 0;
85+
};
86+
87+
const ParquetPageSkipPlan* find_page_skip_plan(
88+
const std::map<int, ParquetPageSkipPlan>* page_skip_plans, int leaf_column_id) {
89+
if (page_skip_plans == nullptr) {
90+
return nullptr;
91+
}
92+
const auto plan_it = page_skip_plans->find(leaf_column_id);
93+
return plan_it == page_skip_plans->end() ? nullptr : &plan_it->second;
94+
}
95+
96+
void install_data_page_filter(std::unique_ptr<::parquet::PageReader>& page_reader,
97+
const std::map<int, ParquetPageSkipPlan>* page_skip_plans,
98+
int leaf_column_id, ParquetPageSkipProfile page_skip_profile) {
99+
DORIS_CHECK(page_reader != nullptr);
100+
const ParquetPageSkipPlan* page_skip_plan =
101+
find_page_skip_plan(page_skip_plans, leaf_column_id);
102+
if (page_skip_plan == nullptr) {
103+
return;
104+
}
105+
page_reader->set_data_page_filter(DataPageSkipFilter(page_skip_plan, page_skip_profile));
106+
}
107+
50108
bool supports_nested_scalar_record_reader(const ParquetColumnSchema& column_schema) {
51109
if (supports_record_reader(column_schema.type_descriptor)) {
52110
return true;
@@ -156,13 +214,7 @@ Status ParquetColumnReaderFactory::create_scalar_reader(
156214
if (reader == nullptr) {
157215
return Status::InvalidArgument("reader is null");
158216
}
159-
const ParquetPageSkipPlan* page_skip_plan = nullptr;
160-
if (_page_skip_plans != nullptr) {
161-
auto plan_it = _page_skip_plans->find(column_schema.leaf_column_id);
162-
if (plan_it != _page_skip_plans->end()) {
163-
page_skip_plan = &plan_it->second;
164-
}
165-
}
217+
const auto* page_skip_plan = find_page_skip_plan(_page_skip_plans, column_schema.leaf_column_id);
166218
*reader = std::make_unique<ScalarColumnReader>(column_schema, std::move(record_reader),
167219
page_skip_plan, _column_reader_profile);
168220
return Status::OK();
@@ -246,29 +298,8 @@ Status ParquetColumnReaderFactory::get_record_reader(
246298
if (_record_readers[leaf_column_id] == nullptr) {
247299
try {
248300
auto page_reader = _row_group->GetColumnPageReader(leaf_column_id);
249-
if (_page_skip_plans != nullptr) {
250-
auto plan_it = _page_skip_plans->find(leaf_column_id);
251-
if (plan_it != _page_skip_plans->end()) {
252-
const ParquetPageSkipPlan* page_skip_plan = &plan_it->second;
253-
page_reader->set_data_page_filter(
254-
[page_skip_plan, page_skip_profile = _page_skip_profile,
255-
page_idx = size_t {0}](const ::parquet::DataPageStats&) mutable {
256-
const bool skip = page_skip_plan->should_skip_page(page_idx);
257-
if (skip) {
258-
if (page_skip_profile.skipped_pages != nullptr) {
259-
COUNTER_UPDATE(page_skip_profile.skipped_pages, 1);
260-
}
261-
if (page_skip_profile.skipped_bytes != nullptr) {
262-
COUNTER_UPDATE(page_skip_profile.skipped_bytes,
263-
page_skip_plan->skipped_page_compressed_size(
264-
page_idx));
265-
}
266-
}
267-
++page_idx;
268-
return skip;
269-
});
270-
}
271-
}
301+
install_data_page_filter(page_reader, _page_skip_plans, leaf_column_id,
302+
_page_skip_profile);
272303
const auto level_info = ::parquet::internal::LevelInfo::ComputeLevelInfo(descriptor);
273304
_record_readers[leaf_column_id] = ::parquet::internal::RecordReader::Make(
274305
descriptor, level_info, ::arrow::default_memory_pool(),

be/src/format/new_parquet/reader/scalar_column_reader.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,9 @@ int64_t ScalarColumnReader::page_filtered_rows_to_skip(int64_t rows) const {
113113
const int64_t start = std::max(range.start, _row_group_rows_read);
114114
const int64_t end = std::min(range_end, skip_end);
115115
if (start < end) {
116+
// Scheduler gap skips are derived from page-index selected_ranges. A page-filtered
117+
// range can only overlap such a gap when the whole data page is outside every selected
118+
// range, so partial overlap would mean the planner and scheduler are out of sync.
116119
DORIS_CHECK(start == range.start);
117120
DORIS_CHECK(end == range_end);
118121
filtered_rows += end - start;

be/src/format/new_parquet/selection_vector.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ struct RowRange {
3333

3434
struct ParquetPageSkipPlan {
3535
int leaf_column_id = -1;
36+
// Page ordinal is the data-page ordinal in the column chunk. It intentionally excludes
37+
// dictionary pages, matching Arrow PageReader::set_data_page_filter().
3638
std::vector<uint8_t> skipped_pages;
3739
std::vector<int64_t> skipped_page_compressed_sizes;
40+
// Row ranges covered by skipped data pages. ScalarColumnReader uses these ranges to avoid
41+
// calling RecordReader::SkipRecords() again for pages already skipped by Arrow.
3842
std::vector<RowRange> skipped_ranges;
3943

4044
bool empty() const { return skipped_ranges.empty(); }

be/src/format/reader/column_mapper.cpp

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,9 @@
3434
#include "exprs/create_predicate_function.h"
3535
#include "exprs/vcompound_pred.h"
3636
#include "exprs/vdirect_in_predicate.h"
37+
#include "exprs/vectorized_fn_call.h"
3738
#include "exprs/vexpr_context.h"
3839
#include "exprs/vin_predicate.h"
39-
#include "exprs/vectorized_fn_call.h"
4040
#include "format/reader/expr/cast.h"
4141
#include "format/reader/expr/literal.h"
4242
#include "format/reader/expr/slot_ref.h"
@@ -310,8 +310,8 @@ std::string TableColumnMapperOptions::debug_string() const {
310310

311311
std::string TableColumnMapper::debug_string(const ColumnDefinition& column) {
312312
std::ostringstream out;
313-
out << "ColumnDefinition{name=" << column.name << ", identifier="
314-
<< field_debug_string(column.identifier)
313+
out << "ColumnDefinition{name=" << column.name
314+
<< ", identifier=" << field_debug_string(column.identifier)
315315
<< ", local_id=" << column.local_id << ", type=" << data_type_debug_string(column.type)
316316
<< ", children="
317317
<< join_debug_strings(column.children,
@@ -519,10 +519,10 @@ Status clone_table_expr_tree(const VExprSPtr& expr, VExprSPtr* cloned_expr) {
519519

520520
VExprSPtr cloned;
521521
if (const auto* table_slot_ref = dynamic_cast<const TableSlotRef*>(expr.get())) {
522-
cloned = TableSlotRef::create_shared(
523-
table_slot_ref->slot_id(), table_slot_ref->column_id(),
524-
table_slot_ref->column_uniq_id(), table_slot_ref->data_type(),
525-
table_slot_ref->column_name());
522+
cloned = TableSlotRef::create_shared(table_slot_ref->slot_id(), table_slot_ref->column_id(),
523+
table_slot_ref->column_uniq_id(),
524+
table_slot_ref->data_type(),
525+
table_slot_ref->column_name());
526526
} else if (const auto* vslot_ref = dynamic_cast<const VSlotRef*>(expr.get())) {
527527
cloned = TableSlotRef::create_shared(vslot_ref->slot_id(), vslot_ref->column_id(),
528528
vslot_ref->column_uniq_id(), vslot_ref->data_type(),
@@ -1258,8 +1258,8 @@ static bool rewrite_binary_slot_literal_predicate(
12581258
return false;
12591259
}
12601260

1261-
auto rewritten_literal = rewrite_literal_to_file_type(literal_expr, *rewrite_info,
1262-
rewrite_context);
1261+
auto rewritten_literal =
1262+
rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context);
12631263
if (rewritten_literal == nullptr) {
12641264
children[literal_child_idx] = original_table_literal(literal_expr, rewrite_context);
12651265
expr->set_children(std::move(children));
@@ -1295,15 +1295,15 @@ static bool rewrite_in_slot_literal_predicate(
12951295
if (literal_expr == nullptr) {
12961296
return false;
12971297
}
1298-
auto rewritten_literal = rewrite_literal_to_file_type(literal_expr, *rewrite_info,
1299-
rewrite_context);
1298+
auto rewritten_literal =
1299+
rewrite_literal_to_file_type(literal_expr, *rewrite_info, rewrite_context);
13001300
if (rewritten_literal == nullptr) {
13011301
for (size_t restore_idx = 1; restore_idx < children.size(); ++restore_idx) {
13021302
auto restore_literal = unwrap_literal_for_file_cast(children[restore_idx],
13031303
rewrite_info->table_type);
13041304
if (restore_literal != nullptr) {
1305-
children[restore_idx] = original_table_literal(restore_literal,
1306-
rewrite_context);
1305+
children[restore_idx] =
1306+
original_table_literal(restore_literal, rewrite_context);
13071307
}
13081308
}
13091309
expr->set_children(std::move(children));
@@ -1344,8 +1344,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
13441344
// struct_element must see the actual file struct layout. Casting the parent struct
13451345
// to the output projection can hide filter-only children such as `s.id` in
13461346
// `SELECT s.name WHERE s.id > 5`.
1347-
children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second,
1348-
rewrite_context);
1347+
children[0] = create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context);
13491348
expr->set_children(std::move(children));
13501349
return expr;
13511350
}
@@ -1383,8 +1382,8 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
13831382
global_to_file_slot.find(GlobalIndex(cast_set<size_t>(slot_ref->slot_id())));
13841383
if (rewrite_it != global_to_file_slot.end() &&
13851384
expr->data_type()->equals(*rewrite_it->second.table_type)) {
1386-
auto rewritten_child = create_file_slot_ref(*slot_ref, rewrite_it->second,
1387-
rewrite_context);
1385+
auto rewritten_child =
1386+
create_file_slot_ref(*slot_ref, rewrite_it->second, rewrite_context);
13881387
if (rewrite_it->second.file_type->equals(*rewrite_it->second.table_type)) {
13891388
return rewritten_child;
13901389
}
@@ -1973,9 +1972,8 @@ Status TableColumnMapper::localize_filters(const std::vector<TableFilter>& table
19731972
if (!clone_status.ok()) {
19741973
continue;
19751974
}
1976-
auto localized_root =
1977-
rewrite_table_expr_to_file_expr(rewrite_root, global_to_file_slot,
1978-
&rewrite_context);
1975+
auto localized_root = rewrite_table_expr_to_file_expr(rewrite_root, global_to_file_slot,
1976+
&rewrite_context);
19791977
auto localized_conjunct = VExprContext::create_shared(std::move(localized_root));
19801978
RETURN_IF_ERROR(rewrite_context.prepare_created_exprs(localized_conjunct.get()));
19811979
file_request->conjuncts.push_back(std::move(localized_conjunct));

be/src/format/reader/column_mapper.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ enum TableVirtualColumnType {
6666

6767
enum class FilterConversionType {
6868
COPY_DIRECTLY, // filter can be copied directly from file layer without any change, e.g. column type and table type are the same and no complex nested projection is involved.
69-
CAST_FILTER, // filter can be converted from file layer by adding a cast, e.g. column type is nullable but table type is not, or file column has a trivial nested projection but table column has a complex nested projection.
69+
CAST_FILTER, // filter can be converted from file layer by adding a cast, e.g. column type is nullable but table type is not, or file column has a trivial nested projection but table column has a complex nested projection.
7070
READER_EXPRESSION,
7171
FINALIZE_ONLY, // filter cannot be converted to file layer and should be evaluated at table reader finalize phase, e.g. a child column of a nested column is null in file schema.
7272
CONSTANT,

be/src/format/reader/table/paimon_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ Status PaimonReader::prepare_split(const reader::SplitReadOptions& options) {
121121
}
122122

123123
reader::TableColumnMappingMode PaimonReader::mapping_mode() const {
124-
if (_split_schema_id < 0 || _scan_params == nullptr || !_scan_params->__isset.current_schema_id ||
125-
!_scan_params->__isset.history_schema_info) {
124+
if (_split_schema_id < 0 || _scan_params == nullptr ||
125+
!_scan_params->__isset.current_schema_id || !_scan_params->__isset.history_schema_info) {
126126
return reader::TableColumnMappingMode::BY_NAME;
127127
}
128128
return find_schema(_scan_params, _split_schema_id) == nullptr

be/src/format/reader/table_reader.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ std::string expr_context_debug_string(const VExprContextSPtr& context) {
119119
return "VExprContext{root=null}";
120120
}
121121
std::ostringstream out;
122-
out << "VExprContext{root_name=" << root->expr_name()
123-
<< ", root_debug=" << root->debug_string() << "}";
122+
out << "VExprContext{root_name=" << root->expr_name() << ", root_debug=" << root->debug_string()
123+
<< "}";
124124
return out.str();
125125
}
126126

@@ -285,11 +285,10 @@ std::string TableReader::debug_string() const {
285285
<< ", table_column_predicates="
286286
<< table_column_predicates_debug_string(_table_column_predicates)
287287
<< ", conjunct_count=" << _conjuncts.size() << ", conjuncts="
288-
<< join_table_reader_debug_strings(
289-
_conjuncts,
290-
[](const VExprContextSPtr& conjunct) {
291-
return expr_context_debug_string(conjunct);
292-
})
288+
<< join_table_reader_debug_strings(_conjuncts,
289+
[](const VExprContextSPtr& conjunct) {
290+
return expr_context_debug_string(conjunct);
291+
})
293292
<< ", file_schema="
294293
<< join_table_reader_debug_strings(_data_reader.file_schema,
295294
[](const ColumnDefinition& field) {

be/src/format/reader/table_reader.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,7 @@ class TableReader {
215215
// 切换到下一个 reader 的通用流程。
216216
// 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
217217
Status create_next_reader(bool* eos);
218-
virtual TableColumnMappingMode mapping_mode() const {
219-
return TableColumnMappingMode::BY_NAME;
220-
}
218+
virtual TableColumnMappingMode mapping_mode() const { return TableColumnMappingMode::BY_NAME; }
221219
virtual Status annotate_file_schema(std::vector<ColumnDefinition>* file_schema) {
222220
DORIS_CHECK(file_schema != nullptr);
223221
return Status::OK();

be/test/format/reader/expr/cast_test.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -681,8 +681,8 @@ TEST_F(CastTest, ColumnMapperDoesNotLeakRewrittenInPredicateLiteralAcrossSplits)
681681
ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, {bigint_file_field}).ok());
682682
reader::FileScanRequest bigint_request;
683683
ASSERT_TRUE(bigint_mapper
684-
.create_scan_request({table_filter}, {}, projected_columns,
685-
&bigint_request, &state)
684+
.create_scan_request({table_filter}, {}, projected_columns, &bigint_request,
685+
&state)
686686
.ok());
687687
ASSERT_EQ(bigint_request.conjuncts.size(), 1);
688688
const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root();
@@ -781,8 +781,8 @@ TEST_F(CastTest, ColumnMapperDoesNotLeakRewrittenLiteralAcrossSplits) {
781781
ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, {bigint_file_field}).ok());
782782
reader::FileScanRequest bigint_request;
783783
ASSERT_TRUE(bigint_mapper
784-
.create_scan_request({table_filter}, {}, projected_columns,
785-
&bigint_request, &state)
784+
.create_scan_request({table_filter}, {}, projected_columns, &bigint_request,
785+
&state)
786786
.ok());
787787
ASSERT_EQ(bigint_request.conjuncts.size(), 1);
788788
const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root();
@@ -864,13 +864,12 @@ TEST_F(CastTest, ColumnMapperDoesNotNestCastFilterAcrossScanRequests) {
864864
table_filter.global_indices = {reader::GlobalIndex(0)};
865865

866866
reader::FileScanRequest first_request;
867-
ASSERT_TRUE(
868-
mapper.create_scan_request({table_filter}, {}, projected_columns, &first_request, &state)
869-
.ok());
867+
ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, projected_columns, &first_request,
868+
&state)
869+
.ok());
870870
reader::FileScanRequest second_request;
871-
ASSERT_TRUE(mapper
872-
.create_scan_request({table_filter}, {}, projected_columns,
873-
&second_request, &state)
871+
ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, projected_columns, &second_request,
872+
&state)
874873
.ok());
875874

876875
ASSERT_EQ(second_request.conjuncts.size(), 1);
@@ -923,8 +922,8 @@ TEST_F(CastTest, ColumnMapperRewritesPreviousCastFilterToMatchingSplitType) {
923922
ASSERT_TRUE(bigint_mapper.create_mapping(projected_columns, {}, {bigint_file_field}).ok());
924923
reader::FileScanRequest bigint_request;
925924
ASSERT_TRUE(bigint_mapper
926-
.create_scan_request({table_filter}, {}, projected_columns,
927-
&bigint_request, &state)
925+
.create_scan_request({table_filter}, {}, projected_columns, &bigint_request,
926+
&state)
928927
.ok());
929928

930929
const auto& bigint_localized_expr = bigint_request.conjuncts[0]->root();

0 commit comments

Comments
 (0)