Skip to content

Commit b5c936f

Browse files
[fix](cloud)(restore) fix broken schema during restore of lsc=false tables
When restoring a `light_schema_change=false` table from a shared-nothing cluster to a shared-storage cluster, the tablet schema created by `create_tablet` has every column's `unique_id` set to -1. Later, `commit_restore_job` writes rowset meta that contains the correct schema (with valid `unique_id >= 0`). However, `put_schema_kv` skips the write when the schema key already exists, leaving the broken schema (`unique_id = -1`) in place and causing subsequent reads to fail.

Fix:
1. Introduce a new helper, `put_schema_kv_on_restore`, in `meta_service_schema.{h,cpp}`. It checks the existing schema value; if the key does not exist, or the existing schema cannot be parsed, or the first column's `unique_id` is -1 (i.e. the schema is broken), it removes the old key (using a range remove so that the blob format is covered too) and writes the correct schema. The original `put_schema_kv` interface is left untouched.
2. In `commit_restore_job`, replace `put_schema_kv` with `put_schema_kv_on_restore` in both the rowset-meta and tablet-meta branches. A `std::set<std::string> restored_schema_keys` is added to avoid redundant FDB get/put operations for the same `(index_id, schema_version)` within a single restore job.
3. Log the `rs_meta_schema_put_cnt` / `rs_meta_schema_skip_cnt` and `tablet_meta_schema_put_cnt` / `tablet_meta_schema_skip_cnt` tags for observability.
1 parent 2a8afb9 commit b5c936f

3 files changed

Lines changed: 73 additions & 6 deletions

File tree

cloud/src/meta-service/meta_service.cpp

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <memory>
4242
#include <numeric>
4343
#include <ostream>
44+
#include <set>
4445
#include <sstream>
4546
#include <string>
4647
#include <string_view>
@@ -1764,6 +1765,13 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
17641765
TabletStats tablet_stat;
17651766
int64_t converted_rowset_num = 0;
17661767
int32_t max_batch_size = config::max_restore_job_rowsets_per_batch;
1768+
// Track schema keys that have already been restored to avoid redundant
1769+
// FDB get/put operations for the same (index_id, schema_version).
1770+
std::set<std::string> restored_schema_keys;
1771+
int64_t rs_meta_schema_put_cnt = 0;
1772+
int64_t rs_meta_schema_skip_cnt = 0;
1773+
int64_t tablet_meta_schema_put_cnt = 0;
1774+
int64_t tablet_meta_schema_skip_cnt = 0;
17671775
for (size_t i = 0; i < restore_job_rs_metas.size(); i += max_batch_size) {
17681776
size_t end = (i + max_batch_size) > restore_job_rs_metas.size()
17691777
? restore_job_rs_metas.size()
@@ -1791,9 +1799,16 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
17911799
return;
17921800
}
17931801
}
1794-
put_schema_kv(code, msg, txn.get(), schema_key, rowset_meta.tablet_schema());
1795-
if (code != MetaServiceCode::OK) {
1796-
return;
1802+
if (restored_schema_keys.count(schema_key) == 0) {
1803+
put_schema_kv_on_restore(code, msg, txn.get(), schema_key,
1804+
rowset_meta.tablet_schema());
1805+
if (code != MetaServiceCode::OK) {
1806+
return;
1807+
}
1808+
restored_schema_keys.insert(schema_key);
1809+
++rs_meta_schema_put_cnt;
1810+
} else {
1811+
++rs_meta_schema_skip_cnt;
17971812
}
17981813
if (is_versioned_write) {
17991814
std::string versioned_schema_key = versioned::meta_schema_key(
@@ -2052,8 +2067,15 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
20522067
fix_column_type(tablet_meta->mutable_schema());
20532068
auto schema_key = meta_schema_key(
20542069
{instance_id, tablet_meta->index_id(), tablet_meta->schema_version()});
2055-
put_schema_kv(code, msg, txn0.get(), schema_key, tablet_meta->schema());
2056-
if (code != MetaServiceCode::OK) return;
2070+
if (restored_schema_keys.count(schema_key) == 0) {
2071+
put_schema_kv_on_restore(code, msg, txn0.get(), schema_key,
2072+
tablet_meta->schema());
2073+
if (code != MetaServiceCode::OK) return;
2074+
restored_schema_keys.insert(schema_key);
2075+
++tablet_meta_schema_put_cnt;
2076+
} else {
2077+
++tablet_meta_schema_skip_cnt;
2078+
}
20572079

20582080
bool is_versioned_write = is_version_write_enabled(instance_id);
20592081
if (is_versioned_write) {
@@ -2161,7 +2183,11 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
21612183
.tag("tablet_id", tablet_idx.tablet_id())
21622184
.tag("state", restore_job_pb.state())
21632185
.tag("mtime_s", restore_job_pb.mtime_s())
2164-
.tag("committed_rowset_num", converted_rowset_num);
2186+
.tag("committed_rowset_num", converted_rowset_num)
2187+
.tag("rs_meta_schema_put_cnt", rs_meta_schema_put_cnt)
2188+
.tag("rs_meta_schema_skip_cnt", rs_meta_schema_skip_cnt)
2189+
.tag("tablet_meta_schema_put_cnt", tablet_meta_schema_put_cnt)
2190+
.tag("tablet_meta_schema_skip_cnt", tablet_meta_schema_skip_cnt);
21652191
err = txn0->commit();
21662192
if (err != TxnErrorCode::TXN_OK) {
21672193
code = cast_as<ErrCategory::COMMIT>(err);

cloud/src/meta-service/meta_service_schema.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
#include "meta-service/meta_service_schema.h"
1919

20+
#include "meta-store/codec.h"
21+
2022
#include <fmt/format.h>
2123
#include <gen_cpp/cloud.pb.h>
2224
#include <gen_cpp/olap_file.pb.h>
@@ -156,6 +158,39 @@ void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transactio
156158
}
157159
}
158160

161+
// Writes `schema` under `schema_key` while committing a restore job, replacing
// an existing value only when that value is considered unusable.
//
// The existing value is read with cloud::blob_get and the outcome decides what happens:
//   - TXN_KEY_NOT_FOUND: nothing is stored yet, so `schema` is written.
//   - TXN_OK: `schema` is written only if the stored value fails to parse, or if
//     it has at least one column and its first column's unique_id is -1.
//     Otherwise the stored schema is left as it is.
//   - any other error: `code` is set to the READ-category error, `msg` describes
//     the failure, and the function returns without touching the key.
//
// `code` and `msg` are assigned only on that read-failure path.
void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn,
162+
std::string_view schema_key,
163+
const doris::TabletSchemaCloudPB& schema) {
164+
bool need_put = false;
165+
ValueBuf val_buf;
166+
TxnErrorCode err = cloud::blob_get(txn, schema_key, &val_buf);
167+
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
168+
need_put = true;
169+
} else if (err == TxnErrorCode::TXN_OK) {
170+
// Overwrite if the existing schema is corrupted (cannot be parsed) or has the bad
// unique_id (-1) left by create_tablet. Only the first column is inspected, and a
// stored schema with no columns is never overwritten.
171+
doris::TabletSchemaCloudPB saved_schema;
172+
need_put = !parse_schema_value(val_buf, &saved_schema) ||
173+
(saved_schema.column_size() > 0 &&
174+
saved_schema.column(0).unique_id() == -1);
175+
} else {
176+
code = cast_as<ErrCategory::READ>(err);
177+
msg = fmt::format("failed to get schema during restore, err={}", err);
178+
return;
179+
}
180+
if (need_put) {
// Clear the old value before writing. The end key is `schema_key` with an encoded
// INT64_MAX appended, so the range remove also covers keys that extend `schema_key`
// with a suffix. NOTE(review): presumably those are the chunks of a blob-format
// value - confirm against cloud::blob_put.
181+
std::string schema_key_end(schema_key);
182+
encode_int64(INT64_MAX, &schema_key_end);
183+
txn->remove(schema_key, schema_key_end);
184+
uint8_t ver = config::meta_schema_value_version;
// A configured version > 0 writes through cloud::blob_put; otherwise the serialized
// schema is stored directly under `schema_key`.
185+
if (ver > 0) {
186+
cloud::blob_put(txn, schema_key, schema, ver);
187+
} else {
188+
txn->put(schema_key, schema.SerializeAsString());
189+
}
190+
LOG(INFO) << "put schema during restore, key=" << hex(schema_key);
191+
}
192+
}
193+
159194
bool parse_schema_value(const ValueBuf& buf, doris::TabletSchemaCloudPB* schema) {
160195
// TODO(plat1ko): Apply decompression based on value version
161196
return buf.to_pb(schema);

cloud/src/meta-service/meta_service_schema.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
3030
void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
3131
std::string_view schema_key, const doris::TabletSchemaCloudPB& schema);
3232

33+
// Put schema during restore. Writes `schema` only if nothing is stored under
34+
// `schema_key` yet, or if the stored schema cannot be parsed or has a broken
// unique_id=-1 (from create_tablet) on its first column; otherwise the stored
// schema is kept. On a read failure `code`/`msg` are set and nothing is written.
35+
void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn,
36+
std::string_view schema_key,
37+
const doris::TabletSchemaCloudPB& schema);
38+
3339
// Returns true if parsing succeeds
3440
[[nodiscard]] bool parse_schema_value(const ValueBuf& buf, doris::TabletSchemaCloudPB* schema);
3541

0 commit comments

Comments
 (0)