Skip to content

Commit e77064f

Browse files
branch-4.1: [improvement](be) Add stampede protection for AnnIndexIVFListCache #62442 (#62567)
Cherry-picked from #62442. Co-authored-by: zhiqiang <seuhezhiqiang@163.com>
1 parent ee12830 commit e77064f

3 files changed

Lines changed: 390 additions & 7 deletions

File tree

be/src/storage/index/ann/faiss_ann_index.cpp

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,12 @@ struct FaissIndexReader : faiss::IOReader {
301301
* Thread-safety: cache lookups / inserts are lock-free (the LRU cache is
302302
* internally sharded). Disk reads are serialised by _io_mutex because
303303
* the CLucene IndexInput is stateful (seek + readBytes).
304+
*
305+
* Stampede protection: when multiple threads concurrently miss the cache
306+
* for the same key, borrow() acquires _io_mutex and re-checks the cache
307+
* (double-check pattern). The first thread reads from disk and inserts
308+
* into the cache; subsequent threads find the entry on re-check and skip
309+
* the redundant disk I/O.
304310
*/
305311
struct CachedRandomAccessReader : faiss::RandomAccessReader {
306312
CachedRandomAccessReader(lucene::store::IndexInput* input, std::string cache_key_prefix,
@@ -352,8 +358,25 @@ struct CachedRandomAccessReader : faiss::RandomAccessReader {
352358
return _make_pinned_ref(std::move(handle), nbytes);
353359
}
354360

355-
// Slow path: cache miss — read from disk, then insert.
356-
auto page = _fetch_from_disk(offset, nbytes, cache->mem_tracker());
361+
// Slow path: cache miss — acquire I/O lock, then double-check cache.
362+
//
363+
// Multiple threads may concurrently miss the fast-path lookup for the
364+
// same key. Since _io_mutex already serialises all CLucene reads for
365+
// this reader (IndexInput is stateful), we simply re-check the cache
366+
// after acquiring the lock. If a preceding thread loaded this key
367+
// while we were waiting, we get a cache hit and skip the redundant
368+
// disk I/O entirely (stampede protection).
369+
std::lock_guard<std::mutex> lock(_io_mutex);
370+
371+
// Double-check: a preceding thread may have loaded this key while
372+
// we waited on _io_mutex.
373+
if (cache->lookup(key, &handle)) {
374+
++g_ivf_on_disk_cache_stats.hit_cnt;
375+
return _make_pinned_ref(std::move(handle), nbytes);
376+
}
377+
378+
// Still a miss — we are the first thread to reach here; read from disk.
379+
auto page = _fetch_from_disk_locked(offset, nbytes, cache->mem_tracker());
357380
cache->insert(key, page.get(), &handle);
358381
page.release(); // ownership transferred to cache
359382
return _make_pinned_ref(std::move(handle), nbytes);
@@ -382,15 +405,13 @@ struct CachedRandomAccessReader : faiss::RandomAccessReader {
382405
// ---- Disk I/O + metrics ----
383406

384407
/// Read a region from CLucene, wrapped in a DataPage, and record fetch metrics.
385-
std::unique_ptr<DataPage> _fetch_from_disk(
408+
/// Caller must already hold _io_mutex.
409+
std::unique_ptr<DataPage> _fetch_from_disk_locked(
386410
size_t offset, size_t nbytes, std::shared_ptr<MemTrackerLimiter> mem_tracker) const {
387411
auto page = std::make_unique<DataPage>(nbytes, std::move(mem_tracker));
388412

389413
const int64_t start_ns = MonotonicNanos();
390-
{
391-
std::lock_guard<std::mutex> lock(_io_mutex);
392-
_read_clucene(offset, page->data(), nbytes);
393-
}
414+
_read_clucene(offset, page->data(), nbytes);
394415
const int64_t cost_ns = MonotonicNanos() - start_ns;
395416

396417
++g_ivf_on_disk_cache_stats.miss_cnt;
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "storage/cache/ann_index_ivf_list_cache.h"

#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>

#include <atomic>
#include <barrier>
#include <cstring>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include "gtest/gtest_pred_impl.h"
30+
31+
namespace doris {
32+
33+
static constexpr uint32_t kNumShards = AnnIndexIVFListCache::kDefaultNumShards;
34+
35+
class AnnIndexIVFListCacheTest : public testing::Test {
36+
public:
37+
AnnIndexIVFListCacheTest() {
38+
mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
39+
"AnnIndexIVFListCacheTest");
40+
}
41+
42+
protected:
43+
void SetUp() override {
44+
cache_ = std::make_unique<AnnIndexIVFListCache>(kNumShards * 1024 * 1024, kNumShards);
45+
}
46+
47+
void TearDown() override { cache_.reset(); }
48+
49+
std::shared_ptr<MemTrackerLimiter> mem_tracker;
50+
std::unique_ptr<AnnIndexIVFListCache> cache_;
51+
};
52+
53+
TEST_F(AnnIndexIVFListCacheTest, basic_insert_and_lookup) {
    // An entry must be absent before insertion and retrievable afterwards,
    // with the cached slice aliasing the inserted page's buffer.
    constexpr size_t kPageBytes = 1024;
    AnnIndexIVFListCache::CacheKey key("test_file", 4096, 0);

    PageCacheHandle insert_handle;
    EXPECT_FALSE(cache_->lookup(key, &insert_handle));

    auto* page = new DataPage(kPageBytes, mem_tracker);
    std::memset(page->data(), 0xAB, kPageBytes);

    // insert() takes ownership of `page`; the handle pins it.
    cache_->insert(key, page, &insert_handle);
    Slice inserted = insert_handle.data();
    EXPECT_EQ(inserted.data, page->data());

    PageCacheHandle probe;
    EXPECT_TRUE(cache_->lookup(key, &probe));
    Slice found = probe.data();
    EXPECT_EQ(static_cast<unsigned char>(found.data[0]), 0xAB);
}
71+
72+
TEST_F(AnnIndexIVFListCacheTest, cache_miss_different_key) {
    // Insert one entry, then probe keys that differ in exactly one field
    // (file, size, offset). Every field participates in the key, so each
    // probe must miss.
    AnnIndexIVFListCache::CacheKey key("file_a", 4096, 0);

    auto* page = new DataPage(512, mem_tracker);
    PageCacheHandle handle;
    cache_->insert(key, page, &handle);

    const AnnIndexIVFListCache::CacheKey miss_keys[] = {
            AnnIndexIVFListCache::CacheKey("file_b", 4096, 0),   // different file
            AnnIndexIVFListCache::CacheKey("file_a", 8192, 0),   // different size
            AnnIndexIVFListCache::CacheKey("file_a", 4096, 100), // different offset
    };
    for (const auto& miss_key : miss_keys) {
        PageCacheHandle miss_handle;
        EXPECT_FALSE(cache_->lookup(miss_key, &miss_handle));
    }
}
97+
98+
TEST_F(AnnIndexIVFListCacheTest, multiple_entries) {
    // Fill the cache with kCount distinct pages, each tagged with its own
    // index byte. Handles stay pinned for the whole test so nothing can be
    // evicted; then verify every payload is still retrievable and intact.
    constexpr int kCount = 32;
    std::vector<PageCacheHandle> pinned(kCount);
    for (int idx = 0; idx < kCount; ++idx) {
        AnnIndexIVFListCache::CacheKey key("multi", 4096, idx * 1024);
        auto* page = new DataPage(256, mem_tracker);
        std::memset(page->data(), static_cast<char>(idx), 256);
        cache_->insert(key, page, &pinned[idx]);
    }

    for (int idx = 0; idx < kCount; ++idx) {
        AnnIndexIVFListCache::CacheKey key("multi", 4096, idx * 1024);
        PageCacheHandle probe;
        EXPECT_TRUE(cache_->lookup(key, &probe));
        Slice payload = probe.data();
        EXPECT_EQ(static_cast<unsigned char>(payload.data[0]), static_cast<unsigned char>(idx));
    }
}
116+
117+
// Simulates the stampede protection pattern from CachedRandomAccessReader::borrow().
118+
//
119+
// The test exercises the double-check locking path: N threads concurrently
120+
// miss the fast-path cache lookup for the same key; only the first thread
121+
// through the mutex performs the "disk I/O" (here: allocate + fill a DataPage);
122+
// the remaining N-1 threads find the entry on re-check and skip the I/O.
123+
TEST_F(AnnIndexIVFListCacheTest, stampede_protection) {
124+
constexpr int kThreads = 8;
125+
const AnnIndexIVFListCache::CacheKey key("stampede_file", 4096, 0);
126+
127+
std::atomic<int> io_count {0};
128+
std::mutex io_mutex;
129+
std::barrier sync_point(kThreads);
130+
131+
auto simulate_borrow = [&]() {
132+
PageCacheHandle handle;
133+
if (cache_->lookup(key, &handle)) {
134+
return;
135+
}
136+
137+
sync_point.arrive_and_wait();
138+
139+
std::lock_guard<std::mutex> lock(io_mutex);
140+
141+
if (cache_->lookup(key, &handle)) {
142+
return;
143+
}
144+
145+
auto* page = new DataPage(1024, mem_tracker);
146+
std::memset(page->data(), 0xCD, 1024);
147+
io_count.fetch_add(1, std::memory_order_relaxed);
148+
cache_->insert(key, page, &handle);
149+
};
150+
151+
std::vector<std::thread> threads;
152+
threads.reserve(kThreads);
153+
for (int i = 0; i < kThreads; ++i) {
154+
threads.emplace_back(simulate_borrow);
155+
}
156+
for (auto& t : threads) {
157+
t.join();
158+
}
159+
160+
EXPECT_EQ(io_count.load(), 1);
161+
162+
PageCacheHandle verify_handle;
163+
EXPECT_TRUE(cache_->lookup(key, &verify_handle));
164+
Slice data = verify_handle.data();
165+
EXPECT_EQ(static_cast<unsigned char>(data.data[0]), 0xCD);
166+
}
167+
168+
// Variant: multiple distinct keys under contention. Each key should be
169+
// loaded exactly once even when many threads race on each key.
170+
TEST_F(AnnIndexIVFListCacheTest, stampede_protection_multiple_keys) {
171+
constexpr int kKeys = 4;
172+
constexpr int kThreadsPerKey = 4;
173+
constexpr int kTotalThreads = kKeys * kThreadsPerKey;
174+
175+
std::atomic<int> io_counts[kKeys];
176+
for (auto& c : io_counts) {
177+
c.store(0);
178+
}
179+
180+
std::mutex io_mutex;
181+
std::barrier sync_point(kTotalThreads);
182+
183+
auto simulate_borrow = [&](int key_idx) {
184+
AnnIndexIVFListCache::CacheKey key("multi_stampede", 4096, key_idx * 1024);
185+
186+
PageCacheHandle handle;
187+
if (cache_->lookup(key, &handle)) {
188+
return;
189+
}
190+
191+
sync_point.arrive_and_wait();
192+
193+
std::lock_guard<std::mutex> lock(io_mutex);
194+
195+
if (cache_->lookup(key, &handle)) {
196+
return;
197+
}
198+
199+
auto* page = new DataPage(512, mem_tracker);
200+
std::memset(page->data(), static_cast<char>(key_idx), 512);
201+
io_counts[key_idx].fetch_add(1, std::memory_order_relaxed);
202+
cache_->insert(key, page, &handle);
203+
};
204+
205+
std::vector<std::thread> threads;
206+
threads.reserve(kTotalThreads);
207+
for (int k = 0; k < kKeys; ++k) {
208+
for (int t = 0; t < kThreadsPerKey; ++t) {
209+
threads.emplace_back(simulate_borrow, k);
210+
}
211+
}
212+
for (auto& th : threads) {
213+
th.join();
214+
}
215+
216+
for (int k = 0; k < kKeys; ++k) {
217+
EXPECT_EQ(io_counts[k].load(), 1) << "key_idx=" << k;
218+
219+
AnnIndexIVFListCache::CacheKey key("multi_stampede", 4096, k * 1024);
220+
PageCacheHandle handle;
221+
EXPECT_TRUE(cache_->lookup(key, &handle));
222+
Slice data = handle.data();
223+
EXPECT_EQ(static_cast<unsigned char>(data.data[0]), static_cast<unsigned char>(k));
224+
}
225+
}
226+
227+
TEST_F(AnnIndexIVFListCacheTest, singleton_lifecycle) {
    // The process-wide singleton starts unset, is published by
    // create_global_cache(), and is cleared again by destroy_global_cache().
    EXPECT_EQ(AnnIndexIVFListCache::instance(), nullptr);

    auto* global_cache = AnnIndexIVFListCache::create_global_cache(kNumShards * 2048, kNumShards);
    EXPECT_NE(global_cache, nullptr);
    EXPECT_EQ(AnnIndexIVFListCache::instance(), global_cache);

    AnnIndexIVFListCache::destroy_global_cache();
    EXPECT_EQ(AnnIndexIVFListCache::instance(), nullptr);
}
237+
238+
} // namespace doris

0 commit comments

Comments
 (0)