Skip to content

Commit 9739d89

Browse files
authored
branch-4.0: [fix](cloud) Normalize SC rowset graph before delete bitmap capture (#63981)
## Summary pick #63960
1 parent 7d0c819 commit 9739d89

4 files changed

Lines changed: 111 additions & 35 deletions

File tree

be/src/cloud/cloud_schema_change_job.cpp

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -541,29 +541,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
541541
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
542542
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
543543
std::unique_lock wlock(_new_tablet->get_header_lock());
544-
// Mirror MS behavior: delete rowsets in [2, alter_version] before adding
545-
// SC output rowsets to avoid stale compaction rowsets remaining visible.
546-
{
547-
int64_t alter_ver = sc_job->alter_version();
548-
std::vector<RowsetSharedPtr> to_delete;
549-
for (auto& [v, rs] : _new_tablet->rowset_map()) {
550-
if (v.first >= 2 && v.second <= alter_ver) {
551-
to_delete.push_back(rs);
552-
}
553-
}
554-
if (!to_delete.empty()) {
555-
LOG_INFO(
556-
"schema change: delete {} local rowsets in [2, {}] before adding SC "
557-
"output, tablet_id={}, versions=[{}]",
558-
to_delete.size(), alter_ver, _new_tablet->tablet_id(),
559-
fmt::join(to_delete | std::views::transform([](const auto& rs) {
560-
return rs->version().to_string();
561-
}),
562-
", "));
563-
_new_tablet->delete_rowsets_for_schema_change(to_delete, wlock);
564-
}
565-
}
566-
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
544+
_new_tablet->replace_rowsets_with_schema_change_output(
545+
_output_rowsets, sc_job->alter_version(), wlock, "commit", true);
567546
// Ensure the real new tablet has a continuous local version graph before it becomes
568547
// visible. Later RUNNING-tablet delete bitmap sync depends on capturing all old versions.
569548
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().fill_version_holes(
@@ -610,6 +589,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
610589

611590
// step 1, process incremental rowset without delete bitmap update lock
612591
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
592+
{
593+
std::unique_lock wlock(tmp_tablet->get_header_lock());
594+
tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets, alter_version, wlock,
595+
"delete_bitmap_without_lock", false);
596+
}
613597
int64_t max_version = tmp_tablet->max_version().second;
614598
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
615599
<< "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
@@ -619,10 +603,6 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
619603
{start_calc_delete_bitmap_version, max_version}, CaptureRowsetOps {}));
620604
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock",
621605
DBUG_BLOCK);
622-
{
623-
std::unique_lock wlock(tmp_tablet->get_header_lock());
624-
tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
625-
}
626606
for (auto rowset : ret.rowsets) {
627607
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
628608
}
@@ -635,17 +615,18 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
635615
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
636616
*_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
637617
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
618+
{
619+
std::unique_lock wlock(tmp_tablet->get_header_lock());
620+
tmp_tablet->replace_rowsets_with_schema_change_output(_output_rowsets, alter_version, wlock,
621+
"delete_bitmap_with_lock", false);
622+
}
638623
int64_t new_max_version = tmp_tablet->max_version().second;
639624
LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
640625
<< "incremental rowsets with lock, version: " << max_version + 1 << "-"
641626
<< new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
642627
if (new_max_version > max_version) {
643628
auto ret = DORIS_TRY(tmp_tablet->capture_consistent_rowsets_unlocked(
644629
{max_version + 1, new_max_version}, CaptureRowsetOps {}));
645-
{
646-
std::unique_lock wlock(tmp_tablet->get_header_lock());
647-
tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
648-
}
649630
for (auto rowset : ret.rowsets) {
650631
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
651632
}

be/src/cloud/cloud_tablet.cpp

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,17 @@ bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_c
9595

9696
static constexpr int LOAD_INITIATOR_ID = -1;
9797

98+
namespace {
99+
100+
bool is_schema_change_output_rowset(const RowsetSharedPtr& rowset,
101+
const std::vector<RowsetSharedPtr>& output_rowsets) {
102+
return std::ranges::any_of(output_rowsets, [&rowset](const RowsetSharedPtr& output_rowset) {
103+
return output_rowset->rowset_id() == rowset->rowset_id();
104+
});
105+
}
106+
107+
} // namespace
108+
98109
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
99110
"file_cache_cloud_tablet_submitted_segment_size");
100111
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
@@ -686,7 +697,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
686697
}
687698

688699
void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
689-
std::unique_lock<std::shared_mutex>&) {
700+
std::unique_lock<std::shared_mutex>&,
701+
bool recycle_deleted_rowsets) {
690702
if (to_delete.empty()) {
691703
return;
692704
}
@@ -709,8 +721,35 @@ void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetShare
709721
// the other hits a DCHECK(false) in delete_expired_stale_rowsets().
710722
_tablet_meta->modify_rs_metas({}, rs_metas, true);
711723

712-
// Schedule for direct cache cleanup. MS has already recycled these rowsets.
713-
add_unused_rowsets(to_delete);
724+
if (recycle_deleted_rowsets) {
725+
// Schedule for direct cache cleanup. MS has already recycled these rowsets.
726+
add_unused_rowsets(to_delete);
727+
}
728+
}
729+
730+
void CloudTablet::replace_rowsets_with_schema_change_output(
731+
const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
732+
std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
733+
bool recycle_deleted_rowsets) {
734+
std::vector<RowsetSharedPtr> to_delete;
735+
for (auto& [v, rs] : _rs_version_map) {
736+
if (v.first >= 2 && v.second <= alter_version &&
737+
!is_schema_change_output_rowset(rs, output_rowsets)) {
738+
to_delete.push_back(rs);
739+
}
740+
}
741+
if (!to_delete.empty()) {
742+
LOG_INFO(
743+
"schema change: delete {} local rowsets in [2, {}] before adding SC output, "
744+
"tablet_id={}, stage={}, versions=[{}]",
745+
to_delete.size(), alter_version, tablet_id(), stage,
746+
fmt::join(to_delete | std::views::transform([](const auto& rs) {
747+
return rs->version().to_string();
748+
}),
749+
", "));
750+
delete_rowsets_for_schema_change(to_delete, meta_lock, recycle_deleted_rowsets);
751+
}
752+
add_rowsets(output_rowsets, true, meta_lock, false);
714753
}
715754

716755
uint64_t CloudTablet::delete_expired_stale_rowsets() {

be/src/cloud/cloud_tablet.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,19 @@ class CloudTablet final : public BaseTablet {
164164
// preferring stale compaction rowsets over individual SC output rowsets.
165165
// MUST hold EXCLUSIVE `_meta_lock`.
166166
void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
167-
std::unique_lock<std::shared_mutex>& meta_lock);
167+
std::unique_lock<std::shared_mutex>& meta_lock,
168+
bool recycle_deleted_rowsets = true);
169+
170+
// Replace local rowsets in [2, alter_version] with schema change output rowsets.
171+
// Existing SC output rowsets are kept; other local/double-write/compaction rowsets
172+
// in this version range are removed from both _rs_version_map and version graph.
173+
// recycle_deleted_rowsets should only be true for the real tablet; temporary
174+
// schema-change delete-bitmap tablets only need to normalize their local graph.
175+
// MUST hold EXCLUSIVE `_meta_lock`.
176+
void replace_rowsets_with_schema_change_output(
177+
const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
178+
std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
179+
bool recycle_deleted_rowsets);
168180

169181
// When the tablet is dropped, we need to recycle cached data:
170182
// 1. The data in file cache

be/test/cloud/cloud_tablet_test.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,6 +1105,50 @@ TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestSchemaChangeDeletesCompa
11051105
}
11061106
}
11071107

1108+
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
1109+
TestReplaceSchemaChangeOutputCleansPollutedTmpGraph) {
1110+
auto rs_placeholder = create_rowset(Version(0, 1));
1111+
auto rs_sc_2 = create_rowset(Version(2, 2));
1112+
auto rs_sc_3 = create_rowset(Version(3, 3));
1113+
auto rs_compacted = create_rowset(Version(2, 3));
1114+
auto rs_post_alter = create_rowset(Version(4, 4));
1115+
ASSERT_NE(rs_placeholder, nullptr);
1116+
ASSERT_NE(rs_sc_2, nullptr);
1117+
ASSERT_NE(rs_sc_3, nullptr);
1118+
ASSERT_NE(rs_compacted, nullptr);
1119+
ASSERT_NE(rs_post_alter, nullptr);
1120+
1121+
{
1122+
std::unique_lock wlock(_tablet->get_header_lock());
1123+
_tablet->add_rowsets({rs_placeholder, rs_sc_2, rs_sc_3, rs_compacted, rs_post_alter}, false,
1124+
wlock, false);
1125+
}
1126+
ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
1127+
ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
1128+
ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 3)));
1129+
1130+
{
1131+
std::unique_lock wlock(_tablet->get_header_lock());
1132+
_tablet->replace_rowsets_with_schema_change_output({rs_sc_2, rs_sc_3}, 3, wlock, "test",
1133+
false);
1134+
}
1135+
1136+
ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 2)));
1137+
ASSERT_TRUE(_tablet->rowset_map().count(Version(3, 3)));
1138+
ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 3)));
1139+
ASSERT_TRUE(_tablet->rowset_map().count(Version(4, 4)));
1140+
ASSERT_FALSE(_tablet->need_remove_unused_rowsets());
1141+
1142+
auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 4), {});
1143+
ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
1144+
auto& versions = versions_result.value();
1145+
ASSERT_EQ(versions.size(), 4);
1146+
ASSERT_EQ(versions[0], Version(0, 1));
1147+
ASSERT_EQ(versions[1], Version(2, 2));
1148+
ASSERT_EQ(versions[2], Version(3, 3));
1149+
ASSERT_EQ(versions[3], Version(4, 4));
1150+
}
1151+
11081152
// Test that delete_rowsets_for_schema_change with empty input is a no-op
11091153
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
11101154
auto rs = create_rowset(Version(0, 1));

0 commit comments

Comments
 (0)