Skip to content

Commit 485d3a1

Browse files
Add admission control to VLLMOpenAIModelClass (#1049)
Move the kv_cache-based admission control (VLLMMetricsPoller + three-state deadband check_admission) from per-model implementations into the base VLLMOpenAIModelClass, so subclasses only need to instantiate self._metrics_poller in load_model() to get admission control. Fails open when the poller is unset or stale. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 6472745 commit 485d3a1

1 file changed

Lines changed: 84 additions & 1 deletion

File tree

clarifai/runners/models/vllm_openai_class.py

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,65 @@
1+
import re
12
import threading
2-
from typing import Iterator
3+
import time
4+
from typing import Iterator, Optional
35

46
import httpx
57
from clarifai_protocol import get_item_id, register_item_abort_callback
68

79
from clarifai.runners.models.openai_class import OpenAIModelClass
10+
from clarifai.utils.logging import logger
11+
12+
13+
class VLLMMetricsPoller:
14+
"""Polls vLLM /metrics in background; caches kv_cache usage.
15+
16+
Fail-open: if the poller has never succeeded or is stale, admission is allowed.
17+
"""
18+
19+
KV_CACHE_HIGH = 0.8
20+
KV_CACHE_LOW = 0.5
21+
STALE_AFTER_SECONDS = 5.0
22+
23+
def __init__(self, base_url: str, poll_interval: float = 5.0):
24+
self.base_url = base_url
25+
self.poll_interval = poll_interval
26+
self._kv_cache = 0.0
27+
self._lock = threading.Lock()
28+
self._last_success = time.time()
29+
30+
threading.Thread(target=self._poll_loop, daemon=True).start()
31+
logger.info(
32+
f"[VLLMMetricsPoller] Started polling {base_url}/metrics every {poll_interval}s"
33+
)
34+
35+
def _poll_loop(self):
36+
while True:
37+
try:
38+
resp = httpx.get(f"{self.base_url}/metrics", timeout=1.0)
39+
if resp.status_code == 200:
40+
kv_cache = self._parse(
41+
resp.text, r'vllm:kv_cache_usage_perc\{[^}]*\}\s+([\d.]+)'
42+
)
43+
with self._lock:
44+
self._kv_cache = kv_cache
45+
self._last_success = time.time()
46+
logger.info(f"[VLLMMetricsPoller] kv_cache={kv_cache:.2%}")
47+
except Exception as e:
48+
logger.warning(f"[VLLMMetricsPoller] Poll failed: {e}")
49+
time.sleep(self.poll_interval)
50+
51+
def _parse(self, text: str, pattern: str) -> float:
52+
m = re.search(pattern, text)
53+
return float(m.group(1)) if m else 0.0
54+
55+
def snapshot(self) -> float:
56+
with self._lock:
57+
return self._kv_cache
58+
59+
@property
60+
def is_stale(self) -> bool:
61+
with self._lock:
62+
return time.time() - self._last_success > self.STALE_AFTER_SECONDS
863

964

1065
class VLLMCancellationHandler:
@@ -91,6 +146,34 @@ def generate(self, prompt, ...) -> Iterator[str]:
91146

92147
server = None
93148
cancellation_handler = None
149+
_metrics_poller: Optional[VLLMMetricsPoller] = None
150+
151+
@property
152+
def admission_increase_delay(self) -> float:
153+
return 0.0
154+
155+
@property
156+
def admission_decrease_delay(self) -> float:
157+
return 0.0
158+
159+
def check_admission(self):
160+
"""Three-state deadband on vLLM kv_cache usage.
161+
162+
Returns False above HIGH (AIMD shrinks), True below LOW (AIMD grows),
163+
None in-between (AIMD holds). Fails open when the subclass has not
164+
initialized ``self._metrics_poller`` or when the poller is stale.
165+
"""
166+
poller = self._metrics_poller
167+
if poller is None or poller.is_stale:
168+
return True
169+
170+
kv_cache = poller.snapshot()
171+
if kv_cache > poller.KV_CACHE_HIGH:
172+
logger.info(f"[AdmissionControl] REJECT kv_cache={kv_cache:.2%}")
173+
return False
174+
if kv_cache < poller.KV_CACHE_LOW:
175+
return True
176+
return None
94177

95178
def handle_liveness_probe(self) -> bool:
96179
if self.server is None:

0 commit comments

Comments
 (0)