Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions fastdeploy/rl/dynamic_weight_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,13 @@ def clear_parameters(self, pid: int = 0, shutdown_process_group=False) -> None:
if shutdown_process_group:
paddle.distributed.shutdown_process_group(self.parallel_config.ep_group)
if shutdown_process_group:
Comment thread
Sunny-bot1 marked this conversation as resolved.
# ProcessGroupGloo has no shutdown(); remove it from paddle's registry
# before the global sweep to avoid AttributeError.
from paddle.distributed.collective import _get_group_map_by_name

for name, pg in list(_get_group_map_by_name().items()):
if pg.process_group is not None and not hasattr(pg.process_group, "shutdown"):
_get_group_map_by_name().pop(name, None)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 在循环内重复调用 _get_group_map_by_name() 且使用私有 API

当前代码多次调用 _get_group_map_by_name()——循环头部调用一次，通过 .items() 取得遍历快照；循环体内每次命中时再调用一次，通过 .pop() 删除。若 Paddle 内部实现返回同一个可变 dict 引用，则当前代码可正常工作；但若内部实现将来改为返回副本，则 pop 会静默失效，gloo group 无法被清除，shutdown_process_group() 仍会抛出 AttributeError。

同时,_get_group_map_by_name 是 Paddle 内部私有 API(以 _ 开头),可能在 Paddle 版本升级时无预警地变更或删除。

建议缓存引用,同时做防御性注释:

group_map = _get_group_map_by_name()  # internal API, cache once
for name, pg in list(group_map.items()):
    if pg.process_group is not None and not hasattr(pg.process_group, "shutdown"):
        group_map.pop(name, None)

paddle.distributed.shutdown_process_group()
self._update_shared_status(pid, ModelWeightsStatus.CLEARED)

Expand Down
17 changes: 12 additions & 5 deletions fastdeploy/worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) ->
self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8
self.enable_overlap_schedule = self.scheduler_config.enable_overlap_schedule
self.cached_control_reqs = []
if self.ranks > 1:
self.gloo_group = dist.new_group(list(range(self.ranks)), backend="gloo")

This comment was marked as outdated.

else:
self.gloo_group = None

def init_control(self):
engine_worker_queue_port = self.parallel_config.local_engine_worker_queue_port
Expand Down Expand Up @@ -312,9 +316,12 @@ def update_weights_from_tensor(self, mmap_infos):
self.experts_manager.tensor_infos = None

def _broadcast_model_weights_signal(self, src: int, group) -> int:
signal_list = [self.model_weights_signal[0]]
paddle.distributed.broadcast_object_list(signal_list, src=src, group=group)
return int(signal_list[0])
model_weights_signal_tensor = paddle.full(
shape=[1], fill_value=self.model_weights_signal[0], dtype="int32", device="cpu"
)
paddle.distributed.broadcast(model_weights_signal_tensor, src=src, group=group)
value = model_weights_signal_tensor.numpy()[0]
Comment thread
Sunny-bot1 marked this conversation as resolved.
return int(value)

def _get_exist_task_flag(self) -> bool:
if self.nnode > 1:
Expand Down Expand Up @@ -465,7 +472,7 @@ def event_loop_normal(self) -> None:
if self.fd_config.load_config.dynamic_load_weight and not envs.FD_ENABLE_V1_UPDATE_WEIGHTS:
self.model_weights_signal[0] = int(self.model_weights_status.value[0])
if self.ranks > 1:
self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=None)
self.model_weights_signal[0] = self._broadcast_model_weights_signal(src=0, group=self.gloo_group)

req_dicts = None
self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time())
Expand Down Expand Up @@ -530,7 +537,7 @@ def event_loop_normal(self) -> None:
self.model_weights_signal[0] = self.model_weights_status.value[0]
if self.ranks > 1:
self.model_weights_signal[0] = self._broadcast_model_weights_signal(
src=0, group=None
src=0, group=self.gloo_group
)
time.sleep(1)
self.model_weights_status.value[0] = (
Expand Down
Loading