CORE: add global init status check #1303
Conversation
|
| Filename | Overview |
|---|---|
| src/core/ucc_coll.c | Restructures post-init flow so ucc_service_dt_check is always called for gather/scatter when the feature is enabled, passing local_status even on init failure; error-path routing to coll_finalize vs free_scratch appears correct but is intricate. |
| src/core/ucc_service_coll.c | Core of the change: extends the allreduce payload to 6 slots, adds global init-status check in ucc_dt_validate_results, reworks force-completion paths in both wrapper post functions. One defensive NULL check in ucc_dt_check_actual_wrapper_progress sets error status without forcing schedule completion. |
| src/core/ucc_service_coll.h | Updated docstring and signature for ucc_service_dt_check to include coll_args and local_status parameters; accurately reflects the must-call-on-every-rank contract. |
| src/schedule/ucc_schedule.h | Grows values array from 4 to 6 elements and adds ar_status field to ucc_dt_check_state_t; struct layout change is backward-compatible within the module. |
| src/utils/ucc_coll_utils.c | Single-line fix: uses task->bargs.team->size instead of task->team->params.size, correcting the size lookup for the scatter/scatterv buffer copy. |
Reviews (3): Last reviewed commit: "REVIEW: extend dt check to include init ..." | Re-trigger Greptile
| status = ucc_coll_init(team->score_map, &op_args, &task); | ||
| if (UCC_ERR_NOT_SUPPORTED == status) { | ||
| ucc_debug("failed to init collective: not supported"); | ||
| goto free_scratch; | ||
| } else if (ucc_unlikely(status < 0)) { | ||
| char coll_args_str[256] = {0}; | ||
| ucc_coll_args_str(&op_args.args, team->rank, team->size, coll_args_str, | ||
| sizeof(coll_args_str)); | ||
| ucc_error("failed to init collective: %s, err: (%d) %s", coll_args_str, | ||
| status, ucc_status_string(status)); | ||
| goto free_scratch; | ||
| } | ||
|
|
||
| /* Setup non-blocking datatype check for rooted collectives | ||
| * | ||
| * This implements transparent validation using a schedule with two tasks: | ||
| * 1. Allreduce validation task: uses MIN reduction with min/max trick to detect mismatches | ||
| * 2. Actual collective task: the real gather/scatter operation | ||
| * | ||
| * Validation uses allreduce (MIN) on [dt, -dt, mem, -mem]: | ||
| * - Message size: 8 bytes (4 × int16_t, doesn't scale with number of ranks) | ||
| * - After reduction: min(dt) == -min(-dt) means all ranks have same datatype | ||
| * | ||
| * Dependencies: allreduce validation → actual task | ||
| * If validation fails, the dependency mechanism prevents the actual task from posting. | ||
| */ | ||
| if (coll_args->coll_type == UCC_COLL_TYPE_GATHER || | ||
| coll_args->coll_type == UCC_COLL_TYPE_GATHERV || | ||
| coll_args->coll_type == UCC_COLL_TYPE_SCATTER || | ||
| coll_args->coll_type == UCC_COLL_TYPE_SCATTERV) { | ||
| /* Check if datatype validation is needed and create schedule if so */ | ||
| ucc_coll_task_t *validated_task; | ||
|
|
||
| validated_task = ucc_service_dt_check(team, task, &status); | ||
| if (!validated_task) { | ||
| ucc_error("failed to create dt_check schedule: %s", | ||
| ucc_status_string(status)); | ||
| goto coll_finalize; | ||
| if (ucc_global_config.check_asymmetric_dt && | ||
| (coll_args->coll_type == UCC_COLL_TYPE_GATHER || | ||
| coll_args->coll_type == UCC_COLL_TYPE_GATHERV || | ||
| coll_args->coll_type == UCC_COLL_TYPE_SCATTER || | ||
| coll_args->coll_type == UCC_COLL_TYPE_SCATTERV)) { | ||
|
|
||
| if (task == NULL && op_args.asymmetric_save_info.scratch != NULL) { | ||
| ucc_mc_free(op_args.asymmetric_save_info.scratch); | ||
| op_args.asymmetric_save_info.scratch = NULL; | ||
| } | ||
| task_wrap = ucc_service_dt_check(team, coll_args, status, task, | ||
| &wrap_err); | ||
| if (ucc_unlikely(!task_wrap)) { | ||
| status = wrap_err; | ||
| if (task) { | ||
| goto coll_finalize; | ||
| } | ||
| goto free_scratch; | ||
| } | ||
| task = task_wrap; | ||
| } else { | ||
| if (UCC_ERR_NOT_SUPPORTED == status) { | ||
| ucc_debug("failed to init collective: not supported"); | ||
| goto free_scratch; | ||
| } else if (ucc_unlikely(status < 0)) { | ||
| char coll_args_str[256] = {0}; | ||
| ucc_coll_args_str(&op_args.args, team->rank, team->size, | ||
| coll_args_str, sizeof(coll_args_str)); | ||
| ucc_error("failed to init collective: %s, err: (%d) %s", | ||
| coll_args_str, status, ucc_status_string(status)); | ||
| goto free_scratch; | ||
| } | ||
| /* Return schedule if validation was needed, or original task if not */ | ||
| task = validated_task; | ||
| } |
There was a problem hiding this comment.
Missing error logging on ucc_coll_init failure in the DT-check path
When check_asymmetric_dt is enabled for gather/scatter operations, if ucc_coll_init returns an error (including UCC_ERR_NOT_SUPPORTED), the failure is silently forwarded to ucc_service_dt_check with no log message at all. The original else branch (now only reached for other collective types) still emits ucc_debug/ucc_error on failure. The DT-check path should emit at least a debug-level message so that the asymmetric allreduce step can be linked back to the init failure during troubleshooting.
| if (local_status != UCC_OK || !UCC_DT_IS_PREDEFINED(local_dt)) { | ||
| dt_check->values[0] = (int16_t) UCC_ERR_NOT_SUPPORTED; | ||
| dt_check->values[1] = -(int16_t) UCC_ERR_NOT_SUPPORTED; | ||
| /* Record the actual init failure only for non-DT errors so the | ||
| * actual_wrapper_post can surface the right status code. */ | ||
| dt_check->init_status = (local_status != UCC_OK && | ||
| UCC_DT_IS_PREDEFINED(local_dt)) | ||
| ? local_status : UCC_OK; | ||
| } else { | ||
| /* Predefined datatypes are always contiguous - safe to cast to int16 */ | ||
| dt_check->values[0] = (int16_t) local_dt; | ||
| dt_check->values[1] = -(int16_t) local_dt; | ||
| } | ||
| dt_check->values[2] = (int16_t) local_mem_type; | ||
| dt_check->values[3] = -(int16_t) local_mem_type; | ||
| /* Setup subset for full team */ | ||
| dt_check->subset.myrank = team->rank; | ||
| dt_check->subset.map.type = UCC_EP_MAP_FULL; | ||
| dt_check->values[0] = (int16_t) local_dt; | ||
| dt_check->values[1] = -(int16_t) local_dt; | ||
| dt_check->init_status = UCC_OK; | ||
| } |
There was a problem hiding this comment.
init_status not recorded when both local_status != UCC_OK and !UCC_DT_IS_PREDEFINED(local_dt)
The ternary expression evaluates to UCC_OK whenever !UCC_DT_IS_PREDEFINED(local_dt), even if local_status is a genuine non-DT error (e.g., UCC_ERR_NO_MEMORY). In that case actual_wrapper_post falls through to err = UCC_ERR_NOT_SUPPORTED, hiding the real failure from the caller. Ranks that fail for non-DT reasons would surface the wrong error code.
| static ucc_status_t ucc_dt_check_allreduce_post(ucc_coll_task_t *allreduce_wrapper) | ||
| { | ||
| ucc_dt_check_state_t *dt_check = UCC_DT_CHECK_FROM_TASK(allreduce_wrapper); | ||
| ucc_team_t *team = allreduce_wrapper->bargs.team; | ||
| ucc_status_t status; | ||
|
|
||
| /* Safety check */ | ||
| if (!dt_check) { | ||
| allreduce_wrapper->status = UCC_ERR_INVALID_PARAM; | ||
| return UCC_ERR_INVALID_PARAM; | ||
| } | ||
|
|
||
| /* Start in-place service allreduce with MIN operation on 4 int16_t values */ | ||
| status = ucc_service_allreduce(team, dt_check->values, dt_check->values, | ||
| UCC_DT_INT16, 4, UCC_OP_MIN, | ||
| dt_check->subset, &dt_check->check_req); | ||
| if (status != UCC_OK) { | ||
| ucc_schedule_t *schedule = allreduce_wrapper->schedule; | ||
|
|
||
| allreduce_wrapper->status = status; | ||
| return status; | ||
| /* ucc_schedule_start already set the schedule to UCC_INPROGRESS before | ||
| * firing SCHEDULE_STARTED. Fail the whole schedule now so that | ||
| * ucc_collective_finalize_internal will not refuse to run because the | ||
| * top-level request is still UCC_INPROGRESS. */ | ||
| if (schedule) { | ||
| schedule->n_completed_tasks = schedule->n_tasks; | ||
| schedule->super.status = status; | ||
| ucc_task_complete(&schedule->super); | ||
| } | ||
| return UCC_OK; | ||
| } | ||
| allreduce_wrapper->status = UCC_INPROGRESS; | ||
| /* Enqueue wrapper task for progress */ | ||
| return ucc_progress_queue_enqueue(team->contexts[0]->pq, allreduce_wrapper); | ||
| } |
There was a problem hiding this comment.
allreduce_wrapper never has ucc_task_complete called on it when its post fails
When ucc_service_allreduce fails inside ucc_dt_check_allreduce_post, the code sets allreduce_wrapper->status = status and then directly calls ucc_task_complete(&schedule->super) to fail the whole schedule. However, ucc_task_complete is never invoked on allreduce_wrapper itself, so UCC_EVENT_COMPLETED and UCC_EVENT_ERROR notifications to actual_wrapper are never sent. Please verify that ucc_schedule_finalize iterates all tasks and calls their finalize callbacks regardless of whether they were ever posted, to avoid leaking the actual_task returned by ucc_coll_init.
What
Extends an opt-in asymmetric-datatype check for rooted collectives to include a status check of ucc_coll_init, ensuring that all ranks either pass or fail together.
Why?
Previously, a rank could fail ucc_coll_init and return an error while other ranks might succeed and then hang waiting on the failed process.