5959namespace doris ::io {
6060#include " common/compile_check_begin.h"
6161
62+ namespace {
63+
64+ constexpr std::array<FileCacheType, 4 > LRU_LOG_REPLAY_TYPES = {
65+ FileCacheType::TTL , FileCacheType::INDEX , FileCacheType::NORMAL , FileCacheType::DISPOSABLE };
66+
67+ size_t file_cache_type_index (FileCacheType type) {
68+ return static_cast <size_t >(type);
69+ }
70+
71+ } // namespace
72+
6273// Insert a block pointer into one shard while swallowing allocation failures.
63- bool NeedUpdateLRUBlocks::insert (FileBlockSPtr block) {
64- if (!block) {
74+ bool NeedUpdateLRUBlocks::insert (FileBlockSPtr block, size_t max_queue_size ) {
75+ if (!block || max_queue_size == 0 ) {
6576 return false ;
6677 }
78+ bool reserved = false ;
6779 try {
6880 auto * raw_ptr = block.get ();
6981 auto idx = shard_index (raw_ptr);
7082 auto & shard = _shards[idx];
7183 std::lock_guard lock (shard.mutex );
72- auto [_, inserted] = shard.entries .emplace (raw_ptr, std::move (block));
73- if (inserted) {
74- _size.fetch_add (1 , std::memory_order_relaxed);
84+ if (shard.entries .contains (raw_ptr)) {
85+ return false ;
7586 }
76- return inserted;
87+ size_t cur_size = _size.load (std::memory_order_relaxed);
88+ while (cur_size < max_queue_size) {
89+ if (_size.compare_exchange_weak (cur_size, cur_size + 1 , std::memory_order_relaxed)) {
90+ reserved = true ;
91+ break ;
92+ }
93+ }
94+ if (!reserved) {
95+ return false ;
96+ }
97+ auto [_, inserted] = shard.entries .emplace (raw_ptr, std::move (block));
98+ DORIS_CHECK (inserted);
99+ return true ;
77100 } catch (const std::exception& e) {
101+ if (reserved) {
102+ decrease_size (1 );
103+ }
78104 LOG (WARNING ) << " Failed to enqueue block for LRU update: " << e.what ();
79105 } catch (...) {
106+ if (reserved) {
107+ decrease_size (1 );
108+ }
80109 LOG (WARNING ) << " Failed to enqueue block for LRU update: unknown error" ;
81110 }
82111 return false ;
@@ -103,7 +132,7 @@ size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* outp
103132 ++shard_drained;
104133 }
105134 if (shard_drained > 0 ) {
106- _size. fetch_sub (shard_drained, std::memory_order_relaxed );
135+ decrease_size (shard_drained);
107136 drained += shard_drained;
108137 }
109138 }
@@ -123,7 +152,7 @@ void NeedUpdateLRUBlocks::clear() {
123152 if (!shard.entries .empty ()) {
124153 auto removed = shard.entries .size ();
125154 shard.entries .clear ();
126- _size. fetch_sub (removed, std::memory_order_relaxed );
155+ decrease_size (removed);
127156 }
128157 }
129158 } catch (const std::exception& e) {
@@ -133,6 +162,16 @@ void NeedUpdateLRUBlocks::clear() {
133162 }
134163}
135164
165+ void NeedUpdateLRUBlocks::decrease_size (size_t delta) {
166+ size_t cur_size = _size.load (std::memory_order_relaxed);
167+ while (true ) {
168+ DORIS_CHECK (cur_size >= delta);
169+ if (_size.compare_exchange_weak (cur_size, cur_size - delta, std::memory_order_relaxed)) {
170+ return ;
171+ }
172+ }
173+ }
174+
136175size_t NeedUpdateLRUBlocks::shard_index (FileBlock* ptr) const {
137176 DCHECK (ptr != nullptr );
138177 return std::hash<FileBlock*> {}(ptr)&kShardMask ;
@@ -348,12 +387,30 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
348387 _cache_base_path.c_str (), " file_cache_recycle_keys_length" );
349388 _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
350389 _cache_base_path.c_str (), " file_cache_need_update_lru_blocks_length" );
390+ _need_update_lru_blocks_produce_metrics = std::make_shared<bvar::Adder<size_t >>(
391+ _cache_base_path.c_str (), " file_cache_need_update_lru_blocks_produce" );
392+ _need_update_lru_blocks_consume_metrics = std::make_shared<bvar::Adder<size_t >>(
393+ _cache_base_path.c_str (), " file_cache_need_update_lru_blocks_consume" );
351394 _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
352395 _cache_base_path.c_str (), " file_cache_update_lru_blocks_latency_us" );
353396 _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str (),
354397 " file_cache_ttl_gc_latency_us" );
355398 _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
356399 _cache_base_path.c_str (), " file_cache_shadow_queue_levenshtein_distance" );
400+ for (FileCacheType type : {FileCacheType::DISPOSABLE , FileCacheType::NORMAL ,
401+ FileCacheType::INDEX , FileCacheType::TTL }) {
402+ size_t idx = file_cache_type_index (type);
403+ std::string metric_prefix =
404+ " file_cache_lru_recorder_" + cache_type_to_string (type) + " _record_queue" ;
405+ _lru_recorder_queue_length_recorder[idx] = std::make_shared<bvar::LatencyRecorder>(
406+ _cache_base_path.c_str (), metric_prefix + " _length" );
407+ _lru_recorder_queue_produce_metrics[idx] = std::make_shared<bvar::Adder<size_t >>(
408+ _cache_base_path.c_str (), metric_prefix + " _produce" );
409+ _lru_recorder_queue_consume_metrics[idx] = std::make_shared<bvar::Adder<size_t >>(
410+ _cache_base_path.c_str (), metric_prefix + " _consume" );
411+ }
412+ _lru_recorder_log_replay_idle_metrics = std::make_shared<bvar::Adder<size_t >>(
413+ _cache_base_path.c_str (), " file_cache_lru_recorder_log_replay_idle" );
357414
358415 _disposable_queue = LRUQueue (cache_settings.disposable_queue_size ,
359416 cache_settings.disposable_queue_elements , 60 * 60 );
@@ -648,7 +705,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheConte
648705}
649706
650707void BlockFileCache::add_need_update_lru_block (FileBlockSPtr block) {
651- if (_need_update_lru_blocks.insert (std::move (block))) {
708+ int64_t queue_limit = config::file_cache_background_block_lru_update_queue_max_size;
709+ size_t max_queue_size = queue_limit <= 0 ? 0 : static_cast <size_t >(queue_limit);
710+ if (_need_update_lru_blocks.insert (std::move (block), max_queue_size)) {
711+ *_need_update_lru_blocks_produce_metrics << 1 ;
652712 *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size ();
653713 }
654714}
@@ -2093,6 +2153,7 @@ void BlockFileCache::run_background_block_lru_update() {
20932153 *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size ();
20942154 continue ;
20952155 }
2156+ *_need_update_lru_blocks_consume_metrics << drained;
20962157
20972158 int64_t duration_ns = 0 ;
20982159 {
@@ -2307,19 +2368,28 @@ void BlockFileCache::run_background_lru_log_replay() {
23072368 }
23082369 }
23092370
2310- _lru_recorder->replay_queue_event (FileCacheType::TTL );
2311- _lru_recorder->replay_queue_event (FileCacheType::INDEX );
2312- _lru_recorder->replay_queue_event (FileCacheType::NORMAL );
2313- _lru_recorder->replay_queue_event (FileCacheType::DISPOSABLE );
2371+ replay_lru_logs_once ();
2372+ }
2373+ }
2374+
2375+ size_t BlockFileCache::replay_lru_logs_once () {
2376+ size_t replayed = 0 ;
2377+ for (FileCacheType type : LRU_LOG_REPLAY_TYPES ) {
2378+ replayed += _lru_recorder->replay_queue_event (type);
2379+ }
23142380
2315- if (config::enable_evaluate_shadow_queue_diff) {
2316- SCOPED_CACHE_LOCK (_mutex, this );
2317- _lru_recorder->evaluate_queue_diff (_ttl_queue, " ttl" , cache_lock);
2318- _lru_recorder->evaluate_queue_diff (_index_queue, " index" , cache_lock);
2319- _lru_recorder->evaluate_queue_diff (_normal_queue, " normal" , cache_lock);
2320- _lru_recorder->evaluate_queue_diff (_disposable_queue, " disposable" , cache_lock);
2321- }
2381+ if (replayed == 0 ) {
2382+ *_lru_recorder_log_replay_idle_metrics << 1 ;
2383+ }
2384+
2385+ if (config::enable_evaluate_shadow_queue_diff) {
2386+ SCOPED_CACHE_LOCK (_mutex, this );
2387+ _lru_recorder->evaluate_queue_diff (_ttl_queue, " ttl" , cache_lock);
2388+ _lru_recorder->evaluate_queue_diff (_index_queue, " index" , cache_lock);
2389+ _lru_recorder->evaluate_queue_diff (_normal_queue, " normal" , cache_lock);
2390+ _lru_recorder->evaluate_queue_diff (_disposable_queue, " disposable" , cache_lock);
23222391 }
2392+ return replayed;
23232393}
23242394
23252395void BlockFileCache::dump_lru_queues (bool force) {
0 commit comments