Skip to content

Commit 7781cd8

Browse files
authored
Merge pull request #3076 from yanglimingcn/bugfix/rdma_block_pool_runtime_extend
rdma runtime extend block restriction
2 parents fc2e58c + 4533167 commit 7781cd8

3 files changed

Lines changed: 31 additions & 15 deletions

File tree

src/brpc/input_messenger.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,12 @@ int InputMessenger::ProcessNewMessage(
331331
"destroyed when authentication failed";
332332
}
333333
}
334-
if (!m->is_read_progressive()) {
334+
#if BRPC_WITH_RDMA
335+
if (!m->is_read_progressive() && !rdma::FLAGS_rdma_use_polling)
336+
#else
337+
if (!m->is_read_progressive())
338+
#endif
339+
{
335340
// Transfer ownership to last_msg
336341
last_msg.reset(msg.release());
337342
} else {

src/brpc/rdma/block_pool.cpp

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
3636
"Initial size of memory pool for RDMA (MB)");
3737
DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
3838
"Increased size of memory pool for RDMA (MB)");
39-
DEFINE_int32(rdma_memory_pool_max_regions, 1, "Max number of regions");
39+
DEFINE_int32(rdma_memory_pool_max_regions, 3, "Max number of regions");
4040
DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
4141
DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
4242
DEFINE_bool(rdma_memory_pool_user_specified_memory, false,
@@ -96,6 +96,7 @@ struct GlobalInfo {
9696
std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
9797
std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
9898
std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
99+
int region_num[BLOCK_SIZE_COUNT];
99100
butil::Mutex extend_lock;
100101
};
101102
static GlobalInfo* g_info = NULL;
@@ -128,10 +129,30 @@ uint32_t GetRegionId(const void* buf) {
128129
return r->id;
129130
}
130131

132+
// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are
133+
// greater than 1, dynamic memory expansion may cause concurrent modification
134+
// issues in the memory linked list due to lock contention problems. To address
135+
// this, we increase the region_num count for each block_type. Dynamic memory
136+
// expansion is only permitted when both of the following conditions are met:
137+
// rdma_memory_pool_buckets equals 1
138+
// g_info->region_num[block_type] is less than 1
139+
static bool CanExtendBlockRuntime(int block_type) {
140+
return FLAGS_rdma_memory_pool_buckets == 1 ||
141+
g_info->region_num[block_type] < 1;
142+
}
143+
131144
static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
132145
int block_type) {
146+
if (CanExtendBlockRuntime(block_type) == false) {
147+
LOG(INFO) << "Runtime extend memory only support one bucket or region "
148+
"num is zero for per block_type";
149+
free(region_base);
150+
errno = ENOMEM;
151+
return NULL;
152+
}
133153
if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
134154
LOG(INFO) << "Memory pool reaches max regions";
155+
free(region_base);
135156
errno = ENOMEM;
136157
return NULL;
137158
}
@@ -167,6 +188,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
167188
g_info->idle_list[block_type][i] = node[i];
168189
g_info->idle_size[block_type][i] += node[i]->len;
169190
}
191+
g_info->region_num[block_type]++;
170192

171193
return region_base;
172194
}
@@ -214,12 +236,6 @@ void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
214236
uint64_t index = butil::fast_rand() % g_buckets;
215237
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
216238
BAIDU_SCOPED_LOCK(g_info->extend_lock);
217-
218-
if (g_region_num > 1 && FLAGS_rdma_memory_pool_buckets > 1) {
219-
LOG_EVERY_SECOND(ERROR)
220-
<< "Runtime extend memory only support single bucket";
221-
return NULL;
222-
}
223239
region_size =
224240
region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
225241
region_size *= g_block_size[block_type] * g_buckets;
@@ -274,14 +290,7 @@ bool InitBlockPool(RegisterCallback cb) {
274290
errno = EINVAL;
275291
return false;
276292
}
277-
// runtime extend block pool only support 1 bucket
278-
if (FLAGS_rdma_memory_pool_max_regions > 1 &&
279-
FLAGS_rdma_memory_pool_buckets > 1) {
280-
LOG(WARNING) << "rdma runtime extend block pool only support 1 bucket";
281-
return false;
282-
}
283293
g_buckets = FLAGS_rdma_memory_pool_buckets;
284-
285294
g_info = new (std::nothrow) GlobalInfo;
286295
if (!g_info) {
287296
return false;
@@ -300,6 +309,7 @@ bool InitBlockPool(RegisterCallback cb) {
300309
if (g_info->idle_size[i].size() != g_buckets) {
301310
return false;
302311
}
312+
g_info->region_num[i] = 0;
303313
for (size_t j = 0; j < g_buckets; ++j) {
304314
g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
305315
if (!g_info->lock[i][j]) {

src/brpc/rdma/rdma_endpoint.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ namespace brpc {
3636
class Socket;
3737
namespace rdma {
3838

39+
DECLARE_bool(rdma_use_polling);
3940
DECLARE_int32(rdma_poller_num);
4041
DECLARE_bool(rdma_edisp_unsched);
4142
DECLARE_bool(rdma_disable_bthread);

0 commit comments

Comments
 (0)