Skip to content

Commit bf0dace

Browse files
[Scheduler] Increase sleep interval in fetch loops and cancel schedule threashold for prefill instance (#7871)
1 parent 0a5d4b6 commit bf0dace

5 files changed

Lines changed: 13 additions & 7 deletions

File tree

fastdeploy/engine/common_engine_prepare_mixin.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,10 +248,10 @@ def _fetch_loop(self, fetch_fn, thread_idx: int):
248248
with self._pause_cond:
249249
self._pause_cond.wait_for(lambda: not self.is_paused)
250250
fetch_fn()
251-
time.sleep(0.002)
251+
time.sleep(0.02)
252252
except Exception as e:
253253
self.llm_logger.error(f"fetching request error in worker-{thread_idx}: {e} {traceback.format_exc()}")
254-
time.sleep(0.002)
254+
time.sleep(0.02)
255255

256256
def _prepare_request_v1(self):
257257
"""Prepare request and send to the queue for scheduling"""

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ def get_new_block_nums(self, request: Request, num_new_tokens: int):
245245
block_num = (
246246
request.num_computed_tokens + num_new_tokens + self.config.cache_config.block_size - 1
247247
) // self.config.cache_config.block_size - len(request.block_tables)
248-
248+
block_num = max(block_num, 0)
249249
if self.config.speculative_config.method is not None:
250250
block_num = min(block_num + 1, self.config.cache_config.max_block_num_per_seq)
251251
else:
@@ -1001,7 +1001,13 @@ def _allocate_decode_and_extend():
10011001
req_index += 1
10021002
continue
10031003
num_new_block = self.get_new_block_nums(request, num_new_tokens)
1004-
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(num_new_block)
1004+
if self.config.scheduler_config.splitwise_role == "prefill":
1005+
# for prefill instance, do not set threshold for running requests
1006+
can_schedule_block_num_threshold = 0
1007+
else:
1008+
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
1009+
num_new_block
1010+
)
10051011
# Allocate blocks to prefill
10061012
if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
10071013
request.block_tables.extend(

fastdeploy/envs.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ def _validate_split_kv_size(value: int) -> int:
192192
# "Enable FP8 calibration on HPU"
193193
"FD_HPU_MEASUREMENT_MODE": lambda: os.getenv("FD_HPU_MEASUREMENT_MODE", "0"),
194194
# Number of worker threads for prepare requests in prefill instance
195-
"FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "5")),
195+
"FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "3")),
196196
"FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")),
197197
"FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE": lambda: int(
198198
os.getenv("FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE", "1")

fastdeploy/output/token_processor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
672672
# TODO: Refine checking sending cache and do not keep waiting
673673
if time.time() - start_time > 30:
674674
llm_logger.warning(f"wait for sending cache, {task_id}")
675-
time.sleep(0.002)
675+
time.sleep(0.005)
676676
else:
677677
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
678678
self.resource_manager.finish_requests_async(task_id)

fastdeploy/splitwise/splitwise_connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ def check_decode_allocated(self, task):
267267
return True, ""
268268

269269
while self.current_request_ids[task.request_id] == "init":
270-
time.sleep(0.001)
270+
time.sleep(0.005)
271271
if time.time() - start_time > envs.FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS:
272272
del self.current_request_ids[task.request_id]
273273
return False, "prefill waits for decode resource timeout"

0 commit comments

Comments
 (0)