Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,7 @@ def expand_bsz_map(real_bsz_to_captured_size):
int(envs.ENABLE_V1_KVCACHE_SCHEDULER) == 0
and self.model_config is not None
and self.model_config.enable_mm
and self.deploy_modality != DeployModality.TEXT
):
self.max_prefill_batch = 1 # TODO: the V0 multimodal prefill stage currently supports only a parallelism of 1; to be optimized
else:
Expand Down Expand Up @@ -2019,6 +2020,20 @@ def expand_bsz_map(real_bsz_to_captured_size):
self.check()
# self.print() # NOTE: it's better to explicitly call .print() when FDConfig is initialized

@property
def enable_mm_runtime(self) -> bool:
    """Report whether the multimodal runtime path is active.

    The result is true only when all three conditions hold:

    * a model config is attached (``self.model_config is not None``),
    * that config has ``enable_mm`` set, and
    * ``self.deploy_modality`` is not ``DeployModality.TEXT``.

    Returns:
        bool: always a genuine ``bool``. A bare ``and`` chain would hand back
        the raw ``enable_mm`` operand when it is falsy (for example ``None``),
        which violates the annotated return type.
    """
    # Coerce explicitly: callers forward this value as a keyword argument,
    # so a leaked ``None`` must not be mistaken for "not provided".
    return bool(
        self.model_config is not None
        and self.model_config.enable_mm
        and self.deploy_modality != DeployModality.TEXT
    )

@property
def enable_rope_3d_runtime(self) -> bool:
    """Report whether the 3D RoPE runtime path should be used.

    3D RoPE is only considered when ``self.enable_mm_runtime`` is truthy.
    In that case the model config is checked for either of the two flag
    spellings, ``rope_3d`` or ``use_3d_rope``; a missing attribute counts
    as ``False``.

    Returns:
        bool: always a genuine ``bool``, even when the config flags hold
        non-bool truthy or falsy values.
    """
    # Guard clause: without the multimodal runtime, 3D RoPE is never enabled.
    if not self.enable_mm_runtime:
        return False
    model_config = self.model_config
    # Either spelling of the flag enables 3D RoPE; normalise to bool so the
    # annotated return type holds.
    return bool(getattr(model_config, "rope_3d", False) or getattr(model_config, "use_3d_rope", False))

def _disable_sequence_parallel_moe_if_needed(self, mode_name):
if self.parallel_config.use_sequence_parallel_moe and self.graph_opt_config.use_cudagraph:
self.parallel_config.use_sequence_parallel_moe = False
Expand Down Expand Up @@ -2057,9 +2072,21 @@ def postprocess(self):
if self.long_prefill_token_threshold == 0:
self.long_prefill_token_threshold = int(self.model_config.max_model_len * 0.04)

if (
self.model_config is not None
and self.model_config.enable_mm
and self.deploy_modality == DeployModality.TEXT
):
if getattr(self.model_config, "rope_3d", False) or getattr(self.model_config, "use_3d_rope", False):
logger.info(
"Deploy modality is text; forcing the multimodal-capable model onto the 2D RoPE runtime path."
)
setattr(self.model_config, "rope_3d", False)
setattr(self.model_config, "use_3d_rope", False)

self.cache_config.max_block_num_per_seq = int(self.model_config.max_model_len // self.cache_config.block_size)
self.cache_config.postprocess(self.get_max_chunk_tokens(), self.scheduler_config.max_num_seqs)
if self.model_config is not None and self.model_config.enable_mm and not envs.ENABLE_V1_KVCACHE_SCHEDULER:
if self.model_config is not None and self.enable_mm_runtime and not envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.cache_config.enable_prefix_caching = False
if (
self.structured_outputs_config is not None
Expand All @@ -2085,7 +2112,7 @@ def postprocess(self):
f"Guided decoding backend '{self.structured_outputs_config.guided_decoding_backend}' is not implemented. [auto, xgrammar, guidance, off]"
)

if self.model_config.enable_mm:
if self.enable_mm_runtime:
if self.cache_config.max_encoder_cache is None or self.cache_config.max_encoder_cache < 0:
self.cache_config.max_encoder_cache = self.scheduler_config.max_num_batched_tokens
elif self.cache_config.max_encoder_cache != 0:
Expand Down Expand Up @@ -2392,7 +2419,7 @@ def get_max_chunk_tokens(self, mm_max_tokens_per_item=None):
num_tokens = self.scheduler_config.max_num_seqs
else:
num_tokens = self.scheduler_config.max_num_batched_tokens
if mm_max_tokens_per_item is not None and self.deploy_modality != DeployModality.TEXT:
if self.enable_mm_runtime and mm_max_tokens_per_item is not None:
max_mm_tokens = max(
mm_max_tokens_per_item.get("image", 0),
mm_max_tokens_per_item.get("video", 0),
Expand Down
3 changes: 2 additions & 1 deletion fastdeploy/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ def __init__(self, cfg, pid):
cfg.limit_mm_per_prompt,
cfg.mm_processor_kwargs,
cfg.tool_parser,
enable_mm_runtime=cfg.enable_mm_runtime,
)
# Create data processor
self.data_processor = self.input_processor.create_processor()
Expand Down Expand Up @@ -446,7 +447,7 @@ async def add_request(
)
if envs.ZMQ_SEND_BATCH_DATA and self.connection_manager is not None:
request["zmq_worker_pid"] = self.connection_manager.worker_pid
if self.cfg.model_config.enable_mm:
if self.cfg.enable_mm_runtime:
self.request_client.send_pyobj(request)
else:
self.request_client.send_json(request)
Expand Down
8 changes: 5 additions & 3 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ def create_data_processor(self):
self.cfg.limit_mm_per_prompt,
self.cfg.mm_processor_kwargs,
self.cfg.tool_parser,
enable_mm_runtime=self.cfg.enable_mm_runtime,
)
self.data_processor = self.input_processor.create_processor()
self.mm_max_tokens_per_item = self.data_processor.get_mm_max_tokens_per_item(
Expand Down Expand Up @@ -601,7 +602,7 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
LoggingEventName.RESCHEDULED_INFERENCE_START, task.request_id, getattr(task, "user", "")
)
if not is_prefill:
if not self.cfg.model_config.enable_mm:
if not self.cfg.enable_mm_runtime:
self.update_requests_chunk_size(tasks)
else:
self.update_mm_requests_chunk_size(tasks)
Expand Down Expand Up @@ -1217,7 +1218,7 @@ def _insert_zmq_task_to_scheduler(self):
while self.running:
try:
block = True if len(added_requests) == 0 else False
if not self.cfg.model_config.enable_mm:
if not self.cfg.enable_mm_runtime:
err, data = self.recv_request_server.receive_json_once(block)
else:
err, data = self.recv_request_server.receive_pyobj_once(block)
Expand Down Expand Up @@ -1275,6 +1276,7 @@ def _insert_zmq_task_to_scheduler(self):
err_msg = None
try:
request = Request.from_dict(data)

request.metrics.scheduler_recv_req_time = time.time()
main_process_metrics.requests_number.inc()
trace_carrier = data.get("trace_carrier")
Expand Down Expand Up @@ -2377,7 +2379,7 @@ def _setting_environ_variables(self):
if self.cfg.scheduler_config.splitwise_role == "prefill":
variables["FLAGS_fmt_write_cache_completed_signal"] = 1

if self.cfg.model_config.enable_mm:
if self.cfg.enable_mm_runtime:
variables["FLAGS_max_partition_size"] = 1024

command_prefix = ""
Expand Down
6 changes: 3 additions & 3 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,11 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
self.need_block_num_map = dict()

self.encoder_cache = None
if config.model_config.enable_mm and config.cache_config.max_encoder_cache > 0:
if config.enable_mm_runtime and config.cache_config.max_encoder_cache > 0:
self.encoder_cache = EncoderCacheManager(config.cache_config.max_encoder_cache)

self.processor_cache = None
if config.model_config.enable_mm and config.cache_config.max_processor_cache > 0:
if config.enable_mm_runtime and config.cache_config.max_processor_cache > 0:
max_processor_cache_in_bytes = int(config.cache_config.max_processor_cache * 1024 * 1024 * 1024)
self.processor_cache = ProcessorCacheManager(max_processor_cache_in_bytes)

Expand Down Expand Up @@ -550,7 +550,7 @@ def _get_num_new_tokens(self, request, token_budget):
num_new_tokens = token_budget // self.config.cache_config.block_size * self.config.cache_config.block_size
request.with_image = False

if not self.config.model_config.enable_mm:
if not self.config.enable_mm_runtime:
return num_new_tokens

inputs = request.multimodal_inputs
Expand Down
4 changes: 3 additions & 1 deletion fastdeploy/entrypoints/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class EngineClient:
def __init__(self, pid: int | str, port: int | str, fd_config: FDConfig, workers: int = 1, max_logprobs: int = 20):
self.fd_config = fd_config
self.tensor_parallel_size = self.fd_config.parallel_config.tensor_parallel_size
self.enable_mm = self.fd_config.model_config.enable_mm
self.enable_mm = self.fd_config.enable_mm_runtime
self.max_logprobs = max_logprobs
input_processor = InputPreprocessor(
self.fd_config.model_config,
Expand All @@ -93,6 +93,7 @@ def __init__(self, pid: int | str, port: int | str, fd_config: FDConfig, workers
self.fd_config.mm_processor_kwargs,
self.fd_config.tool_parser,
self.enable_mm and self.fd_config.cache_config.max_processor_cache > 0,
enable_mm_runtime=self.enable_mm,
)
self.enable_logprob = self.fd_config.model_config.enable_logprob
self.data_processor = input_processor.create_processor()
Expand Down Expand Up @@ -358,6 +359,7 @@ async def add_requests(self, task):

task["max_tokens"] = min(self.max_model_len - input_ids_len, task.get("max_tokens"))
min_tokens = task.get("min_tokens", 1)

if "messages" in task:
task["messages"] = None
api_server_logger.info(f"task['max_tokens']:{task['max_tokens']}")
Expand Down
5 changes: 4 additions & 1 deletion fastdeploy/input/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(
mm_processor_kwargs: Optional[Dict[str, Any]] = None,
tool_parser: str = None,
enable_processor_cache: bool = False,
enable_mm_runtime: Optional[bool] = None,
) -> None:
self.model_config = model_config
self.model_name_or_path = self.model_config.model
Expand All @@ -56,6 +57,7 @@ def __init__(
self.mm_processor_kwargs = mm_processor_kwargs
self.tool_parser = tool_parser
self.enable_processor_cache = enable_processor_cache
self.enable_mm_runtime = self.model_config.enable_mm if enable_mm_runtime is None else enable_mm_runtime

def create_processor(self):
reasoning_parser_obj = None
Expand All @@ -77,10 +79,11 @@ def create_processor(self):
reasoning_parser_obj=reasoning_parser_obj,
tool_parser_obj=tool_parser_obj,
mm_processor_kwargs=self.mm_processor_kwargs,
enable_mm_runtime=self.enable_mm_runtime,
)
except Exception as e:
logger.info(f"Plugin input processor not available ({e}), using built-in processor")
if not self.model_config.enable_mm:
if not self.enable_mm_runtime:
from fastdeploy.input.text_processor import TextProcessor

tokenizer_type = "ernie4_5" if ErnieArchitectures.contains_ernie_arch(architecture) else "auto"
Expand Down
2 changes: 0 additions & 2 deletions fastdeploy/inter_communicator/engine_worker_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ def put_tasks(self, tasks: List[Any]) -> None:
self.lock.release()
time.sleep(0.001)
self.lock.acquire()

if envs.FD_ENABLE_MAX_PREFILL or envs.FD_ENABLE_E2W_TENSOR_CONVERT:
# multimodal input numpy -> tensor
to_tensor(tasks[0])
Expand All @@ -571,7 +570,6 @@ def get_tasks(self) -> Tuple[List[Any], bool]:
"""
tasks: List[Any] = list()
self.lock.acquire()

tasks.extend(self.tasks)
self.client_read_flag[self.client_id] = 1
all_client_read: bool = np.sum(self.client_read_flag) == self.num_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ def __init__(
self.rope_theta: float = (
10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta
)
self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr(
fd_config.model_config, "use_3d_rope", False
)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
if fd_config.speculative_config.model_type != "main":
self.rope_3d = False
self.causal: bool = getattr(fd_config.model_config, "causal", True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def __init__(
self.rope_theta: float = (
10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta
)
self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
self.causal: bool = getattr(fd_config.model_config, "causal", True)
self.speculative_method: str = fd_config.speculative_config.method
self.use_speculate: bool = self.speculative_method is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,7 @@ def __init__(

self.rank, self.device_id = init_rank_and_device_id(fd_config)

self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr(
fd_config.model_config, "use_3d_rope", False
)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
if fd_config.speculative_config.model_type != "main":
self.rope_3d = False
# Note(ZKK): here must be consistent with append_attn_backend.py
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ def __init__(

self.rank, self.device_id = init_rank_and_device_id(fd_config)

self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr(
fd_config.model_config, "use_3d_rope", False
)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
if fd_config.speculative_config.model_type != "main":
self.rope_3d = False
self.max_partition_size: int = int(os.getenv("FLAGS_max_partition_size", "32768"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def __init__(
self.rope_theta: float = (
10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta
)
self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
self.causal: bool = getattr(fd_config.model_config, "causal", True)
self.speculative_method = fd_config.speculative_config.method
self.use_speculate: bool = self.speculative_method is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __init__(
# note: scale need to change if using MLA
self.scale = 1.0 / sqrt(head_dim)
self.dtype = paddle.get_default_dtype()
self.enable_mm = fd_config.model_config.enable_mm
self.enable_mm = fd_config.enable_mm_runtime
self.rope_batch_stride = self.max_context_len * self.head_dim if self.enable_mm else 0
if "paddleocr" in fd_config.model_config.model_type:
self.is_interleaved_rope_mode = False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def __init__(
self.block_size = llm_config.cache_config.block_size
self.max_seq_len = llm_config.model_config.max_model_len
self.rope_theta = 10000.0 if llm_config.model_config.rope_theta is None else llm_config.model_config.rope_theta
self.rope_3d = getattr(llm_config.model_config, "rope_3d", False)
self.rope_3d = llm_config.enable_rope_3d_runtime
self.causal = getattr(llm_config.model_config, "causal", True)
self.speculative_method = llm_config.speculative_config.method
self.use_speculate: bool = self.speculative_method is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __init__(
self.rope_theta: float = (
10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta
)
self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
self.causal: bool = getattr(fd_config.model_config, "causal", True)
self.speculative_method = fd_config.speculative_config.method
self.use_speculate: bool = self.speculative_method is not None
Expand All @@ -128,7 +128,7 @@ def __init__(
fd_config.parallel_config.expert_parallel_rank = 0

self.rank, self.device_id = init_rank_and_device_id(fd_config)
self.enable_mm = fd_config.model_config.enable_mm
self.enable_mm = fd_config.enable_mm_runtime
self.model_type = fd_config.model_config.model_type
self.is_neox_style = False
if "paddleocr" in fd_config.model_config.model_type:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __init__(
self.rope_theta: float = (
10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta
)
self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
self.causal: bool = getattr(fd_config.model_config, "causal", True)
self.speculative_method = fd_config.speculative_config.method
self.use_speculate: bool = self.speculative_method is not None
Expand Down
4 changes: 1 addition & 3 deletions fastdeploy/model_executor/layers/backends/xpu/attention.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ def __init__(
self.rope_theta: float = (
10000.0 if fd_config.model_config.rope_theta is None else fd_config.model_config.rope_theta
)
self.rope_3d: bool = getattr(fd_config.model_config, "rope_3d", False) or getattr(
fd_config.model_config, "use_3d_rope", False
)
self.rope_3d: bool = fd_config.enable_rope_3d_runtime
self.causal: bool = getattr(fd_config.model_config, "causal", True)
self.keep_pd_step_flag: bool = fd_config.speculative_config.model_type == "mtp"
self.num_layers_draft_model: int = int(fd_config.speculative_config.method == SpecMethod.MTP)
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,7 @@ def _process_batch_output(self):
if not is_prefill:
self._record_completion_metrics(task, current_time)
llm_logger.info(f"task {task_id} received eos token. Recycling.")

if (
envs.ENABLE_V1_KVCACHE_SCHEDULER
and self.cfg.cache_config.enable_prefix_caching
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/spec_decode/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(self, fd_config: "FDConfig"):
self.max_ngram_size = self.speculative_config.max_ngram_size
self.min_ngram_size = self.speculative_config.min_ngram_size

self.enable_mm = self.model_config.enable_mm
self.enable_mm = self.fd_config.enable_mm_runtime

spec_logger.info(f"Speculate config: {self.speculative_config}")

Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/spec_decode/mtp.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __init__(
self.num_main_model_layers = self.model_config.num_hidden_layers
self.local_rank = local_rank
self.device_id = device_id
self.use_attn_mask_offset = self.enable_mm and self.fd_config.deploy_modality != "text"
self.use_attn_mask_offset = self.enable_mm

self._update_mtp_config(main_model)
self._load_model()
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/worker/gcu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(
local_rank: int,
):
super().__init__(fd_config=fd_config, device=device)
self.enable_mm = self.model_config.enable_mm
self.enable_mm = self.fd_config.enable_mm_runtime
self.rank = rank
self.local_rank = local_rank
self.device_id = device_id
Expand Down
4 changes: 3 additions & 1 deletion fastdeploy/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def __init__(
):
super().__init__(fd_config=fd_config, device=device)
self.MAX_INFER_SEED = 9223372036854775806
self.enable_mm = self.model_config.enable_mm
self.enable_mm = self.fd_config.enable_mm_runtime
self.rank = rank
self.local_rank = local_rank
self.device_id = device_id
Expand Down Expand Up @@ -1118,10 +1118,12 @@ def _dummy_prefill_inputs(self, input_length_list: List[int], max_dec_len_list:

def _prepare_inputs(self, cached_token_num=-1, cached_real_bsz=-1, is_dummy_or_profile_run=False) -> None:
"""Prepare the model inputs"""

if self.enable_mm and self.share_inputs["image_features_list"] is not None:
tensor_feats = [t for t in self.share_inputs["image_features_list"] if isinstance(t, paddle.Tensor)]
if tensor_feats:
self.share_inputs["image_features"] = paddle.concat(tensor_feats, axis=0)

recover_decode_task(
self.share_inputs["stop_flags"],
self.share_inputs["seq_lens_this_time"],
Expand Down
Loading
Loading