Skip to content

Commit bec53ca

Browse files
UCC/CORE: Added automation to loading local rank from topo if not provided by user
1 parent 5852377 commit bec53ca

4 files changed

Lines changed: 233 additions & 7 deletions

File tree

src/components/topo/ucc_topo.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,21 @@ static inline ucc_rank_t ucc_topo_nnodes(ucc_topo_t *topo)
290290
return sbgp->group_size;
291291
}
292292

293+
/* Returns the caller's rank inside its node-level subgroup (UCC_SBGP_NODE).
   Falls back to 0 when no topo is supplied or when the node subgroup is not
   in the UCC_SBGP_ENABLED state. */
static inline ucc_rank_t ucc_topo_node_local_rank(ucc_topo_t *topo)
{
    ucc_rank_t  local_rank = 0;
    ucc_sbgp_t *node_sbgp;

    if (topo != NULL) {
        node_sbgp = ucc_topo_get_sbgp(topo, UCC_SBGP_NODE);
        if (UCC_SBGP_ENABLED == node_sbgp->status) {
            local_rank = node_sbgp->group_rank;
        }
    }
    return local_rank;
}
307+
293308
/* Returns node leaders array - array that maps each rank to the TEAM RANK that
294309
is the leader of that rank's node. Also returns per-node leaders array - array
295310
mapping node_id to the TEAM RANK of that node's leader */

src/core/ucc_context.c

Lines changed: 211 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include "config.h"
88
#include "ucc_context.h"
9+
#include "components/topo/ucc_topo.h"
910
#include "utils/ucc_proc_info.h"
1011
#include "components/cl/ucc_cl.h"
1112
#include "components/tl/ucc_tl.h"
@@ -15,6 +16,7 @@
1516
#include "utils/ucc_list.h"
1617
#include "utils/ucc_string.h"
1718
#include "utils/ucc_debug.h"
19+
#include "utils/ucc_compiler_def.h"
1820
#include "ucc_progress_queue.h"
1921

2022
static uint32_t ucc_context_seq_num = 0;
@@ -60,7 +62,12 @@ static ucc_config_field_t ucc_context_config_table[] = {
6062
ucc_offsetof(ucc_context_config_t, net_devices), UCC_CONFIG_TYPE_STRING_ARRAY},
6163

6264
{"NODE_LOCAL_ID", "auto",
63-
"An optimization hint for the local identificator on a single node.",
65+
"An optimization hint for the local rank index on a single node.\n"
66+
"With OOB and more than one endpoint, \"auto\" runs one prefix allgather "
67+
"(ctx id / host), then the main address exchange (two allgathers: "
68+
"lengths, then full addresses). Custom OOB backends must support three "
69+
"sequential allgather operations on the same coll_info (each completes "
70+
"before the next begins).",
6471
ucc_offsetof(ucc_context_config_t, node_local_id), UCC_CONFIG_TYPE_ULUNITS},
6572

6673
{NULL}};
@@ -462,6 +469,7 @@ static ucc_status_t ucc_create_tl_contexts(ucc_context_t *ctx,
462469
return status;
463470
}
464471

472+
465473
ucc_status_t ucc_core_addr_exchange(
466474
ucc_context_t *context, ucc_oob_coll_t *oob,
467475
ucc_addr_storage_t *addr_storage)
@@ -599,6 +607,98 @@ ucc_status_t ucc_core_addr_exchange(
599607
return UCC_OK;
600608
}
601609

610+
ucc_status_t ucc_core_ctx_id_exchange(ucc_context_t *context, ucc_oob_coll_t *oob,
611+
ucc_addr_storage_t *addr_storage)
612+
{
613+
/* Caller must pass addr_storage cleared for a new exchange: at entry,
614+
addr_len == 0 iff storage == NULL. */
615+
ucc_status_t status;
616+
ucc_rank_t i;
617+
const size_t ctx_addr_prefix_len =
618+
ucc_offsetof(ucc_context_addr_header_t, n_components);
619+
ucc_context_addr_header_t *h;
620+
void *scratch;
621+
622+
poll:
623+
if (addr_storage->oob_req) {
624+
status = oob->req_test(addr_storage->oob_req);
625+
if (status < 0) {
626+
oob->req_free(addr_storage->oob_req);
627+
addr_storage->oob_req = NULL;
628+
ucc_free(addr_storage->storage);
629+
addr_storage->storage = NULL;
630+
addr_storage->addr_len = 0;
631+
ucc_error("oob req test failed during ctx id prefix exchange");
632+
return status;
633+
} else if (UCC_INPROGRESS == status) {
634+
return status;
635+
}
636+
oob->req_free(addr_storage->oob_req);
637+
addr_storage->oob_req = NULL;
638+
}
639+
/* If addr_len is still 0, we have not started the prefix allgather yet.
640+
* Once it is started, addr_len is set and we goto poll until completion;
641+
* we never reach the rank loop below with addr_len == 0. */
642+
if (0 == addr_storage->addr_len) {
643+
/* addr_len == 0 iff storage == NULL at entry / before prefix allgather. */
644+
ucc_assert(NULL == addr_storage->storage);
645+
addr_storage->size = oob->n_oob_eps;
646+
647+
addr_storage->storage = ucc_malloc(
648+
(addr_storage->size + 1) * ctx_addr_prefix_len, "ctx_ids_storage");
649+
if (!addr_storage->storage) {
650+
ucc_error(
651+
"failed to allocate %zd bytes for ctx_ids storage",
652+
(addr_storage->size + 1) * ctx_addr_prefix_len);
653+
return UCC_ERR_NO_MEMORY;
654+
}
655+
addr_storage->addr_len = ctx_addr_prefix_len;
656+
657+
scratch = PTR_OFFSET(addr_storage->storage,
658+
ctx_addr_prefix_len * addr_storage->size);
659+
memset(scratch, 0, ctx_addr_prefix_len);
660+
h = (ucc_context_addr_header_t *)scratch;
661+
h->ctx_id = context->id;
662+
h->host_info = ucc_local_host;
663+
664+
status = oob->allgather(scratch, addr_storage->storage,
665+
ctx_addr_prefix_len, oob->coll_info,
666+
&addr_storage->oob_req);
667+
if (UCC_OK != status) {
668+
ucc_free(addr_storage->storage);
669+
addr_storage->storage = NULL;
670+
addr_storage->addr_len = 0;
671+
ucc_error("failed to start oob allgather for ctx_ids");
672+
return status;
673+
}
674+
goto poll;
675+
}
676+
/* addr_len is the per-rank prefix stride; set with storage when the prefix
677+
allgather is started (see allocation block above). */
678+
{
679+
ucc_rank_t r = UCC_RANK_MAX;
680+
681+
for (i = 0; i < addr_storage->size; i++) {
682+
h = (ucc_context_addr_header_t *)PTR_OFFSET(
683+
addr_storage->storage, addr_storage->addr_len * i);
684+
if (UCC_CTX_ID_EQUAL(context->id, h->ctx_id)) {
685+
if (r != UCC_RANK_MAX) {
686+
ucc_error("ctx_id collision: %d %d", r, i);
687+
ucc_free(addr_storage->storage);
688+
addr_storage->storage = NULL;
689+
addr_storage->addr_len = 0;
690+
return UCC_ERR_NO_MESSAGE;
691+
}
692+
r = i;
693+
}
694+
}
695+
696+
addr_storage->flags = 0;
697+
addr_storage->rank = r;
698+
}
699+
return UCC_OK;
700+
}
701+
602702
static void remove_tl_ctx_from_array(ucc_tl_context_t **array, unsigned *size,
603703
ucc_tl_context_t *tl_ctx)
604704
{
@@ -669,6 +769,102 @@ ucc_status_t ucc_context_create_proc_info(
669769
ucc_check_wait_for_debugger(ctx->rank);
670770
#endif
671771
}
772+
773+
ctx->id.pi = *proc_info;
774+
ctx->id.seq_num = ucc_atomic_fadd32(&ucc_context_seq_num, 1);
775+
776+
if (config->node_local_id == UCC_ULUNITS_AUTO) {
777+
if ((params->mask & UCC_CONTEXT_PARAM_FIELD_OOB) &&
778+
params->oob.n_oob_eps == 1) {
779+
/* Single-rank OOB: no allgather; local rank on the node is 0. */
780+
b_params.node_local_id = 0;
781+
} else if ((params->mask & UCC_CONTEXT_PARAM_FIELD_OOB) &&
782+
params->oob.n_oob_eps > 1) {
783+
status = ucc_sysinfo_get_host_info(&ucc_local_host);
784+
if (UCC_OK != status) {
785+
ucc_error("failed to refresh host info for ctx topo exchange");
786+
goto error_ctx_create;
787+
}
788+
do {
789+
/* UCC context create is blocking fn, so we can wait here for the
790+
completion of addr exchange */
791+
status = ucc_core_ctx_id_exchange(ctx, &ctx->params.oob,
792+
&ctx->addr_storage);
793+
if (status < 0) {
794+
ucc_error("failed to exchange addresses during context "
795+
"creation with status: %s",
796+
ucc_status_string(status));
797+
goto error_ctx_create;
798+
}
799+
} while (status == UCC_INPROGRESS);
800+
/* Validate rank before ucc_context_topo_init allocates ctx->topo. */
801+
if (ctx->addr_storage.rank == UCC_RANK_MAX) {
802+
ucc_error("ctx_id not found in exchanged ids, rank detection "
803+
"failed");
804+
status = UCC_ERR_NO_MESSAGE;
805+
ucc_free(ctx->addr_storage.storage);
806+
ctx->addr_storage.storage = NULL;
807+
ctx->addr_storage.addr_len = 0;
808+
goto error_ctx_create;
809+
}
810+
if (ctx->addr_storage.rank != params->oob.oob_ep) {
811+
ucc_error("ctx_id exchange rank %d does not match OOB ep %d",
812+
ctx->addr_storage.rank, params->oob.oob_ep);
813+
status = UCC_ERR_NO_MESSAGE;
814+
ucc_free(ctx->addr_storage.storage);
815+
ctx->addr_storage.storage = NULL;
816+
ctx->addr_storage.addr_len = 0;
817+
goto error_ctx_create;
818+
}
819+
status = ucc_context_topo_init(&ctx->addr_storage, &ctx->topo);
820+
if (UCC_OK != status) {
821+
ucc_free(ctx->addr_storage.storage);
822+
ctx->addr_storage.storage = NULL;
823+
ctx->addr_storage.addr_len = 0;
824+
ucc_error("failed to init ctx topo");
825+
goto error_ctx_create;
826+
}
827+
ucc_assert(ctx->topo != NULL);
828+
829+
{
830+
ucc_subset_t set;
831+
ucc_topo_t *topo = NULL;
832+
833+
memset(&set.map, 0, sizeof(ucc_ep_map_t));
834+
set.map.type = UCC_EP_MAP_FULL;
835+
set.myrank = params->oob.oob_ep;
836+
set.map.ep_num = params->oob.n_oob_eps;
837+
838+
status = ucc_topo_init(set, ctx->topo, &topo);
839+
if (UCC_OK != status) {
840+
ucc_error("failed to init topo for computing local rank");
841+
ucc_free(ctx->addr_storage.storage);
842+
ctx->addr_storage.storage = NULL;
843+
ctx->addr_storage.addr_len = 0;
844+
goto error_ctx_create;
845+
}
846+
b_params.node_local_id = ucc_topo_node_local_rank(topo);
847+
ucc_topo_cleanup(topo);
848+
}
849+
850+
/* clean up addr_storage */
851+
ucc_free(ctx->addr_storage.storage);
852+
ctx->addr_storage.storage = NULL;
853+
ctx->addr_storage.addr_len = 0;
854+
ctx->addr_storage.size = 0;
855+
ctx->addr_storage.rank = UCC_RANK_MAX;
856+
ctx->addr_storage.flags = 0;
857+
ctx->addr_storage.oob_req = NULL;
858+
859+
/* clean up topo */
860+
ucc_context_topo_cleanup(ctx->topo);
861+
ctx->topo = NULL;
862+
} else if (!(params->mask & UCC_CONTEXT_PARAM_FIELD_OOB)) {
863+
/* No OOB: cannot run topo exchange; sole implicit rank on the node. */
864+
b_params.node_local_id = 0;
865+
}
866+
}
867+
672868
status = ucc_create_tl_contexts(ctx, config, b_params);
673869
if (UCC_OK != status) {
674870
/* only critical error could have happened - bail */
@@ -748,8 +944,7 @@ ucc_status_t ucc_context_create_proc_info(
748944
ucc_error("failed to init progress queue for context %p", ctx);
749945
goto error_ctx_create;
750946
}
751-
ctx->id.pi = *proc_info;
752-
ctx->id.seq_num = ucc_atomic_fadd32(&ucc_context_seq_num, 1);
947+
753948
if (params->mask & UCC_CONTEXT_PARAM_FIELD_OOB &&
754949
params->oob.n_oob_eps > 1) {
755950
do {
@@ -765,15 +960,23 @@ ucc_status_t ucc_context_create_proc_info(
765960
}
766961
} while (status == UCC_INPROGRESS);
767962

768-
if (topo_required) {
963+
if (topo_required && !ctx->topo) {
769964
/* At least one available CL context reported it needs topo info */
770965
status = ucc_context_topo_init(&ctx->addr_storage, &ctx->topo);
771966
if (UCC_OK != status) {
772967
ucc_error("failed to init ctx topo");
773968
ucc_free(ctx->addr_storage.storage);
969+
ctx->addr_storage.storage = NULL;
970+
ctx->addr_storage.addr_len = 0;
774971
goto error_ctx_create;
775972
}
776973
}
974+
if (ctx->addr_storage.rank == UCC_RANK_MAX) {
975+
ucc_error("ctx_id not found after address exchange, rank detection "
976+
"failed");
977+
status = UCC_ERR_NO_MESSAGE;
978+
goto error_ctx_create;
979+
}
777980
ucc_assert(ctx->addr_storage.rank == params->oob.oob_ep);
778981
}
779982
if (config->internal_oob) {
@@ -898,6 +1101,10 @@ ucc_status_t ucc_context_create_proc_info(
8981101
}
8991102
ucc_free(ctx->cl_ctx);
9001103
error_ctx:
1104+
ucc_context_topo_cleanup(ctx->topo);
1105+
ctx->topo = NULL;
1106+
ucc_free(ctx->addr_storage.storage);
1107+
ctx->addr_storage.storage = NULL;
9011108
ucc_free(ctx);
9021109
error:
9031110
return status;

src/core/ucc_context.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ ucc_status_t ucc_context_progress_deregister(ucc_context_t *ctx,
152152
ucc_status_t ucc_core_addr_exchange(ucc_context_t *context, ucc_oob_coll_t *oob,
153153
ucc_addr_storage_t *addr_storage);
154154

155+
/* Exchanges the context id prefix (ctx_id and host_info) among the group of processes defined by OOB.
156+
This function is used to exchange the context ids between the processes in order
157+
to find the local rank.
158+
*/
159+
ucc_status_t ucc_core_ctx_id_exchange(ucc_context_t *context, ucc_oob_coll_t *oob,
160+
ucc_addr_storage_t *addr_storage);
161+
155162
/* UCC context packed address layout:
156163
--------------------------------------------------------------------------
157164
|ctx_id|host_info|n_components|id0|offset0|id1|offset1|..|

tools/perf/ucc_pt_comm.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,6 @@ ucc_status_t ucc_pt_comm::init()
167167
cfg_mod = std::to_string(bootstrap->get_ppn());
168168
UCCCHECK_GOTO(ucc_context_config_modify(ctx_config, NULL,
169169
"ESTIMATED_NUM_PPN", cfg_mod.c_str()), free_ctx_config, st);
170-
cfg_mod = std::to_string(bootstrap->get_local_rank());
171-
UCCCHECK_GOTO(ucc_context_config_modify(ctx_config, NULL,
172-
"NODE_LOCAL_ID", cfg_mod.c_str()), free_ctx_config, st);
173170
std::memset(&ctx_params, 0, sizeof(ucc_context_params_t));
174171
ctx_params.mask = UCC_CONTEXT_PARAM_FIELD_TYPE |
175172
UCC_CONTEXT_PARAM_FIELD_OOB |

0 commit comments

Comments
 (0)