Skip to content

Commit 901376f

Browse files
authored
branch-4.0: [opt](memory) add BE-level cache for load tablet schemas (#64581)
pick #63508
1 parent 182820f commit 901376f

6 files changed

Lines changed: 287 additions & 32 deletions

File tree

be/src/exec/tablet_info.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class OlapTableSchemaParam {
103103
return _unique_key_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
104104
}
105105

106-
std::set<std::string> partial_update_input_columns() const {
106+
const std::set<std::string>& partial_update_input_columns() const {
107107
return _partial_update_input_columns;
108108
}
109109
PartialUpdateNewRowPolicyPB partial_update_new_key_policy() const {

be/src/olap/delta_writer_v2.cpp

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "olap/storage_engine.h"
4949
#include "olap/tablet_manager.h"
5050
#include "olap/tablet_schema.h"
51+
#include "olap/tablet_schema_cache.h"
5152
#include "runtime/exec_env.h"
5253
#include "runtime/query_context.h"
5354
#include "service/backend_options.h"
@@ -217,28 +218,39 @@ Status DeltaWriterV2::cancel_with_status(const Status& st) {
217218
Status DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
218219
const OlapTableSchemaParam* table_schema_param,
219220
const TabletSchema& ori_tablet_schema) {
220-
_tablet_schema->copy_from(ori_tablet_schema);
221221
// find the right index id
222-
int i = 0;
223-
auto indexes = table_schema_param->indexes();
224-
for (; i < indexes.size(); i++) {
225-
if (indexes[i]->index_id == index_id) {
222+
const OlapTableIndexSchema* index_schema = nullptr;
223+
for (const auto* schema : table_schema_param->indexes()) {
224+
if (schema->index_id == index_id) {
225+
index_schema = schema;
226226
break;
227227
}
228228
}
229229

230-
if (!indexes.empty() && !indexes[i]->columns.empty() &&
231-
indexes[i]->columns[0]->unique_id() >= 0) {
232-
_tablet_schema->build_current_tablet_schema(
233-
index_id, static_cast<int32_t>(table_schema_param->version()), indexes[i],
234-
ori_tablet_schema);
230+
auto cache_key = TabletSchemaCache::build_load_schema_cache_key(
231+
index_id, table_schema_param, ori_tablet_schema, index_schema);
232+
auto cached_schema = TabletSchemaCache::instance()->lookup_schema(cache_key);
233+
if (cached_schema.first != nullptr) {
234+
_tablet_schema = cached_schema.second;
235+
TabletSchemaCache::instance()->release(cached_schema.first);
236+
} else {
237+
_tablet_schema->copy_from(ori_tablet_schema);
238+
if (index_schema != nullptr && !index_schema->columns.empty() &&
239+
index_schema->columns[0]->unique_id() >= 0) {
240+
_tablet_schema->build_current_tablet_schema(
241+
index_id, static_cast<int32_t>(table_schema_param->version()), index_schema,
242+
ori_tablet_schema);
243+
}
244+
_tablet_schema->set_table_id(table_schema_param->table_id());
245+
_tablet_schema->set_db_id(table_schema_param->db_id());
246+
if (table_schema_param->is_partial_update()) {
247+
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
248+
}
249+
auto inserted_schema = TabletSchemaCache::instance()->insert(cache_key, _tablet_schema);
250+
_tablet_schema = inserted_schema.second;
251+
TabletSchemaCache::instance()->release(inserted_schema.first);
235252
}
236253

237-
_tablet_schema->set_table_id(table_schema_param->table_id());
238-
_tablet_schema->set_db_id(table_schema_param->db_id());
239-
if (table_schema_param->is_partial_update()) {
240-
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
241-
}
242254
// set partial update columns info
243255
_partial_update_info = std::make_shared<PartialUpdateInfo>();
244256
RETURN_IF_ERROR(_partial_update_info->init(

be/src/olap/rowset_builder.cpp

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
#include "olap/tablet_manager.h"
4949
#include "olap/tablet_meta.h"
5050
#include "olap/tablet_schema.h"
51+
#include "olap/tablet_schema_cache.h"
5152
#include "olap/txn_manager.h"
5253
#include "runtime/memory/global_memory_arbitrator.h"
5354
#include "util/brpc_client_cache.h"
@@ -368,21 +369,38 @@ Status BaseRowsetBuilder::_build_current_tablet_schema(
368369
int64_t index_id, const OlapTableSchemaParam* table_schema_param,
369370
const TabletSchema& ori_tablet_schema) {
370371
// find the right index id
371-
int i = 0;
372-
auto indexes = table_schema_param->indexes();
373-
for (; i < indexes.size(); i++) {
374-
if (indexes[i]->index_id == index_id) {
372+
const OlapTableIndexSchema* index_schema = nullptr;
373+
for (const auto* schema : table_schema_param->indexes()) {
374+
if (schema->index_id == index_id) {
375+
index_schema = schema;
375376
break;
376377
}
377378
}
378-
if (!indexes.empty() && !indexes[i]->columns.empty() &&
379-
indexes[i]->columns[0]->unique_id() >= 0) {
380-
_tablet_schema->shawdow_copy_without_columns(ori_tablet_schema);
381-
_tablet_schema->build_current_tablet_schema(
382-
index_id, cast_set<int32_t>(table_schema_param->version()), indexes[i],
383-
ori_tablet_schema);
379+
380+
auto cache_key = TabletSchemaCache::build_load_schema_cache_key(
381+
index_id, table_schema_param, ori_tablet_schema, index_schema);
382+
auto cached_schema = TabletSchemaCache::instance()->lookup_schema(cache_key);
383+
if (cached_schema.first != nullptr) {
384+
_tablet_schema = cached_schema.second;
385+
TabletSchemaCache::instance()->release(cached_schema.first);
384386
} else {
385-
_tablet_schema->copy_from(ori_tablet_schema);
387+
if (index_schema != nullptr && !index_schema->columns.empty() &&
388+
index_schema->columns[0]->unique_id() >= 0) {
389+
_tablet_schema->shawdow_copy_without_columns(ori_tablet_schema);
390+
_tablet_schema->build_current_tablet_schema(
391+
index_id, cast_set<int32_t>(table_schema_param->version()), index_schema,
392+
ori_tablet_schema);
393+
} else {
394+
_tablet_schema->copy_from(ori_tablet_schema);
395+
}
396+
_tablet_schema->set_table_id(table_schema_param->table_id());
397+
_tablet_schema->set_db_id(table_schema_param->db_id());
398+
if (table_schema_param->is_partial_update()) {
399+
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
400+
}
401+
auto inserted_schema = TabletSchemaCache::instance()->insert(cache_key, _tablet_schema);
402+
_tablet_schema = inserted_schema.second;
403+
TabletSchemaCache::instance()->release(inserted_schema.first);
386404
}
387405
if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {
388406
// After schema change, should include extracted column
@@ -400,11 +418,6 @@ Status BaseRowsetBuilder::_build_current_tablet_schema(
400418
}
401419
}
402420

403-
_tablet_schema->set_table_id(table_schema_param->table_id());
404-
_tablet_schema->set_db_id(table_schema_param->db_id());
405-
if (table_schema_param->is_partial_update()) {
406-
_tablet_schema->set_auto_increment_column(table_schema_param->auto_increment_coulumn());
407-
}
408421
// set partial update columns info
409422
_partial_update_info = std::make_shared<PartialUpdateInfo>();
410423
RETURN_IF_ERROR(_partial_update_info->init(

be/src/olap/tablet_schema_cache.cpp

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,12 @@
2121
#include <glog/logging.h>
2222
#include <json2pb/pb_to_json.h>
2323

24+
#include <cstdint>
25+
#include <type_traits>
26+
#include <utility>
27+
2428
#include "bvar/bvar.h"
29+
#include "exec/tablet_info.h"
2530
#include "olap/tablet_schema.h"
2631
#include "util/sha.h"
2732

@@ -39,6 +44,17 @@ static std::string get_key_signature(const std::string& origin) {
3944
return std::string {digest.digest().data(), digest.digest().length()};
4045
}
4146

47+
template <typename T>
48+
static void append_cache_key_value(std::string* key, T value) {
49+
static_assert(std::is_integral_v<T>);
50+
key->append(reinterpret_cast<const char*>(&value), sizeof(value));
51+
}
52+
53+
static void append_cache_key_string(std::string* key, const std::string& value) {
54+
append_cache_key_value(key, static_cast<uint64_t>(value.size()));
55+
key->append(value);
56+
}
57+
4258
std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std::string& key) {
4359
std::string key_signature = get_key_signature(key);
4460
auto* lru_handle = lookup(key_signature);
@@ -64,6 +80,78 @@ std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(const std:
6480
return std::make_pair(lru_handle, tablet_schema_ptr);
6581
}
6682

83+
std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::insert(
84+
const std::string& key, TabletSchemaSPtr tablet_schema) {
85+
auto* lru_handle = lookup(key);
86+
TabletSchemaSPtr tablet_schema_ptr;
87+
if (lru_handle) {
88+
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
89+
tablet_schema_ptr = value->tablet_schema;
90+
g_tablet_schema_cache_hit_count << 1;
91+
} else {
92+
DCHECK(tablet_schema != nullptr);
93+
auto* value = new CacheValue;
94+
tablet_schema_ptr = std::move(tablet_schema);
95+
value->tablet_schema = tablet_schema_ptr;
96+
lru_handle = LRUCachePolicy::insert(key, value, tablet_schema_ptr->num_columns(),
97+
tablet_schema_ptr->mem_size(), CachePriority::NORMAL);
98+
g_tablet_schema_cache_count << 1;
99+
g_tablet_schema_cache_columns_count << tablet_schema_ptr->num_columns();
100+
}
101+
DCHECK(lru_handle != nullptr);
102+
return std::make_pair(lru_handle, tablet_schema_ptr);
103+
}
104+
105+
std::pair<Cache::Handle*, TabletSchemaSPtr> TabletSchemaCache::lookup_schema(
106+
const std::string& key) {
107+
auto* lru_handle = lookup(key);
108+
if (lru_handle == nullptr) {
109+
return {nullptr, nullptr};
110+
}
111+
auto* value = (CacheValue*)LRUCachePolicy::value(lru_handle);
112+
g_tablet_schema_cache_hit_count << 1;
113+
return {lru_handle, value->tablet_schema};
114+
}
115+
116+
std::string TabletSchemaCache::build_load_schema_cache_key(
117+
int64_t index_id, const OlapTableSchemaParam* table_schema_param,
118+
const TabletSchema& ori_tablet_schema, const OlapTableIndexSchema* index_schema) {
119+
DCHECK(table_schema_param != nullptr);
120+
std::string cache_key;
121+
cache_key.append("load_schema_v2");
122+
append_cache_key_value(&cache_key, index_id);
123+
append_cache_key_value(&cache_key, table_schema_param->table_id());
124+
append_cache_key_value(&cache_key, table_schema_param->db_id());
125+
append_cache_key_value(&cache_key, table_schema_param->version());
126+
127+
TabletSchemaPB ori_schema_pb;
128+
ori_tablet_schema.to_schema_pb(&ori_schema_pb);
129+
append_cache_key_string(&cache_key,
130+
TabletSchema::deterministic_string_serialize(ori_schema_pb));
131+
if (ori_tablet_schema.num_variant_columns() > 0) {
132+
// Variant schemas carry path set info outside TabletSchemaPB, so do not share them
133+
// across different source TabletSchema objects unless that metadata is serialized.
134+
append_cache_key_value(&cache_key, reinterpret_cast<uintptr_t>(&ori_tablet_schema));
135+
}
136+
137+
std::string auto_increment_column;
138+
if (table_schema_param->is_partial_update()) {
139+
auto_increment_column = table_schema_param->auto_increment_coulumn();
140+
}
141+
append_cache_key_string(&cache_key, auto_increment_column);
142+
143+
const bool has_current_schema = index_schema != nullptr && !index_schema->columns.empty() &&
144+
index_schema->columns[0]->unique_id() >= 0;
145+
append_cache_key_value(&cache_key, has_current_schema);
146+
if (index_schema != nullptr) {
147+
POlapTableIndexSchema index_schema_pb;
148+
index_schema->to_protobuf(&index_schema_pb);
149+
append_cache_key_string(&cache_key,
150+
TabletSchema::deterministic_string_serialize(index_schema_pb));
151+
}
152+
return get_key_signature(cache_key);
153+
}
154+
67155
void TabletSchemaCache::release(Cache::Handle* lru_handle) {
68156
LRUCachePolicy::release(lru_handle);
69157
}

be/src/olap/tablet_schema_cache.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,19 @@
1717

1818
#pragma once
1919

20+
#include <cstdint>
21+
#include <string>
22+
#include <utility>
23+
2024
#include "olap/tablet_fwd.h"
2125
#include "runtime/exec_env.h"
2226
#include "runtime/memory/lru_cache_policy.h"
2327

2428
namespace doris {
2529

30+
class OlapTableSchemaParam;
31+
struct OlapTableIndexSchema;
32+
2633
class TabletSchemaCache : public LRUCachePolicy {
2734
public:
2835
using LRUCachePolicy::insert;
@@ -45,6 +52,16 @@ class TabletSchemaCache : public LRUCachePolicy {
4552

4653
std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key);
4754

55+
std::pair<Cache::Handle*, TabletSchemaSPtr> insert(const std::string& key,
56+
TabletSchemaSPtr tablet_schema);
57+
58+
std::pair<Cache::Handle*, TabletSchemaSPtr> lookup_schema(const std::string& key);
59+
60+
static std::string build_load_schema_cache_key(int64_t index_id,
61+
const OlapTableSchemaParam* table_schema_param,
62+
const TabletSchema& ori_tablet_schema,
63+
const OlapTableIndexSchema* index_schema);
64+
4865
void release(Cache::Handle*);
4966

5067
private:

0 commit comments

Comments
 (0)