@@ -34,7 +34,7 @@ ucp_ep_flush_request_update_uct_comp(ucp_request_t *req, int diff,
3434 "started_lanes 0x%" PRIx64 "->0x%" PRIx64 ,
3535 req -> send .ep , req -> send .state .uct_comp .count ,
3636 req -> send .state .uct_comp .count + diff ,
37- req -> send .flush .num_lanes , ucp_ep_num_lanes (req -> send .ep ),
37+ ucs_popcount ( req -> send .flush .all_lanes ) , ucp_ep_num_lanes (req -> send .ep ),
3838 req -> send .flush .started_lanes , new_started_lanes );
3939
4040 req -> send .state .uct_comp .count += diff ;
@@ -44,14 +44,20 @@ ucp_ep_flush_request_update_uct_comp(ucp_request_t *req, int diff,
4444static void ucp_ep_flush_error (ucp_request_t * req , ucp_lane_index_t lane ,
4545 ucs_status_t status )
4646{
47- ucs_log_level_t level = ucp_ep_config_err_handling_enabled (req -> send .ep ) ?
47+ ucp_ep_h ep = req -> send .ep ;
48+ ucs_log_level_t level = ucp_ep_config_err_handling_enabled (ep ) ?
4849 UCS_LOG_LEVEL_TRACE_REQ : UCS_LOG_LEVEL_ERROR ;
4950
5051 ucs_assertv (lane != UCP_NULL_LANE , "req=%p ep=%p lane=%d status=%s" ,
51- req , req -> send . ep , lane , ucs_status_string (status ));
52+ req , ep , lane , ucs_status_string (status ));
5253
5354 req -> status = status ;
5455
56+ /* Mark lane failed and reconfigure EP for failover */
57+ if (ucp_ep_err_mode_eq (ep , UCP_ERR_HANDLING_MODE_FAILOVER )) {
58+ ucp_ep_set_lanes_failed_schedule (ep , UCS_BIT (lane ), status );
59+ }
60+
5561 ucp_ep_flush_request_update_uct_comp (req , -1 , UCS_BIT (lane ));
5662
5763 ucs_log (level , "req %p: error during flush: %s" , req ,
@@ -65,37 +71,28 @@ static int ucp_ep_flush_is_completed(ucp_request_t *req)
6571
6672static void ucp_ep_flush_progress (ucp_request_t * req )
6773{
68- ucp_ep_h ep = req -> send .ep ;
69- unsigned num_lanes = ucp_ep_num_lanes (ep );
70- ucp_lane_map_t all_lanes = UCS_MASK (num_lanes );
74+ ucp_ep_h ep = req -> send .ep ;
75+ ucp_lane_map_t ep_live_lanes = ucp_ep_get_live_lanes (ep );
7176 ucp_ep_flush_state_t * flush_state ;
7277 ucp_lane_index_t lane ;
7378 ucs_status_t status ;
7479 uct_ep_h uct_ep ;
75- int diff ;
76- ucp_lane_map_t destroyed_lanes ;
80+ ucp_lane_map_t ep_destroyed_lanes ;
81+ ucp_lane_map_t ep_new_lanes ;
7782
78- ucs_assertv (!(ep -> flags & UCP_EP_FLAG_BLOCK_FLUSH ), "req=%p ep=%p" , req ,
83+ ucs_assertv (!(ep -> flags & UCP_EP_FLAG_BLOCK_FLUSH ), "req=%p ep=%p" , req ,
7984 ep );
8085
8186 /* If the number of lanes changed since flush operation was submitted, adjust
82- * the number of expected completions */
83- diff = num_lanes - req -> send .flush .num_lanes ;
84- if (ucs_unlikely (diff != 0 )) {
85- if (diff > 0 ) {
86- ucp_ep_flush_request_update_uct_comp (req , diff , 0 );
87- } else {
88- /* Some lanes that we wanted to flush were destroyed. If we already
89- started to flush them, they would be completed by discard flow,
90- so reduce completion count only by the lanes we have not started
91- to flush yet. */
92- destroyed_lanes = UCS_MASK (req -> send .flush .num_lanes ) & ~all_lanes &
93- ~req -> send .flush .started_lanes ;
94- ucp_ep_flush_request_update_uct_comp (
95- req , - ucs_popcount (destroyed_lanes ), 0 );
96- }
97-
98- req -> send .flush .num_lanes = num_lanes ;
87+ * the number of expected completions. Account for failed lanes. */
88+ if (ucs_unlikely (ep_live_lanes != req -> send .flush .all_lanes )) {
89+ ep_destroyed_lanes = req -> send .flush .all_lanes & ~ep_live_lanes ;
90+ ep_new_lanes = ep_live_lanes & ~req -> send .flush .all_lanes ;
91+ ucp_ep_flush_request_update_uct_comp (req ,
92+ ucs_popcount (ep_new_lanes ) -
93+ ucs_popcount (ep_destroyed_lanes ),
94+ 0 );
95+ req -> send .flush .all_lanes |= ep_new_lanes ;
9996 }
10097
10198 ucp_trace_req (req ,
@@ -104,10 +101,10 @@ static void ucp_ep_flush_progress(ucp_request_t *req)
104101 ep , ep -> flags , req -> send .flush .started_lanes ,
105102 req -> send .state .uct_comp .count );
106103
107- while (req -> send .flush .started_lanes < all_lanes ) {
104+ while (req -> send .flush .started_lanes != ep_live_lanes ) {
108105
109106 /* Search for next lane to start flush */
110- lane = ucs_ffs64 (all_lanes & ~req -> send .flush .started_lanes );
107+ lane = ucs_ffs64 (ep_live_lanes & ~req -> send .flush .started_lanes );
111108 uct_ep = ucp_ep_get_lane (ep , lane );
112109 if (uct_ep == NULL ) {
113110 ucp_ep_flush_request_update_uct_comp (req , -1 , UCS_BIT (lane ));
@@ -261,6 +258,7 @@ static int ucp_flush_check_completion(ucp_request_t *req)
261258{
262259 ucp_worker_h worker = req -> send .ep -> worker ;
263260 ucs_status_t status ;
261+
264262 /* Check if flushed all lanes */
265263 if (!ucp_ep_flush_is_completed (req )) {
266264 return 0 ;
@@ -364,10 +362,10 @@ ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
364362 ucp_ep_flush_progress (req );
365363 completed = ucp_flush_check_completion (req );
366364
367- /* If the operation has not completed, and not started on all lanes, add
368- * slow-path progress to resume */
365+ /* If the operation has not completed, and not started on all alive lanes,
366+ * add slow-path progress to resume */
369367 if (!completed &&
370- (req -> send .flush .started_lanes != UCS_MASK ( ucp_ep_num_lanes ( ep )) )) {
368+ (req -> send .flush .started_lanes != req -> send . flush . all_lanes )) {
371369 ucp_ep_flush_request_resched (ep , req );
372370 }
373371
@@ -383,6 +381,32 @@ ucs_status_t ucp_ep_flush_progress_pending(uct_pending_req_t *self)
383381 return UCS_OK ;
384382}
385383
384+ static void ucp_ep_flush_request_reset (ucp_request_t * req )
385+ {
386+ ucp_lane_map_t lanes = ucp_ep_get_live_lanes (req -> send .ep );
387+
388+ req -> status = UCS_OK ;
389+ req -> send .lane = UCP_NULL_LANE ;
390+ req -> send .state .uct_comp .count = ucs_popcount (lanes );
391+ req -> send .state .uct_comp .status = UCS_OK ;
392+ req -> send .flush .all_lanes = lanes ;
393+ req -> send .flush .started_lanes = 0 ;
394+ req -> send .flush .uct_flags = req -> send .flush .uct_flags_orig ;
395+ req -> send .flush .sw_started = 0 ;
396+ req -> send .flush .sw_done = 0 ;
397+ }
398+
399+ static unsigned ucp_ep_flush_failover_oneshot_cb (void * arg )
400+ {
401+ ucp_request_t * req = arg ;
402+
403+ ucp_trace_req (req , "flush restart" );
404+ ucp_ep_flush_request_reset (req );
405+ ucp_ep_flush_progress (req );
406+ ucp_flush_check_completion (req );
407+ return 1 ;
408+ }
409+
386410void ucp_ep_flush_completion (uct_completion_t * self )
387411{
388412 ucp_request_t * req = ucs_container_of (self , ucp_request_t ,
@@ -399,6 +423,21 @@ void ucp_ep_flush_completion(uct_completion_t *self)
399423 if (status == UCS_OK ) {
400424 ucp_ep_flush_progress (req );
401425 } else {
426+ /* In case of lane failure the flush operation should be resubmitted
427+ * as well as any other outstanding requests on this EP since
428+ * the flush guarantees ordering. */
429+ if (ucp_ep_err_mode_eq (req -> send .ep , UCP_ERR_HANDLING_MODE_FAILOVER ) &&
430+ !(req -> send .flush .uct_flags_orig & UCT_FLUSH_FLAG_CANCEL ) &&
431+ !(req -> send .ep -> flags & UCP_EP_FLAG_CLOSED ) &&
432+ (ucp_ep_get_live_lanes (req -> send .ep ) != 0 )) {
433+ ucp_trace_req (req , "flush completion error: %s, scheduling failover and restart" ,
434+ ucs_status_string (status ));
435+ ucs_callbackq_add_oneshot (& req -> send .ep -> worker -> uct -> progress_q ,
436+ req , ucp_ep_flush_failover_oneshot_cb , req );
437+ ucp_worker_signal_internal (req -> send .ep -> worker );
438+ return ;
439+ }
440+
402441 /* force flush completion in case of error */
403442 req -> send .flush .sw_done = 1 ;
404443 req -> send .state .uct_comp .count = 0 ;
@@ -412,21 +451,22 @@ void ucp_ep_flush_completion(uct_completion_t *self)
412451
413452void ucp_ep_flush_request_ff (ucp_request_t * req , ucs_status_t status )
414453{
415- /* Calculate how many completions to emulate: 1 for every lane we did not
416- * start to flush yet, plus one for the lane from which we just removed
417- * this request from its pending queue
418- */
419- int num_comps = req -> send .flush .num_lanes -
420- ucs_popcount (req -> send .flush .started_lanes );
454+ const ucp_lane_map_t ff_lanes = req -> send .flush .all_lanes &
455+ ~req -> send .flush .started_lanes ;
456+ const int num_comps = ucs_popcount (ff_lanes );
421457
422458 ucp_trace_req (
423- req , "fast-forward flush, comp-=%d num_lanes %d started 0x%" PRIx64 ,
424- num_comps , req -> send .flush .num_lanes ,
459+ req , "fast-forward flush, comp-=%d live_lanes=0x%" PRIx64
460+ " started_lanes=0x%" PRIx64 , num_comps ,
461+ ucp_ep_get_live_lanes (req -> send .ep ),
425462 req -> send .flush .started_lanes );
426463
427- ucp_ep_flush_request_update_uct_comp (req , - num_comps ,
428- UCS_MASK (req -> send .flush .num_lanes ) &
429- ~req -> send .flush .started_lanes );
464+ if (!(req -> send .flush .uct_flags & UCT_FLUSH_FLAG_CANCEL )) {
465+ ucp_trace_req (req , "fast-forward flush, setting cancel flag" );
466+ req -> send .flush .uct_flags |= UCT_FLUSH_FLAG_CANCEL ;
467+ }
468+
469+ ucp_ep_flush_request_update_uct_comp (req , - num_comps , ff_lanes );
430470 uct_completion_update_status (& req -> send .state .uct_comp , status );
431471 ucp_send_request_invoke_uct_completion (req );
432472}
@@ -464,20 +504,15 @@ ucs_status_ptr_t ucp_ep_flush_internal(ucp_ep_h ep, unsigned req_flags,
464504 * flushed. req->send.flush.lanes keeps track of which lanes we still have
465505 * to start flush on.
466506 */
467- req -> flags = req_flags ;
468- req -> status = UCS_OK ;
469- req -> send .ep = ep ;
470- req -> send .flushed_cb = flushed_cb ;
471- req -> send .flush .uct_flags = uct_flags ;
472- req -> send .flush .sw_started = 0 ;
473- req -> send .flush .sw_done = 0 ;
474- req -> send .flush .num_lanes = ucp_ep_num_lanes (ep );
475- req -> send .flush .started_lanes = 0 ;
476- req -> send .lane = UCP_NULL_LANE ;
477- req -> send .uct .func = ucp_ep_flush_progress_pending ;
478- req -> send .state .uct_comp .func = ucp_ep_flush_completion ;
479- req -> send .state .uct_comp .count = ucp_ep_num_lanes (ep );
480- req -> send .state .uct_comp .status = UCS_OK ;
507+ req -> send .ep = ep ;
508+ ucp_ep_flush_request_reset (req );
509+
510+ req -> flags = req_flags ;
511+ req -> send .flushed_cb = flushed_cb ;
512+ req -> send .flush .uct_flags =
513+ req -> send .flush .uct_flags_orig = uct_flags ;
514+ req -> send .uct .func = ucp_ep_flush_progress_pending ;
515+ req -> send .state .uct_comp .func = ucp_ep_flush_completion ;
481516
482517 ucp_request_set_super (req , worker_req );
483518 ucp_request_set_send_callback_param (param , req , send );
0 commit comments