diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py index a6f14068abc..c30da6f9124 100644 --- a/fastdeploy/rl/dynamic_weight_manager.py +++ b/fastdeploy/rl/dynamic_weight_manager.py @@ -348,6 +348,13 @@ def clear_parameters(self, pid: int = 0, shutdown_process_group=False) -> None: if shutdown_process_group: paddle.distributed.shutdown_process_group(self.parallel_config.ep_group) if shutdown_process_group: + # ProcessGroupGloo has no shutdown(); remove it from paddle's registry + # before the global sweep to avoid AttributeError. + from paddle.distributed.collective import _get_group_map_by_name + + for name, pg in list(_get_group_map_by_name().items()): + if pg.process_group is not None and not hasattr(pg.process_group, "shutdown"): + _get_group_map_by_name().pop(name, None) paddle.distributed.shutdown_process_group() self._update_shared_status(pid, ModelWeightsStatus.CLEARED) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 6096759825a..694e2b23c80 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -178,6 +178,10 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) -> self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8 self.enable_overlap_schedule = self.scheduler_config.enable_overlap_schedule self.cached_control_reqs = [] + if self.ranks > 1: + self.gloo_group = dist.new_group(list(range(self.ranks)), backend="gloo") + else: + self.gloo_group = None def init_control(self): engine_worker_queue_port = self.parallel_config.local_engine_worker_queue_port @@ -312,9 +316,12 @@ def update_weights_from_tensor(self, mmap_infos): self.experts_manager.tensor_infos = None def _broadcast_model_weights_signal(self, src: int, group) -> int: - signal_list = [self.model_weights_signal[0]] - paddle.distributed.broadcast_object_list(signal_list, src=src, group=group) - return int(signal_list[0]) + model_weights_signal_tensor = paddle.full( + shape=[1], fill_value=self.model_weights_signal[0], dtype="int32", device="cpu" + ) + paddle.distributed.broadcast(model_weights_signal_tensor, src=src, group=group) + value = model_weights_signal_tensor.numpy()[0] + return int(value) def _get_exist_task_flag(self) -> bool: if self.nnode > 1: @@ -465,7 +472,7 @@ def event_loop_normal(self) -> None: if self.fd_config.load_config.dynamic_load_weight and not envs.FD_ENABLE_V1_UPDATE_WEIGHTS: self.model_weights_signal[0] = int(self.model_weights_status.value[0]) if self.ranks > 1: - self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=None) + self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=self.gloo_group) req_dicts = None self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time()) @@ -530,7 +537,7 @@ def event_loop_normal(self) -> None: self.model_weights_signal[0] = self.model_weights_status.value[0] if self.ranks > 1: self.model_weights_signal[0] = self._broadcast_model_weights_signal( - src=0, group=None + src=0, group=self.gloo_group ) time.sleep(1) self.model_weights_status.value[0] = (