From 8a13a8ce383439a6086db291546ed18c1ceac848 Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Fri, 27 Feb 2026 14:26:57 +0200 Subject: [PATCH 1/6] UCP/FT: AM lane and am/zcopy/multi/psn proto failover --- src/ucp/am/eager.inl | 7 +++++++ src/ucp/am/eager_multi.c | 35 ++++++++++++++++++++++++++++------ src/ucp/core/ucp_am.c | 6 +++++- src/ucp/core/ucp_ep.c | 12 ++++++++++++ src/ucp/core/ucp_request.h | 1 + src/ucp/proto/proto_am.inl | 3 ++- src/ucp/proto/proto_common.c | 21 ++++++++++++++++---- src/ucp/proto/proto_common.h | 7 +++++-- src/ucp/proto/proto_common.inl | 5 +++++ src/ucp/rma/get_offload.c | 3 ++- src/ucp/rma/put_offload.c | 3 ++- src/ucp/wireup/select.c | 12 ++++++++++++ 12 files changed, 99 insertions(+), 16 deletions(-) diff --git a/src/ucp/am/eager.inl b/src/ucp/am/eager.inl index e9635aef7d9..4a7b76e0949 100644 --- a/src/ucp/am/eager.inl +++ b/src/ucp/am/eager.inl @@ -51,6 +51,7 @@ static void ucp_am_eager_zcopy_completion(uct_completion_t *self) ucs_assert(req->send.msg_proto.am.header.reg_desc != NULL); ucs_mpool_put_inline(req->send.msg_proto.am.header.reg_desc); + req->send.msg_proto.am.header.reg_desc = NULL; ucp_proto_request_zcopy_completion(self); } @@ -59,6 +60,11 @@ ucp_am_eager_zcopy_pack_user_header(ucp_request_t *req) { ucp_mem_desc_t *reg_desc; + if (req->send.msg_proto.am.internal_flags & + UCP_REQUEST_AM_FLAG_HEADER_PACKED) { + return UCS_OK; + } + /* Request must be in initial state or after @ref ucp_proto_t::reset */ ucs_assertv(req->send.msg_proto.am.header.reg_desc == NULL, "request %p: am.header.reg_desc=%p", req, @@ -76,6 +82,7 @@ ucp_am_eager_zcopy_pack_user_header(ucp_request_t *req) } req->send.msg_proto.am.header.reg_desc = reg_desc; + req->send.msg_proto.am.internal_flags |= UCP_REQUEST_AM_FLAG_HEADER_PACKED; return UCS_OK; } diff --git a/src/ucp/am/eager_multi.c b/src/ucp/am/eager_multi.c index 03809813e53..d92bfe51896 100644 --- a/src/ucp/am/eager_multi.c +++ b/src/ucp/am/eager_multi.c @@ -203,7 +203,8 @@ static void 
ucp_am_eager_multi_zcopy_proto_probe_common( .super.memtype_op = UCT_EP_OP_LAST, .super.flags = UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY | UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE | - UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING, + UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING | + UCP_PROTO_COMMON_INIT_FLAG_FAILOVER, .super.exclude_map = 0, .super.reg_mem_info = ucp_proto_common_select_param_mem_info( init_params->select_param), @@ -320,10 +321,14 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_eager_multi_zcopy_send_func( static ucs_status_t ucp_am_eager_multi_zcopy_init(ucp_request_t *req) { - ucp_am_eager_zcopy_pack_user_header(req); - ucp_proto_msg_multi_request_init(req); + ucs_status_t status; + + status = ucp_am_eager_zcopy_pack_user_header(req); + if (status != UCS_OK) { + return status; + } - return UCS_OK; + return ucp_proto_msg_multi_request_init(req); } static ucs_status_t @@ -373,6 +378,17 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_eager_multi_zcopy_psn_send_func( UCP_AM_ID_AM_MIDDLE_PSN); } +static void ucp_am_eager_multi_zcopy_psn_completion(uct_completion_t *self) +{ + if (ucs_likely(self->status == UCS_OK)) { + ucp_am_eager_zcopy_completion(self); + } else { + /* NOTE: do not release the user header to allow the request to be + * restarted after */ + ucp_proto_request_zcopy_completion(self); + } +} + static ucs_status_t ucp_am_eager_multi_zcopy_psn_proto_progress(uct_pending_req_t *self) { @@ -385,7 +401,7 @@ ucp_am_eager_multi_zcopy_psn_proto_progress(uct_pending_req_t *self) UCT_MD_MEM_ACCESS_LOCAL_READ, UCP_DT_MASK_CONTIG_IOV, ucp_am_eager_multi_zcopy_psn_send_func, ucp_request_invoke_uct_completion_success, - ucp_am_eager_zcopy_completion); + ucp_am_eager_multi_zcopy_psn_completion); if (status == UCS_INPROGRESS) { ucp_proto_am_set_middle_fragment(req); } @@ -393,6 +409,13 @@ ucp_am_eager_multi_zcopy_psn_proto_progress(uct_pending_req_t *self) return status; } +static ucs_status_t ucp_am_eager_multi_zcopy_psn_reset(ucp_request_t *req) +{ + 
ucp_datatype_iter_rewind(&req->send.state.dt_iter, UCP_DT_MASK_CONTIG_IOV); + req->send.msg_proto.am.internal_flags &= UCP_REQUEST_AM_FLAG_HEADER_PACKED; + return ucp_proto_request_zcopy_reset(req); +} + ucp_proto_t ucp_am_eager_multi_zcopy_psn_proto = { .name = "am/egr/multi/zcopy/psn", .desc = UCP_PROTO_MULTI_FRAG_DESC " " UCP_PROTO_ZCOPY_DESC " psn", @@ -401,5 +424,5 @@ ucp_proto_t ucp_am_eager_multi_zcopy_psn_proto = { .query = ucp_proto_multi_query, .progress = {ucp_am_eager_multi_zcopy_psn_proto_progress}, .abort = ucp_proto_am_request_zcopy_abort, - .reset = ucp_am_proto_request_zcopy_reset + .reset = ucp_am_eager_multi_zcopy_psn_reset }; diff --git a/src/ucp/core/ucp_am.c b/src/ucp/core/ucp_am.c index 46d739b0ffa..23ff2c9e2ac 100644 --- a/src/ucp/core/ucp_am.c +++ b/src/ucp/core/ucp_am.c @@ -328,6 +328,8 @@ ucp_am_zcopy_pack_user_header(ucp_request_t *req) { ucp_mem_desc_t *reg_desc; + ucs_assert(req->send.msg_proto.am.header.reg_desc == NULL); + reg_desc = ucp_worker_mpool_get(&req->send.ep->worker->reg_mp); if (ucs_unlikely(reg_desc == NULL)) { return UCS_ERR_NO_MEMORY; @@ -616,6 +618,7 @@ static UCS_F_ALWAYS_INLINE void ucp_am_zcopy_complete_common(ucp_request_t *req) ucs_assert(req->send.state.uct_comp.count == 0); ucs_mpool_put_inline(req->send.msg_proto.am.header.reg_desc); + req->send.msg_proto.am.header.reg_desc = NULL; ucp_request_send_buffer_dereg(req); /* TODO register+lane change */ } @@ -1473,7 +1476,7 @@ ucp_am_handle_unfinished(ucp_worker_h worker, ucp_recv_desc_t *rdesc, hdr = (ucp_am_hdr_t*)(first_ftr + 1); recv_flags = ucp_am_hdr_reply_ep(worker, hdr->flags, reply_ep, &reply_ep) | - UCP_AM_RECV_ATTR_FLAG_DATA; + UCP_AM_RECV_ATTR_FLAG_DATA; payload = UCS_PTR_BYTE_OFFSET(first_rdesc + 1, first_rdesc->payload_offset); am_id = hdr->am_id; @@ -1924,6 +1927,7 @@ ucs_status_t ucp_am_proto_request_zcopy_reset(ucp_request_t *request) ucs_mpool_put_inline(request->send.msg_proto.am.header.reg_desc); request->send.msg_proto.am.header.reg_desc = NULL; 
+ request->send.msg_proto.am.internal_flags &= UCP_REQUEST_AM_FLAG_HEADER_PACKED; return ucp_proto_request_zcopy_reset(request); } diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index d2cf532359b..f44c732258e 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1645,7 +1645,11 @@ ucp_ep_reconfig_internal(ucp_ep_h ep, ucp_lane_map_t failed_lanes) for (lane = 0; lane < cfg_key.num_lanes; lane++) { if (failed_lanes & UCS_BIT(lane)) { + ucs_assert(lane != UCP_NULL_LANE); cfg_key.lanes[lane].lane_types |= UCS_BIT(UCP_LANE_TYPE_FAILED); + if (cfg_key.am_lane == lane) { + cfg_key.am_lane = UCP_NULL_LANE; + } } wiface = ucp_worker_iface(worker, cfg_key.lanes[lane].rsc_index); @@ -1654,6 +1658,14 @@ ucp_ep_reconfig_internal(ucp_ep_h ep, ucp_lane_map_t failed_lanes) cfg_key.lanes[lane].port_speed = wiface->port_speed; } + for (lane = 0; lane < cfg_key.num_lanes; lane++) { + if ((cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) && + !(cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_FAILED))) { + cfg_key.am_lane = lane; + break; + } + } + if (port_speed_changed) { ucs_assertv(!ucp_ep_config_is_equal(&cfg_key, &ucp_ep_config(ep)->key), "ep %p: config is not updated on port speed change", ep); diff --git a/src/ucp/core/ucp_request.h b/src/ucp/core/ucp_request.h index a820d56f1c2..005a6a5f836 100644 --- a/src/ucp/core/ucp_request.h +++ b/src/ucp/core/ucp_request.h @@ -37,6 +37,7 @@ enum { UCP_REQUEST_FLAG_COMPLETED = UCS_BIT(0), UCP_REQUEST_FLAG_RELEASED = UCS_BIT(1), UCP_REQUEST_FLAG_PROTO_SEND = UCS_BIT(2), + UCP_REQUEST_FLAG_MSG_ID_SET = UCS_BIT(3), UCP_REQUEST_FLAG_SYNC_LOCAL_COMPLETED = UCS_BIT(4), UCP_REQUEST_FLAG_SYNC_REMOTE_COMPLETED = UCS_BIT(5), UCP_REQUEST_FLAG_CALLBACK = UCS_BIT(6), diff --git a/src/ucp/proto/proto_am.inl b/src/ucp/proto/proto_am.inl index 5315cfceb5a..585915159f8 100644 --- a/src/ucp/proto/proto_am.inl +++ b/src/ucp/proto/proto_am.inl @@ -26,7 +26,8 @@ * AM specific internal request flags */ enum 
ucp_request_am_internal_flags { - UCP_REQUEST_AM_FLAG_HEADER_SENT = UCS_BIT(0) + UCP_REQUEST_AM_FLAG_HEADER_SENT = UCS_BIT(0), + UCP_REQUEST_AM_FLAG_HEADER_PACKED = UCS_BIT(1) }; diff --git a/src/ucp/proto/proto_common.c b/src/ucp/proto/proto_common.c index d95986dd165..1b0e24585b3 100644 --- a/src/ucp/proto/proto_common.c +++ b/src/ucp/proto/proto_common.c @@ -43,9 +43,16 @@ ucp_proto_common_init_params(const ucp_proto_init_params_t *init_params) int ucp_proto_common_init_check_err_handling( const ucp_proto_common_init_params_t *init_params) { - return (init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING) || - (init_params->super.ep_config_key->err_mode == - UCP_ERR_HANDLING_MODE_NONE); + switch (init_params->super.ep_config_key->err_mode) { + case UCP_ERR_HANDLING_MODE_NONE: + return 1; + case UCP_ERR_HANDLING_MODE_PEER: + return !!(init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING); + case UCP_ERR_HANDLING_MODE_FAILOVER: + return !!(init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_FAILOVER); + default: + return 0; + } } static size_t @@ -940,7 +947,13 @@ void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name) ucs_status_t ucp_proto_offload_zcopy_reset(ucp_request_t *req) { - ucp_proto_request_zcopy_reset(req); + ucs_status_t status; + + status = ucp_proto_request_zcopy_reset(req); + if (status != UCS_OK) { + return status; + } + ucp_datatype_iter_rewind(&req->send.state.dt_iter, UCP_DT_MASK_ALL); req->flags &= ~UCP_REQUEST_FLAG_PROTO_INITIALIZED; return UCS_OK; diff --git a/src/ucp/proto/proto_common.h b/src/ucp/proto/proto_common.h index 679e2dc39a5..a3325d902af 100644 --- a/src/ucp/proto/proto_common.h +++ b/src/ucp/proto/proto_common.h @@ -71,12 +71,15 @@ typedef enum { * sending more than the remote side supports */ UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE = UCS_BIT(8), - /* Supports error handling */ + /* Supports peer failure error handling mode */ UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING = UCS_BIT(9), /* Supports 
starting the request when its datatype iterator offset is > 0 */ UCP_PROTO_COMMON_INIT_FLAG_RESUME = UCS_BIT(10), - UCP_PROTO_COMMON_KEEP_MD_MAP = UCS_BIT(11) + UCP_PROTO_COMMON_KEEP_MD_MAP = UCS_BIT(11), + + /* Supports failover error handling mode */ + UCP_PROTO_COMMON_INIT_FLAG_FAILOVER = UCS_BIT(12) } ucp_proto_common_init_flags_t; diff --git a/src/ucp/proto/proto_common.inl b/src/ucp/proto/proto_common.inl index a4d4725ee0c..a43be8da992 100644 --- a/src/ucp/proto/proto_common.inl +++ b/src/ucp/proto/proto_common.inl @@ -50,7 +50,12 @@ ucp_proto_msg_multi_request_init(ucp_request_t *req) return UCS_OK; } + if (req->flags & UCP_REQUEST_FLAG_MSG_ID_SET) { + return UCS_OK; + } + req->send.msg_proto.message_id = req->send.ep->worker->am_message_id++; + req->flags |= UCP_REQUEST_FLAG_MSG_ID_SET; return UCS_OK; } diff --git a/src/ucp/rma/get_offload.c b/src/ucp/rma/get_offload.c index f14722fefba..7bec24804cf 100644 --- a/src/ucp/rma/get_offload.c +++ b/src/ucp/rma/get_offload.c @@ -214,7 +214,8 @@ ucp_proto_get_offload_zcopy_probe(const ucp_proto_init_params_t *init_params) UCP_PROTO_COMMON_INIT_FLAG_REMOTE_ACCESS | UCP_PROTO_COMMON_INIT_FLAG_RESPONSE | UCP_PROTO_COMMON_INIT_FLAG_MIN_FRAG | - UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING, + UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING | + UCP_PROTO_COMMON_INIT_FLAG_FAILOVER, .super.exclude_map = 0, .super.reg_mem_info = ucp_proto_common_select_param_mem_info( init_params->select_param), diff --git a/src/ucp/rma/put_offload.c b/src/ucp/rma/put_offload.c index 9516ee8ef11..e522296a893 100644 --- a/src/ucp/rma/put_offload.c +++ b/src/ucp/rma/put_offload.c @@ -309,7 +309,8 @@ ucp_proto_put_offload_zcopy_probe(const ucp_proto_init_params_t *init_params) .super.flags = UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY | UCP_PROTO_COMMON_INIT_FLAG_RECV_ZCOPY | UCP_PROTO_COMMON_INIT_FLAG_REMOTE_ACCESS | - UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING, + UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING | + UCP_PROTO_COMMON_INIT_FLAG_FAILOVER, .super.exclude_map 
= 0, .super.reg_mem_info = ucp_proto_common_select_param_mem_info( init_params->select_param), diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 1111ff8317b..995b048c6d5 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -2748,6 +2748,18 @@ ucp_wireup_construct_lanes(const ucp_wireup_select_params_t *select_params, } } + /* Failover mode: add AM type to all am_bw lanes so any can serve as + * fallback AM lane on lane failure (protocol selection requires + * UCP_LANE_TYPE_AM). + */ + if (key->err_mode == UCP_ERR_HANDLING_MODE_FAILOVER) { + for (lane = 0; lane < key->num_lanes; ++lane) { + if (key->lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) { + key->lanes[lane].lane_types |= UCS_BIT(UCP_LANE_TYPE_AM); + } + } + } + /* Sort AM, RMA and AMO lanes according to score */ ucs_qsort_r(key->am_bw_lanes + first_am_bw_lane, UCP_MAX_LANES - first_am_bw_lane, sizeof(ucp_lane_index_t), From cbe5861ebd1547debac12187c6f4ff625a59810d Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Sat, 28 Feb 2026 12:28:18 +0200 Subject: [PATCH 2/6] GTEST: AM FT testing --- test/gtest/ucp/test_ucp_fault_tolerance.cc | 228 ++++++++++++++++++--- 1 file changed, 197 insertions(+), 31 deletions(-) diff --git a/test/gtest/ucp/test_ucp_fault_tolerance.cc b/test/gtest/ucp/test_ucp_fault_tolerance.cc index 7899644e80d..e7d2f0a3df8 100644 --- a/test/gtest/ucp/test_ucp_fault_tolerance.cc +++ b/test/gtest/ucp/test_ucp_fault_tolerance.cc @@ -6,8 +6,10 @@ #include "test_ucp_memheap.h" #include +#include // for std::memcpy #include #include +#include extern "C" { #include @@ -20,7 +22,7 @@ extern "C" { class test_ucp_fault_tolerance : public test_ucp_memheap { public: static void get_test_variants(std::vector& variants) { - add_variant_with_value(variants, UCP_FEATURE_RMA, 0, "rma"); + add_variant_with_value(variants, UCP_FEATURE_RMA|UCP_FEATURE_AM, 0, "rma|am"); } test_ucp_fault_tolerance() { @@ -28,6 +30,8 @@ class test_ucp_fault_tolerance : public 
test_ucp_memheap { } protected: + static constexpr uint16_t AM_ID = 0; + enum { GOOD_EP_INDEX = 0, /* Index for good endpoint */ INJECTED_EP_INDEX = 1 /* Index for failure-injected endpoint */ @@ -40,7 +44,8 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { enum test_op_t { TEST_OP_PUT, - TEST_OP_GET + TEST_OP_GET, + TEST_OP_FLUSH }; void init() override { @@ -51,6 +56,41 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { sender().connect(&receiver(), ep_params, INJECTED_EP_INDEX); receiver().connect(&sender(), ep_params, GOOD_EP_INDEX); receiver().connect(&sender(), ep_params, INJECTED_EP_INDEX); + + set_am_handler(); + } + + void set_am_handler() { + ucp_am_handler_param_t param; + param.field_mask = UCP_AM_HANDLER_PARAM_FIELD_ID | + UCP_AM_HANDLER_PARAM_FIELD_CB | + UCP_AM_HANDLER_PARAM_FIELD_ARG; + param.id = AM_ID; + param.cb = am_recv_cb; + param.arg = reinterpret_cast(this); + + ucs_status_t status = ucp_worker_set_am_recv_handler(receiver().worker(), + &param); + ASSERT_UCS_OK(status); + } + + static ucs_status_t am_recv_cb(void *arg, const void *header, + size_t header_length, void *data, + size_t length, + const ucp_am_recv_param_t *param) { + test_ucp_fault_tolerance *self = + reinterpret_cast(arg); + + if (param->recv_attr & UCP_AM_RECV_ATTR_FLAG_DATA) { + self->m_am_rbuf.resize(length); + std::memcpy(self->m_am_rbuf.data(), data, length); + self->m_am_received = true; + } + + EXPECT_FALSE(param->recv_attr & UCP_AM_RECV_ATTR_FLAG_RNDV) << + "RNDV is not covered yet"; + + return UCS_OK; } /** @@ -85,24 +125,80 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { ++self->m_err_count; } + static void shuffle_lanes(std::vector &lanes, const std::string &lane_type) { + if (lanes.size() < 2) { + UCS_TEST_SKIP_R("At least 2 " + lane_type + "s are required, but only " + + std::to_string(lanes.size()) + " " + lane_type + "s available"); + } + + /* Allocate randomizer on heap to avoid exceeding stack frame size limits. 
*/ + std::unique_ptr rnd_device(new std::random_device); + std::unique_ptr rng(new std::mt19937((*rnd_device)())); + std::shuffle(lanes.begin(), lanes.end(), *rng); + + for (ucp_lane_index_t lane : lanes) { + UCS_TEST_MESSAGE << lane_type << ": " << size_t(lane) << "/" << lanes.size(); + } + } + + ucp_ep_h get_ucp_ep_for_err_injection(failure_side_t failure_side) { + return (failure_side == FAILURE_SIDE_INITIATOR) ? sender().ep(0, INJECTED_EP_INDEX) : + receiver().ep(0, INJECTED_EP_INDEX); + } + /** - * Check if we have at least 2 RMA lanes, skip test if not + * Common helper function to test AM send 1KB with injected failure */ - void skip_if_insufficient_rma_lanes(ucp_ep_h ep, ucp_lane_index_t failure_lane) { - ucp_lane_index_t num_lanes = ucp_ep_num_lanes(ep); + void test_am_with_injected_failure(failure_side_t failure_side, bool flush_after = false) { + /* TODO: cover case when wireup is in progress, flush here is to complete wireup */ + flush_workers(); - if (num_lanes <= failure_lane) { - UCS_TEST_SKIP_R("Only " + std::to_string(int(num_lanes)) + " / " + std::to_string(int(failure_lane + 1)) + "lanes available"); - } else { - UCS_TEST_MESSAGE << "Endpoint has " << int(num_lanes) << " lanes, failure lane is " << int(failure_lane); + std::vector am_bw_lanes; + const ucp_lane_index_t *am_bw_lane_idx; + const ucp_lane_index_t *am_bw_lanes_key_p = + ucp_ep_config(sender().ep(0, INJECTED_EP_INDEX))->key.am_bw_lanes; + + ucs_carray_for_each(am_bw_lane_idx, am_bw_lanes_key_p, UCP_MAX_LANES) { + if (*am_bw_lane_idx != UCP_NULL_LANE) { + am_bw_lanes.push_back(*am_bw_lane_idx); + } } + + shuffle_lanes(am_bw_lanes, "AM BW lane"); + + UCS_TEST_MESSAGE << "Attempting AM send before failure injection..."; + ucs_status_t status = do_am_send_and_wait(sender().ep(0, INJECTED_EP_INDEX), am_msg_size(), + flush_after); + EXPECT_EQ(UCS_OK, status) << "AM send returned status: " << ucs_status_string(status); + + ucp_ep_h ucp_ep_for_injection = 
get_ucp_ep_for_err_injection(failure_side); + for (size_t lane_idx = 0; lane_idx < am_bw_lanes.size() - 1; ++lane_idx) { + ucp_lane_index_t lane = am_bw_lanes[lane_idx]; + uct_ep_h uct_ep_for_injection = ucp_ep_get_lane(ucp_ep_for_injection, lane); + status = uct_ep_invalidate(uct_ep_for_injection, 0); + if (status == UCS_ERR_UNSUPPORTED) { + UCS_TEST_SKIP_R("uct_ep_invalidate is not supported"); + } + + EXPECT_EQ(UCS_OK, status) << "uct_ep_invalidate returned status: " + << ucs_status_string(status); + + UCS_TEST_MESSAGE << "Attempting AM send after failure injection on lane " + << size_t(lane) << '/' << am_bw_lanes.size() << "..."; + status = do_am_send_and_wait(sender().ep(0, INJECTED_EP_INDEX), am_msg_size(), flush_after); + EXPECT_EQ(UCS_OK, status) << "AM send returned status: " << ucs_status_string(status); + } + + short_progress_loop(); + ASSERT_EQ(0, m_err_count) << "Error callback invoked " << m_err_count << " times"; + UCS_TEST_MESSAGE << "Success"; } /** * Common helper function to test RMA operation with injected failure */ void test_rma_with_injected_failure(failure_side_t failure_side, test_op_t op) { - const size_t size = 1 * UCS_GBYTE; + const size_t size = rma_msg_size(); const char *op_name = (op == TEST_OP_PUT) ? 
"PUT" : "GET"; /* TODO: cover case when wireup is in progress, flush here is to complete wireup */ @@ -118,20 +214,7 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { } } - if (rma_bw_lanes.size() < 2) { - UCS_TEST_SKIP_R("At least 2 RMA BW lanes are required, but only " + - std::to_string(rma_bw_lanes.size()) + " available"); - } - - { // allocate randomizer on heap to avoid exceeding stack frame size limits - std::unique_ptr rnd_device(new std::random_device); - std::unique_ptr rng(new std::mt19937((*rnd_device)())); - std::shuffle(rma_bw_lanes.begin(), rma_bw_lanes.end(), *rng); - } - - for (ucp_lane_index_t lane : rma_bw_lanes) { - UCS_TEST_MESSAGE << "RMA BW lane: " << size_t(lane) << "/" << rma_bw_lanes.size(); - } + shuffle_lanes(rma_bw_lanes, "RMA BW lane"); mem_buffer lbuf(size, UCS_MEMORY_TYPE_HOST); mapped_buffer rbuf(size, receiver()); @@ -151,13 +234,11 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { EXPECT_EQ(UCS_OK, status) << op_name << " operation returned status: " << ucs_status_string(status); - ucp_ep_h injected_ucp_ep = (failure_side == FAILURE_SIDE_INITIATOR) ? 
- sender().ep(0, INJECTED_EP_INDEX) : - receiver().ep(0, INJECTED_EP_INDEX); + ucp_ep_h ucp_ep_for_injection = get_ucp_ep_for_err_injection(failure_side); for (size_t lane_idx = 0; lane_idx < rma_bw_lanes.size() - 1; ++lane_idx) { ucp_lane_index_t lane = rma_bw_lanes[lane_idx]; - uct_ep_h injected_uct_ep = ucp_ep_get_lane(injected_ucp_ep, lane); - status = uct_ep_invalidate(injected_uct_ep, 0); + uct_ep_h uct_ep_for_injection = ucp_ep_get_lane(ucp_ep_for_injection, lane); + status = uct_ep_invalidate(uct_ep_for_injection, 0); if (status == UCS_ERR_UNSUPPORTED) { UCS_TEST_SKIP_R("uct_ep_invalidate is not supported"); } @@ -179,6 +260,70 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { } private: + static size_t rma_msg_size() { + return ucs::limit_buffer_size((100 * UCS_MBYTE) / + ucs::test_time_multiplier()); + } + + static size_t am_msg_size() { + return ucs::limit_buffer_size(UCS_KBYTE); + } + + static std::string op_name(unsigned op_mask) + { + std::string name; + + if (op_mask & TEST_OP_PUT) { + name += "PUT|"; + } + + if (op_mask & TEST_OP_GET) { + name += "GET|"; + } + + if (op_mask & TEST_OP_FLUSH) { + name += "FLUSH|"; + } + + if (!name.empty()) { + name.pop_back(); + } + + return name; + } + + ucs_status_t do_am_send_and_wait(ucp_ep_h ep, size_t size, bool flush_after) { + m_am_received = false; + + mem_buffer sbuf(size, UCS_MEMORY_TYPE_HOST); + sbuf.pattern_fill(m_seed, size); + + ucp_request_param_t param; + param.op_attr_mask = 0; + + ucs_status_ptr_t sptr = ucp_am_send_nbx(ep, AM_ID, NULL, 0, sbuf.ptr(), + size, &param); + // TODO: enable flush_after when PR #11210 is merged + if (false && flush_after) { + ucs_status_t status = request_wait(ucp_ep_flush_nbx(ep, &param)); + if (status != UCS_OK) { + return status; + } + } + + ucs_status_t status = request_wait(sptr); + if (status != UCS_OK) { + return status; + } + + while (!m_am_received) { + short_progress_loop(); + } + + mem_buffer::pattern_check(m_am_rbuf.data(), size, m_seed); + return UCS_OK; 
+ } + ucs_status_t do_put_and_wait(ucp_ep_h ep, mem_buffer &lbuf, mapped_buffer &rbuf, ucp_rkey_h rkey, size_t size) { ucp_request_param_t param; @@ -224,9 +369,15 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { } } - size_t m_err_count = 0; - ucs_status_t m_err_status = UCS_OK; +protected: static constexpr uint64_t m_seed = 0x12345678; + + std::vector m_am_rbuf = std::vector(am_msg_size()); + volatile bool m_am_received = false; + +private: + size_t m_err_count = 0; + ucs_status_t m_err_status = UCS_OK; }; UCP_INSTANTIATE_TEST_CASE(test_ucp_fault_tolerance) @@ -250,3 +401,18 @@ UCS_TEST_P(test_ucp_fault_tolerance, get_with_target_failure) { test_rma_with_injected_failure(FAILURE_SIDE_TARGET, TEST_OP_GET); } + +UCS_TEST_P(test_ucp_fault_tolerance, am_send_with_initiator_failure, "MAX_EAGER_LANES=8", "ZCOPY_THRESH=0") +{ + test_am_with_injected_failure(FAILURE_SIDE_INITIATOR); +} + +UCS_TEST_P(test_ucp_fault_tolerance, am_send_with_target_failure, "MAX_EAGER_LANES=8", "ZCOPY_THRESH=0") +{ + test_am_with_injected_failure(FAILURE_SIDE_TARGET); +} + +UCS_TEST_P(test_ucp_fault_tolerance, am_send_flush_with_target_failure, "MAX_EAGER_LANES=8", "ZCOPY_THRESH=0") +{ + test_am_with_injected_failure(FAILURE_SIDE_TARGET, true); +} From 12d8b3cafe79479f43f3c1288b24d5dd360bf9ca Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Thu, 2 Apr 2026 19:46:06 +0300 Subject: [PATCH 3/6] GTEST: refactor test_ucp_fault_tolerance according to feature --- test/gtest/ucp/test_ucp_fault_tolerance.cc | 115 ++++++++++----------- 1 file changed, 56 insertions(+), 59 deletions(-) diff --git a/test/gtest/ucp/test_ucp_fault_tolerance.cc b/test/gtest/ucp/test_ucp_fault_tolerance.cc index c8043b4d598..b6d17bd794e 100644 --- a/test/gtest/ucp/test_ucp_fault_tolerance.cc +++ b/test/gtest/ucp/test_ucp_fault_tolerance.cc @@ -20,7 +20,18 @@ extern "C" { class test_ucp_fault_tolerance : public test_ucp_memheap { public: static void get_test_variants(std::vector& variants) { - 
add_variant_with_value(variants, UCP_FEATURE_RMA|UCP_FEATURE_AM, 0, "rma|am"); + add_variant_with_value(variants, UCP_FEATURE_RMA, TEST_OP_PUT, + op_name(TEST_OP_PUT)); + add_variant_with_value(variants, UCP_FEATURE_RMA, TEST_OP_PUT | TEST_OP_FLUSH, + op_name(TEST_OP_PUT | TEST_OP_FLUSH)); + add_variant_with_value(variants, UCP_FEATURE_RMA, TEST_OP_GET, + op_name(TEST_OP_GET)); + add_variant_with_value(variants, UCP_FEATURE_RMA, TEST_OP_GET | TEST_OP_FLUSH, + op_name(TEST_OP_GET | TEST_OP_FLUSH)); + add_variant_with_value(variants, UCP_FEATURE_AM, TEST_OP_AM, + op_name(TEST_OP_AM)); + add_variant_with_value(variants, UCP_FEATURE_AM | UCP_FEATURE_RMA, TEST_OP_AM | TEST_OP_FLUSH, + op_name(TEST_OP_AM | TEST_OP_FLUSH)); } test_ucp_fault_tolerance() { @@ -43,7 +54,8 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { enum test_op_t { TEST_OP_PUT = UCS_BIT(0), TEST_OP_GET = UCS_BIT(1), - TEST_OP_FLUSH = UCS_BIT(2) + TEST_OP_AM = UCS_BIT(2), + TEST_OP_FLUSH = UCS_BIT(3), }; void init() override { @@ -55,7 +67,9 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { receiver().connect(&sender(), ep_params, GOOD_EP_INDEX); receiver().connect(&sender(), ep_params, INJECTED_EP_INDEX); - set_am_handler(); + if (get_variant_value() & TEST_OP_AM) { + set_am_handler(); + } } void set_am_handler() { @@ -67,8 +81,7 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { param.cb = am_recv_cb; param.arg = reinterpret_cast(this); - ucs_status_t status = ucp_worker_set_am_recv_handler(receiver().worker(), - &param); + ucs_status_t status = ucp_worker_set_am_recv_handler(receiver().worker(), &param); ASSERT_UCS_OK(status); } @@ -126,7 +139,7 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { static void shuffle_lanes(std::vector &lanes, const std::string &lane_type) { if (lanes.size() < 2) { UCS_TEST_SKIP_R("At least 2 " + lane_type + "s are required, but only " + - std::to_string(lanes.size()) + " " + lane_type + "s available"); + 
std::to_string(lanes.size()) + " available"); } /* Allocate randomizer on heap to avoid exceeding stack frame size limits. */ @@ -145,9 +158,11 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { } /** - * Common helper function to test AM send 1KB with injected failure + * Common helper function to test AM send with injected failure */ - void test_am_with_injected_failure(failure_side_t failure_side, bool flush_after = false) { + void test_am_with_injected_failure(failure_side_t failure_side, unsigned op_mask) { + const std::string op_str = op_name(op_mask); + /* TODO: cover case when wireup is in progress, flush here is to complete wireup */ flush_workers(); @@ -164,10 +179,11 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { shuffle_lanes(am_bw_lanes, "AM BW lane"); - UCS_TEST_MESSAGE << "Attempting AM send before failure injection..."; + UCS_TEST_MESSAGE << "Attempting " << op_str << " operation before failure injection..."; ucs_status_t status = do_am_send_and_wait(sender().ep(0, INJECTED_EP_INDEX), am_msg_size(), - flush_after); - EXPECT_EQ(UCS_OK, status) << "AM send returned status: " << ucs_status_string(status); + op_mask & TEST_OP_FLUSH); + EXPECT_EQ(UCS_OK, status) << op_str << " operation returned status: " + << ucs_status_string(status); ucp_ep_h ucp_ep_for_injection = get_ucp_ep_for_err_injection(failure_side); for (size_t lane_idx = 0; lane_idx < am_bw_lanes.size() - 1; ++lane_idx) { @@ -181,10 +197,13 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { EXPECT_EQ(UCS_OK, status) << "uct_ep_invalidate returned status: " << ucs_status_string(status); - UCS_TEST_MESSAGE << "Attempting AM send after failure injection on lane " + UCS_TEST_MESSAGE << "Attempting " << op_str + << " operation after failure injection on lane " << size_t(lane) << '/' << am_bw_lanes.size() << "..."; - status = do_am_send_and_wait(sender().ep(0, INJECTED_EP_INDEX), am_msg_size(), flush_after); - EXPECT_EQ(UCS_OK, status) << "AM send returned 
status: " << ucs_status_string(status); + status = do_am_send_and_wait(sender().ep(0, INJECTED_EP_INDEX), am_msg_size(), + op_mask & TEST_OP_FLUSH); + EXPECT_EQ(UCS_OK, status) << op_str << " operation returned status: " + << ucs_status_string(status); } short_progress_loop(); @@ -196,7 +215,7 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { * Common helper function to test RMA operation with injected failure */ void test_rma_with_injected_failure(failure_side_t failure_side, unsigned op_mask) { - const size_t size = 1 * UCS_GBYTE; + const size_t size = rma_msg_size(); const std::string op_str = op_name(op_mask); /* TODO: cover case when wireup is in progress, flush here is to complete wireup */ @@ -243,7 +262,8 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { EXPECT_EQ(UCS_OK, status) << "uct_ep_invalidate returned status: " << ucs_status_string(status); - UCS_TEST_MESSAGE << "Attempting " << op_str << " operation after failure injection on lane " + UCS_TEST_MESSAGE << "Attempting " << op_str + << " operation after failure injection on lane " << size_t(lane) << '/' << rma_bw_lanes.size() << "..."; status = do_rma_and_wait(sender().ep(0, INJECTED_EP_INDEX), op_mask, lbuf, rbuf, rkey.get(), size); @@ -256,6 +276,17 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { UCS_TEST_MESSAGE << "Success"; } + void do_test(failure_side_t failure_side) { + const unsigned op_mask = get_variant_value(); + + if (op_mask & TEST_OP_AM) { + ASSERT_FALSE(op_mask & (TEST_OP_PUT|TEST_OP_GET)); + test_am_with_injected_failure(failure_side, op_mask); + } else { + ASSERT_TRUE(op_mask & (TEST_OP_PUT|TEST_OP_GET)); + test_rma_with_injected_failure(failure_side, op_mask); + } + } private: static size_t rma_msg_size() { return ucs::limit_buffer_size((100 * UCS_MBYTE) / ucs::test_time_multiplier()); @@ -277,6 +308,10 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { name += "GET|"; } + if (op_mask & TEST_OP_AM) { + name += "AM|"; + } + if 
(op_mask & TEST_OP_FLUSH) { name += "FLUSH|"; } @@ -311,10 +346,7 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { return status; } - while (!m_am_received) { - short_progress_loop(); - } - + wait_for_value(&m_am_received, true); mem_buffer::pattern_check(m_am_rbuf.data(), size, m_seed); return UCS_OK; } @@ -389,47 +421,12 @@ class test_ucp_fault_tolerance : public test_ucp_memheap { UCP_INSTANTIATE_TEST_CASE(test_ucp_fault_tolerance) -UCS_TEST_P(test_ucp_fault_tolerance, put_with_initiator_failure) -{ - test_rma_with_injected_failure(FAILURE_SIDE_INITIATOR, TEST_OP_PUT); -} - -UCS_TEST_P(test_ucp_fault_tolerance, put_with_target_failure) -{ - test_rma_with_injected_failure(FAILURE_SIDE_TARGET, TEST_OP_PUT); -} - -UCS_TEST_P(test_ucp_fault_tolerance, put_flush_with_target_failure) -{ - test_rma_with_injected_failure(FAILURE_SIDE_TARGET, TEST_OP_PUT | TEST_OP_FLUSH); -} - -UCS_TEST_P(test_ucp_fault_tolerance, get_with_initiator_failure) -{ - test_rma_with_injected_failure(FAILURE_SIDE_INITIATOR, TEST_OP_GET); -} - -UCS_TEST_P(test_ucp_fault_tolerance, get_flush_with_initiator_failure) -{ - test_rma_with_injected_failure(FAILURE_SIDE_INITIATOR, TEST_OP_GET | TEST_OP_FLUSH); -} - -UCS_TEST_P(test_ucp_fault_tolerance, get_with_target_failure) -{ - test_rma_with_injected_failure(FAILURE_SIDE_TARGET, TEST_OP_GET); -} - -UCS_TEST_P(test_ucp_fault_tolerance, am_send_with_initiator_failure, "MAX_EAGER_LANES=8", "ZCOPY_THRESH=0") -{ - test_am_with_injected_failure(FAILURE_SIDE_INITIATOR); -} - -UCS_TEST_P(test_ucp_fault_tolerance, am_send_with_target_failure, "MAX_EAGER_LANES=8", "ZCOPY_THRESH=0") +UCS_TEST_P(test_ucp_fault_tolerance, initiator_failure, "MAX_EAGER_LANES=8") { - test_am_with_injected_failure(FAILURE_SIDE_TARGET); + do_test(FAILURE_SIDE_INITIATOR); } -UCS_TEST_P(test_ucp_fault_tolerance, am_send_flush_with_target_failure, "MAX_EAGER_LANES=8", "ZCOPY_THRESH=0") +UCS_TEST_P(test_ucp_fault_tolerance, target_failure, "MAX_EAGER_LANES=8") { - 
test_am_with_injected_failure(FAILURE_SIDE_TARGET, true); + do_test(FAILURE_SIDE_TARGET); } From 8d4b95c9f523836fe1ac0c5ff188bcd6dc97dfff Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Thu, 2 Apr 2026 19:47:47 +0300 Subject: [PATCH 4/6] UCP/AM/FT: CR comments and refactoring after merge --- src/ucp/am/eager_multi.c | 10 ++++++--- src/ucp/core/ucp_am.c | 5 +++-- src/ucp/core/ucp_ep.c | 46 ++++++++++++++++++++++++--------------- src/ucp/core/ucp_ep.h | 5 ++--- src/ucp/core/ucp_worker.c | 3 ++- src/ucp/wireup/select.c | 2 +- 6 files changed, 44 insertions(+), 27 deletions(-) diff --git a/src/ucp/am/eager_multi.c b/src/ucp/am/eager_multi.c index d92bfe51896..ed05acf8d92 100644 --- a/src/ucp/am/eager_multi.c +++ b/src/ucp/am/eager_multi.c @@ -405,15 +405,19 @@ ucp_am_eager_multi_zcopy_psn_proto_progress(uct_pending_req_t *self) if (status == UCS_INPROGRESS) { ucp_proto_am_set_middle_fragment(req); } - + return status; } static ucs_status_t ucp_am_eager_multi_zcopy_psn_reset(ucp_request_t *req) { + ucs_status_t status; + + status = ucp_am_proto_request_zcopy_reset(req); ucp_datatype_iter_rewind(&req->send.state.dt_iter, UCP_DT_MASK_CONTIG_IOV); - req->send.msg_proto.am.internal_flags &= UCP_REQUEST_AM_FLAG_HEADER_PACKED; - return ucp_proto_request_zcopy_reset(req); + /* Restart the request from the very first fragment */ + req->send.msg_proto.am.internal_flags &= ~UCP_REQUEST_AM_FLAG_HEADER_SENT; + return status; } ucp_proto_t ucp_am_eager_multi_zcopy_psn_proto = { diff --git a/src/ucp/core/ucp_am.c b/src/ucp/core/ucp_am.c index 23ff2c9e2ac..de6e4a386d9 100644 --- a/src/ucp/core/ucp_am.c +++ b/src/ucp/core/ucp_am.c @@ -1927,8 +1927,9 @@ ucs_status_t ucp_am_proto_request_zcopy_reset(ucp_request_t *request) ucs_mpool_put_inline(request->send.msg_proto.am.header.reg_desc); request->send.msg_proto.am.header.reg_desc = NULL; - request->send.msg_proto.am.internal_flags &= UCP_REQUEST_AM_FLAG_HEADER_PACKED; - + /* reg_desc was released; next progress must repack user 
header */ + request->send.msg_proto.am.internal_flags &= + ~UCP_REQUEST_AM_FLAG_HEADER_PACKED; return ucp_proto_request_zcopy_reset(request); } diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index 8d51c44bbd0..f51ce6c0766 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1672,14 +1672,23 @@ ucp_ep_reconfig_internal(ucp_ep_h ep, ucp_lane_map_t failed_lanes) cfg_key.lanes[lane].port_speed = wiface->port_speed; } - for (lane = 0; lane < cfg_key.num_lanes; lane++) { - if ((cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) && - !(cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_FAILED))) { - cfg_key.am_lane = lane; - break; + if (cfg_key.am_lane == UCP_NULL_LANE) { + for (lane = 0; lane < cfg_key.num_lanes; lane++) { + if ((cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) && + !(cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_FAILED))) { + cfg_key.am_lane = lane; + break; + } } } + if ((cfg_key.am_lane == UCP_NULL_LANE) && + (ep->worker->context->config.features & UCP_FEATURE_AM)) { + ucs_diag("ep %p: AM lane not found after reconfiguration with " + "failed lanes 0x%lx", ep, failed_lanes); + return UCS_ERR_UNREACHABLE; + } + if (port_speed_changed) { ucs_assertv(!ucp_ep_config_is_equal(&cfg_key, &ucp_ep_config(ep)->key), "ep %p: config is not updated on port speed change", ep); @@ -1718,28 +1727,31 @@ ucp_ep_failover_reconfig(ucp_ep_h ucp_ep, ucp_lane_map_t failed_lanes, return UCS_OK; } -ucs_status_t ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, +void ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, ucs_status_t status) { + ucs_status_t reconfig_status; + UCP_WORKER_THREAD_CS_CHECK_IS_BLOCKED(ucp_ep->worker); ucs_assert(UCS_STATUS_IS_ERR(status)); ucs_assert(!ucs_async_is_from_async(&ucp_ep->worker->async)); - if (!ucp_ep_err_mode_eq(ucp_ep, UCP_ERR_HANDLING_MODE_FAILOVER) || - /* some unrecoverable error, - TODO refactor this to mark all lanes as failed */ - (lanes == 
0) || + if (ucp_ep_err_mode_eq(ucp_ep, UCP_ERR_HANDLING_MODE_FAILOVER) && + /* TODO refactor this to mark all lanes as failed */ + (lanes != 0) && /* sockaddr is not supported for failover mode */ - ucp_ep_has_cm_lane(ucp_ep)) { - return ucp_ep_set_failed(ucp_ep, - (lanes == UCS_BIT(ucp_ep_get_cm_lane(ucp_ep)) ? - ucp_ep_get_cm_lane(ucp_ep) : UCP_NULL_LANE), status); + !ucp_ep_has_cm_lane(ucp_ep)) { + reconfig_status = ucp_ep_failover_reconfig(ucp_ep, lanes, status); + if (reconfig_status == UCS_OK) { + return; + } } - ucs_debug("ep %p: set_lanes_failed status %s on lanes 0x%lx", ucp_ep, - ucs_status_string(status), lanes); + /* else: unrecoverable error, mark the endpoint as failed. */ + ucp_ep_set_failed(ucp_ep, + (lanes == UCS_BIT(ucp_ep_get_cm_lane(ucp_ep)) ? + ucp_ep_get_cm_lane(ucp_ep) : UCP_NULL_LANE), status); - return ucp_ep_failover_reconfig(ucp_ep, lanes, status); } void ucp_ep_set_lanes_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h index a66831d5f93..59348019e44 100644 --- a/src/ucp/core/ucp_ep.h +++ b/src/ucp/core/ucp_ep.h @@ -761,9 +761,8 @@ void ucp_ep_disconnected(ucp_ep_h ep, int force); void ucp_ep_destroy_internal(ucp_ep_h ep); -ucs_status_t -ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, - ucs_status_t status); +void ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, + ucs_status_t status); void ucp_ep_set_lanes_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, ucs_status_t status); diff --git a/src/ucp/core/ucp_worker.c b/src/ucp/core/ucp_worker.c index c2159a0a74f..da08bec82f9 100644 --- a/src/ucp/core/ucp_worker.c +++ b/src/ucp/core/ucp_worker.c @@ -471,7 +471,8 @@ ucp_worker_iface_handle_uct_ep_failure(ucp_ep_h ucp_ep, ucp_lane_index_t lane, !ucp_ep_is_local_connected(ucp_ep)) { /* Failure on NON-AUX EP or failure on AUX EP before it sent its address * means failure on the UCP EP */ - return ucp_ep_set_lanes_failed(ucp_ep, UCS_BIT(lane), 
status); + ucp_ep_set_lanes_failed(ucp_ep, UCS_BIT(lane), status); + return UCS_OK; } if (wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) { diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c index 995b048c6d5..b6b1572aaa5 100644 --- a/src/ucp/wireup/select.c +++ b/src/ucp/wireup/select.c @@ -2752,7 +2752,7 @@ ucp_wireup_construct_lanes(const ucp_wireup_select_params_t *select_params, * fallback AM lane on lane failure (protocol selection requires * UCP_LANE_TYPE_AM). */ - if (key->err_mode == UCP_ERR_HANDLING_MODE_FAILOVER) { + if (key->err_mode == UCP_ERR_HANDLING_MODE_FAILOVER) { for (lane = 0; lane < key->num_lanes; ++lane) { if (key->lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) { key->lanes[lane].lane_types |= UCS_BIT(UCP_LANE_TYPE_AM); From 95a533bbff798f68b7b08fceaab538747b14af84 Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Thu, 2 Apr 2026 22:46:33 +0300 Subject: [PATCH 5/6] UCP/FT: fix UB for cm_lane(NULL) bitwise shift --- src/ucp/core/ucp_ep.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c index f51ce6c0766..03004f55c18 100644 --- a/src/ucp/core/ucp_ep.c +++ b/src/ucp/core/ucp_ep.c @@ -1730,6 +1730,7 @@ ucp_ep_failover_reconfig(ucp_ep_h ucp_ep, ucp_lane_map_t failed_lanes, void ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, ucs_status_t status) { + const ucp_lane_index_t cm_lane = ucp_ep_get_cm_lane(ucp_ep); ucs_status_t reconfig_status; UCP_WORKER_THREAD_CS_CHECK_IS_BLOCKED(ucp_ep->worker); @@ -1740,7 +1741,7 @@ void ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, /* TODO refactor this to mark all lanes as failed */ (lanes != 0) && /* sockaddr is not supported for failover mode */ - !ucp_ep_has_cm_lane(ucp_ep)) { + cm_lane == UCP_NULL_LANE) { reconfig_status = ucp_ep_failover_reconfig(ucp_ep, lanes, status); if (reconfig_status == UCS_OK) { return; @@ -1749,9 +1750,9 @@ void ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, 
ucp_lane_map_t lanes, /* else: unrecoverable error, mark the endpoint as failed. */ ucp_ep_set_failed(ucp_ep, - (lanes == UCS_BIT(ucp_ep_get_cm_lane(ucp_ep)) ? - ucp_ep_get_cm_lane(ucp_ep) : UCP_NULL_LANE), status); - + ((cm_lane != UCP_NULL_LANE) && + (lanes == UCS_BIT(cm_lane)) ? + cm_lane : UCP_NULL_LANE), status); } void ucp_ep_set_lanes_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_map_t lanes, From 274f81005ac46feb2e270ffb3c551c5ae95f2784 Mon Sep 17 00:00:00 2001 From: Evgeny Leksikov Date: Tue, 7 Apr 2026 11:54:03 +0300 Subject: [PATCH 6/6] TEST: disable test_ucp_fault_tolerance for efa --- contrib/test_efa.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/test_efa.sh b/contrib/test_efa.sh index 0c7d734d42a..fba4eccbe70 100755 --- a/contrib/test_efa.sh +++ b/contrib/test_efa.sh @@ -108,7 +108,7 @@ run_gtests() { IBMOCK_FILTER="$IBMOCK_FILTER:srd/test_ucp_stream_many2one.send_worker_poll*" IBMOCK_FILTER="$IBMOCK_FILTER:srd/test_ucp_peer_failure.*" IBMOCK_FILTER="$IBMOCK_FILTER:srd/test_ucp_perf.envelope/*" - IBMOCK_FILTER="$IBMOCK_FILTER:*test_ucp_am_psn*" + IBMOCK_FILTER="$IBMOCK_FILTER:*test_ucp_am_psn*:*test_ucp_fault_tolerance*" # Try the faster approach before valgrind make -C contrib/test/gtest test \