Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/brpc/input_messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ int InputMessenger::ProcessNewMessage(
"destroyed when authentication failed";
}
}
if (!m->is_read_progressive()) {
#if BRPC_WITH_RDMA
if (!m->is_read_progressive() && !rdma::FLAGS_rdma_use_polling)
#else
if (!m->is_read_progressive())
#endif
{
// Transfer ownership to last_msg
last_msg.reset(msg.release());
} else {
Expand Down
38 changes: 24 additions & 14 deletions src/brpc/rdma/block_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ DEFINE_int32(rdma_memory_pool_initial_size_mb, 1024,
"Initial size of memory pool for RDMA (MB)");
DEFINE_int32(rdma_memory_pool_increase_size_mb, 1024,
"Increased size of memory pool for RDMA (MB)");
DEFINE_int32(rdma_memory_pool_max_regions, 1, "Max number of regions");
DEFINE_int32(rdma_memory_pool_max_regions, 3, "Max number of regions");
DEFINE_int32(rdma_memory_pool_buckets, 4, "Number of buckets to reduce race");
DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls");
DEFINE_bool(rdma_memory_pool_user_specified_memory, false,
Expand Down Expand Up @@ -96,6 +96,7 @@ struct GlobalInfo {
std::vector<IdleNode*> idle_list[BLOCK_SIZE_COUNT];
std::vector<butil::Mutex*> lock[BLOCK_SIZE_COUNT];
std::vector<size_t> idle_size[BLOCK_SIZE_COUNT];
int region_num[BLOCK_SIZE_COUNT];
butil::Mutex extend_lock;
};
static GlobalInfo* g_info = NULL;
Expand Down Expand Up @@ -128,10 +129,30 @@ uint32_t GetRegionId(const void* buf) {
return r->id;
}

// When both rdma_memory_pool_max_regions and rdma_memory_pool_buckets are
// greater than 1, dynamic memory expansion may cause concurrent modification
// issues in the memory linked list due to lock contention problems. To address
// this, we increase the region_num count for each block_type. Dynamic memory
// expansion is only permitted when both of the following conditions are met:
// rdma_memory_pool_buckets equals 1
// g_info->region_num[block_type] is less than 1
static bool CanExtendBlockRuntime(int block_type) {
return FLAGS_rdma_memory_pool_buckets == 1 ||
g_info->region_num[block_type] < 1;
}

static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
int block_type) {
if (CanExtendBlockRuntime(block_type) == false) {
LOG(INFO) << "Runtime extend memory only support one bucket or region "
"num is zero for per block_type";
free(region_base);
errno = ENOMEM;
return NULL;
}
if (g_region_num == FLAGS_rdma_memory_pool_max_regions) {
LOG(INFO) << "Memory pool reaches max regions";
free(region_base);
errno = ENOMEM;
return NULL;
}
Expand Down Expand Up @@ -167,6 +188,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size,
g_info->idle_list[block_type][i] = node[i];
g_info->idle_size[block_type][i] += node[i]->len;
}
g_info->region_num[block_type]++;

return region_base;
}
Expand Down Expand Up @@ -214,12 +236,6 @@ void* ExtendBlockPoolByUser(void* region_base, size_t region_size,
uint64_t index = butil::fast_rand() % g_buckets;
BAIDU_SCOPED_LOCK(*g_info->lock[block_type][index]);
BAIDU_SCOPED_LOCK(g_info->extend_lock);

if (g_region_num > 1 && FLAGS_rdma_memory_pool_buckets > 1) {
LOG_EVERY_SECOND(ERROR)
<< "Runtime extend memory only support single bucket";
return NULL;
}
region_size =
region_size * BYTES_IN_MB / g_block_size[block_type] / g_buckets;
region_size *= g_block_size[block_type] * g_buckets;
Expand Down Expand Up @@ -274,14 +290,7 @@ bool InitBlockPool(RegisterCallback cb) {
errno = EINVAL;
return false;
}
// runtime extend block pool only support 1 bucket
if (FLAGS_rdma_memory_pool_max_regions > 1 &&
FLAGS_rdma_memory_pool_buckets > 1) {
LOG(WARNING) << "rdma runtime extend block pool only support 1 bucket";
return false;
}
g_buckets = FLAGS_rdma_memory_pool_buckets;

g_info = new (std::nothrow) GlobalInfo;
if (!g_info) {
return false;
Expand All @@ -300,6 +309,7 @@ bool InitBlockPool(RegisterCallback cb) {
if (g_info->idle_size[i].size() != g_buckets) {
return false;
}
g_info->region_num[i] = 0;
for (size_t j = 0; j < g_buckets; ++j) {
g_info->lock[i][j] = new (std::nothrow) butil::Mutex;
if (!g_info->lock[i][j]) {
Expand Down
1 change: 1 addition & 0 deletions src/brpc/rdma/rdma_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace brpc {
class Socket;
namespace rdma {

DECLARE_bool(rdma_use_polling);
DECLARE_int32(rdma_poller_num);
DECLARE_bool(rdma_edisp_unsched);
DECLARE_bool(rdma_disable_bthread);
Expand Down
Loading