Skip to content
25 changes: 17 additions & 8 deletions lmdeploy/pytorch/engine/engine_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional

Expand Down Expand Up @@ -55,25 +56,32 @@ def clear(self):
class RunableEventAsync:
"""Awaitable async runable event."""

def __init__(self, scheduler: 'Scheduler'):
def __init__(self, scheduler: 'Scheduler', extra_runable_checker: Callable[[], bool] | None = None):
self.scheduler = scheduler
self.extra_runable_checker = extra_runable_checker
self.event = asyncio.Event()

def has_unfinished(self):
"""Check whether scheduler or engine-local state has runnable work."""
if self.scheduler.has_unfinished():
return True
return self.extra_runable_checker is not None and self.extra_runable_checker()

async def wait(self):
"""Wait event."""
await self.event.wait()

def set(self):
"""Set event."""
if self.scheduler.has_unfinished():
if self.has_unfinished():
self.event.set()
else:
self.event.clear()


def build_runable_event(scheduler: 'Scheduler', extra_runable_checker: Callable[[], bool] | None = None):
    """Build runable event.

    Args:
        scheduler: Scheduler passed through to ``RunableEventAsync``.
        extra_runable_checker: Optional zero-argument callable passed
            through to ``RunableEventAsync``.

    Returns:
        A new ``RunableEventAsync`` instance.
    """
    return RunableEventAsync(scheduler, extra_runable_checker)


@dataclass
Expand Down Expand Up @@ -128,7 +136,9 @@ def __init__(self,
self.resp_queue = asyncio.Queue()
self.forward_event = CounterEvent()
self.migration_event = asyncio.Event()
self.has_runable_event = RunableEventAsync(self.scheduler)
# Active long-context chunks are owned by InputsMaker, not the
# scheduler WAITING/READY queues, so include them in the runnable gate.
self.has_runable_event = RunableEventAsync(self.scheduler, self.inputs_maker.has_pending_long_context_chunk)
# Sleep uses a small handshake with the scheduling loops:
# 1. sleep() sets _sleep_requested and waits for main/migration drain events.
# 2. main_loop and migration_loop reach safe boundaries, acknowledge
Expand Down Expand Up @@ -383,13 +393,12 @@ def __get_logprobs(batched_outputs: 'BatchedOutputs'):

async def _main_loop_try_send_next_inputs(self):
    """Try send next inputs.

    Waits on the runnable event when it reports no unfinished work.
    Returns ``(None, None)`` if a sleep was requested by the time the wait
    is over; otherwise collects finished migrations and returns whatever
    ``inputs_maker.send_next_inputs()`` produces.
    """
    # Gate on the runnable event rather than the scheduler alone, so work
    # reported by the event's own check is taken into account as well.
    if not self.has_runable_event.has_unfinished():
        await self.has_runable_event.wait()
        if self._sleep_requested:
            return None, None

    self.scheduler.collect_migration_done()
    return await self.inputs_maker.send_next_inputs()

@staticmethod
Expand Down
284 changes: 265 additions & 19 deletions lmdeploy/pytorch/engine/inputs_maker.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions lmdeploy/pytorch/engine/model_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,11 +626,11 @@ def _prepare_inputs_prefill(
# for second round chat
self.step_inputs.reindex(delta)

if inputs.is_first_chunk or not inputs.is_chunk:
if inputs.is_first_chunk:
self._prev_chunk_output = None

# check long context
if self._prev_chunk_output is not None:
if inputs.is_chunk and self._prev_chunk_output is not None:
# update model metas
model_metas = self._prev_chunk_output.get('model_metas')
inputs.model_metas = model_metas
Expand Down
20 changes: 20 additions & 0 deletions lmdeploy/pytorch/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ def env_to_float(
return value


def env_to_choice(
env_var: str,
default: str,
choices: set | list,
):
"""Env to selected string."""
value = os.getenv(env_var)
if value is None:
return default
value = value.lower().strip()
if value not in choices:
raise ValueError(f"Invalid environment variable '{env_var}={value}'. Allowed values: {choices}")
return value


_ENVS = dict()


Expand Down Expand Up @@ -173,6 +188,11 @@ def _patched_get_env(
# fake capture flag for debug cudagraph padding behavior
fake_capture = env_to_bool('LMDEPLOY_FAKE_CUDA_GRAPH_CAPTURE', False)

# opt-ttft
opt_ttft_policy = env_to_choice('LMDEPLOY_PT_TTFT_POLICY', 'size', {'fifo', 'size'})
opt_ttft_short_turns = max(1, env_to_int('LMDEPLOY_PT_TTFT_SHORT_TURNS', 3))
opt_ttft_aging_sec = env_to_float('LMDEPLOY_PT_TTFT_AGING_SEC', 2.0)


def get_all_envs():
"""Get all environment variables."""
Expand Down
3 changes: 3 additions & 0 deletions lmdeploy/pytorch/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,9 @@ class SchedulerSequence:
meta: Any = None
num_ignored_history: int = 0
model_meta: dict[str, Any] = None
# Exclusive absolute token limit for temporary KV ownership. Non-final
# long-context chunks use this to allocate only the computed prefix.
kv_token_limit: int | None = None

# For Disaggregation
migration_request: None | MigrationRequest = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ class DefaultBlockManager(BaseBlockManager):
@classmethod
def num_required_blocks(cls, obj: SchedulerSequence, prealloc_size: int = 0):
    """Get num required blocks.

    Args:
        obj: Sequence whose token count, block size and already-held
            logical blocks are read.
        prealloc_size: Extra tokens to reserve on top of the token count.

    Returns:
        Number of additional blocks needed beyond ``obj.logical_blocks``;
        never negative.
    """
    num_tokens = obj.num_all_ids
    if obj.kv_token_limit is not None:
        # Cap the token count at the limit before adding the preallocation.
        num_tokens = min(num_tokens, obj.kv_token_limit)
    num_tokens += prealloc_size

    num_all_blocks = _div_up(num_tokens, obj.block_size)
    return max(0, num_all_blocks - len(obj.logical_blocks))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ def num_required_blocks(self, obj: SchedulerSequence, prealloc_size: int = 0):
if obj.num_history_ids <= self.window_size:
return super().num_required_blocks(obj, prealloc_size)

return super().num_required_blocks(obj, prealloc_size) - obj.num_ignored_history // obj.block_size
# DefaultBlockManager applies kv_token_limit to the absolute token
# count. Sliding-window accounting then subtracts already-dropped
# history blocks so chunk-limited allocation grows only the retained
# window.
num_required_blocks = super().num_required_blocks(obj, prealloc_size)
num_required_blocks -= obj.num_ignored_history // obj.block_size
return max(0, num_required_blocks)

def can_allocate(self, msg: SchedulerSequence, prealloc_size: int = 0):
"""Return if physical block can be allocated for given message."""
Expand Down
2 changes: 2 additions & 0 deletions lmdeploy/pytorch/paging/block_trie.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,8 @@ def allocate(self, seq: SchedulerSequence):

num_matched = node.num_matched
num_valid_ids = seq.num_valid_ids
if seq.kv_token_limit is not None:
num_valid_ids = min(num_valid_ids, seq.kv_token_limit)

if num_matched + block_size > num_valid_ids:
return
Expand Down
Loading
Loading