Skip to content

Commit f531458

Browse files
authored
[opt] Prevent Load starvation by reserving buffers for high-priority operations (#895)
## Purpose This PR introduces a reserved buffer mechanism to prevent Load operations from being starved when Dump operations consume all available buffers slowly. In scenarios where Dump operations process buffers slowly, they can occupy all buffers, causing Load operations to wait indefinitely. By reserving a portion of buffers exclusively for Load operations, we ensure system responsiveness and prevent deadlocks. ## Modifications - Added loadExclusiveBufferNumber configuration parameter (default 1024) to specify the number of buffers reserved for Load operations. - Modified buffer allocation logic to respect reserved buffers: ordinary allocations cannot use the last loadExclusiveBufferNumber buffers, while Load operations (with allowReserved=true) can access all buffers. - Updated LoadQueue::DispatchOneTask to request buffers with allowReserved=true to ensure Load tasks can always get buffers. - Added comprehensive tests for reserved buffer behavior. ## Test - Added GetReservedNode test to verify reserved buffer allocation logic. - Existing tests pass with updated configurations. ## Configuration New config parameter: `cache_load_exclusive_buffer_number` (default: 1024)
1 parent 9d6abcd commit f531458

6 files changed

Lines changed: 77 additions & 24 deletions

File tree

ucm/store/cache/cc/cache_store.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class CacheStore : public StoreV1 {
128128
config.GetNumber("running_queue_depth", param.runningQueueDepth);
129129
config.GetNumber("timeout_ms", param.timeoutMs);
130130
config.GetNumber("cache_stream_number", param.streamNumber);
131+
config.GetNumber("cache_load_exclusive_buffer_number", param.loadExclusiveBufferNumber);
131132
return param;
132133
}
133134
Status CheckSizeConfig(const Config& config)
@@ -160,7 +161,7 @@ class CacheStore : public StoreV1 {
160161
auto s = CheckSizeConfig(config);
161162
if (s.Failure()) { return s; }
162163
auto bufferNumber = config.bufferCapacity / config.shardSize;
163-
if (bufferNumber < 1024) {
164+
if (bufferNumber < 1024 || bufferNumber < config.loadExclusiveBufferNumber * 2) {
164165
return Status::InvalidParam("too small buffer({}) on shard({})", config.bufferCapacity,
165166
config.shardSize);
166167
}

ucm/store/cache/cc/global_config.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ struct Config {
3939
bool ioDirect{false};
4040
std::vector<ssize_t> cpuAffinityCores{};
4141
size_t bufferCapacity{256ULL << 30};
42+
size_t loadExclusiveBufferNumber{1024};
4243
bool shareBufferEnable{true};
4344
size_t waitingQueueDepth{8192};
4445
size_t runningQueueDepth{524288};

ucm/store/cache/cc/load_queue.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ void LoadQueue::DispatchOneTask(TaskPair&& pair)
8686
for (size_t i = 0; i < nShard; i++) {
8787
auto& shard = task->desc[i];
8888
ShardTask shardTask;
89-
shardTask.bufferHandle = buffer_->Get(shard.owner, shard.index);
89+
shardTask.bufferHandle = buffer_->Get(shard.owner, shard.index, true);
9090
shardTask.backendTaskHandle = 0;
9191
if (shardTask.bufferHandle.Owner() && !shardTask.bufferHandle.Ready()) {
9292
Detail::TaskDesc backendTask{

ucm/store/cache/cc/trans_buffer.cc

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,13 @@ class BufferStrategy {
7070
int32_t deviceId{-1};
7171
size_t nodeSize{0};
7272
size_t totalSize{0};
73+
size_t reservedNumber{0};
7374
};
7475
BaseConfig base_;
7576

7677
public:
77-
BufferStrategy(int32_t deviceId, size_t nodeSize, size_t totalSize)
78-
: base_({deviceId, nodeSize, totalSize})
78+
BufferStrategy(int32_t deviceId, size_t nodeSize, size_t totalSize, size_t reservedNumber)
79+
: base_({deviceId, nodeSize, totalSize, reservedNumber})
7980
{
8081
}
8182
virtual ~BufferStrategy() = default;
@@ -86,7 +87,7 @@ class BufferStrategy {
8687
virtual void NodeLock(size_t iNode) = 0;
8788
virtual void NodeUnlock(size_t iNode) = 0;
8889
virtual size_t& FirstAt(size_t iBucket) = 0;
89-
virtual size_t FetchNode() = 0;
90+
virtual size_t FetchNode(bool allowReserved) = 0;
9091
virtual void* DataAt(size_t iNode) = 0;
9192
virtual BufferMetaNode* MetaAt(size_t iNode) = 0;
9293
};
@@ -132,8 +133,9 @@ class LocalBufferStrategy : public BufferStrategy {
132133
std::shared_ptr<void> data_;
133134

134135
public:
135-
LocalBufferStrategy(int32_t deviceId, size_t nodeSize, size_t totalSize, bool ioDirect)
136-
: BufferStrategy(deviceId, nodeSize, totalSize), ioDirect_(ioDirect)
136+
LocalBufferStrategy(int32_t deviceId, size_t nodeSize, size_t totalSize, size_t reservedNumber,
137+
bool ioDirect)
138+
: BufferStrategy(deviceId, nodeSize, totalSize, reservedNumber), ioDirect_(ioDirect)
137139
{
138140
}
139141
Status Setup() override
@@ -181,10 +183,11 @@ class LocalBufferStrategy : public BufferStrategy {
181183
void NodeLock(size_t iNode) override { nodeLocks_[iNode].Lock(); }
182184
void NodeUnlock(size_t iNode) override { nodeLocks_[iNode].Unlock(); }
183185
size_t& FirstAt(size_t iBucket) override { return header_.buckets[iBucket]; }
184-
size_t FetchNode() override
186+
size_t FetchNode(bool allowReserved) override
185187
{
186-
auto head = header_.freeHead++;
187-
if (header_.freeHead == header_.nNode) { header_.freeHead = 0; }
188+
const auto limit = header_.nNode - (allowReserved ? 0 : base_.reservedNumber);
189+
const auto head = header_.freeHead;
190+
header_.freeHead = (head + 1 == limit) ? 0 : (head + 1);
188191
return head;
189192
}
190193
void* DataAt(size_t iNode) override
@@ -368,8 +371,8 @@ class SharedBufferStrategy : public BufferStrategy {
368371

369372
public:
370373
SharedBufferStrategy(const std::string& uuid, int32_t deviceId, size_t nodeSize,
371-
size_t totalSize)
372-
: BufferStrategy(deviceId, nodeSize, totalSize), uuid_(uuid)
374+
size_t totalSize, size_t reservedNumber)
375+
: BufferStrategy(deviceId, nodeSize, totalSize, reservedNumber), uuid_(uuid)
373376
{
374377
}
375378
~SharedBufferStrategy() override
@@ -410,11 +413,12 @@ class SharedBufferStrategy : public BufferStrategy {
410413
void NodeLock(size_t iNode) override { header_->nodeLocks[iNode].Lock(); }
411414
void NodeUnlock(size_t iNode) override { header_->nodeLocks[iNode].Unlock(); }
412415
size_t& FirstAt(size_t iBucket) override { return header_->buckets[iBucket]; }
413-
size_t FetchNode() override
416+
size_t FetchNode(bool allowReserved) override
414417
{
418+
const auto limit = header_->nNode - (allowReserved ? 0 : base_.reservedNumber);
415419
header_->lock.Lock();
416-
auto iNode = header_->freeHead++;
417-
if (header_->freeHead == nNode_) { header_->freeHead = 0; }
420+
const auto iNode = header_->freeHead;
421+
header_->freeHead = (iNode + 1 == limit) ? 0 : (iNode + 1);
418422
header_->lock.Unlock();
419423
return iNode;
420424
}
@@ -424,7 +428,9 @@ class SharedBufferStrategy : public BufferStrategy {
424428

425429
class SharedBufferWatcherStrategy : public SharedBufferStrategy {
426430
public:
427-
SharedBufferWatcherStrategy(const std::string& uuid) : SharedBufferStrategy(uuid, -1, 0, 0) {}
431+
SharedBufferWatcherStrategy(const std::string& uuid) : SharedBufferStrategy(uuid, -1, 0, 0, 0)
432+
{
433+
}
428434
Status Setup() override
429435
{
430436
shmName_ = ShmPrefix() + uuid_;
@@ -462,10 +468,12 @@ Status TransBuffer::Setup(const Config& config)
462468
try {
463469
if (!config.shareBufferEnable) {
464470
strategy_ = std::make_shared<LocalBufferStrategy>(
465-
config.deviceId, config.shardSize, config.bufferCapacity, config.ioDirect);
471+
config.deviceId, config.shardSize, config.bufferCapacity,
472+
config.loadExclusiveBufferNumber, config.ioDirect);
466473
} else if (config.deviceId >= 0) {
467474
strategy_ = std::make_shared<SharedBufferStrategy>(
468-
config.uniqueId, config.deviceId, config.shardSize, config.bufferCapacity);
475+
config.uniqueId, config.deviceId, config.shardSize, config.bufferCapacity,
476+
config.loadExclusiveBufferNumber);
469477
} else {
470478
strategy_ = std::make_shared<SharedBufferWatcherStrategy>(config.uniqueId);
471479
}
@@ -475,7 +483,8 @@ Status TransBuffer::Setup(const Config& config)
475483
return strategy_->Setup();
476484
}
477485

478-
TransBuffer::Handle TransBuffer::Get(const Detail::BlockId& blockId, size_t shardIdx)
486+
TransBuffer::Handle TransBuffer::Get(const Detail::BlockId& blockId, size_t shardIdx,
487+
bool allowReserved)
479488
{
480489
auto iBucket = Hash(blockId, shardIdx);
481490
bool owner = false;
@@ -485,7 +494,7 @@ TransBuffer::Handle TransBuffer::Get(const Detail::BlockId& blockId, size_t shar
485494
strategy_->BucketUnlock(iBucket);
486495
return Handle{this, iNode, owner};
487496
}
488-
iNode = Alloc(blockId, shardIdx, iBucket);
497+
iNode = Alloc(blockId, shardIdx, iBucket, allowReserved);
489498
strategy_->BucketUnlock(iBucket);
490499
return Handle(this, iNode, true);
491500
}
@@ -536,10 +545,11 @@ size_t TransBuffer::FindAt(size_t iBucket, const Detail::BlockId& blockId, size_
536545
return iNode;
537546
}
538547

539-
size_t TransBuffer::Alloc(const Detail::BlockId& blockId, size_t shardIdx, size_t iBucket)
548+
size_t TransBuffer::Alloc(const Detail::BlockId& blockId, size_t shardIdx, size_t iBucket,
549+
bool allowReserved)
540550
{
541551
for (;;) {
542-
auto iNode = strategy_->FetchNode();
552+
auto iNode = strategy_->FetchNode(allowReserved);
543553
auto meta = strategy_->MetaAt(iNode);
544554
strategy_->NodeLock(iNode);
545555
if (meta->reference > 0) {

ucm/store/cache/cc/trans_buffer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,14 @@ class TransBuffer {
9898

9999
public:
100100
Status Setup(const Config& config);
101-
Handle Get(const Detail::BlockId& blockId, size_t shardIdx);
101+
Handle Get(const Detail::BlockId& blockId, size_t shardIdx, bool allowReserved = false);
102102
bool Exist(const Detail::BlockId& blockId, size_t shardIdx);
103103

104104
private:
105105
bool ExistAt(size_t iBucket, const Detail::BlockId& blockId, size_t shardIdx);
106106
size_t FindAt(size_t iBucket, const Detail::BlockId& blockId, size_t shardIdx, bool& owner);
107-
size_t Alloc(const Detail::BlockId& blockId, size_t shardIdx, size_t iBucket);
107+
size_t Alloc(const Detail::BlockId& blockId, size_t shardIdx, size_t iBucket,
108+
bool allowReserved = false);
108109
void MoveTo(size_t iBucket, size_t iNode);
109110
void Remove(size_t iBucket, size_t iNode);
110111
void* DataAt(Index pos);

ucm/store/test/case/cache/cache_trans_buffer_test.cc

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ TEST_P(UCCacheTransBufferTest, GetFirstNode)
4242
config.bufferCapacity = config.shardSize * 32768;
4343
config.shareBufferEnable = GetParam();
4444
config.deviceId = 0;
45+
config.loadExclusiveBufferNumber = 0;
4546
auto s = transBuffer.Setup(config);
4647
ASSERT_EQ(s, UC::Status::OK());
4748
auto blockId = UC::Test::Detail::TypesHelper::MakeBlockId("a1b2c3d4e5f6789012345678901234ab");
@@ -59,6 +60,44 @@ TEST_P(UCCacheTransBufferTest, GetFirstNode)
5960
ASSERT_TRUE(handle2.Ready());
6061
}
6162

63+
TEST_P(UCCacheTransBufferTest, GetReservedNode)
64+
{
65+
UC::CacheStore::TransBuffer transBuffer;
66+
UC::CacheStore::Config config;
67+
config.uniqueId = rd.RandomString(10);
68+
config.shardSize = 32768;
69+
config.loadExclusiveBufferNumber = 16;
70+
config.bufferCapacity = config.shardSize * (config.loadExclusiveBufferNumber + 1);
71+
config.shareBufferEnable = GetParam();
72+
config.deviceId = 0;
73+
auto s = transBuffer.Setup(config);
74+
ASSERT_EQ(s, UC::Status::OK());
75+
auto blockId1 = UC::Test::Detail::TypesHelper::MakeBlockId("a1b2c3d4e5f6789012345678901234ab");
76+
auto blockId2 = UC::Test::Detail::TypesHelper::MakeBlockId("a2b2c3d4e5f6789012345678901234ab");
77+
constexpr size_t shardIdx = 0;
78+
void* ptr = nullptr;
79+
{
80+
auto handle1 = transBuffer.Get(blockId1, shardIdx);
81+
ASSERT_TRUE(handle1);
82+
ptr = handle1.Data();
83+
}
84+
{
85+
auto handle2 = transBuffer.Get(blockId2, shardIdx);
86+
ASSERT_TRUE(handle2);
87+
ASSERT_EQ(ptr, handle2.Data());
88+
}
89+
{
90+
auto handle1 = transBuffer.Get(blockId1, shardIdx, true);
91+
ASSERT_TRUE(handle1);
92+
ptr = handle1.Data();
93+
}
94+
{
95+
auto handle2 = transBuffer.Get(blockId2, shardIdx, true);
96+
ASSERT_TRUE(handle2);
97+
ASSERT_NE(ptr, handle2.Data());
98+
}
99+
}
100+
62101
TEST_P(UCCacheTransBufferTest, InsertDifferentDataRepeatedly)
63102
{
64103
constexpr size_t nBatch = 2;
@@ -71,6 +110,7 @@ TEST_P(UCCacheTransBufferTest, InsertDifferentDataRepeatedly)
71110
config.bufferCapacity = nBlock * nShard * config.shardSize;
72111
config.shareBufferEnable = GetParam();
73112
config.deviceId = 0;
113+
config.loadExclusiveBufferNumber = 0;
74114
auto s = transBuffer.Setup(config);
75115
ASSERT_EQ(s, UC::Status::OK());
76116
for (size_t iBatch = 0; iBatch < nBatch; iBatch++) {

0 commit comments

Comments
 (0)