diff --git a/fastdeploy/config.py b/fastdeploy/config.py index b15a6dc824b..8b92a138a34 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -1980,6 +1980,7 @@ def expand_bsz_map(real_bsz_to_captured_size): int(envs.ENABLE_V1_KVCACHE_SCHEDULER) == 0 and self.model_config is not None and self.model_config.enable_mm + and self.deploy_modality != DeployModality.TEXT ): self.max_prefill_batch = 1 # TODO:当前V0多模prefill阶段只支持并行度为1,待优化 else: @@ -2019,6 +2020,20 @@ def expand_bsz_map(real_bsz_to_captured_size): self.check() # self.print() # NOTE: it's better to explicitly call .print() when FDConfig is initialized + @property + def enable_mm_runtime(self) -> bool: + return ( + self.model_config is not None + and self.model_config.enable_mm + and self.deploy_modality != DeployModality.TEXT + ) + + @property + def enable_rope_3d_runtime(self) -> bool: + return self.enable_mm_runtime and ( + getattr(self.model_config, "rope_3d", False) or getattr(self.model_config, "use_3d_rope", False) + ) + def _disable_sequence_parallel_moe_if_needed(self, mode_name): if self.parallel_config.use_sequence_parallel_moe and self.graph_opt_config.use_cudagraph: self.parallel_config.use_sequence_parallel_moe = False @@ -2057,9 +2072,21 @@ def postprocess(self): if self.long_prefill_token_threshold == 0: self.long_prefill_token_threshold = int(self.model_config.max_model_len * 0.04) + if ( + self.model_config is not None + and self.model_config.enable_mm + and self.deploy_modality == DeployModality.TEXT + ): + if getattr(self.model_config, "rope_3d", False) or getattr(self.model_config, "use_3d_rope", False): + logger.info( + "Deploy modality is text; forcing the multimodal-capable model onto the 2D RoPE runtime path." + ) + setattr(self.model_config, "rope_3d", False) + setattr(self.model_config, "use_3d_rope", False) + self.cache_config.max_block_num_per_seq = int(self.model_config.max_model_len // self.cache_config.block_size) self.cache_config.postprocess(self.get_max_chunk_tokens(), self.scheduler_config.max_num_seqs) - if self.model_config is not None and self.model_config.enable_mm and not envs.ENABLE_V1_KVCACHE_SCHEDULER: + if self.model_config is not None and self.enable_mm_runtime and not envs.ENABLE_V1_KVCACHE_SCHEDULER: self.cache_config.enable_prefix_caching = False if ( self.structured_outputs_config is not None @@ -2085,7 +2112,7 @@ def postprocess(self): f"Guided decoding backend '{self.structured_outputs_config.guided_decoding_backend}' is not implemented. [auto, xgrammar, guidance, off]" ) - if self.model_config.enable_mm: + if self.enable_mm_runtime: if self.cache_config.max_encoder_cache is None or self.cache_config.max_encoder_cache < 0: self.cache_config.max_encoder_cache = self.scheduler_config.max_num_batched_tokens elif self.cache_config.max_encoder_cache != 0: @@ -2392,7 +2419,7 @@ def get_max_chunk_tokens(self, mm_max_tokens_per_item=None): num_tokens = self.scheduler_config.max_num_seqs else: num_tokens = self.scheduler_config.max_num_batched_tokens - if mm_max_tokens_per_item is not None and self.deploy_modality != DeployModality.TEXT: + if self.enable_mm_runtime and mm_max_tokens_per_item is not None: max_mm_tokens = max( mm_max_tokens_per_item.get("image", 0), mm_max_tokens_per_item.get("video", 0), diff --git a/fastdeploy/engine/async_llm.py b/fastdeploy/engine/async_llm.py index 4afb3dc5c49..c06292ec981 100644 --- a/fastdeploy/engine/async_llm.py +++ b/fastdeploy/engine/async_llm.py @@ -294,6 +294,7 @@ def __init__(self, cfg, pid): cfg.limit_mm_per_prompt, cfg.mm_processor_kwargs, cfg.tool_parser, + enable_mm_runtime=cfg.enable_mm_runtime, ) # Create data processor self.data_processor = self.input_processor.create_processor() @@ -446,7 +447,7 @@ async def add_request( ) if envs.ZMQ_SEND_BATCH_DATA and self.connection_manager is not None: request["zmq_worker_pid"] = self.connection_manager.worker_pid - if self.cfg.model_config.enable_mm: + if self.cfg.enable_mm_runtime: self.request_client.send_pyobj(request) else: self.request_client.send_json(request) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index f1152c6e22c..cd9e42f8bcf 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -330,6 +330,7 @@ def create_data_processor(self): self.cfg.limit_mm_per_prompt, self.cfg.mm_processor_kwargs, self.cfg.tool_parser, + enable_mm_runtime=self.cfg.enable_mm_runtime, ) self.data_processor = self.input_processor.create_processor() self.mm_max_tokens_per_item = self.data_processor.get_mm_max_tokens_per_item( @@ -601,7 +602,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1): LoggingEventName.RESCHEDULED_INFERENCE_START, task.request_id, getattr(task, "user", "") ) if not is_prefill: - if not self.cfg.model_config.enable_mm: + if not self.cfg.enable_mm_runtime: self.update_requests_chunk_size(tasks) else: self.update_mm_requests_chunk_size(tasks) @@ -1217,7 +1218,7 @@ def _insert_zmq_task_to_scheduler(self): while self.running: try: block = True if len(added_requests) == 0 else False - if not self.cfg.model_config.enable_mm: + if not self.cfg.enable_mm_runtime: err, data = self.recv_request_server.receive_json_once(block) else: err, data = self.recv_request_server.receive_pyobj_once(block) @@ -1275,6 +1276,7 @@ def _insert_zmq_task_to_scheduler(self): err_msg = None try: request = Request.from_dict(data) + request.metrics.scheduler_recv_req_time = time.time() main_process_metrics.requests_number.inc() trace_carrier = data.get("trace_carrier") @@ -2377,7 +2379,7 @@ def _setting_environ_variables(self): if self.cfg.scheduler_config.splitwise_role == "prefill": variables["FLAGS_fmt_write_cache_completed_signal"] = 1 - if self.cfg.model_config.enable_mm: + if self.cfg.enable_mm_runtime: variables["FLAGS_max_partition_size"] = 1024 command_prefix = "" diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 80b58d68972..45ec18aa1c0 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -205,11 +205,11 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l self.need_block_num_map = dict() self.encoder_cache = None - if config.model_config.enable_mm and config.cache_config.max_encoder_cache > 0: + if config.enable_mm_runtime and config.cache_config.max_encoder_cache > 0: self.encoder_cache = EncoderCacheManager(config.cache_config.max_encoder_cache) self.processor_cache = None - if config.model_config.enable_mm and config.cache_config.max_processor_cache > 0: + if config.enable_mm_runtime and config.cache_config.max_processor_cache > 0: max_processor_cache_in_bytes = int(config.cache_config.max_processor_cache * 1024 * 1024 * 1024) self.processor_cache = ProcessorCacheManager(max_processor_cache_in_bytes) @@ -550,7 +550,7 @@ def _get_num_new_tokens(self, request, token_budget): num_new_tokens = token_budget // self.config.cache_config.block_size * self.config.cache_config.block_size request.with_image = False - if not self.config.model_config.enable_mm: + if not self.config.enable_mm_runtime: return num_new_tokens inputs = request.multimodal_inputs diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index f03a18594de..4c56e9bcd76 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -84,7 +84,7 @@ class EngineClient: def __init__(self, pid: int | str, port: int | str, fd_config: FDConfig, workers: int = 1, max_logprobs: int = 20): self.fd_config = fd_config self.tensor_parallel_size = self.fd_config.parallel_config.tensor_parallel_size - self.enable_mm = self.fd_config.model_config.enable_mm + self.enable_mm = self.fd_config.enable_mm_runtime self.max_logprobs = max_logprobs input_processor = InputPreprocessor( self.fd_config.model_config, @@ -93,6 +93,7 @@ def __init__(self, pid: int | str, port: int | str, fd_config: FDConfig, workers self.fd_config.mm_processor_kwargs, self.fd_config.tool_parser, self.enable_mm and self.fd_config.cache_config.max_processor_cache > 0, + enable_mm_runtime=self.enable_mm, ) self.enable_logprob = self.fd_config.model_config.enable_logprob self.data_processor = input_processor.create_processor() @@ -358,6 +359,7 @@ async def add_requests(self, task): task["max_tokens"] = min(self.max_model_len - input_ids_len, task.get("max_tokens")) min_tokens = task.get("min_tokens", 1) + if "messages" in task: task["messages"] = None api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}") diff --git a/fastdeploy/input/preprocess.py b/fastdeploy/input/preprocess.py index 755f0612def..6467f6a89ac 100644 --- a/fastdeploy/input/preprocess.py +++ b/fastdeploy/input/preprocess.py @@ -48,6 +48,7 @@ def __init__( mm_processor_kwargs: Optional[Dict[str, Any]] = None, tool_parser: str = None, enable_processor_cache: bool = False, + enable_mm_runtime: Optional[bool] = None, ) -> None: self.model_config = model_config self.model_name_or_path = self.model_config.model @@ -56,6 +57,7 @@ def __init__( self.mm_processor_kwargs = mm_processor_kwargs self.tool_parser = tool_parser self.enable_processor_cache = enable_processor_cache + self.enable_mm_runtime = self.model_config.enable_mm if enable_mm_runtime is None else enable_mm_runtime def create_processor(self): reasoning_parser_obj = None @@ -77,10 +79,11 @@ def create_processor(self): reasoning_parser_obj=reasoning_parser_obj, tool_parser_obj=tool_parser_obj, mm_processor_kwargs=self.mm_processor_kwargs, + enable_mm_runtime=self.enable_mm_runtime, ) except Exception as e: logger.info(f"Plugin input processor not available ({e}), using built-in processor") - if not self.model_config.enable_mm: + if not self.enable_mm_runtime: from fastdeploy.input.text_processor import TextProcessor tokenizer_type = "ernie4_5" if ErnieArchitectures.contains_ernie_arch(architecture) else "auto" diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index b64fcacda33..a7876669f8f 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -549,7 +549,6 @@ def put_tasks(self, tasks: List[Any]) -> None: self.lock.release() time.sleep(0.001) self.lock.acquire() - if envs.FD_ENABLE_MAX_PREFILL or envs.FD_ENABLE_E2W_TENSOR_CONVERT: # multimodal input numpy -> tensor to_tensor(tasks[0]) @@ -571,7 +570,6 @@ def get_tasks(self) -> Tuple[List[Any], bool]: """ tasks: List[Any] = list() self.lock.acquire() - tasks.extend(self.tasks) self.client_read_flag[self.client_id] = 1 all_client_read: bool = np.sum(self.client_read_flag) == self.num_client diff --git a/fastdeploy/model_executor/layers/attention/append_attn_backend.py b/fastdeploy/model_executor/layers/attention/append_attn_backend.py index 81eab7cce86..15b657c249d 100644 --- a/fastdeploy/model_executor/layers/attention/append_attn_backend.py +++ b/fastdeploy/model_executor/layers/attention/append_attn_backend.py @@ -138,9 +138,7 @@ def __init__( self.rope_theta: float = ( 10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta ) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr( - fd_config.model_config, "use_3d_rope", False - ) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime if fd_config.speculative_config.model_type != "main": self.rope_3d = False self.causal: bool = getattr(fd_config.model_config, "causal", True) diff --git a/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py b/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py index 4d6bbcdfb7d..66a92a52599 100644 --- a/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py +++ b/fastdeploy/model_executor/layers/attention/dsa_attention_backend.py @@ -136,7 +136,7 @@ def __init__( self.rope_theta: float = ( 10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta ) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime self.causal: bool = getattr(fd_config.model_config, "causal", True) self.speculative_method: str = fd_config.speculative_config.method self.use_speculate: bool = self.speculative_method is not None diff --git a/fastdeploy/model_executor/layers/attention/flash_attn_backend.py b/fastdeploy/model_executor/layers/attention/flash_attn_backend.py index b51dce1449d..bcffcd0bac0 100644 --- a/fastdeploy/model_executor/layers/attention/flash_attn_backend.py +++ b/fastdeploy/model_executor/layers/attention/flash_attn_backend.py @@ -267,9 +267,7 @@ def __init__( self.rank, self.device_id = init_rank_and_device_id(fd_config) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr( - fd_config.model_config, "use_3d_rope", False - ) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime if fd_config.speculative_config.model_type != "main": self.rope_3d = False # Note(ZKK): here must be consistent with append_attn_backend.py diff --git a/fastdeploy/model_executor/layers/attention/flash_mask_attn_backend.py b/fastdeploy/model_executor/layers/attention/flash_mask_attn_backend.py index 35d27504ab5..6ebea2cb3d9 100644 --- a/fastdeploy/model_executor/layers/attention/flash_mask_attn_backend.py +++ b/fastdeploy/model_executor/layers/attention/flash_mask_attn_backend.py @@ -121,9 +121,7 @@ def __init__( self.rank, self.device_id = init_rank_and_device_id(fd_config) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr( - fd_config.model_config, "use_3d_rope", False - ) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime if fd_config.speculative_config.model_type != "main": self.rope_3d = False self.max_partition_size: int = int(os.getenv("FLAGS_max_partition_size", "32768")) diff --git a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py index 61ccc4e16e7..209817e69a2 100644 --- a/fastdeploy/model_executor/layers/attention/mla_attention_backend.py +++ b/fastdeploy/model_executor/layers/attention/mla_attention_backend.py @@ -263,7 +263,7 @@ def __init__( self.rope_theta: float = ( 10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta ) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime self.causal: bool = getattr(fd_config.model_config, "causal", True) self.speculative_method = fd_config.speculative_config.method self.use_speculate: bool = self.speculative_method is not None diff --git a/fastdeploy/model_executor/layers/backends/iluvatar/attention/mha_attn_backend.py b/fastdeploy/model_executor/layers/backends/iluvatar/attention/mha_attn_backend.py index 092912149a9..d01973f80d0 100644 --- a/fastdeploy/model_executor/layers/backends/iluvatar/attention/mha_attn_backend.py +++ b/fastdeploy/model_executor/layers/backends/iluvatar/attention/mha_attn_backend.py @@ -89,7 +89,7 @@ def __init__( # note: scale need to change if using MLA self.scale = 1.0 / sqrt(head_dim) self.dtype = paddle.get_default_dtype() - self.enable_mm = fd_config.model_config.enable_mm + self.enable_mm = fd_config.enable_mm_runtime self.rope_batch_stride = self.max_context_len * self.head_dim if self.enable_mm else 0 if "paddleocr" in fd_config.model_config.model_type: self.is_interleaved_rope_mode = False diff --git a/fastdeploy/model_executor/layers/backends/intel_hpu/attention/hpu_attn_backend.py b/fastdeploy/model_executor/layers/backends/intel_hpu/attention/hpu_attn_backend.py index 82938c87367..bd2d8505228 100644 --- a/fastdeploy/model_executor/layers/backends/intel_hpu/attention/hpu_attn_backend.py +++ b/fastdeploy/model_executor/layers/backends/intel_hpu/attention/hpu_attn_backend.py @@ -219,7 +219,7 @@ def __init__( self.block_size = llm_config.cache_config.block_size self.max_seq_len = llm_config.model_config.max_model_len self.rope_theta = 10000.0 if llm_config.model_config.rope_theta is None else llm_config.model_config.rope_theta - self.rope_3d = getattr(llm_config.model_config, "rope_3d", False) + self.rope_3d = llm_config.enable_rope_3d_runtime self.causal = getattr(llm_config.model_config, "causal", True) self.speculative_method = llm_config.speculative_config.method self.use_speculate: bool = self.speculative_method is not None diff --git a/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py b/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py index 74fc27f67b4..0fd3553fda9 100644 --- a/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py +++ b/fastdeploy/model_executor/layers/backends/metax/attention/flash_attn_backend.py @@ -101,7 +101,7 @@ def __init__( self.rope_theta: float = ( 10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta ) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime self.causal: bool = getattr(fd_config.model_config, "causal", True) self.speculative_method = fd_config.speculative_config.method self.use_speculate: bool = self.speculative_method is not None @@ -128,7 +128,7 @@ def __init__( fd_config.parallel_config.expert_parallel_rank = 0 self.rank, self.device_id = init_rank_and_device_id(fd_config) - self.enable_mm = fd_config.model_config.enable_mm + self.enable_mm = fd_config.enable_mm_runtime self.model_type = fd_config.model_config.model_type self.is_neox_style = False if "paddleocr" in fd_config.model_config.model_type: diff --git a/fastdeploy/model_executor/layers/backends/metax/attention/mla_attn_metax_backend.py b/fastdeploy/model_executor/layers/backends/metax/attention/mla_attn_metax_backend.py index c905086f9f7..dcd5589f0d8 100644 --- a/fastdeploy/model_executor/layers/backends/metax/attention/mla_attn_metax_backend.py +++ b/fastdeploy/model_executor/layers/backends/metax/attention/mla_attn_metax_backend.py @@ -105,7 +105,7 @@ def __init__( self.rope_theta: float = ( 10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta ) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime self.causal: bool = getattr(fd_config.model_config, "causal", True) self.speculative_method = fd_config.speculative_config.method self.use_speculate: bool = self.speculative_method is not None diff --git a/fastdeploy/model_executor/layers/backends/xpu/attention.py b/fastdeploy/model_executor/layers/backends/xpu/attention.py index 85565d33efb..31fce9bdf51 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/attention.py +++ b/fastdeploy/model_executor/layers/backends/xpu/attention.py @@ -88,9 +88,7 @@ def __init__( self.rope_theta: float = ( 10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta ) - self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr( - fd_config.model_config, "use_3d_rope", False - ) + self.rope_3d: bool = fd_config.enable_rope_3d_runtime self.causal: bool = getattr(fd_config.model_config, "causal", True) self.keep_pd_step_flag: bool = fd_config.speculative_config.model_type == "mtp" self.num_layers_draft_model: int = int(fd_config.speculative_config.method == SpecMethod.MTP) diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 1ab0b48f350..85e54647b7e 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -947,6 +947,7 @@ def _process_batch_output(self): if not is_prefill: self._record_completion_metrics(task, current_time) llm_logger.info(f"task {task_id} received eos token. Recycling.") + if ( envs.ENABLE_V1_KVCACHE_SCHEDULER and self.cfg.cache_config.enable_prefix_caching diff --git a/fastdeploy/spec_decode/base.py b/fastdeploy/spec_decode/base.py index fa50eae462a..8db764fcf12 100644 --- a/fastdeploy/spec_decode/base.py +++ b/fastdeploy/spec_decode/base.py @@ -71,7 +71,7 @@ def __init__(self, fd_config: "FDConfig"): self.max_ngram_size = self.speculative_config.max_ngram_size self.min_ngram_size = self.speculative_config.min_ngram_size - self.enable_mm = self.model_config.enable_mm + self.enable_mm = self.fd_config.enable_mm_runtime spec_logger.info(f"Speculate config: {self.speculative_config}") diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py index 4ec57e93594..9ca5f535ab0 100644 --- a/fastdeploy/spec_decode/mtp.py +++ b/fastdeploy/spec_decode/mtp.py @@ -103,7 +103,7 @@ def __init__( self.num_main_model_layers = self.model_config.num_hidden_layers self.local_rank = local_rank self.device_id = device_id - self.use_attn_mask_offset = self.enable_mm and self.fd_config.deploy_modality != "text" + self.use_attn_mask_offset = self.enable_mm self._update_mtp_config(main_model) self._load_model() diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py index 44a8c5f3578..284cee8843d 100644 --- a/fastdeploy/worker/gcu_model_runner.py +++ b/fastdeploy/worker/gcu_model_runner.py @@ -62,7 +62,7 @@ def __init__( local_rank: int, ): super().__init__(fd_config=fd_config, device=device) - self.enable_mm = self.model_config.enable_mm + self.enable_mm = self.fd_config.enable_mm_runtime self.rank = rank self.local_rank = local_rank self.device_id = device_id diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index bc315c3646b..2454b016209 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -119,7 +119,7 @@ def __init__( ): super().__init__(fd_config=fd_config, device=device) self.MAX_INFER_SEED = 9223372036854775806 - self.enable_mm = self.model_config.enable_mm + self.enable_mm = self.fd_config.enable_mm_runtime self.rank = rank self.local_rank = local_rank self.device_id = device_id @@ -1118,10 +1118,12 @@ def _dummy_prefill_inputs(self, input_length_list: List[int], max_dec_len_list: def _prepare_inputs(self, cached_token_num=-1, cached_real_bsz=-1, is_dummy_or_profile_run=False) -> None: """Prepare the model inputs""" + if self.enable_mm and self.share_inputs["image_features_list"] is not None: tensor_feats = [t for t in self.share_inputs["image_features_list"] if isinstance(t, paddle.Tensor)] if tensor_feats: self.share_inputs["image_features"] = paddle.concat(tensor_feats, axis=0) + recover_decode_task( self.share_inputs["stop_flags"], self.share_inputs["seq_lens_this_time"], diff --git a/fastdeploy/worker/iluvatar_worker.py b/fastdeploy/worker/iluvatar_worker.py index 625aca86db1..44be900bb73 100644 --- a/fastdeploy/worker/iluvatar_worker.py +++ b/fastdeploy/worker/iluvatar_worker.py @@ -40,7 +40,7 @@ def __init__( local_rank: int, rank: int, ): - if fd_config.model_config.enable_mm: + if fd_config.enable_mm_runtime: paddle.set_flags({"FLAGS_enable_ixattnbkd": True, "FLAGS_enable_ixdnn_attn": False}) super(IluvatarWorker, self).__init__( fd_config=fd_config, diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index 363dfb63097..55a3f39a2ee 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -17,13 +17,7 @@ import paddle from paddleformers.utils.log import logger -from fastdeploy.config import ( - CacheConfig, - DeployModality, - FDConfig, - ModelConfig, - SpeculativeConfig, -) +from fastdeploy.config import CacheConfig, FDConfig, ModelConfig, SpeculativeConfig from fastdeploy.model_executor.layers.rotary_embedding import get_rope from fastdeploy.model_executor.logits_processor import build_logits_processors from fastdeploy.platforms import current_platform @@ -101,7 +95,8 @@ def __init__(self, fd_config: FDConfig) -> None: self.scheduler_config = fd_config.scheduler_config self.speculative_config: SpeculativeConfig = fd_config.speculative_config self.speculative_decoding = self.speculative_config.method is not None - self.enable_mm = self.model_config.enable_mm + self.is_mm_model = self.model_config.enable_mm + self.enable_mm = fd_config.enable_mm_runtime self.enable_expert_parallel = fd_config.parallel_config.enable_expert_parallel self.index_to_batch_id = {} self.enable_pd_reorder = False @@ -231,6 +226,9 @@ def init_share_inputs(self): model_config=self.model_config, partial_rotary_factor=self.model_config.partial_rotary_factor, ) + if self.is_mm_model: + self.image_features = None + self.image_features_list = None # Set block tables pre_max_block_num = ( @@ -677,6 +675,9 @@ def reset_share_inputs(self): model_config=self.model_config, partial_rotary_factor=self.model_config.partial_rotary_factor, ) + if self.is_mm_model: + self.image_features = None + self.image_features_list = None # Reset other miscellaneous tensors fill_paddle_tensor(self, "mask_rollback", 0) @@ -689,7 +690,7 @@ def reset_share_inputs(self): class ProposerInputBatch(InputBatch): def __init__(self, fd_config: FDConfig, target_model_input_batch: InputBatch) -> None: - self.enable_mm = fd_config.model_config.enable_mm + self.enable_mm = fd_config.enable_mm_runtime self.num_model_steps = fd_config.speculative_config.num_model_steps self.index_to_batch_id = {} self.target_model_input_batch = target_model_input_batch @@ -863,18 +864,15 @@ def init_share_inputs(self): -1, dtype="int32", ) - if self.fd_config.deploy_modality != DeployModality.TEXT: - self.attn_mask_offsets = paddle.full( - shape=[self.scheduler_config.max_num_seqs * self.model_config.max_model_len], - fill_value=-1, - dtype="int32", - ) - self.attn_mask_offsets_full = paddle.full( - [self.scheduler_config.max_num_seqs, self.model_config.max_model_len], -1, dtype="int32" - ) - self.attn_mask_offsets_decoder = paddle.full( - [self.scheduler_config.max_num_seqs, 1], -1, dtype="int32" - ) + self.attn_mask_offsets = paddle.full( + shape=[self.scheduler_config.max_num_seqs * self.model_config.max_model_len], + fill_value=-1, + dtype="int32", + ) + self.attn_mask_offsets_full = paddle.full( + [self.scheduler_config.max_num_seqs, self.model_config.max_model_len], -1, dtype="int32" + ) + self.attn_mask_offsets_decoder = paddle.full([self.scheduler_config.max_num_seqs, 1], -1, dtype="int32") def swap_states(self, i1, i2) -> None: def swap_data(tensor, idx1, idx2): @@ -896,7 +894,7 @@ def swap_data(tensor, idx1, idx2): swap_data(self.input_ids_len, i1, i2) swap_data(self.mask_rollback, i1, i2) swap_data(self.recompute_token_num, i1, i2) - if self.enable_mm and self.fd_config.deploy_modality != DeployModality.TEXT: + if self.enable_mm: swap_data(self.attn_mask_offsets_full, i1, i2) swap_data(self.attn_mask_offsets_decoder, i1, i2) @@ -1030,10 +1028,9 @@ def reset_model_inputs(self) -> None: # Reset multimodal tensors if enabled if self.enable_mm: fill_paddle_tensor(self, "decode_states", -1) - if self.fd_config.deploy_modality != DeployModality.TEXT: - fill_paddle_tensor(self, "attn_mask_offsets", -1) - fill_paddle_tensor(self, "attn_mask_offsets_full", -1) - fill_paddle_tensor(self, "attn_mask_offsets_decoder", -1) + fill_paddle_tensor(self, "attn_mask_offsets", -1) + fill_paddle_tensor(self, "attn_mask_offsets_full", -1) + fill_paddle_tensor(self, "attn_mask_offsets_decoder", -1) logger.info("model_inputs reset completed") except Exception as e: diff --git a/fastdeploy/worker/metax_model_runner.py b/fastdeploy/worker/metax_model_runner.py index 93f5cec6a57..28c769e1166 100644 --- a/fastdeploy/worker/metax_model_runner.py +++ b/fastdeploy/worker/metax_model_runner.py @@ -97,7 +97,7 @@ def __init__( ): super().__init__(fd_config=fd_config, device=device) self.MAX_INFER_SEED = 9223372036854775806 - self.enable_mm = self.model_config.enable_mm + self.enable_mm = self.fd_config.enable_mm_runtime self.rank = rank self.local_rank = local_rank self.device_id = device_id diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 8182e06990b..2f51359959f 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -138,7 +138,7 @@ def init_distributed_environment(seed: int = 20) -> Tuple[int, int]: def update_fd_config_for_mm(fd_config: FDConfig) -> None: architectures = fd_config.model_config.architectures - if fd_config.model_config.enable_mm and ErnieArchitectures.contains_ernie_arch(architectures): + if fd_config.enable_mm_runtime and ErnieArchitectures.contains_ernie_arch(architectures): fd_config.model_config.tensor_model_parallel_size = fd_config.parallel_config.tensor_parallel_size fd_config.model_config.tensor_parallel_rank = fd_config.parallel_config.tensor_parallel_rank fd_config.model_config.vision_config.dtype = fd_config.model_config.dtype @@ -506,7 +506,7 @@ def event_loop_normal(self) -> None: if tp_rank == 0: if self.task_queue.exist_tasks(): if envs.ENABLE_V1_KVCACHE_SCHEDULER or not ( - self.fd_config.model_config.enable_mm and self.worker.exist_prefill() + self.fd_config.enable_mm_runtime and self.worker.exist_prefill() ): if self.nnode > 1: self.task_queue.read_finish_flag.set(1) diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 1446257d3ae..bd585519520 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -97,7 +97,7 @@ def __init__( local_rank: int, ): super().__init__(fd_config=fd_config, device=device) - self.enable_mm = self.model_config.enable_mm + self.enable_mm = self.fd_config.enable_mm_runtime self.rank = rank self.local_rank = local_rank self.device_id = device_id diff --git a/tests/distributed/chunked_moe.py b/tests/distributed/chunked_moe.py index fee1582f3c7..0fe9f9f3974 100644 --- a/tests/distributed/chunked_moe.py +++ b/tests/distributed/chunked_moe.py @@ -92,6 +92,7 @@ class SchedulerConfig: model_config = MockModelConfig() cache_config = MockCacheConfig() speculative_config = MockSpecaulativeConfig() + enable_mm_runtime = MockModelConfig.enable_mm def get_max_chunk_tokens(self, mm_max_tokens_per_item=None): return 8192 @@ -139,7 +140,7 @@ def setup_model_runner(self): model_runner.model_config = mock_model_config model_runner.cache_config = mock_cache_config model_runner.attn_backends = [MockAttentionBackend()] - model_runner.enable_mm = True + model_runner.enable_mm = mock_fd_config.enable_mm_runtime model_runner.cudagraph_only_prefill = False model_runner.use_cudagraph = False model_runner.speculative_decoding = False diff --git a/tests/entrypoints/test_engine_client.py b/tests/entrypoints/test_engine_client.py index 0ed8fbdc033..71ad4b29db9 100644 --- a/tests/entrypoints/test_engine_client.py +++ b/tests/entrypoints/test_engine_client.py @@ -102,6 +102,7 @@ def create_mock_fd_config( mock_config.structured_outputs_config = Mock() mock_config.structured_outputs_config.reasoning_parser = None mock_config.tool_parser = None + mock_config.enable_mm_runtime = enable_mm return mock_config @@ -181,6 +182,7 @@ async def asyncSetUp(self): mock_config.structured_outputs_config = Mock() mock_config.structured_outputs_config.reasoning_parser = None mock_config.node_rank = 0 + mock_config.enable_mm_runtime = mock_model_config.enable_mm # Create mocks for all the external dependencies mock_input_processor = Mock() @@ -363,6 +365,7 @@ def setUp(self): mock_config.structured_outputs_config = MagicMock() # Add this mock_config.structured_outputs_config.reasoning_parser = None mock_config.tool_parser = None # Add this attribute + mock_config.enable_mm_runtime = mock_model_config.enable_mm # Mock IPCSignal to avoid file system dependencies with patch("fastdeploy.entrypoints.engine_client.IPCSignal") as mock_ipcsignal: @@ -655,6 +658,7 @@ async def test_init_basic_parameters(self): mock_config.structured_outputs_config = Mock() mock_config.structured_outputs_config.reasoning_parser = None mock_config.tool_parser = None + mock_config.enable_mm_runtime = mock_config.model_config.enable_mm client = EngineClient( pid=5678, @@ -1078,6 +1082,7 @@ async def test_init_with_multimodal_prefix_cache(self): mock_config = Mock() mock_config.model_config = mock_model_config + mock_config.enable_mm_runtime = mock_model_config.enable_mm mock_config.eplb_config = Mock() mock_config.eplb_config.enable_eplb = False @@ -1131,6 +1136,7 @@ async def test_init_as_worker_node(self): mock_config = Mock() mock_config.model_config = mock_model_config + mock_config.enable_mm_runtime = mock_model_config.enable_mm mock_config.eplb_config = Mock() mock_config.eplb_config.enable_eplb = False @@ -1408,6 +1414,7 @@ async def test_init_iluvatar_platform(self): mock_config = Mock() mock_config.model_config = mock_model_config + mock_config.enable_mm_runtime = mock_model_config.enable_mm mock_config.eplb_config = Mock() mock_config.eplb_config.enable_eplb = False diff --git a/tests/layers/test_kv_cache_int8_dynamic_quant_backend.py b/tests/layers/test_kv_cache_int8_dynamic_quant_backend.py index 17a393ee11e..f679be08b31 100644 --- a/tests/layers/test_kv_cache_int8_dynamic_quant_backend.py +++ b/tests/layers/test_kv_cache_int8_dynamic_quant_backend.py @@ -92,6 +92,7 @@ def __init__(self): "max_model_len": 2048, "head_dim": 128, "num_hidden_layers": 2, + "enable_mm": False, "causal": True, "start_layer_index": 0, "rope_3d": False, @@ -124,6 +125,8 @@ def __init__(self): "model_type": "main", }, )() + self.enable_mm_runtime = self.model_config.enable_mm + self.enable_rope_3d_runtime = self.model_config.enable_mm class DummyLayer: diff --git a/tests/scheduler/test_chunked_prefill_determinism.py b/tests/scheduler/test_chunked_prefill_determinism.py index 1a0f786f3d1..17b466a014c 100644 --- a/tests/scheduler/test_chunked_prefill_determinism.py +++ b/tests/scheduler/test_chunked_prefill_determinism.py @@ -78,6 +78,7 @@ def __init__(self): self.cache_config = CacheConfig() self.parallel_config = ParallelConfig() self.speculative_config = SpeculativeConfig() + self.enable_mm_runtime = self.model_config.enable_mm # --------------------------------------------------------------------------- @@ -168,6 +169,7 @@ def _create_resource_manager(self, config): def _create_mm_resource_manager(self): config = StubConfig() config.model_config.enable_mm = True + config.enable_mm_runtime = config.model_config.enable_mm return self._create_resource_manager(config) # ==================== 1. Deterministic disabled ==================== diff --git a/tests/worker/test_gpu_prompt_logprobs.py b/tests/worker/test_gpu_prompt_logprobs.py index d26bc915339..f12bc4cf3dc 100644 --- a/tests/worker/test_gpu_prompt_logprobs.py +++ b/tests/worker/test_gpu_prompt_logprobs.py @@ -64,6 +64,7 @@ class SpecaulativeConfig: scheduler_config = SchedulerConfig() cache_config = CacheConfig() parallel_config = ParallelConfig() + enable_mm_runtime = model_config.enable_mm def get_max_chunk_tokens(self, mm_max_tokens_per_item=None): return 8192 diff --git a/tests/worker/test_reorder_split_prefill_and_decode.py b/tests/worker/test_reorder_split_prefill_and_decode.py index aff9f551cf4..d2d9e3a1f61 100644 --- a/tests/worker/test_reorder_split_prefill_and_decode.py +++ b/tests/worker/test_reorder_split_prefill_and_decode.py @@ -83,6 +83,7 @@ def create_mock_config(): fd_config.parallel_config = parallel_config fd_config.structured_outputs_config = structured_outputs_config fd_config.pad_to = 8 + fd_config.enable_mm_runtime = model_config.enable_mm def get_max_chunk_tokens(mm_max_tokens_per_item=None): return 100