|
25 | 25 | #include <cstddef> |
26 | 26 | #include <cstdint> |
27 | 27 | #include <exception> |
| 28 | +#include <map> |
28 | 29 | #include <memory> |
29 | 30 | #include <string> |
30 | 31 | #include <utility> |
|
47 | 48 | namespace doris::parquet { |
48 | 49 | namespace { |
49 | 50 |
|
| 51 | +class DataPageSkipFilter { |
| 52 | +public: |
| 53 | + DataPageSkipFilter(const ParquetPageSkipPlan* page_skip_plan, |
| 54 | + ParquetPageSkipProfile page_skip_profile) |
| 55 | + : _page_skip_plan(page_skip_plan), _page_skip_profile(page_skip_profile) { |
| 56 | + DORIS_CHECK(_page_skip_plan != nullptr); |
| 57 | + } |
| 58 | + |
| 59 | + bool operator()(const ::parquet::DataPageStats&) { |
| 60 | + // Arrow invokes this callback once for each DATA_PAGE/DATA_PAGE_V2 and never for |
| 61 | + // dictionary pages, so this ordinal matches Parquet OffsetIndex page locations. |
| 62 | + const size_t page_idx = _next_data_page_idx++; |
| 63 | + const bool skip = _page_skip_plan->should_skip_page(page_idx); |
| 64 | + if (!skip) { |
| 65 | + return false; |
| 66 | + } |
| 67 | + update_skip_profile(page_idx); |
| 68 | + return true; |
| 69 | + } |
| 70 | + |
| 71 | +private: |
| 72 | + void update_skip_profile(size_t page_idx) const { |
| 73 | + if (_page_skip_profile.skipped_pages != nullptr) { |
| 74 | + COUNTER_UPDATE(_page_skip_profile.skipped_pages, 1); |
| 75 | + } |
| 76 | + if (_page_skip_profile.skipped_bytes != nullptr) { |
| 77 | + COUNTER_UPDATE(_page_skip_profile.skipped_bytes, |
| 78 | + _page_skip_plan->skipped_page_compressed_size(page_idx)); |
| 79 | + } |
| 80 | + } |
| 81 | + |
| 82 | + const ParquetPageSkipPlan* _page_skip_plan = nullptr; |
| 83 | + ParquetPageSkipProfile _page_skip_profile; |
| 84 | + size_t _next_data_page_idx = 0; |
| 85 | +}; |
| 86 | + |
| 87 | +const ParquetPageSkipPlan* find_page_skip_plan( |
| 88 | + const std::map<int, ParquetPageSkipPlan>* page_skip_plans, int leaf_column_id) { |
| 89 | + if (page_skip_plans == nullptr) { |
| 90 | + return nullptr; |
| 91 | + } |
| 92 | + const auto plan_it = page_skip_plans->find(leaf_column_id); |
| 93 | + return plan_it == page_skip_plans->end() ? nullptr : &plan_it->second; |
| 94 | +} |
| 95 | + |
| 96 | +void install_data_page_filter(std::unique_ptr<::parquet::PageReader>& page_reader, |
| 97 | + const std::map<int, ParquetPageSkipPlan>* page_skip_plans, |
| 98 | + int leaf_column_id, ParquetPageSkipProfile page_skip_profile) { |
| 99 | + DORIS_CHECK(page_reader != nullptr); |
| 100 | + const ParquetPageSkipPlan* page_skip_plan = |
| 101 | + find_page_skip_plan(page_skip_plans, leaf_column_id); |
| 102 | + if (page_skip_plan == nullptr) { |
| 103 | + return; |
| 104 | + } |
| 105 | + page_reader->set_data_page_filter(DataPageSkipFilter(page_skip_plan, page_skip_profile)); |
| 106 | +} |
| 107 | + |
50 | 108 | bool supports_nested_scalar_record_reader(const ParquetColumnSchema& column_schema) { |
51 | 109 | if (supports_record_reader(column_schema.type_descriptor)) { |
52 | 110 | return true; |
@@ -156,13 +214,7 @@ Status ParquetColumnReaderFactory::create_scalar_reader( |
156 | 214 | if (reader == nullptr) { |
157 | 215 | return Status::InvalidArgument("reader is null"); |
158 | 216 | } |
159 | | - const ParquetPageSkipPlan* page_skip_plan = nullptr; |
160 | | - if (_page_skip_plans != nullptr) { |
161 | | - auto plan_it = _page_skip_plans->find(column_schema.leaf_column_id); |
162 | | - if (plan_it != _page_skip_plans->end()) { |
163 | | - page_skip_plan = &plan_it->second; |
164 | | - } |
165 | | - } |
| 217 | + const auto* page_skip_plan = find_page_skip_plan(_page_skip_plans, column_schema.leaf_column_id); |
166 | 218 | *reader = std::make_unique<ScalarColumnReader>(column_schema, std::move(record_reader), |
167 | 219 | page_skip_plan, _column_reader_profile); |
168 | 220 | return Status::OK(); |
@@ -246,29 +298,8 @@ Status ParquetColumnReaderFactory::get_record_reader( |
246 | 298 | if (_record_readers[leaf_column_id] == nullptr) { |
247 | 299 | try { |
248 | 300 | auto page_reader = _row_group->GetColumnPageReader(leaf_column_id); |
249 | | - if (_page_skip_plans != nullptr) { |
250 | | - auto plan_it = _page_skip_plans->find(leaf_column_id); |
251 | | - if (plan_it != _page_skip_plans->end()) { |
252 | | - const ParquetPageSkipPlan* page_skip_plan = &plan_it->second; |
253 | | - page_reader->set_data_page_filter( |
254 | | - [page_skip_plan, page_skip_profile = _page_skip_profile, |
255 | | - page_idx = size_t {0}](const ::parquet::DataPageStats&) mutable { |
256 | | - const bool skip = page_skip_plan->should_skip_page(page_idx); |
257 | | - if (skip) { |
258 | | - if (page_skip_profile.skipped_pages != nullptr) { |
259 | | - COUNTER_UPDATE(page_skip_profile.skipped_pages, 1); |
260 | | - } |
261 | | - if (page_skip_profile.skipped_bytes != nullptr) { |
262 | | - COUNTER_UPDATE(page_skip_profile.skipped_bytes, |
263 | | - page_skip_plan->skipped_page_compressed_size( |
264 | | - page_idx)); |
265 | | - } |
266 | | - } |
267 | | - ++page_idx; |
268 | | - return skip; |
269 | | - }); |
270 | | - } |
271 | | - } |
| 301 | + install_data_page_filter(page_reader, _page_skip_plans, leaf_column_id, |
| 302 | + _page_skip_profile); |
272 | 303 | const auto level_info = ::parquet::internal::LevelInfo::ComputeLevelInfo(descriptor); |
273 | 304 | _record_readers[leaf_column_id] = ::parquet::internal::RecordReader::Make( |
274 | 305 | descriptor, level_info, ::arrow::default_memory_pool(), |
|
0 commit comments