Skip to content

Commit 02b430f

Browse files
[fix](iceberg) Avoid dict reads on mixed-encoding position delete files (#61759)
### What problem does this PR solve? Iceberg parquet position delete files currently treat the `file_path` column as dictionary-coded as long as the column chunk has a dictionary page. That check is too loose: parquet allows mixed encodings in the same column chunk, so a chunk can contain both dictionary-encoded and plain-encoded data pages. When that happens, Doris builds a `ColumnDictI32` for `file_path`, but the plain decoder later calls `insert_many_strings()`, which fails with: `Method insert_many_strings is not supported for ColumnDictionary` This PR fixes the issue by only using dictionary-backed decoding for Iceberg position delete `file_path` columns when the entire parquet column chunk is fully dictionary encoded. Mixed-encoding chunks now fall back to normal string columns. It also adds BE unit coverage for: - fully dictionary-encoded parquet metadata - mixed dictionary/plain parquet metadata - parquet metadata without `encoding_stats` but with non-dictionary encodings
1 parent c6dea59 commit 02b430f

File tree

6 files changed

+298
-1
lines changed

6 files changed

+298
-1
lines changed

be/src/format/table/iceberg_delete_file_reader_helper.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "format/parquet/vparquet_column_chunk_reader.h"
4040
#include "format/parquet/vparquet_reader.h"
4141
#include "format/table/deletion_vector_reader.h"
42+
#include "format/table/iceberg_reader.h"
4243
#include "format/table/table_format_reader.h"
4344
#include "io/hdfs_builder.h"
4445
#include "runtime/descriptors.h"
@@ -135,7 +136,8 @@ Status init_parquet_delete_reader(ParquetReader* reader, bool* dictionary_coded)
135136
*dictionary_coded = true;
136137
for (const auto& row_group : meta_data->row_groups) {
137138
const auto& column_chunk = row_group.columns[0];
138-
if (!(column_chunk.__isset.meta_data && has_dict_page(column_chunk.meta_data))) {
139+
if (!(column_chunk.__isset.meta_data &&
140+
IcebergTableReader::_is_fully_dictionary_encoded(column_chunk.meta_data))) {
139141
*dictionary_coded = false;
140142
break;
141143
}

be/src/format/table/iceberg_reader.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,56 @@ class GroupedDeleteRowsVisitor final : public IcebergPositionDeleteVisitor {
107107

108108
const std::string IcebergOrcReader::ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
109109

110+
bool IcebergTableReader::_is_fully_dictionary_encoded(
111+
const tparquet::ColumnMetaData& column_metadata) {
112+
const auto is_dictionary_encoding = [](tparquet::Encoding::type encoding) {
113+
return encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
114+
encoding == tparquet::Encoding::RLE_DICTIONARY;
115+
};
116+
const auto is_data_page = [](tparquet::PageType::type page_type) {
117+
return page_type == tparquet::PageType::DATA_PAGE ||
118+
page_type == tparquet::PageType::DATA_PAGE_V2;
119+
};
120+
const auto is_level_encoding = [](tparquet::Encoding::type encoding) {
121+
return encoding == tparquet::Encoding::RLE || encoding == tparquet::Encoding::BIT_PACKED;
122+
};
123+
124+
// A column chunk may have a dictionary page but still contain plain-encoded data pages.
125+
// Only treat it as dictionary-coded when all data pages are dictionary encoded.
126+
if (column_metadata.__isset.encoding_stats) {
127+
bool has_data_page_stats = false;
128+
for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
129+
if (is_data_page(enc_stat.page_type) && enc_stat.count > 0) {
130+
has_data_page_stats = true;
131+
if (!is_dictionary_encoding(enc_stat.encoding)) {
132+
return false;
133+
}
134+
}
135+
}
136+
if (has_data_page_stats) {
137+
return true;
138+
}
139+
}
140+
141+
bool has_dict_encoding = false;
142+
bool has_nondict_encoding = false;
143+
for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
144+
if (is_dictionary_encoding(encoding)) {
145+
has_dict_encoding = true;
146+
}
147+
148+
if (!is_dictionary_encoding(encoding) && !is_level_encoding(encoding)) {
149+
has_nondict_encoding = true;
150+
break;
151+
}
152+
}
153+
if (!has_dict_encoding || has_nondict_encoding) {
154+
return false;
155+
}
156+
157+
return true;
158+
}
159+
110160
IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_format_reader,
111161
RuntimeProfile* profile, RuntimeState* state,
112162
const TFileScanRangeParams& params,

be/src/format/table/iceberg_reader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ class IcebergTableReader : public TableFormatReader, public TableSchemaChangeHel
106106
_row_lineage_columns = std::move(row_lineage_columns);
107107
}
108108

109+
static bool _is_fully_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata);
110+
109111
protected:
110112
struct IcebergProfile {
111113
RuntimeProfile::Counter* num_delete_files;

be/test/format/table/iceberg/iceberg_delete_file_reader_helper_test.cpp

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,40 @@
1717

1818
#include "format/table/iceberg_delete_file_reader_helper.h"
1919

20+
#include <gen_cpp/Types_types.h>
2021
#include <gtest/gtest.h>
2122

23+
#include <unordered_map>
24+
#include <vector>
25+
26+
#include "io/fs/file_meta_cache.h"
27+
#include "runtime/runtime_profile.h"
28+
#include "runtime/runtime_state.h"
29+
2230
namespace doris {
2331

32+
namespace {
33+
34+
constexpr const char* kMixedPositionDeleteFile =
35+
"./be/test/exec/test_data/iceberg_mixed_position_delete_parquet/"
36+
"mixed_encoding_position_delete.parquet";
37+
constexpr const char* kTargetDataFilePath =
38+
"s3://warehouse/wh/test_db/000_target_data_file.parquet";
39+
40+
class CollectPositionDeleteVisitor final : public IcebergPositionDeleteVisitor {
41+
public:
42+
Status visit(const std::string& file_path, int64_t pos) override {
43+
delete_rows[file_path].push_back(pos);
44+
++total_rows;
45+
return Status::OK();
46+
}
47+
48+
std::unordered_map<std::string, std::vector<int64_t>> delete_rows;
49+
size_t total_rows = 0;
50+
};
51+
52+
} // namespace
53+
2454
TEST(IcebergDeleteFileReaderHelperTest, BuildDeleteFileRange) {
2555
auto range = build_iceberg_delete_file_range("s3://bucket/delete.parquet");
2656
EXPECT_EQ(range.path, "s3://bucket/delete.parquet");
@@ -41,4 +71,40 @@ TEST(IcebergDeleteFileReaderHelperTest, IsNotDeletionVectorWhenContentMissing) {
4171
EXPECT_FALSE(is_iceberg_deletion_vector(delete_file));
4272
}
4373

74+
TEST(IcebergDeleteFileReaderHelperTest, ReadMixedEncodingParquetPositionDeleteFile) {
75+
RuntimeProfile profile("test_profile");
76+
RuntimeState runtime_state((TQueryOptions()), TQueryGlobals());
77+
FileMetaCache meta_cache(1024);
78+
IcebergDeleteFileIOContext io_context(&runtime_state);
79+
80+
TFileScanRangeParams scan_params;
81+
scan_params.file_type = TFileType::FILE_LOCAL;
82+
scan_params.format_type = TFileFormatType::FORMAT_PARQUET;
83+
84+
TIcebergDeleteFileDesc delete_file;
85+
delete_file.path = kMixedPositionDeleteFile;
86+
delete_file.file_format = TFileFormatType::FORMAT_PARQUET;
87+
delete_file.__isset.file_format = true;
88+
89+
IcebergDeleteFileReaderOptions options;
90+
options.state = &runtime_state;
91+
options.profile = &profile;
92+
options.scan_params = &scan_params;
93+
options.io_ctx = &io_context.io_ctx;
94+
options.meta_cache = &meta_cache;
95+
options.batch_size = 1024;
96+
97+
CollectPositionDeleteVisitor visitor;
98+
auto st = read_iceberg_position_delete_file(delete_file, options, &visitor);
99+
ASSERT_TRUE(st.ok()) << st;
100+
ASSERT_EQ(visitor.total_rows, 216);
101+
102+
const auto it = visitor.delete_rows.find(kTargetDataFilePath);
103+
ASSERT_NE(it, visitor.delete_rows.end());
104+
105+
const std::vector<int64_t> expected_positions = {0, 2, 4, 6, 8, 10, 12, 14,
106+
16, 18, 20, 22, 24, 26, 28, 30};
107+
EXPECT_EQ(it->second, expected_positions);
108+
}
109+
44110
} // namespace doris

be/test/format/table/iceberg/iceberg_reader_test.cpp

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@
3737
#include "core/column/column_array.h"
3838
#include "core/column/column_nullable.h"
3939
#include "core/column/column_struct.h"
40+
#include "core/column/column_vector.h"
4041
#include "core/data_type/data_type.h"
4142
#include "core/data_type/data_type_array.h"
4243
#include "core/data_type/data_type_factory.hpp"
4344
#include "core/data_type/data_type_nullable.h"
4445
#include "core/data_type/data_type_number.h"
4546
#include "core/data_type/data_type_string.h"
4647
#include "core/data_type/data_type_struct.h"
48+
#include "format/parquet/vparquet_column_chunk_reader.h"
4749
#include "format/parquet/vparquet_reader.h"
4850
#include "io/fs/file_meta_cache.h"
4951
#include "io/fs/file_reader_writer_fwd.h"
@@ -56,6 +58,11 @@
5658

5759
namespace doris {
5860

61+
class IcebergReaderTestHelper : public IcebergTableReader {
62+
public:
63+
using IcebergTableReader::_is_fully_dictionary_encoded;
64+
};
65+
5966
class IcebergReaderTest : public ::testing::Test {
6067
protected:
6168
void SetUp() override {
@@ -68,6 +75,60 @@ class IcebergReaderTest : public ::testing::Test {
6875

6976
void TearDown() override { cache.reset(); }
7077

78+
std::string mixed_position_delete_file() const {
79+
return "./be/test/exec/test_data/iceberg_mixed_position_delete_parquet/"
80+
"mixed_encoding_position_delete.parquet";
81+
}
82+
83+
std::unique_ptr<ParquetReader> create_delete_file_parquet_reader(
84+
RuntimeProfile* profile, RuntimeState* runtime_state, TFileScanRangeParams* scan_params,
85+
TFileRangeDesc* scan_range, io::FileReaderSPtr* file_reader,
86+
const tparquet::FileMetaData** file_meta_data) {
87+
auto local_fs = io::global_local_filesystem();
88+
auto st = local_fs->open_file(mixed_position_delete_file(), file_reader);
89+
EXPECT_TRUE(st.ok()) << st;
90+
if (!st.ok()) {
91+
return nullptr;
92+
}
93+
94+
scan_params->format_type = TFileFormatType::FORMAT_PARQUET;
95+
96+
scan_range->start_offset = 0;
97+
scan_range->size = (*file_reader)->size();
98+
scan_range->path = mixed_position_delete_file();
99+
100+
auto parquet_reader =
101+
ParquetReader::create_unique(profile, *scan_params, *scan_range, 1024,
102+
&timezone_obj, nullptr, runtime_state, cache.get());
103+
EXPECT_NE(parquet_reader, nullptr);
104+
if (parquet_reader == nullptr) {
105+
return nullptr;
106+
}
107+
108+
parquet_reader->set_file_reader(*file_reader);
109+
110+
phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>> predicates;
111+
st = parquet_reader->init_reader(delete_file_column_names,
112+
&delete_file_col_name_to_block_idx, {}, predicates,
113+
nullptr, nullptr, nullptr, nullptr, nullptr);
114+
EXPECT_TRUE(st.ok()) << st;
115+
if (!st.ok()) {
116+
return nullptr;
117+
}
118+
119+
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
120+
partition_columns;
121+
std::unordered_map<std::string, VExprContextSPtr> missing_columns;
122+
st = parquet_reader->set_fill_columns(partition_columns, missing_columns);
123+
EXPECT_TRUE(st.ok()) << st;
124+
if (!st.ok()) {
125+
return nullptr;
126+
}
127+
128+
*file_meta_data = parquet_reader->get_meta_data();
129+
return parquet_reader;
130+
}
131+
71132
// Helper function to create complex struct types for testing
72133
void create_complex_struct_types(DataTypePtr& coordinates_struct_type,
73134
DataTypePtr& address_struct_type,
@@ -462,8 +523,124 @@ class IcebergReaderTest : public ::testing::Test {
462523

463524
std::unique_ptr<doris::FileMetaCache> cache;
464525
cctz::time_zone timezone_obj;
526+
std::vector<std::string> delete_file_column_names = {"file_path", "pos"};
527+
std::unordered_map<std::string, uint32_t> delete_file_col_name_to_block_idx = {{"file_path", 0},
528+
{"pos", 1}};
465529
};
466530

531+
TEST_F(IcebergReaderTest, detects_fully_dictionary_encoded_parquet_column) {
532+
tparquet::ColumnMetaData column_metadata;
533+
column_metadata.type = tparquet::Type::BYTE_ARRAY;
534+
column_metadata.__isset.encoding_stats = true;
535+
536+
tparquet::PageEncodingStats dict_page;
537+
dict_page.page_type = tparquet::PageType::DATA_PAGE;
538+
dict_page.encoding = tparquet::Encoding::RLE_DICTIONARY;
539+
dict_page.count = 3;
540+
541+
column_metadata.encoding_stats = {dict_page};
542+
543+
EXPECT_TRUE(IcebergReaderTestHelper::_is_fully_dictionary_encoded(column_metadata));
544+
}
545+
546+
TEST_F(IcebergReaderTest, rejects_mixed_dictionary_and_plain_parquet_column) {
547+
tparquet::ColumnMetaData column_metadata;
548+
column_metadata.type = tparquet::Type::BYTE_ARRAY;
549+
column_metadata.__isset.encoding_stats = true;
550+
551+
tparquet::PageEncodingStats dict_page;
552+
dict_page.page_type = tparquet::PageType::DATA_PAGE;
553+
dict_page.encoding = tparquet::Encoding::RLE_DICTIONARY;
554+
dict_page.count = 2;
555+
556+
tparquet::PageEncodingStats plain_page;
557+
plain_page.page_type = tparquet::PageType::DATA_PAGE;
558+
plain_page.encoding = tparquet::Encoding::PLAIN;
559+
plain_page.count = 1;
560+
561+
column_metadata.encoding_stats = {dict_page, plain_page};
562+
563+
EXPECT_FALSE(IcebergReaderTestHelper::_is_fully_dictionary_encoded(column_metadata));
564+
}
565+
566+
TEST_F(IcebergReaderTest, rejects_mixed_dictionary_and_plain_parquet_v2_column) {
567+
tparquet::ColumnMetaData column_metadata;
568+
column_metadata.type = tparquet::Type::BYTE_ARRAY;
569+
column_metadata.__isset.encoding_stats = true;
570+
571+
tparquet::PageEncodingStats dict_page;
572+
dict_page.page_type = tparquet::PageType::DATA_PAGE_V2;
573+
dict_page.encoding = tparquet::Encoding::RLE_DICTIONARY;
574+
dict_page.count = 2;
575+
576+
tparquet::PageEncodingStats plain_page;
577+
plain_page.page_type = tparquet::PageType::DATA_PAGE_V2;
578+
plain_page.encoding = tparquet::Encoding::PLAIN;
579+
plain_page.count = 1;
580+
581+
column_metadata.encoding_stats = {dict_page, plain_page};
582+
583+
EXPECT_FALSE(IcebergReaderTestHelper::_is_fully_dictionary_encoded(column_metadata));
584+
}
585+
586+
TEST_F(IcebergReaderTest, rejects_non_dictionary_encoding_without_encoding_stats) {
587+
tparquet::ColumnMetaData column_metadata;
588+
column_metadata.type = tparquet::Type::BYTE_ARRAY;
589+
column_metadata.__isset.encoding_stats = false;
590+
column_metadata.encodings = {tparquet::Encoding::PLAIN_DICTIONARY, tparquet::Encoding::PLAIN,
591+
tparquet::Encoding::RLE};
592+
593+
EXPECT_FALSE(IcebergReaderTestHelper::_is_fully_dictionary_encoded(column_metadata));
594+
}
595+
596+
TEST_F(IcebergReaderTest, falls_back_to_encodings_when_data_page_stats_are_missing) {
597+
tparquet::ColumnMetaData column_metadata;
598+
column_metadata.type = tparquet::Type::BYTE_ARRAY;
599+
column_metadata.__isset.encoding_stats = true;
600+
601+
tparquet::PageEncodingStats dict_page_header;
602+
dict_page_header.page_type = tparquet::PageType::DICTIONARY_PAGE;
603+
dict_page_header.encoding = tparquet::Encoding::PLAIN;
604+
dict_page_header.count = 1;
605+
column_metadata.encoding_stats = {dict_page_header};
606+
607+
column_metadata.encodings = {tparquet::Encoding::PLAIN, tparquet::Encoding::RLE,
608+
tparquet::Encoding::RLE_DICTIONARY};
609+
610+
EXPECT_FALSE(IcebergReaderTestHelper::_is_fully_dictionary_encoded(column_metadata));
611+
}
612+
613+
TEST_F(IcebergReaderTest, generated_position_delete_file_is_mixed_encoded) {
614+
RuntimeProfile profile("test_profile");
615+
RuntimeState runtime_state((TQueryOptions()), TQueryGlobals());
616+
TFileScanRangeParams scan_params;
617+
TFileRangeDesc scan_range;
618+
io::FileReaderSPtr file_reader;
619+
const tparquet::FileMetaData* file_meta_data = nullptr;
620+
auto parquet_reader = create_delete_file_parquet_reader(
621+
&profile, &runtime_state, &scan_params, &scan_range, &file_reader, &file_meta_data);
622+
ASSERT_NE(parquet_reader, nullptr);
623+
ASSERT_NE(file_meta_data, nullptr);
624+
ASSERT_EQ(file_meta_data->row_groups.size(), 1);
625+
626+
const auto& file_path_meta = file_meta_data->row_groups[0].columns[0].meta_data;
627+
EXPECT_TRUE(file_meta_data->row_groups[0].columns[0].__isset.meta_data);
628+
EXPECT_TRUE(has_dict_page(file_path_meta));
629+
bool has_plain_encoding = false;
630+
bool has_dictionary_encoding = false;
631+
for (const auto encoding : file_path_meta.encodings) {
632+
if (encoding == tparquet::Encoding::PLAIN) {
633+
has_plain_encoding = true;
634+
}
635+
if (encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
636+
encoding == tparquet::Encoding::RLE_DICTIONARY) {
637+
has_dictionary_encoding = true;
638+
}
639+
}
640+
EXPECT_TRUE(has_plain_encoding);
641+
EXPECT_TRUE(has_dictionary_encoding);
642+
}
643+
467644
// Test reading real Iceberg Parquet file using IcebergTableReader
468645
TEST_F(IcebergReaderTest, read_iceberg_parquet_file) {
469646
// Read only: name, profile.address.coordinates.lat, profile.address.coordinates.lng, profile.contact.email

0 commit comments

Comments
 (0)