Skip to content

Commit 02be6ca

Browse files
branch-4.0: [fix](cloud) Fill schema change version holes before running #63443 (#63462)
Cherry-picked from #63443 Co-authored-by: Xin Liao <liaoxin@selectdb.com>
1 parent e12cce8 commit 02be6ca

3 files changed

Lines changed: 163 additions & 0 deletions

File tree

be/src/cloud/cloud_schema_change_job.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,10 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
564564
}
565565
}
566566
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
567+
// Ensure the real new tablet has a continuous local version graph before it becomes
568+
// visible. Later RUNNING-tablet delete bitmap sync depends on capturing all old versions.
569+
RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().fill_version_holes(
570+
_new_tablet.get(), _new_tablet->max_version_unlocked(), wlock));
567571
_new_tablet->set_cumulative_layer_point(_output_cumulative_point);
568572
_new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
569573
stats.num_rows(), stats.data_size());

be/test/cloud/cloud_schema_change_job_test.cpp

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <gtest/gtest.h>
2323

2424
#include <memory>
25+
#include <vector>
2526

2627
#include "cloud/cloud_cluster_info.h"
2728
#include "cloud/cloud_storage_engine.h"
@@ -96,6 +97,114 @@ class CloudSchemaChangeJobTest : public testing::Test {
9697
std::shared_ptr<CloudClusterInfo> _cluster_info;
9798
};
9899

100+
TEST_F(CloudSchemaChangeJobTest, FillVersionHolesBeforeNewTabletRunning) {
101+
int64_t base_tablet_id = 40001;
102+
int64_t new_tablet_id = 40002;
103+
104+
TabletMetaSharedPtr base_meta(new TabletMeta(
105+
1, 2, base_tablet_id, base_tablet_id + 100, 4, 5, TTabletSchema(), 6, {{7, 8}},
106+
UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
107+
TabletMetaSharedPtr new_meta(new TabletMeta(
108+
1, 2, new_tablet_id, new_tablet_id + 100, 4, 5, TTabletSchema(), 6, {{7, 8}},
109+
UniqueId(11, 12), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F));
110+
111+
auto base_tablet = std::make_shared<CloudTablet>(_engine, std::move(base_meta));
112+
auto new_tablet = std::make_shared<CloudTablet>(_engine, std::move(new_meta));
113+
static_cast<void>(new_tablet->set_tablet_state(TABLET_NOTREADY));
114+
115+
auto placeholder = create_rowset(new_tablet->tablet_schema(), new_tablet_id, 0, 1);
116+
auto rowset_after_hole = create_rowset(new_tablet->tablet_schema(), new_tablet_id, 4, 4);
117+
ASSERT_NE(placeholder, nullptr);
118+
ASSERT_NE(rowset_after_hole, nullptr);
119+
120+
auto* sp = SyncPoint::get_instance();
121+
sp->clear_all_call_backs();
122+
sp->enable_processing();
123+
124+
sp->set_call_back("CloudMetaMgr::get_tablet_meta", [&](auto&& args) {
125+
auto tablet_id = try_any_cast<int64_t>(args[0]);
126+
auto* meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
127+
if (tablet_id == base_tablet_id) {
128+
*meta_ptr = base_tablet->tablet_meta();
129+
} else if (tablet_id == new_tablet_id) {
130+
*meta_ptr = new_tablet->tablet_meta();
131+
}
132+
try_any_cast_ret<Status>(args)->second = true;
133+
});
134+
135+
CloudTablet* loaded_new_tablet = nullptr;
136+
sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [&](auto&& outcome) {
137+
auto* tablet = try_any_cast<CloudTablet*>(outcome[0]);
138+
if (tablet->tablet_id() == new_tablet_id) {
139+
loaded_new_tablet = tablet;
140+
std::unique_lock lock(tablet->get_header_lock());
141+
std::vector<RowsetSharedPtr> rowsets;
142+
if (!tablet->rowset_map().count(Version(0, 1))) {
143+
rowsets.push_back(placeholder);
144+
}
145+
if (!tablet->rowset_map().count(Version(4, 4))) {
146+
rowsets.push_back(rowset_after_hole);
147+
}
148+
tablet->add_rowsets(std::move(rowsets), false, lock, false);
149+
}
150+
auto* pairs = try_any_cast_ret<Status>(outcome);
151+
pairs->second = true;
152+
pairs->first = Status::OK();
153+
});
154+
155+
sp->set_call_back("CloudMetaMgr::prepare_tablet_job", [](auto&& outcome) {
156+
auto* pairs = try_any_cast_ret<Status>(outcome);
157+
pairs->second = true;
158+
pairs->first = Status::OK();
159+
160+
auto* resp = try_any_cast<cloud::StartTabletJobResponse*>(outcome[1]);
161+
resp->mutable_status()->set_code(cloud::MetaServiceCode::OK);
162+
resp->set_alter_version(2);
163+
});
164+
165+
bool commit_called = false;
166+
sp->set_call_back("CloudMetaMgr::commit_tablet_job", [&](auto&& outcome) {
167+
commit_called = true;
168+
auto* pairs = try_any_cast_ret<Status>(outcome);
169+
pairs->second = true;
170+
pairs->first = Status::OK();
171+
172+
auto* resp = try_any_cast<cloud::FinishTabletJobResponse*>(outcome[1]);
173+
resp->mutable_status()->set_code(cloud::MetaServiceCode::OK);
174+
auto* stats = resp->mutable_stats();
175+
stats->set_num_rowsets(3);
176+
stats->set_num_segments(0);
177+
stats->set_num_rows(0);
178+
stats->set_data_size(0);
179+
});
180+
181+
TAlterTabletReqV2 request;
182+
request.base_tablet_id = base_tablet_id;
183+
request.new_tablet_id = new_tablet_id;
184+
request.alter_version = 1;
185+
request.__set_alter_tablet_type(TAlterTabletType::SCHEMA_CHANGE);
186+
187+
CloudSchemaChangeJob sc_job(_engine, "test_fill_holes_before_running", 9999999999);
188+
auto status = sc_job.process_alter_tablet(request);
189+
190+
ASSERT_TRUE(status.ok()) << status.to_string();
191+
ASSERT_TRUE(commit_called);
192+
ASSERT_NE(loaded_new_tablet, nullptr);
193+
ASSERT_EQ(loaded_new_tablet->tablet_state(), TABLET_RUNNING);
194+
ASSERT_TRUE(loaded_new_tablet->rowset_map().count(Version(3, 3)));
195+
auto hole_rowset = loaded_new_tablet->rowset_map().at(Version(3, 3));
196+
ASSERT_TRUE(hole_rowset->empty());
197+
ASSERT_TRUE(hole_rowset->is_hole_rowset());
198+
199+
auto versions_result =
200+
loaded_new_tablet->capture_consistent_versions_unlocked(Version(3, 4), {});
201+
ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
202+
const auto& versions = versions_result.value();
203+
ASSERT_EQ(versions.size(), 2);
204+
ASSERT_EQ(versions[0], Version(3, 3));
205+
ASSERT_EQ(versions[1], Version(4, 4));
206+
}
207+
99208
// Test: cross-V1 compaction detected → abort SC job → return SC_COMPACTION_CONFLICT
100209
TEST_F(CloudSchemaChangeJobTest, CrossV1CompactionDetected) {
101210
int64_t base_tablet_id = 10001;

be/test/cloud/cloud_tablet_test.cpp

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <cstdint>
2727
#include <ranges>
2828

29+
#include "cloud/cloud_meta_mgr.h"
2930
#include "cloud/cloud_storage_engine.h"
3031
#include "cloud/cloud_warm_up_manager.h"
3132
#include "common/config.h"
@@ -1015,6 +1016,7 @@ class CloudTabletDeleteRowsetsForSchemaChangeTest : public testing::Test {
10151016
rs_meta->set_rowset_type(BETA_ROWSET);
10161017
rs_meta->set_version(version);
10171018
rs_meta->set_rowset_id(_engine.next_rowset_id());
1019+
rs_meta->set_tablet_schema(_tablet->tablet_schema());
10181020
RowsetSharedPtr rowset;
10191021
Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
10201022
if (!st.ok()) {
@@ -1182,6 +1184,54 @@ TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestMultipleCompactionRowset
11821184
ASSERT_EQ(versions[10], Version(11, 11));
11831185
}
11841186

1187+
TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestFillVersionHolesBeforeSchemaChangeRunning) {
1188+
static_cast<void>(_tablet->set_tablet_state(TABLET_NOTREADY));
1189+
_tablet->set_alter_version(10);
1190+
1191+
auto rs_placeholder = create_rowset(Version(0, 1));
1192+
auto rs_historical = create_rowset(Version(2, 10));
1193+
auto rs_after_first_hole = create_rowset(Version(12, 12));
1194+
auto rs_after_second_hole = create_rowset(Version(14, 14));
1195+
ASSERT_NE(rs_placeholder, nullptr);
1196+
ASSERT_NE(rs_historical, nullptr);
1197+
ASSERT_NE(rs_after_first_hole, nullptr);
1198+
ASSERT_NE(rs_after_second_hole, nullptr);
1199+
1200+
cloud::CloudMetaMgr meta_mgr;
1201+
{
1202+
std::unique_lock wlock(_tablet->get_header_lock());
1203+
_tablet->add_rowsets(
1204+
{rs_placeholder, rs_historical, rs_after_first_hole, rs_after_second_hole}, false,
1205+
wlock, false);
1206+
ASSERT_FALSE(_tablet->rowset_map().count(Version(11, 11)));
1207+
ASSERT_FALSE(_tablet->rowset_map().count(Version(13, 13)));
1208+
ASSERT_FALSE(_tablet->capture_consistent_versions_unlocked(Version(0, 14), {}).has_value());
1209+
1210+
auto status =
1211+
meta_mgr.fill_version_holes(_tablet.get(), _tablet->max_version_unlocked(), wlock);
1212+
ASSERT_TRUE(status.ok()) << status.to_string();
1213+
ASSERT_TRUE(_tablet->set_tablet_state(TABLET_RUNNING).ok());
1214+
}
1215+
1216+
for (const Version& version : {Version(11, 11), Version(13, 13)}) {
1217+
ASSERT_TRUE(_tablet->rowset_map().count(version));
1218+
auto hole_rowset = _tablet->rowset_map().at(version);
1219+
ASSERT_TRUE(hole_rowset->empty());
1220+
ASSERT_TRUE(hole_rowset->is_hole_rowset());
1221+
}
1222+
1223+
auto versions_result = _tablet->capture_consistent_versions_unlocked(Version(0, 14), {});
1224+
ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
1225+
const auto& versions = versions_result.value();
1226+
ASSERT_EQ(versions.size(), 6);
1227+
ASSERT_EQ(versions[0], Version(0, 1));
1228+
ASSERT_EQ(versions[1], Version(2, 10));
1229+
ASSERT_EQ(versions[2], Version(11, 11));
1230+
ASSERT_EQ(versions[3], Version(12, 12));
1231+
ASSERT_EQ(versions[4], Version(13, 13));
1232+
ASSERT_EQ(versions[5], Version(14, 14));
1233+
}
1234+
11851235
// Reproduce the CI crash scenario: SC delete puts rowsets to stale, then
11861236
// compaction creates a new stale path with overlapping version keys. When
11871237
// one stale path is cleaned, the other hits DCHECK(false) because the

0 commit comments

Comments
 (0)