Skip to content

Commit 6151c15

Browse files
committed
fix: remove IPCLock and add missing attrs to align with upstream
1 parent 82c0805 commit 6151c15

5 files changed

Lines changed: 136 additions & 140 deletions

File tree

fastdeploy/config.py

Lines changed: 102 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,9 @@ def override_name_from_config(self):
380380
# Because the ERNIE 4.5 config.json contains two sets of keys, adaptation is required.
381381
self.moe_num_shared_experts = self.n_shared_experts
382382

383+
if hasattr(self, "num_experts_per_tok") and not hasattr(self, "moe_k"):
384+
self.moe_k = self.num_experts_per_tok
385+
383386
def read_from_env(self):
384387
"""
385388
Read configuration information from environment variables and update the object's attributes.
@@ -673,6 +676,7 @@ def __init__(
673676
self.pod_ip: str = None
674677
# enable the custom all-reduce kernel and fall back to NCCL(dist.all_reduce).
675678
self.disable_custom_all_reduce: bool = False
679+
self.enable_flashinfer_allreduce_fusion: bool = False
676680
for key, value in args.items():
677681
if hasattr(self, key):
678682
setattr(self, key, value)
@@ -776,7 +780,7 @@ class SpeculativeConfig:
776780
"benchmark_mode": False,
777781
"enf_gen_phase_tag": False,
778782
"enable_draft_logprob": False,
779-
"verify_strategy": "topp",
783+
"verify_strategy": "target_match",
780784
"accept_policy": "normal",
781785
}
782786

@@ -1060,6 +1064,7 @@ def __init__(
10601064
- None (default): capture sizes are inferred from llm config.
10611065
- list[int]: capture sizes are specified as given."""
10621066
self.cudagraph_capture_sizes: Optional[list[int]] = None
1067+
self.flag_cudagraph_capture_sizes_initlized = False
10631068
self.cudagraph_capture_sizes_prefill: list[int] = [1, 2, 4, 8]
10641069
""" Number of warmup runs for cudagraph. """
10651070
self.cudagraph_num_of_warmups: int = 2
@@ -1110,13 +1115,27 @@ def __init__(
11101115

11111116
self.check_legality_parameters()
11121117

1113-
def init_with_cudagrpah_size(self, max_capture_size: int = 0, max_capture_shape_prefill: int = 0) -> None:
1118+
def init_with_cudagrpah_size(
1119+
self,
1120+
max_capture_size: int = 0,
1121+
max_capture_shape_prefill: int = 0,
1122+
num_speculative_tokens: int = 0,
1123+
) -> None:
11141124
"""
11151125
Initialize cuda graph capture sizes and
11161126
pre-compute the mapping from batch size to padded graph size
11171127
"""
11181128
# Regular capture sizes
1119-
self.cudagraph_capture_sizes = [size for size in self.cudagraph_capture_sizes if size <= max_capture_size]
1129+
if num_speculative_tokens != 0:
1130+
max_capture_size = max_capture_size * (num_speculative_tokens + 1)
1131+
if not self.flag_cudagraph_capture_sizes_initlized and num_speculative_tokens != 0:
1132+
self.cudagraph_capture_sizes = [
1133+
size * (num_speculative_tokens + 1)
1134+
for size in self.cudagraph_capture_sizes
1135+
if (size * (num_speculative_tokens + 1)) <= max_capture_size
1136+
]
1137+
else:
1138+
self.cudagraph_capture_sizes = [size for size in self.cudagraph_capture_sizes if size <= max_capture_size]
11201139
self.cudagraph_capture_sizes_prefill = [
11211140
size for size in self.cudagraph_capture_sizes_prefill if size <= max_capture_shape_prefill
11221141
]
@@ -1156,24 +1175,41 @@ def init_with_cudagrpah_size(self, max_capture_size: int = 0, max_capture_shape_
11561175
self.real_shape_to_captured_size_prefill[bs] = end
11571176
self.real_shape_to_captured_size_prefill[self.max_capture_size_prefill] = self.max_capture_size_prefill
11581177

1178+
if num_speculative_tokens != 0:
1179+
real_bsz_to_captured_size = {}
1180+
for capture_size in self.cudagraph_capture_sizes:
1181+
dummy_batch_size = int(capture_size / (num_speculative_tokens + 1))
1182+
real_bsz_to_captured_size[dummy_batch_size] = capture_size
1183+
1184+
def expand_bsz_map(real_bsz_to_captured_size):
1185+
sorted_items = sorted(real_bsz_to_captured_size.items())
1186+
result = {}
1187+
prev_bsz = 0
1188+
for curr_bsz, cap in sorted_items:
1189+
for bsz in range(prev_bsz + 1, curr_bsz + 1):
1190+
result[bsz] = cap
1191+
prev_bsz = curr_bsz
1192+
return result
1193+
1194+
self.real_bsz_to_captured_size = expand_bsz_map(real_bsz_to_captured_size)
1195+
1196+
self.flag_cudagraph_capture_sizes_initlized = True
1197+
11591198
def _set_cudagraph_sizes(
11601199
self,
11611200
max_capture_size: int = 0,
11621201
max_capture_shape_prefill: int = 0,
1163-
dec_token_per_query_per_step: int = 1,
11641202
):
11651203
"""
11661204
Calculate a series of candidate capture sizes,
11671205
and then extract a portion of them as the capture list for the CUDA graph based on user input.
11681206
"""
1169-
# Shape [1, 2, 4, 8, 16, ... 120, 128] * dec_token_per_query_per_step
1170-
draft_capture_sizes = [i * dec_token_per_query_per_step for i in [1, 2, 4]] + [
1171-
8 * i * dec_token_per_query_per_step for i in range(1, 17)
1172-
]
1173-
# Shape [128, 144, ... 240, 256] * dec_token_per_query_per_step
1174-
draft_capture_sizes += [16 * i * dec_token_per_query_per_step for i in range(9, 17)]
1175-
# Shape [256, 288, ... 992, 1024] * dec_token_per_query_per_step
1176-
draft_capture_sizes += [32 * i * dec_token_per_query_per_step for i in range(9, 33)]
1207+
# Shape [1, 2, 4, 8, 16, ... 120, 128]
1208+
draft_capture_sizes = [i for i in [1, 2, 4]] + [8 * i for i in range(1, 17)]
1209+
# Shape [128, 144, ... 240, 256]
1210+
draft_capture_sizes += [16 * i for i in range(9, 17)]
1211+
# Shape [256, 288, ... 992, 1024]
1212+
draft_capture_sizes += [32 * i for i in range(9, 33)]
11771213

11781214
draft_capture_sizes_prefill = draft_capture_sizes.copy()
11791215
draft_capture_sizes.append(max_capture_size)
@@ -1417,6 +1453,7 @@ def __init__(
14171453
self.dynamic_load_weight: bool = False
14181454
self.load_strategy: Optional[Literal["ipc", "ipc_snapshot", "meta", "normal", "rsync"]] = "normal"
14191455
self.rsync_config: Optional[Dict[str, Any]] = None
1456+
self.model_loader_extra_config: Optional[Dict[str, Any]] = None
14201457
for key, value in args.items():
14211458
if hasattr(self, key):
14221459
setattr(self, key, value)
@@ -1903,65 +1940,34 @@ def __init__(
19031940
self.deploy_modality: DeployModality = deploy_modality
19041941
# Initialize cuda graph capture list
19051942
max_capture_shape = self.scheduler_config.max_num_seqs
1906-
if self.speculative_config is not None and self.speculative_config.method in [
1907-
SpecMethod.MTP,
1908-
SpecMethod.SUFFIX,
1909-
]:
1910-
max_capture_shape = self.scheduler_config.max_num_seqs * (
1911-
self.speculative_config.num_speculative_tokens + 1
1912-
)
1913-
assert max_capture_shape % 2 == 0, "CUDAGraph only supports capturing even token nums in MTP scenarios."
1914-
self.graph_opt_config.real_bsz_to_captured_size = {
1915-
k: 0 for k in range(1, self.scheduler_config.max_num_seqs + 1)
1916-
}
19171943
if self.graph_opt_config.cudagraph_only_prefill:
19181944
max_capture_shape = 512
19191945
else:
1920-
max_capture_shape = (
1921-
max_capture_shape if self.speculative_config is not None else min(512, max_capture_shape)
1922-
)
1946+
max_capture_shape = min(512, max_capture_shape)
19231947

19241948
max_capture_shape_prefill = graph_opt_config.max_capture_shape_prefill
19251949

19261950
if self.graph_opt_config.cudagraph_capture_sizes is None:
1927-
dec_token_per_query_per_step = (
1928-
self.speculative_config.num_speculative_tokens + 1
1929-
if self.speculative_config is not None and self.speculative_config.method is not None
1930-
else 1
1931-
)
19321951
self.graph_opt_config._set_cudagraph_sizes(
19331952
max_capture_size=max_capture_shape,
19341953
max_capture_shape_prefill=max_capture_shape_prefill,
1935-
dec_token_per_query_per_step=dec_token_per_query_per_step,
19361954
)
1937-
if self.speculative_config is not None and self.speculative_config.method is not None:
1938-
real_bsz_to_captured_size = {}
1939-
for capture_size in self.graph_opt_config.cudagraph_capture_sizes:
1940-
dummy_batch_size = int(capture_size / (self.speculative_config.num_speculative_tokens + 1))
1941-
real_bsz_to_captured_size[dummy_batch_size] = capture_size
19421955

1943-
def expand_bsz_map(real_bsz_to_captured_size):
1944-
"""
1945-
Expand a sparse batch size mapping into a dense one.
1946-
1947-
Args:
1948-
real_bsz_to_captured_size (dict): Sparse batch size to capture size mapping.
1949-
Returns:
1950-
dict: Dense batch size to capture size mapping.
1951-
"""
1952-
sorted_items = sorted(real_bsz_to_captured_size.items())
1953-
result = {}
1954-
prev_bsz = 0
1955-
for curr_bsz, cap in sorted_items:
1956-
for bsz in range(prev_bsz + 1, curr_bsz + 1):
1957-
result[bsz] = cap
1958-
prev_bsz = curr_bsz
1959-
return result
1960-
1961-
self.graph_opt_config.real_bsz_to_captured_size = expand_bsz_map(real_bsz_to_captured_size)
19621956
self.graph_opt_config.init_with_cudagrpah_size(
19631957
max_capture_size=max_capture_shape,
19641958
max_capture_shape_prefill=max_capture_shape_prefill,
1959+
num_speculative_tokens=(
1960+
self.speculative_config.num_speculative_tokens
1961+
if (
1962+
self.speculative_config is not None
1963+
and self.speculative_config.method
1964+
in [
1965+
SpecMethod.MTP,
1966+
SpecMethod.SUFFIX,
1967+
]
1968+
)
1969+
else 0
1970+
),
19651971
)
19661972

19671973
self.tokenizer = tokenizer
@@ -2002,6 +2008,7 @@ def expand_bsz_map(real_bsz_to_captured_size):
20022008
int(envs.ENABLE_V1_KVCACHE_SCHEDULER) == 0
20032009
and self.model_config is not None
20042010
and self.model_config.enable_mm
2011+
and self.deploy_modality != DeployModality.TEXT
20052012
):
20062013
self.max_prefill_batch = 1 # TODO:当前V0多模prefill阶段只支持并行度为1,待优化
20072014
else:
@@ -2029,18 +2036,32 @@ def expand_bsz_map(real_bsz_to_captured_size):
20292036
and self.router_config
20302037
and self.router_config.router
20312038
):
2032-
# For RL scenario: version.yaml will be required for models in future releases.
2039+
# For RL scenario, version.yaml is required for models
20332040
# Temporarily enforce use router to be enabled.
20342041
self.model_config.read_model_version()
20352042

20362043
self.read_from_config()
20372044
self.postprocess()
2038-
self.init_cache_info()
2045+
self.init_pd_info()
20392046
if test_mode:
20402047
return
20412048
self.check()
20422049
# self.print() # NOTE: it's better to explicitly call .print() when FDConfig is initialized
20432050

2051+
@property
2052+
def enable_mm_runtime(self) -> bool:
2053+
return (
2054+
self.model_config is not None
2055+
and self.model_config.enable_mm
2056+
and self.deploy_modality != DeployModality.TEXT
2057+
)
2058+
2059+
@property
2060+
def enable_rope_3d_runtime(self) -> bool:
2061+
return self.enable_mm_runtime and (
2062+
getattr(self.model_config, "rope_3d", False) or getattr(self.model_config, "use_3d_rope", False)
2063+
)
2064+
20442065
def _disable_sequence_parallel_moe_if_needed(self, mode_name):
20452066
if self.parallel_config.use_sequence_parallel_moe and self.graph_opt_config.use_cudagraph:
20462067
self.parallel_config.use_sequence_parallel_moe = False
@@ -2069,7 +2090,10 @@ def postprocess(self):
20692090

20702091
if self.scheduler_config.max_num_batched_tokens is None:
20712092
if int(envs.ENABLE_V1_KVCACHE_SCHEDULER):
2072-
self.scheduler_config.max_num_batched_tokens = 8192 # if set to max_model_len, it's easy to be OOM
2093+
if int(envs.FD_DISABLE_CHUNKED_PREFILL):
2094+
self.scheduler_config.max_num_batched_tokens = self.model_config.max_model_len
2095+
else:
2096+
self.scheduler_config.max_num_batched_tokens = 8192 # if set to max_model_len, it's easy to be OOM
20732097
else:
20742098
if self.cache_config.enable_chunked_prefill:
20752099
self.scheduler_config.max_num_batched_tokens = 2048
@@ -2079,9 +2103,21 @@ def postprocess(self):
20792103
if self.long_prefill_token_threshold == 0:
20802104
self.long_prefill_token_threshold = int(self.model_config.max_model_len * 0.04)
20812105

2106+
if (
2107+
self.model_config is not None
2108+
and self.model_config.enable_mm
2109+
and self.deploy_modality == DeployModality.TEXT
2110+
):
2111+
if getattr(self.model_config, "rope_3d", False) or getattr(self.model_config, "use_3d_rope", False):
2112+
logger.info(
2113+
"Deploy modality is text; forcing the multimodal-capable model onto the 2D RoPE runtime path."
2114+
)
2115+
setattr(self.model_config, "rope_3d", False)
2116+
setattr(self.model_config, "use_3d_rope", False)
2117+
20822118
self.cache_config.max_block_num_per_seq = int(self.model_config.max_model_len // self.cache_config.block_size)
20832119
self.cache_config.postprocess(self.get_max_chunk_tokens(), self.scheduler_config.max_num_seqs)
2084-
if self.model_config is not None and self.model_config.enable_mm and not envs.ENABLE_V1_KVCACHE_SCHEDULER:
2120+
if self.model_config is not None and self.enable_mm_runtime and not envs.ENABLE_V1_KVCACHE_SCHEDULER:
20852121
self.cache_config.enable_prefix_caching = False
20862122
if (
20872123
self.structured_outputs_config is not None
@@ -2107,7 +2143,7 @@ def postprocess(self):
21072143
f"Guided decoding backend '{self.structured_outputs_config.guided_decoding_backend}' is not implemented. [auto, xgrammar, guidance, off]"
21082144
)
21092145

2110-
if self.model_config.enable_mm:
2146+
if self.enable_mm_runtime:
21112147
if self.cache_config.max_encoder_cache is None or self.cache_config.max_encoder_cache < 0:
21122148
self.cache_config.max_encoder_cache = self.scheduler_config.max_num_batched_tokens
21132149
elif self.cache_config.max_encoder_cache != 0:
@@ -2402,18 +2438,17 @@ def print(self):
24022438
logger.info("{:<20}:{:<6}{}".format(k, "", v))
24032439
logger.info("=============================================================")
24042440

2405-
def init_cache_info(self):
2441+
def init_pd_info(self):
24062442
"""
2407-
initialize cache info
2443+
initialize info for pd deployment
24082444
"""
2409-
# TODO: group the splitiwse params
24102445
# There are two methods for splitwise deployment:
24112446
# 1. v0 splitwise_scheduler or dp_scheduler
2412-
# 2. v1 local_scheduler + router
2447+
# 2. v1 local_scheduler + router (optional)
24132448
self.splitwise_version = None
24142449
if self.scheduler_config.name in ("splitwise", "dp"):
24152450
self.splitwise_version = "v0"
2416-
elif self.scheduler_config.name == "local" and self.router_config and self.router_config.router:
2451+
elif self.scheduler_config.name == "local":
24172452
self.splitwise_version = "v1"
24182453

24192454
# the information for registering this server to router or splitwise_scheduler
@@ -2474,7 +2509,7 @@ def get_max_chunk_tokens(self, mm_max_tokens_per_item=None):
24742509
num_tokens = self.scheduler_config.max_num_seqs
24752510
else:
24762511
num_tokens = self.scheduler_config.max_num_batched_tokens
2477-
if mm_max_tokens_per_item is not None and self.deploy_modality != DeployModality.TEXT:
2512+
if self.enable_mm_runtime and mm_max_tokens_per_item is not None:
24782513
max_mm_tokens = max(
24792514
mm_max_tokens_per_item.get("image", 0),
24802515
mm_max_tokens_per_item.get("video", 0),

fastdeploy/engine/common_engine.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@
6161
from fastdeploy.inter_communicator import (
6262
EngineCacheQueue,
6363
EngineWorkerQueue,
64-
IPCLock,
6564
IPCSignal,
6665
ZmqIpcServer,
6766
ZmqTcpServer,
@@ -223,10 +222,6 @@ def __init__(self, cfg: FDConfig, start_queue=True, use_async_llm=False):
223222
)
224223
self._init_worker_monitor_signals()
225224

226-
# Pass the GPU KV cache lock to cache_manager for mutual exclusion
227-
# between the CPU transfer process and the worker process.
228-
self.resource_manager.cache_manager.gpu_cache_lock = self.gpu_cache_lock
229-
230225
# Initialize RegisterManager
231226
self._register_manager = RegisterManager(
232227
cfg=self.cfg,
@@ -465,14 +460,6 @@ def _init_worker_monitor_signals(self): # exist_task_signal 用于各worker进
465460
create=True,
466461
)
467462

468-
# gpu_cache_lock: file-based lock for mutual exclusion between worker
469-
# and CPU transfer when accessing GPU KV cache.
470-
self.gpu_cache_lock = IPCLock(
471-
name="gpu_cache_lock",
472-
suffix=current_suffix,
473-
create=True,
474-
)
475-
476463
def start_worker_queue_service(self, start_queue):
477464
"""
478465
start queue service for engine worker communication

fastdeploy/worker/gpu_model_runner.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from paddle import nn
2828
from paddleformers.utils.log import logger
2929

30-
from fastdeploy.config import FDConfig
30+
from fastdeploy.config import PREEMPTED_TOKEN_ID, FDConfig
3131
from fastdeploy.engine.pooling_params import PoolingParams
3232
from fastdeploy.engine.request import BatchRequest, ImagePosition, Request, RequestType
3333
from fastdeploy.model_executor.graph_optimization.utils import (
@@ -79,7 +79,6 @@
7979
speculate_schedule_cache,
8080
set_data_ipc,
8181
unset_data_ipc,
82-
get_position_ids_and_mask_encoder_batch,
8382
update_attn_mask_offsets,
8483
)
8584

@@ -88,7 +87,12 @@
8887
from fastdeploy import envs
8988
from fastdeploy.cache_manager.v1 import CacheController
9089
from fastdeploy.engine.tasks import PoolingTask
91-
from fastdeploy.input.ernie4_5_vl_processor import DataProcessor
90+
91+
try:
92+
from fastdeploy.input.ernie4_5_vl_processor import DataProcessor
93+
except ImportError:
94+
DataProcessor = None
95+
9296
from fastdeploy.inter_communicator import IPCSignal, ZmqIpcClient
9397
from fastdeploy.logger.deterministic_logger import DeterministicLogger
9498
from fastdeploy.model_executor.forward_meta import ForwardMeta

0 commit comments

Comments
 (0)