Skip to content

Commit e05830c

Browse files
UCC/CORE: Added automatic loading of the local rank from topo if not provided by the user
1 parent 5852377 commit e05830c

3 files changed

Lines changed: 231 additions & 4 deletions

File tree

src/components/topo/ucc_topo.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,21 @@ static inline ucc_rank_t ucc_topo_nnodes(ucc_topo_t *topo)
290290
return sbgp->group_size;
291291
}
292292

293+
/* Returns the group_rank recorded in the node-level subgroup (UCC_SBGP_NODE)
   of @topo.
   Returns UCC_RANK_INVALID when @topo is NULL or when the node subgroup is
   not in the UCC_SBGP_ENABLED state.
   NOTE(review): callers that use the result as a node-local id should confirm
   how a non-enabled node subgroup is meant to be handled, since the sentinel
   is returned as-is in that case. */
static inline ucc_rank_t ucc_topo_node_local_rank(ucc_topo_t *topo)
{
    ucc_sbgp_t *node_sbgp;

    if (NULL == topo) {
        return UCC_RANK_INVALID;
    }

    node_sbgp = ucc_topo_get_sbgp(topo, UCC_SBGP_NODE);

    /* Only an enabled subgroup carries a meaningful group_rank. */
    return (UCC_SBGP_ENABLED == node_sbgp->status) ? node_sbgp->group_rank
                                                   : UCC_RANK_INVALID;
}
307+
293308
/* Returns node leaders array - array that maps each rank to the TEAM RANK that
294309
is the leader of that rank's node. Also returns per-node leaders array - array
295310
mapping node_id to the TEAM RANK of that node's leader */

src/core/ucc_context.c

Lines changed: 209 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include "config.h"
88
#include "ucc_context.h"
9+
#include "components/topo/ucc_topo.h"
910
#include "utils/ucc_proc_info.h"
1011
#include "components/cl/ucc_cl.h"
1112
#include "components/tl/ucc_tl.h"
@@ -60,7 +61,12 @@ static ucc_config_field_t ucc_context_config_table[] = {
6061
ucc_offsetof(ucc_context_config_t, net_devices), UCC_CONFIG_TYPE_STRING_ARRAY},
6162

6263
{"NODE_LOCAL_ID", "auto",
63-
"An optimization hint for the local identificator on a single node.",
64+
"An optimization hint for the local rank index on a single node.\n"
65+
"With OOB and more than one endpoint, \"auto\" runs one prefix allgather "
66+
"(ctx id / host), then the main address exchange (two allgathers: "
67+
"lengths, then full addresses). Custom OOB backends must support three "
68+
"sequential allgather operations on the same coll_info (each completes "
69+
"before the next begins).",
6470
ucc_offsetof(ucc_context_config_t, node_local_id), UCC_CONFIG_TYPE_ULUNITS},
6571

6672
{NULL}};
@@ -599,6 +605,98 @@ ucc_status_t ucc_core_addr_exchange(
599605
return UCC_OK;
600606
}
601607

608+
ucc_status_t ucc_core_ctx_id_exchange(ucc_context_t *context, ucc_oob_coll_t *oob,
609+
ucc_addr_storage_t *addr_storage)
610+
{
611+
/* Caller must pass addr_storage cleared for a new exchange: at entry,
612+
addr_len == 0 iff storage == NULL. */
613+
ucc_status_t status;
614+
ucc_rank_t i;
615+
const size_t ctx_addr_prefix_len =
616+
ucc_offsetof(ucc_context_addr_header_t, n_components);
617+
ucc_context_addr_header_t *h;
618+
void *scratch;
619+
620+
poll:
621+
if (addr_storage->oob_req) {
622+
status = oob->req_test(addr_storage->oob_req);
623+
if (status < 0) {
624+
oob->req_free(addr_storage->oob_req);
625+
addr_storage->oob_req = NULL;
626+
ucc_free(addr_storage->storage);
627+
addr_storage->storage = NULL;
628+
addr_storage->addr_len = 0;
629+
ucc_error("oob req test failed during ctx id prefix exchange");
630+
return status;
631+
} else if (UCC_INPROGRESS == status) {
632+
return status;
633+
}
634+
oob->req_free(addr_storage->oob_req);
635+
addr_storage->oob_req = NULL;
636+
}
637+
/* If addr_len is still 0, we have not started the prefix allgather yet.
638+
* Once it is started, addr_len is set and we goto poll until completion;
639+
* we never reach the rank loop below with addr_len == 0. */
640+
if (0 == addr_storage->addr_len) {
641+
/* addr_len == 0 iff storage == NULL at entry / before prefix allgather. */
642+
ucc_assert(NULL == addr_storage->storage);
643+
addr_storage->size = oob->n_oob_eps;
644+
645+
addr_storage->storage = ucc_malloc(
646+
(addr_storage->size + 1) * ctx_addr_prefix_len, "ctx_ids_storage");
647+
if (!addr_storage->storage) {
648+
ucc_error(
649+
"failed to allocate %zd bytes for ctx_ids storage",
650+
(addr_storage->size + 1) * ctx_addr_prefix_len);
651+
return UCC_ERR_NO_MEMORY;
652+
}
653+
addr_storage->addr_len = ctx_addr_prefix_len;
654+
655+
scratch = PTR_OFFSET(addr_storage->storage,
656+
ctx_addr_prefix_len * addr_storage->size);
657+
memset(scratch, 0, ctx_addr_prefix_len);
658+
h = (ucc_context_addr_header_t *)scratch;
659+
h->ctx_id = context->id;
660+
h->host_info = ucc_local_host;
661+
662+
status = oob->allgather(scratch, addr_storage->storage,
663+
ctx_addr_prefix_len, oob->coll_info,
664+
&addr_storage->oob_req);
665+
if (UCC_OK != status) {
666+
ucc_free(addr_storage->storage);
667+
addr_storage->storage = NULL;
668+
addr_storage->addr_len = 0;
669+
ucc_error("failed to start oob allgather for ctx_ids");
670+
return status;
671+
}
672+
goto poll;
673+
}
674+
/* addr_len is the per-rank prefix stride; set with storage when the prefix
675+
allgather is started (see allocation block above). */
676+
{
677+
ucc_rank_t r = UCC_RANK_MAX;
678+
679+
for (i = 0; i < addr_storage->size; i++) {
680+
h = (ucc_context_addr_header_t *)PTR_OFFSET(
681+
addr_storage->storage, addr_storage->addr_len * i);
682+
if (UCC_CTX_ID_EQUAL(context->id, h->ctx_id)) {
683+
if (r != UCC_RANK_MAX) {
684+
ucc_error("ctx_id collision: %d %d", r, i);
685+
ucc_free(addr_storage->storage);
686+
addr_storage->storage = NULL;
687+
addr_storage->addr_len = 0;
688+
return UCC_ERR_NO_MESSAGE;
689+
}
690+
r = i;
691+
}
692+
}
693+
694+
addr_storage->flags = 0;
695+
addr_storage->rank = r;
696+
}
697+
return UCC_OK;
698+
}
699+
602700
static void remove_tl_ctx_from_array(ucc_tl_context_t **array, unsigned *size,
603701
ucc_tl_context_t *tl_ctx)
604702
{
@@ -669,6 +767,102 @@ ucc_status_t ucc_context_create_proc_info(
669767
ucc_check_wait_for_debugger(ctx->rank);
670768
#endif
671769
}
770+
771+
ctx->id.pi = *proc_info;
772+
ctx->id.seq_num = ucc_atomic_fadd32(&ucc_context_seq_num, 1);
773+
774+
if (config->node_local_id == UCC_ULUNITS_AUTO) {
775+
if ((params->mask & UCC_CONTEXT_PARAM_FIELD_OOB) &&
776+
params->oob.n_oob_eps == 1) {
777+
/* Single-rank OOB: no allgather; local rank on the node is 0. */
778+
b_params.node_local_id = 0;
779+
} else if ((params->mask & UCC_CONTEXT_PARAM_FIELD_OOB) &&
780+
params->oob.n_oob_eps > 1) {
781+
status = ucc_sysinfo_get_host_info(&ucc_local_host);
782+
if (UCC_OK != status) {
783+
ucc_error("failed to refresh host info for ctx topo exchange");
784+
goto error_ctx_create;
785+
}
786+
do {
787+
/* UCC context create is blocking fn, so we can wait here for the
788+
completion of addr exchange */
789+
status = ucc_core_ctx_id_exchange(ctx, &ctx->params.oob,
790+
&ctx->addr_storage);
791+
if (status < 0) {
792+
ucc_error("failed to exchange addresses during context "
793+
"creation with status: %s",
794+
ucc_status_string(status));
795+
goto error_ctx_create;
796+
}
797+
} while (status == UCC_INPROGRESS);
798+
/* Validate rank before ucc_context_topo_init allocates ctx->topo. */
799+
if (ctx->addr_storage.rank == UCC_RANK_MAX) {
800+
ucc_error("ctx_id not found in exchanged ids, rank detection "
801+
"failed");
802+
status = UCC_ERR_NO_MESSAGE;
803+
ucc_free(ctx->addr_storage.storage);
804+
ctx->addr_storage.storage = NULL;
805+
ctx->addr_storage.addr_len = 0;
806+
goto error_ctx_create;
807+
}
808+
if (ctx->addr_storage.rank != params->oob.oob_ep) {
809+
ucc_error("ctx_id exchange rank %d does not match OOB ep %d",
810+
ctx->addr_storage.rank, params->oob.oob_ep);
811+
status = UCC_ERR_NO_MESSAGE;
812+
ucc_free(ctx->addr_storage.storage);
813+
ctx->addr_storage.storage = NULL;
814+
ctx->addr_storage.addr_len = 0;
815+
goto error_ctx_create;
816+
}
817+
status = ucc_context_topo_init(&ctx->addr_storage, &ctx->topo);
818+
if (UCC_OK != status) {
819+
ucc_free(ctx->addr_storage.storage);
820+
ctx->addr_storage.storage = NULL;
821+
ctx->addr_storage.addr_len = 0;
822+
ucc_error("failed to init ctx topo");
823+
goto error_ctx_create;
824+
}
825+
ucc_assert(ctx->topo != NULL);
826+
827+
{
828+
ucc_subset_t set;
829+
ucc_topo_t *topo = NULL;
830+
831+
memset(&set.map, 0, sizeof(ucc_ep_map_t));
832+
set.map.type = UCC_EP_MAP_FULL;
833+
set.myrank = params->oob.oob_ep;
834+
set.map.ep_num = params->oob.n_oob_eps;
835+
836+
status = ucc_topo_init(set, ctx->topo, &topo);
837+
if (UCC_OK != status) {
838+
ucc_error("failed to init topo for computing local rank");
839+
ucc_free(ctx->addr_storage.storage);
840+
ctx->addr_storage.storage = NULL;
841+
ctx->addr_storage.addr_len = 0;
842+
goto error_ctx_create;
843+
}
844+
b_params.node_local_id = ucc_topo_node_local_rank(topo);
845+
ucc_topo_cleanup(topo);
846+
}
847+
848+
/* clean up addr_storage */
849+
ucc_free(ctx->addr_storage.storage);
850+
ctx->addr_storage.storage = NULL;
851+
ctx->addr_storage.addr_len = 0;
852+
ctx->addr_storage.size = 0;
853+
ctx->addr_storage.rank = UCC_RANK_MAX;
854+
ctx->addr_storage.flags = 0;
855+
ctx->addr_storage.oob_req = NULL;
856+
857+
/* clean up topo */
858+
ucc_context_topo_cleanup(ctx->topo);
859+
ctx->topo = NULL;
860+
} else if (!(params->mask & UCC_CONTEXT_PARAM_FIELD_OOB)) {
861+
/* No OOB: cannot run topo exchange; sole implicit rank on the node. */
862+
b_params.node_local_id = 0;
863+
}
864+
}
865+
672866
status = ucc_create_tl_contexts(ctx, config, b_params);
673867
if (UCC_OK != status) {
674868
/* only critical error could have happened - bail */
@@ -748,8 +942,7 @@ ucc_status_t ucc_context_create_proc_info(
748942
ucc_error("failed to init progress queue for context %p", ctx);
749943
goto error_ctx_create;
750944
}
751-
ctx->id.pi = *proc_info;
752-
ctx->id.seq_num = ucc_atomic_fadd32(&ucc_context_seq_num, 1);
945+
753946
if (params->mask & UCC_CONTEXT_PARAM_FIELD_OOB &&
754947
params->oob.n_oob_eps > 1) {
755948
do {
@@ -765,15 +958,23 @@ ucc_status_t ucc_context_create_proc_info(
765958
}
766959
} while (status == UCC_INPROGRESS);
767960

768-
if (topo_required) {
961+
if (topo_required && !ctx->topo) {
769962
/* At least one available CL context reported it needs topo info */
770963
status = ucc_context_topo_init(&ctx->addr_storage, &ctx->topo);
771964
if (UCC_OK != status) {
772965
ucc_error("failed to init ctx topo");
773966
ucc_free(ctx->addr_storage.storage);
967+
ctx->addr_storage.storage = NULL;
968+
ctx->addr_storage.addr_len = 0;
774969
goto error_ctx_create;
775970
}
776971
}
972+
if (ctx->addr_storage.rank == UCC_RANK_MAX) {
973+
ucc_error("ctx_id not found after address exchange, rank detection "
974+
"failed");
975+
status = UCC_ERR_NO_MESSAGE;
976+
goto error_ctx_create;
977+
}
777978
ucc_assert(ctx->addr_storage.rank == params->oob.oob_ep);
778979
}
779980
if (config->internal_oob) {
@@ -898,6 +1099,10 @@ ucc_status_t ucc_context_create_proc_info(
8981099
}
8991100
ucc_free(ctx->cl_ctx);
9001101
error_ctx:
1102+
ucc_context_topo_cleanup(ctx->topo);
1103+
ctx->topo = NULL;
1104+
ucc_free(ctx->addr_storage.storage);
1105+
ctx->addr_storage.storage = NULL;
9011106
ucc_free(ctx);
9021107
error:
9031108
return status;

src/core/ucc_context.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@ ucc_status_t ucc_context_progress_deregister(ucc_context_t *ctx,
152152
ucc_status_t ucc_core_addr_exchange(ucc_context_t *context, ucc_oob_coll_t *oob,
153153
ucc_addr_storage_t *addr_storage);
154154

155+
/* Performs the context id exchange among the group of processes defined by OOB.
156+
This function is used to exchange the context ids between the processes in order
157+
to find the local rank.
158+
*/
159+
ucc_status_t ucc_core_ctx_id_exchange(ucc_context_t *context, ucc_oob_coll_t *oob,
160+
ucc_addr_storage_t *addr_storage);
161+
155162
/* UCC context packed address layout:
156163
--------------------------------------------------------------------------
157164
|ctx_id|host_info|n_components|id0|offset0|id1|offset1|..|

0 commit comments

Comments
 (0)