Skip to content

Commit 353fcbd

Browse files
committed
[fix](be) Report physical new parquet skip metrics
### What problem does this PR solve? Issue Number: close #xxx Related PR: #64214 Problem Summary: The new parquet page skip profile mixed logical pruning information with physical reader work. Page skip counters used generic names that could be interpreted as all page-index-pruned pages, while they are updated only when Arrow's data page filter callback actually skips a page. ReaderSkipRows also counted scheduler-level logical skips, including rows already removed by page filtering, so it could overstate the actual RecordReader::SkipRecords work. This change renames the page skip counters to data-page-filter-specific names and updates ReaderSkipRows only for rows actually passed to Arrow RecordReader::SkipRecords. Parent complex readers and the synthetic row-position reader no longer add logical read/skip rows to the physical reader counters. ### Release note None ### Check List (For Author) - Test: Unit Test - Pending: NewParquetReaderTest.* - Behavior changed: No - Does this need documentation: No
1 parent 38d793d commit 353fcbd

8 files changed

Lines changed: 20 additions & 29 deletions

File tree

be/src/format/new_parquet/parquet_reader.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ Status ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
219219
}
220220
_state->scan_plan = row_group_plan;
221221
_state->scheduler.set_page_skip_profile(
222-
{.skipped_pages = _parquet_profile.pages_skipped_by_filter,
223-
.skipped_bytes = _parquet_profile.page_skip_bytes});
222+
{.skipped_pages = _parquet_profile.pages_skipped_by_data_page_filter,
223+
.skipped_bytes = _parquet_profile.data_page_filter_skip_bytes});
224224
_state->scheduler.set_scan_profile({
225225
.raw_rows_read = _parquet_profile.raw_rows_read,
226226
.selected_rows = _parquet_profile.selected_rows,
@@ -364,10 +364,10 @@ void ParquetReader::_init_profile() {
364364
_profile, "FilteredRowsByGroup", TUnit::UNIT, parquet_profile, 1);
365365
_parquet_profile.filtered_page_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
366366
_profile, "FilteredRowsByPage", TUnit::UNIT, parquet_profile, 1);
367-
_parquet_profile.pages_skipped_by_filter = ADD_CHILD_COUNTER_WITH_LEVEL(
368-
_profile, "PagesSkippedByFilter", TUnit::UNIT, parquet_profile, 1);
369-
_parquet_profile.page_skip_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
370-
_profile, "PageSkipBytes", TUnit::BYTES, parquet_profile, 1);
367+
_parquet_profile.pages_skipped_by_data_page_filter = ADD_CHILD_COUNTER_WITH_LEVEL(
368+
_profile, "PagesSkippedByDataPageFilter", TUnit::UNIT, parquet_profile, 1);
369+
_parquet_profile.data_page_filter_skip_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(
370+
_profile, "DataPageFilterSkipBytes", TUnit::BYTES, parquet_profile, 1);
371371
_parquet_profile.selected_rows = ADD_CHILD_COUNTER_WITH_LEVEL(
372372
_profile, "SelectedRows", TUnit::UNIT, parquet_profile, 1);
373373
_parquet_profile.rows_filtered_by_conjunct = ADD_CHILD_COUNTER_WITH_LEVEL(

be/src/format/new_parquet/parquet_reader.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ class ParquetReader : public reader::FileReader {
7878
RuntimeProfile::Counter* selected_row_ranges = nullptr;
7979
RuntimeProfile::Counter* filtered_group_rows = nullptr;
8080
RuntimeProfile::Counter* filtered_page_rows = nullptr;
81-
RuntimeProfile::Counter* pages_skipped_by_filter = nullptr;
82-
RuntimeProfile::Counter* page_skip_bytes = nullptr;
81+
RuntimeProfile::Counter* pages_skipped_by_data_page_filter = nullptr;
82+
RuntimeProfile::Counter* data_page_filter_skip_bytes = nullptr;
8383
RuntimeProfile::Counter* selected_rows = nullptr;
8484
RuntimeProfile::Counter* rows_filtered_by_conjunct = nullptr;
8585
RuntimeProfile::Counter* total_batches = nullptr;

be/src/format/new_parquet/reader/list_column_reader.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,6 @@ Status ListColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* r
345345
array_column->get_data_ptr() = std::move(nested_column);
346346
append_offsets(array_column->get_offsets(), entry_counts);
347347
append_parent_nulls(parent_null_map, parent_nulls);
348-
update_reader_read_rows(*rows_read);
349348
return Status::OK();
350349
}
351350

@@ -357,7 +356,6 @@ Status ListColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* r
357356
array_column->get_data_ptr() = std::move(nested_column);
358357
append_offsets(array_column->get_offsets(), entry_counts);
359358
append_parent_nulls(parent_null_map, parent_nulls);
360-
update_reader_read_rows(*rows_read);
361359
return Status::OK();
362360
}
363361

@@ -397,7 +395,6 @@ Status ListColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* r
397395
append_offsets(array_column->get_offsets(), entry_counts);
398396
array_column->get_data_ptr() = std::move(nested_column);
399397
append_parent_nulls(parent_null_map, parent_nulls);
400-
update_reader_read_rows(*rows_read);
401398
return Status::OK();
402399
}
403400

@@ -438,7 +435,6 @@ Status ListColumnReader::skip(int64_t rows) {
438435
return Status::Corruption("Failed to skip parquet LIST column {}: skipped {} of {} rows",
439436
_name, rows_read, rows);
440437
}
441-
update_reader_skip_rows(rows);
442438
return Status::OK();
443439
}
444440

be/src/format/new_parquet/reader/map_column_reader.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,6 @@ Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* ro
629629
NestedScalarValueAppender {readers.scalar_value, "MAP", "value",
630630
value_max_definition_level},
631631
rows, &context, rows_read));
632-
update_reader_read_rows(*rows_read);
633632
return Status::OK();
634633
}
635634

@@ -647,7 +646,6 @@ Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* ro
647646
_name, _type, _nullable_definition_level, _repeated_repetition_level, *readers.key,
648647
key_max_definition_level, *readers.list_value, *scalar_list_value_reader,
649648
&_key_overflow, &_value_overflow, rows, &context, rows_read));
650-
update_reader_read_rows(*rows_read);
651649
return Status::OK();
652650
}
653651

@@ -656,7 +654,6 @@ Status MapColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t* ro
656654
key_max_definition_level, *readers.struct_value, &_struct_value_overflow,
657655
&_key_overflow, NestedStructValueAppender {readers.struct_value}, rows, &context,
658656
rows_read));
659-
update_reader_read_rows(*rows_read);
660657
return Status::OK();
661658
}
662659

@@ -697,7 +694,6 @@ Status MapColumnReader::skip(int64_t rows) {
697694
return Status::Corruption("Failed to skip parquet MAP column {}: skipped {} of {} rows",
698695
_name, rows_read, rows);
699696
}
700-
update_reader_skip_rows(rows);
701697
return Status::OK();
702698
}
703699

be/src/format/new_parquet/reader/row_position_column_reader.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ Status RowPositionColumnReader::read(int64_t rows, MutableColumnPtr& column, int
6666
}
6767
_next_row_position += rows;
6868
*rows_read = rows;
69-
update_reader_read_rows(rows);
7069
return Status::OK();
7170
}
7271

@@ -75,7 +74,6 @@ Status RowPositionColumnReader::skip(int64_t rows) {
7574
return Status::OK();
7675
}
7776
_next_row_position += rows;
78-
update_reader_skip_rows(rows);
7977
return Status::OK();
8078
}
8179

be/src/format/new_parquet/reader/scalar_column_reader.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ Status ScalarColumnReader::skip_records(int64_t rows) {
7070
return Status::InternalError("Parquet record reader is not initialized for column {}",
7171
_name);
7272
}
73+
if (rows <= 0) {
74+
return Status::OK();
75+
}
7376
int64_t skipped_rows = 0;
7477
try {
7578
_record_reader->Reset();
@@ -89,6 +92,7 @@ Status ScalarColumnReader::skip_records(int64_t rows) {
8992
return Status::InternalError("Failed to skip parquet records for column {}: {}", _name,
9093
e.what());
9194
}
95+
update_reader_skip_rows(rows);
9296
return Status::OK();
9397
}
9498

@@ -129,9 +133,9 @@ Status ScalarColumnReader::skip(int64_t rows) {
129133

130134
const int64_t page_filtered_rows = page_filtered_rows_to_skip(rows);
131135
DORIS_CHECK(page_filtered_rows <= rows);
132-
RETURN_IF_ERROR(skip_records(rows - page_filtered_rows));
136+
const int64_t record_reader_skip_rows = rows - page_filtered_rows;
137+
RETURN_IF_ERROR(skip_records(record_reader_skip_rows));
133138
advance_rows_read(rows);
134-
update_reader_skip_rows(rows);
135139
return Status::OK();
136140
}
137141

be/src/format/new_parquet/reader/struct_column_reader.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ Status StructColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t*
3838
if (_children.empty()) {
3939
column->resize(static_cast<size_t>(rows));
4040
*rows_read = rows;
41-
update_reader_read_rows(*rows_read);
4241
return Status::OK();
4342
}
4443

@@ -153,7 +152,6 @@ Status StructColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t*
153152
append_parent_nulls(parent_null_map, parent_nulls);
154153
}
155154
*rows_read = expected_rows;
156-
update_reader_read_rows(*rows_read);
157155
return Status::OK();
158156
}
159157

@@ -269,7 +267,6 @@ Status StructColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t*
269267
append_parent_nulls(parent_null_map, parent_nulls);
270268
}
271269
*rows_read = expected_rows;
272-
update_reader_read_rows(*rows_read);
273270
return Status::OK();
274271
}
275272

@@ -305,7 +302,6 @@ Status StructColumnReader::read(int64_t rows, MutableColumnPtr& column, int64_t*
305302
}
306303

307304
*rows_read = std::max<int64_t>(expected_rows, 0);
308-
update_reader_read_rows(*rows_read);
309305
return Status::OK();
310306
}
311307

@@ -316,7 +312,6 @@ Status StructColumnReader::skip(int64_t rows) {
316312
for (auto& child_reader : _children) {
317313
RETURN_IF_ERROR(child_reader->skip(rows));
318314
}
319-
update_reader_skip_rows(rows);
320315
return Status::OK();
321316
}
322317

be/test/format/new_parquet/parquet_reader_test.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2060,19 +2060,21 @@ TEST_F(NewParquetReaderTest, PageIndexFilteredPagesDoNotDoubleSkipOutputColumns)
20602060
}
20612061
}
20622062

2063-
ASSERT_NE(profile.get_counter("PagesSkippedByFilter"), nullptr);
2064-
ASSERT_NE(profile.get_counter("PageSkipBytes"), nullptr);
2063+
ASSERT_NE(profile.get_counter("PagesSkippedByDataPageFilter"), nullptr);
2064+
ASSERT_NE(profile.get_counter("DataPageFilterSkipBytes"), nullptr);
20652065
ASSERT_NE(profile.get_counter("RawRowsRead"), nullptr);
20662066
ASSERT_NE(profile.get_counter("SelectedRows"), nullptr);
20672067
ASSERT_NE(profile.get_counter("RangeGapSkippedRows"), nullptr);
2068+
ASSERT_NE(profile.get_counter("ReaderSkipRows"), nullptr);
20682069
ASSERT_NE(profile.get_counter("RowGroupFilterTime"), nullptr);
20692070
ASSERT_NE(profile.get_counter("PageIndexFilterTime"), nullptr);
20702071
ASSERT_NE(profile.get_counter("PageIndexReadTime"), nullptr);
2071-
EXPECT_GT(profile.get_counter("PagesSkippedByFilter")->value(), 0);
2072-
EXPECT_GT(profile.get_counter("PageSkipBytes")->value(), 0);
2072+
EXPECT_GT(profile.get_counter("PagesSkippedByDataPageFilter")->value(), 0);
2073+
EXPECT_GT(profile.get_counter("DataPageFilterSkipBytes")->value(), 0);
20732074
EXPECT_EQ(profile.get_counter("RawRowsRead")->value(), 64);
20742075
EXPECT_EQ(profile.get_counter("SelectedRows")->value(), 64);
20752076
EXPECT_GT(profile.get_counter("RangeGapSkippedRows")->value(), 0);
2077+
EXPECT_EQ(profile.get_counter("ReaderSkipRows")->value(), 0);
20762078
EXPECT_GT(profile.get_counter("RowGroupFilterTime")->value(), 0);
20772079
EXPECT_GT(profile.get_counter("PageIndexFilterTime")->value(), 0);
20782080
EXPECT_GT(profile.get_counter("PageIndexReadTime")->value(), 0);

0 commit comments

Comments
 (0)