Skip to content

Commit 93bbb41

Browse files
[fix](cloud)(restore) fix broken schema during restore of lsc=false tables
Problem ------- In cloud mode, when a snapshot of a table created with `light_schema_change = false` is restored, the schema KV persisted under `meta_schema_key({instance_id, index_id, schema_version})` ends up with every column carrying `unique_id = -1`, because the schema is first written by `create_tablet` and at that point the unique_ids are not yet assigned. `commit_restore_job` then receives the correct schema (with valid `unique_id >= 0`) in each rowset meta from the backup, but the existing `put_schema_kv` is a no-op when the key already exists, so the broken schema leaks through and subsequent reads fail with errors such as `column reader is nullptr` or `different type between schema and column reader`. Fix --- Introduce `put_schema_kv_on_restore()` in the cloud MetaService. It reads the existing schema value, detects the broken-schema signature (`column(0).unique_id() == -1` or an unparseable value), range-removes all chunks of the stale schema, and writes the correct one. To avoid replacing a bad schema with another bad one, it also refuses to write a schema that is itself broken (empty columns or `column(0).unique_id == -1`) and only logs a warning in that case. In `MetaServiceImpl::commit_restore_job`, replace both existing `put_schema_kv` call sites with `put_schema_kv_on_restore`, guarded by an in-RPC `std::set<std::string>` so the same `(index_id, schema_version)` pair does not issue redundant FDB reads/writes when the restore spans many rowsets. Four counters (`rs_meta_schema_put_cnt/skip_cnt`, `tablet_meta_schema_put_cnt/skip_cnt`) are logged at the end of the RPC to make the behaviour observable in production. `put_versioned_schema_kv()` is intentionally NOT wrapped by the dedup set: it targets a different key space (`versioned::meta_schema_key`) and is already skip-if-exists inside the function. Tests ----- `cloud/test/meta_service_test.cpp` adds 5 unit tests covering every branch of `put_schema_kv_on_restore`: - `PutWhenKeyNotExist` — first-write path - `NoopWhenExistingSchemaIsGood` — skip-if-healthy path - `OverwriteWhenExistingIsBroken` — the fix itself - `DefensiveSkipWhenIncomingHasEmptyColumns` — defensive guard - `DefensiveSkipWhenIncomingHasUidNegativeOne` — defensive guard
1 parent 34b5c6c commit 93bbb41

4 files changed

Lines changed: 419 additions & 6 deletions

File tree

cloud/src/meta-service/meta_service.cpp

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <memory>
4242
#include <numeric>
4343
#include <ostream>
44+
#include <set>
4445
#include <sstream>
4546
#include <string>
4647
#include <string_view>
@@ -1764,6 +1765,15 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
17641765
TabletStats tablet_stat;
17651766
int64_t converted_rowset_num = 0;
17661767
int32_t max_batch_size = config::max_restore_job_rowsets_per_batch;
1768+
// Track schema keys for which `put_schema_kv_on_restore` has already been
1769+
// invoked within this RPC, so the same (index_id, schema_version) does not
1770+
// issue redundant FDB reads/writes across rowsets. Only covers the
1771+
// `meta_schema_key` path; `put_versioned_schema_kv` remains independent.
1772+
std::set<std::string> restored_schema_keys;
1773+
int64_t rs_meta_schema_put_cnt = 0;
1774+
int64_t rs_meta_schema_skip_cnt = 0;
1775+
int64_t tablet_meta_schema_put_cnt = 0;
1776+
int64_t tablet_meta_schema_skip_cnt = 0;
17671777
for (size_t i = 0; i < restore_job_rs_metas.size(); i += max_batch_size) {
17681778
size_t end = (i + max_batch_size) > restore_job_rs_metas.size()
17691779
? restore_job_rs_metas.size()
@@ -1791,9 +1801,16 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
17911801
return;
17921802
}
17931803
}
1794-
put_schema_kv(code, msg, txn.get(), schema_key, rowset_meta.tablet_schema());
1795-
if (code != MetaServiceCode::OK) {
1796-
return;
1804+
if (restored_schema_keys.count(schema_key) == 0) {
1805+
put_schema_kv_on_restore(code, msg, txn.get(), schema_key,
1806+
rowset_meta.tablet_schema());
1807+
if (code != MetaServiceCode::OK) {
1808+
return;
1809+
}
1810+
restored_schema_keys.insert(schema_key);
1811+
++rs_meta_schema_put_cnt;
1812+
} else {
1813+
++rs_meta_schema_skip_cnt;
17971814
}
17981815
if (is_versioned_write) {
17991816
std::string versioned_schema_key = versioned::meta_schema_key(
@@ -2052,8 +2069,14 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
20522069
fix_column_type(tablet_meta->mutable_schema());
20532070
auto schema_key = meta_schema_key(
20542071
{instance_id, tablet_meta->index_id(), tablet_meta->schema_version()});
2055-
put_schema_kv(code, msg, txn0.get(), schema_key, tablet_meta->schema());
2056-
if (code != MetaServiceCode::OK) return;
2072+
if (restored_schema_keys.count(schema_key) == 0) {
2073+
put_schema_kv_on_restore(code, msg, txn0.get(), schema_key, tablet_meta->schema());
2074+
if (code != MetaServiceCode::OK) return;
2075+
restored_schema_keys.insert(schema_key);
2076+
++tablet_meta_schema_put_cnt;
2077+
} else {
2078+
++tablet_meta_schema_skip_cnt;
2079+
}
20572080

20582081
bool is_versioned_write = is_version_write_enabled(instance_id);
20592082
if (is_versioned_write) {
@@ -2161,7 +2184,11 @@ void MetaServiceImpl::commit_restore_job(::google::protobuf::RpcController* cont
21612184
.tag("tablet_id", tablet_idx.tablet_id())
21622185
.tag("state", restore_job_pb.state())
21632186
.tag("mtime_s", restore_job_pb.mtime_s())
2164-
.tag("committed_rowset_num", converted_rowset_num);
2187+
.tag("committed_rowset_num", converted_rowset_num)
2188+
.tag("rs_meta_schema_put_cnt", rs_meta_schema_put_cnt)
2189+
.tag("rs_meta_schema_skip_cnt", rs_meta_schema_skip_cnt)
2190+
.tag("tablet_meta_schema_put_cnt", tablet_meta_schema_put_cnt)
2191+
.tag("tablet_meta_schema_skip_cnt", tablet_meta_schema_skip_cnt);
21652192
err = txn0->commit();
21662193
if (err != TxnErrorCode::TXN_OK) {
21672194
code = cast_as<ErrCategory::COMMIT>(err);

cloud/src/meta-service/meta_service_schema.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "cpp/sync_point.h"
3535
#include "meta-service/meta_service_helper.h"
3636
#include "meta-store/blob_message.h"
37+
#include "meta-store/codec.h"
3738
#include "meta-store/document_message.h"
3839
#include "meta-store/keys.h"
3940
#include "meta-store/txn_kv.h"
@@ -133,6 +134,57 @@ void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
133134
}
134135
}
135136

137+
void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn,
138+
std::string_view schema_key,
139+
const doris::TabletSchemaCloudPB& schema) {
140+
// Decide whether we need to (re)write the schema at this key.
141+
bool need_put = false;
142+
ValueBuf val_buf;
143+
TxnErrorCode err = cloud::blob_get(txn, schema_key, &val_buf);
144+
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
145+
need_put = true;
146+
} else if (err == TxnErrorCode::TXN_OK) {
147+
// Overwrite if the existing schema value cannot be parsed, or if its
148+
// first column has unique_id == -1 which is the signature of a broken
149+
// schema written by create_tablet for a light_schema_change=false
150+
// table during restore.
151+
doris::TabletSchemaCloudPB saved_schema;
152+
need_put = !parse_schema_value(val_buf, &saved_schema) ||
153+
(saved_schema.column_size() > 0 && saved_schema.column(0).unique_id() == -1);
154+
} else {
155+
code = cast_as<ErrCategory::READ>(err);
156+
msg = fmt::format("failed to get schema during restore, err={}", err);
157+
return;
158+
}
159+
if (!need_put) {
160+
return;
161+
}
162+
// Defensive check: refuse to overwrite with a schema that is itself
163+
// broken (empty columns, or column(0).unique_id == -1). This guarantees
164+
// we never replace a bad schema with another bad one. On rejection we
165+
// only log, so callers can continue committing other work.
166+
if (schema.column_size() == 0 || schema.column(0).unique_id() == -1) {
167+
LOG_WARNING("skip put schema during restore, incoming schema is broken")
168+
.tag("key", hex(schema_key))
169+
.tag("column_size", schema.column_size());
170+
return;
171+
}
172+
// `put_schema_kv` stores the schema as either a single KV (when
173+
// meta_schema_value_version == 0) or multiple blob chunks keyed by
174+
// `schema_key + encode_int64(ver << 56 + i)`. To clean up all possible
175+
// existing chunks we do a range remove over `[schema_key, schema_key + INT64_MAX)`.
176+
std::string schema_key_end(schema_key);
177+
encode_int64(INT64_MAX, &schema_key_end);
178+
txn->remove(schema_key, schema_key_end);
179+
uint8_t ver = config::meta_schema_value_version;
180+
if (ver > 0) {
181+
cloud::blob_put(txn, schema_key, schema, ver);
182+
} else {
183+
txn->put(schema_key, schema.SerializeAsString());
184+
}
185+
LOG_INFO("put schema during restore").tag("key", hex(schema_key));
186+
}
187+
136188
void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
137189
std::string_view schema_key,
138190
const doris::TabletSchemaCloudPB& schema) {

cloud/src/meta-service/meta_service_schema.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ struct ValueBuf;
2727
void put_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
2828
std::string_view schema_key, const doris::TabletSchemaCloudPB& schema);
2929

30+
// Put schema during a restore job. Unlike put_schema_kv, this function will
31+
// overwrite an existing schema key whose contents are broken (cannot be
32+
// parsed, or column(0).unique_id == -1 produced by create_tablet for tables
33+
// with light_schema_change=false). It also refuses to write a new schema
34+
// that is itself broken, to avoid replacing a bad schema with another bad one.
35+
void put_schema_kv_on_restore(MetaServiceCode& code, std::string& msg, Transaction* txn,
36+
std::string_view schema_key,
37+
const doris::TabletSchemaCloudPB& schema);
38+
3039
void put_versioned_schema_kv(MetaServiceCode& code, std::string& msg, Transaction* txn,
3140
std::string_view schema_key, const doris::TabletSchemaCloudPB& schema);
3241

0 commit comments

Comments
 (0)