diff --git a/gloo/CMakeLists.txt b/gloo/CMakeLists.txt index 50ed81cba..186fe1288 100644 --- a/gloo/CMakeLists.txt +++ b/gloo/CMakeLists.txt @@ -51,11 +51,6 @@ list(APPEND GLOO_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/types.h" ) -if(NOT MSVC AND NOT CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64|arm)") - list(APPEND GLOO_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_shm.cc") - list(APPEND GLOO_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/allreduce_shm.h") -endif() - if(USE_CUDA) file(GLOB GLOO_CUDA_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/cuda*.cc" diff --git a/gloo/allreduce.cc b/gloo/allreduce.cc index 901d4302d..f74782818 100644 --- a/gloo/allreduce.cc +++ b/gloo/allreduce.cc @@ -12,12 +12,8 @@ #include #include -#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) -#include "gloo/allreduce_shm.h" -#endif #include "gloo/common/logging.h" #include "gloo/math.h" -#include "gloo/transport/device.h" #include "gloo/types.h" namespace gloo { @@ -135,15 +131,7 @@ void allreduce(const detail::AllreduceOptionsImpl& opts) { return; } - auto algorithm = opts.algorithm; - -#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) - if (context->isIntraNode() && !context->getDevice()->hasGPUDirect()) { - algorithm = detail::AllreduceOptionsImpl::SHM; - } -#endif - - switch (algorithm) { + switch (opts.algorithm) { case detail::AllreduceOptionsImpl::UNSPECIFIED: case detail::AllreduceOptionsImpl::RING: ring(opts, reduceInputs, broadcastOutputs); @@ -151,11 +139,6 @@ void allreduce(const detail::AllreduceOptionsImpl& opts) { case detail::AllreduceOptionsImpl::BCUBE: bcube(opts, reduceInputs, broadcastOutputs); break; -#if !defined(_WIN32) && !defined(__aarch64__) && !defined(__arm__) - case detail::AllreduceOptionsImpl::SHM: - shm(opts); - break; -#endif default: GLOO_ENFORCE(false, "Algorithm not handled."); } diff --git a/gloo/allreduce.h b/gloo/allreduce.h index 6f6037ede..904eb8b32 100644 --- a/gloo/allreduce.h +++ b/gloo/allreduce.h @@ -39,7 +39,6 @@ struct AllreduceOptionsImpl { UNSPECIFIED = 0, RING = 1, BCUBE = 2, - SHM = 3, }; explicit AllreduceOptionsImpl(const std::shared_ptr& context) diff --git a/gloo/allreduce_shm.cc b/gloo/allreduce_shm.cc deleted file mode 100644 index a81d376e5..000000000 --- a/gloo/allreduce_shm.cc +++ /dev/null @@ -1,515 +0,0 @@ -#include "gloo/allreduce_shm.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace gloo { - -namespace { - -using ReductionFunction = AllreduceOptions::Func; -using CollState = AllreduceSharedMemoryData::CollState; -using Allreduceworkspace = AllreduceSharedMemoryData::AllreduceWorkspace; - -constexpr int VECTOR_LENGTH_IN_BYTES = 32; - -#define BUFFER0_OFFSET(current_buffer) \ - current_buffer* Allreduceworkspace::NAIVE_ALLREDUCE_THRESHOLD -#define BUFFER1_OFFSET(current_buffer) \ - 2 * Allreduceworkspace::NAIVE_ALLREDUCE_THRESHOLD + \ - current_buffer* Allreduceworkspace::MAX_BUF_SIZE - -// SHM building blocks -struct SharedData { - const char* name; - int descriptor; - void* bytes; - size_t nbytes; -}; - -void shared_open(SharedData* data, const char* name, size_t nbytes) { - int d = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR); - if (d != -1) { - void* bytes = mmap(NULL, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, d, 0); - data->name = name; - data->descriptor = d; - data->bytes = bytes; - data->nbytes = nbytes; - } else { - if (errno != ENOENT) { - // don't print if shm can not be found because we want to loop over from - // caller again until the other ranks created the shm - printf("shared_open %s failed, errno=%d\n", name, errno); - } - data->descriptor = -1; - } -} - -void shared_create( - SharedData* data, - const char* name, - void* bytes, - size_t nbytes) { - int d = shm_open(name, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); - if (d != -1) { - if (nbytes = write(d, bytes, nbytes)) { - shared_open(data, name, nbytes); - } - close(d); - } else { - printf("shared_create %s failed\n", name); - } -} - -void wait_buffer_state( - CollState state0, - CollState state1, - int state_group, - std::chrono::milliseconds timeout, - std::shared_ptr shm_data) { - // Create a new thread - auto workspace = shm_data->workspace; - const int rank = shm_data->rank; - const int world_size = shm_data->world_size; - - for (int i = 0; i < world_size; i++) { - if (i == rank) { - continue; - } - volatile CollState* state_ptr = &(workspace[i]->states[state_group]); - - while (true) { - volatile CollState cur_state = *state_ptr; - if (cur_state == state0 || cur_state == state1) { - break; - } - if (shm_data->shutdown) { - return; - } - } - } - - std::unique_lock lock(shm_data->m); - shm_data->wait_done = true; - lock.unlock(); - shm_data->cv.notify_one(); -} - -void wait_buffer_state_until_2( - CollState state0, - CollState state1, - int state_group, - std::chrono::milliseconds timeout, - std::shared_ptr shm_data) { - shm_data->wait_done = false; - shm_data->shutdown = false; - - // Create wait buffer thread. - std::thread t( - wait_buffer_state, state0, state1, state_group, timeout, shm_data); - - std::unique_lock lock(shm_data->m); - auto done = - shm_data->cv.wait_for(lock, timeout, [&] { return shm_data->wait_done; }); - if (!done) { - shm_data->shutdown = true; - t.join(); - throw ::gloo::IoException(GLOO_ERROR_MSG( - "Timed out waiting", - timeout.count(), - "ms for wait buffer state operation to complete")); - } else { - t.join(); - } -} - -void reduce_all_buffers( - int start_elements, - int num_elements, - int element_size, - int to_buffer_idx, - int world_size, - char* to_buffer, - char** buffers, - ReductionFunction fn) { - size_t offset = start_elements * element_size; - memcpy(to_buffer + offset, buffers[0] + offset, num_elements * element_size); - for (int i = 1; i < world_size; i++) { - fn(to_buffer + offset, - to_buffer + offset, - buffers[i] + offset, - num_elements); - } -} - -static void parallel_memcpy(void* to, void* from, size_t n_bytes) - __attribute__((target("avx512bw"))); -static void parallel_memcpy(void* to, void* from, size_t n_bytes) { - auto aligned_bytes = n_bytes - (n_bytes % VECTOR_LENGTH_IN_BYTES); - // process aligned part -#pragma omp parallel for - for (int i = 0; i < aligned_bytes; i += VECTOR_LENGTH_IN_BYTES) { - auto val = _mm256_loadu_si256((__m256i*)((char*)from + i)); - _mm256_storeu_si256((__m256i*)((char*)to + i), val); - } - - // process remaining part - for (int i = aligned_bytes; i < n_bytes; i++) { - *((char*)to + i) = *((char*)from + i); - } -} - -size_t slice_size(size_t chunk_el, int slice_idx, int world_size) { - size_t slice_size = chunk_el / world_size; - return slice_idx == world_size - 1 ? slice_size + (chunk_el % world_size) - : slice_size; -} - -char* slice_data( - char* data_ptr, - size_t chunk_el, - int el_size, - int slice_idx, - int world_size) { - size_t slice_size = chunk_el / world_size; - size_t el_offset = slice_size * slice_idx; - return data_ptr + el_offset * el_size; -} - -size_t slice_el_start(size_t chunk_el, int slice_idx, int world_size) { - size_t slice_size = chunk_el / world_size; - return slice_size * slice_idx; -} - -void symmetric_naive_all_reduce( - char* data_ptr, - int element_size, - size_t chunk_size, - size_t chunk_el, - const detail::AllreduceOptionsImpl& opts) { - const auto& context = opts.context; - auto& shm_data = context->shmData; - const int rank = shm_data->rank; - const int world_size = shm_data->world_size; - auto symmetric_buffer = shm_data->symmetric_buffer; - auto workspace = shm_data->workspace; - auto& state_idx = shm_data->state_idx; - auto& current_buffer = shm_data->current_buffer; - - const int state_group = 0; - - CollState copy_current, copy_next; - - switch (state_idx) { - case 0: - copy_current = CollState::coll_allreduce_naive__copy_in_done; - copy_next = CollState::coll_alt1_allreduce_naive__copy_in_done; - break; - case 1: - copy_current = CollState::coll_alt1_allreduce_naive__copy_in_done; - copy_next = CollState::coll_alt2_allreduce_naive__copy_in_done; - break; - case 2: - copy_current = CollState::coll_alt2_allreduce_naive__copy_in_done; - copy_next = CollState::coll_allreduce_naive__copy_in_done; - break; - default: - assert(!"Should not get here."); - } - state_idx = (state_idx + 1) % 3; - - parallel_memcpy(symmetric_buffer[current_buffer][rank], data_ptr, chunk_size); - - std::atomic_thread_fence(std::memory_order_release); - workspace[rank]->states[state_group] = copy_current; - - wait_buffer_state_until_2( - copy_current, copy_next, state_group, opts.timeout, shm_data); - - // each rank reduce the buffer independently so therre is no need for - // synchronization afterward - reduce_all_buffers( - 0, - chunk_el, - element_size, - rank, - world_size, - data_ptr, - symmetric_buffer[current_buffer], - opts.reduce); - - // switch buffer - current_buffer = 1 - current_buffer; -} - -// naive allreduce distributed, each rank do naive reduce on its slice -void distributed_naive_reduce( - char* data_ptr, - int element_size, - size_t chunk_size, - size_t chunk_el, - const detail::AllreduceOptionsImpl& opts) { - const auto& context = opts.context; - auto& shm_data = context->shmData; - const int rank = shm_data->rank; - const int world_size = shm_data->world_size; - auto distributed_buffer = shm_data->distributed_buffer; - auto workspace = shm_data->workspace; - auto& state_idx = shm_data->state_idx; - auto& current_buffer = shm_data->current_buffer; - - const int state_group = 1; - - CollState copy_current, copy_next, reduce_current; - - // similar to symmetric_naive_allreduce, but here we only need two sets of - // states, because distributed naive reduce has two barriers in the - // algorithm - switch (state_idx) { - case 0: - copy_current = CollState::coll_allreduce_naive__copy_in_done; - reduce_current = CollState::coll_allreduce_naive__reduce_done; - copy_next = CollState::coll_alt1_allreduce_naive__copy_in_done; - break; - case 1: - copy_current = CollState::coll_alt1_allreduce_naive__copy_in_done; - reduce_current = CollState::coll_alt1_allreduce_naive__reduce_done; - copy_next = CollState::coll_allreduce_naive__copy_in_done; - break; - default: - assert(!"Should not get here."); - } - state_idx = (state_idx + 1) % 2; - - int data_size = chunk_size / chunk_el; - parallel_memcpy( - distributed_buffer[current_buffer][rank], data_ptr, chunk_size); - std::atomic_thread_fence(std::memory_order_release); - workspace[rank]->states[state_group] = copy_current; - - wait_buffer_state_until_2( - copy_current, reduce_current, state_group, opts.timeout, shm_data); - - // reduce scatter - reduce_all_buffers( - slice_el_start(chunk_el, rank, world_size), - slice_size(chunk_el, rank, world_size), - element_size, - rank, - world_size, - distributed_buffer[current_buffer][rank], - distributed_buffer[current_buffer], - opts.reduce); - std::atomic_thread_fence(std::memory_order_release); - workspace[rank]->states[state_group] = reduce_current; - - wait_buffer_state_until_2( - copy_current, reduce_current, state_group, opts.timeout, shm_data); - - for (int i = 0; i < world_size; i++) { - int rank = (i + rank) % world_size; - parallel_memcpy( - slice_data(data_ptr, chunk_el, data_size, rank, world_size), - slice_data( - distributed_buffer[current_buffer][rank], - chunk_el, - chunk_size / chunk_el, - rank, - world_size), - slice_size(chunk_el, rank, world_size) * data_size); - } - - current_buffer = 1 - current_buffer; -} - -} // namespace - -void AllreduceSharedMemoryData::initialize() { - std::string addr_string(""), port_string(""); - const auto& addr_string_env = std::getenv("MASTER_ADDR"); - if (addr_string_env != nullptr) { - addr_string = addr_string_env; - } - const auto port_string_env = std::getenv("MASTER_PORT"); - if (port_string_env != NULL) { - port_string = port_string_env; - } - - char shm_name_prefix[Allreduceworkspace::NAME_BUF_SIZE]; - char shm_name[Allreduceworkspace::NAME_BUF_SIZE]; - snprintf( - shm_name_prefix, - Allreduceworkspace::NAME_BUF_SIZE, - "%s_%d_%s_%s", - "shm_allreduce_buffer", - getsid(getpid()), - addr_string.c_str(), - port_string.c_str()); - // create shared workspace for SHM based allreduce - // allocate workspace_buf for current rank - AllreduceWorkspace* workspace_buf; - AllreduceWorkspace* workspace_buf_other; - SharedData allreduce_buffer; - cur_workspace = (AllreduceWorkspace*)malloc(sizeof(AllreduceWorkspace)); - workspace_buf = cur_workspace; - - int written = snprintf( - shm_name, - AllreduceWorkspace::NAME_BUF_SIZE, - "%s_%d", - shm_name_prefix, - rank); - if (written >= AllreduceWorkspace::NAME_BUF_SIZE) { - std::cout << "[warning]: written >= NAME_BUF_SIZE" << std::endl; - } - - shared_create( - &allreduce_buffer, shm_name, workspace_buf, sizeof(AllreduceWorkspace)); - - shm_fd.resize(world_size); - shm_fd[rank] = allreduce_buffer.descriptor; - - workspace_buf = (AllreduceWorkspace*)allreduce_buffer.bytes; - workspace_buf->states[0] = coll_alt2_allreduce_naive__copy_in_done; - workspace_buf->states[1] = coll_begin; - workspace_buf->fd = allreduce_buffer.descriptor; - strcpy(workspace_buf->name, shm_name); - shm_buffer_name = std::string(workspace_buf->name); - - // create the workspace pointer list - workspace = - (AllreduceWorkspace**)malloc(world_size * sizeof(Allreduceworkspace*)); - symmetric_buffer[0] = (char**)malloc(world_size * sizeof(char**)); - symmetric_buffer[1] = (char**)malloc(world_size * sizeof(char**)); - distributed_buffer[0] = (char**)malloc(world_size * sizeof(char**)); - distributed_buffer[1] = (char**)malloc(world_size * sizeof(char**)); - - // map shm of all ranks - for (int i = 0; i < world_size; i++) { - if (i != rank) { - int written = snprintf( - shm_name, - AllreduceWorkspace::NAME_BUF_SIZE, - "%s_%d", - shm_name_prefix, - i); - if (written >= AllreduceWorkspace::NAME_BUF_SIZE) { - std::cout << "[warning]: written >= NAME_BUF_SIZE" << std::endl; - } - - do { - shared_open(&allreduce_buffer, shm_name, sizeof(AllreduceWorkspace)); - } while (allreduce_buffer.descriptor == -1 && errno == ENOENT); - workspace_buf_other = (AllreduceWorkspace*)allreduce_buffer.bytes; - shm_fd[i] = allreduce_buffer.descriptor; - workspace[i] = workspace_buf_other; - } else { - workspace[i] = workspace_buf; - } - symmetric_buffer[0][i] = workspace[i]->buffer + BUFFER0_OFFSET(0); - symmetric_buffer[1][i] = workspace[i]->buffer + BUFFER0_OFFSET(1); - distributed_buffer[0][i] = workspace[i]->buffer + BUFFER1_OFFSET(0); - distributed_buffer[1][i] = workspace[i]->buffer + BUFFER1_OFFSET(1); - } - is_initialized = true; -} - -AllreduceSharedMemoryData::~AllreduceSharedMemoryData() { - if (is_initialized == true) { - // unlink and munmap shared memory - for (int i = 0; i < world_size; i++) { - close(shm_fd[i]); - munmap(workspace[i], sizeof(AllreduceWorkspace)); - } - - shm_unlink(shm_buffer_name.c_str()); - - free(cur_workspace); - free(workspace); - free(symmetric_buffer[0]); - free(symmetric_buffer[1]); - free(distributed_buffer[0]); - free(distributed_buffer[1]); - } -} - -void shm(const detail::AllreduceOptionsImpl& opts) { - const auto& context = opts.context; - if (context->shmData == nullptr) { - context->shmData = std::make_shared( - context->rank, context->size); - context->shmData->initialize(); - } - const size_t data_size = opts.elements * opts.elementSize; - auto& in = opts.in; - auto& out = opts.out; - - // Do local reduction - if (in.size() > 0) { - if (in.size() == 1) { - memcpy( - static_cast(out[0]->ptr), - static_cast(in[0]->ptr), - data_size); - } else { - opts.reduce( - static_cast(out[0]->ptr), - static_cast(in[0]->ptr), - static_cast(in[1]->ptr), - opts.elements); - for (size_t i = 2; i < in.size(); i++) { - opts.reduce( - static_cast(out[0]->ptr), - static_cast(out[0]->ptr), - static_cast(in[i]->ptr), - opts.elements); - } - } - } else { - for (size_t i = 1; i < out.size(); i++) { - opts.reduce( - static_cast(out[0]->ptr), - static_cast(out[0]->ptr), - static_cast(out[i]->ptr), - opts.elements); - } - } - - void* data = out[0].get()->ptr; - - for (int offset = 0; offset < data_size; - offset += Allreduceworkspace::MAX_BUF_SIZE) { - auto data_ptr = ((char*)(data) + offset); - size_t chunk_size = data_size - offset > Allreduceworkspace::MAX_BUF_SIZE - ? Allreduceworkspace::MAX_BUF_SIZE - : data_size - offset; - size_t chunk_el = chunk_size / (data_size / opts.elements); - if (chunk_size < Allreduceworkspace::NAIVE_ALLREDUCE_THRESHOLD) { - symmetric_naive_all_reduce( - data_ptr, opts.elementSize, chunk_size, chunk_el, opts); - } else { - distributed_naive_reduce( - data_ptr, opts.elementSize, chunk_size, chunk_el, opts); - } - } - - if (out.size() > 1) { - for (size_t i = 1; i < out.size(); i++) { - memcpy( - static_cast(out[i]->ptr), - static_cast(out[0]->ptr), - data_size); - } - } -} - -} // namespace gloo diff --git a/gloo/allreduce_shm.h b/gloo/allreduce_shm.h deleted file mode 100644 index a7b2f8615..000000000 --- a/gloo/allreduce_shm.h +++ /dev/null @@ -1,71 +0,0 @@ - -#pragma once - -#include -#include - -#include "gloo/allreduce.h" - -namespace gloo { - -struct AllreduceSharedMemoryData { - enum CollState { - coll_begin = 0, - coll_allreduce_naive__copy_in_done, - coll_allreduce_naive__reduce_done, - // alternative state when allreduce is working on alternative buffer - // of the double buffer. - coll_alt1_allreduce_naive__copy_in_done, - coll_alt2_allreduce_naive__copy_in_done, - coll_alt1_allreduce_naive__reduce_done, - }; - - struct AllreduceWorkspace { - static constexpr size_t MAX_BUF_SIZE = 1048576 * 32; - static constexpr size_t NAIVE_ALLREDUCE_THRESHOLD = 1048576; - static constexpr int NAME_BUF_SIZE = 1000; - - int fd; - enum CollState states[2]; // idx=0 -- state for symmetric_naive_all_reduce - // idx=1 -- state for distributed_naive_all_reduce - // double buffer to avoid syncing between rounds - // offset=0 -- 2*NAIVE_ALLREDUCE_THRESHOLD : buffer for - // symmetric_naive_all_reduce after that : buffer for - // distributed_naive_all_reduce - char name[NAME_BUF_SIZE]; - char buffer[2 * NAIVE_ALLREDUCE_THRESHOLD + 2 * MAX_BUF_SIZE]; - }; - - AllreduceSharedMemoryData(int rank, int world_size) - : rank(rank), - world_size(world_size), - current_buffer(0), - state_idx(0), - is_initialized(false) {} - ~AllreduceSharedMemoryData(); - void initialize(); - - int rank; - int world_size; - int current_buffer; - int state_idx; - bool is_initialized; - - AllreduceWorkspace* cur_workspace; - AllreduceWorkspace** workspace; - // buffer for small messages, double buffer - char** symmetric_buffer[2]; - // buffer for large messages, double buffer - char** distributed_buffer[2]; - std::vector shm_fd; - std::string shm_buffer_name; - - std::mutex m; - std::condition_variable cv; - bool wait_done; - bool shutdown; -}; - -void shm(const detail::AllreduceOptionsImpl& opts); - -} // namespace gloo diff --git a/gloo/context.cc b/gloo/context.cc index 367ad88b4..fd9b83c7b 100644 --- a/gloo/context.cc +++ b/gloo/context.cc @@ -67,8 +67,4 @@ std::chrono::milliseconds Context::getTimeout() const { return timeout_; } -bool Context::isIntraNode() const { - return transportContext_->isIntraNode(); -} - } // namespace gloo diff --git a/gloo/context.h b/gloo/context.h index c114b8866..ddd6221b7 100644 --- a/gloo/context.h +++ b/gloo/context.h @@ -24,8 +24,6 @@ class Device; class UnboundBuffer; } // namespace transport -class AllreduceSharedMemoryData; - class Context { public: Context(int rank, int size, int base = 2); @@ -35,8 +33,6 @@ class Context { const int size; int base; - std::shared_ptr shmData; - std::shared_ptr& getDevice(); std::unique_ptr& getPair(int i); @@ -56,8 +52,6 @@ class Context { std::chrono::milliseconds getTimeout() const; - bool isIntraNode() const; - std::unique_ptr deserializeRemoteKey( const std::string& serialized) { return transportContext_->deserializeRemoteKey(serialized); diff --git a/gloo/transport/context.cc b/gloo/transport/context.cc index 4b517eba5..4b4cfadbf 100644 --- a/gloo/transport/context.cc +++ b/gloo/transport/context.cc @@ -42,21 +42,18 @@ void Context::createAndConnectAllPairs(std::shared_ptr store) { const std::vector value(localHostName.begin(), localHostName.end()); store->set(localKey, value); - intraNode_ = true; for (int i = 0; i < size; i++) { if (i == rank) { break; } std::string key("rank_" + std::to_string(i)); - auto val = store->wait_get(key, getTimeout()); + auto val = store->get(key); auto hostName = std::string((const char*)val.data(), val.size()); if (hostName == localHostName) { localRank++; } - - intraNode_ = intraNode_ && hostName == localHostName; } // Create pairs diff --git a/gloo/transport/context.h b/gloo/transport/context.h index 56d8ad8ec..1733b7979 100644 --- a/gloo/transport/context.h +++ b/gloo/transport/context.h @@ -68,10 +68,6 @@ class Context { return timeout_; } - bool isIntraNode() const { - return intraNode_; - } - virtual std::unique_ptr deserializeRemoteKey( const std::string& serialized) { throw std::runtime_error("Not implemented"); @@ -102,9 +98,6 @@ class Context { // any kind of send/recv operation. std::chrono::milliseconds timeout_; - // Whether is intra-node. - bool intraNode_ = false; - std::vector extractAddress(const std::vector& allAddrs, int i) const; diff --git a/gloo/transport/tcp/context.cc b/gloo/transport/tcp/context.cc index 5b02b6340..023178741 100644 --- a/gloo/transport/tcp/context.cc +++ b/gloo/transport/tcp/context.cc @@ -123,7 +123,6 @@ void Context::createAndConnectAllPairs(std::shared_ptr store) { }); } - intraNode_ = true; // Connect every pair for (int i = 0; i < size; i++) { if (i == rank) { @@ -141,8 +140,6 @@ void Context::createAndConnectAllPairs(std::shared_ptr store) { ++localRank; } - intraNode_ = intraNode_ && (remoteRankInfo.hostname == localHostName); - const auto& pair = pairs_[i]; auto remoteDeviceAddr = Address(remoteRankInfo.addressBytes).getSockaddr();