diff --git a/fastdeploy/config.py b/fastdeploy/config.py index ad02ba8d333..a1edc2ba670 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -60,7 +60,7 @@ _ResolvedTask = Literal["generate", "encode", "embed"] # Model implementation backend options -ModelImpl = Literal["auto", "fastdeploy", "paddleformers"] +ModelImpl = Literal["auto", "fastdeploy", "paddleformers", "paddlefleet"] _RUNNER_CONVERTS: dict[RunnerType, list[ConvertType]] = { "generate": [], diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 88381f1413c..13678ceff62 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -139,6 +139,7 @@ class EngineArgs: 'auto': Use native FastDeploy implementation when available, fallback to PaddleFormers. 'fastdeploy': Use only native FastDeploy implementations. 'paddleformers': Use PaddleFormers backend with FastDeploy optimizations. + 'paddlefleet': Use PaddleFleet backend. """ override_pooler_config: Optional[Union[dict, PoolerConfig]] = None """ @@ -641,7 +642,7 @@ def __post_init__(self): "kvcache_storage_backend is only supported when ENABLE_V1_KVCACHE_SCHEDULER=1" ) - valid_model_impls = ["auto", "fastdeploy", "paddleformers"] + valid_model_impls = ["auto", "fastdeploy", "paddleformers", "paddlefleet"] if self.model_impl not in valid_model_impls: raise NotImplementedError( f"not support model_impl: '{self.model_impl}'. " f"Must be one of: {', '.join(valid_model_impls)}" @@ -979,13 +980,14 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: model_group.add_argument( "--model-impl", type=str, - choices=["auto", "fastdeploy", "paddleformers"], + choices=["auto", "fastdeploy", "paddleformers", "paddlefleet"], default=EngineArgs.model_impl, help=( "Model implementation backend. " "'auto': Use native FastDeploy when available, fallback to PaddleFormers. " "'fastdeploy': Use only native FastDeploy implementations. " "'paddleformers': Use PaddleFormers backend with FastDeploy optimizations." + "'paddlefleet': Use PaddleFleet backend." ), ) diff --git a/fastdeploy/model_executor/graph_optimization/decorator.py b/fastdeploy/model_executor/graph_optimization/decorator.py index 05ec79a495c..3d8d72807d5 100644 --- a/fastdeploy/model_executor/graph_optimization/decorator.py +++ b/fastdeploy/model_executor/graph_optimization/decorator.py @@ -60,12 +60,12 @@ def __init__(self, fd_config: FDConfig, **kwargs): # Not use graph optimization return - def __call__(self, **kwargs): + def __call__(self, *args, **kwargs): """Decorator model.__call__() func""" if not self.use_graph_opt: - return self.forward(**kwargs) + return self.forward(*args, **kwargs) - return self.graph_opt_backend(**kwargs) + return self.graph_opt_backend(*args, **kwargs) cls.__init__ = __init__ cls.__call__ = __call__ diff --git a/fastdeploy/model_executor/models/__init__.py b/fastdeploy/model_executor/models/__init__.py index 51a46a176ea..831ab8893c8 100644 --- a/fastdeploy/model_executor/models/__init__.py +++ b/fastdeploy/model_executor/models/__init__.py @@ -19,7 +19,7 @@ import os from pathlib import Path -from paddleformers.transformers import PretrainedModel +from paddleformers.transformers.model_utils import PretrainedModel from fastdeploy.plugins.model_register import load_model_register_plugins diff --git a/fastdeploy/model_executor/models/model_base.py b/fastdeploy/model_executor/models/model_base.py index d55c88947e7..9e51a1e6bf7 100644 --- a/fastdeploy/model_executor/models/model_base.py +++ b/fastdeploy/model_executor/models/model_base.py @@ -194,6 +194,19 @@ def _try_resolve_paddleformers( elif model_impl == "auto" and is_fallback: # Auto mode fallback when no native implementation exists backend_arch = "PaddleFormersForCausalLM" + elif model_impl == "paddlefleet": + from fastdeploy.model_executor.utils import is_paddlefleet_available + + if is_paddlefleet_available(): + backend_arch = "PaddleFleetForCausalLM" + else: + raise ImportError( + "paddlefleet backend requires paddlefleet to be installed.\n" + "Please install with [change cuda version if needed ]:\n" + "python -m pip install paddlefleet==0.3.0.dev20260527 " + "--extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ " + "--extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/" + ) elif model_impl == "fastdeploy": return None else: diff --git a/fastdeploy/model_executor/models/paddleformers/__init__.py b/fastdeploy/model_executor/models/paddleformers/__init__.py index 77174269389..315b72b8775 100644 --- a/fastdeploy/model_executor/models/paddleformers/__init__.py +++ b/fastdeploy/model_executor/models/paddleformers/__init__.py @@ -19,6 +19,7 @@ ModelForCasualLM, ModelRegistry, ) +from fastdeploy.model_executor.utils import is_paddlefleet_available from .base import PaddleFormersModelBase from .causallm import CausalLMMixin @@ -38,3 +39,19 @@ class PaddleFormersForCausalLM(CausalLMMixin, PaddleFormersModelBase, ModelForCa @classmethod def name(cls): return "PaddleFormersForCausalLM" + + +if is_paddlefleet_available(): + from .base_fleet import PaddleFleetModelBase + + __all__ += ["PaddleFleetForCausalLM"] + + @ModelRegistry.register_model_class( + architecture="PaddleFleetForCausalLM", + module_name="paddleformers", + category=ModelCategory.TEXT_GENERATION, + ) + class PaddleFleetForCausalLM(PaddleFleetModelBase, ModelForCasualLM): + @classmethod + def name(cls): + return "PaddleFleetForCausalLM" diff --git a/fastdeploy/model_executor/models/paddleformers/base_fleet.py b/fastdeploy/model_executor/models/paddleformers/base_fleet.py new file mode 100644 index 00000000000..1f57aebb75d --- /dev/null +++ b/fastdeploy/model_executor/models/paddleformers/base_fleet.py @@ -0,0 +1,741 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +"""Generic PaddleFormers modeling backend base class.""" + +import logging + +from fastdeploy.model_executor.utils import is_paddlefleet_available + +if not is_paddlefleet_available(): + logging.warning("paddlefleet is not installed, skipping base_fleet module") +else: + import math + from collections.abc import Iterable + from typing import TYPE_CHECKING, Dict + + import paddle + from paddle import nn + from paddlefleet.models.gpt.gpt_embedding import GPTEmbedding + from paddlefleet.models.gpt.lm_head import GPTLMHead + from paddlefleet.transformer.layer import FleetLayer + from paddlefleet.transformer.transformer_config import TransformerConfig + from paddleformers.transformers import AutoConfig + from paddleformers.transformers.auto.modeling import AutoModelForCausalLM + from paddleformers.utils.log import logger + + from fastdeploy.model_executor.forward_meta import ForwardMeta # noqa: F401 + from fastdeploy.model_executor.graph_optimization.decorator import ( + support_graph_optimization, + ) + + if TYPE_CHECKING: + from fastdeploy.config import FDConfig + + from fastdeploy.model_executor.layers.attention.attention import Attention + + class FastDeployAttention(FleetLayer): + """ + FastDeploy version of DotProductAttention, holding an internal FastDeploy Attention module. + + This class can be used to replace PaddleFleet's DotProductAttention, + using FastDeploy's attention backend for computation. + """ + + def __init__( + self, + config: TransformerConfig, + fd_attention: Attention, + num_attention_heads: int, + num_key_value_heads: int, + softmax_scale: float, + hidden_size_per_attention_head: int, + hidden_size_per_partition: int, + layer_id: int, + ): + """ + Initialize FastDeployAttention. + + Args: + fd_attention: FastDeploy Attention instance + num_attention_heads: Number of attention heads + num_key_value_heads: Number of KV heads + softmax_scale: Softmax scaling factor + hidden_size_per_attention_head: Hidden dimension per attention head + hidden_size_per_partition: Hidden size per partition + layer_id: Current layer ID + """ + super().__init__(config) + self.fd_attention = fd_attention + self.num_attention_heads = num_attention_heads + self.num_key_value_heads = num_key_value_heads + self.softmax_scale = softmax_scale + self.hidden_size_per_attention_head = hidden_size_per_attention_head + self.hidden_size_per_partition = hidden_size_per_partition + self.layer_id = layer_id + + def forward( + self, + query: paddle.Tensor, + key: paddle.Tensor, + value: paddle.Tensor, + attention_mask: paddle.Tensor, + attn_mask_startend_row_indices: paddle.Tensor = None, + attn_mask_type=None, + attention_bias: paddle.Tensor = None, + packed_seq_params=None, + use_rr_flash_attention: bool = False, + past_key_values=None, + layer_idx=None, + use_cache=False, + x: paddle.Tensor = None, + qr: paddle.Tensor = None, + kv_compressed: paddle.Tensor = None, + k_pos_emb: paddle.Tensor = None, + q_absorbed: paddle.Tensor = None, + v_b_proj_weight: paddle.Tensor = None, + ): + """ + Forward pass. + + Args: + query: Query tensor, supported formats: + - 4D BSHD: [b, sq, np, hn] (PaddleFleet default) + - 3D HSD: [np, sq, hn] + - 3D SHD: [sq, np, hn] + key: Key tensor, same format as above, but head count may differ (GQA) + value: Value tensor, same format as above + attention_mask: Attention mask + attn_mask_startend_row_indices: FlashMask start-end row indices + attn_mask_type: Attention mask type + attention_bias: Attention bias + packed_seq_params: Packed sequence parameters + use_rr_flash_attention: Whether to use RR Flash Attention + kv_compressed: Compressed KV tensor for MLA (Multi-Latent Attention) + + Returns: + Attention output tensor + """ + # Try to get forward_meta from config (PaddleFleet does not pass this parameter when calling) + forward_meta = getattr(self.config, "forward_meta", None) + assert forward_meta is not None, "forward_meta must be provided" + + # Set scaling factor + original_scale = getattr(self.fd_attention, "scale", None) + if original_scale is None: + self.fd_attention.scale = self.softmax_scale + + # Check if MLA mode is enabled + is_mla = getattr(self.config, "multi_latent_attention", False) + + try: + # Refer to the processing logic of fastdeploy_append_attention_forward + # Support 3D (SHD) and 4D (BSHD) input + + # 4D input: squeeze to 3D (only supports batch=1) + def squeeze_to_3d(t: paddle.Tensor, name: str) -> paddle.Tensor: + if t is None: + return None + if t.ndim == 4: + if int(t.shape[0]) != 1: + raise ValueError( + f"{name} batch size {int(t.shape[0])} not supported, only batch=1 is supported" + ) + return t.squeeze(0) + if t.ndim == 3: + return t + raise ValueError(f"{name} has unexpected dims {t.ndim}, expect 3 or 4") + + q = squeeze_to_3d(query, "query") + k = squeeze_to_3d(key, "key") + v = squeeze_to_3d(value, "value") + + if is_mla: + need_do_prefill = forward_meta.max_len_tensor_cpu[1] > 0 + need_do_decode = forward_meta.max_len_tensor_cpu[2] > 0 + + # MLA mode: pass q, k, v, compressed_kv, k_pe separately + # Reference: deepseek_v3.py line 389 + # + # Note: + # - Prefill (flash_attn_func): expects 3D tensors [seq, heads, dim] + # - Decode (multi_head_latent_attention): expects 2D tensors [seq, heads*dim] + # So we need to flatten q for decode phase only + + # Process compressed_kv and k_pe + + assert kv_compressed is not None, "kv_compressed must be provided when use" + compressed_kv = kv_compressed.squeeze(0) + k_pos_emb = k_pos_emb.squeeze(0) + + output = None + fmqa_out = None + if need_do_prefill: + # Prefill: keep 3D tensors for flash_attn_func + output = self.fd_attention.forward( + q=q, + k=k, + v=v, + qkv=None, + compressed_kv=compressed_kv, + k_pe=k_pos_emb, + forward_meta=forward_meta, + ) + output.reshape_([output.shape[0], output.shape[1] * output.shape[2]]) + + if need_do_decode: + # Decode: use absorbed q for multi_head_latent_attention C++ kernel + # q_absorbed: [s, heads, kv_lora_rank + qk_rope_head_dim] (after squeeze_to_3d) + # C++ kernel expects: [token_num, heads * (kv_lora_rank + qk_rope_head_dim)] + q_abs = squeeze_to_3d(q_absorbed, "q_absorbed") if q_absorbed.ndim == 4 else q_absorbed + seq_len = int(q_abs.shape[0]) + q_input = q_abs.reshape([seq_len, -1]) + + fmqa_out = self.fd_attention.forward( + q=q_input, + k=None, + v=None, + qkv=None, + compressed_kv=compressed_kv, + k_pe=k_pos_emb, + forward_meta=forward_meta, + ) + + # V de-absorption: kernel output [token, heads * kv_lora_rank] + # -> [heads, token, kv_lora_rank] @ wv_b [heads, kv_lora_rank, v_head_dim] + # -> [token, heads * v_head_dim] + kv_lora_rank = self.config.kv_lora_rank + v_head_dim = self.config.v_head_dim + num_heads = fmqa_out.shape[-1] // kv_lora_rank + fmqa_out = fmqa_out.reshape([-1, num_heads, kv_lora_rank]).transpose([1, 0, 2]) + fmqa_out = paddle.bmm(fmqa_out, v_b_proj_weight) + fmqa_out = fmqa_out.transpose([1, 0, 2]).reshape([-1, num_heads * v_head_dim]) + # Merge prefill and decode outputs if both are present + if need_do_prefill: + try: + from fastdeploy.model_executor.ops.gpu import ( + merge_prefill_decode_output, + ) + + merge_prefill_decode_output( + output, + fmqa_out, + forward_meta.seq_lens_encoder, + forward_meta.seq_lens_decoder, + forward_meta.seq_lens_this_time, + forward_meta.cu_seqlens_q, + num_heads, + v_head_dim, + 1, + ) + except (ImportError, AttributeError): + logger.warning("merge_prefill_decode_output not available, using decode output only") + output = fmqa_out + else: + output = fmqa_out + else: + # Standard mode: concatenate QKV + seq_len = int(q.shape[0]) + + # SHD: [seq, heads, dim] -> flatten to [seq, heads*dim] + q_flat = q.reshape([seq_len, -1]) + k_flat = k.reshape([seq_len, -1]) + v_flat = v.reshape([seq_len, -1]) + + # Concatenate QKV: [seq, (q_heads + kv_heads + kv_heads) * head_dim] + qkv = paddle.concat([q_flat, k_flat, v_flat], axis=-1) + + output = self.fd_attention.forward(qkv=qkv, forward_meta=forward_meta) + + # Restore batch dimension: [seq, hidden] -> [b, seq, hidden] + # PaddleFleet expects 3D output format + output = output.unsqueeze(0) + + return output + finally: + # Restore original scale + if original_scale is None: + if hasattr(self.fd_attention, "scale"): + delattr(self.fd_attention, "scale") + else: + self.fd_attention.scale = original_scale + + @support_graph_optimization + class PaddleFleetModelBase(nn.Layer): + """ + A mixin-style base class to provide PaddleFormers backend logic on top of nn.Layer. + This class subclasses nn.Layer and provides common methods to + initialize and manage a PaddleFormers model. + """ + + def __init__(self, fd_config: "FDConfig", **kwargs): + super().__init__(fd_config) + logger.info("Initializing PaddleFormers backend.") + self.fd_config = fd_config # FastDeploy's top-level FDConfig + self.model_config = fd_config.model_config # FastDeploy's ModelConfig + self.paddleformers_config = AutoConfig.from_pretrained(self.model_config.model) + + # Assign parallel config from fd_config.parallel_config to paddleformers_config + parallel_config = fd_config.parallel_config + # parallel_config.tensor_parallel_size = 1 + # parallel_config.expert_parallel_size = 2 + self.paddleformers_config.data_parallel_size = parallel_config.data_parallel_size + self.paddleformers_config.tensor_model_parallel_size = parallel_config.tensor_parallel_size + self.paddleformers_config.sequence_parallel = parallel_config.sequence_parallel + self.paddleformers_config.expert_model_parallel_size = parallel_config.expert_parallel_size + # if parallel_config.expert_parallel_size > 1 and parallel_config.sequence_parallel == False: + # self.paddleformers_config.tensor_model_parallel_size = 1 + # logger.warning("When using expert parallelism and tensor parallelism, sequence parallelism must be used in fleet set tp=1 .") + self.paddleformers_config.parallel_output = self.paddleformers_config.tensor_model_parallel_size == 1 + self.paddleformers_config.max_seq_len = self.model_config.max_model_len + self.paddleformers_config.params_dtype = self.model_config.dtype or "bfloat16" + # self.paddleformers_config.moe_grouped_gemm = True + self.paddleformers_config.moe_token_dispatcher_type = "deepep" + # self.paddleformers_config.use_cpu_initialization = True + self.paddleformers_config.gated_attention = getattr(self.paddleformers_config, "use_gated_attn", False) + if self.paddleformers_config.multi_latent_attention: + self.paddleformers_config.qk_head_dim = ( + self.paddleformers_config.qk_rope_head_dim + self.paddleformers_config.qk_nope_head_dim + ) + # Initialize PaddleFleet parallel_state so that its TP group is consistent with FastDeploy. + # PaddleFleet's ColumnParallelLinear/RowParallelLinear obtains TP world_size/rank + # via parallel_state. Without initialization, it defaults to 1, causing weights + # to not be TP-sharded, which mismatches FastDeploy's KV cache (allocated per TP). + # if parallel_config.tensor_parallel_size > 1: + self._init_paddlefleet_parallel_state(fd_config) + + # The specific text model config + # Sync important config values from text_config to model_config + # This ensures fallback models use their actual config values instead of FD defaults + self._sync_config_from_text_config() + # For convenience, keep direct access to some FD configs + self.quant_config = self.fd_config.quant_config + + # Load model using from_pretrained to support weight loading + # Pass dtype, config and other options from kwargs + + model_load_kwargs = { + "dtype": self.model_config.dtype, + "config": self.paddleformers_config, + "convert_from_hf": True, + "load_via_cpu": True, + "load_checkpoint_format": "flex_checkpoint", + } + # Set random seed before model construction for reproducibility + self.model = AutoModelForCausalLM.from_pretrained( + self.model_config.model, + **model_load_kwargs, + ) + + self.model.eval() + # Patch PaddleFleet core_attention with FastDeploy attention + patched_count = patch_paddlefleet_core_attention( + model=self.model, + fd_config=self.fd_config, + ) + logger.info(f"Patched {patched_count} attention layers with FastDeploy") + + def compute_logits(self, hidden_state, forward_meta=None): + """Compute logits from hidden states using lm_head.""" + lm_head = self.model.get_lm_head() + # ColumnParallelLinear expects input [s, b, h] + hidden_state = hidden_state.unsqueeze(1) # [num_tokens, h] -> [num_tokens, 1, h] + logits = lm_head({"hidden_states": hidden_state}) + # Output [num_tokens, 1, vocab], squeeze back to [num_tokens, vocab] + if logits.ndim == 3: + logits = logits.squeeze(1) + logits = logits.astype(paddle.float32) + logits[:, self.model_config.ori_vocab_size :] = -float("inf") + return logits + + def _init_paddlefleet_parallel_state(self, fd_config) -> None: + """ + Initialize PaddleFleet's parallel_state so that ColumnParallelLinear/RowParallelLinear + can correctly obtain TP world_size and rank, and thus correctly shard weights + and build sharded_state_dict. + + References the initialization logic in PaddleFormers' training_args.py, + using the official initialize_fleet API instead of directly manipulating + parallel_state internal variables. + """ + from paddle.distributed import fleet + + parallel_config = fd_config.parallel_config + + strategy = fleet.DistributedStrategy() + strategy.hybrid_configs = { + "dp_degree": 1, + "mp_degree": parallel_config.tensor_parallel_size, + "pp_degree": 1, + "sep_degree": 1, + "sharding_degree": parallel_config.data_parallel_size, + "ep_degree": parallel_config.expert_parallel_size, + "cp_degree": 1, + "moe_sharding_degree": 1, + "order": [ + "pp", + "moe_sharding", + "ep", + "dp", + "sharding", + "sep", + "cp", + "mp", + ], + } + fleet.init(is_collective=True, strategy=strategy) + logger.info( + f"Initialized PaddleFleet parallel_state via initialize_fleet " + f"(sharddp={parallel_config.data_parallel_size}, " + f"mp={parallel_config.tensor_parallel_size}, " + f"ep={parallel_config.expert_parallel_size}, " + f"sp={parallel_config.sequence_parallel})" + ) + + import paddle.distributed as dist + from paddlefleet import parallel_state + + hcg = fleet.get_hybrid_communicate_group() + expected_tp_size = parallel_config.tensor_parallel_size + + # Check if we need to initialize or reinitialize TP group + need_init = False + if parallel_state._TENSOR_MODEL_PARALLEL_GROUP is None: + need_init = True + reason = "TP group not initialized" + else: + # Check if current TP group size matches expected + current_tp_group = parallel_state._TENSOR_MODEL_PARALLEL_GROUP + current_tp_size = getattr(current_tp_group, "nranks", None) + if current_tp_size is None: + current_tp_size = getattr(current_tp_group, "world_size", None) + if current_tp_size != expected_tp_size: + need_init = True + reason = f"TP group size mismatch: current={current_tp_size}, expected={expected_tp_size}" + + if need_init: + logger.warning(f"{reason}, reinitializing TP group with size={expected_tp_size}") + if expected_tp_size == 1: + # Single process TP group - create manually + current_rank = dist.get_rank() + tp_ranks = [current_rank] + default_pg = dist.new_group(ranks=tp_ranks) + parallel_state._TENSOR_MODEL_PARALLEL_GROUP = default_pg + parallel_state._TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = tp_ranks + logger.info(f"Reinitialized TP group with size=1, rank={current_rank}, ranks={tp_ranks}") + else: + # Multiple processes - use hcg's mp group + parallel_state.initialize_model_parallel(hcg) + + from paddlefleet.tensor_parallel.random import ( + model_parallel_cuda_manual_seed, + ) + + try: + model_parallel_cuda_manual_seed(seed=42) + except AssertionError: + pass + + def _sync_config_from_text_config(self) -> None: + """ + Sync important config values from text_config (PaddleFormers/HF config) + to model_config. This ensures fallback models use their actual config + values instead of FD's defaults. + + This is crucial for models with unique configs like: + - Gemma3: tie_word_embeddings=True, layer_types, sliding_window + - Mistral: sliding_window + - etc. + """ + mc = self.model_config + tc = self.paddleformers_config + + sync_fields = [ + "tie_word_embeddings", + "sliding_window", + "sliding_window_pattern", + "layer_types", # May be computed as property + "rope_theta", + "rope_scaling", + "head_dim", + "v_head_dim", # For MLA (Multi-Latent Attention) support + "qk_head_dim", + "rms_norm_eps", + "rope_local_base_freq", # Gemma3 specific + "query_pre_attn_scalar", # Gemma3 specific + ] + + synced = [] + for field in sync_fields: + text_value = getattr(tc, field, None) + if text_value is not None: + # Only sync if not already set or if FD default differs + current_value = getattr(mc, field, None) if hasattr(mc, field) else None + if current_value is None or current_value != text_value: + setattr(mc, field, text_value) + synced.append(f"{field}={text_value}") + + def embed_input_ids(self, input_ids: paddle.Tensor) -> paddle.Tensor: + """Embed input_ids using the model's embedding layer.""" + embedding_layer = self.model.get_input_embeddings() + + original_ndim = input_ids.ndim + if input_ids.ndim == 1: + input_ids = input_ids.unsqueeze(0) # [num_tokens] -> [1, num_tokens] + + inputs_embeds = embedding_layer(input_ids) + + # Embedding output is [batch, seq, h], squeeze back to [num_tokens, h] + if original_ndim == 1 and inputs_embeds.ndim == 3: + inputs_embeds = inputs_embeds.squeeze(0) + + if hasattr(self, "embed_scale") and self.embed_scale is not None: + inputs_embeds *= self.embed_scale + return inputs_embeds + + @paddle.no_grad() + def forward( + self, + inputs: Dict, + forward_meta: ForwardMeta, + **kwargs, + ): + """Full transformer forward: input_ids -> hidden_states. + + This method is the primary forward pass for the model, computing: + 1. Position IDs based on seq_lens_decoder (absolute positions for RoPE) + 2. Token embeddings via embed_input_ids + 3. Transformer layers via self.model() + + Returns: + hidden_states: [TotalTokens, HiddenDim] + """ + # Handle empty batch case (e.g., DP worker with no data in EP mode) + if getattr(forward_meta, "is_zero_size", False) or inputs["ids_remove_padding"].shape[0] == 0: + # Return zero tensor with correct shape: [0, hidden_size] + hidden_size = self.model_config.hidden_size + dtype = self.model_config.dtype + return paddle.empty([0, hidden_size], dtype=dtype) + + ids_remove_padding = inputs["ids_remove_padding"] + num_tokens = ids_remove_padding.shape[0] + batch_id_per_token = forward_meta.batch_id_per_token # [num_tokens] + seq_lens_decoder = forward_meta.seq_lens_decoder # [batch_size, 1] + + if batch_id_per_token is not None and seq_lens_decoder is not None: + decoder_offsets = seq_lens_decoder.squeeze(-1) # [batch_size] + # Ensure decoder_offsets is at least 1D tensor + if decoder_offsets.ndim == 0: + decoder_offsets = decoder_offsets.reshape([1]) + token_decoder_offsets = paddle.index_select( + decoder_offsets, batch_id_per_token, axis=0 + ) # [num_tokens] + + cu_seqlens = forward_meta.cu_seqlens_q # [batch_size + 1] + if cu_seqlens is not None: + token_global_idx = paddle.arange(num_tokens, dtype="int64") + request_start_idx = paddle.index_select(cu_seqlens[:-1], batch_id_per_token, axis=0) + relative_positions = token_global_idx - request_start_idx.astype("int64") + else: + relative_positions = paddle.zeros([num_tokens], dtype="int64") + position_ids = token_decoder_offsets.astype("int64") + relative_positions + else: + position_ids = paddle.arange(num_tokens, dtype="int64") + if seq_lens_decoder is not None: + position_ids = position_ids + seq_lens_decoder[0, 0].astype("int64") + forward_meta.rope_already_applied = True + # Also set forward_meta on each TransformerLayer's config + # so that FastDeployAttention can retrieve it from core_attn.config + if hasattr(self.model, "run_function"): + for layer in self.model.run_function: + if not isinstance(layer, (GPTEmbedding, GPTLMHead)): + if hasattr(layer, "self_attn") and hasattr(layer.self_attn, "core_attention"): + core_attn = layer.self_attn.core_attention + if hasattr(core_attn, "config"): + core_attn.config.forward_meta = forward_meta + + inputs_embeds = self.embed_input_ids(ids_remove_padding).unsqueeze(0) + + # Build input dict, PipelineLayer passes data between layers via dict + model_input = { + "input_ids": None, + "position_ids": position_ids, + } + # Add other parameters from kwargs + for k, v in kwargs.items(): + if v is not None: + model_input[k] = v + + # Iterate over run_function, skip GPTLMHead + # Only call TransformerLayer + i = -1 + for layer in self.model.run_function: + if isinstance(layer, GPTLMHead): + continue + if isinstance(layer, (GPTEmbedding)): + model_input = layer(model_input, decoder_input=inputs_embeds) + else: + model_input = layer(model_input) + i += 1 + hidden_states = model_input["hidden_states"] + # [b, s, h] -> [s, h] (b=1) + hidden_states = hidden_states.squeeze(0) + + return hidden_states + + @paddle.no_grad() + def load_weights(self, weights: Iterable[tuple[str, paddle.Tensor]]): + # use model.from_pretrained to load weight + logger.debug("load_weights called but skipped: weights already loaded via from_pretrained") + pass + + def set_state_dict(self, state_dict): + self.model.set_state_dict(state_dict) + + # ============================================================================ + # PaddleFleet Attention Patch Functions + # ============================================================================ + + def patch_paddlefleet_core_attention( + model, + fd_config: "FDConfig", + layers_to_patch: list[int] | None = None, + ): + """ + Replace core_attention in all TransformerLayers of a PaddleFleet model with FastDeployAttention. + + Args: + model: PaddleFleet model instance (inheriting from PipelineLayer) + fd_config: FastDeploy FDConfig object, used to create Attention instances + layers_to_patch: List of layer indices to patch, None means patch all layers + + Returns: + int: Number of layers successfully patched + + Raises: + ValueError: If the model structure is unexpected or parameters are incorrect + """ + if fd_config is None: + raise ValueError("fd_config must be provided") + + from fastdeploy.model_executor.layers.attention.attention import Attention + + # Iterate over run_function to find TransformerLayers + patched_count = 0 + transformer_layers = [] + + # Collect all TransformerLayers + if hasattr(model, "run_function"): + for layer in model.run_function: + # Try to identify TransformerLayer + layer_type = type(layer).__name__ + if "TransformerLayer" in layer_type or "transformer" in str(type(layer)): + transformer_layers.append(layer) + + if not transformer_layers: + # Try alternative ways to find layers + for name, module in model.named_sublayers(): + if "TransformerLayer" in type(module).__name__: + transformer_layers.append(module) + + if not transformer_layers: + raise ValueError("No TransformerLayer found in model") + + # Patch core_attention for each TransformerLayer + for layer in transformer_layers: + layer_number = getattr(layer, "layer_number", None) + if layer_number is None: + # Try to get from other attributes + layer_number = getattr(layer, "layer_id", None) + + if layer_number is None: + logger.warning("layer_number not found, skip patching...") + continue # Skip layers where layer_id cannot be obtained + + # Check if this layer needs to be patched + if layers_to_patch is not None and (layer_number) not in layers_to_patch: + continue + + # Get core_attention + if not hasattr(layer, "self_attn"): + logger.warning(f"self_attn not found in layer {layer_number}, skip patching...") + continue + + core_attn = layer.self_attn.core_attention + if core_attn is None: + logger.warning(f"core_attn not found in layer {layer_number}, skip patching...") + continue + + # Get configuration info + # Prefer per-partition values (values after TP sharding), + # because PaddleFleet's QKV output is already per-partition when TP>1 + num_attention_heads = getattr( + core_attn, "num_attention_heads_per_partition", getattr(core_attn.config, "num_attention_heads", None) + ) + num_key_value_heads = getattr( + core_attn, + "num_query_groups_per_partition", + getattr(core_attn.config, "num_key_value_heads", num_attention_heads), + ) + hidden_size_per_attention_head = getattr(core_attn, "hidden_size_per_attention_head", None) + if hidden_size_per_attention_head is not None: + softmax_scale = getattr(core_attn, "softmax_scale", 1.0 / math.sqrt(hidden_size_per_attention_head)) + else: + softmax_scale = 1.0 + + hidden_size_per_partition = getattr(core_attn, "hidden_size_per_partition", None) + if hidden_size_per_partition is None: + head_dim = getattr(core_attn, "hidden_size_per_attention_head", hidden_size_per_attention_head) + hidden_size_per_partition = num_attention_heads * head_dim + + fd_layer_id = layer_number + + # Create Attention instance inside FastDeployAttention + fd_attn_instance = Attention( + fd_config=fd_config, + layer_id=fd_layer_id, + ) + + # Override Attention instance's head config to match PaddleFleet model + # This is necessary because fd_config.model_config may differ from PaddleFleet model config + fd_attn_instance.num_heads = num_attention_heads + fd_attn_instance.kv_num_heads = num_key_value_heads + fd_attn_instance.head_dim = hidden_size_per_attention_head + logger.info( + f"Overriding Attention config: num_heads={num_attention_heads}, kv_num_heads={num_key_value_heads}, head_dim={hidden_size_per_attention_head}" + ) + + # Create FastDeployAttention object and directly replace core_attention + fast_deploy_core_attn = FastDeployAttention( + config=core_attn.config, + fd_attention=fd_attn_instance, + num_attention_heads=num_attention_heads, + num_key_value_heads=num_key_value_heads, + softmax_scale=softmax_scale, + hidden_size_per_attention_head=hidden_size_per_attention_head, + hidden_size_per_partition=hidden_size_per_partition, + layer_id=fd_layer_id, + ) + + # Replace core_attention object + layer.self_attn.core_attention = fast_deploy_core_attn + + patched_count += 1 + logger.info(f"Replaced core_attention with FastDeployAttention for layer {fd_layer_id}") + + logger.info(f"Successfully replaced {patched_count} core_attention layers with FastDeployAttention") + + return patched_count diff --git a/fastdeploy/model_executor/utils.py b/fastdeploy/model_executor/utils.py index c34b697d785..bd115bdf46a 100644 --- a/fastdeploy/model_executor/utils.py +++ b/fastdeploy/model_executor/utils.py @@ -559,6 +559,10 @@ def has_flashinfer(): return importlib.util.find_spec("flashinfer") is not None +def is_paddlefleet_available(): + return importlib.util.find_spec("paddlefleet") is not None + + @cache def get_sm_version(): if paddle.cuda.is_available(): diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 6096759825a..efd3000a5c1 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -1003,9 +1003,9 @@ def parse_args(): parser.add_argument( "--model-impl", type=str, - choices=["auto", "fastdeploy", "paddleformers"], + choices=["auto", "fastdeploy", "paddleformers", "paddlefleet"], default="auto", - help="Model implementation backend (auto, fastdeploy, paddleformers)", + help="Model implementation backend (auto, fastdeploy, paddleformers, paddlefleet)", ) parser.add_argument( diff --git a/scripts/coverage_run.sh b/scripts/coverage_run.sh index 1d44f72eb97..e1337e97bfd 100644 --- a/scripts/coverage_run.sh +++ b/scripts/coverage_run.sh @@ -11,9 +11,17 @@ export COVERAGE_RCFILE=${COVERAGE_RCFILE:-$DIR/../scripts/.coveragerc} # Classify tests into one of the following categories # - multi_gpu: requires multiple GPUs / ports (run sequentially) # - single_gpu: independent tests (can run in parallel) +# - isolated: tests that install dependencies, run last (isolated) # ============================================================ classify_tests() { local test_file=$1 + + # Rule 0: model_executor_fallback tests should run in isolation (last) + if [[ "$test_file" =~ tests/model_executor_fallback/.*\.py ]]; then + echo "isolated" + return + fi + # Rule 1: distributed tests (explicit multi-GPU launch) if [[ "$test_file" =~ tests/distributed/.*test_.*\.py ]]; then echo "multi_gpu" @@ -247,6 +255,7 @@ fi MULTI_GPU_TESTS=() SINGLE_GPU_TESTS=() +ISOLATED_TESTS=() TOTAL_TESTS=0 for file in $ALL_TEST_FILES; do @@ -260,11 +269,15 @@ for file in $ALL_TEST_FILES; do "single_gpu") SINGLE_GPU_TESTS+=("$file") ;; + "isolated") + ISOLATED_TESTS+=("$file") + ;; esac done echo "Multi-GPU tests: ${#MULTI_GPU_TESTS[@]}" echo "Single-GPU tests: ${#SINGLE_GPU_TESTS[@]}" +echo "Isolated tests: ${#ISOLATED_TESTS[@]}" echo "Total tests: $TOTAL_TESTS" # ============================================================ @@ -327,9 +340,24 @@ else fi # ============================================================ -# Step 4: Summary +# Step 4: Run isolated tests (last, to avoid dependency pollution) +# ============================================================ +echo "Step 4: Running isolated tests (tests with special dependencies)" + +if [ ${#ISOLATED_TESTS[@]} -gt 0 ]; then + echo "Isolated tests will run last to avoid dependency pollution." + for file in "${ISOLATED_TESTS[@]}"; do + echo "Running isolated test: $file" + run_test_with_logging "$file" "$failed_tests_file" + done +else + echo "No isolated tests to run." +fi + +# ============================================================ +# Step 5: Summary # ============================================================ -echo "Step 4: Summary" +echo "Step 5: Summary" # Count failed tests if [ -f "$failed_tests_file" ]; then @@ -369,7 +397,7 @@ if [ "$failed_count" -ne 0 ]; then # Only package logs when there are failures echo "====================================" - echo "Step 5: Packaging logs (only on failure)" + echo "Step 6: Packaging logs (only on failure)" echo "====================================" if [ -d "${run_path}/unittest_logs" ]; then diff --git a/tests/model_executor_fallback/conftest.py b/tests/model_executor_fallback/conftest.py new file mode 100644 index 00000000000..41c2b277241 --- /dev/null +++ b/tests/model_executor_fallback/conftest.py @@ -0,0 +1,180 @@ +"""Pytest configuration for model_executor tests. + +This conftest handles special dependencies required only by specific tests, +avoiding pollution of the global test environment. +""" + +import os +import shlex +import subprocess +import sys + +import pytest + + +def get_package_version(package_name): + """Get the version of an installed package. + + Args: + package_name: Name of the package + + Returns: + Version string or "not installed" if package is not found + """ + try: + import importlib.metadata + + version = importlib.metadata.version(package_name) + return version + except Exception: + try: + # Fallback for older Python versions + import pkg_resources + + version = pkg_resources.get_distribution(package_name).version + return version + except Exception: + return "not installed" + + +def print_package_versions(): + """Print versions of key packages (paddlepaddle, paddlefleet, paddleformers).""" + print("\n" + "=" * 70) + print("[conftest] Package Versions:") + print("=" * 70) + + packages = ["paddlepaddle-gpu", "paddlefleet", "paddleformers", "transformers"] + for pkg in packages: + version = get_package_version(pkg) + status = "✓" if version != "not installed" else "✗" + print(f"[conftest] {status} {pkg:20s}: {version}") + + print("=" * 70 + "\n") + + +def pytest_configure(config): + """Configure pytest before test collection.""" + # Register custom marker for paddlefleet tests + config.addinivalue_line("markers", "paddlefleet: tests that require paddlefleet and paddleformers dependencies") + + +def pytest_collection_modifyitems(config, items): + """Modify test collection to handle paddlefleet dependencies. + + This hook runs after test collection but before test execution. + It checks if any collected tests require paddlefleet dependencies + and installs them in an isolated manner if needed. + """ + # IMPORTANT: Skip installation during collection phase (--collect-only) + if config.option.collectonly: + print("[conftest] Skipping dependency installation during collection phase") + return + + # All tests in this directory require paddlefleet + if not items: + return + + # Check if dependencies are already installed with correct versions + try: + import paddlefleet # noqa: F401 + + print("\n" + "=" * 70) + print("[conftest] paddlefleet already installed, skipping installation") + print("=" * 70) + print_package_versions() + return + except ImportError: + pass + + # Print versions before installation + print("\n" + "=" * 70) + print("[conftest] Package versions BEFORE installing paddlefleet dependencies:") + print("=" * 70) + print_package_versions() + + # Install dependencies only when running paddlefleet tests + print("=" * 70) + print("[conftest] Installing paddlefleet-specific dependencies...") + print("=" * 70) + + try: + # Install paddleformers + paddleformers_url = os.getenv( + "PADDLEFORMERS_WHEEL_URL", + "paddleformers==1.1.0.dev20260507 --extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ --extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/", # fallback to PyPI name + ) + install_args = [sys.executable, "-m", "pip", "install"] + shlex.split(paddleformers_url) + ["--quiet"] + subprocess.check_call(install_args) + print(f"[conftest] ✓ Installed paddleformers 1.1.0.dev20250507 from {paddleformers_url}") + + # Install paddlefleet (skip paddlepaddle dependency, use existing version) + paddlefleet_url = os.getenv( + "PADDLEFLEET_WHEEL_URL", + "paddlefleet==0.3.0.dev20260527 --extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ --extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/", # fallback to PyPI name + ) + # Use --no-deps to avoid reinstalling paddlepaddle + install_args = ( + [sys.executable, "-m", "pip", "install"] + shlex.split(paddlefleet_url) + ["--no-deps", "--quiet"] + ) + subprocess.check_call(install_args) + print(f"[conftest] ✓ Installed paddlefleet 0.3.0.dev20260527 (--no-deps) from {paddlefleet_url}") + + # Install paddlefleet_ops + paddlefleet_ops_url = os.getenv( + "PADDLEFLEET_OPS_WHEEL_URL", + "paddlefleet_ops==0.3.0.dev20260520+2702ba51 --extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ --extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/", + ) + install_args = [sys.executable, "-m", "pip", "install"] + shlex.split(paddlefleet_ops_url) + ["--quiet"] + subprocess.check_call(install_args) + print(f"[conftest] ✓ Installed paddlefleet_ops 0.3.0.dev20260520+2702ba51 from {paddlefleet_ops_url}") + print("[conftest] ℹ Using existing paddlepaddle from environment") + + # Clear module cache to ensure fresh imports after version change + # This is critical when transformers version changes during pytest session + import importlib + + keys_to_clear = [ + k for k in sys.modules.keys() if "huggingface_hub" in k or "transformers" in k or "paddleformers" in k + ] + for key in keys_to_clear: + del sys.modules[key] + importlib.invalidate_caches() + if keys_to_clear: + print(f"[conftest] ✓ Cleared {len(keys_to_clear)} cached modules (transformers/paddleformers)") + + # Print versions after installation + print("\n" + "=" * 70) + print("[conftest] Package versions AFTER installing paddlefleet dependencies:") + print("=" * 70) + print_package_versions() + + except subprocess.CalledProcessError as e: + print(f"[conftest] ✗ Failed to install dependencies: {e}") + print("[conftest] Tests requiring paddlefleet will be skipped") + + # Mark all paddlefleet tests to skip + skip_marker = pytest.mark.skip(reason="Failed to install paddlefleet dependencies") + for item in items: + if "test_fallback_fleet_model.py" in item.nodeid: + item.add_marker(skip_marker) + + print("=" * 70 + "\n") + + +def pytest_sessionfinish(session, exitstatus): + """Optional: cleanup after test session if needed. + + You can uninstall the dependencies here to keep the environment clean, + but this may slow down subsequent test runs. + """ + # Uncomment the following to auto-cleanup after tests + # if os.getenv("CLEANUP_PADDLEFLEET_DEPS", "false").lower() == "true": + # try: + # subprocess.check_call([ + # sys.executable, "-m", "pip", "uninstall", + # "paddlefleet", "paddleformers", "-y", "--quiet" + # ]) + # print("[conftest] Cleaned up paddlefleet dependencies") + # except Exception: + # pass + pass diff --git a/tests/model_executor_fallback/test_fallback_fleet_model.py b/tests/model_executor_fallback/test_fallback_fleet_model.py new file mode 100644 index 00000000000..28d126c4ea1 --- /dev/null +++ b/tests/model_executor_fallback/test_fallback_fleet_model.py @@ -0,0 +1,105 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for LLM inference with real model results validation.""" + +import gc +import os + +import pytest + +from fastdeploy import LLM, SamplingParams + +DEFAULT_MODEL_DIR = "./models" +MODEL_NAME = "Qwen3-0.6B-Base" + +model_dir = os.getenv("MODEL_PATH", DEFAULT_MODEL_DIR) +MODEL_PATH = os.path.join(model_dir, MODEL_NAME) + + +@pytest.mark.gpu +class TestLLMInferenceRealModel: + """Test LLM inference with real model results validation.""" + + @classmethod + def setup_class(cls): + """Setup LLM instance once for all tests in this class.""" + cls.llm = LLM( + model=MODEL_PATH, + model_impl="paddlefleet", + max_model_len=32768, + tensor_parallel_size=1, + data_parallel_size=1, + enable_expert_parallel=True, + graph_optimization_config={"use_cudagraph": False}, + ) + + @classmethod + def teardown_class(cls): + """Cleanup LLM instance after all tests.""" + if hasattr(cls, "llm"): + del cls.llm + gc.collect() + + @pytest.fixture + def sampling_params(self): + """Provide sampling parameters for generation.""" + return SamplingParams(max_tokens=64, temperature=0.1) + + def test_generate_with_text_result_check(self, sampling_params): + """Test generate API and validate text result contains expected content.""" + prompt = "We the People of the United States, in Order to" + outputs_generate = self.llm.generate(prompt, sampling_params) + + if isinstance(outputs_generate, list): + res = outputs_generate[0].outputs.text + else: + res = outputs_generate + + expected1 = ( + "form a more perfect Union, establish Justice, insure domestic Tranquility, " + "provide for the common defence, promote the general Welfare, and secure the " + "Blessings of Liberty to ourselves and our Posterity, do ordain and establish " + "this Constitution for the United States of America." + ) + expected2 = ( + "form a more perfect Union, establish Justice, insure domestic Tranquility, " + "provide for the common defense, promote the general Welfare, and secure the " + "Blessings of Liberty to ourselves and our Posterity, do ordain and establish " + "this Constitution for the United States of America." + ) + + assert ( + expected1 in res or expected2 in res + ), f"Result check failed!\nExpected to contain:\n {expected1}\nGot:\n {res}" + + def test_generate_with_top_p_sampling(self): + """Test generate with top_p sampling.""" + params = SamplingParams(max_tokens=20, temperature=0.8, top_p=0.9) + prompt = "The meaning of life is" + output = self.llm.generate(prompt, params) + + result = output[0].outputs.text if isinstance(output, list) else output.outputs.text + assert len(result) > 0, "Should generate some text with top_p sampling" + + def test_generate_max_tokens_constraint(self): + """Test that max_tokens constraint is respected.""" + max_tokens = 10 + params = SamplingParams(max_tokens=max_tokens, temperature=0.1) + prompt = "Tell me a long story about" + output = self.llm.generate(prompt, params) + + token_ids = output[0].outputs.token_ids if isinstance(output, list) else output.outputs.token_ids + # Generated tokens should not exceed max_tokens by more than 1 (for EOS) + assert len(token_ids) <= max_tokens + 1, f"Expected at most {max_tokens + 1} tokens, got {len(token_ids)}"