Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions src/components/tl/ucp/allgather/allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ ucc_status_t ucc_tl_ucp_allgather_init(ucc_tl_ucp_task_t *task)
char *ucc_tl_ucp_allgather_score_str_get(ucc_tl_ucp_team_t *team)
{
int max_size = ALLGATHER_MAX_PATTERN_SIZE;
int algo_num = UCC_TL_TEAM_SIZE(team) % 2
? UCC_TL_UCP_ALLGATHER_ALG_RING
: UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR;
int algo_num;
char * str = ucc_malloc(max_size * sizeof(char));
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
uint64_t cuda_types =
Expand All @@ -67,13 +65,21 @@ char *ucc_tl_ucp_allgather_score_str_get(ucc_tl_ucp_team_t *team)
char * non_cuda_str;
char * cuda_str;

algo_num = UCC_TL_TEAM_SIZE(team) % 2
? UCC_TL_UCP_ALLGATHER_ALG_RING
: UCC_TL_UCP_ALLGATHER_ALG_NEIGHBOR;

if (team->cfg.use_reordering) {
Comment thread
Juee14Desai marked this conversation as resolved.
sbgp = ucc_topo_get_sbgp(team->topo, UCC_SBGP_FULL_HOST_ORDERED);
if (!ucc_ep_map_is_identity(&sbgp->map)) {
algo_num = UCC_TL_UCP_ALLGATHER_ALG_RING;
}
}

if (algo_num == UCC_TL_UCP_ALLGATHER_ALG_RING && !team->cuda_ring) {
Comment on lines 76 to +79
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Algorithm regression for odd-size non-CUDA teams

When cuda_ring is NULL (any CPU team, or GPU team without NVLink topology), odd-size teams previously used the flat RING algorithm. After this guard, they fall through to KNOMIAL. This silently changes the default for every odd-size non-CUDA workload, including large CPU clusters.

The underlying reason is that allgather_ring_init_common now hard-fails with UCC_ERR_NOT_SUPPORTED when cuda_ring == NULL, so the score-string must avoid selecting it. However, the correct fallback for the "no-topology-info" case is still the flat ring (old behavior), not knomial. Consider keeping the flat-ring algorithm alive under a separate name/enum and using it as the fallback, or only switching to KNOMIAL when the caller is guaranteed to benefit.

algo_num = UCC_TL_UCP_ALLGATHER_ALG_KNOMIAL;
}

if (team->topo && ucc_topo_is_single_ppn(team->topo)) {
if (cuda_types) {
cuda_str = ucc_malloc(max_size * sizeof(char));
Expand All @@ -98,6 +104,30 @@ char *ucc_tl_ucp_allgather_score_str_get(ucc_tl_ucp_team_t *team)
return str;
}
}

if (team->cuda_ring && cuda_types) {
cuda_str = ucc_malloc(max_size * sizeof(char));
ucc_mtype_map_to_str(cuda_types, ",", cuda_str, max_size);
if (non_cuda_types) {
non_cuda_str = ucc_malloc(max_size * sizeof(char));
ucc_mtype_map_to_str(non_cuda_types, ",", non_cuda_str, max_size);
ucc_snprintf_safe(str, max_size,
"allgather:0-4k:@0#allgather:4k-inf:%s:@%d"
"#allgather:4k-inf:%s:@%d",
cuda_str, UCC_TL_UCP_ALLGATHER_ALG_RING,
non_cuda_str, algo_num);
ucc_free(cuda_str);
ucc_free(non_cuda_str);
return str;
}
ucc_snprintf_safe(str, max_size,
"allgather:0-4k:@0#allgather:4k-inf:%s:@%d"
"#allgather:4k-inf:@%d",
cuda_str, UCC_TL_UCP_ALLGATHER_ALG_RING, algo_num);
ucc_free(cuda_str);
return str;
}

ucc_snprintf_safe(str, max_size,
UCC_TL_UCP_ALLGATHER_DEFAULT_ALG_SELECT_STR, algo_num);
return str;
Expand Down
156 changes: 83 additions & 73 deletions src/components/tl/ucp/allgather/allgather_ring.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/
Expand All @@ -12,62 +12,78 @@
#include "utils/ucc_math.h"
#include "utils/ucc_coll_utils.h"
#include "components/mc/ucc_mc.h"
#include "coll_patterns/ring.h"

static ucc_rank_t ucc_tl_ucp_allgather_ring_get_send_block(ucc_subset_t *subset,
ucc_rank_t trank,
ucc_rank_t tsize,
int step)
{
return ucc_ep_map_eval(subset->map, (trank - step + tsize) % tsize);
}

static ucc_rank_t ucc_tl_ucp_allgather_ring_get_recv_block(ucc_subset_t *subset,
ucc_rank_t trank,
ucc_rank_t tsize,
int step)
{
return ucc_ep_map_eval(subset->map, (trank - step - 1 + tsize) % tsize);
}
#define MAX_RINGS 8

void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task)
{
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_rank_t trank = task->subset.myrank;
ucc_rank_t tsize = (ucc_rank_t)task->subset.map.ep_num;
void *rbuf = TASK_ARGS(task).dst.info.buffer;
ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type;
size_t count = TASK_ARGS(task).dst.info.count;
ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype;
size_t data_size = (count / tsize) * ucc_dt_size(dt);
ucc_rank_t sendto, recvfrom, sblock, rblock;
int step;
void *buf;
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_coll_args_t *args = &TASK_ARGS(task);
void *rbuf = args->dst.info.buffer;
ucc_memory_type_t rmem = args->dst.info.mem_type;
size_t count = args->dst.info.count;
ucc_datatype_t dt = args->dst.info.datatype;
size_t rdt_size = ucc_dt_size(dt);
ucc_ring_pattern_t *ring = team->cuda_ring;
ucc_rank_t ring_id;
ucc_rank_t nrings;
ucc_rank_t rrank;
ucc_rank_t tsize;
ucc_rank_t send_idx, recv_idx, sendto, recvfrom, step;
size_t block_count, ring_offset, ring_count;
size_t data_size, data_displ;

nrings = ucc_min(MAX_RINGS, ring->num_rings);
tsize = ucc_ring_pattern_size(ring, 0);
block_count = count / tsize;

if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) {
return;
}
sendto = ucc_ep_map_eval(task->subset.map, (trank + 1) % tsize);
recvfrom = ucc_ep_map_eval(task->subset.map, (trank - 1 + tsize) % tsize);

while (task->tagged.send_posted < tsize - 1) {
step = task->tagged.send_posted;
sblock = task->allgather_ring.get_send_block(&task->subset, trank,
tsize, step);
rblock = task->allgather_ring.get_recv_block(&task->subset, trank,
tsize, step);
buf = PTR_OFFSET(rbuf, sblock * data_size);
UCPCHECK_GOTO(
ucc_tl_ucp_send_nb(buf, data_size, rmem, sendto, team, task),
task, out);
buf = PTR_OFFSET(rbuf, rblock * data_size);
UCPCHECK_GOTO(
ucc_tl_ucp_recv_nb(buf, data_size, rmem, recvfrom, team, task),
task, out);

while (task->tagged.send_posted < 1 + nrings * (tsize - 1)) {
ucc_assert(task->tagged.send_posted > 0);
ucc_assert(task->tagged.recv_posted > 0);
ucc_assert(task->tagged.send_posted == task->tagged.recv_posted);
step = (ucc_rank_t)((task->tagged.send_posted - 1) / nrings);
for (ring_id = 0; ring_id < nrings; ring_id++) {
rrank = ucc_ring_pattern_rank(ring, ring_id);
sendto = ucc_ring_pattern_get_send_peer(ring, ring_id, rrank);
recvfrom = ucc_ring_pattern_get_recv_peer(ring, ring_id, rrank);

send_idx = ucc_ring_pattern_get_send_block(ring, ring_id,
rrank, step);
ring_offset = ucc_buffer_block_offset(block_count, nrings,
ring_id);
ring_count = ucc_buffer_block_count(block_count, nrings,
ring_id);
data_displ = (send_idx * block_count + ring_offset) * rdt_size;
data_size = ring_count * rdt_size;
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(PTR_OFFSET(rbuf, data_displ),
data_size, rmem, sendto, team,
task),
task, out);

recv_idx = ucc_ring_pattern_get_recv_block(ring, ring_id,
rrank, step);
ring_offset = ucc_buffer_block_offset(block_count, nrings,
ring_id);
ring_count = ucc_buffer_block_count(block_count, nrings,
ring_id);
data_displ = (recv_idx * block_count + ring_offset) * rdt_size;
data_size = ring_count * rdt_size;
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_displ),
data_size, rmem, recvfrom,
team, task),
task, out);
}
if (UCC_INPROGRESS == ucc_tl_ucp_test(task)) {
return;
}
}

ucc_assert(UCC_TL_UCP_TASK_P2P_COMPLETE(task));
task->super.status = UCC_OK;
out:
Expand All @@ -76,59 +92,53 @@ void ucc_tl_ucp_allgather_ring_progress(ucc_coll_task_t *coll_task)

ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task)
{
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
size_t count = TASK_ARGS(task).dst.info.count;
void *sbuf = TASK_ARGS(task).src.info.buffer;
void *rbuf = TASK_ARGS(task).dst.info.buffer;
ucc_memory_type_t smem = TASK_ARGS(task).src.info.mem_type;
ucc_memory_type_t rmem = TASK_ARGS(task).dst.info.mem_type;
ucc_datatype_t dt = TASK_ARGS(task).dst.info.datatype;
ucc_rank_t trank = task->subset.myrank;
ucc_rank_t tsize = (ucc_rank_t)task->subset.map.ep_num;
size_t data_size = (count / tsize) * ucc_dt_size(dt);
ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t);
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_coll_args_t *args = &TASK_ARGS(task);
ucc_ring_pattern_t *ring = team->cuda_ring;
size_t count = args->dst.info.count;
void *sbuf = args->src.info.buffer;
void *rbuf = args->dst.info.buffer;
ucc_memory_type_t rmem = args->dst.info.mem_type;
ucc_memory_type_t smem = args->src.info.mem_type;
ucc_datatype_t dt = args->dst.info.datatype;
ucc_rank_t tsize = ucc_ring_pattern_size(ring, 0);
ucc_rank_t block = UCC_TL_TEAM_RANK(team);
size_t data_size = (count / tsize) * ucc_dt_size(dt);
ucc_status_t status;
ucc_rank_t block;

UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_ring_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize,
0);
if (!UCC_IS_INPLACE(*args)) {
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
sbuf, data_size, rmem, smem);
sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
}

task->tagged.send_posted = task->tagged.send_completed = 1;
task->tagged.recv_posted = task->tagged.recv_completed = 1;

return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

ucc_status_t ucc_tl_ucp_allgather_ring_init_common(ucc_tl_ucp_task_t *task)
{
ucc_tl_ucp_team_t *team = TASK_TEAM(task);
ucc_sbgp_t *sbgp;

if (!ucc_coll_args_is_predefined_dt(&TASK_ARGS(task), UCC_RANK_INVALID)) {
tl_error(UCC_TASK_LIB(task), "user defined datatype is not supported");
return UCC_ERR_NOT_SUPPORTED;
}

if (!(task->flags & UCC_TL_UCP_TASK_FLAG_SUBSET)) {
if (team->cfg.use_reordering) {
sbgp = ucc_topo_get_sbgp(team->topo, UCC_SBGP_FULL_HOST_ORDERED);
task->subset.myrank = sbgp->group_rank;
task->subset.map = sbgp->map;
}
if (!team->cuda_ring) {
return UCC_ERR_NOT_SUPPORTED;
}

task->allgather_ring.get_send_block = ucc_tl_ucp_allgather_ring_get_send_block;
task->allgather_ring.get_recv_block = ucc_tl_ucp_allgather_ring_get_recv_block;
task->super.post = ucc_tl_ucp_allgather_ring_start;
task->super.progress = ucc_tl_ucp_allgather_ring_progress;

task->super.post = ucc_tl_ucp_allgather_ring_start;
task->super.progress = ucc_tl_ucp_allgather_ring_progress;
return UCC_OK;
}

Expand Down
50 changes: 50 additions & 0 deletions src/components/tl/ucp/reduce_scatter/reduce_scatter.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include "tl_ucp.h"
#include "reduce_scatter.h"
#include "utils/ucc_coll_utils.h"
#include "utils/ucc_string.h"

#define REDUCE_SCATTER_MAX_PATTERN_SIZE 256

ucc_base_coll_alg_info_t
ucc_tl_ucp_reduce_scatter_algs[UCC_TL_UCP_REDUCE_SCATTER_ALG_LAST + 1] = {
Expand All @@ -20,3 +23,50 @@ ucc_base_coll_alg_info_t
.desc = "recursive k-ing with arbitrary radix"},
[UCC_TL_UCP_REDUCE_SCATTER_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};

char *ucc_tl_ucp_reduce_scatter_score_str_get(ucc_tl_ucp_team_t *team)
{
int max_size = REDUCE_SCATTER_MAX_PATTERN_SIZE;
char *str = ucc_malloc(max_size * sizeof(char));
ucc_tl_ucp_context_t *ctx = UCC_TL_UCP_TEAM_CTX(team);
uint64_t cuda_types =
ctx->ucp_memory_types &
(UCC_BIT(UCC_MEMORY_TYPE_CUDA) |
UCC_BIT(UCC_MEMORY_TYPE_CUDA_MANAGED));
uint64_t non_cuda_types = ctx->ucp_memory_types & (~cuda_types);
char *non_cuda_str;
char *cuda_str;

if (team->cuda_ring && cuda_types) {
cuda_str = ucc_malloc(max_size * sizeof(char));
ucc_mtype_map_to_str(cuda_types, ",", cuda_str, max_size);
if (non_cuda_types) {
non_cuda_str = ucc_malloc(max_size * sizeof(char));
ucc_mtype_map_to_str(non_cuda_types, ",", non_cuda_str, max_size);
ucc_snprintf_safe(str, max_size,
"reduce_scatter:0-4k:@%d"
"#reduce_scatter:4k-inf:%s:@%d"
"#reduce_scatter:4k-inf:%s:@%d",
UCC_TL_UCP_REDUCE_SCATTER_ALG_KNOMIAL,
cuda_str, UCC_TL_UCP_REDUCE_SCATTER_ALG_RING,
non_cuda_str, UCC_TL_UCP_REDUCE_SCATTER_ALG_KNOMIAL);
ucc_free(cuda_str);
ucc_free(non_cuda_str);
return str;
Comment thread
Juee14Desai marked this conversation as resolved.
}
ucc_snprintf_safe(str, max_size,
"reduce_scatter:0-4k:@%d"
"#reduce_scatter:4k-inf:%s:@%d"
"#reduce_scatter:4k-inf:@%d",
UCC_TL_UCP_REDUCE_SCATTER_ALG_KNOMIAL,
cuda_str, UCC_TL_UCP_REDUCE_SCATTER_ALG_RING,
UCC_TL_UCP_REDUCE_SCATTER_ALG_KNOMIAL);
ucc_free(cuda_str);
return str;
}

ucc_snprintf_safe(str, max_size,
UCC_TL_UCP_REDUCE_SCATTER_DEFAULT_ALG_SELECT_STR,
UCC_TL_UCP_REDUCE_SCATTER_ALG_KNOMIAL);
return str;
}
9 changes: 8 additions & 1 deletion src/components/tl/ucp/reduce_scatter/reduce_scatter.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ extern ucc_base_coll_alg_info_t
ucc_tl_ucp_reduce_scatter_algs[UCC_TL_UCP_REDUCE_SCATTER_ALG_LAST + 1];

#define UCC_TL_UCP_REDUCE_SCATTER_DEFAULT_ALG_SELECT_STR \
"reduce_scatter:@ring"
"reduce_scatter:@%d"

char *ucc_tl_ucp_reduce_scatter_score_str_get(ucc_tl_ucp_team_t *team);

static inline int ucc_tl_ucp_reduce_scatter_alg_from_str(const char *str)
{
Expand Down Expand Up @@ -48,4 +50,9 @@ ucc_status_t
ucc_tl_ucp_reduce_scatter_ring_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t * team,
ucc_coll_task_t ** task_h);

ucc_status_t ucc_tl_ucp_reduce_scatter_ring_init_common(
ucc_tl_ucp_task_t *task);

void ucc_tl_ucp_reduce_scatter_ring_progress(ucc_coll_task_t *coll_task);
#endif
Loading
Loading