From ffcb10cbcc2ebf4a0bb22c1d5625b06e0e833587 Mon Sep 17 00:00:00 2001 From: kebo01 Date: Tue, 2 Jun 2026 10:40:38 +0800 Subject: [PATCH 1/5] fd fallback fleet model clean commit --- fastdeploy/config.py | 2 +- fastdeploy/engine/args_utils.py | 6 +- .../model_executor/fallback/conftest.py | 180 +++++ .../fallback/test_fallback_fleet_model.py | 105 +++ .../graph_optimization/decorator.py | 6 +- fastdeploy/model_executor/models/__init__.py | 2 +- .../model_executor/models/model_base.py | 13 + .../models/paddleformers/__init__.py | 17 + .../models/paddleformers/base_fleet.py | 741 ++++++++++++++++++ fastdeploy/model_executor/utils.py | 4 + fastdeploy/worker/worker_process.py | 4 +- scripts/coverage_run.sh | 34 +- 12 files changed, 1102 insertions(+), 12 deletions(-) create mode 100644 fastdeploy/model_executor/fallback/conftest.py create mode 100644 fastdeploy/model_executor/fallback/test_fallback_fleet_model.py create mode 100644 fastdeploy/model_executor/models/paddleformers/base_fleet.py diff --git a/fastdeploy/config.py b/fastdeploy/config.py index ff926a3d90f..335638cd37e 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -60,7 +60,7 @@ _ResolvedTask = Literal["generate", "encode", "embed"] # Model implementation backend options -ModelImpl = Literal["auto", "fastdeploy", "paddleformers"] +ModelImpl = Literal["auto", "fastdeploy", "paddleformers", "paddlefleet"] _RUNNER_CONVERTS: dict[RunnerType, list[ConvertType]] = { "generate": [], diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 6bd9733bb8b..c8da0dad3d0 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -139,6 +139,7 @@ class EngineArgs: 'auto': Use native FastDeploy implementation when available, fallback to PaddleFormers. 'fastdeploy': Use only native FastDeploy implementations. 'paddleformers': Use PaddleFormers backend with FastDeploy optimizations. + 'paddlefleet': Use PaddleFleet backend with FastDeploy optimizations. 
""" override_pooler_config: Optional[Union[dict, PoolerConfig]] = None """ @@ -650,7 +651,7 @@ def __post_init__(self): "kvcache_storage_backend is only supported when ENABLE_V1_KVCACHE_SCHEDULER=1" ) - valid_model_impls = ["auto", "fastdeploy", "paddleformers"] + valid_model_impls = ["auto", "fastdeploy", "paddleformers", "paddlefleet"] if self.model_impl not in valid_model_impls: raise NotImplementedError( f"not support model_impl: '{self.model_impl}'. " f"Must be one of: {', '.join(valid_model_impls)}" @@ -998,13 +999,14 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: model_group.add_argument( "--model-impl", type=str, - choices=["auto", "fastdeploy", "paddleformers"], + choices=["auto", "fastdeploy", "paddleformers", "paddlefleet"], default=EngineArgs.model_impl, help=( "Model implementation backend. " "'auto': Use native FastDeploy when available, fallback to PaddleFormers. " "'fastdeploy': Use only native FastDeploy implementations. " "'paddleformers': Use PaddleFormers backend with FastDeploy optimizations." + "'paddlefleet': Use PaddleFleet backend with FastDeploy optimizations." ), ) diff --git a/fastdeploy/model_executor/fallback/conftest.py b/fastdeploy/model_executor/fallback/conftest.py new file mode 100644 index 00000000000..41c2b277241 --- /dev/null +++ b/fastdeploy/model_executor/fallback/conftest.py @@ -0,0 +1,180 @@ +"""Pytest configuration for model_executor tests. + +This conftest handles special dependencies required only by specific tests, +avoiding pollution of the global test environment. +""" + +import os +import shlex +import subprocess +import sys + +import pytest + + +def get_package_version(package_name): + """Get the version of an installed package. 
+ + Args: + package_name: Name of the package + + Returns: + Version string or "not installed" if package is not found + """ + try: + import importlib.metadata + + version = importlib.metadata.version(package_name) + return version + except Exception: + try: + # Fallback for older Python versions + import pkg_resources + + version = pkg_resources.get_distribution(package_name).version + return version + except Exception: + return "not installed" + + +def print_package_versions(): + """Print versions of key packages (paddlepaddle-gpu, paddlefleet, paddleformers, transformers).""" + print("\n" + "=" * 70) + print("[conftest] Package Versions:") + print("=" * 70) + + packages = ["paddlepaddle-gpu", "paddlefleet", "paddleformers", "transformers"] + for pkg in packages: + version = get_package_version(pkg) + status = "✓" if version != "not installed" else "✗" + print(f"[conftest] {status} {pkg:20s}: {version}") + + print("=" * 70 + "\n") + + +def pytest_configure(config): + """Configure pytest before test collection.""" + # Register custom marker for paddlefleet tests + config.addinivalue_line("markers", "paddlefleet: tests that require paddlefleet and paddleformers dependencies") + + +def pytest_collection_modifyitems(config, items): + """Modify test collection to handle paddlefleet dependencies. + + This hook runs after test collection but before test execution. + If paddlefleet is not importable, it pip-installs the required dependencies + into the current Python environment (no isolation). 
+ """ + # IMPORTANT: Skip installation during collection phase (--collect-only) + if config.option.collectonly: + print("[conftest] Skipping dependency installation during collection phase") + return + + # All tests in this directory require paddlefleet + if not items: + return + + # Check if dependencies are already installed with correct versions + try: + import paddlefleet # noqa: F401 + + print("\n" + "=" * 70) + print("[conftest] paddlefleet already installed, skipping installation") + print("=" * 70) + print_package_versions() + return + except ImportError: + pass + + # Print versions before installation + print("\n" + "=" * 70) + print("[conftest] Package versions BEFORE installing paddlefleet dependencies:") + print("=" * 70) + print_package_versions() + + # Install dependencies only when running paddlefleet tests + print("=" * 70) + print("[conftest] Installing paddlefleet-specific dependencies...") + print("=" * 70) + + try: + # Install paddleformers + paddleformers_url = os.getenv( + "PADDLEFORMERS_WHEEL_URL", + "paddleformers==1.1.0.dev20260507 --extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ --extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/", # fallback to PyPI name + ) + install_args = [sys.executable, "-m", "pip", "install"] + shlex.split(paddleformers_url) + ["--quiet"] + subprocess.check_call(install_args) + print(f"[conftest] ✓ Installed paddleformers 1.1.0.dev20250507 from {paddleformers_url}") + + # Install paddlefleet (skip paddlepaddle dependency, use existing version) + paddlefleet_url = os.getenv( + "PADDLEFLEET_WHEEL_URL", + "paddlefleet==0.3.0.dev20260527 --extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ --extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/", # fallback to PyPI name + ) + # Use --no-deps to avoid reinstalling paddlepaddle + install_args = ( + [sys.executable, "-m", "pip", "install"] + shlex.split(paddlefleet_url) + ["--no-deps", 
"--quiet"] + ) + subprocess.check_call(install_args) + print(f"[conftest] ✓ Installed paddlefleet 0.3.0.dev20260527 (--no-deps) from {paddlefleet_url}") + + # Install paddlefleet_ops + paddlefleet_ops_url = os.getenv( + "PADDLEFLEET_OPS_WHEEL_URL", + "paddlefleet_ops==0.3.0.dev20260520+2702ba51 --extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ --extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/", + ) + install_args = [sys.executable, "-m", "pip", "install"] + shlex.split(paddlefleet_ops_url) + ["--quiet"] + subprocess.check_call(install_args) + print(f"[conftest] ✓ Installed paddlefleet_ops 0.3.0.dev20260520+2702ba51 from {paddlefleet_ops_url}") + print("[conftest] ℹ Using existing paddlepaddle from environment") + + # Clear module cache to ensure fresh imports after version change + # This is critical when transformers version changes during pytest session + import importlib + + keys_to_clear = [ + k for k in sys.modules.keys() if "huggingface_hub" in k or "transformers" in k or "paddleformers" in k + ] + for key in keys_to_clear: + del sys.modules[key] + importlib.invalidate_caches() + if keys_to_clear: + print(f"[conftest] ✓ Cleared {len(keys_to_clear)} cached modules (transformers/paddleformers)") + + # Print versions after installation + print("\n" + "=" * 70) + print("[conftest] Package versions AFTER installing paddlefleet dependencies:") + print("=" * 70) + print_package_versions() + + except subprocess.CalledProcessError as e: + print(f"[conftest] ✗ Failed to install dependencies: {e}") + print("[conftest] Tests requiring paddlefleet will be skipped") + + # Mark all paddlefleet tests to skip + skip_marker = pytest.mark.skip(reason="Failed to install paddlefleet dependencies") + for item in items: + if "test_fallback_fleet_model.py" in item.nodeid: + item.add_marker(skip_marker) + + print("=" * 70 + "\n") + + +def pytest_sessionfinish(session, exitstatus): + """Optional: cleanup after test session if needed. 
+ + You can uninstall the dependencies here to keep the environment clean, + but this may slow down subsequent test runs. + """ + # Uncomment the following to auto-cleanup after tests + # if os.getenv("CLEANUP_PADDLEFLEET_DEPS", "false").lower() == "true": + # try: + # subprocess.check_call([ + # sys.executable, "-m", "pip", "uninstall", + # "paddlefleet", "paddleformers", "-y", "--quiet" + # ]) + # print("[conftest] Cleaned up paddlefleet dependencies") + # except Exception: + # pass + pass diff --git a/fastdeploy/model_executor/fallback/test_fallback_fleet_model.py b/fastdeploy/model_executor/fallback/test_fallback_fleet_model.py new file mode 100644 index 00000000000..28d126c4ea1 --- /dev/null +++ b/fastdeploy/model_executor/fallback/test_fallback_fleet_model.py @@ -0,0 +1,105 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Unit tests for LLM inference with real model results validation.""" + +import gc +import os + +import pytest + +from fastdeploy import LLM, SamplingParams + +DEFAULT_MODEL_DIR = "./models" +MODEL_NAME = "Qwen3-0.6B-Base" + +model_dir = os.getenv("MODEL_PATH", DEFAULT_MODEL_DIR) +MODEL_PATH = os.path.join(model_dir, MODEL_NAME) + + +@pytest.mark.gpu +class TestLLMInferenceRealModel: + """Test LLM inference with real model results validation.""" + + @classmethod + def setup_class(cls): + """Setup LLM instance once for all tests in this class.""" + cls.llm = LLM( + model=MODEL_PATH, + model_impl="paddlefleet", + max_model_len=32768, + tensor_parallel_size=1, + data_parallel_size=1, + enable_expert_parallel=True, + graph_optimization_config={"use_cudagraph": False}, + ) + + @classmethod + def teardown_class(cls): + """Cleanup LLM instance after all tests.""" + if hasattr(cls, "llm"): + del cls.llm + gc.collect() + + @pytest.fixture + def sampling_params(self): + """Provide sampling parameters for generation.""" + return SamplingParams(max_tokens=64, temperature=0.1) + + def test_generate_with_text_result_check(self, sampling_params): + """Test generate API and validate text result contains expected content.""" + prompt = "We the People of the United States, in Order to" + outputs_generate = self.llm.generate(prompt, sampling_params) + + if isinstance(outputs_generate, list): + res = outputs_generate[0].outputs.text + else: + res = outputs_generate + + expected1 = ( + "form a more perfect Union, establish Justice, insure domestic Tranquility, " + "provide for the common defence, promote the general Welfare, and secure the " + "Blessings of Liberty to ourselves and our Posterity, do ordain and establish " + "this Constitution for the United States of America." 
+ ) + expected2 = ( + "form a more perfect Union, establish Justice, insure domestic Tranquility, " + "provide for the common defense, promote the general Welfare, and secure the " + "Blessings of Liberty to ourselves and our Posterity, do ordain and establish " + "this Constitution for the United States of America." + ) + + assert ( + expected1 in res or expected2 in res + ), f"Result check failed!\nExpected to contain:\n {expected1}\nGot:\n {res}" + + def test_generate_with_top_p_sampling(self): + """Test generate with top_p sampling.""" + params = SamplingParams(max_tokens=20, temperature=0.8, top_p=0.9) + prompt = "The meaning of life is" + output = self.llm.generate(prompt, params) + + result = output[0].outputs.text if isinstance(output, list) else output.outputs.text + assert len(result) > 0, "Should generate some text with top_p sampling" + + def test_generate_max_tokens_constraint(self): + """Test that max_tokens constraint is respected.""" + max_tokens = 10 + params = SamplingParams(max_tokens=max_tokens, temperature=0.1) + prompt = "Tell me a long story about" + output = self.llm.generate(prompt, params) + + token_ids = output[0].outputs.token_ids if isinstance(output, list) else output.outputs.token_ids + # Generated tokens should not exceed max_tokens by more than 1 (for EOS) + assert len(token_ids) <= max_tokens + 1, f"Expected at most {max_tokens + 1} tokens, got {len(token_ids)}" diff --git a/fastdeploy/model_executor/graph_optimization/decorator.py b/fastdeploy/model_executor/graph_optimization/decorator.py index 05ec79a495c..3d8d72807d5 100644 --- a/fastdeploy/model_executor/graph_optimization/decorator.py +++ b/fastdeploy/model_executor/graph_optimization/decorator.py @@ -60,12 +60,12 @@ def __init__(self, fd_config: FDConfig, **kwargs): # Not use graph optimization return - def __call__(self, **kwargs): + def __call__(self, *args, **kwargs): """Decorator model.__call__() func""" if not self.use_graph_opt: - return self.forward(**kwargs) + return 
self.forward(*args, **kwargs) - return self.graph_opt_backend(**kwargs) + return self.graph_opt_backend(*args, **kwargs) cls.__init__ = __init__ cls.__call__ = __call__ diff --git a/fastdeploy/model_executor/models/__init__.py b/fastdeploy/model_executor/models/__init__.py index 51a46a176ea..831ab8893c8 100644 --- a/fastdeploy/model_executor/models/__init__.py +++ b/fastdeploy/model_executor/models/__init__.py @@ -19,7 +19,7 @@ import os from pathlib import Path -from paddleformers.transformers import PretrainedModel +from paddleformers.transformers.model_utils import PretrainedModel from fastdeploy.plugins.model_register import load_model_register_plugins diff --git a/fastdeploy/model_executor/models/model_base.py b/fastdeploy/model_executor/models/model_base.py index d55c88947e7..9e51a1e6bf7 100644 --- a/fastdeploy/model_executor/models/model_base.py +++ b/fastdeploy/model_executor/models/model_base.py @@ -194,6 +194,19 @@ def _try_resolve_paddleformers( elif model_impl == "auto" and is_fallback: # Auto mode fallback when no native implementation exists backend_arch = "PaddleFormersForCausalLM" + elif model_impl == "paddlefleet": + from fastdeploy.model_executor.utils import is_paddlefleet_available + + if is_paddlefleet_available(): + backend_arch = "PaddleFleetForCausalLM" + else: + raise ImportError( + "paddlefleet backend requires paddlefleet to be installed.\n" + "Please install with [change cuda version if needed ]:\n" + "python -m pip install paddlefleet==0.3.0.dev20260527 " + "--extra-index-url https://www.paddlepaddle.org.cn/packages/stable/cu126/ " + "--extra-index-url https://www.paddlepaddle.org.cn/packages/nightly/cu126/" + ) elif model_impl == "fastdeploy": return None else: diff --git a/fastdeploy/model_executor/models/paddleformers/__init__.py b/fastdeploy/model_executor/models/paddleformers/__init__.py index 77174269389..315b72b8775 100644 --- a/fastdeploy/model_executor/models/paddleformers/__init__.py +++ 
b/fastdeploy/model_executor/models/paddleformers/__init__.py @@ -19,6 +19,7 @@ ModelForCasualLM, ModelRegistry, ) +from fastdeploy.model_executor.utils import is_paddlefleet_available from .base import PaddleFormersModelBase from .causallm import CausalLMMixin @@ -38,3 +39,19 @@ class PaddleFormersForCausalLM(CausalLMMixin, PaddleFormersModelBase, ModelForCa @classmethod def name(cls): return "PaddleFormersForCausalLM" + + +if is_paddlefleet_available(): + from .base_fleet import PaddleFleetModelBase + + __all__ += ["PaddleFleetForCausalLM"] + + @ModelRegistry.register_model_class( + architecture="PaddleFleetForCausalLM", + module_name="paddleformers", + category=ModelCategory.TEXT_GENERATION, + ) + class PaddleFleetForCausalLM(PaddleFleetModelBase, ModelForCasualLM): + @classmethod + def name(cls): + return "PaddleFleetForCausalLM" diff --git a/fastdeploy/model_executor/models/paddleformers/base_fleet.py b/fastdeploy/model_executor/models/paddleformers/base_fleet.py new file mode 100644 index 00000000000..1f57aebb75d --- /dev/null +++ b/fastdeploy/model_executor/models/paddleformers/base_fleet.py @@ -0,0 +1,741 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" + +"""Generic PaddleFormers modeling backend base class.""" + +import logging + +from fastdeploy.model_executor.utils import is_paddlefleet_available + +if not is_paddlefleet_available(): + logging.warning("paddlefleet is not installed, skipping base_fleet module") +else: + import math + from collections.abc import Iterable + from typing import TYPE_CHECKING, Dict + + import paddle + from paddle import nn + from paddlefleet.models.gpt.gpt_embedding import GPTEmbedding + from paddlefleet.models.gpt.lm_head import GPTLMHead + from paddlefleet.transformer.layer import FleetLayer + from paddlefleet.transformer.transformer_config import TransformerConfig + from paddleformers.transformers import AutoConfig + from paddleformers.transformers.auto.modeling import AutoModelForCausalLM + from paddleformers.utils.log import logger + + from fastdeploy.model_executor.forward_meta import ForwardMeta # noqa: F401 + from fastdeploy.model_executor.graph_optimization.decorator import ( + support_graph_optimization, + ) + + if TYPE_CHECKING: + from fastdeploy.config import FDConfig + + from fastdeploy.model_executor.layers.attention.attention import Attention + + class FastDeployAttention(FleetLayer): + """ + FastDeploy version of DotProductAttention, holding an internal FastDeploy Attention module. + + This class can be used to replace PaddleFleet's DotProductAttention, + using FastDeploy's attention backend for computation. + """ + + def __init__( + self, + config: TransformerConfig, + fd_attention: Attention, + num_attention_heads: int, + num_key_value_heads: int, + softmax_scale: float, + hidden_size_per_attention_head: int, + hidden_size_per_partition: int, + layer_id: int, + ): + """ + Initialize FastDeployAttention. 
+ + Args: + fd_attention: FastDeploy Attention instance + num_attention_heads: Number of attention heads + num_key_value_heads: Number of KV heads + softmax_scale: Softmax scaling factor + hidden_size_per_attention_head: Hidden dimension per attention head + hidden_size_per_partition: Hidden size per partition + layer_id: Current layer ID + """ + super().__init__(config) + self.fd_attention = fd_attention + self.num_attention_heads = num_attention_heads + self.num_key_value_heads = num_key_value_heads + self.softmax_scale = softmax_scale + self.hidden_size_per_attention_head = hidden_size_per_attention_head + self.hidden_size_per_partition = hidden_size_per_partition + self.layer_id = layer_id + + def forward( + self, + query: paddle.Tensor, + key: paddle.Tensor, + value: paddle.Tensor, + attention_mask: paddle.Tensor, + attn_mask_startend_row_indices: paddle.Tensor = None, + attn_mask_type=None, + attention_bias: paddle.Tensor = None, + packed_seq_params=None, + use_rr_flash_attention: bool = False, + past_key_values=None, + layer_idx=None, + use_cache=False, + x: paddle.Tensor = None, + qr: paddle.Tensor = None, + kv_compressed: paddle.Tensor = None, + k_pos_emb: paddle.Tensor = None, + q_absorbed: paddle.Tensor = None, + v_b_proj_weight: paddle.Tensor = None, + ): + """ + Forward pass. 
+ + Args: + query: Query tensor, supported formats: + - 4D BSHD: [b, sq, np, hn] (PaddleFleet default) + - 3D HSD: [np, sq, hn] + - 3D SHD: [sq, np, hn] + key: Key tensor, same format as above, but head count may differ (GQA) + value: Value tensor, same format as above + attention_mask: Attention mask + attn_mask_startend_row_indices: FlashMask start-end row indices + attn_mask_type: Attention mask type + attention_bias: Attention bias + packed_seq_params: Packed sequence parameters + use_rr_flash_attention: Whether to use RR Flash Attention + kv_compressed: Compressed KV tensor for MLA (Multi-Latent Attention) + + Returns: + Attention output tensor + """ + # Try to get forward_meta from config (PaddleFleet does not pass this parameter when calling) + forward_meta = getattr(self.config, "forward_meta", None) + assert forward_meta is not None, "forward_meta must be provided" + + # Set scaling factor + original_scale = getattr(self.fd_attention, "scale", None) + if original_scale is None: + self.fd_attention.scale = self.softmax_scale + + # Check if MLA mode is enabled + is_mla = getattr(self.config, "multi_latent_attention", False) + + try: + # Refer to the processing logic of fastdeploy_append_attention_forward + # Support 3D (SHD) and 4D (BSHD) input + + # 4D input: squeeze to 3D (only supports batch=1) + def squeeze_to_3d(t: paddle.Tensor, name: str) -> paddle.Tensor: + if t is None: + return None + if t.ndim == 4: + if int(t.shape[0]) != 1: + raise ValueError( + f"{name} batch size {int(t.shape[0])} not supported, only batch=1 is supported" + ) + return t.squeeze(0) + if t.ndim == 3: + return t + raise ValueError(f"{name} has unexpected dims {t.ndim}, expect 3 or 4") + + q = squeeze_to_3d(query, "query") + k = squeeze_to_3d(key, "key") + v = squeeze_to_3d(value, "value") + + if is_mla: + need_do_prefill = forward_meta.max_len_tensor_cpu[1] > 0 + need_do_decode = forward_meta.max_len_tensor_cpu[2] > 0 + + # MLA mode: pass q, k, v, compressed_kv, k_pe separately + 
# Reference: deepseek_v3.py line 389 + # + # Note: + # - Prefill (flash_attn_func): expects 3D tensors [seq, heads, dim] + # - Decode (multi_head_latent_attention): expects 2D tensors [seq, heads*dim] + # So we need to flatten q for decode phase only + + # Process compressed_kv and k_pe + + assert kv_compressed is not None, "kv_compressed must be provided when use" + compressed_kv = kv_compressed.squeeze(0) + k_pos_emb = k_pos_emb.squeeze(0) + + output = None + fmqa_out = None + if need_do_prefill: + # Prefill: keep 3D tensors for flash_attn_func + output = self.fd_attention.forward( + q=q, + k=k, + v=v, + qkv=None, + compressed_kv=compressed_kv, + k_pe=k_pos_emb, + forward_meta=forward_meta, + ) + output.reshape_([output.shape[0], output.shape[1] * output.shape[2]]) + + if need_do_decode: + # Decode: use absorbed q for multi_head_latent_attention C++ kernel + # q_absorbed: [s, heads, kv_lora_rank + qk_rope_head_dim] (after squeeze_to_3d) + # C++ kernel expects: [token_num, heads * (kv_lora_rank + qk_rope_head_dim)] + q_abs = squeeze_to_3d(q_absorbed, "q_absorbed") if q_absorbed.ndim == 4 else q_absorbed + seq_len = int(q_abs.shape[0]) + q_input = q_abs.reshape([seq_len, -1]) + + fmqa_out = self.fd_attention.forward( + q=q_input, + k=None, + v=None, + qkv=None, + compressed_kv=compressed_kv, + k_pe=k_pos_emb, + forward_meta=forward_meta, + ) + + # V de-absorption: kernel output [token, heads * kv_lora_rank] + # -> [heads, token, kv_lora_rank] @ wv_b [heads, kv_lora_rank, v_head_dim] + # -> [token, heads * v_head_dim] + kv_lora_rank = self.config.kv_lora_rank + v_head_dim = self.config.v_head_dim + num_heads = fmqa_out.shape[-1] // kv_lora_rank + fmqa_out = fmqa_out.reshape([-1, num_heads, kv_lora_rank]).transpose([1, 0, 2]) + fmqa_out = paddle.bmm(fmqa_out, v_b_proj_weight) + fmqa_out = fmqa_out.transpose([1, 0, 2]).reshape([-1, num_heads * v_head_dim]) + # Merge prefill and decode outputs if both are present + if need_do_prefill: + try: + from 
fastdeploy.model_executor.ops.gpu import ( + merge_prefill_decode_output, + ) + + merge_prefill_decode_output( + output, + fmqa_out, + forward_meta.seq_lens_encoder, + forward_meta.seq_lens_decoder, + forward_meta.seq_lens_this_time, + forward_meta.cu_seqlens_q, + num_heads, + v_head_dim, + 1, + ) + except (ImportError, AttributeError): + logger.warning("merge_prefill_decode_output not available, using decode output only") + output = fmqa_out + else: + output = fmqa_out + else: + # Standard mode: concatenate QKV + seq_len = int(q.shape[0]) + + # SHD: [seq, heads, dim] -> flatten to [seq, heads*dim] + q_flat = q.reshape([seq_len, -1]) + k_flat = k.reshape([seq_len, -1]) + v_flat = v.reshape([seq_len, -1]) + + # Concatenate QKV: [seq, (q_heads + kv_heads + kv_heads) * head_dim] + qkv = paddle.concat([q_flat, k_flat, v_flat], axis=-1) + + output = self.fd_attention.forward(qkv=qkv, forward_meta=forward_meta) + + # Restore batch dimension: [seq, hidden] -> [b, seq, hidden] + # PaddleFleet expects 3D output format + output = output.unsqueeze(0) + + return output + finally: + # Restore original scale + if original_scale is None: + if hasattr(self.fd_attention, "scale"): + delattr(self.fd_attention, "scale") + else: + self.fd_attention.scale = original_scale + + @support_graph_optimization + class PaddleFleetModelBase(nn.Layer): + """ + A mixin-style base class to provide PaddleFormers backend logic on top of nn.Layer. + This class subclasses nn.Layer and provides common methods to + initialize and manage a PaddleFormers model. 
+ """ + + def __init__(self, fd_config: "FDConfig", **kwargs): + super().__init__(fd_config) + logger.info("Initializing PaddleFormers backend.") + self.fd_config = fd_config # FastDeploy's top-level FDConfig + self.model_config = fd_config.model_config # FastDeploy's ModelConfig + self.paddleformers_config = AutoConfig.from_pretrained(self.model_config.model) + + # Assign parallel config from fd_config.parallel_config to paddleformers_config + parallel_config = fd_config.parallel_config + # parallel_config.tensor_parallel_size = 1 + # parallel_config.expert_parallel_size = 2 + self.paddleformers_config.data_parallel_size = parallel_config.data_parallel_size + self.paddleformers_config.tensor_model_parallel_size = parallel_config.tensor_parallel_size + self.paddleformers_config.sequence_parallel = parallel_config.sequence_parallel + self.paddleformers_config.expert_model_parallel_size = parallel_config.expert_parallel_size + # if parallel_config.expert_parallel_size > 1 and parallel_config.sequence_parallel == False: + # self.paddleformers_config.tensor_model_parallel_size = 1 + # logger.warning("When using expert parallelism and tensor parallelism, sequence parallelism must be used in fleet set tp=1 .") + self.paddleformers_config.parallel_output = self.paddleformers_config.tensor_model_parallel_size == 1 + self.paddleformers_config.max_seq_len = self.model_config.max_model_len + self.paddleformers_config.params_dtype = self.model_config.dtype or "bfloat16" + # self.paddleformers_config.moe_grouped_gemm = True + self.paddleformers_config.moe_token_dispatcher_type = "deepep" + # self.paddleformers_config.use_cpu_initialization = True + self.paddleformers_config.gated_attention = getattr(self.paddleformers_config, "use_gated_attn", False) + if self.paddleformers_config.multi_latent_attention: + self.paddleformers_config.qk_head_dim = ( + self.paddleformers_config.qk_rope_head_dim + self.paddleformers_config.qk_nope_head_dim + ) + # Initialize PaddleFleet 
parallel_state so that its TP group is consistent with FastDeploy. + # PaddleFleet's ColumnParallelLinear/RowParallelLinear obtains TP world_size/rank + # via parallel_state. Without initialization, it defaults to 1, causing weights + # to not be TP-sharded, which mismatches FastDeploy's KV cache (allocated per TP). + # if parallel_config.tensor_parallel_size > 1: + self._init_paddlefleet_parallel_state(fd_config) + + # The specific text model config + # Sync important config values from text_config to model_config + # This ensures fallback models use their actual config values instead of FD defaults + self._sync_config_from_text_config() + # For convenience, keep direct access to some FD configs + self.quant_config = self.fd_config.quant_config + + # Load model using from_pretrained to support weight loading + # Pass dtype, config and other options from kwargs + + model_load_kwargs = { + "dtype": self.model_config.dtype, + "config": self.paddleformers_config, + "convert_from_hf": True, + "load_via_cpu": True, + "load_checkpoint_format": "flex_checkpoint", + } + # Set random seed before model construction for reproducibility + self.model = AutoModelForCausalLM.from_pretrained( + self.model_config.model, + **model_load_kwargs, + ) + + self.model.eval() + # Patch PaddleFleet core_attention with FastDeploy attention + patched_count = patch_paddlefleet_core_attention( + model=self.model, + fd_config=self.fd_config, + ) + logger.info(f"Patched {patched_count} attention layers with FastDeploy") + + def compute_logits(self, hidden_state, forward_meta=None): + """Compute logits from hidden states using lm_head.""" + lm_head = self.model.get_lm_head() + # ColumnParallelLinear expects input [s, b, h] + hidden_state = hidden_state.unsqueeze(1) # [num_tokens, h] -> [num_tokens, 1, h] + logits = lm_head({"hidden_states": hidden_state}) + # Output [num_tokens, 1, vocab], squeeze back to [num_tokens, vocab] + if logits.ndim == 3: + logits = logits.squeeze(1) + logits = 
logits.astype(paddle.float32) + logits[:, self.model_config.ori_vocab_size :] = -float("inf") + return logits + + def _init_paddlefleet_parallel_state(self, fd_config) -> None: + """ + Initialize PaddleFleet's parallel_state so that ColumnParallelLinear/RowParallelLinear + can correctly obtain TP world_size and rank, and thus correctly shard weights + and build sharded_state_dict. + + Follows the initialization logic in PaddleFormers' training_args.py: calls + fleet.init() with a hybrid strategy, then (re)builds the TP group when it is + missing or mis-sized (assigning parallel_state internals directly if TP size is 1). + """ + from paddle.distributed import fleet + + parallel_config = fd_config.parallel_config + + strategy = fleet.DistributedStrategy() + strategy.hybrid_configs = { + "dp_degree": 1, + "mp_degree": parallel_config.tensor_parallel_size, + "pp_degree": 1, + "sep_degree": 1, + "sharding_degree": parallel_config.data_parallel_size, + "ep_degree": parallel_config.expert_parallel_size, + "cp_degree": 1, + "moe_sharding_degree": 1, + "order": [ + "pp", + "moe_sharding", + "ep", + "dp", + "sharding", + "sep", + "cp", + "mp", + ], + } + fleet.init(is_collective=True, strategy=strategy) + logger.info( + f"Initialized PaddleFleet parallel_state via initialize_fleet " + f"(sharddp={parallel_config.data_parallel_size}, " + f"mp={parallel_config.tensor_parallel_size}, " + f"ep={parallel_config.expert_parallel_size}, " + f"sp={parallel_config.sequence_parallel})" + ) + + import paddle.distributed as dist + from paddlefleet import parallel_state + + hcg = fleet.get_hybrid_communicate_group() + expected_tp_size = parallel_config.tensor_parallel_size + + # Check if we need to initialize or reinitialize TP group + need_init = False + if parallel_state._TENSOR_MODEL_PARALLEL_GROUP is None: + need_init = True + reason = "TP group not initialized" + else: + # Check if current TP group size matches expected + current_tp_group = parallel_state._TENSOR_MODEL_PARALLEL_GROUP + current_tp_size = getattr(current_tp_group, "nranks", 
None) + if current_tp_size is None: + current_tp_size = getattr(current_tp_group, "world_size", None) + if current_tp_size != expected_tp_size: + need_init = True + reason = f"TP group size mismatch: current={current_tp_size}, expected={expected_tp_size}" + + if need_init: + logger.warning(f"{reason}, reinitializing TP group with size={expected_tp_size}") + if expected_tp_size == 1: + # Single process TP group - create manually + current_rank = dist.get_rank() + tp_ranks = [current_rank] + default_pg = dist.new_group(ranks=tp_ranks) + parallel_state._TENSOR_MODEL_PARALLEL_GROUP = default_pg + parallel_state._TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = tp_ranks + logger.info(f"Reinitialized TP group with size=1, rank={current_rank}, ranks={tp_ranks}") + else: + # Multiple processes - use hcg's mp group + parallel_state.initialize_model_parallel(hcg) + + from paddlefleet.tensor_parallel.random import ( + model_parallel_cuda_manual_seed, + ) + + try: + model_parallel_cuda_manual_seed(seed=42) + except AssertionError: + pass + + def _sync_config_from_text_config(self) -> None: + """ + Sync important config values from text_config (PaddleFormers/HF config) + to model_config. This ensures fallback models use their actual config + values instead of FD's defaults. + + This is crucial for models with unique configs like: + - Gemma3: tie_word_embeddings=True, layer_types, sliding_window + - Mistral: sliding_window + - etc. 
+ """ + mc = self.model_config + tc = self.paddleformers_config + + sync_fields = [ + "tie_word_embeddings", + "sliding_window", + "sliding_window_pattern", + "layer_types", # May be computed as property + "rope_theta", + "rope_scaling", + "head_dim", + "v_head_dim", # For MLA (Multi-Latent Attention) support + "qk_head_dim", + "rms_norm_eps", + "rope_local_base_freq", # Gemma3 specific + "query_pre_attn_scalar", # Gemma3 specific + ] + + synced = [] + for field in sync_fields: + text_value = getattr(tc, field, None) + if text_value is not None: + # Only sync if not already set or if FD default differs + current_value = getattr(mc, field, None) if hasattr(mc, field) else None + if current_value is None or current_value != text_value: + setattr(mc, field, text_value) + synced.append(f"{field}={text_value}") + + def embed_input_ids(self, input_ids: paddle.Tensor) -> paddle.Tensor: + """Embed input_ids using the model's embedding layer.""" + embedding_layer = self.model.get_input_embeddings() + + original_ndim = input_ids.ndim + if input_ids.ndim == 1: + input_ids = input_ids.unsqueeze(0) # [num_tokens] -> [1, num_tokens] + + inputs_embeds = embedding_layer(input_ids) + + # Embedding output is [batch, seq, h], squeeze back to [num_tokens, h] + if original_ndim == 1 and inputs_embeds.ndim == 3: + inputs_embeds = inputs_embeds.squeeze(0) + + if hasattr(self, "embed_scale") and self.embed_scale is not None: + inputs_embeds *= self.embed_scale + return inputs_embeds + + @paddle.no_grad() + def forward( + self, + inputs: Dict, + forward_meta: ForwardMeta, + **kwargs, + ): + """Full transformer forward: input_ids -> hidden_states. + + This method is the primary forward pass for the model, computing: + 1. Position IDs based on seq_lens_decoder (absolute positions for RoPE) + 2. Token embeddings via embed_input_ids + 3. 
Transformer layers via self.model() + + Returns: + hidden_states: [TotalTokens, HiddenDim] + """ + # Handle empty batch case (e.g., DP worker with no data in EP mode) + if getattr(forward_meta, "is_zero_size", False) or inputs["ids_remove_padding"].shape[0] == 0: + # Return zero tensor with correct shape: [0, hidden_size] + hidden_size = self.model_config.hidden_size + dtype = self.model_config.dtype + return paddle.empty([0, hidden_size], dtype=dtype) + + ids_remove_padding = inputs["ids_remove_padding"] + num_tokens = ids_remove_padding.shape[0] + batch_id_per_token = forward_meta.batch_id_per_token # [num_tokens] + seq_lens_decoder = forward_meta.seq_lens_decoder # [batch_size, 1] + + if batch_id_per_token is not None and seq_lens_decoder is not None: + decoder_offsets = seq_lens_decoder.squeeze(-1) # [batch_size] + # Ensure decoder_offsets is at least 1D tensor + if decoder_offsets.ndim == 0: + decoder_offsets = decoder_offsets.reshape([1]) + token_decoder_offsets = paddle.index_select( + decoder_offsets, batch_id_per_token, axis=0 + ) # [num_tokens] + + cu_seqlens = forward_meta.cu_seqlens_q # [batch_size + 1] + if cu_seqlens is not None: + token_global_idx = paddle.arange(num_tokens, dtype="int64") + request_start_idx = paddle.index_select(cu_seqlens[:-1], batch_id_per_token, axis=0) + relative_positions = token_global_idx - request_start_idx.astype("int64") + else: + relative_positions = paddle.zeros([num_tokens], dtype="int64") + position_ids = token_decoder_offsets.astype("int64") + relative_positions + else: + position_ids = paddle.arange(num_tokens, dtype="int64") + if seq_lens_decoder is not None: + position_ids = position_ids + seq_lens_decoder[0, 0].astype("int64") + forward_meta.rope_already_applied = True + # Also set forward_meta on each TransformerLayer's config + # so that FastDeployAttention can retrieve it from core_attn.config + if hasattr(self.model, "run_function"): + for layer in self.model.run_function: + if not isinstance(layer, 
(GPTEmbedding, GPTLMHead)): + if hasattr(layer, "self_attn") and hasattr(layer.self_attn, "core_attention"): + core_attn = layer.self_attn.core_attention + if hasattr(core_attn, "config"): + core_attn.config.forward_meta = forward_meta + + inputs_embeds = self.embed_input_ids(ids_remove_padding).unsqueeze(0) + + # Build input dict, PipelineLayer passes data between layers via dict + model_input = { + "input_ids": None, + "position_ids": position_ids, + } + # Add other parameters from kwargs + for k, v in kwargs.items(): + if v is not None: + model_input[k] = v + + # Iterate over run_function, skip GPTLMHead + # Only call TransformerLayer + i = -1 + for layer in self.model.run_function: + if isinstance(layer, GPTLMHead): + continue + if isinstance(layer, (GPTEmbedding)): + model_input = layer(model_input, decoder_input=inputs_embeds) + else: + model_input = layer(model_input) + i += 1 + hidden_states = model_input["hidden_states"] + # [b, s, h] -> [s, h] (b=1) + hidden_states = hidden_states.squeeze(0) + + return hidden_states + + @paddle.no_grad() + def load_weights(self, weights: Iterable[tuple[str, paddle.Tensor]]): + # use model.from_pretrained to load weight + logger.debug("load_weights called but skipped: weights already loaded via from_pretrained") + pass + + def set_state_dict(self, state_dict): + self.model.set_state_dict(state_dict) + + # ============================================================================ + # PaddleFleet Attention Patch Functions + # ============================================================================ + + def patch_paddlefleet_core_attention( + model, + fd_config: "FDConfig", + layers_to_patch: list[int] | None = None, + ): + """ + Replace core_attention in all TransformerLayers of a PaddleFleet model with FastDeployAttention. 
+ + Args: + model: PaddleFleet model instance (inheriting from PipelineLayer) + fd_config: FastDeploy FDConfig object, used to create Attention instances + layers_to_patch: List of layer indices to patch, None means patch all layers + + Returns: + int: Number of layers successfully patched + + Raises: + ValueError: If the model structure is unexpected or parameters are incorrect + """ + if fd_config is None: + raise ValueError("fd_config must be provided") + + from fastdeploy.model_executor.layers.attention.attention import Attention + + # Iterate over run_function to find TransformerLayers + patched_count = 0 + transformer_layers = [] + + # Collect all TransformerLayers + if hasattr(model, "run_function"): + for layer in model.run_function: + # Try to identify TransformerLayer + layer_type = type(layer).__name__ + if "TransformerLayer" in layer_type or "transformer" in str(type(layer)): + transformer_layers.append(layer) + + if not transformer_layers: + # Try alternative ways to find layers + for name, module in model.named_sublayers(): + if "TransformerLayer" in type(module).__name__: + transformer_layers.append(module) + + if not transformer_layers: + raise ValueError("No TransformerLayer found in model") + + # Patch core_attention for each TransformerLayer + for layer in transformer_layers: + layer_number = getattr(layer, "layer_number", None) + if layer_number is None: + # Try to get from other attributes + layer_number = getattr(layer, "layer_id", None) + + if layer_number is None: + logger.warning("layer_number not found, skip patching...") + continue # Skip layers where layer_id cannot be obtained + + # Check if this layer needs to be patched + if layers_to_patch is not None and (layer_number) not in layers_to_patch: + continue + + # Get core_attention + if not hasattr(layer, "self_attn"): + logger.warning(f"self_attn not found in layer {layer_number}, skip patching...") + continue + + core_attn = layer.self_attn.core_attention + if core_attn is None: + 
logger.warning(f"core_attn not found in layer {layer_number}, skip patching...") + continue + + # Get configuration info + # Prefer per-partition values (values after TP sharding), + # because PaddleFleet's QKV output is already per-partition when TP>1 + num_attention_heads = getattr( + core_attn, "num_attention_heads_per_partition", getattr(core_attn.config, "num_attention_heads", None) + ) + num_key_value_heads = getattr( + core_attn, + "num_query_groups_per_partition", + getattr(core_attn.config, "num_key_value_heads", num_attention_heads), + ) + hidden_size_per_attention_head = getattr(core_attn, "hidden_size_per_attention_head", None) + if hidden_size_per_attention_head is not None: + softmax_scale = getattr(core_attn, "softmax_scale", 1.0 / math.sqrt(hidden_size_per_attention_head)) + else: + softmax_scale = 1.0 + + hidden_size_per_partition = getattr(core_attn, "hidden_size_per_partition", None) + if hidden_size_per_partition is None: + head_dim = getattr(core_attn, "hidden_size_per_attention_head", hidden_size_per_attention_head) + hidden_size_per_partition = num_attention_heads * head_dim + + fd_layer_id = layer_number + + # Create Attention instance inside FastDeployAttention + fd_attn_instance = Attention( + fd_config=fd_config, + layer_id=fd_layer_id, + ) + + # Override Attention instance's head config to match PaddleFleet model + # This is necessary because fd_config.model_config may differ from PaddleFleet model config + fd_attn_instance.num_heads = num_attention_heads + fd_attn_instance.kv_num_heads = num_key_value_heads + fd_attn_instance.head_dim = hidden_size_per_attention_head + logger.info( + f"Overriding Attention config: num_heads={num_attention_heads}, kv_num_heads={num_key_value_heads}, head_dim={hidden_size_per_attention_head}" + ) + + # Create FastDeployAttention object and directly replace core_attention + fast_deploy_core_attn = FastDeployAttention( + config=core_attn.config, + fd_attention=fd_attn_instance, + 
num_attention_heads=num_attention_heads, + num_key_value_heads=num_key_value_heads, + softmax_scale=softmax_scale, + hidden_size_per_attention_head=hidden_size_per_attention_head, + hidden_size_per_partition=hidden_size_per_partition, + layer_id=fd_layer_id, + ) + + # Replace core_attention object + layer.self_attn.core_attention = fast_deploy_core_attn + + patched_count += 1 + logger.info(f"Replaced core_attention with FastDeployAttention for layer {fd_layer_id}") + + logger.info(f"Successfully replaced {patched_count} core_attention layers with FastDeployAttention") + + return patched_count diff --git a/fastdeploy/model_executor/utils.py b/fastdeploy/model_executor/utils.py index 676b2a31d39..11f721447f9 100644 --- a/fastdeploy/model_executor/utils.py +++ b/fastdeploy/model_executor/utils.py @@ -596,6 +596,10 @@ def has_flashinfer(): return importlib.util.find_spec("flashinfer") is not None +def is_paddlefleet_available(): + return importlib.util.find_spec("paddlefleet") is not None + + @cache def get_sm_version(): if paddle.cuda.is_available(): diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 694e2b23c80..d652b20e25f 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -1010,9 +1010,9 @@ def parse_args(): parser.add_argument( "--model-impl", type=str, - choices=["auto", "fastdeploy", "paddleformers"], + choices=["auto", "fastdeploy", "paddleformers", "paddlefleet"], default="auto", - help="Model implementation backend (auto, fastdeploy, paddleformers)", + help="Model implementation backend (auto, fastdeploy, paddleformers, paddlefleet)", ) parser.add_argument( diff --git a/scripts/coverage_run.sh b/scripts/coverage_run.sh index 79d91c11d88..ecd1b8b062f 100644 --- a/scripts/coverage_run.sh +++ b/scripts/coverage_run.sh @@ -11,9 +11,16 @@ export COVERAGE_RCFILE=${COVERAGE_RCFILE:-$DIR/../scripts/.coveragerc} # Classify tests into one of the following categories # - multi_gpu: 
requires multiple GPUs / ports (run sequentially) # - single_gpu: independent tests (can run in parallel) +# - isolated: tests that install dependencies, run last (isolated) # ============================================================ classify_tests() { local test_file=$1 + # Rule 0: model_executor/fallback tests should run in isolation (last) + if [[ "$test_file" =~ tests/model_executor/fallback/.*\.py ]]; then + echo "isolated" + return + fi + # Rule 1: distributed tests (explicit multi-GPU launch) if [[ "$test_file" =~ tests/distributed/.*test_.*\.py ]]; then echo "multi_gpu" @@ -247,6 +254,7 @@ fi MULTI_GPU_TESTS=() SINGLE_GPU_TESTS=() +ISOLATED_TESTS=() TOTAL_TESTS=0 for file in $ALL_TEST_FILES; do @@ -260,13 +268,18 @@ for file in $ALL_TEST_FILES; do "single_gpu") SINGLE_GPU_TESTS+=("$file") ;; + "isolated") + ISOLATED_TESTS+=("$file") + ;; esac done echo "Multi-GPU tests: ${#MULTI_GPU_TESTS[@]}" echo "Single-GPU tests: ${#SINGLE_GPU_TESTS[@]}" +echo "Isolated tests: ${#ISOLATED_TESTS[@]}" echo "Total tests: $TOTAL_TESTS" + # ============================================================ # Step 2: Run multi-GPU tests (sequential) # ============================================================ @@ -327,9 +340,24 @@ else fi # ============================================================ -# Step 4: Summary +# Step 4: Run isolated tests (last, to avoid dependency pollution) +# ============================================================ +echo "Step 4: Running isolated tests (tests with special dependencies)" + +if [ ${#ISOLATED_TESTS[@]} -gt 0 ]; then + echo "Isolated tests will run last to avoid dependency pollution." + for file in "${ISOLATED_TESTS[@]}"; do + echo "Running isolated test: $file" + run_test_with_logging "$file" "$failed_tests_file" + done +else + echo "No isolated tests to run." 
+fi + +# ============================================================ +# Step 5: Summary # ============================================================ -echo "Step 4: Summary" +echo "Step 5: Summary" # Count failed tests if [ -f "$failed_tests_file" ]; then @@ -369,7 +397,7 @@ if [ "$failed_count" -ne 0 ]; then # Only package logs when there are failures echo "====================================" - echo "Step 5: Packaging logs (only on failure)" + echo "Step 6: Packaging logs (only on failure)" echo "====================================" if [ -d "${run_path}/unittest_logs" ]; then From 5c640f1e48fcbec85d9fc899ac86d8b3389ff1c5 Mon Sep 17 00:00:00 2001 From: kebo01 Date: Tue, 2 Jun 2026 13:56:01 +0800 Subject: [PATCH 2/5] remove test --- {fastdeploy => tests}/model_executor/fallback/conftest.py | 0 .../model_executor/fallback/test_fallback_fleet_model.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename {fastdeploy => tests}/model_executor/fallback/conftest.py (100%) rename {fastdeploy => tests}/model_executor/fallback/test_fallback_fleet_model.py (100%) diff --git a/fastdeploy/model_executor/fallback/conftest.py b/tests/model_executor/fallback/conftest.py similarity index 100% rename from fastdeploy/model_executor/fallback/conftest.py rename to tests/model_executor/fallback/conftest.py diff --git a/fastdeploy/model_executor/fallback/test_fallback_fleet_model.py b/tests/model_executor/fallback/test_fallback_fleet_model.py similarity index 100% rename from fastdeploy/model_executor/fallback/test_fallback_fleet_model.py rename to tests/model_executor/fallback/test_fallback_fleet_model.py From a39135a6eb82210ab84b27eff317dcdfcbee20ec Mon Sep 17 00:00:00 2001 From: xiaoguoguo626807 Date: Tue, 2 Jun 2026 19:03:40 +0800 Subject: [PATCH 3/5] fix coverge not right --- scripts/.coveragerc | 1 + tests/model_executor/fallback/conftest.py | 2 +- .../test_fallback_fleet_model_coverge.py | 641 ++++++++++++++++++ 3 files changed, 643 insertions(+), 1 deletion(-) create 
mode 100644 tests/model_executor/fallback/test_fallback_fleet_model_coverge.py diff --git a/scripts/.coveragerc b/scripts/.coveragerc index f5a48a3da0f..16529bd8019 100644 --- a/scripts/.coveragerc +++ b/scripts/.coveragerc @@ -11,6 +11,7 @@ source = */site-packages/fastdeploy */lib/python3.10/site-packages/fastdeploy */fastdeploy + /workspace/FastDeploy/fastdeploy [report] exclude_lines = diff --git a/tests/model_executor/fallback/conftest.py b/tests/model_executor/fallback/conftest.py index 41c2b277241..dc93a1071a5 100644 --- a/tests/model_executor/fallback/conftest.py +++ b/tests/model_executor/fallback/conftest.py @@ -155,7 +155,7 @@ def pytest_collection_modifyitems(config, items): # Mark all paddlefleet tests to skip skip_marker = pytest.mark.skip(reason="Failed to install paddlefleet dependencies") for item in items: - if "test_fallback_fleet_model.py" in item.nodeid: + if "test_fallback_fleet_model.py" in item.nodeid or "test_fallback_fleet_model_coverge.py" in item.nodeid: item.add_marker(skip_marker) print("=" * 70 + "\n") diff --git a/tests/model_executor/fallback/test_fallback_fleet_model_coverge.py b/tests/model_executor/fallback/test_fallback_fleet_model_coverge.py new file mode 100644 index 00000000000..79d00596726 --- /dev/null +++ b/tests/model_executor/fallback/test_fallback_fleet_model_coverge.py @@ -0,0 +1,641 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Unit tests to improve coverage of base_fleet.py uncovered branches. + +Covers: +- FastDeployAttention.forward MLA branch (lines 166-249) +- FastDeployAttention.forward edge cases (squeeze_to_3d errors, scale restore) +- patch_paddlefleet_core_attention error branches (lines 633-705) +- PaddleFleetModelBase.forward zero-size & fallback branches (lines 530-561) +- load_weights / set_state_dict (lines 603-608) +""" + +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import pytest + +from fastdeploy.model_executor.utils import is_paddlefleet_available + +pytestmark = pytest.mark.skipif(not is_paddlefleet_available(), reason="paddlefleet not installed") + +if is_paddlefleet_available(): + import paddle + from paddlefleet.transformer.layer import FleetLayer + + from fastdeploy.model_executor.models.paddleformers.base_fleet import ( + FastDeployAttention, + PaddleFleetModelBase, + patch_paddlefleet_core_attention, + ) + +# ============================================================================ +# Helpers +# ============================================================================ + + +def _create_mla_attention(kv_lora_rank=4, v_head_dim=2, num_heads=2): + """Create a FastDeployAttention instance configured for MLA testing.""" + mock_config = MagicMock() + mock_config.multi_latent_attention = True + mock_config.kv_lora_rank = kv_lora_rank + mock_config.v_head_dim = v_head_dim + + mock_fd_attention = MagicMock() + # Ensure fd_attention does NOT have 'scale' by default (covers line 138-139) + del mock_fd_attention.scale + + with patch.object(FleetLayer, "__init__", lambda self, config: None): + attn = FastDeployAttention( + config=mock_config, + fd_attention=mock_fd_attention, + num_attention_heads=num_heads, + num_key_value_heads=num_heads, + softmax_scale=0.125, + hidden_size_per_attention_head=kv_lora_rank, + hidden_size_per_partition=num_heads * kv_lora_rank, + layer_id=0, + ) + # Manually set config since FleetLayer.__init__ 
is mocked + attn.config = mock_config + return attn, mock_fd_attention + + +def _create_standard_attention(num_heads=2, head_dim=64): + """Create a FastDeployAttention instance for non-MLA edge-case testing.""" + mock_config = MagicMock() + mock_config.multi_latent_attention = False + mock_config.forward_meta = MagicMock() + + mock_fd_attention = MagicMock() + del mock_fd_attention.scale + + with patch.object(FleetLayer, "__init__", lambda self, config: None): + attn = FastDeployAttention( + config=mock_config, + fd_attention=mock_fd_attention, + num_attention_heads=num_heads, + num_key_value_heads=num_heads, + softmax_scale=1.0 / (head_dim**0.5), + hidden_size_per_attention_head=head_dim, + hidden_size_per_partition=num_heads * head_dim, + layer_id=0, + ) + attn.config = mock_config + return attn, mock_fd_attention + + +def _create_mock_forward_meta(prefill_tokens=0, decode_tokens=0): + """Create a mock ForwardMeta for MLA testing.""" + forward_meta = MagicMock() + forward_meta.max_len_tensor_cpu = [0, prefill_tokens, decode_tokens] + forward_meta.seq_lens_encoder = MagicMock() + forward_meta.seq_lens_decoder = MagicMock() + forward_meta.seq_lens_this_time = MagicMock() + forward_meta.cu_seqlens_q = MagicMock() + return forward_meta + + +def _create_mock_fleet_model_for_forward(hidden_size=64, num_tokens=5): + """Create a minimal mock PaddleFleetModelBase for testing forward branches.""" + + class MockTransformerLayer: + """A mock layer that simulates TransformerLayer behavior in forward.""" + + def __init__(self): + self.self_attn = MagicMock() + self.self_attn.core_attention = MagicMock() + self.self_attn.core_attention.config = MagicMock() + + def __call__(self, model_input, **kwargs): + if "hidden_states" not in model_input: + model_input["hidden_states"] = paddle.randn([1, num_tokens, hidden_size]) + return model_input + + model = PaddleFleetModelBase.__new__(PaddleFleetModelBase) + model.model_config = SimpleNamespace(hidden_size=hidden_size, 
dtype="float32") + model.embed_input_ids = MagicMock(return_value=paddle.randn([num_tokens, hidden_size])) + + mock_layer = MockTransformerLayer() + model.model = MagicMock() + model.model.run_function = [mock_layer] + + return model + + +# ============================================================================ +# Tests for patch_paddlefleet_core_attention error branches (no GPU needed) +# ============================================================================ + + +class TestPatchPaddlefleetCoreAttentionErrors: + """Test error and fallback branches in patch_paddlefleet_core_attention.""" + + def test_fd_config_none_raises(self): + """Line 633-634: fd_config=None should raise ValueError.""" + with pytest.raises(ValueError, match="fd_config must be provided"): + patch_paddlefleet_core_attention(model=MagicMock(), fd_config=None) + + def test_no_transformer_layer_named_sublayers_fallback(self): + """Lines 652-654: Fallback to named_sublayers when run_function has no TransformerLayer.""" + model = MagicMock() + model.run_function = [MagicMock()] # Not a TransformerLayer + + # Create a mock TransformerLayer found via named_sublayers + transformer_layer = MagicMock() + type(transformer_layer).__name__ = "TransformerLayer" + transformer_layer.layer_number = 1 + transformer_layer.self_attn = MagicMock() + core_attn = MagicMock() + core_attn.num_attention_heads_per_partition = 8 + core_attn.num_query_groups_per_partition = 2 + core_attn.hidden_size_per_attention_head = 64 + core_attn.hidden_size_per_partition = 512 + core_attn.config = MagicMock() + transformer_layer.self_attn.core_attention = core_attn + + model.named_sublayers.return_value = [("", transformer_layer)] + + with patch("fastdeploy.model_executor.layers.attention.attention.Attention", MagicMock()): + result = patch_paddlefleet_core_attention(model=model, fd_config=MagicMock()) + assert result == 1 + + def test_no_transformer_layer_at_all_raises(self): + """Lines 656-657: No TransformerLayer found 
anywhere -> ValueError.""" + model = MagicMock() + del model.run_function + model.named_sublayers.return_value = [] + + with pytest.raises(ValueError, match="No TransformerLayer found in model"): + patch_paddlefleet_core_attention(model=model, fd_config=MagicMock()) + + def test_layer_number_not_found_skip(self): + """Lines 665, 668-669: Layer without layer_number or layer_id is skipped.""" + model = MagicMock() + layer = MagicMock() + type(layer).__name__ = "TransformerLayer" + del layer.layer_number + del layer.layer_id + layer.self_attn = MagicMock() + layer.self_attn.core_attention = MagicMock() + model.run_function = [layer] + + result = patch_paddlefleet_core_attention(model=model, fd_config=MagicMock()) + assert result == 0 + + def test_layers_to_patch_filter(self): + """Lines 672-673: layers_to_patch excludes layers not in the list.""" + model = MagicMock() + layer = MagicMock() + type(layer).__name__ = "TransformerLayer" + layer.layer_number = 1 + layer.self_attn = MagicMock() + core_attn = MagicMock() + core_attn.num_attention_heads_per_partition = 8 + core_attn.num_query_groups_per_partition = 2 + core_attn.hidden_size_per_attention_head = 64 + core_attn.hidden_size_per_partition = 512 + core_attn.config = MagicMock() + layer.self_attn.core_attention = core_attn + model.run_function = [layer] + + # Layer 1 not in [2, 3] -> skipped + result = patch_paddlefleet_core_attention(model=model, fd_config=MagicMock(), layers_to_patch=[2, 3]) + assert result == 0 + + def test_no_self_attn_skip(self): + """Lines 677-678: Layer without self_attn is skipped.""" + model = MagicMock() + layer = MagicMock() + type(layer).__name__ = "TransformerLayer" + layer.layer_number = 1 + del layer.self_attn + model.run_function = [layer] + + result = patch_paddlefleet_core_attention(model=model, fd_config=MagicMock()) + assert result == 0 + + def test_core_attn_none_skip(self): + """Lines 682-683: Layer with core_attention=None is skipped.""" + model = MagicMock() + layer = 
MagicMock() + type(layer).__name__ = "TransformerLayer" + layer.layer_number = 1 + layer.self_attn = MagicMock() + layer.self_attn.core_attention = None + model.run_function = [layer] + + result = patch_paddlefleet_core_attention(model=model, fd_config=MagicMock()) + assert result == 0 + + def test_no_hidden_size_per_attention_head(self): + """Lines 699-700: hidden_size_per_attention_head is None -> softmax_scale = 1.0.""" + model = MagicMock() + layer = MagicMock() + type(layer).__name__ = "TransformerLayer" + layer.layer_number = 1 + layer.self_attn = MagicMock() + core_attn = MagicMock() + core_attn.num_attention_heads_per_partition = 8 + core_attn.num_query_groups_per_partition = 2 + core_attn.hidden_size_per_attention_head = None + core_attn.hidden_size_per_partition = 512 + core_attn.config = MagicMock() + layer.self_attn.core_attention = core_attn + model.run_function = [layer] + + with patch("fastdeploy.model_executor.layers.attention.attention.Attention", MagicMock()): + result = patch_paddlefleet_core_attention(model=model, fd_config=MagicMock()) + assert result == 1 + + def test_no_hidden_size_per_partition(self): + """Lines 703-705: hidden_size_per_partition is None -> computed from heads * head_dim.""" + model = MagicMock() + layer = MagicMock() + type(layer).__name__ = "TransformerLayer" + layer.layer_number = 1 + layer.self_attn = MagicMock() + core_attn = MagicMock() + core_attn.num_attention_heads_per_partition = 8 + core_attn.num_query_groups_per_partition = 2 + core_attn.hidden_size_per_attention_head = 64 + core_attn.hidden_size_per_partition = None + core_attn.config = MagicMock() + layer.self_attn.core_attention = core_attn + model.run_function = [layer] + + with patch("fastdeploy.model_executor.layers.attention.attention.Attention", MagicMock()): + result = patch_paddlefleet_core_attention(model=model, fd_config=MagicMock()) + assert result == 1 + + +# ============================================================================ +# Tests for 
FastDeployAttention.forward MLA branch +# ============================================================================ + + +class TestFastDeployAttentionMLA: + """Test FastDeployAttention.forward MLA (Multi-Latent Attention) branch.""" + + def test_mla_prefill_only(self): + """Lines 166-197: MLA with prefill only (need_do_prefill=True, need_do_decode=False).""" + kv_lora_rank, v_head_dim, num_heads = 4, 2, 2 + attn, mock_fd_attention = _create_mla_attention(kv_lora_rank, v_head_dim, num_heads) + forward_meta = _create_mock_forward_meta(prefill_tokens=4, decode_tokens=0) + attn.config.forward_meta = forward_meta + + seq_len = 4 + query = paddle.randn([seq_len, num_heads, kv_lora_rank]) + key = paddle.randn([seq_len, num_heads, kv_lora_rank]) + value = paddle.randn([seq_len, num_heads, kv_lora_rank]) + kv_compressed = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + k_pos_emb = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + + # Mock fd_attention.forward to return 3D output for prefill + prefill_output = paddle.randn([seq_len, num_heads, kv_lora_rank]) + mock_fd_attention.forward.return_value = prefill_output + + result = attn.forward( + query=query, + key=key, + value=value, + attention_mask=None, + kv_compressed=kv_compressed, + k_pos_emb=k_pos_emb, + ) + assert result is not None + assert result.shape[0] == 1 # unsqueeze(0) + mock_fd_attention.forward.assert_called_once() + + def test_mla_decode_only(self): + """Lines 199-248: MLA with decode only (need_do_prefill=False, need_do_decode=True). + + Covers: q_absorbed 3D handling, V de-absorption, else branch (output=fmqa_out). 
+ """ + kv_lora_rank, v_head_dim, num_heads = 4, 2, 2 + attn, mock_fd_attention = _create_mla_attention(kv_lora_rank, v_head_dim, num_heads) + forward_meta = _create_mock_forward_meta(prefill_tokens=0, decode_tokens=2) + attn.config.forward_meta = forward_meta + + seq_len = 2 + query = paddle.randn([seq_len, num_heads, kv_lora_rank]) + key = paddle.randn([seq_len, num_heads, kv_lora_rank]) + value = paddle.randn([seq_len, num_heads, kv_lora_rank]) + kv_compressed = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + k_pos_emb = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + q_absorbed = paddle.randn([seq_len, num_heads, kv_lora_rank + 2]) + v_b_proj_weight = paddle.randn([num_heads, kv_lora_rank, v_head_dim]) + + decode_output = paddle.randn([seq_len, num_heads * kv_lora_rank]) + mock_fd_attention.forward.return_value = decode_output + + result = attn.forward( + query=query, + key=key, + value=value, + attention_mask=None, + kv_compressed=kv_compressed, + k_pos_emb=k_pos_emb, + q_absorbed=q_absorbed, + v_b_proj_weight=v_b_proj_weight, + ) + assert result is not None + assert result.shape[0] == 1 # unsqueeze(0) + + def test_mla_decode_4d_q_absorbed(self): + """Line 203: q_absorbed with 4D input (batch=1) -> squeeze_to_3d path.""" + kv_lora_rank, v_head_dim, num_heads = 4, 2, 2 + attn, mock_fd_attention = _create_mla_attention(kv_lora_rank, v_head_dim, num_heads) + forward_meta = _create_mock_forward_meta(prefill_tokens=0, decode_tokens=1) + attn.config.forward_meta = forward_meta + + seq_len = 1 + query = paddle.randn([seq_len, num_heads, kv_lora_rank]) + key = paddle.randn([seq_len, num_heads, kv_lora_rank]) + value = paddle.randn([seq_len, num_heads, kv_lora_rank]) + kv_compressed = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + k_pos_emb = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + # 4D q_absorbed with batch=1 -> triggers squeeze_to_3d + q_absorbed = paddle.randn([1, seq_len, num_heads, kv_lora_rank + 2]) + v_b_proj_weight = 
paddle.randn([num_heads, kv_lora_rank, v_head_dim]) + + decode_output = paddle.randn([seq_len, num_heads * kv_lora_rank]) + mock_fd_attention.forward.return_value = decode_output + + result = attn.forward( + query=query, + key=key, + value=value, + attention_mask=None, + kv_compressed=kv_compressed, + k_pos_emb=k_pos_emb, + q_absorbed=q_absorbed, + v_b_proj_weight=v_b_proj_weight, + ) + assert result is not None + + def test_mla_prefill_and_decode_merge_fallback(self): + """Lines 227-246: MLA with both prefill+decode, merge_prefill_decode_output ImportError. + + Covers the except branch: logger.warning + output = fmqa_out. + """ + kv_lora_rank, v_head_dim, num_heads = 4, 2, 2 + attn, mock_fd_attention = _create_mla_attention(kv_lora_rank, v_head_dim, num_heads) + forward_meta = _create_mock_forward_meta(prefill_tokens=2, decode_tokens=2) + attn.config.forward_meta = forward_meta + + seq_prefill, seq_decode = 2, 2 + total_seq = seq_prefill + seq_decode + query = paddle.randn([total_seq, num_heads, kv_lora_rank]) + key = paddle.randn([total_seq, num_heads, kv_lora_rank]) + value = paddle.randn([total_seq, num_heads, kv_lora_rank]) + kv_compressed = paddle.randn([1, total_seq, num_heads, kv_lora_rank]) + k_pos_emb = paddle.randn([1, total_seq, num_heads, kv_lora_rank]) + q_absorbed = paddle.randn([seq_decode, num_heads, kv_lora_rank + 2]) + v_b_proj_weight = paddle.randn([num_heads, kv_lora_rank, v_head_dim]) + + # fd_attention.forward called twice: prefill then decode + prefill_out = paddle.randn([seq_prefill, num_heads, kv_lora_rank]) + decode_out = paddle.randn([seq_decode, num_heads * kv_lora_rank]) + mock_fd_attention.forward.side_effect = [prefill_out, decode_out] + + # Mock merge_prefill_decode_output to raise ImportError + mock_gpu_ops = MagicMock() + mock_gpu_ops.merge_prefill_decode_output = MagicMock(side_effect=ImportError("not available")) + + with patch.dict("sys.modules", {"fastdeploy.model_executor.ops.gpu": mock_gpu_ops}): + result = attn.forward( + 
query=query, + key=key, + value=value, + attention_mask=None, + kv_compressed=kv_compressed, + k_pos_emb=k_pos_emb, + q_absorbed=q_absorbed, + v_b_proj_weight=v_b_proj_weight, + ) + assert result is not None + + def test_mla_prefill_and_decode_merge_success(self): + """Lines 228-243: MLA with both prefill+decode, merge_prefill_decode_output succeeds.""" + kv_lora_rank, v_head_dim, num_heads = 4, 2, 2 + attn, mock_fd_attention = _create_mla_attention(kv_lora_rank, v_head_dim, num_heads) + forward_meta = _create_mock_forward_meta(prefill_tokens=2, decode_tokens=2) + attn.config.forward_meta = forward_meta + + seq_prefill, seq_decode = 2, 2 + total_seq = seq_prefill + seq_decode + query = paddle.randn([total_seq, num_heads, kv_lora_rank]) + key = paddle.randn([total_seq, num_heads, kv_lora_rank]) + value = paddle.randn([total_seq, num_heads, kv_lora_rank]) + kv_compressed = paddle.randn([1, total_seq, num_heads, kv_lora_rank]) + k_pos_emb = paddle.randn([1, total_seq, num_heads, kv_lora_rank]) + q_absorbed = paddle.randn([seq_decode, num_heads, kv_lora_rank + 2]) + v_b_proj_weight = paddle.randn([num_heads, kv_lora_rank, v_head_dim]) + + prefill_out = paddle.randn([seq_prefill, num_heads, kv_lora_rank]) + decode_out = paddle.randn([seq_decode, num_heads * kv_lora_rank]) + mock_fd_attention.forward.side_effect = [prefill_out, decode_out] + + # Mock merge_prefill_decode_output to succeed (no-op) + mock_merge = MagicMock() + mock_gpu_ops = MagicMock() + mock_gpu_ops.merge_prefill_decode_output = mock_merge + + with patch.dict("sys.modules", {"fastdeploy.model_executor.ops.gpu": mock_gpu_ops}): + result = attn.forward( + query=query, + key=key, + value=value, + attention_mask=None, + kv_compressed=kv_compressed, + k_pos_emb=k_pos_emb, + q_absorbed=q_absorbed, + v_b_proj_weight=v_b_proj_weight, + ) + assert result is not None + mock_merge.assert_called_once() + + def test_mla_kv_compressed_none_raises(self): + """Line 180: kv_compressed=None in MLA mode -> 
AssertionError.""" + kv_lora_rank, v_head_dim, num_heads = 4, 2, 2 + attn, mock_fd_attention = _create_mla_attention(kv_lora_rank, v_head_dim, num_heads) + forward_meta = _create_mock_forward_meta(prefill_tokens=2, decode_tokens=0) + attn.config.forward_meta = forward_meta + + seq_len = 2 + query = paddle.randn([seq_len, num_heads, kv_lora_rank]) + key = paddle.randn([seq_len, num_heads, kv_lora_rank]) + value = paddle.randn([seq_len, num_heads, kv_lora_rank]) + + with pytest.raises(AssertionError, match="kv_compressed must be provided"): + attn.forward( + query=query, + key=key, + value=value, + attention_mask=None, + kv_compressed=None, + k_pos_emb=None, + ) + + +# ============================================================================ +# Tests for FastDeployAttention.forward edge cases +# ============================================================================ + + +class TestFastDeployAttentionEdgeCases: + """Test edge cases in FastDeployAttention.forward.""" + + def test_4d_query_batch_gt_1_raises(self): + """Lines 153-156: squeeze_to_3d with 4D input batch > 1 raises ValueError.""" + attn, _ = _create_standard_attention() + forward_meta = MagicMock() + attn.config.forward_meta = forward_meta + + # 4D query with batch=2 + query = paddle.randn([2, 5, 2, 64]) + key = paddle.randn([2, 5, 2, 64]) + value = paddle.randn([2, 5, 2, 64]) + + with pytest.raises(ValueError, match="batch size 2 not supported"): + attn.forward(query=query, key=key, value=value, attention_mask=None) + + def test_unexpected_ndim_raises(self): + """Line 160: squeeze_to_3d with unexpected ndim raises ValueError.""" + attn, _ = _create_standard_attention() + forward_meta = MagicMock() + attn.config.forward_meta = forward_meta + + # 2D query (ndim=2, not 3 or 4) + query = paddle.randn([5, 64]) + key = paddle.randn([5, 64]) + value = paddle.randn([5, 64]) + + with pytest.raises(ValueError, match="unexpected dims 2"): + attn.forward(query=query, key=key, value=value, attention_mask=None) 
+ + def test_scale_restore_when_original_exists(self): + """Lines 276-277: fd_attention already has 'scale' -> restore original in finally.""" + attn, mock_fd_attention = _create_standard_attention() + # Set an existing scale on fd_attention (covers else branch in finally) + mock_fd_attention.scale = 0.5 + forward_meta = MagicMock() + attn.config.forward_meta = forward_meta + + seq_len = 3 + query = paddle.randn([seq_len, 2, 64]) + key = paddle.randn([seq_len, 2, 64]) + value = paddle.randn([seq_len, 2, 64]) + + # Mock fd_attention.forward to return standard output + output = paddle.randn([seq_len, 2 * 64]) + mock_fd_attention.forward.return_value = output + + attn.forward(query=query, key=key, value=value, attention_mask=None) + + # Verify original scale was restored (line 277) + assert mock_fd_attention.scale == 0.5 + + +# ============================================================================ +# Tests for PaddleFleetModelBase.forward branches (no GPU needed) +# ============================================================================ + + +class TestPaddleFleetModelBaseForward: + """Test uncovered forward branches using mock model instances.""" + + def test_forward_is_zero_size(self): + """Lines 530-534: Forward with is_zero_size=True returns empty tensor.""" + model = _create_mock_fleet_model_for_forward() + + forward_meta = MagicMock() + forward_meta.is_zero_size = True + + inputs = {"ids_remove_padding": paddle.zeros([0], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result.shape[0] == 0 + assert result.shape[1] == model.model_config.hidden_size + + def test_forward_empty_ids(self): + """Lines 530-534: Forward with empty ids_remove_padding (shape[0]==0).""" + model = _create_mock_fleet_model_for_forward() + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + + inputs = {"ids_remove_padding": paddle.zeros([0], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result.shape[0] == 0 + + def 
test_forward_no_batch_id_per_token_with_seq_lens_decoder(self): + """Lines 558-561: Forward without batch_id_per_token, with seq_lens_decoder. + + Covers: position_ids = arange + seq_lens_decoder[0,0] + """ + num_tokens = 5 + model = _create_mock_fleet_model_for_forward(num_tokens=num_tokens) + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + forward_meta.batch_id_per_token = None + forward_meta.seq_lens_decoder = paddle.to_tensor([[3]], dtype="int64") + forward_meta.cu_seqlens_q = None + + inputs = {"ids_remove_padding": paddle.to_tensor([1, 2, 3, 4, 5], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result is not None + + def test_forward_no_batch_id_per_token_no_seq_lens_decoder(self): + """Lines 558-559: Forward without batch_id_per_token and no seq_lens_decoder. + + Covers: position_ids = arange only (line 559, no line 561). + """ + num_tokens = 5 + model = _create_mock_fleet_model_for_forward(num_tokens=num_tokens) + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + forward_meta.batch_id_per_token = None + forward_meta.seq_lens_decoder = None + forward_meta.cu_seqlens_q = None + + inputs = {"ids_remove_padding": paddle.to_tensor([1, 2, 3, 4, 5], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result is not None + + def test_forward_cu_seqlens_none(self): + """Line 556: Forward with batch_id_per_token set but cu_seqlens=None. + + Covers: relative_positions = paddle.zeros(...) 
+ """ + num_tokens = 5 + model = _create_mock_fleet_model_for_forward(num_tokens=num_tokens) + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + forward_meta.batch_id_per_token = paddle.to_tensor([0, 0, 0, 0, 0], dtype="int64") + forward_meta.seq_lens_decoder = paddle.to_tensor([[5]], dtype="int64") + forward_meta.cu_seqlens_q = None + + inputs = {"ids_remove_padding": paddle.to_tensor([1, 2, 3, 4, 5], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result is not None From 80e85f8196eda2b1b6930b24a37f0bd61c0d07de Mon Sep 17 00:00:00 2001 From: xiaoguoguo626807 Date: Tue, 2 Jun 2026 19:16:24 +0800 Subject: [PATCH 4/5] recover review --- fastdeploy/model_executor/models/paddleformers/base_fleet.py | 2 +- tests/model_executor/fallback/conftest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fastdeploy/model_executor/models/paddleformers/base_fleet.py b/fastdeploy/model_executor/models/paddleformers/base_fleet.py index 1f57aebb75d..bc7540121a8 100644 --- a/fastdeploy/model_executor/models/paddleformers/base_fleet.py +++ b/fastdeploy/model_executor/models/paddleformers/base_fleet.py @@ -306,7 +306,7 @@ def __init__(self, fd_config: "FDConfig", **kwargs): self.paddleformers_config.moe_token_dispatcher_type = "deepep" # self.paddleformers_config.use_cpu_initialization = True self.paddleformers_config.gated_attention = getattr(self.paddleformers_config, "use_gated_attn", False) - if self.paddleformers_config.multi_latent_attention: + if getattr(self.paddleformers_config, "multi_latent_attention", False): self.paddleformers_config.qk_head_dim = ( self.paddleformers_config.qk_rope_head_dim + self.paddleformers_config.qk_nope_head_dim ) diff --git a/tests/model_executor/fallback/conftest.py b/tests/model_executor/fallback/conftest.py index dc93a1071a5..cef10938446 100644 --- a/tests/model_executor/fallback/conftest.py +++ b/tests/model_executor/fallback/conftest.py @@ -105,7 +105,7 @@ def 
pytest_collection_modifyitems(config, items): ) install_args = [sys.executable, "-m", "pip", "install"] + shlex.split(paddleformers_url) + ["--quiet"] subprocess.check_call(install_args) - print(f"[conftest] ✓ Installed paddleformers 1.1.0.dev20250507 from {paddleformers_url}") + print(f"[conftest] ✓ Installed paddleformers 1.1.0.dev20260507 from {paddleformers_url}") # Install paddlefleet (skip paddlepaddle dependency, use existing version) paddlefleet_url = os.getenv( From d4b793e2790edc3573eb4afbe12c1063da1aa785 Mon Sep 17 00:00:00 2001 From: xiaoguoguo626807 Date: Wed, 3 Jun 2026 10:03:53 +0800 Subject: [PATCH 5/5] add test --- scripts/.coveragerc | 1 - .../test_fallback_fleet_model_coverge.py | 577 ++++++++++++++++++ 2 files changed, 577 insertions(+), 1 deletion(-) diff --git a/scripts/.coveragerc b/scripts/.coveragerc index 16529bd8019..f5a48a3da0f 100644 --- a/scripts/.coveragerc +++ b/scripts/.coveragerc @@ -11,7 +11,6 @@ source = */site-packages/fastdeploy */lib/python3.10/site-packages/fastdeploy */fastdeploy - /workspace/FastDeploy/fastdeploy [report] exclude_lines = diff --git a/tests/model_executor/fallback/test_fallback_fleet_model_coverge.py b/tests/model_executor/fallback/test_fallback_fleet_model_coverge.py index 79d00596726..a9f5c35c46e 100644 --- a/tests/model_executor/fallback/test_fallback_fleet_model_coverge.py +++ b/tests/model_executor/fallback/test_fallback_fleet_model_coverge.py @@ -639,3 +639,580 @@ def test_forward_cu_seqlens_none(self): result = model.forward(inputs, forward_meta) assert result is not None + + def test_forward_cu_seqlens_not_none(self): + """Lines 542, 549-551: Forward with batch_id_per_token AND cu_seqlens_q set. 
+ + Covers: + - decoder_offsets handling for a [1, 1]-shaped seq_lens_decoder (line 542; NOTE(review): the ndim==0 reshape is presumably not reached with this input — confirm) + - cu_seqlens is not None branch: token_global_idx arange, index_select, relative_positions (lines 549-551) + """ + num_tokens = 3 + model = _create_mock_fleet_model_for_forward(num_tokens=num_tokens) + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + forward_meta.batch_id_per_token = paddle.to_tensor([0, 0, 0], dtype="int64") + # Decoder offset built from a scalar but reshaped to [1, 1], i.e. a 2-D tensor (not 0-D) + forward_meta.seq_lens_decoder = paddle.to_tensor(2, dtype="int64").reshape([1, 1]) + # cu_seqlens_q: [0, 3] for 1 request of 3 tokens + forward_meta.cu_seqlens_q = paddle.to_tensor([0, 3], dtype="int64") + + inputs = {"ids_remove_padding": paddle.to_tensor([10, 20, 30], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result is not None + + def test_forward_kwargs_forwarded_to_model_input(self): + """Lines 579-580: Extra kwargs (non-None) are forwarded into model_input dict.""" + hidden_size = 64 + num_tokens = 3 + model = _create_mock_fleet_model_for_forward(hidden_size=hidden_size, num_tokens=num_tokens) + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + forward_meta.batch_id_per_token = None + forward_meta.seq_lens_decoder = None + forward_meta.cu_seqlens_q = None + + extra_tensor = paddle.ones([num_tokens], dtype="float32") + inputs = {"ids_remove_padding": paddle.to_tensor([1, 2, 3], dtype="int64")} + + # Replace run_function with a capturing layer that records model_input keys + captured = {} + + class CapturingLayer: + def __init__(self): + self.self_attn = MagicMock() + self.self_attn.core_attention = MagicMock() + self.self_attn.core_attention.config = MagicMock() + + def __call__(self, model_input, **kwargs): + captured.update(model_input) + if "hidden_states" not in model_input: + model_input["hidden_states"] = paddle.randn([1, num_tokens, hidden_size]) + return model_input + + model.model.run_function = [CapturingLayer()] + + result
= model.forward(inputs, forward_meta, extra_key=extra_tensor, none_key=None) + + assert result is not None + # extra_key (non-None) should be forwarded; none_key (None) should be skipped + assert "extra_key" in captured + assert "none_key" not in captured + + def test_forward_gpt_lm_head_skipped_and_embedding_called(self): + """Lines 587-589: GPTLMHead layers are skipped; GPTEmbedding layers are called with decoder_input.""" + from paddlefleet.models.gpt.gpt_embedding import GPTEmbedding + from paddlefleet.models.gpt.lm_head import GPTLMHead + + hidden_size = 64 + num_tokens = 3 + model = _create_mock_fleet_model_for_forward(hidden_size=hidden_size, num_tokens=num_tokens) + + # Build a run_function with: GPTEmbedding mock, TransformerLayer mock, GPTLMHead mock + mock_embedding = MagicMock(spec=GPTEmbedding) + mock_embedding.return_value = {"hidden_states": paddle.randn([1, num_tokens, hidden_size])} + + mock_transformer = MagicMock() + + def transformer_call(model_input, **kwargs): + if "hidden_states" not in model_input: + model_input["hidden_states"] = paddle.randn([1, num_tokens, hidden_size]) + return model_input + + mock_transformer.__class__.__name__ = "TransformerLayer" + mock_transformer.side_effect = transformer_call + + mock_lm_head = MagicMock(spec=GPTLMHead) + + model.model.run_function = [mock_embedding, mock_transformer, mock_lm_head] + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + forward_meta.batch_id_per_token = None + forward_meta.seq_lens_decoder = None + forward_meta.cu_seqlens_q = None + + inputs = {"ids_remove_padding": paddle.to_tensor([1, 2, 3], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result is not None + # GPTEmbedding should be called with decoder_input kwarg + mock_embedding.assert_called_once() + call_kwargs = mock_embedding.call_args + assert "decoder_input" in call_kwargs.kwargs or (len(call_kwargs.args) > 1 and call_kwargs.args[1] is not None) + # GPTLMHead should NOT be called 
(skipped) + mock_lm_head.assert_not_called() + + +# ============================================================================ +# Tests for PaddleFleetModelBase utility methods +# ============================================================================ + + +class TestPaddleFleetModelBaseUtils: + """Test utility methods: compute_logits, embed_input_ids, load_weights, set_state_dict.""" + + def _make_model(self, hidden_size=32, vocab_size=100, ori_vocab_size=80): + """Create a minimal mock PaddleFleetModelBase for utility method testing.""" + from types import SimpleNamespace + + model = PaddleFleetModelBase.__new__(PaddleFleetModelBase) + model.model_config = SimpleNamespace( + hidden_size=hidden_size, + dtype="float32", + ori_vocab_size=ori_vocab_size, + ) + model.model = MagicMock() + return model + + def test_compute_logits_3d_output_squeezed(self): + """Lines 353-362: compute_logits squeezes 3D lm_head output and masks extended vocab.""" + model = self._make_model(hidden_size=32, vocab_size=100, ori_vocab_size=80) + num_tokens = 4 + + # lm_head returns 3D [num_tokens, 1, vocab_size] + mock_lm_head = MagicMock() + mock_lm_head.return_value = paddle.randn([num_tokens, 1, 100]) + model.model.get_lm_head.return_value = mock_lm_head + + hidden = paddle.randn([num_tokens, 32]) + logits = model.compute_logits(hidden) + + assert logits.ndim == 2 + assert logits.shape[0] == num_tokens + assert logits.shape[1] == 100 + # Extended vocab tokens should be -inf + import math + + assert math.isinf(float(logits[0, 80].numpy())) + + def test_compute_logits_2d_output_no_squeeze(self): + """Lines 353-362: compute_logits with 2D lm_head output (no squeeze needed).""" + model = self._make_model(hidden_size=32, vocab_size=100, ori_vocab_size=100) + num_tokens = 2 + + mock_lm_head = MagicMock() + # Return 2D directly (no squeeze branch) + mock_lm_head.return_value = paddle.randn([num_tokens, 100]) + model.model.get_lm_head.return_value = mock_lm_head + + hidden = 
paddle.randn([num_tokens, 32]) + logits = model.compute_logits(hidden) + + assert logits.ndim == 2 + assert logits.shape == [num_tokens, 100] + + def test_embed_input_ids_1d_input(self): + """Lines 493-503: embed_input_ids with 1D input_ids - unsqueeze then squeeze back.""" + model = self._make_model(hidden_size=16) + + embedding_out = paddle.randn([1, 5, 16]) # [batch=1, seq, hidden] + mock_embedding = MagicMock(return_value=embedding_out) + model.model.get_input_embeddings.return_value = mock_embedding + + input_ids = paddle.to_tensor([1, 2, 3, 4, 5], dtype="int64") # 1D + result = model.embed_input_ids(input_ids) + + # Should squeeze back to [5, 16] + assert result.ndim == 2 + assert result.shape[0] == 5 + assert result.shape[1] == 16 + + def test_embed_input_ids_with_embed_scale(self): + """Lines 505-507: embed_input_ids applies embed_scale when set.""" + model = self._make_model(hidden_size=8) + model.embed_scale = 2.0 + + base_out = paddle.ones([1, 3, 8]) + mock_embedding = MagicMock(return_value=base_out) + model.model.get_input_embeddings.return_value = mock_embedding + + input_ids = paddle.to_tensor([1, 2, 3], dtype="int64") + result = model.embed_input_ids(input_ids) + + # All values should be 2.0 (1.0 * scale=2.0) + assert float(result.numpy().mean()) == pytest.approx(2.0) + + def test_load_weights_is_noop(self): + """Lines 602-603: load_weights logs and returns without error.""" + model = self._make_model() + # Should not raise + model.load_weights(iter([("param", paddle.ones([2, 2]))])) + + def test_set_state_dict_delegates(self): + """Line 606: set_state_dict delegates to self.model.set_state_dict.""" + model = self._make_model() + state_dict = {"weight": paddle.ones([4, 4])} + model.set_state_dict(state_dict) + model.model.set_state_dict.assert_called_once_with(state_dict) + + +# ============================================================================ +# Tests for _sync_config_from_text_config +# 
============================================================================ + + +class TestSyncConfigFromTextConfig: + """Test _sync_config_from_text_config field syncing logic (lines 463-489).""" + + def _make_model_for_sync(self): + from types import SimpleNamespace + + model = PaddleFleetModelBase.__new__(PaddleFleetModelBase) + model.model_config = SimpleNamespace() + model.paddleformers_config = SimpleNamespace() + return model + + def test_syncs_fields_when_mc_attribute_is_none(self): + """Lines 481-489: Fields not set on model_config are synced from paddleformers_config.""" + model = self._make_model_for_sync() + model.model_config.tie_word_embeddings = None + model.paddleformers_config.tie_word_embeddings = True + # Other fields: not present on mc at all (hasattr returns False) + + model._sync_config_from_text_config() + + assert model.model_config.tie_word_embeddings is True + + def test_does_not_overwrite_matching_value(self): + """Lines 486-488: If mc and tc values match, no overwrite occurs (value preserved).""" + model = self._make_model_for_sync() + model.model_config.rope_theta = 10000.0 + model.paddleformers_config.rope_theta = 10000.0 + + model._sync_config_from_text_config() + + assert model.model_config.rope_theta == 10000.0 + + def test_overwrites_differing_value(self): + """Lines 487-489: If values differ, tc value overwrites mc value.""" + model = self._make_model_for_sync() + model.model_config.sliding_window = 512 + model.paddleformers_config.sliding_window = 1024 + + model._sync_config_from_text_config() + + assert model.model_config.sliding_window == 1024 + + def test_skips_field_when_tc_value_is_none(self): + """Lines 483-484: If tc field is None, mc is not modified.""" + model = self._make_model_for_sync() + model.model_config.rms_norm_eps = 1e-5 + model.paddleformers_config.rms_norm_eps = None + + model._sync_config_from_text_config() + + assert model.model_config.rms_norm_eps == 1e-5 + + +# 
============================================================================ +# Tests for FastDeployAttention.forward squeeze_to_3d None path (line 151) +# ============================================================================ + + +class TestSqueezeToThreeDNone: + """Line 151: squeeze_to_3d(None) returns None.""" + + def test_mla_decode_key_value_none(self): + """Line 151: key=None and value=None pass through squeeze_to_3d as None in MLA decode.""" + kv_lora_rank, v_head_dim, num_heads = 4, 2, 2 + attn, mock_fd_attention = _create_mla_attention(kv_lora_rank, v_head_dim, num_heads) + forward_meta = _create_mock_forward_meta(prefill_tokens=0, decode_tokens=1) + attn.config.forward_meta = forward_meta + + seq_len = 1 + query = paddle.randn([seq_len, num_heads, kv_lora_rank]) + kv_compressed = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + k_pos_emb = paddle.randn([1, seq_len, num_heads, kv_lora_rank]) + q_absorbed = paddle.randn([seq_len, num_heads, kv_lora_rank + 2]) + v_b_proj_weight = paddle.randn([num_heads, kv_lora_rank, v_head_dim]) + + decode_output = paddle.randn([seq_len, num_heads * kv_lora_rank]) + mock_fd_attention.forward.return_value = decode_output + + # key=None, value=None → squeeze_to_3d returns None (line 151) for both + result = attn.forward( + query=query, + key=None, + value=None, + attention_mask=None, + kv_compressed=kv_compressed, + k_pos_emb=k_pos_emb, + q_absorbed=q_absorbed, + v_b_proj_weight=v_b_proj_weight, + ) + assert result is not None + + +# ============================================================================ +# Tests for forward decoder_offsets 0-D scalar reshape (line 542) +# ============================================================================ + + +class TestForwardDecoderOffsets0D: + """Line 542: seq_lens_decoder with shape [1] → squeeze(-1) → 0-D scalar → reshape([1]).""" + + def test_decoder_offsets_0d_reshape(self): + """Line 542: 1-D seq_lens_decoder squeezed to 0-D triggers reshape([1]).""" + 
num_tokens = 2 + model = _create_mock_fleet_model_for_forward(num_tokens=num_tokens) + + forward_meta = MagicMock() + forward_meta.is_zero_size = False + forward_meta.batch_id_per_token = paddle.to_tensor([0, 0], dtype="int64") + # Shape [1]: after squeeze(-1) the single dim (size=1) is removed → 0-D scalar + forward_meta.seq_lens_decoder = paddle.to_tensor([3], dtype="int64") + forward_meta.cu_seqlens_q = None + + inputs = {"ids_remove_padding": paddle.to_tensor([1, 2], dtype="int64")} + result = model.forward(inputs, forward_meta) + + assert result is not None + + +# ============================================================================ +# Tests for _init_paddlefleet_parallel_state (lines 374-450) +# ============================================================================ + + +class TestInitPaddlefleetParallelState: + """Tests for _init_paddlefleet_parallel_state sub-branches.""" + + def _make_fd_config(self, tp_size=1): + fd_config = MagicMock() + fd_config.parallel_config.tensor_parallel_size = tp_size + fd_config.parallel_config.data_parallel_size = 1 + fd_config.parallel_config.expert_parallel_size = 1 + fd_config.parallel_config.sequence_parallel = False + return fd_config + + def test_tp1_group_none_creates_manual_group(self): + """Lines 415-438: _TENSOR_MODEL_PARALLEL_GROUP=None + TP=1 → manual group created.""" + import paddle.distributed as dist_module + import paddlefleet.parallel_state as ps + from paddlefleet.tensor_parallel import random as tp_random + + model = PaddleFleetModelBase.__new__(PaddleFleetModelBase) + fd_config = self._make_fd_config(tp_size=1) + + mock_fleet = MagicMock() + mock_new_group = MagicMock() + + original_group = ps._TENSOR_MODEL_PARALLEL_GROUP + try: + ps._TENSOR_MODEL_PARALLEL_GROUP = None + + with patch.object(dist_module, "fleet", mock_fleet): + with patch.object(dist_module, "get_rank", return_value=0): + with patch.object(dist_module, "new_group", return_value=mock_new_group): + with patch.object(tp_random, 
"model_parallel_cuda_manual_seed"): + model._init_paddlefleet_parallel_state(fd_config) + + assert ps._TENSOR_MODEL_PARALLEL_GROUP == mock_new_group + mock_fleet.init.assert_called_once() + finally: + ps._TENSOR_MODEL_PARALLEL_GROUP = original_group + + def test_tp_size_mismatch_calls_initialize_model_parallel(self): + """Lines 421-441: existing group size mismatches expected TP size → initialize_model_parallel.""" + import paddle.distributed as dist_module + import paddlefleet.parallel_state as ps + from paddlefleet.tensor_parallel import random as tp_random + + model = PaddleFleetModelBase.__new__(PaddleFleetModelBase) + # Expected TP=2, but mock current group has nranks=1 → mismatch + fd_config = self._make_fd_config(tp_size=2) + + mock_fleet = MagicMock() + mock_hcg = MagicMock() + mock_fleet.get_hybrid_communicate_group.return_value = mock_hcg + + mock_existing_group = MagicMock() + mock_existing_group.nranks = 1 # current size=1 ≠ expected=2 + + original_group = ps._TENSOR_MODEL_PARALLEL_GROUP + try: + ps._TENSOR_MODEL_PARALLEL_GROUP = mock_existing_group + + with patch.object(dist_module, "fleet", mock_fleet): + with patch.object(ps, "initialize_model_parallel") as mock_init_mp: + with patch.object(tp_random, "model_parallel_cuda_manual_seed"): + model._init_paddlefleet_parallel_state(fd_config) + + mock_init_mp.assert_called_once_with(mock_hcg) + finally: + ps._TENSOR_MODEL_PARALLEL_GROUP = original_group + + def test_seed_assertion_error_is_silenced(self): + """Lines 447-450: AssertionError from model_parallel_cuda_manual_seed is caught silently.""" + import paddle.distributed as dist_module + import paddlefleet.parallel_state as ps + from paddlefleet.tensor_parallel import random as tp_random + + model = PaddleFleetModelBase.__new__(PaddleFleetModelBase) + fd_config = self._make_fd_config(tp_size=1) + + mock_fleet = MagicMock() + mock_new_group = MagicMock() + + original_group = ps._TENSOR_MODEL_PARALLEL_GROUP + try: + ps._TENSOR_MODEL_PARALLEL_GROUP = 
None + + with patch.object(dist_module, "fleet", mock_fleet): + with patch.object(dist_module, "get_rank", return_value=0): + with patch.object(dist_module, "new_group", return_value=mock_new_group): + # Seed function raises AssertionError → should be silently ignored + with patch.object(tp_random, "model_parallel_cuda_manual_seed", side_effect=AssertionError): + model._init_paddlefleet_parallel_state(fd_config) # must not raise + finally: + ps._TENSOR_MODEL_PARALLEL_GROUP = original_group + + def test_group_size_via_world_size_fallback(self): + """Lines 423-424: nranks=None on existing group → fallback to world_size attribute.""" + import paddle.distributed as dist_module + import paddlefleet.parallel_state as ps + from paddlefleet.tensor_parallel import random as tp_random + + model = PaddleFleetModelBase.__new__(PaddleFleetModelBase) + fd_config = self._make_fd_config(tp_size=1) + + mock_fleet = MagicMock() + + # Existing group (spec=[]) exposes neither nranks nor world_size, so its size cannot match the expected TP=1 + mock_existing_group = MagicMock(spec=[]) # no nranks, no world_size → both None + # Both None means current_tp_size=None ≠ 1, so need_init=True; fall into TP=1 branch + mock_new_group = MagicMock() + + original_group = ps._TENSOR_MODEL_PARALLEL_GROUP + try: + ps._TENSOR_MODEL_PARALLEL_GROUP = mock_existing_group + + with patch.object(dist_module, "fleet", mock_fleet): + with patch.object(dist_module, "get_rank", return_value=0): + with patch.object(dist_module, "new_group", return_value=mock_new_group): + with patch.object(tp_random, "model_parallel_cuda_manual_seed"): + model._init_paddlefleet_parallel_state(fd_config) + + assert ps._TENSOR_MODEL_PARALLEL_GROUP == mock_new_group + finally: + ps._TENSOR_MODEL_PARALLEL_GROUP = original_group + + +# ============================================================================ +# Tests for PaddleFleetModelBase.__init__ (lines 285-349) +# ============================================================================ + + +class
TestPaddleFleetModelBaseInit:
+    """Unit tests for ``PaddleFleetModelBase.__init__`` in which every external collaborator is mocked."""
+
+    def _make_fd_config(self, multi_latent_attention=False):
+        """Return a MagicMock FD config whose explicitly assigned fields hold concrete values.
+
+        The ``multi_latent_attention`` argument is accepted but is not consulted while building the mock.
+        """
+        config = MagicMock()
+
+        model_cfg = config.model_config
+        model_cfg.model = "test_model"
+        model_cfg.max_model_len = 4096
+        model_cfg.dtype = "bfloat16"
+
+        parallel_cfg = config.parallel_config
+        parallel_cfg.data_parallel_size = 1
+        parallel_cfg.tensor_parallel_size = 1
+        parallel_cfg.sequence_parallel = False
+        parallel_cfg.expert_parallel_size = 1
+
+        # Pin concrete values so the support_graph_optimization decorator does not
+        # fail on a ``MagicMock > int`` comparison.
+        graph_cfg = config.graph_opt_config
+        graph_cfg.graph_opt_level = 0
+        graph_cfg.use_cudagraph = False
+        return config
+
+    def _init_with_patched_deps(self, fd_config, pf_config, causal_lm, attention_patch_result):
+        """Run ``PaddleFleetModelBase.__init__`` on a bare instance while its collaborators are patched.
+
+        ``AutoConfig.from_pretrained`` is made to return ``pf_config``,
+        ``AutoModelForCausalLM.from_pretrained`` to return ``causal_lm``, and
+        ``patch_paddlefleet_core_attention`` to return ``attention_patch_result``.
+        The parallel-state and text-config hooks, as well as ``paddle.nn.Layer.__init__``,
+        are replaced by no-ops. The initialised instance is returned.
+        """
+        import fastdeploy.model_executor.models.paddleformers.base_fleet as bf_mod
+
+        with (
+            patch.object(bf_mod, "AutoConfig") as auto_config_cls,
+            patch.object(bf_mod, "AutoModelForCausalLM") as auto_model_cls,
+            patch.object(bf_mod, "patch_paddlefleet_core_attention", return_value=attention_patch_result),
+            patch.object(PaddleFleetModelBase, "_init_paddlefleet_parallel_state"),
+            patch.object(PaddleFleetModelBase, "_sync_config_from_text_config"),
+            patch.object(paddle.nn.Layer, "__init__", lambda self, *a, **kw: None),
+        ):
+            auto_config_cls.from_pretrained.return_value = pf_config
+            auto_model_cls.from_pretrained.return_value = causal_lm
+
+            # Skip normal construction so only the patched __init__ body executes.
+            instance = object.__new__(PaddleFleetModelBase)
+            PaddleFleetModelBase.__init__(instance, fd_config)
+
+        return instance
+
+    def test_init_standard_model(self):
+        """Basic ``__init__`` path with ``multi_latent_attention`` set to False."""
+        fd_config = self._make_fd_config()
+
+        pf_config = MagicMock()
+        pf_config.tensor_model_parallel_size = 1
+        pf_config.multi_latent_attention = False
+
+        causal_lm = MagicMock()
+
+        model = self._init_with_patched_deps(fd_config, pf_config, causal_lm, attention_patch_result=2)
+
+        assert model.fd_config is fd_config
+        assert model.paddleformers_config is pf_config
+        causal_lm.eval.assert_called_once()
+
+    def test_init_mla_model_computes_qk_head_dim(self):
+        """With ``multi_latent_attention`` True, ``qk_head_dim`` must equal rope + nope head dims."""
+        fd_config = self._make_fd_config()
+
+        pf_config = MagicMock()
+        pf_config.tensor_model_parallel_size = 1
+        pf_config.multi_latent_attention = True
+        pf_config.qk_rope_head_dim = 64
+        pf_config.qk_nope_head_dim = 128
+
+        causal_lm = MagicMock()
+
+        self._init_with_patched_deps(fd_config, pf_config, causal_lm, attention_patch_result=0)
+
+        # The constructor is expected to store rope + nope on the config.
+        assert pf_config.qk_head_dim == 64 + 128
+
+
+# ============================================================================
+# Tests for model_base._try_resolve_paddleformers paddlefleet error branch
+# ============================================================================
+
+
+class TestTryResolvePaddlefleetImportError:
+    """``_try_resolve_paddleformers`` must raise ImportError when paddlefleet is reported unavailable."""
+
+    def test_paddlefleet_not_installed_raises_import_error(self):
+        """``model_impl='paddlefleet'`` combined with an unavailable paddlefleet yields ImportError."""
+        from fastdeploy.model_executor.models.model_base import ModelRegistry
+
+        # Bare instance: ModelRegistry.__init__ is bypassed on purpose.
+        registry = ModelRegistry.__new__(ModelRegistry)
+
+        model_config = MagicMock()
+        model_config.model_impl = "paddlefleet"
+
+        availability_patch = patch(
+            "fastdeploy.model_executor.utils.is_paddlefleet_available",
+            return_value=False,
+        )
+        with availability_patch, pytest.raises(ImportError, match="paddlefleet backend requires paddlefleet"):
+            registry._try_resolve_paddleformers(
+                architecture="SomeModel",
+                model_config=model_config,
+                is_fallback=False,
+            )