Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def _post_init(self):
if self.runner_type == "generate" and not is_generative_model:
if is_multimodal_model:
pass
elif self.model_impl in ("auto", "paddleformers"):
elif self.model_impl in ("auto", "paddleformers", "paddlefleet"):
# Skip check for auto/paddleformers - may fallback to paddleformers which supports any model
pass
else:
Expand Down
49 changes: 21 additions & 28 deletions fastdeploy/model_executor/models/paddleformers/base_fleet.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,6 @@ def __init__(self, fd_config: "FDConfig", **kwargs):

# Assign parallel config from fd_config.parallel_config to paddleformers_config
parallel_config = fd_config.parallel_config
# parallel_config.tensor_parallel_size = 1
# parallel_config.expert_parallel_size = 2
self.paddleformers_config.data_parallel_size = parallel_config.data_parallel_size
self.paddleformers_config.tensor_model_parallel_size = parallel_config.tensor_parallel_size
self.paddleformers_config.sequence_parallel = parallel_config.sequence_parallel
Expand All @@ -305,6 +303,8 @@ def __init__(self, fd_config: "FDConfig", **kwargs):
# self.paddleformers_config.moe_grouped_gemm = True
self.paddleformers_config.moe_token_dispatcher_type = "deepep"
# self.paddleformers_config.use_cpu_initialization = True
self.paddleformers_config.use_cpu_initialization = True
self.paddleformers_config.perform_initialization = False
self.paddleformers_config.gated_attention = getattr(self.paddleformers_config, "use_gated_attn", False)
if getattr(self.paddleformers_config, "multi_latent_attention", False):
self.paddleformers_config.qk_head_dim = (
Expand Down Expand Up @@ -396,6 +396,16 @@ def _init_paddlefleet_parallel_state(self, fd_config) -> None:
"mp",
],
}
# Reset parallel state so that PaddleFleet's fleet.init can reinitialize
# with the correct EP topology instead of reusing FastDeploy's.
import paddle.distributed.fleet.base.topology as tp_mod
import paddle.distributed.parallel_helper as ph

# 1) Reset hybrid parallel group so _init_hybrid_parallel_env runs again
tp_mod._HYBRID_PARALLEL_GROUP = None
# 2) Reset parallel context so init_parallel_env runs again
ph.__parallel_ctx__clz__ = None

fleet.init(is_collective=True, strategy=strategy)
logger.info(
f"Initialized PaddleFleet parallel_state via initialize_fleet "
Expand All @@ -404,40 +414,23 @@ def _init_paddlefleet_parallel_state(self, fd_config) -> None:
f"ep={parallel_config.expert_parallel_size}, "
f"sp={parallel_config.sequence_parallel})"
)

import paddle.distributed as dist
from paddlefleet import parallel_state

hcg = fleet.get_hybrid_communicate_group()
expected_tp_size = parallel_config.tensor_parallel_size

# Check if we need to initialize or reinitialize TP group
need_init = False
if parallel_state._TENSOR_MODEL_PARALLEL_GROUP is None:
need_init = True
reason = "TP group not initialized"
else:
# Check if current TP group size matches expected
current_tp_group = parallel_state._TENSOR_MODEL_PARALLEL_GROUP
current_tp_size = getattr(current_tp_group, "nranks", None)
tp_group = parallel_state._TENSOR_MODEL_PARALLEL_GROUP
current_tp_size = None
if tp_group is not None:
current_tp_size = getattr(tp_group, "nranks", None)
if current_tp_size is None:
current_tp_size = getattr(current_tp_group, "world_size", None)
if current_tp_size != expected_tp_size:
need_init = True
reason = f"TP group size mismatch: current={current_tp_size}, expected={expected_tp_size}"
current_tp_size = getattr(tp_group, "world_size", None)

expected_tp_size = parallel_config.tensor_parallel_size
need_init = tp_group is None or current_tp_size != expected_tp_size
if need_init:
logger.warning(f"{reason}, reinitializing TP group with size={expected_tp_size}")
if expected_tp_size == 1:
# Single process TP group - create manually
current_rank = dist.get_rank()
tp_ranks = [current_rank]
default_pg = dist.new_group(ranks=tp_ranks)
parallel_state._TENSOR_MODEL_PARALLEL_GROUP = default_pg
parallel_state._TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = tp_ranks
logger.info(f"Reinitialized TP group with size=1, rank={current_rank}, ranks={tp_ranks}")
parallel_state._TENSOR_MODEL_PARALLEL_GROUP = dist.new_group(ranks=[dist.get_rank()])

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug TP=1 的手工重建分支只替换了 _TENSOR_MODEL_PARALLEL_GROUP,没有同步重置对应的 global ranks 状态。

这个函数前面刚把 HCG/parallel context 置空,目标是避免 PaddleFleet 复用旧拓扑;但当 expected_tp_size == 1 时,这里只创建新 group,删除了原先同步写 _TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = [current_rank] 的逻辑。如果进程里已有上一次初始化留下的 global ranks,PaddleFleet parallel_state 会出现 group 与 global ranks 不一致,后续依赖 tensor-parallel rank/global ranks 的 sharded state 或随机种子初始化仍可能按旧拓扑运行。

建议修复方式:在 TP=1 分支同时设置与新 group 匹配的 global ranks,例如先保存 current_rank = dist.get_rank(),然后同时写:

parallel_state._TENSOR_MODEL_PARALLEL_GROUP = dist.new_group(ranks=[current_rank])
parallel_state._TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = [current_rank]

如果 PaddleFleet 提供 destroy/reset API,更稳妥的是先清空 TP 相关 parallel_state 后再用新的 HCG 初始化。

else:
# Multiple processes - use hcg's mp group
hcg = fleet.get_hybrid_communicate_group()
parallel_state.initialize_model_parallel(hcg)

from paddlefleet.tensor_parallel.random import (
Expand Down
Loading