From 14e1c4bcea0735fec60ed92ad0bb018125c84abc Mon Sep 17 00:00:00 2001 From: Sunny-bot1 <592045536@qq.com> Date: Fri, 15 May 2026 18:19:23 +0800 Subject: [PATCH 1/5] support cpu tensor broadcast --- fastdeploy/worker/worker_process.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 6096759825a..9dc464ad771 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -312,9 +312,12 @@ def update_weights_from_tensor(self, mmap_infos): self.experts_manager.tensor_infos = None def _broadcast_model_weights_signal(self, src: int, group) -> int: - signal_list = [self.model_weights_signal[0]] - paddle.distributed.broadcast_object_list(signal_list, src=src, group=group) - return int(signal_list[0]) + model_weights_signal_tensor = paddle.full( + shape=[1], fill_value=self.model_weights_signal[0], dtype="int32", place=paddle.CPUPlace() + ) + paddle.distributed.broadcast(model_weights_signal_tensor, src=src, group=group) + value = model_weights_signal_tensor.numpy()[0] + return int(value) def _get_exist_task_flag(self) -> bool: if self.nnode > 1: @@ -465,7 +468,8 @@ def event_loop_normal(self) -> None: if self.fd_config.load_config.dynamic_load_weight and not envs.FD_ENABLE_V1_UPDATE_WEIGHTS: self.model_weights_signal[0] = int(self.model_weights_status.value[0]) if self.ranks > 1: - self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=None) + group = dist.new_group(list(range(self.ranks)), backend="gloo") + self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=group) req_dicts = None self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time()) @@ -529,8 +533,9 @@ def event_loop_normal(self) -> None: while self.model_weights_signal[0] != ModelWeightsStatus.UPDATING: self.model_weights_signal[0] = self.model_weights_status.value[0] if self.ranks > 1: + group = dist.new_group(list(range(self.ranks)), backend="gloo") self.model_weights_signal[0] = self._broadcast_model_weights_signal( - src=0, group=None + src=0, group=group ) time.sleep(1) self.model_weights_status.value[0] = ( From f60365d777c4a6508f8a153eb457b005444adce7 Mon Sep 17 00:00:00 2001 From: Sunny-bot1 <592045536@qq.com> Date: Fri, 15 May 2026 18:35:39 +0800 Subject: [PATCH 2/5] fix place --- fastdeploy/worker/worker_process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 9dc464ad771..7c702a1ac45 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -313,7 +313,7 @@ def update_weights_from_tensor(self, mmap_infos): def _broadcast_model_weights_signal(self, src: int, group) -> int: model_weights_signal_tensor = paddle.full( - shape=[1], fill_value=self.model_weights_signal[0], dtype="int32", place=paddle.CPUPlace() + shape=[1], fill_value=self.model_weights_signal[0], dtype="int32", device="cpu" ) paddle.distributed.broadcast(model_weights_signal_tensor, src=src, group=group) value = model_weights_signal_tensor.numpy()[0] From 731f06dc75f58d94936e763e7576a7db390f8673 Mon Sep 17 00:00:00 2001 From: Sunny-bot1 <592045536@qq.com> Date: Mon, 18 May 2026 10:46:18 +0800 Subject: [PATCH 3/5] fix group --- fastdeploy/worker/worker_process.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 7c702a1ac45..3ba4f9759a5 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -178,6 +178,7 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) -> self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8 self.enable_overlap_schedule = self.scheduler_config.enable_overlap_schedule self.cached_control_reqs = [] + self.gloo_group = dist.new_group(list(range(self.ranks)), backend="gloo") def init_control(self): engine_worker_queue_port = self.parallel_config.local_engine_worker_queue_port @@ -468,8 +469,7 @@ def event_loop_normal(self) -> None: if self.fd_config.load_config.dynamic_load_weight and not envs.FD_ENABLE_V1_UPDATE_WEIGHTS: self.model_weights_signal[0] = int(self.model_weights_status.value[0]) if self.ranks > 1: - group = dist.new_group(list(range(self.ranks)), backend="gloo") - self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=group) + self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=self.gloo_group) req_dicts = None self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time()) @@ -533,9 +533,8 @@ def event_loop_normal(self) -> None: while self.model_weights_signal[0] != ModelWeightsStatus.UPDATING: self.model_weights_signal[0] = self.model_weights_status.value[0] if self.ranks > 1: - group = dist.new_group(list(range(self.ranks)), backend="gloo") self.model_weights_signal[0] = self._broadcast_model_weights_signal( - src=0, group=group + src=0, group=self.gloo_group ) time.sleep(1) self.model_weights_status.value[0] = ( From d15c1e5f68abbad9c8f6ac8e0bb73995da792b6d Mon Sep 17 00:00:00 2001 From: Sunny-bot1 <592045536@qq.com> Date: Mon, 18 May 2026 11:02:56 +0800 Subject: [PATCH 4/5] fix init --- fastdeploy/worker/worker_process.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 3ba4f9759a5..694e2b23c80 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -178,7 +178,10 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) -> self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8 self.enable_overlap_schedule = self.scheduler_config.enable_overlap_schedule self.cached_control_reqs = [] - self.gloo_group = dist.new_group(list(range(self.ranks)), backend="gloo") + if self.ranks > 1: + self.gloo_group = dist.new_group(list(range(self.ranks)), backend="gloo") + else: + self.gloo_group = None def init_control(self): engine_worker_queue_port = self.parallel_config.local_engine_worker_queue_port From c9f30fd0cacc467f409c6b8f13e96435a96815cc Mon Sep 17 00:00:00 2001 From: Sunny-bot1 <592045536@qq.com> Date: Mon, 18 May 2026 12:03:12 +0800 Subject: [PATCH 5/5] fix shutdown process group --- fastdeploy/rl/dynamic_weight_manager.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py index a6f14068abc..c30da6f9124 100644 --- a/fastdeploy/rl/dynamic_weight_manager.py +++ b/fastdeploy/rl/dynamic_weight_manager.py @@ -348,6 +348,13 @@ def clear_parameters(self, pid: int = 0, shutdown_process_group=False) -> None: if shutdown_process_group: paddle.distributed.shutdown_process_group(self.parallel_config.ep_group) if shutdown_process_group: + # ProcessGroupGloo has no shutdown(); remove it from paddle's registry + # before the global sweep to avoid AttributeError. + from paddle.distributed.collective import _get_group_map_by_name + + for name, pg in list(_get_group_map_by_name().items()): + if pg.process_group is not None and not hasattr(pg.process_group, "shutdown"): + _get_group_map_by_name().pop(name, None) paddle.distributed.shutdown_process_group() self._update_shared_status(pid, ModelWeightsStatus.CLEARED)