Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/test_efa.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ run_gtests() {
IBMOCK_FILTER="$IBMOCK_FILTER:srd/test_ucp_stream_many2one.send_worker_poll*"
IBMOCK_FILTER="$IBMOCK_FILTER:srd/test_ucp_peer_failure.*"
IBMOCK_FILTER="$IBMOCK_FILTER:srd/test_ucp_perf.envelope/*"
IBMOCK_FILTER="$IBMOCK_FILTER:*test_ucp_am_psn*"
IBMOCK_FILTER="$IBMOCK_FILTER:*test_ucp_am_psn*:*test_ucp_fault_tolerance*"

# Try the faster approach before valgrind
make -C contrib/test/gtest test \
Expand Down
7 changes: 7 additions & 0 deletions src/ucp/am/eager.inl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ static void ucp_am_eager_zcopy_completion(uct_completion_t *self)

ucs_assert(req->send.msg_proto.am.header.reg_desc != NULL);
ucs_mpool_put_inline(req->send.msg_proto.am.header.reg_desc);
req->send.msg_proto.am.header.reg_desc = NULL;
ucp_proto_request_zcopy_completion(self);
}

Expand All @@ -59,6 +60,11 @@ ucp_am_eager_zcopy_pack_user_header(ucp_request_t *req)
{
ucp_mem_desc_t *reg_desc;

if (req->send.msg_proto.am.internal_flags &
UCP_REQUEST_AM_FLAG_HEADER_PACKED) {
return UCS_OK;
}

/* Request must be in initial state or after @ref ucp_proto_t::reset */
ucs_assertv(req->send.msg_proto.am.header.reg_desc == NULL,
"request %p: am.header.reg_desc=%p", req,
Expand All @@ -76,6 +82,7 @@ ucp_am_eager_zcopy_pack_user_header(ucp_request_t *req)
}

req->send.msg_proto.am.header.reg_desc = reg_desc;
req->send.msg_proto.am.internal_flags |= UCP_REQUEST_AM_FLAG_HEADER_PACKED;
return UCS_OK;
}

Expand Down
41 changes: 34 additions & 7 deletions src/ucp/am/eager_multi.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ static void ucp_am_eager_multi_zcopy_proto_probe_common(
.super.memtype_op = UCT_EP_OP_LAST,
.super.flags = UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY |
UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE |
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING,
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING |
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER,
.super.exclude_map = 0,
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
init_params->select_param),
Expand Down Expand Up @@ -320,10 +321,14 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_eager_multi_zcopy_send_func(

/**
 * Initialize a request for the multi-fragment eager AM zero-copy protocol.
 *
 * The user header is packed first; the multi-fragment message state is
 * initialized only if packing succeeded. The status of the packing step is
 * propagated to the caller instead of being discarded.
 *
 * @param [in,out] req  Send request to initialize.
 *
 * @return UCS_OK on success, otherwise the status reported by the step that
 *         failed.
 */
static ucs_status_t ucp_am_eager_multi_zcopy_init(ucp_request_t *req)
{
    ucs_status_t status;

    /* The packing step reports a status: stop here if it did not succeed */
    status = ucp_am_eager_zcopy_pack_user_header(req);
    if (status != UCS_OK) {
        return status;
    }

    return ucp_proto_msg_multi_request_init(req);
}

static ucs_status_t
Expand Down Expand Up @@ -373,6 +378,17 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_eager_multi_zcopy_psn_send_func(
UCP_AM_ID_AM_MIDDLE_PSN);
}

/**
 * Zero-copy completion callback of the multi-fragment eager AM protocol
 * with PSN.
 *
 * A successful completion is forwarded to ucp_am_eager_zcopy_completion().
 * A failed completion goes straight to ucp_proto_request_zcopy_completion(),
 * which leaves the user header in place so that the request can be restarted
 * afterwards.
 *
 * @param [in] self  UCT completion object; its status selects the path taken.
 */
static void ucp_am_eager_multi_zcopy_psn_completion(uct_completion_t *self)
{
    if (ucs_likely(self->status == UCS_OK)) {
        ucp_am_eager_zcopy_completion(self);
        return;
    }

    /* NOTE: the user header is intentionally not released on failure, so the
     * request can be restarted later */
    ucp_proto_request_zcopy_completion(self);
}

static ucs_status_t
ucp_am_eager_multi_zcopy_psn_proto_progress(uct_pending_req_t *self)
{
Expand All @@ -385,11 +401,22 @@ ucp_am_eager_multi_zcopy_psn_proto_progress(uct_pending_req_t *self)
UCT_MD_MEM_ACCESS_LOCAL_READ, UCP_DT_MASK_CONTIG_IOV,
ucp_am_eager_multi_zcopy_psn_send_func,
ucp_request_invoke_uct_completion_success,
ucp_am_eager_zcopy_completion);
ucp_am_eager_multi_zcopy_psn_completion);
if (status == UCS_INPROGRESS) {
ucp_proto_am_set_middle_fragment(req);
}


return status;
}

/**
 * Reset callback of the multi-fragment eager AM zero-copy protocol with PSN.
 *
 * Runs the common AM zero-copy reset, rewinds the datatype iterator and
 * clears UCP_REQUEST_AM_FLAG_HEADER_SENT, so that the request is restarted
 * from its very first fragment.
 *
 * @param [in,out] req  Request to reset.
 *
 * @return Status returned by ucp_am_proto_request_zcopy_reset(). The iterator
 *         rewind and the flag update are performed regardless of that status.
 */
static ucs_status_t ucp_am_eager_multi_zcopy_psn_reset(ucp_request_t *req)
{
    const ucs_status_t reset_status = ucp_am_proto_request_zcopy_reset(req);

    ucp_datatype_iter_rewind(&req->send.state.dt_iter, UCP_DT_MASK_CONTIG_IOV);

    /* Forget that the header was sent: sending starts over from the first
     * fragment */
    req->send.msg_proto.am.internal_flags &= ~UCP_REQUEST_AM_FLAG_HEADER_SENT;

    return reset_status;
}

Expand All @@ -401,5 +428,5 @@ ucp_proto_t ucp_am_eager_multi_zcopy_psn_proto = {
.query = ucp_proto_multi_query,
.progress = {ucp_am_eager_multi_zcopy_psn_proto_progress},
.abort = ucp_proto_am_request_zcopy_abort,
.reset = ucp_am_proto_request_zcopy_reset
.reset = ucp_am_eager_multi_zcopy_psn_reset
};
9 changes: 7 additions & 2 deletions src/ucp/core/ucp_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ ucp_am_zcopy_pack_user_header(ucp_request_t *req)
{
ucp_mem_desc_t *reg_desc;

ucs_assert(req->send.msg_proto.am.header.reg_desc == NULL);

reg_desc = ucp_worker_mpool_get(&req->send.ep->worker->reg_mp);
if (ucs_unlikely(reg_desc == NULL)) {
return UCS_ERR_NO_MEMORY;
Expand Down Expand Up @@ -616,6 +618,7 @@ static UCS_F_ALWAYS_INLINE void ucp_am_zcopy_complete_common(ucp_request_t *req)
ucs_assert(req->send.state.uct_comp.count == 0);

ucs_mpool_put_inline(req->send.msg_proto.am.header.reg_desc);
req->send.msg_proto.am.header.reg_desc = NULL;
ucp_request_send_buffer_dereg(req); /* TODO register+lane change */
}

Expand Down Expand Up @@ -1473,7 +1476,7 @@ ucp_am_handle_unfinished(ucp_worker_h worker, ucp_recv_desc_t *rdesc,
hdr = (ucp_am_hdr_t*)(first_ftr + 1);
recv_flags = ucp_am_hdr_reply_ep(worker, hdr->flags, reply_ep,
&reply_ep) |
UCP_AM_RECV_ATTR_FLAG_DATA;
UCP_AM_RECV_ATTR_FLAG_DATA;
payload = UCS_PTR_BYTE_OFFSET(first_rdesc + 1,
first_rdesc->payload_offset);
am_id = hdr->am_id;
Expand Down Expand Up @@ -1924,7 +1927,9 @@ ucs_status_t ucp_am_proto_request_zcopy_reset(ucp_request_t *request)

ucs_mpool_put_inline(request->send.msg_proto.am.header.reg_desc);
request->send.msg_proto.am.header.reg_desc = NULL;

/* reg_desc was released; next progress must repack user header */
request->send.msg_proto.am.internal_flags &=
~UCP_REQUEST_AM_FLAG_HEADER_PACKED;
return ucp_proto_request_zcopy_reset(request);
}

Expand Down
51 changes: 38 additions & 13 deletions src/ucp/core/ucp_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -1659,7 +1659,11 @@ ucp_ep_reconfig_internal(ucp_ep_h ep, ucp_lane_map_t failed_lanes)

for (lane = 0; lane < cfg_key.num_lanes; lane++) {
if (failed_lanes & UCS_BIT(lane)) {
ucs_assert(lane != UCP_NULL_LANE);
cfg_key.lanes[lane].lane_types |= UCS_BIT(UCP_LANE_TYPE_FAILED);
if (cfg_key.am_lane == lane) {
cfg_key.am_lane = UCP_NULL_LANE;
}
}

wiface = ucp_worker_iface(worker, cfg_key.lanes[lane].rsc_index);
Expand All @@ -1668,6 +1672,23 @@ ucp_ep_reconfig_internal(ucp_ep_h ep, ucp_lane_map_t failed_lanes)
cfg_key.lanes[lane].port_speed = wiface->port_speed;
}

if (cfg_key.am_lane == UCP_NULL_LANE) {
Comment thread
gleon99 marked this conversation as resolved.
for (lane = 0; lane < cfg_key.num_lanes; lane++) {
if ((cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) &&
!(cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_FAILED))) {
cfg_key.am_lane = lane;
break;
}
}
}

if ((cfg_key.am_lane == UCP_NULL_LANE) &&
(ep->worker->context->config.features & UCP_FEATURE_AM)) {
ucs_diag("ep %p: AM lane not found after reconfiguration with "
"failed lanes 0x%lx", ep, failed_lanes);
return UCS_ERR_UNREACHABLE;
}

if (port_speed_changed) {
ucs_assertv(!ucp_ep_config_is_equal(&cfg_key, &ucp_ep_config(ep)->key),
"ep %p: config is not updated on port speed change", ep);
Expand Down Expand Up @@ -1706,28 +1727,32 @@ ucp_ep_failover_reconfig(ucp_ep_h ucp_ep, ucp_lane_map_t failed_lanes,
return UCS_OK;
}

ucs_status_t ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes,
void ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes,
ucs_status_t status)
{
const ucp_lane_index_t cm_lane = ucp_ep_get_cm_lane(ucp_ep);
ucs_status_t reconfig_status;

UCP_WORKER_THREAD_CS_CHECK_IS_BLOCKED(ucp_ep->worker);
ucs_assert(UCS_STATUS_IS_ERR(status));
ucs_assert(!ucs_async_is_from_async(&ucp_ep->worker->async));

if (!ucp_ep_err_mode_eq(ucp_ep, UCP_ERR_HANDLING_MODE_FAILOVER) ||
/* some unrecoverable error,
TODO refactor this to mark all lanes as failed */
(lanes == 0) ||
if (ucp_ep_err_mode_eq(ucp_ep, UCP_ERR_HANDLING_MODE_FAILOVER) &&
/* TODO refactor this to mark all lanes as failed */
(lanes != 0) &&
/* sockaddr is not supported for failover mode */
ucp_ep_has_cm_lane(ucp_ep)) {
return ucp_ep_set_failed(ucp_ep,
(lanes == UCS_BIT(ucp_ep_get_cm_lane(ucp_ep)) ?
ucp_ep_get_cm_lane(ucp_ep) : UCP_NULL_LANE), status);
cm_lane == UCP_NULL_LANE) {
reconfig_status = ucp_ep_failover_reconfig(ucp_ep, lanes, status);
if (reconfig_status == UCS_OK) {
return;
}
}

ucs_debug("ep %p: set_lanes_failed status %s on lanes 0x%lx", ucp_ep,
ucs_status_string(status), lanes);

return ucp_ep_failover_reconfig(ucp_ep, lanes, status);
/* else: unrecoverable error, mark the endpoint as failed. */
ucp_ep_set_failed(ucp_ep,
((cm_lane != UCP_NULL_LANE) &&
(lanes == UCS_BIT(cm_lane)) ?
cm_lane : UCP_NULL_LANE), status);
}

void ucp_ep_set_lanes_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_map_t lanes,
Expand Down
5 changes: 2 additions & 3 deletions src/ucp/core/ucp_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,8 @@ void ucp_ep_disconnected(ucp_ep_h ep, int force);

void ucp_ep_destroy_internal(ucp_ep_h ep);

ucs_status_t
ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes,
ucs_status_t status);
void ucp_ep_set_lanes_failed(ucp_ep_h ucp_ep, ucp_lane_map_t lanes,
ucs_status_t status);

void ucp_ep_set_lanes_failed_schedule(ucp_ep_h ucp_ep, ucp_lane_map_t lanes,
ucs_status_t status);
Expand Down
1 change: 1 addition & 0 deletions src/ucp/core/ucp_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ enum {
UCP_REQUEST_FLAG_COMPLETED = UCS_BIT(0),
UCP_REQUEST_FLAG_RELEASED = UCS_BIT(1),
UCP_REQUEST_FLAG_PROTO_SEND = UCS_BIT(2),
UCP_REQUEST_FLAG_MSG_ID_SET = UCS_BIT(3),
UCP_REQUEST_FLAG_SYNC_LOCAL_COMPLETED = UCS_BIT(4),
UCP_REQUEST_FLAG_SYNC_REMOTE_COMPLETED = UCS_BIT(5),
UCP_REQUEST_FLAG_CALLBACK = UCS_BIT(6),
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/core/ucp_worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,8 @@ ucp_worker_iface_handle_uct_ep_failure(ucp_ep_h ucp_ep, ucp_lane_index_t lane,
!ucp_ep_is_local_connected(ucp_ep)) {
/* Failure on NON-AUX EP or failure on AUX EP before it sent its address
* means failure on the UCP EP */
return ucp_ep_set_lanes_failed(ucp_ep, UCS_BIT(lane), status);
ucp_ep_set_lanes_failed(ucp_ep, UCS_BIT(lane), status);
return UCS_OK;
}

if (wireup_ep->flags & UCP_WIREUP_EP_FLAG_READY) {
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/proto/proto_am.inl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
* AM specific internal request flags
*/
enum ucp_request_am_internal_flags {
UCP_REQUEST_AM_FLAG_HEADER_SENT = UCS_BIT(0)
UCP_REQUEST_AM_FLAG_HEADER_SENT = UCS_BIT(0),
UCP_REQUEST_AM_FLAG_HEADER_PACKED = UCS_BIT(1)
};


Expand Down
21 changes: 17 additions & 4 deletions src/ucp/proto/proto_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,16 @@ ucp_proto_common_init_params(const ucp_proto_init_params_t *init_params)
/**
 * Check whether a protocol can be used with the error handling mode of the
 * endpoint configuration it is being initialized for.
 *
 * Each error handling mode is matched against its own capability flag:
 * peer-failure mode requires UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING and
 * failover mode requires UCP_PROTO_COMMON_INIT_FLAG_FAILOVER. A protocol that
 * declares only peer-failure support is therefore not selected for failover
 * endpoints.
 *
 * @param [in] init_params  Common protocol initialization parameters; the
 *                          protocol flags and the endpoint configuration key
 *                          are read from it.
 *
 * @return 1 if the protocol supports the endpoint's error handling mode,
 *         0 otherwise (including an unrecognized mode).
 */
int ucp_proto_common_init_check_err_handling(
        const ucp_proto_common_init_params_t *init_params)
{
    switch (init_params->super.ep_config_key->err_mode) {
    case UCP_ERR_HANDLING_MODE_NONE:
        /* No error handling requested: every protocol qualifies */
        return 1;
    case UCP_ERR_HANDLING_MODE_PEER:
        return !!(init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING);
    case UCP_ERR_HANDLING_MODE_FAILOVER:
        return !!(init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_FAILOVER);
    default:
        /* Unrecognized mode: reject the protocol rather than guess */
        return 0;
    }
}

static size_t
Expand Down Expand Up @@ -940,7 +947,13 @@ void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name)

ucs_status_t ucp_proto_offload_zcopy_reset(ucp_request_t *req)
{
ucp_proto_request_zcopy_reset(req);
ucs_status_t status;

status = ucp_proto_request_zcopy_reset(req);
if (status != UCS_OK) {
return status;
}

ucp_datatype_iter_rewind(&req->send.state.dt_iter, UCP_DT_MASK_ALL);
req->flags &= ~UCP_REQUEST_FLAG_PROTO_INITIALIZED;
return UCS_OK;
Expand Down
7 changes: 5 additions & 2 deletions src/ucp/proto/proto_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ typedef enum {
* sending more than the remote side supports */
UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE = UCS_BIT(8),

/* Supports error handling */
/* Supports peer failure error handling mode */
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING = UCS_BIT(9),

/* Supports starting the request when its datatype iterator offset is > 0 */
UCP_PROTO_COMMON_INIT_FLAG_RESUME = UCS_BIT(10),
UCP_PROTO_COMMON_KEEP_MD_MAP = UCS_BIT(11)
UCP_PROTO_COMMON_KEEP_MD_MAP = UCS_BIT(11),

/* Supports failover error handling mode */
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER = UCS_BIT(12)
} ucp_proto_common_init_flags_t;


Expand Down
5 changes: 5 additions & 0 deletions src/ucp/proto/proto_common.inl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,12 @@ ucp_proto_msg_multi_request_init(ucp_request_t *req)
return UCS_OK;
}

if (req->flags & UCP_REQUEST_FLAG_MSG_ID_SET) {
return UCS_OK;
}

req->send.msg_proto.message_id = req->send.ep->worker->am_message_id++;
req->flags |= UCP_REQUEST_FLAG_MSG_ID_SET;
return UCS_OK;
}

Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rma/get_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ ucp_proto_get_offload_zcopy_probe(const ucp_proto_init_params_t *init_params)
UCP_PROTO_COMMON_INIT_FLAG_REMOTE_ACCESS |
UCP_PROTO_COMMON_INIT_FLAG_RESPONSE |
UCP_PROTO_COMMON_INIT_FLAG_MIN_FRAG |
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING,
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING |
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER,
.super.exclude_map = 0,
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
init_params->select_param),
Expand Down
3 changes: 2 additions & 1 deletion src/ucp/rma/put_offload.c
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ ucp_proto_put_offload_zcopy_probe(const ucp_proto_init_params_t *init_params)
.super.flags = UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY |
UCP_PROTO_COMMON_INIT_FLAG_RECV_ZCOPY |
UCP_PROTO_COMMON_INIT_FLAG_REMOTE_ACCESS |
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING,
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING |
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER,
.super.exclude_map = 0,
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
init_params->select_param),
Expand Down
12 changes: 12 additions & 0 deletions src/ucp/wireup/select.c
Original file line number Diff line number Diff line change
Expand Up @@ -2748,6 +2748,18 @@ ucp_wireup_construct_lanes(const ucp_wireup_select_params_t *select_params,
}
}

/* Failover mode: add AM type to all am_bw lanes so any can serve as
* fallback AM lane on lane failure (protocol selection requires
* UCP_LANE_TYPE_AM).
*/
if (key->err_mode == UCP_ERR_HANDLING_MODE_FAILOVER) {
for (lane = 0; lane < key->num_lanes; ++lane) {
if (key->lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) {
key->lanes[lane].lane_types |= UCS_BIT(UCP_LANE_TYPE_AM);
}
}
}

/* Sort AM, RMA and AMO lanes according to score */
ucs_qsort_r(key->am_bw_lanes + first_am_bw_lane,
UCP_MAX_LANES - first_am_bw_lane, sizeof(ucp_lane_index_t),
Expand Down
Loading
Loading