CORE: add global init status check#1303

Open
wfaderhold21 wants to merge 3 commits into
openucx:masterfrom
wfaderhold21:topic/strict_coll_check

Conversation

@wfaderhold21
Collaborator

What

Extends the opt-in asymmetric-datatype check for rooted collectives to include a status check of ucc_coll_init, ensuring that either all ranks pass or all ranks fail.

Why?

Previously, a rank could fail ucc_coll_init and return an error while other ranks succeeded and then hung waiting on the failed rank.

@greptile-apps
Contributor

greptile-apps Bot commented May 1, 2026

Greptile Summary

This PR extends the opt-in check_asymmetric_dt feature for gather/scatter collectives to propagate ucc_coll_init success/failure across all ranks, preventing hangs when one rank fails init while others succeed. The allreduce payload grows from 4 to 6 int16_t values, adding a [status, -status] pair that, after MIN reduction, exposes the globally worst init status for uniform error reporting.
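
The paired-negation trick summarized above can be sketched in plain C. This is a minimal illustration under stated assumptions, not the PR's code: the slot names and the helpers `pack_values`, `reduce_min`, `slot_uniform`, and `global_status` are hypothetical, and `reduce_min` only simulates what a MIN allreduce would compute across ranks. It relies on error statuses being negative, as the `status < 0` checks in the diff suggest.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical slot layout: each value is followed by its negation. */
enum { SLOT_DT, SLOT_NEG_DT, SLOT_MEM, SLOT_NEG_MEM, SLOT_ST, SLOT_NEG_ST, N_SLOTS };

/* Fill one rank's contribution to the reduction. */
static void pack_values(int16_t v[N_SLOTS], int16_t dt, int16_t mem, int16_t status)
{
    v[SLOT_DT]      = dt;
    v[SLOT_NEG_DT]  = (int16_t)-dt;
    v[SLOT_MEM]     = mem;
    v[SLOT_NEG_MEM] = (int16_t)-mem;
    v[SLOT_ST]      = status;
    v[SLOT_NEG_ST]  = (int16_t)-status;
}

/* Stand-in for allreduce(MIN): element-wise minimum over all ranks. */
static void reduce_min(int16_t out[N_SLOTS], int16_t in[][N_SLOTS], size_t n_ranks)
{
    assert(n_ranks > 0);
    for (int s = 0; s < N_SLOTS; s++) {
        out[s] = in[0][s];
        for (size_t r = 1; r < n_ranks; r++) {
            if (in[r][s] < out[s]) {
                out[s] = in[r][s];
            }
        }
    }
}

/* min(x) == -min(-x) holds only when min(x) == max(x), i.e. all ranks agree. */
static int slot_uniform(const int16_t reduced[N_SLOTS], int slot)
{
    return reduced[slot] == -reduced[slot + 1];
}

/* Errors are negative, so MIN over the status slot yields the worst status. */
static int16_t global_status(const int16_t reduced[N_SLOTS])
{
    return reduced[SLOT_ST];
}
```

Because every rank contributes the same fixed number of slots, the message size stays constant regardless of team size.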

  • ucc_service_dt_check now always runs (regardless of ucc_coll_init status) and accepts coll_args directly so it can operate even when task == NULL.
  • ucc_dt_validate_results checks the global init-status slot before performing the existing DT/memory-type consistency checks, and the validate path in ucc_dt_check_actual_wrapper_post correctly propagates the reduced error code back to callers.
  • Service collective helpers (allreduce, allgather, bcast) now set *req to NULL on failure, fixing a use-after-free risk for callers that check the pointer after an error return.
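
The last bullet describes a common defensive pattern for out-parameters. A minimal sketch, assuming a hypothetical request type `toy_req_t` and helpers `toy_service_start` and `toy_service_release` (none of these are UCC names), might look like this:

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical request handle; not a UCC type. */
typedef struct toy_req {
    int in_flight;
} toy_req_t;

/* Returns 0 on success, a negative value on failure.
 * On every failure path *req is set to NULL so the caller never
 * observes a pointer to memory that was already released. */
static int toy_service_start(int simulate_failure, toy_req_t **req)
{
    toy_req_t *r;

    assert(req != NULL);
    r = malloc(sizeof(*r));
    if (r == NULL) {
        *req = NULL;
        return -1;
    }
    if (simulate_failure) {
        free(r);
        *req = NULL; /* do not leave a dangling or stale pointer behind */
        return -1;
    }
    r->in_flight = 1;
    *req = r;
    return 0;
}

/* Releases a request and clears the caller's handle. */
static void toy_service_release(toy_req_t **req)
{
    free(*req);
    *req = NULL;
}
```

With this contract, a caller that tests `req != NULL` after an error return cannot accidentally touch freed memory.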

Confidence Score: 4/5

The core fix is sound: all ranks now participate in the service allreduce regardless of local init outcome, eliminating the hang. The error paths are complex and warrant careful second-pass review before merging.

The global-status propagation logic in ucc_dt_validate_results is correct, and the force-complete patterns in both wrapper post functions mirror each other consistently. The one concern is the defensive actual_task == NULL guard in ucc_dt_check_actual_wrapper_progress: if somehow reached, it sets an error status but never forces the schedule to complete, which would cause an indefinite hang. The path is unreachable during normal operation, but its failure mode is silent. The broader executor-transfer logic across ucc_dt_check_actual_wrapper_post and ucc_collective_init is intricate enough that a second read-through focusing on the executor stop/destroy flag bookkeeping is advisable.
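
One way to close the gap described above is for the guard to fail the schedule as well as the task. The sketch below uses toy types (`toy_task_t`, `toy_schedule_t`) and illustrative status values; it mirrors the `n_completed_tasks = n_tasks` pattern visible in the diff, but it is an assumption about a possible fix, not code from the PR.

```c
#include <assert.h>
#include <stddef.h>

/* Illustrative status values; only the sign convention matters here. */
typedef enum { ST_OK = 0, ST_INPROGRESS = 1, ST_ERR_INVALID_PARAM = -3 } st_t;

typedef struct toy_schedule {
    st_t status;
    int  n_tasks;
    int  n_completed_tasks;
    int  completed; /* stand-in for the effect of ucc_task_complete() */
} toy_schedule_t;

typedef struct toy_task {
    st_t             status;
    toy_schedule_t  *schedule;
    struct toy_task *actual_task;
} toy_task_t;

/* Mark every task as done and complete the schedule with an error. */
static void toy_schedule_fail(toy_schedule_t *s, st_t err)
{
    s->n_completed_tasks = s->n_tasks;
    s->status            = err;
    s->completed         = 1;
}

/* Progress function with a guard that cannot leave the schedule hanging. */
static void toy_wrapper_progress(toy_task_t *wrapper)
{
    assert(wrapper != NULL);
    if (wrapper->actual_task == NULL) {
        wrapper->status = ST_ERR_INVALID_PARAM;
        if (wrapper->schedule != NULL) {
            toy_schedule_fail(wrapper->schedule, wrapper->status);
        }
        return;
    }
    wrapper->status = wrapper->actual_task->status;
}
```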

src/core/ucc_service_coll.c — specifically the progress-function NULL guard and the n_completed_tasks force-completion pattern used in both wrapper post functions.

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/core/ucc_coll.c | Restructures post-init flow so ucc_service_dt_check is always called for gather/scatter when the feature is enabled, passing local_status even on init failure; error-path routing to coll_finalize vs free_scratch appears correct but is intricate. |
| src/core/ucc_service_coll.c | Core of the change: extends the allreduce payload to 6 slots, adds global init-status check in ucc_dt_validate_results, reworks force-completion paths in both wrapper post functions. One defensive NULL check in ucc_dt_check_actual_wrapper_progress sets error status without forcing schedule completion. |
| src/core/ucc_service_coll.h | Updated docstring and signature for ucc_service_dt_check to include coll_args and local_status parameters; accurately reflects the must-call-on-every-rank contract. |
| src/schedule/ucc_schedule.h | Grows values array from 4 to 6 elements and adds ar_status field to ucc_dt_check_state_t; struct layout change is backward-compatible within the module. |
| src/utils/ucc_coll_utils.c | Single-line fix: uses task->bargs.team->size instead of task->team->params.size, correcting the size lookup for the scatter/scatterv buffer copy. |

Reviews (3): Last reviewed commit: "REVIEW: extend dt check to include init ..."

Comment thread src/core/ucc_coll.c
Comment on lines 250 to 284
status = ucc_coll_init(team->score_map, &op_args, &task);
if (UCC_ERR_NOT_SUPPORTED == status) {
ucc_debug("failed to init collective: not supported");
goto free_scratch;
} else if (ucc_unlikely(status < 0)) {
char coll_args_str[256] = {0};
ucc_coll_args_str(&op_args.args, team->rank, team->size, coll_args_str,
sizeof(coll_args_str));
ucc_error("failed to init collective: %s, err: (%d) %s", coll_args_str,
status, ucc_status_string(status));
goto free_scratch;
}

/* Setup non-blocking datatype check for rooted collectives
*
* This implements transparent validation using a schedule with two tasks:
* 1. Allreduce validation task: uses MIN reduction with min/max trick to detect mismatches
* 2. Actual collective task: the real gather/scatter operation
*
* Validation uses allreduce (MIN) on [dt, -dt, mem, -mem]:
* - Message size: 8 bytes (4 × int16_t, doesn't scale with number of ranks)
* - After reduction: min(dt) == -min(-dt) means all ranks have same datatype
*
* Dependencies: allreduce validation → actual task
* If validation fails, the dependency mechanism prevents the actual task from posting.
*/
if (coll_args->coll_type == UCC_COLL_TYPE_GATHER ||
coll_args->coll_type == UCC_COLL_TYPE_GATHERV ||
coll_args->coll_type == UCC_COLL_TYPE_SCATTER ||
coll_args->coll_type == UCC_COLL_TYPE_SCATTERV) {
/* Check if datatype validation is needed and create schedule if so */
ucc_coll_task_t *validated_task;

validated_task = ucc_service_dt_check(team, task, &status);
if (!validated_task) {
ucc_error("failed to create dt_check schedule: %s",
ucc_status_string(status));
goto coll_finalize;
if (ucc_global_config.check_asymmetric_dt &&
(coll_args->coll_type == UCC_COLL_TYPE_GATHER ||
coll_args->coll_type == UCC_COLL_TYPE_GATHERV ||
coll_args->coll_type == UCC_COLL_TYPE_SCATTER ||
coll_args->coll_type == UCC_COLL_TYPE_SCATTERV)) {

if (task == NULL && op_args.asymmetric_save_info.scratch != NULL) {
ucc_mc_free(op_args.asymmetric_save_info.scratch);
op_args.asymmetric_save_info.scratch = NULL;
}
task_wrap = ucc_service_dt_check(team, coll_args, status, task,
&wrap_err);
if (ucc_unlikely(!task_wrap)) {
status = wrap_err;
if (task) {
goto coll_finalize;
}
goto free_scratch;
}
task = task_wrap;
} else {
if (UCC_ERR_NOT_SUPPORTED == status) {
ucc_debug("failed to init collective: not supported");
goto free_scratch;
} else if (ucc_unlikely(status < 0)) {
char coll_args_str[256] = {0};
ucc_coll_args_str(&op_args.args, team->rank, team->size,
coll_args_str, sizeof(coll_args_str));
ucc_error("failed to init collective: %s, err: (%d) %s",
coll_args_str, status, ucc_status_string(status));
goto free_scratch;
}
/* Return schedule if validation was needed, or original task if not */
task = validated_task;
}
Contributor

P2 Missing error logging on ucc_coll_init failure in the DT-check path

When check_asymmetric_dt is enabled for gather/scatter operations, if ucc_coll_init returns an error (including UCC_ERR_NOT_SUPPORTED), the failure is silently forwarded to ucc_service_dt_check with no log message at all. The original else branch (now only reached for other collective types) still emits ucc_debug/ucc_error on failure. The DT-check path should emit at least a debug-level message so that the asymmetric allreduce step can be linked back to the init failure during troubleshooting.
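
A sketch of what such logging could look like, reusing the level split of the existing else branch (debug for "not supported", error otherwise). The enum values, `init_failure_log_level`, and `log_init_failure` are hypothetical stand-ins, and `fprintf` replaces the project's logging macros so the example is self-contained.

```c
#include <assert.h>
#include <stdio.h>

/* Illustrative status values; only "negative means error" is relied upon. */
typedef enum { ST_OK = 0, ST_ERR_NOT_SUPPORTED = -1, ST_ERR_NO_MEMORY = -4 } st_t;
typedef enum { LOG_NONE, LOG_DEBUG, LOG_ERROR } log_level_t;

/* Pick the severity for a given init status. */
static log_level_t init_failure_log_level(st_t status)
{
    if (status == ST_ERR_NOT_SUPPORTED) {
        return LOG_DEBUG;
    }
    if (status < 0) {
        return LOG_ERROR;
    }
    return LOG_NONE;
}

/* Emit the message before the status is handed to the DT-check step,
 * so the later global failure can be traced back to this rank. */
static log_level_t log_init_failure(st_t status, const char *coll_args_str)
{
    log_level_t lvl;

    assert(coll_args_str != NULL);
    lvl = init_failure_log_level(status);
    if (lvl == LOG_DEBUG) {
        fprintf(stderr, "DEBUG failed to init collective: not supported\n");
    } else if (lvl == LOG_ERROR) {
        fprintf(stderr, "ERROR failed to init collective: %s, err: (%d)\n",
                coll_args_str, (int)status);
    }
    return lvl;
}
```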

Comment thread src/core/ucc_service_coll.c Outdated
Comment on lines +563 to +575
if (local_status != UCC_OK || !UCC_DT_IS_PREDEFINED(local_dt)) {
dt_check->values[0] = (int16_t) UCC_ERR_NOT_SUPPORTED;
dt_check->values[1] = -(int16_t) UCC_ERR_NOT_SUPPORTED;
/* Record the actual init failure only for non-DT errors so the
* actual_wrapper_post can surface the right status code. */
dt_check->init_status = (local_status != UCC_OK &&
UCC_DT_IS_PREDEFINED(local_dt))
? local_status : UCC_OK;
} else {
/* Predefined datatypes are always contiguous - safe to cast to int16 */
dt_check->values[0] = (int16_t) local_dt;
dt_check->values[1] = -(int16_t) local_dt;
}
dt_check->values[2] = (int16_t) local_mem_type;
dt_check->values[3] = -(int16_t) local_mem_type;
/* Setup subset for full team */
dt_check->subset.myrank = team->rank;
dt_check->subset.map.type = UCC_EP_MAP_FULL;
dt_check->values[0] = (int16_t) local_dt;
dt_check->values[1] = -(int16_t) local_dt;
dt_check->init_status = UCC_OK;
}
Contributor

P1 init_status not recorded when both local_status != UCC_OK and !UCC_DT_IS_PREDEFINED(local_dt)

The ternary expression evaluates to UCC_OK whenever !UCC_DT_IS_PREDEFINED(local_dt), even if local_status is a genuine non-DT error (e.g., UCC_ERR_NO_MEMORY). In that case actual_wrapper_post falls through to err = UCC_ERR_NOT_SUPPORTED, hiding the real failure from the caller. Ranks that fail for non-DT reasons would surface the wrong error code.
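
The masking case can be reproduced in isolation. In the sketch below, `init_status_quoted` copies the logic of the quoted ternary, while `init_status_alt` is one possible alternative that records any init failure; the alternative, the function names, and the enum values are assumptions made for illustration, not the fix adopted in the PR.

```c
#include <assert.h>

/* Illustrative status values. */
typedef enum { ST_OK = 0, ST_ERR_NOT_SUPPORTED = -1, ST_ERR_NO_MEMORY = -4 } st_t;

/* Same logic as the ternary in the quoted hunk. */
static st_t init_status_quoted(st_t local_status, int dt_is_predefined)
{
    return (local_status != ST_OK && dt_is_predefined) ? local_status : ST_OK;
}

/* Hypothetical alternative: keep the init failure regardless of the datatype. */
static st_t init_status_alt(st_t local_status, int dt_is_predefined)
{
    (void)dt_is_predefined; /* the datatype no longer influences the result */
    return (local_status != ST_OK) ? local_status : ST_OK;
}

/* Self-check of the case described in the comment:
 * non-predefined datatype combined with a genuine init error. */
static void check_masking_case(void)
{
    assert(init_status_quoted(ST_ERR_NO_MEMORY, 0) == ST_OK);
    assert(init_status_alt(ST_ERR_NO_MEMORY, 0) == ST_ERR_NO_MEMORY);
}
```

Whether a non-predefined datatype should take precedence over an unrelated init error is a policy decision for the PR author; the sketch only shows that the quoted expression cannot report both.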

Comment on lines 244 to 275
static ucc_status_t ucc_dt_check_allreduce_post(ucc_coll_task_t *allreduce_wrapper)
{
ucc_dt_check_state_t *dt_check = UCC_DT_CHECK_FROM_TASK(allreduce_wrapper);
ucc_team_t *team = allreduce_wrapper->bargs.team;
ucc_status_t status;

/* Safety check */
if (!dt_check) {
allreduce_wrapper->status = UCC_ERR_INVALID_PARAM;
return UCC_ERR_INVALID_PARAM;
}

/* Start in-place service allreduce with MIN operation on 4 int16_t values */
status = ucc_service_allreduce(team, dt_check->values, dt_check->values,
UCC_DT_INT16, 4, UCC_OP_MIN,
dt_check->subset, &dt_check->check_req);
if (status != UCC_OK) {
ucc_schedule_t *schedule = allreduce_wrapper->schedule;

allreduce_wrapper->status = status;
return status;
/* ucc_schedule_start already set the schedule to UCC_INPROGRESS before
* firing SCHEDULE_STARTED. Fail the whole schedule now so that
* ucc_collective_finalize_internal will not refuse to run because the
* top-level request is still UCC_INPROGRESS. */
if (schedule) {
schedule->n_completed_tasks = schedule->n_tasks;
schedule->super.status = status;
ucc_task_complete(&schedule->super);
}
return UCC_OK;
}
allreduce_wrapper->status = UCC_INPROGRESS;
/* Enqueue wrapper task for progress */
return ucc_progress_queue_enqueue(team->contexts[0]->pq, allreduce_wrapper);
}
Contributor

P2 allreduce_wrapper never has ucc_task_complete called on it when its post fails

When ucc_service_allreduce fails inside ucc_dt_check_allreduce_post, the code sets allreduce_wrapper->status = status and then directly calls ucc_task_complete(&schedule->super) to fail the whole schedule. However, ucc_task_complete is never invoked on allreduce_wrapper itself, so UCC_EVENT_COMPLETED and UCC_EVENT_ERROR notifications to actual_wrapper are never sent. Please verify that ucc_schedule_finalize iterates all tasks and calls their finalize callbacks regardless of whether they were ever posted, to avoid leaking the actual_task returned by ucc_coll_init.
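
The property being asked about can be stated as a toy model: finalization visits every task in the schedule, whether or not it was ever posted. The types and `toy_schedule_finalize` below are hypothetical; whether ucc_schedule_finalize actually behaves this way is exactly what needs to be verified against the real source.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_TASKS 4

typedef struct toy_task {
    int posted;    /* 1 if the task's post function ever ran */
    int finalized; /* 1 once its finalize callback has run */
} toy_task_t;

typedef struct toy_schedule {
    toy_task_t *tasks[TOY_MAX_TASKS];
    int         n_tasks;
} toy_schedule_t;

/* Finalize every task exactly once, independent of its posted flag.
 * Returns the number of tasks finalized by this call. */
static int toy_schedule_finalize(toy_schedule_t *s)
{
    int n = 0;

    assert(s != NULL && s->n_tasks <= TOY_MAX_TASKS);
    for (int i = 0; i < s->n_tasks; i++) {
        toy_task_t *t = s->tasks[i];
        if (t != NULL && !t->finalized) {
            t->finalized = 1;
            n++;
        }
    }
    return n;
}
```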
