diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index f6e98e8fb316b5..64a0186df8d2d9 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -24,6 +24,7 @@ #include "cloud/cloud_meta_mgr.h" #include "cloud/config.h" #include "common/config.h" +#include "common/metrics/doris_metrics.h" #include "core/value/vdatetime_value.h" #include "cpp/sync_point.h" #include "service/backend_options.h" diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 8a89e6317733f8..073ca8471e39ba 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -24,6 +24,7 @@ #include "cloud/config.h" #include "common/config.h" #include "common/logging.h" +#include "common/metrics/doris_metrics.h" #include "common/status.h" #include "cpp/sync_point.h" #include "service/backend_options.h" diff --git a/be/src/cloud/cloud_full_compaction.cpp b/be/src/cloud/cloud_full_compaction.cpp index 8dfb3da433f94e..4f33c413e40329 100644 --- a/be/src/cloud/cloud_full_compaction.cpp +++ b/be/src/cloud/cloud_full_compaction.cpp @@ -25,6 +25,7 @@ #include "cloud/cloud_storage_engine.h" #include "cloud/config.h" #include "common/config.h" +#include "common/metrics/doris_metrics.h" #include "common/status.h" #include "core/column/column.h" #include "cpp/sync_point.h" diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 3e979864138645..ca3004cf41bf1e 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -26,11 +26,12 @@ #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet.h" #include "cloud/config.h" +#include "common/metrics/doris_metrics.h" #include "common/status.h" +#include "cpp/lru_cache.h" #include "cpp/sync_point.h" #include "runtime/memory/cache_policy.h" #include "util/debug_points.h" -#include "util/lru_cache.h" #include "util/stack_util.h" namespace doris { diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h b/be/src/cloud/cloud_txn_delete_bitmap_cache.h index 7cf6c27ecd8491..b0a6ef67f703f5 100644 --- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h +++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h @@ -19,13 +19,13 @@ #include +#include "cpp/lru_cache.h" #include "storage/olap_common.h" #include "storage/partial_update_info.h" #include "storage/rowset/rowset.h" #include "storage/tablet/tablet_meta.h" #include "storage/txn/txn_manager.h" #include "util/countdown_latch.h" -#include "util/lru_cache.h" namespace doris { diff --git a/be/src/exec/scan/scanner.h b/be/src/exec/scan/scanner.h index 14d1b9e734c3a7..778f6db4a1134b 100644 --- a/be/src/exec/scan/scanner.h +++ b/be/src/exec/scan/scanner.h @@ -23,6 +23,7 @@ #include #include +#include "common/metrics/doris_metrics.h" #include "common/status.h" #include "core/block/block.h" #include "runtime/exec_env.h" diff --git a/be/src/exprs/function/function_java_udf.cpp b/be/src/exprs/function/function_java_udf.cpp index 4a0aecff0862df..cecab83da7a05e 100644 --- a/be/src/exprs/function/function_java_udf.cpp +++ b/be/src/exprs/function/function_java_udf.cpp @@ -23,6 +23,7 @@ #include #include "common/exception.h" +#include "common/metrics/doris_metrics.h" #include "core/block/block.h" #include "format/jni/jni_data_bridge.h" #include "jni.h" diff --git a/be/src/load/channel/load_channel_mgr.h b/be/src/load/channel/load_channel_mgr.h index fdddbee040006d..b7e56088d391dc 100644 --- a/be/src/load/channel/load_channel_mgr.h +++ b/be/src/load/channel/load_channel_mgr.h @@ -30,13 +30,13 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" +#include "cpp/lru_cache.h" #include "load/channel/load_channel.h" #include "load/memtable/memtable_memory_limiter.h" #include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" #include "util/countdown_latch.h" -#include "util/lru_cache.h" #include "util/uid_util.h" namespace doris { diff --git a/be/src/load/channel/load_stream_mgr.cpp b/be/src/load/channel/load_stream_mgr.cpp index 80dfb306b41217..63564e53034f1e 100644 --- a/be/src/load/channel/load_stream_mgr.cpp +++ b/be/src/load/channel/load_stream_mgr.cpp @@ -19,6 +19,7 @@ #include +#include "cpp/lru_cache.h" #include "load/channel/load_channel.h" #include "load/channel/load_stream.h" #include "runtime/exec_env.h" @@ -26,7 +27,6 @@ #include "storage/rowset/rowset_meta.h" #include "storage/storage_engine.h" #include "storage/tablet/tablet_manager.h" -#include "util/lru_cache.h" #include "util/uid_util.h" namespace doris { diff --git a/be/src/runtime/memory/lru_cache_policy.h b/be/src/runtime/memory/lru_cache_policy.h index 02794bb0e248bf..aa97a7a906d5a5 100644 --- a/be/src/runtime/memory/lru_cache_policy.h +++ b/be/src/runtime/memory/lru_cache_policy.h @@ -22,11 +22,12 @@ #include #include "common/be_mock_util.h" +#include "cpp/lru_cache.h" #include "runtime/memory/cache_policy.h" #include "runtime/memory/lru_cache_value_base.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/thread_context.h" -#include "util/lru_cache.h" +#include "util/be_lru_cache_metrics.h" #include "util/time.h" namespace doris { @@ -40,9 +41,12 @@ class LRUCachePolicy : public CachePolicy { : CachePolicy(type, capacity, stale_sweep_time_s, enable_prune), _lru_cache_type(lru_cache_type) { if (check_capacity(capacity, num_shards)) { - _cache = std::shared_ptr( + auto cache = std::shared_ptr( new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, element_count_capacity, is_lru_k)); + cache->set_metrics_recorder( + create_be_lru_cache_metrics_recorder(type_string(type), cache.get())); + _cache = std::move(cache); } else { _cache = std::make_shared(); } @@ -58,10 +62,13 @@ class LRUCachePolicy : public CachePolicy { : CachePolicy(type, capacity, stale_sweep_time_s, enable_prune), _lru_cache_type(lru_cache_type) { if (check_capacity(capacity, num_shards)) { - _cache = std::shared_ptr( + auto cache = std::shared_ptr( new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards, cache_value_time_extractor, cache_value_check_timestamp, element_count_capacity, is_lru_k)); + cache->set_metrics_recorder( + create_be_lru_cache_metrics_recorder(type_string(type), cache.get())); + _cache = std::move(cache); } else { _cache = std::make_shared(); } @@ -126,7 +133,7 @@ class LRUCachePolicy : public CachePolicy { ->set_tracking_bytes(tracking_bytes, _mem_tracker, value_tracking_bytes, _value_mem_tracker); } - return _cache->insert(key, value, charge, priority); + return _cache->insert(key, value, charge, priority, cache_value_deleter); } void for_each_entry(const std::function& visitor) { diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp index 35f3ee372b4f57..b74180f913031f 100644 --- a/be/src/runtime/memory/memory_profile.cpp +++ b/be/src/runtime/memory/memory_profile.cpp @@ -18,6 +18,7 @@ #include "runtime/memory/memory_profile.h" #include "bvar/reducer.h" +#include "common/metrics/doris_metrics.h" #include "runtime/exec_env.h" #include "runtime/memory/global_memory_arbitrator.h" #include "runtime/memory/jemalloc_control.h" diff --git a/be/src/runtime/query_cache/query_cache.h b/be/src/runtime/query_cache/query_cache.h index 6fd8db2eaa29c6..437e502fc59b65 100644 --- a/be/src/runtime/query_cache/query_cache.h +++ b/be/src/runtime/query_cache/query_cache.h @@ -30,12 +30,12 @@ #include "common/config.h" #include "common/status.h" #include "core/block/block.h" +#include "cpp/lru_cache.h" #include "io/fs/file_system.h" #include "io/fs/path.h" #include "runtime/exec_env.h" #include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker.h" -#include "util/lru_cache.h" #include "util/slice.h" #include "util/time.h" diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp index 53dd2900052bcc..23453020030645 100644 --- a/be/src/service/point_query_executor.cpp +++ b/be/src/service/point_query_executor.cpp @@ -37,6 +37,7 @@ #include "common/status.h" #include "core/data_type/data_type_factory.hpp" #include "core/data_type_serde/data_type_serde.h" +#include "cpp/lru_cache.h" #include "exec/sink/writer/vmysql_result_writer.h" #include "exprs/vexpr.h" #include "exprs/vexpr_context.h" @@ -55,7 +56,6 @@ #include "storage/tablet/tablet_schema.h" #include "storage/utils.h" #include "util/jsonb/serialize.h" -#include "util/lru_cache.h" #include "util/simd/bits.h" #include "util/thrift_util.h" diff --git a/be/src/service/point_query_executor.h b/be/src/service/point_query_executor.h index fb507530591c4b..c677443619451c 100644 --- a/be/src/service/point_query_executor.h +++ b/be/src/service/point_query_executor.h @@ -41,6 +41,7 @@ #include "common/status.h" #include "core/block/block.h" #include "core/data_type_serde/data_type_serde.h" +#include "cpp/lru_cache.h" #include "exprs/vexpr_fwd.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" @@ -49,7 +50,6 @@ #include "storage/rowset/rowset.h" #include "storage/tablet/tablet.h" #include "storage/utils.h" -#include "util/lru_cache.h" #include "util/mysql_global.h" #include "util/slice.h" diff --git a/be/src/storage/cache/page_cache.h b/be/src/storage/cache/page_cache.h index 1cc98c5cb49997..42732006b8171a 100644 --- a/be/src/storage/cache/page_cache.h +++ b/be/src/storage/cache/page_cache.h @@ -28,9 +28,9 @@ #include "core/allocator.h" #include "core/allocator_fwd.h" +#include "cpp/lru_cache.h" #include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker_limiter.h" -#include "util/lru_cache.h" #include "util/slice.h" namespace doris { diff --git a/be/src/storage/file_header.h b/be/src/storage/file_header.h index f2f019a0279cab..aca0c9a6b0e71c 100644 --- a/be/src/storage/file_header.h +++ b/be/src/storage/file_header.h @@ -24,6 +24,7 @@ #include #include +#include "cpp/lru_cache.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" @@ -31,7 +32,6 @@ #include "storage/olap_define.h" #include "storage/utils.h" #include "util/debug_util.h" -#include "util/lru_cache.h" namespace doris { diff --git a/be/src/storage/index/inverted/inverted_index_cache.h b/be/src/storage/index/inverted/inverted_index_cache.h index 0e33fe747b3523..a96a4d5c32d45f 100644 --- a/be/src/storage/index/inverted/inverted_index_cache.h +++ b/be/src/storage/index/inverted/inverted_index_cache.h @@ -29,13 +29,13 @@ #include "common/config.h" #include "common/status.h" +#include "cpp/lru_cache.h" #include "io/fs/file_system.h" #include "io/fs/path.h" #include "runtime/exec_env.h" #include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker.h" #include "storage/index/inverted/inverted_index_searcher.h" -#include "util/lru_cache.h" #include "util/slice.h" #include "util/time.h" diff --git a/be/src/storage/rowset/rowset_meta.cpp b/be/src/storage/rowset/rowset_meta.cpp index 4ba3163f5b5e88..285829a0912489 100644 --- a/be/src/storage/rowset/rowset_meta.cpp +++ b/be/src/storage/rowset/rowset_meta.cpp @@ -26,6 +26,7 @@ #include "cloud/cloud_storage_engine.h" #include "common/logging.h" #include "common/status.h" +#include "cpp/lru_cache.h" #include "cpp/sync_point.h" #include "exec/common/variant_util.h" #include "google/protobuf/util/message_differencer.h" @@ -44,7 +45,6 @@ #include "storage/tablet/tablet_fwd.h" #include "storage/tablet/tablet_schema.h" #include "storage/tablet/tablet_schema_cache.h" -#include "util/lru_cache.h" namespace doris { diff --git a/be/src/storage/segment/condition_cache.h b/be/src/storage/segment/condition_cache.h index 511b9c56abac5e..a882851b6827a0 100644 --- a/be/src/storage/segment/condition_cache.h +++ b/be/src/storage/segment/condition_cache.h @@ -29,12 +29,12 @@ #include "common/config.h" #include "common/status.h" +#include "cpp/lru_cache.h" #include "io/fs/file_system.h" #include "io/fs/path.h" #include "runtime/exec_env.h" #include "runtime/memory/lru_cache_policy.h" #include "runtime/memory/mem_tracker.h" -#include "util/lru_cache.h" #include "util/slice.h" #include "util/time.h" diff --git a/be/src/storage/segment/segment_loader.h b/be/src/storage/segment/segment_loader.h index 1384c6b36195f5..5f9bb03e9d0633 100644 --- a/be/src/storage/segment/segment_loader.h +++ b/be/src/storage/segment/segment_loader.h @@ -31,10 +31,10 @@ #include "common/cast_set.h" #include "common/status.h" +#include "cpp/lru_cache.h" #include "runtime/memory/lru_cache_policy.h" #include "storage/olap_common.h" // for rowset id #include "storage/segment/segment.h" -#include "util/lru_cache.h" #include "util/time.h" namespace doris { diff --git a/be/src/storage/tablet/tablet_meta.cpp b/be/src/storage/tablet/tablet_meta.cpp index 68a4720dece643..382a2af0ab92db 100644 --- a/be/src/storage/tablet/tablet_meta.cpp +++ b/be/src/storage/tablet/tablet_meta.cpp @@ -38,6 +38,7 @@ #include "cloud/cloud_storage_engine.h" #include "cloud/config.h" #include "common/config.h" +#include "cpp/lru_cache.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" #include "storage/data_dir.h" @@ -51,7 +52,6 @@ #include "storage/tablet/tablet_schema_cache.h" #include "storage/utils.h" #include "util/debug_points.h" -#include "util/lru_cache.h" #include "util/mem_info.h" #include "util/parse_util.h" #include "util/string_util.h" diff --git a/be/src/storage/tablet/tablet_meta.h b/be/src/storage/tablet/tablet_meta.h index 0e52bd8429bb0b..6aff24b7b215e7 100644 --- a/be/src/storage/tablet/tablet_meta.h +++ b/be/src/storage/tablet/tablet_meta.h @@ -40,6 +40,7 @@ #include "common/logging.h" #include "common/status.h" +#include "cpp/lru_cache.h" #include "io/fs/file_system.h" #include "runtime/memory/lru_cache_policy.h" #include "storage/binlog_config.h" @@ -47,7 +48,6 @@ #include "storage/olap_common.h" #include "storage/rowset/rowset_meta.h" #include "storage/tablet/tablet_schema.h" -#include "util/lru_cache.h" #include "util/uid_util.h" namespace json2pb { diff --git a/be/src/util/be_lru_cache_metrics.cpp b/be/src/util/be_lru_cache_metrics.cpp new file mode 100644 index 00000000000000..d416ae4e331902 --- /dev/null +++ b/be/src/util/be_lru_cache_metrics.cpp @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/be_lru_cache_metrics.h" + +#include + +#include +#include +#include +#include + +#include "common/metrics/doris_metrics.h" +#include "common/metrics/metrics.h" + +namespace doris { + +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_capacity, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_usage, MetricUnit::BYTES); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_element_count, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_usage_ratio, MetricUnit::NOUNIT); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_lookup_count, MetricUnit::OPERATIONS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_hit_count, MetricUnit::OPERATIONS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_miss_count, MetricUnit::OPERATIONS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_stampede_count, MetricUnit::OPERATIONS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_hit_ratio, MetricUnit::NOUNIT); + +class BELRUCacheMetricsRecorder final : public ShardedLRUCache::MetricsRecorder { +public: + BELRUCacheMetricsRecorder(std::string name, const ShardedLRUCache* cache) + : _name(std::move(name)), _cache(cache) { + _entity = DorisMetrics::instance()->metric_registry()->register_entity( + std::string("lru_cache:") + _name, {{"name", _name}}); + _entity->register_hook(_name, std::bind(&BELRUCacheMetricsRecorder::_update_metrics, this)); + INT_GAUGE_METRIC_REGISTER(_entity, cache_capacity); + INT_GAUGE_METRIC_REGISTER(_entity, cache_usage); + INT_GAUGE_METRIC_REGISTER(_entity, cache_element_count); + DOUBLE_GAUGE_METRIC_REGISTER(_entity, cache_usage_ratio); + INT_COUNTER_METRIC_REGISTER(_entity, cache_lookup_count); + INT_COUNTER_METRIC_REGISTER(_entity, cache_hit_count); + INT_COUNTER_METRIC_REGISTER(_entity, cache_stampede_count); + INT_COUNTER_METRIC_REGISTER(_entity, cache_miss_count); + DOUBLE_GAUGE_METRIC_REGISTER(_entity, cache_hit_ratio); + + _hit_count_bvar = std::make_unique>("doris_cache", _name); + _hit_count_per_second = std::make_unique>>( + "doris_cache", _name + "_persecond", _hit_count_bvar.get(), 60); + _lookup_count_bvar = std::make_unique>("doris_cache", _name); + _lookup_count_per_second = std::make_unique>>( + "doris_cache", _name + "_persecond", _lookup_count_bvar.get(), 60); + } + + ~BELRUCacheMetricsRecorder() override { + _entity->deregister_hook(_name); + DorisMetrics::instance()->metric_registry()->deregister_entity(_entity); + } + +private: + void _update_metrics() const { + ShardedLRUCache::MetricsSnapshot snapshot = _cache->get_metrics_snapshot(); + cache_capacity->set_value(snapshot.capacity); + cache_usage->set_value(snapshot.usage); + cache_element_count->set_value(snapshot.element_count); + cache_lookup_count->set_value(snapshot.lookup_count); + cache_hit_count->set_value(snapshot.hit_count); + cache_miss_count->set_value(snapshot.miss_count); + cache_stampede_count->set_value(snapshot.stampede_count); + cache_usage_ratio->set_value(snapshot.capacity == 0 + ? 0 + : static_cast(snapshot.usage) / + static_cast(snapshot.capacity)); + cache_hit_ratio->set_value(snapshot.lookup_count == 0 + ? 0 + : static_cast(snapshot.hit_count) / + static_cast(snapshot.lookup_count)); + } + + const std::string _name; + const ShardedLRUCache* _cache; + std::shared_ptr _entity; + IntGauge* cache_capacity = nullptr; + IntGauge* cache_usage = nullptr; + IntGauge* cache_element_count = nullptr; + DoubleGauge* cache_usage_ratio = nullptr; + IntCounter* cache_lookup_count = nullptr; + IntCounter* cache_hit_count = nullptr; + IntCounter* cache_miss_count = nullptr; + IntCounter* cache_stampede_count = nullptr; + DoubleGauge* cache_hit_ratio = nullptr; + std::unique_ptr> _hit_count_bvar; + std::unique_ptr>> _hit_count_per_second; + std::unique_ptr> _lookup_count_bvar; + std::unique_ptr>> _lookup_count_per_second; +}; + +std::unique_ptr create_be_lru_cache_metrics_recorder( + std::string name, const ShardedLRUCache* cache) { + return std::make_unique(std::move(name), cache); +} + +} // namespace doris diff --git a/be/src/util/be_lru_cache_metrics.h b/be/src/util/be_lru_cache_metrics.h new file mode 100644 index 00000000000000..4cd7defe4a354e --- /dev/null +++ b/be/src/util/be_lru_cache_metrics.h @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "cpp/lru_cache.h" + +namespace doris { + +std::unique_ptr create_be_lru_cache_metrics_recorder( + std::string name, const ShardedLRUCache* cache); + +} // namespace doris diff --git a/be/src/util/obj_lru_cache.h b/be/src/util/obj_lru_cache.h index ab953e01c553ed..0a9a0c4240e6dd 100644 --- a/be/src/util/obj_lru_cache.h +++ b/be/src/util/obj_lru_cache.h @@ -17,8 +17,8 @@ #pragma once +#include "cpp/lru_cache.h" #include "runtime/memory/lru_cache_policy.h" -#include "util/lru_cache.h" namespace doris { diff --git a/be/test/storage/cache/lru_cache_test.cpp b/be/test/storage/cache/lru_cache_test.cpp index a31d532a5a4686..ae469661688ea6 100644 --- a/be/test/storage/cache/lru_cache_test.cpp +++ b/be/test/storage/cache/lru_cache_test.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "util/lru_cache.h" +#include "cpp/lru_cache.h" #include #include @@ -178,14 +178,24 @@ static void insert_LRUCache(LRUCache& cache, const CacheKey& key, int value, CachePriority priority) { uint32_t hash = key.hash(key.data(), key.size(), 0); auto* cache_value = new CacheTest::CacheValue(EncodeValue(value)); - cache.release(cache.insert(key, hash, cache_value, value, priority)); + cache.release(cache.insert(key, hash, cache_value, value, priority, + cache_value_deleter)); } static void insert_number_LRUCache(LRUCache& cache, const CacheKey& key, int value, int charge, CachePriority priority) { uint32_t hash = key.hash(key.data(), key.size(), 0); auto* cache_value = new CacheTest::CacheValue(EncodeValue(value)); - cache.release(cache.insert(key, hash, cache_value, charge, priority)); + cache.release(cache.insert(key, hash, cache_value, charge, priority, + cache_value_deleter)); +} + +static size_t lru_handle_usage(const CacheKey& key) { + return sizeof(LRUHandle) - 1 + key.size(); +} + +static size_t lru_entry_usage(const CacheKey& key, size_t charge) { + return lru_handle_usage(key) + charge; } // https://stackoverflow.com/questions/42756443/undefined-reference-with-gtest @@ -302,40 +312,47 @@ TEST_F(CacheTest, Usage) { cache.set_capacity(1040); // The lru usage is handle_size + charge. - // handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98 CacheKey key1("100"); + const size_t usage1 = lru_entry_usage(key1, 100); insert_LRUCache(cache, key1, 100, CachePriority::NORMAL); - ASSERT_EQ(198, cache.get_usage()); // 100 + 98 + ASSERT_EQ(usage1, cache.get_usage()); CacheKey key2("200"); + const size_t usage2 = lru_entry_usage(key2, 200); insert_LRUCache(cache, key2, 200, CachePriority::DURABLE); - ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE + ASSERT_EQ(usage1 + usage2, cache.get_usage()); CacheKey key3("300"); + const size_t usage3 = lru_entry_usage(key3, 300); insert_LRUCache(cache, key3, 300, CachePriority::NORMAL); - ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398 + ASSERT_EQ(usage1 + usage2 + usage3, cache.get_usage()); CacheKey key4("400"); + const size_t usage4 = lru_entry_usage(key4, 400); insert_LRUCache(cache, key4, 400, CachePriority::NORMAL); - ASSERT_EQ(796, cache.get_usage()); // 298(d) + 498, evict 198 398 + ASSERT_EQ(usage2 + usage4, cache.get_usage()); CacheKey key5("500"); + const size_t usage5 = lru_entry_usage(key5, 500); insert_LRUCache(cache, key5, 500, CachePriority::NORMAL); - ASSERT_EQ(896, cache.get_usage()); // 298(d) + 598, evict 498 + ASSERT_EQ(usage2 + usage5, cache.get_usage()); CacheKey key6("600"); + const size_t usage6 = lru_entry_usage(key6, 600); insert_LRUCache(cache, key6, 600, CachePriority::NORMAL); - ASSERT_EQ(996, cache.get_usage()); // 298(d) + 698, evict 598 + ASSERT_EQ(usage2 + usage6, cache.get_usage()); CacheKey key7("950"); + const size_t usage7 = lru_entry_usage(key7, 950); insert_LRUCache(cache, key7, 950, CachePriority::DURABLE); - ASSERT_EQ( - 0, - cache.get_usage()); // evict 298 698, because 950 + 98 > 1040, data was freed when handle release. + ASSERT_EQ(0, + cache.get_usage()); // evict old entries, because usage7 exceeds cache capacity. + ASSERT_GT(usage7, cache.get_capacity()); CacheKey key8("900"); + const size_t usage8 = lru_entry_usage(key8, 900); insert_LRUCache(cache, key8, 900, CachePriority::NORMAL); - ASSERT_EQ(998, cache.get_usage()); // 900 + 98 < 1050 + ASSERT_EQ(usage8, cache.get_usage()); } TEST_F(CacheTest, UsageLRUK) { @@ -343,60 +360,54 @@ TEST_F(CacheTest, UsageLRUK) { cache.set_capacity(1050); // The lru usage is handle_size + charge. - // handle_size = sizeof(handle) - 1 + key size = 96 - 1 + 3 = 98 CacheKey key1("100"); + const size_t usage1 = lru_entry_usage(key1, 100); insert_LRUCache(cache, key1, 100, CachePriority::NORMAL); - ASSERT_EQ(198, cache.get_usage()); // 100 + 98 + ASSERT_EQ(usage1, cache.get_usage()); CacheKey key2("200"); + const size_t usage2 = lru_entry_usage(key2, 200); insert_LRUCache(cache, key2, 200, CachePriority::DURABLE); - ASSERT_EQ(496, cache.get_usage()); // 198 + 298(d), d = DURABLE + ASSERT_EQ(usage1 + usage2, cache.get_usage()); CacheKey key3("300"); + const size_t usage3 = lru_entry_usage(key3, 300); insert_LRUCache(cache, key3, 300, CachePriority::NORMAL); - ASSERT_EQ(894, cache.get_usage()); // 198 + 298(d) + 398 + ASSERT_EQ(usage1 + usage2 + usage3, cache.get_usage()); CacheKey key4("400"); + const size_t usage4 = lru_entry_usage(key4, 400); insert_LRUCache(cache, key4, 400, CachePriority::NORMAL); - // Data cache is full, not insert, visits lru cache not exist key=498(400 + 98) and insert it. - ASSERT_EQ(894, cache.get_usage()); + // Data cache is full, so the first insert only records the entry in the visits list. + ASSERT_EQ(usage1 + usage2 + usage3, cache.get_usage()); insert_LRUCache(cache, key4, 400, CachePriority::NORMAL); - // Data cache 298(d) + 498, evict 198 398. visits lru cache exist key=498 - // and erase from visits lru cache, insert to Data cache. - ASSERT_EQ(796, cache.get_usage()); + // The second insert promotes the entry into data cache and evicts the normal entries. + ASSERT_EQ(usage2 + usage4, cache.get_usage()); CacheKey key5("500"); + const size_t usage5 = lru_entry_usage(key5, 500); insert_LRUCache(cache, key5, 500, CachePriority::NORMAL); - // Data cache is full, not insert, visits lru cache not exist key=598(500 + 98) and insert it. - ASSERT_EQ(796, cache.get_usage()); + ASSERT_EQ(usage2 + usage4, cache.get_usage()); CacheKey key6("600"); insert_LRUCache(cache, key6, 600, CachePriority::NORMAL); - // Data cache is full, not insert, visits lru cache not exist key=698(600 + 98) and insert it, - // visits lru cache is full, evict key=598 from visits lru cache. - ASSERT_EQ(796, cache.get_usage()); + ASSERT_EQ(usage2 + usage4, cache.get_usage()); insert_LRUCache(cache, key5, 500, CachePriority::NORMAL); - // Data cache is full, not insert, visits lru cache not exist key=598 and insert it. - // visits lru cache is full, evict key=698 from visits lru cache. - ASSERT_EQ(796, cache.get_usage()); + ASSERT_EQ(usage2 + usage4, cache.get_usage()); insert_LRUCache(cache, key5, 500, CachePriority::NORMAL); - // Data cache 298(d) + 598, evict 498. visits lru cache exist key=598 - // and erase from visits lru cache, insert to Data cache. - ASSERT_EQ(896, cache.get_usage()); + ASSERT_EQ(usage2 + usage5, cache.get_usage()); CacheKey key7("980"); + const size_t usage7 = lru_entry_usage(key7, 980); insert_LRUCache(cache, key7, 980, CachePriority::DURABLE); - // Data cache is full, not insert, visits lru cache not exist key=1078(980 + 98) - // but 1078 > capacity(1050), not insert visits lru cache. - ASSERT_EQ(896, cache.get_usage()); + ASSERT_EQ(usage2 + usage5, cache.get_usage()); + ASSERT_GT(usage7, cache.get_capacity()); insert_LRUCache(cache, key7, 980, CachePriority::DURABLE); - // Ssame as above, data cache is full, not insert, visits lru cache not exist key=1078(980 + 98) - // but 1078 > capacity(1050), not insert visits lru cache. - ASSERT_EQ(896, cache.get_usage()); + ASSERT_EQ(usage2 + usage5, cache.get_usage()); } TEST_F(CacheTest, Prune) { diff --git a/be/test/storage/segment/inverted_index_searcher_cache_test.cpp b/be/test/storage/segment/inverted_index_searcher_cache_test.cpp index 222d016bc46b7f..ce86d5db0ff4b7 100644 --- a/be/test/storage/segment/inverted_index_searcher_cache_test.cpp +++ b/be/test/storage/segment/inverted_index_searcher_cache_test.cpp @@ -26,13 +26,13 @@ #include #include "common/status.h" +#include "cpp/lru_cache.h" #include "gtest/gtest_pred_impl.h" #include "io/fs/local_file_system.h" #include "storage/index/inverted/inverted_index_cache.h" #include "storage/index/inverted/inverted_index_desc.h" #include "storage/index/inverted/inverted_index_reader.h" #include "storage/olap_common.h" -#include "util/lru_cache.h" #include "util/time.h" namespace doris { diff --git a/be/test/util/lru_cache_util_test.cpp b/be/test/util/lru_cache_util_test.cpp index e43c1743160dd0..3c1509cb00336d 100644 --- a/be/test/util/lru_cache_util_test.cpp +++ b/be/test/util/lru_cache_util_test.cpp @@ -20,8 +20,8 @@ #include +#include "cpp/lru_cache.hpp" #include "gtest/gtest_pred_impl.h" -#include "util/lru_cache.hpp" namespace doris { diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 770ee9ca13a4f5..0ed5ba7d05a929 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -158,6 +158,12 @@ CONF_mBool(enable_mvcc_meta_check, "false"); CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min +// KV cache config +CONF_mBool(enable_tablet_index_cache, "true"); +CONF_mInt64(ms_tablet_index_cache_capacity, "10000"); +CONF_mInt64(recycler_tablet_index_cache_capacity, "10000"); +CONF_mInt64(tablet_index_cache_ttl_seconds, "0"); // 0 means no TTL + CONF_String(test_s3_ak, ""); CONF_String(test_s3_sk, ""); CONF_String(test_s3_endpoint, ""); diff --git a/cloud/src/common/kv_cache.h b/cloud/src/common/kv_cache.h new file mode 100644 index 00000000000000..69259208cd00dc --- /dev/null +++ b/cloud/src/common/kv_cache.h @@ -0,0 +1,138 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cpp/lru_cache.h" + +namespace doris::cloud { + +namespace detail { + +template +inline constexpr bool dependent_false_v = false; + +template +void encode_key_component(std::string* encoded_key, const T& value) { + if constexpr (std::is_same_v, std::string>) { + uint64_t size = value.size(); + encoded_key->append(reinterpret_cast(&size), sizeof(size)); + encoded_key->append(value.data(), value.size()); + } else if constexpr (std::is_same_v, std::string_view>) { + uint64_t size = value.size(); + encoded_key->append(reinterpret_cast(&size), sizeof(size)); + encoded_key->append(value.data(), value.size()); + } else if constexpr (std::is_enum_v) { + using UnderlyingType = std::underlying_type_t; + auto stored = static_cast(value); + encoded_key->append(reinterpret_cast(&stored), sizeof(stored)); + } else if constexpr (std::is_integral_v) { + encoded_key->append(reinterpret_cast(&value), sizeof(value)); + } else { + static_assert(dependent_false_v, "unsupported key component type"); + } +} + +template +std::string encode_key(const KeyTuple& key) { + std::string encoded_key; + std::apply( + [&encoded_key](const auto&... values) { + (encode_key_component(&encoded_key, values), ...); + }, + key); + return encoded_key; +} + +inline int64_t now_seconds() { + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +} // namespace detail + +template +class KvCache { +public: + explicit KvCache(size_t capacity, int64_t ttl_seconds = 0, std::string name = "cloud_kv_cache") + : _ttl_seconds(ttl_seconds), + _cache(std::make_unique( + std::move(name), capacity, LRUCacheType::NUMBER, NumShards, 0, false)) {} + + bool get(const KeyTuple& key, ValuePB* value) { + std::string encoded_key = detail::encode_key(key); + Cache::Handle* handle = _cache->lookup(CacheKey(encoded_key)); + if (handle == nullptr) { + return false; + } + + auto* entry = static_cast(_cache->value(handle)); + if (_ttl_seconds > 0 && entry->expire_time < detail::now_seconds()) { + _cache->release(handle); + return false; + } + *value = entry->value; + _cache->release(handle); + return true; + } + + void put(const KeyTuple& key, const ValuePB& value) { + std::string encoded_key = detail::encode_key(key); + auto* entry = new CacheEntry; + entry->value = value; + entry->expire_time = _ttl_seconds > 0 ? detail::now_seconds() + _ttl_seconds : 0; + Cache::Handle* handle = + _cache->insert(CacheKey(encoded_key), entry, 1, CachePriority::NORMAL, + cache_value_deleter); + _cache->release(handle); + } + + void invalidate(const KeyTuple& key) { + std::string encoded_key = detail::encode_key(key); + _cache->erase(CacheKey(encoded_key)); + } + + void clear() { _cache->prune(); } + + size_t size() const { return _cache->get_element_count(); } + +private: + struct CacheEntry { + ValuePB value; + int64_t expire_time = 0; + }; + + const int64_t _ttl_seconds; + std::unique_ptr _cache; +}; + +using TabletIndexCache = KvCache, TabletIndexPB>; + +} // namespace doris::cloud diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index c6f423381a9f7c..1f94f41df98a5e 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -52,6 +52,7 @@ #include "common/bvars.h" #include "common/config.h" #include "common/encryption_util.h" +#include "common/kv_cache.h" #include "common/logging.h" #include "common/stats.h" #include "common/stopwatch.h" @@ -78,6 +79,20 @@ using namespace std::chrono; namespace doris::cloud { +namespace { + +TabletIndexCache* ms_cache_manager() { + if (!config::enable_tablet_index_cache) { + return nullptr; + } + static TabletIndexCache cache(config::ms_tablet_index_cache_capacity, + config::tablet_index_cache_ttl_seconds, + "meta_service_tablet_index_cache"); + return &cache; +} + +} // namespace + MetaServiceImpl::MetaServiceImpl(std::shared_ptr txn_kv, std::shared_ptr resource_mgr, std::shared_ptr rate_limiter, @@ -189,8 +204,35 @@ bool is_dropped_tablet(Transaction* txn, const std::string& instance_id, int64_t return false; } +bool get_tablet_idx_from_ms_cache(std::string_view instance_id, int64_t tablet_id, + TabletIndexPB* tablet_idx) { + TabletIndexCache* cache = ms_cache_manager(); + if (cache == nullptr) { + return false; + } + LOG(INFO) << "get from cache"; + auto cache_key = std::make_tuple(std::string(instance_id), tablet_id); + return cache->get(cache_key, tablet_idx); +} + +void put_tablet_idx_to_ms_cache(std::string_view instance_id, int64_t tablet_id, + const TabletIndexPB& tablet_idx) { + TabletIndexCache* cache = ms_cache_manager(); + if (cache == nullptr || !tablet_idx.has_db_id()) { + return; + } + auto cache_key = std::make_tuple(std::string(instance_id), tablet_id); + cache->put(cache_key, tablet_idx); +} + void get_tablet_idx(MetaServiceCode& code, std::string& msg, Transaction* txn, const std::string& instance_id, int64_t tablet_id, TabletIndexPB& tablet_idx) { + // Cache lookup + if (get_tablet_idx_from_ms_cache(instance_id, tablet_id, &tablet_idx)) { + return; + } + + // FDB read std::string key, val; meta_tablet_idx_key({instance_id, tablet_id}, &key); TxnErrorCode err = txn->get(key, &val); @@ -216,6 +258,9 @@ void get_tablet_idx(MetaServiceCode& code, std::string& msg, Transaction* txn, << " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(key); return; } + + // Cache put + put_tablet_idx_to_ms_cache(instance_id, tablet_id, tablet_idx); } void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, diff --git a/cloud/src/meta-service/meta_service_helper.h b/cloud/src/meta-service/meta_service_helper.h index 2f9ebd4fcc22f2..fac8dfe66d3b90 100644 --- a/cloud/src/meta-service/meta_service_helper.h +++ b/cloud/src/meta-service/meta_service_helper.h @@ -363,6 +363,12 @@ int decrypt_instance_info(InstanceInfoPB& instance, const std::string& instance_ void notify_refresh_instance(std::shared_ptr txn_kv, const std::string& instance_id, KVStats* stats, bool include_self = false); +bool get_tablet_idx_from_ms_cache(std::string_view instance_id, int64_t tablet_id, + TabletIndexPB* tablet_idx); + +void put_tablet_idx_to_ms_cache(std::string_view instance_id, int64_t tablet_id, + const TabletIndexPB& tablet_idx); + void get_tablet_idx(MetaServiceCode& code, std::string& msg, Transaction* txn, const std::string& instance_id, int64_t tablet_id, TabletIndexPB& tablet_idx); diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 6be303a4c00317..b97cfcacc0d98d 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1405,16 +1405,27 @@ std::pair get_tablet_indexes( Transaction* txn, std::unordered_map* tablet_indexes, std::string_view instance_id, const std::vector& tablet_ids, bool snapshot = false) { - std::vector tablet_idx_keys; - std::vector> tablet_idx_values; - tablet_idx_keys.reserve(tablet_ids.size()); - tablet_idx_values.resize(tablet_idx_keys.size()); + std::vector missed_tablet_ids; + std::vector missed_tablet_idx_keys; + missed_tablet_ids.reserve(tablet_ids.size()); + missed_tablet_idx_keys.reserve(tablet_ids.size()); for (int64_t tablet_id : tablet_ids) { - tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, tablet_id})); + TabletIndexPB tablet_index; + if (get_tablet_idx_from_ms_cache(instance_id, tablet_id, &tablet_index)) { + tablet_indexes->emplace(tablet_id, std::move(tablet_index)); + continue; + } + missed_tablet_ids.push_back(tablet_id); + missed_tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, tablet_id})); + } + + if (missed_tablet_ids.empty()) { + return {MetaServiceCode::OK, ""}; } - TxnErrorCode err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, + std::vector> missed_tablet_idx_values; + TxnErrorCode err = txn->batch_get(&missed_tablet_idx_values, missed_tablet_idx_keys, Transaction::BatchGetOptions(snapshot)); if (err != TxnErrorCode::TXN_OK) { auto msg = fmt::format("failed to get tablet table index ids, err={}", err); @@ -1422,26 +1433,27 @@ std::pair get_tablet_indexes( return {cast_as(err), msg}; } - size_t total_tablets = tablet_idx_values.size(); + size_t total_tablets = missed_tablet_idx_values.size(); for (size_t i = 0; i < total_tablets; i++) { - int64_t tablet_id = tablet_ids[i]; - if (!tablet_idx_values[i].has_value()) [[unlikely]] { + int64_t tablet_id = missed_tablet_ids[i]; + if (!missed_tablet_idx_values[i].has_value()) [[unlikely]] { // The value must existed auto msg = fmt::format( "failed to get tablet table index ids, err=not found tablet_id={} ", tablet_id); - LOG_WARNING(msg).tag("err", err).tag("key", hex(tablet_idx_keys[i])); + LOG_WARNING(msg).tag("err", err).tag("key", hex(missed_tablet_idx_keys[i])); return {MetaServiceCode::KV_TXN_GET_ERR, msg}; } TabletIndexPB tablet_index; - if (!tablet_index.ParseFromString(tablet_idx_values[i].value())) [[unlikely]] { + if (!tablet_index.ParseFromString(missed_tablet_idx_values[i].value())) [[unlikely]] { auto msg = fmt::format("malformed tablet index value tablet_id={} snapshot={}", tablet_id, snapshot); - LOG_WARNING(msg).tag("key", hex(tablet_idx_keys[i])); + LOG_WARNING(msg).tag("key", hex(missed_tablet_idx_keys[i])); return {MetaServiceCode::PROTOBUF_PARSE_ERR, msg}; } VLOG_DEBUG << "tablet_id:" << tablet_id << " value:" << tablet_index.ShortDebugString(); + put_tablet_idx_to_ms_cache(instance_id, tablet_id, tablet_index); tablet_indexes->emplace(tablet_id, std::move(tablet_index)); } diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp index 34d764a1324b30..94425ff2437769 100644 --- a/cloud/src/meta-service/txn_lazy_committer.cpp +++ b/cloud/src/meta-service/txn_lazy_committer.cpp @@ -161,6 +161,9 @@ static std::pair get_tablet_index( if (!is_versioned_read) { std::string tablet_idx_key = meta_tablet_idx_key({instance_id, tablet_id}); std::string tablet_idx_val; + if (get_tablet_idx_from_ms_cache(instance_id, tablet_id, tablet_idx)) { + return {code, msg}; + } TxnErrorCode err = txn->get(tablet_idx_key, &tablet_idx_val, true); if (TxnErrorCode::TXN_OK != err) { code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TXN_ID_NOT_FOUND @@ -179,6 +182,7 @@ static std::pair get_tablet_index( msg = ss.str(); return {code, msg}; } + put_tablet_idx_to_ms_cache(instance_id, tablet_id, *tablet_idx); } else { TxnErrorCode err = meta_reader.get_tablet_index(txn, tablet_id, tablet_idx); if (err != TxnErrorCode::TXN_OK) { diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 00cf90456229a4..0baf4bfe064342 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -45,6 +45,7 @@ #include #include "common/defer.h" +#include "common/kv_cache.h" #include "common/stopwatch.h" #include "meta-service/meta_service.h" #include "meta-service/meta_service_helper.h" @@ -407,6 +408,7 @@ int Recycler::start(brpc::Server* server) { instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist); g_bvar_recycler_task_max_concurrency.set_value(config::recycle_concurrency); S3Environment::getInstance(); + recycler_cache_manager(); if (config::enable_checker) { checker_ = std::make_unique(txn_kv_); @@ -2204,19 +2206,22 @@ bool check_lazy_txn_finished(std::shared_ptr txn_kv, const std::string in std::string tablet_idx_key = meta_tablet_idx_key({instance_id, tablet_id}); std::string tablet_idx_val; - err = txn->get(tablet_idx_key, &tablet_idx_val); - if (TxnErrorCode::TXN_OK != err) { - LOG(WARNING) << "failed to get tablet index, instance_id=" << instance_id - << " tablet_id=" << tablet_id << " err=" << err - << " key=" << hex(tablet_idx_key); - return false; - } - TabletIndexPB tablet_idx_pb; - if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) { - LOG(WARNING) << "failed to parse tablet_idx_pb, instance_id=" << instance_id - << " tablet_id=" << tablet_id; - return false; + if (!get_tablet_idx_from_recycler_cache(instance_id, tablet_id, &tablet_idx_pb)) { + err = txn->get(tablet_idx_key, &tablet_idx_val); + if (TxnErrorCode::TXN_OK != err) { + LOG(WARNING) << "failed to get tablet index, instance_id=" << instance_id + << " tablet_id=" << tablet_id << " err=" << err + << " key=" << hex(tablet_idx_key); + return false; + } + + if (!tablet_idx_pb.ParseFromString(tablet_idx_val)) { + LOG(WARNING) << "failed to parse tablet_idx_pb, instance_id=" << instance_id + << " tablet_id=" << tablet_id; + return false; + } + put_tablet_idx_to_recycler_cache(instance_id, tablet_id, tablet_idx_pb); } if (!tablet_idx_pb.has_db_id()) { @@ -2841,7 +2846,22 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, } for (auto& k : tablet_idx_keys) { txn->remove(k); + // Invalidate cache for removed tablet_idx_keys + TabletIndexCache* cache = recycler_cache_manager(); + if (cache != nullptr) { + // Extract tablet_id from key: meta_tablet_idx_key({instance_id, tablet_id}) + // The key format is known, we can parse tablet_id from it + std::string_view k1 = k; + k1.remove_prefix(1); + // 0x01 "meta" ${instance_id} "tablet_index" ${tablet_id} + std::vector, int, int>> out; + decode_key(&k1, &out); + DCHECK_EQ(out.size(), 4) << k1; + auto tablet_id = std::get(std::get<0>(out[3])); + cache->invalidate(std::make_tuple(instance_id_, tablet_id)); + } } + for (auto& k : restore_job_keys) { txn->remove(k); } diff --git a/cloud/src/recycler/util.cpp b/cloud/src/recycler/util.cpp index 07d98ff87fd749..61c44e6ef8502e 100644 --- a/cloud/src/recycler/util.cpp +++ b/cloud/src/recycler/util.cpp @@ -21,6 +21,8 @@ #include +#include "common/config.h" +#include "common/kv_cache.h" #include "common/util.h" #include "meta-service/meta_service_schema.h" #include "meta-store/keys.h" @@ -28,6 +30,17 @@ #include "meta-store/txn_kv_error.h" namespace doris::cloud { + +TabletIndexCache* recycler_cache_manager() { + if (!config::enable_tablet_index_cache) { + return nullptr; + } + static TabletIndexCache cache(config::recycler_tablet_index_cache_capacity, + config::tablet_index_cache_ttl_seconds, + "recycler_tablet_index_cache"); + return &cache; +} + namespace config { extern int32_t recycle_job_lease_expired_ms; } // namespace config @@ -234,9 +247,37 @@ int lease_instance_recycle_job(TxnKv* txn_kv, std::string_view key, const std::s return 0; } +bool get_tablet_idx_from_recycler_cache(std::string_view instance_id, int64_t tablet_id, + TabletIndexPB* tablet_idx) { + TabletIndexCache* cache = recycler_cache_manager(); + if (cache == nullptr) { + return false; + } + auto cache_key = std::make_tuple(std::string(instance_id), tablet_id); + return cache->get(cache_key, tablet_idx); +} + +void put_tablet_idx_to_recycler_cache(std::string_view instance_id, int64_t tablet_id, + const TabletIndexPB& tablet_idx) { + TabletIndexCache* cache = recycler_cache_manager(); + if (cache == nullptr || !tablet_idx.has_db_id()) { + return; + } + auto cache_key = std::make_tuple(std::string(instance_id), tablet_id); + cache->put(cache_key, tablet_idx); +} + // ret: 0: success, 1: tablet not found, -1: failed int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id, TabletIndexPB& tablet_idx) { + // Cache lookup + auto cache_key = std::make_tuple(instance_id, tablet_id); + TabletIndexCache* cache = recycler_cache_manager(); + if (cache != nullptr && cache->get(cache_key, &tablet_idx)) { + LOG(INFO) << "finish get tablet index from cache, tablet_id= " << tablet_id; + return 0; + } + std::unique_ptr txn; TxnErrorCode err = txn_kv->create_txn(&txn); if (err != TxnErrorCode::TXN_OK) { @@ -266,6 +307,11 @@ int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet << " idx_pb_tablet_id=" << tablet_idx.tablet_id() << " key=" << hex(key); return -1; } + + // Cache put + if (cache != nullptr) { + cache->put(cache_key, tablet_idx); + } return 0; } diff --git a/cloud/src/recycler/util.h b/cloud/src/recycler/util.h index f6b66e81f4c522..13f1ce6397d065 100644 --- a/cloud/src/recycler/util.h +++ b/cloud/src/recycler/util.h @@ -24,6 +24,7 @@ #include #include "common/defer.h" +#include "common/kv_cache.h" namespace doris::cloud { @@ -97,4 +98,13 @@ int get_tablet_idx(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet int get_tablet_meta(TxnKv* txn_kv, const std::string& instance_id, int64_t tablet_id, TabletMetaCloudPB& tablet_meta); + +TabletIndexCache* recycler_cache_manager(); + +bool get_tablet_idx_from_recycler_cache(std::string_view instance_id, int64_t tablet_id, + TabletIndexPB* tablet_idx); + +void put_tablet_idx_to_recycler_cache(std::string_view instance_id, int64_t tablet_id, + const TabletIndexPB& tablet_idx); + } // namespace doris::cloud diff --git a/cloud/test/CMakeLists.txt b/cloud/test/CMakeLists.txt index 0d40bd5c2d641c..4c777642e77c52 100644 --- a/cloud/test/CMakeLists.txt +++ b/cloud/test/CMakeLists.txt @@ -112,6 +112,10 @@ add_executable(versioned_value_test versioned_value_test.cpp) add_executable(bvars_test bvars_test.cpp) +add_executable(kv_cache_test kv_cache_test.cpp) + +add_executable(lru_cache_test lru_cache_test.cpp) + message("Meta-service test dependencies: ${TEST_LINK_LIBS}") #target_link_libraries(sync_point_test ${TEST_LINK_LIBS}) @@ -206,6 +210,12 @@ target_link_libraries(bvars_test ${FDB_LINKER_FLAGS} ${TEST_LINK_LIBS}) +target_link_libraries(kv_cache_test + ${FDB_LINKER_FLAGS} + ${TEST_LINK_LIBS}) + +target_link_libraries(lru_cache_test ${TEST_LINK_LIBS}) + install(FILES ${BASE_DIR}/script/run_all_tests.sh ${BASE_DIR}/conf/fdb.cluster @@ -223,4 +233,3 @@ install(FILES GROUP_READ GROUP_WRITE GROUP_EXECUTE WORLD_READ WORLD_EXECUTE DESTINATION ${BUILD_DIR}/test) - diff --git a/cloud/test/kv_cache_test.cpp b/cloud/test/kv_cache_test.cpp new file mode 100644 index 00000000000000..4916f23cee4a23 --- /dev/null +++ b/cloud/test/kv_cache_test.cpp @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "common/kv_cache.h" + +#include +#include + +#include +#include + +namespace doris::cloud { + +TEST(KvCacheTest, BasicGetPut) { + TabletIndexCache cache(100); + + TabletIndexPB pb; + pb.set_db_id(1); + pb.set_table_id(2); + pb.set_index_id(3); + pb.set_partition_id(4); + pb.set_tablet_id(100); + + auto key = std::make_tuple("instance1", 100L); + cache.put(key, pb); + + TabletIndexPB result; + ASSERT_TRUE(cache.get(key, &result)); + EXPECT_EQ(result.tablet_id(), 100); + EXPECT_EQ(result.db_id(), 1); +} + +TEST(KvCacheTest, CacheMiss) { + TabletIndexCache cache(100); + auto key = std::make_tuple("instance1", 999L); + TabletIndexPB result; + ASSERT_FALSE(cache.get(key, &result)); +} + +TEST(KvCacheTest, LRUEviction) { + TabletIndexCache cache(16); + + // Insert 50 items to ensure eviction (shard capacity is 16/16+1=2, total ~32) + for (int i = 1; i <= 50; ++i) { + TabletIndexPB pb; + pb.set_tablet_id(i); + cache.put(std::make_tuple("inst", (int64_t)i), pb); + } + + // Verify some early items were evicted + TabletIndexPB result; + int miss_count = 0; + for (int i = 1; i <= 20; ++i) { + if (!cache.get(std::make_tuple("inst", (int64_t)i), &result)) { + miss_count++; + } + } + EXPECT_GT(miss_count, 0); +} + +TEST(KvCacheTest, Invalidate) { + TabletIndexCache cache(100); + TabletIndexPB pb; + pb.set_tablet_id(100); + auto key = std::make_tuple("inst", 100L); + cache.put(key, pb); + + TabletIndexPB result; + ASSERT_TRUE(cache.get(key, &result)); + + cache.invalidate(key); + ASSERT_FALSE(cache.get(key, &result)); +} + +TEST(KvCacheTest, Clear) { + TabletIndexCache cache(100); + for (int i = 1; i <= 5; ++i) { + TabletIndexPB pb; + pb.set_tablet_id(i); + cache.put(std::make_tuple("inst", (int64_t)i), pb); + } + EXPECT_EQ(cache.size(), 5); + + cache.clear(); + EXPECT_EQ(cache.size(), 0); +} + +TEST(KvCacheTest, ConcurrentAccess) { + TabletIndexCache cache(1000); + std::vector threads; + + for (int t = 0; t < 8; ++t) { + threads.emplace_back([&cache, t]() { + for (int i = 0; i < 100; ++i) { + TabletIndexPB pb; + pb.set_tablet_id(t * 100 + i); + cache.put(std::make_tuple("inst", (int64_t)(t * 100 + i)), pb); + + TabletIndexPB result; + cache.get(std::make_tuple("inst", (int64_t)(t * 100 + i)), &result); + } + }); + } + + for (auto& th : threads) { + th.join(); + } +} + +} // namespace doris::cloud diff --git a/cloud/test/lru_cache_test.cpp b/cloud/test/lru_cache_test.cpp new file mode 100644 index 00000000000000..5ff43e9c76b9ff --- /dev/null +++ b/cloud/test/lru_cache_test.cpp @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cpp/lru_cache.h" + +#include + +#include "gtest/gtest.h" + +namespace doris { + +class CloudLRUCacheTest : public ::testing::Test { +protected: + struct TestValue { + explicit TestValue(int value_) : value(value_) {} + ~TestValue() { ++deleted_count; } + + int value; + static inline std::atomic deleted_count = 0; + }; + + void SetUp() override { TestValue::deleted_count.store(0); } +}; + +TEST_F(CloudLRUCacheTest, CanUseSharedShardedLRUCache) { + ShardedLRUCache cache("cloud_ut", 2, LRUCacheType::NUMBER, 1, 0, false); + + Cache::Handle* handle = cache.insert(CacheKey("k1"), new TestValue(1), 1, CachePriority::NORMAL, + cache_value_deleter); + ASSERT_NE(handle, nullptr); + cache.release(handle); + + handle = cache.lookup(CacheKey("k1")); + ASSERT_NE(handle, nullptr); + EXPECT_EQ(static_cast(cache.value(handle))->value, 1); + cache.release(handle); +} + +TEST_F(CloudLRUCacheTest, EvictionUsesConfiguredDeleter) { + ShardedLRUCache cache("cloud_ut", 2, LRUCacheType::NUMBER, 1, 0, false); + + cache.release(cache.insert(CacheKey("k1"), new TestValue(1), 1, CachePriority::NORMAL, + cache_value_deleter)); + cache.release(cache.insert(CacheKey("k2"), new TestValue(2), 1, CachePriority::NORMAL, + cache_value_deleter)); + cache.release(cache.insert(CacheKey("k3"), new TestValue(3), 1, CachePriority::NORMAL, + cache_value_deleter)); + + EXPECT_EQ(TestValue::deleted_count.load(), 1); + EXPECT_EQ(cache.lookup(CacheKey("k1")), nullptr); +} + +} // namespace doris diff --git a/cloud/test/meta_server_test.cpp b/cloud/test/meta_server_test.cpp index cebdc54aa46b18..5e51c7b46339af 100644 --- a/cloud/test/meta_server_test.cpp +++ b/cloud/test/meta_server_test.cpp @@ -71,6 +71,12 @@ void notify_refresh_instance(std::shared_ptr txn_kv, const std::string& i } // namespace doris::cloud TEST(MetaServerTest, FQDNRefreshInstance) { + auto old_enable_tablet_index_cache = config::enable_tablet_index_cache; + DORIS_CLOUD_DEFER { + config::enable_tablet_index_cache = old_enable_tablet_index_cache; + }; + config::enable_tablet_index_cache = false; + class MockMetaService : public cloud::MetaServiceImpl { public: MockMetaService(std::shared_ptr txn_kv, diff --git a/be/src/util/lru_cache.cpp b/common/cpp/lru_cache.cpp similarity index 87% rename from be/src/util/lru_cache.cpp rename to common/cpp/lru_cache.cpp index 353c7303212376..ccde2e67e4d706 100644 --- a/be/src/util/lru_cache.cpp +++ b/common/cpp/lru_cache.cpp @@ -18,31 +18,24 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/lru_cache.h" +#include "cpp/lru_cache.h" +#include #include #include #include -#include -#include -#include "common/metrics/metrics.h" -#include "util/time.h" +namespace doris { -using std::string; -using std::stringstream; +namespace { -namespace doris { +int64_t unix_millis() { + return std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_capacity, MetricUnit::BYTES); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_usage, MetricUnit::BYTES); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_element_count, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_usage_ratio, MetricUnit::NOUNIT); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_lookup_count, MetricUnit::OPERATIONS); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_hit_count, MetricUnit::OPERATIONS); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_miss_count, MetricUnit::OPERATIONS); -DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(cache_stampede_count, MetricUnit::OPERATIONS); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(cache_hit_ratio, MetricUnit::NOUNIT); +} // namespace uint32_t CacheKey::hash(const char* data, size_t n, uint32_t seed) const { // Similar to murmur hash @@ -317,7 +310,7 @@ Cache::Handle* LRUCache::lookup(const CacheKey& key, uint32_t hash) { } e->refs++; ++_hit_count; - e->last_visit_time = UnixMillis(); + e->last_visit_time = unix_millis(); } else { ++_miss_count; } @@ -476,7 +469,7 @@ bool LRUCache::_lru_k_insert_visits_list(size_t total_size, visits_lru_cache_key } Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, - CachePriority priority) { + CachePriority priority, CacheValueDeleter deleter) { size_t handle_size = sizeof(LRUHandle) - 1 + key.size(); auto* e = reinterpret_cast(malloc(handle_size)); e->value = value; @@ -491,8 +484,9 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, e->in_cache = false; e->priority = priority; e->type = _type; + e->deleter = deleter; memcpy(e->key_data, key.data(), key.size()); - e->last_visit_time = UnixMillis(); + e->last_visit_time = unix_millis(); LRUHandle* to_remove_head = nullptr; { @@ -689,26 +683,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t capacity, LRUCa shards[s]->set_element_count_capacity(per_shard_element_count_capacity); } _shards = shards; - - _entity = DorisMetrics::instance()->metric_registry()->register_entity( - std::string("lru_cache:") + name, {{"name", name}}); - _entity->register_hook(name, std::bind(&ShardedLRUCache::update_cache_metrics, this)); - INT_GAUGE_METRIC_REGISTER(_entity, cache_capacity); - INT_GAUGE_METRIC_REGISTER(_entity, cache_usage); - INT_GAUGE_METRIC_REGISTER(_entity, cache_element_count); - DOUBLE_GAUGE_METRIC_REGISTER(_entity, cache_usage_ratio); - INT_COUNTER_METRIC_REGISTER(_entity, cache_lookup_count); - INT_COUNTER_METRIC_REGISTER(_entity, cache_hit_count); - INT_COUNTER_METRIC_REGISTER(_entity, cache_stampede_count); - INT_COUNTER_METRIC_REGISTER(_entity, cache_miss_count); - DOUBLE_GAUGE_METRIC_REGISTER(_entity, cache_hit_ratio); - - _hit_count_bvar.reset(new bvar::Adder("doris_cache", _name)); - _hit_count_per_second.reset(new bvar::PerSecond>( - "doris_cache", _name + "_persecond", _hit_count_bvar.get(), 60)); - _lookup_count_bvar.reset(new bvar::Adder("doris_cache", _name)); - _lookup_count_per_second.reset(new bvar::PerSecond>( - "doris_cache", _name + "_persecond", _lookup_count_bvar.get(), 60)); } ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t capacity, LRUCacheType type, @@ -725,8 +699,7 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t capacity, LRUCa } ShardedLRUCache::~ShardedLRUCache() { - _entity->deregister_hook(_name); - DorisMetrics::instance()->metric_registry()->deregister_entity(_entity); + _metrics_recorder.reset(); if (_shards) { for (int s = 0; s < _num_shards; s++) { delete _shards[s]; @@ -754,9 +727,9 @@ size_t ShardedLRUCache::get_capacity() { } Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge, - CachePriority priority) { + CachePriority priority, CacheValueDeleter deleter) { const uint32_t hash = _hash_slice(key); - return _shards[_shard(hash)]->insert(key, hash, value, charge, priority); + return _shards[_shard(hash)]->insert(key, hash, value, charge, priority, deleter); } Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) { @@ -824,41 +797,27 @@ size_t ShardedLRUCache::get_element_count() { return total_element_count; } -void ShardedLRUCache::update_cache_metrics() const { - size_t capacity = 0; - size_t total_usage = 0; - size_t total_lookup_count = 0; - size_t total_hit_count = 0; - size_t total_element_count = 0; - size_t total_miss_count = 0; - size_t total_stampede_count = 0; - +ShardedLRUCache::MetricsSnapshot ShardedLRUCache::get_metrics_snapshot() const { + MetricsSnapshot snapshot; for (int i = 0; i < _num_shards; i++) { - capacity += _shards[i]->get_capacity(); - total_usage += _shards[i]->get_usage(); - total_lookup_count += _shards[i]->get_lookup_count(); - total_hit_count += _shards[i]->get_hit_count(); - total_element_count += _shards[i]->get_element_count(); - total_miss_count += _shards[i]->get_miss_count(); - total_stampede_count += _shards[i]->get_stampede_count(); + snapshot.capacity += _shards[i]->get_capacity(); + snapshot.usage += _shards[i]->get_usage(); + snapshot.lookup_count += _shards[i]->get_lookup_count(); + snapshot.hit_count += _shards[i]->get_hit_count(); + snapshot.miss_count += _shards[i]->get_miss_count(); + snapshot.stampede_count += _shards[i]->get_stampede_count(); + snapshot.element_count += _shards[i]->get_element_count(); } + return snapshot; +} - cache_capacity->set_value(capacity); - cache_usage->set_value(total_usage); - cache_element_count->set_value(total_element_count); - cache_lookup_count->set_value(total_lookup_count); - cache_hit_count->set_value(total_hit_count); - cache_miss_count->set_value(total_miss_count); - cache_stampede_count->set_value(total_stampede_count); - cache_usage_ratio->set_value( - capacity == 0 ? 0 : (static_cast(total_usage) / static_cast(capacity))); - cache_hit_ratio->set_value(total_lookup_count == 0 ? 0 - : (static_cast(total_hit_count) / - static_cast(total_lookup_count))); +void ShardedLRUCache::set_metrics_recorder(std::unique_ptr metrics_recorder) { + std::lock_guard l(_mutex); + _metrics_recorder = std::move(metrics_recorder); } Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t charge, - CachePriority priority) { + CachePriority priority, CacheValueDeleter deleter) { size_t handle_size = sizeof(LRUHandle); auto* e = reinterpret_cast(malloc(handle_size)); e->value = value; @@ -869,6 +828,7 @@ Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t ch e->refs = 1; // only one for the returned handle e->next = e->prev = nullptr; e->in_cache = false; + e->deleter = deleter; return reinterpret_cast(e); } diff --git a/be/src/util/lru_cache.h b/common/cpp/lru_cache.h similarity index 93% rename from be/src/util/lru_cache.h rename to common/cpp/lru_cache.h index f01fd0cd6cf5e2..d5d1048ad401e3 100644 --- a/be/src/util/lru_cache.h +++ b/common/cpp/lru_cache.h @@ -22,7 +22,6 @@ #pragma once #include -#include #include #include @@ -32,15 +31,14 @@ #include #include #include +#include #include +#include #include #include +#include #include -#include "common/metrics/doris_metrics.h" -#include "common/metrics/metrics.h" -#include "runtime/memory/lru_cache_value_base.h" - namespace doris { class Cache; @@ -145,6 +143,7 @@ class CacheKey { enum class CachePriority { NORMAL = 0, DURABLE = 1 }; using CachePrunePredicate = std::function; +using CacheValueDeleter = void (*)(void*); // CacheValueTimeExtractor can extract timestamp // in cache value through the specified function, // such as last_visit_time in InvertedIndexSearcherCache::CacheValue @@ -154,6 +153,11 @@ struct PrunedInfo { int64_t pruned_size = 0; }; +template +void cache_value_deleter(void* value) { + delete static_cast(value); +} + class Cache { public: Cache() = default; @@ -179,7 +183,8 @@ class Cache { // // Note: if is ShardedLRUCache, cache capacity = ShardedLRUCache_capacity / num_shards. virtual Handle* insert(const CacheKey& key, void* value, size_t charge, - CachePriority priority = CachePriority::NORMAL) = 0; + CachePriority priority = CachePriority::NORMAL, + CacheValueDeleter deleter = nullptr) = 0; // If the cache has no mapping for "key", returns nullptr. // @@ -254,7 +259,8 @@ struct LRUHandle { CachePriority priority = CachePriority::NORMAL; LRUCacheType type; int64_t last_visit_time; // Save the last visit time of this cache entry. - char key_data[1]; // Beginning of key + CacheValueDeleter deleter = nullptr; + char key_data[1]; // Beginning of key // Note! key_data must be at the end. CacheKey key() const { @@ -268,8 +274,8 @@ struct LRUHandle { } void free() { - if (value != nullptr) { // value allows null pointer. - delete (LRUCacheValueBase*)value; + if (value != nullptr && deleter != nullptr) { + deleter(value); } ::free(this); } @@ -345,7 +351,8 @@ class LRUCache { // Like Cache methods, but with an extra "hash" parameter. // Must call release on the returned handle pointer. Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge, - CachePriority priority = CachePriority::NORMAL); + CachePriority priority = CachePriority::NORMAL, + CacheValueDeleter deleter = nullptr); Cache::Handle* lookup(const CacheKey& key, uint32_t hash); void release(Cache::Handle* handle); void erase(const CacheKey& key, uint32_t hash); @@ -415,9 +422,25 @@ class LRUCache { class ShardedLRUCache : public Cache { public: + struct MetricsSnapshot { + size_t capacity = 0; + size_t usage = 0; + size_t lookup_count = 0; + size_t hit_count = 0; + size_t miss_count = 0; + size_t stampede_count = 0; + size_t element_count = 0; + }; + + class MetricsRecorder { + public: + virtual ~MetricsRecorder() = default; + }; + ~ShardedLRUCache() override; Handle* insert(const CacheKey& key, void* value, size_t charge, - CachePriority priority = CachePriority::NORMAL) override; + CachePriority priority = CachePriority::NORMAL, + CacheValueDeleter deleter = nullptr) override; Handle* lookup(const CacheKey& key) override; void release(Handle* handle) override; void erase(const CacheKey& key) override; @@ -430,10 +453,8 @@ class ShardedLRUCache : public Cache { size_t get_element_count() override; PrunedInfo set_capacity(size_t capacity) override; size_t get_capacity() override; - -private: - // LRUCache can only be created and managed with LRUCachePolicy. - friend class LRUCachePolicy; + MetricsSnapshot get_metrics_snapshot() const; + void set_metrics_recorder(std::unique_ptr metrics_recorder); explicit ShardedLRUCache(const std::string& name, size_t capacity, LRUCacheType type, uint32_t num_shards, uint32_t element_count_capacity, bool is_lru_k); @@ -443,8 +464,6 @@ class ShardedLRUCache : public Cache { bool cache_value_check_timestamp, uint32_t element_count_capacity, bool is_lru_k); - void update_cache_metrics() const; - private: static uint32_t _hash_slice(const CacheKey& s); uint32_t _shard(uint32_t hash) const { @@ -458,22 +477,7 @@ class ShardedLRUCache : public Cache { std::atomic _last_id; std::mutex _mutex; size_t _capacity {0}; - - std::shared_ptr _entity; - IntGauge* cache_capacity = nullptr; - IntGauge* cache_usage = nullptr; - IntGauge* cache_element_count = nullptr; - DoubleGauge* cache_usage_ratio = nullptr; - IntCounter* cache_lookup_count = nullptr; - IntCounter* cache_hit_count = nullptr; - IntCounter* cache_miss_count = nullptr; - IntCounter* cache_stampede_count = nullptr; - DoubleGauge* cache_hit_ratio = nullptr; - // bvars - std::unique_ptr> _hit_count_bvar; - std::unique_ptr>> _hit_count_per_second; - std::unique_ptr> _lookup_count_bvar; - std::unique_ptr>> _lookup_count_per_second; + std::unique_ptr _metrics_recorder; }; // Compatible with ShardedLRUCache usage, but will not actually cache. @@ -481,7 +485,8 @@ class DummyLRUCache : public Cache { public: // Must call release on the returned handle pointer. Handle* insert(const CacheKey& key, void* value, size_t charge, - CachePriority priority = CachePriority::NORMAL) override; + CachePriority priority = CachePriority::NORMAL, + CacheValueDeleter deleter = nullptr) override; Handle* lookup(const CacheKey& key) override { return nullptr; }; void release(Handle* handle) override; void erase(const CacheKey& key) override {}; diff --git a/be/src/util/lru_cache.hpp b/common/cpp/lru_cache.hpp similarity index 100% rename from be/src/util/lru_cache.hpp rename to common/cpp/lru_cache.hpp