Skip to content

Commit fd3a313

Browse files
authored
[improve](compaction) Use segment footer raw_data_bytes for first-time batch size estimation (#62271)
## Summary

- When vertical compaction runs for the first time on a tablet (no historical sampling data), `estimate_batch_size()` previously returned a hardcoded value of 992, which could cause OOM for wide tables or be too conservative for narrow tables.
- This change uses `ColumnMetaPB.raw_data_bytes` from the segment footer to compute a per-row size estimate for the first compaction. `raw_data_bytes` records the original data size before encoding, which closely approximates runtime `Block::bytes()`.
- Historical sampling now uses `Block::allocated_bytes()` instead of `bytes()` for more accurate memory estimation (`size()` vs `capacity()`).
- Subsequent compactions with historical sampling data are completely unchanged.

### Key design decisions

| Column type | Estimation strategy |
|------------|-------------------|
| Scalar (INT/VARCHAR etc.) | `raw_data_bytes / rows_with_data` + structural compensation (+1 null map, +8 offset) |
| Complex (ARRAY/MAP/STRUCT) | `raw_data_bytes / rows_with_data`, no compensation (already includes recursive sub-writer data) |
| VARIANT (root/subcolumn) | Fallback to 992 (`raw_data_bytes=0 // TODO` in writer) |

### Performance safeguards

- Footer collection only runs on the first compaction (no historical sampling data).
- Skipped entirely when `compaction_batch_size` is manually set.
- OOM backoff and sparse-optimization paths are untouched.

## Test plan

- [ ] Wide table (200+ columns) first compaction does not OOM
- [ ] Narrow table first compaction batch_size is close to the upper limit
- [ ] Multi-round compaction: first round uses footer, subsequent rounds use historical sampling
- [ ] Variant columns fall back to 992
- [ ] Sparse optimization is not affected
- [ ] `TestFirstCompactionUsesFooterEstimation` unit test passes
- [ ] `TestFooterRawDataBytesAccuracy` unit test passes
1 parent 7d63a11 commit fd3a313

File tree

3 files changed

+284
-4
lines changed

3 files changed

+284
-4
lines changed

be/src/storage/iterator/vertical_merge_iterator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ class VerticalMergeIteratorContext {
208208

209209
size_t bytes() {
210210
if (_block) {
211-
return _block->bytes();
211+
return _block->allocated_bytes();
212212
} else {
213213
return 0;
214214
}

be/src/storage/merger.cpp

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <ostream>
3131
#include <shared_mutex>
3232
#include <string>
33+
#include <unordered_map>
3334
#include <utility>
3435
#include <vector>
3536

@@ -45,9 +46,11 @@
4546
#include "storage/olap_common.h"
4647
#include "storage/olap_define.h"
4748
#include "storage/rowid_conversion.h"
49+
#include "storage/rowset/beta_rowset.h"
4850
#include "storage/rowset/rowset.h"
4951
#include "storage/rowset/rowset_meta.h"
5052
#include "storage/rowset/rowset_writer.h"
53+
#include "storage/segment/segment.h"
5154
#include "storage/segment/segment_writer.h"
5255
#include "storage/storage_engine.h"
5356
#include "storage/tablet/base_tablet.h"
@@ -412,7 +415,8 @@ Status Merger::vertical_compact_one_group(
412415
}
413416

414417
int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt,
415-
ReaderType reader_type) {
418+
ReaderType reader_type, int64_t group_per_row_from_footer,
419+
bool footer_fallback) {
416420
auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
417421
auto& sample_infos = tablet->get_sample_infos(reader_type);
418422
std::unique_lock<std::mutex> lock(sample_info_lock);
@@ -440,9 +444,21 @@ int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_
440444
group_data_size = info.bytes / info.rows;
441445
sample_infos[group_index].group_data_size = group_data_size;
442446
} else {
447+
// No historical sampling data available.
448+
// Try to use raw_data_bytes from segment footer for a better estimate.
449+
if (!footer_fallback && group_per_row_from_footer > 0) {
450+
int64_t batch_size = block_mem_limit / group_per_row_from_footer;
451+
int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L));
452+
LOG(INFO) << "estimate batch size from footer for vertical compaction, tablet id: "
453+
<< tablet->tablet_id()
454+
<< " group_per_row_from_footer: " << group_per_row_from_footer
455+
<< " way cnt: " << way_cnt << " batch size: " << res;
456+
return res;
457+
}
443458
LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
444459
<< tablet->tablet_id() << " group data size: " << info.group_data_size
445-
<< " row num: " << info.rows << " consume bytes: " << info.bytes;
460+
<< " row num: " << info.rows << " consume bytes: " << info.bytes
461+
<< " footer_fallback: " << footer_fallback;
446462
return 1024 - 32;
447463
}
448464

@@ -548,13 +564,114 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
548564
std::unique_lock<std::mutex> lock(sample_info_lock);
549565
sample_infos.resize(column_groups.size());
550566
}
567+
// Collect per-column raw_data_bytes from segment footer for first-time batch size estimation.
568+
// raw_data_bytes is the original data size before encoding, close to runtime Block::bytes().
569+
// Only collect when needed: skip if manual batch_size override is set, or if ALL groups
570+
// already have historical sampling data. Use per-group granularity so that schema evolution
571+
// (new groups without history) still gets footer-based estimation.
572+
struct ColumnRawSizeInfo {
573+
int64_t total_raw_bytes = 0;
574+
int64_t rows_with_data = 0;
575+
};
576+
std::unordered_map<int32_t, ColumnRawSizeInfo> column_raw_sizes;
577+
bool need_footer_collection = false;
578+
if (config::compaction_batch_size == -1) {
579+
std::unique_lock<std::mutex> lock(sample_info_lock);
580+
for (const auto& info : sample_infos) {
581+
if (info.group_data_size <= 0 && info.bytes <= 0 && info.rows <= 0) {
582+
need_footer_collection = true;
583+
break;
584+
}
585+
}
586+
}
587+
if (need_footer_collection) {
588+
for (const auto& rs_reader : src_rowset_readers) {
589+
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rs_reader->rowset());
590+
if (!beta_rowset) {
591+
continue;
592+
}
593+
std::vector<segment_v2::SegmentSharedPtr> segments;
594+
auto st = beta_rowset->load_segments(&segments);
595+
if (!st.ok()) {
596+
LOG(WARNING) << "Failed to load segments for footer raw_data_bytes collection"
597+
<< ", tablet_id: " << tablet->tablet_id()
598+
<< ", rowset_id: " << beta_rowset->rowset_id() << ", status: " << st;
599+
continue;
600+
}
601+
for (const auto& segment : segments) {
602+
int64_t row_count = segment->num_rows();
603+
auto collect_st = segment->traverse_column_meta_pbs(
604+
[&](const segment_v2::ColumnMetaPB& meta) {
605+
int32_t uid = meta.unique_id();
606+
if (uid >= 0 && meta.has_raw_data_bytes()) {
607+
auto& info = column_raw_sizes[uid];
608+
info.total_raw_bytes += meta.raw_data_bytes();
609+
info.rows_with_data += row_count;
610+
}
611+
});
612+
if (!collect_st.ok()) {
613+
LOG(WARNING) << "Failed to traverse column meta for footer collection"
614+
<< ", tablet_id: " << tablet->tablet_id()
615+
<< ", status: " << collect_st;
616+
}
617+
}
618+
}
619+
}
620+
621+
// Pre-compute per-row estimate for each column group from footer data.
622+
std::vector<int64_t> group_per_row_from_footer(column_groups.size(), 0);
623+
std::vector<bool> group_footer_fallback(column_groups.size(), false);
624+
for (size_t i = 0; i < column_groups.size(); ++i) {
625+
int64_t group_per_row = 0;
626+
bool need_fallback = false;
627+
for (uint32_t col_ordinal : column_groups[i]) {
628+
const auto& col = tablet_schema.column(col_ordinal);
629+
int32_t uid = col.unique_id();
630+
631+
// Variant columns (root or subcolumn): raw_data_bytes is 0 (TODO in writer),
632+
// cannot estimate from footer, fallback to default for the entire group.
633+
if (uid < 0 || col.is_variant_type()) {
634+
need_fallback = true;
635+
break;
636+
}
637+
638+
int64_t col_per_row = 0;
639+
auto it = column_raw_sizes.find(uid);
640+
if (it != column_raw_sizes.end() && it->second.rows_with_data > 0) {
641+
col_per_row = it->second.total_raw_bytes / it->second.rows_with_data;
642+
643+
// Structural overhead compensation for scalar columns only.
644+
// Only add when footer data actually exists for this column,
645+
// to avoid creating a non-zero estimate purely from structural overhead.
646+
// Complex types (ARRAY/MAP/STRUCT) have raw_data_bytes recursively aggregated
647+
// from sub-writers, so no additional compensation is needed.
648+
if (col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
649+
col.type() != FieldType::OLAP_FIELD_TYPE_MAP &&
650+
col.type() != FieldType::OLAP_FIELD_TYPE_STRUCT) {
651+
if (col.is_nullable()) {
652+
col_per_row += 1; // null map: 1 byte per row
653+
}
654+
if (col.is_length_variable_type()) {
655+
col_per_row += 8; // offset array: 8 bytes per row at runtime
656+
}
657+
}
658+
}
659+
660+
group_per_row += col_per_row;
661+
}
662+
group_per_row_from_footer[i] = group_per_row;
663+
group_footer_fallback[i] = need_fallback;
664+
}
665+
551666
// compact group one by one
552667
for (auto i = 0; i < column_groups.size(); ++i) {
553668
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
554669
bool is_key = (i == 0);
555670
int64_t batch_size = config::compaction_batch_size != -1
556671
? config::compaction_batch_size
557-
: estimate_batch_size(i, tablet, merge_way_num, reader_type);
672+
: estimate_batch_size(i, tablet, merge_way_num, reader_type,
673+
group_per_row_from_footer[i],
674+
group_footer_fallback[i]);
558675
CompactionSampleInfo sample_info;
559676
Merger::Statistics group_stats;
560677
group_stats.rowid_conversion = total_stats.rowid_conversion;

be/test/storage/compaction/vertical_compaction_test.cpp

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
#include "storage/rowset/rowset_writer.h"
6464
#include "storage/rowset/rowset_writer_context.h"
6565
#include "storage/schema.h"
66+
#include "storage/segment/segment.h"
6667
#include "storage/storage_engine.h"
6768
#include "storage/tablet/tablet.h"
6869
#include "storage/tablet/tablet_meta.h"
@@ -1192,4 +1193,166 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMergeWithNullableSparseColum
11921193
config::sparse_column_compaction_threshold_percent = original_threshold;
11931194
}
11941195

1196+
// Test that first-time compaction (no historical sampling) uses footer raw_data_bytes
1197+
// to estimate batch_size instead of hardcoded 992.
1198+
// This test verifies the footer-based estimation path is triggered and compaction succeeds.
1199+
TEST_F(VerticalCompactionTest, TestFirstCompactionUsesFooterEstimation) {
1200+
// Use small data to ensure compaction completes quickly
1201+
auto num_input_rowset = 2;
1202+
auto num_segments = 1;
1203+
auto rows_per_segment = 1024;
1204+
SegmentsOverlapPB overlap = NONOVERLAPPING;
1205+
std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
1206+
generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data);
1207+
1208+
TabletSchemaSPtr tablet_schema = create_schema();
1209+
1210+
// Create input rowsets
1211+
std::vector<RowsetSharedPtr> input_rowsets;
1212+
for (auto i = 0; i < num_input_rowset; i++) {
1213+
RowsetSharedPtr rowset = create_rowset(tablet_schema, overlap, input_data[i], i);
1214+
input_rowsets.push_back(rowset);
1215+
}
1216+
1217+
// Create input rowset readers
1218+
std::vector<RowsetReaderSharedPtr> input_rs_readers;
1219+
for (auto& rowset : input_rowsets) {
1220+
RowsetReaderSharedPtr rs_reader;
1221+
ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
1222+
input_rs_readers.push_back(std::move(rs_reader));
1223+
}
1224+
1225+
// Create output rowset writer
1226+
auto writer_context = create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
1227+
{0, input_rowsets.back()->end_version()});
1228+
auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true);
1229+
ASSERT_TRUE(res.has_value()) << res.error();
1230+
auto output_rs_writer = std::move(res).value();
1231+
1232+
// Create tablet - fresh tablet has no historical sampling data,
1233+
// so estimate_batch_size will hit the else branch and use footer raw_data_bytes.
1234+
TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
1235+
Merger::Statistics stats;
1236+
RowIdConversion rowid_conversion;
1237+
stats.rowid_conversion = &rowid_conversion;
1238+
1239+
// Verify sample_infos are empty (no historical data)
1240+
auto& sample_infos = tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION);
1241+
EXPECT_TRUE(sample_infos.empty());
1242+
1243+
// Run vertical merge - this should use footer raw_data_bytes for batch size estimation
1244+
// since there is no historical sampling data.
1245+
// The log should contain "estimate batch size from footer" instead of the old hardcoded path.
1246+
auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
1247+
*tablet_schema, input_rs_readers,
1248+
output_rs_writer.get(), 100000, num_segments, &stats);
1249+
ASSERT_TRUE(s.ok()) << s;
1250+
1251+
RowsetSharedPtr out_rowset;
1252+
ASSERT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
1253+
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(),
1254+
num_input_rowset * num_segments * rows_per_segment);
1255+
1256+
// After first compaction, sample_infos should be populated with historical data
1257+
// for subsequent compactions to use.
1258+
auto& updated_infos = tablet->get_sample_infos(ReaderType::READER_BASE_COMPACTION);
1259+
EXPECT_FALSE(updated_infos.empty());
1260+
}
1261+
1262+
// Test that raw_data_bytes in segment footer accurately reflects the original data size
1263+
// for different column types, which is the foundation of footer-based batch size estimation.
1264+
TEST_F(VerticalCompactionTest, TestFooterRawDataBytesAccuracy) {
1265+
// Create a schema with INT key + VARCHAR value to test both fixed and variable-length types
1266+
TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
1267+
TabletSchemaPB tablet_schema_pb;
1268+
tablet_schema_pb.set_keys_type(DUP_KEYS);
1269+
tablet_schema_pb.set_num_short_key_columns(1);
1270+
tablet_schema_pb.set_num_rows_per_row_block(1024);
1271+
tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
1272+
tablet_schema_pb.set_next_column_unique_id(3);
1273+
1274+
ColumnPB* col_int = tablet_schema_pb.add_column();
1275+
col_int->set_unique_id(1);
1276+
col_int->set_name("c_int");
1277+
col_int->set_type("INT");
1278+
col_int->set_is_key(true);
1279+
col_int->set_length(4);
1280+
col_int->set_index_length(4);
1281+
col_int->set_is_nullable(false);
1282+
col_int->set_is_bf_column(false);
1283+
1284+
ColumnPB* col_varchar = tablet_schema_pb.add_column();
1285+
col_varchar->set_unique_id(2);
1286+
col_varchar->set_name("c_varchar");
1287+
col_varchar->set_type("VARCHAR");
1288+
col_varchar->set_is_key(false);
1289+
col_varchar->set_length(128);
1290+
col_varchar->set_index_length(20);
1291+
col_varchar->set_is_nullable(false);
1292+
col_varchar->set_is_bf_column(false);
1293+
1294+
tablet_schema->init_from_pb(tablet_schema_pb);
1295+
1296+
// Write 1000 rows: INT values + VARCHAR strings of exactly 20 bytes each
1297+
constexpr int kNumRows = 1000;
1298+
constexpr int kStringLen = 20;
1299+
std::string fixed_string(kStringLen, 'x');
1300+
1301+
auto writer_context =
1302+
create_rowset_writer_context(tablet_schema, NONOVERLAPPING, UINT32_MAX, {0, 0});
1303+
auto res = RowsetFactory::create_rowset_writer(*engine_ref, writer_context, true);
1304+
ASSERT_TRUE(res.has_value()) << res.error();
1305+
auto rowset_writer = std::move(res).value();
1306+
1307+
Block block = tablet_schema->create_block();
1308+
auto columns = block.mutate_columns();
1309+
for (int i = 0; i < kNumRows; i++) {
1310+
int32_t int_val = i;
1311+
columns[0]->insert_data(reinterpret_cast<const char*>(&int_val), sizeof(int_val));
1312+
columns[1]->insert_data(fixed_string.data(), fixed_string.size());
1313+
}
1314+
ASSERT_TRUE(rowset_writer->add_block(&block).ok());
1315+
ASSERT_TRUE(rowset_writer->flush().ok());
1316+
1317+
RowsetSharedPtr rowset;
1318+
ASSERT_EQ(Status::OK(), rowset_writer->build(rowset));
1319+
ASSERT_EQ(1, rowset->rowset_meta()->num_segments());
1320+
ASSERT_EQ(kNumRows, rowset->rowset_meta()->num_rows());
1321+
1322+
// Load segments and read footer's raw_data_bytes
1323+
auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
1324+
ASSERT_NE(beta_rowset, nullptr);
1325+
std::vector<segment_v2::SegmentSharedPtr> segments;
1326+
ASSERT_TRUE(beta_rowset->load_segments(&segments).ok());
1327+
ASSERT_EQ(1, segments.size());
1328+
ASSERT_EQ(kNumRows, segments[0]->num_rows());
1329+
1330+
// Collect raw_data_bytes per column from footer
1331+
std::unordered_map<int32_t, uint64_t> raw_bytes_by_uid;
1332+
auto st = segments[0]->traverse_column_meta_pbs(
1333+
[&](const segment_v2::ColumnMetaPB& meta) {
1334+
if (meta.unique_id() >= 0 && meta.has_raw_data_bytes()) {
1335+
raw_bytes_by_uid[meta.unique_id()] = meta.raw_data_bytes();
1336+
}
1337+
});
1338+
ASSERT_TRUE(st.ok()) << st;
1339+
1340+
// Verify INT column (uid=1): raw_data_bytes should be exactly kNumRows * sizeof(int32_t).
1341+
// PageBuilder::get_raw_data_size() accumulates raw data bytes added via add(),
1342+
// for fixed-width types this is exactly N * sizeof(T).
1343+
ASSERT_TRUE(raw_bytes_by_uid.count(1) > 0) << "INT column raw_data_bytes not found in footer";
1344+
EXPECT_EQ(raw_bytes_by_uid[1], kNumRows * sizeof(int32_t))
1345+
<< "INT column: expected " << kNumRows * sizeof(int32_t)
1346+
<< " total raw_data_bytes, got " << raw_bytes_by_uid[1];
1347+
1348+
// Verify VARCHAR column (uid=2): raw_data_bytes should be exactly kNumRows * kStringLen.
1349+
// BinaryPlainPageBuilder/BinaryDictPageBuilder only accumulate src->size (the raw string
1350+
// payload), not offsets, varint length prefixes, or dictionary overhead.
1351+
ASSERT_TRUE(raw_bytes_by_uid.count(2) > 0)
1352+
<< "VARCHAR column raw_data_bytes not found in footer";
1353+
EXPECT_EQ(raw_bytes_by_uid[2], kNumRows * kStringLen)
1354+
<< "VARCHAR column: expected " << kNumRows * kStringLen
1355+
<< " total raw_data_bytes, got " << raw_bytes_by_uid[2];
1356+
}
1357+
11951358
} // namespace doris

0 commit comments

Comments
 (0)