Skip to content

Commit 7576f38

Browse files
committed
UCP/FT: AM lane and am/zcopy/multi/psn proto failover
1 parent 4f24dbe commit 7576f38

12 files changed

Lines changed: 288 additions & 59 deletions

File tree

src/ucp/am/eager.inl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ static void ucp_am_eager_zcopy_completion(uct_completion_t *self)
5151

5252
ucs_assert(req->send.msg_proto.am.header.reg_desc != NULL);
5353
ucs_mpool_put_inline(req->send.msg_proto.am.header.reg_desc);
54+
req->send.msg_proto.am.header.reg_desc = NULL;
5455
ucp_proto_request_zcopy_completion(self);
5556
}
5657

src/ucp/am/eager_multi.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,8 @@ static void ucp_am_eager_multi_zcopy_proto_probe_common(
203203
.super.memtype_op = UCT_EP_OP_LAST,
204204
.super.flags = UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY |
205205
UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE |
206-
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING,
206+
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING |
207+
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER,
207208
.super.exclude_map = 0,
208209
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
209210
init_params->select_param),
@@ -321,9 +322,11 @@ static UCS_F_ALWAYS_INLINE ucs_status_t ucp_am_eager_multi_zcopy_send_func(
321322
static ucs_status_t ucp_am_eager_multi_zcopy_init(ucp_request_t *req)
322323
{
323324
ucp_am_eager_zcopy_pack_user_header(req);
324-
ucp_proto_msg_multi_request_init(req);
325+
if (req->send.msg_proto.am.internal_flags & UCP_REQUEST_AM_FLAG_RESET_DONE) {
326+
return UCS_OK;
327+
}
325328

326-
return UCS_OK;
329+
return ucp_proto_msg_multi_request_init(req);
327330
}
328331

329332
static ucs_status_t

src/ucp/core/ucp_am.c

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1899,33 +1899,51 @@ const ucp_request_send_proto_t ucp_am_reply_proto = {
18991899
.only_hdr_size = sizeof(ucp_am_hdr_t) + sizeof(ucp_am_reply_ftr_t)
19001900
};
19011901

1902-
ucs_status_t ucp_am_proto_request_zcopy_reset(ucp_request_t *request)
1902+
static void ucp_am_proto_request_zcopy_reset_header(ucp_request_t *request)
19031903
{
19041904
ucs_assert(request->send.msg_proto.am.header.reg_desc != NULL);
1905-
/* Zcopy pack function releases header if it's stored in mpool buffer
1906-
* and set copied flag to zero. Zcopy pack function is always called
1907-
* before reset function.
1908-
*/
1909-
ucs_assert(!(request->flags & UCP_REQUEST_FLAG_USER_HEADER_COPIED));
19101905

19111906
/* If user header is not guaranteed to be valid,
19121907
* use mpool buffer for storing the user header.
19131908
*/
19141909
if ((request->send.msg_proto.am.flags & UCP_AM_SEND_FLAG_COPY_HEADER) &&
19151910
(request->send.msg_proto.am.header.length != 0)) {
1916-
request->send.msg_proto.am.header.ptr = ucs_mpool_set_get_inline(
1917-
&request->send.ep->worker->am_mps,
1918-
request->send.msg_proto.am.header.length);
1919-
memcpy(request->send.msg_proto.am.header.ptr,
1920-
request->send.msg_proto.am.header.reg_desc + 1,
1921-
request->send.msg_proto.am.header.length);
1922-
request->flags |= UCP_REQUEST_FLAG_USER_HEADER_COPIED;
1911+
request->send.msg_proto.am.header.ptr = ucs_mpool_set_get_inline(
1912+
&request->send.ep->worker->am_mps,
1913+
request->send.msg_proto.am.header.length);
1914+
memcpy(request->send.msg_proto.am.header.ptr,
1915+
request->send.msg_proto.am.header.reg_desc + 1,
1916+
request->send.msg_proto.am.header.length);
1917+
request->flags |= UCP_REQUEST_FLAG_USER_HEADER_COPIED;
19231918
}
19241919

19251920
ucs_mpool_put_inline(request->send.msg_proto.am.header.reg_desc);
19261921
request->send.msg_proto.am.header.reg_desc = NULL;
19271922

1928-
return ucp_proto_request_zcopy_reset(request);
1923+
}
1924+
1925+
ucs_status_t ucp_am_proto_request_zcopy_reset(ucp_request_t *request)
1926+
{
1927+
/* The zcopy pack function releases the header if it is stored in an mpool
1928+
* buffer and clears the copied flag. The zcopy pack function is always
1929+
* called before the reset function.
1930+
* TODO: verify the statement above: in case of failover, the request
1931+
* should be restarted after its (error) completion, but the completion
1932+
* releases the header buffer.
1933+
*/
1934+
ucs_assert(!(request->flags & UCP_REQUEST_FLAG_USER_HEADER_COPIED));
1935+
1936+
if (request->send.state.uct_comp.status != UCS_OK) {
1937+
ucs_assert(request->send.msg_proto.am.header.reg_desc == NULL);
1938+
} else {
1939+
ucp_am_proto_request_zcopy_reset_header(request);
1940+
}
1941+
1942+
request->send.msg_proto.am.internal_flags &=
1943+
~UCP_REQUEST_AM_FLAG_HEADER_SENT;
1944+
1945+
request->send.msg_proto.am.internal_flags |= UCP_REQUEST_AM_FLAG_RESET_DONE;
1946+
return ucp_proto_offload_zcopy_reset(request);
19291947
}
19301948

19311949
void ucp_proto_am_request_bcopy_abort(ucp_request_t *req, ucs_status_t status)

src/ucp/core/ucp_ep.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1640,6 +1640,9 @@ ucp_ep_reconfig_internal(ucp_ep_h ep, ucp_lane_map_t failed_lanes)
16401640
for (lane = 0; lane < cfg_key.num_lanes; lane++) {
16411641
if (failed_lanes & UCS_BIT(lane)) {
16421642
cfg_key.lanes[lane].lane_types |= UCS_BIT(UCP_LANE_TYPE_FAILED);
1643+
if (cfg_key.am_lane == lane) {
1644+
cfg_key.am_lane = UCP_NULL_LANE;
1645+
}
16431646
}
16441647

16451648
wiface = ucp_worker_iface(worker, cfg_key.lanes[lane].rsc_index);
@@ -1648,6 +1651,14 @@ ucp_ep_reconfig_internal(ucp_ep_h ep, ucp_lane_map_t failed_lanes)
16481651
cfg_key.lanes[lane].port_speed = wiface->port_speed;
16491652
}
16501653

1654+
for (lane = 0; lane < cfg_key.num_lanes; lane++) {
1655+
if ((cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_AM_BW)) &&
1656+
!(cfg_key.lanes[lane].lane_types & UCS_BIT(UCP_LANE_TYPE_FAILED))) {
1657+
cfg_key.am_lane = lane;
1658+
break;
1659+
}
1660+
}
1661+
16511662
if (port_speed_changed) {
16521663
ucs_assertv(!ucp_ep_config_is_equal(&cfg_key, &ucp_ep_config(ep)->key),
16531664
"ep %p: config is not updated on port speed change", ep);

src/ucp/proto/proto_am.inl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
* AM specific internal request flags
2727
*/
2828
enum ucp_request_am_internal_flags {
29-
UCP_REQUEST_AM_FLAG_HEADER_SENT = UCS_BIT(0)
29+
UCP_REQUEST_AM_FLAG_HEADER_SENT = UCS_BIT(0),
30+
UCP_REQUEST_AM_FLAG_RESET_DONE = UCS_BIT(1)
3031
};
3132

3233

src/ucp/proto/proto_common.c

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,16 @@ ucp_proto_common_init_params(const ucp_proto_init_params_t *init_params)
4343
int ucp_proto_common_init_check_err_handling(
4444
const ucp_proto_common_init_params_t *init_params)
4545
{
46-
return (init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING) ||
47-
(init_params->super.ep_config_key->err_mode ==
48-
UCP_ERR_HANDLING_MODE_NONE);
46+
switch (init_params->super.ep_config_key->err_mode) {
47+
case UCP_ERR_HANDLING_MODE_NONE:
48+
return 1;
49+
case UCP_ERR_HANDLING_MODE_PEER:
50+
return !!(init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING);
51+
case UCP_ERR_HANDLING_MODE_FAILOVER:
52+
return !!(init_params->flags & UCP_PROTO_COMMON_INIT_FLAG_FAILOVER);
53+
default:
54+
return 0;
55+
}
4956
}
5057

5158
static size_t
@@ -940,8 +947,15 @@ void ucp_proto_fatal_invalid_stage(ucp_request_t *req, const char *func_name)
940947

941948
ucs_status_t ucp_proto_offload_zcopy_reset(ucp_request_t *req)
942949
{
943-
ucp_proto_request_zcopy_reset(req);
950+
ucs_status_t status;
951+
952+
status = ucp_proto_request_zcopy_reset(req);
953+
if (status != UCS_OK) {
954+
return status;
955+
}
956+
944957
ucp_datatype_iter_rewind(&req->send.state.dt_iter, UCP_DT_MASK_ALL);
958+
// TODO: check if this is needed
945959
req->flags &= ~UCP_REQUEST_FLAG_PROTO_INITIALIZED;
946960
return UCS_OK;
947961
}

src/ucp/proto/proto_common.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,15 @@ typedef enum {
7171
* sending more than the remote side supports */
7272
UCP_PROTO_COMMON_INIT_FLAG_CAP_SEG_SIZE = UCS_BIT(8),
7373

74-
/* Supports error handling */
74+
/* Supports peer failure error handling mode */
7575
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING = UCS_BIT(9),
7676

7777
/* Supports starting the request when its datatype iterator offset is > 0 */
7878
UCP_PROTO_COMMON_INIT_FLAG_RESUME = UCS_BIT(10),
79-
UCP_PROTO_COMMON_KEEP_MD_MAP = UCS_BIT(11)
79+
UCP_PROTO_COMMON_KEEP_MD_MAP = UCS_BIT(11),
80+
81+
/* Supports failover error handling mode */
82+
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER = UCS_BIT(12)
8083
} ucp_proto_common_init_flags_t;
8184

8285

src/ucp/rma/get_offload.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ ucp_proto_get_offload_zcopy_probe(const ucp_proto_init_params_t *init_params)
214214
UCP_PROTO_COMMON_INIT_FLAG_REMOTE_ACCESS |
215215
UCP_PROTO_COMMON_INIT_FLAG_RESPONSE |
216216
UCP_PROTO_COMMON_INIT_FLAG_MIN_FRAG |
217-
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING,
217+
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING |
218+
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER,
218219
.super.exclude_map = 0,
219220
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
220221
init_params->select_param),

src/ucp/rma/put_offload.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,8 @@ ucp_proto_put_offload_zcopy_probe(const ucp_proto_init_params_t *init_params)
309309
.super.flags = UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY |
310310
UCP_PROTO_COMMON_INIT_FLAG_RECV_ZCOPY |
311311
UCP_PROTO_COMMON_INIT_FLAG_REMOTE_ACCESS |
312-
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING,
312+
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING |
313+
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER,
313314
.super.exclude_map = 0,
314315
.super.reg_mem_info = ucp_proto_common_select_param_mem_info(
315316
init_params->select_param),

src/ucp/rndv/rndv_get.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,9 @@ ucp_proto_rndv_get_zcopy_probe(const ucp_proto_init_params_t *init_params)
126126
ucp_proto_rndv_get_common_probe(
127127
init_params, UCS_BIT(UCP_RNDV_MODE_GET_ZCOPY), SIZE_MAX,
128128
UCT_EP_OP_LAST,
129-
UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY |
130-
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING,
129+
UCP_PROTO_COMMON_INIT_FLAG_SEND_ZCOPY |
130+
UCP_PROTO_COMMON_INIT_FLAG_ERR_HANDLING |
131+
UCP_PROTO_COMMON_INIT_FLAG_FAILOVER,
131132
0, 0, &reg_mem_info);
132133
}
133134

0 commit comments

Comments
 (0)