Skip to content

Commit 77467e1

Browse files
wdunn001claude
andcommitted
Codec: address PR review — async dispatch, cached registry, hardened varint
Mirrors the sglang fork PR fixes (sgl-project/sglang#25544): 1. codec_frame.py: numpy-free LE-uint32 unpack now uses `struct.unpack('<NI', b)` (~10× faster than the per-element list comprehension) and rejects buffers whose length is not a multiple of 4 instead of silently corrupting. 2. codec_frame.py: `_decode_varint` gains bounds-check + shift-cap (35 bits = 5 bytes, the max uint32 varint width). Used by every length-delimited field decode including the packed `prompt_ids` loop. Malformed or malicious input fails fast with a clear ValueError instead of looping unbounded. 3. codec_dispatcher.py: add `dispatch_call_async`, a `asyncio.to_thread`-wrapping variant of `dispatch_call`. The sync form does a blocking `urllib.request.urlopen` POST that would freeze the event loop if called from an `async def` request handler. 4. openai/chat_completion/serving.py: cache the `ToolRegistry` on `OpenAIServingChat` rather than calling `ToolRegistry.from_env` per request. `from_env` performs blocking HTTP fetches against manifest URLs; the first request after process start pays the cost (off-loop via `asyncio.to_thread`) and every subsequent request reuses the cached registry. Same site now uses `dispatch_call_async` so tool dispatches don't block the worker either. Wire format unchanged. No new dependencies. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: William Dunn <wdunn001@gmail.com>
1 parent 509e9b7 commit 77467e1

3 files changed

Lines changed: 75 additions & 6 deletions

File tree

vllm/entrypoints/codec_dispatcher.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,11 @@ def dispatch_call(
197197
) -> CodecToolResult:
198198
"""POST a CodecToolCall to ``tool.endpoint`` and decode the response.
199199
200-
Synchronous + stdlib-only. Engines that want async dispatch can
201-
wrap this in a thread or replace with their preferred HTTP client.
200+
Synchronous + stdlib-only. ASGI engines should NOT call this
201+
directly from inside the async request loop — `urlopen` blocks the
202+
event loop. Use `dispatch_call_async` instead (it wraps this in
203+
`asyncio.to_thread`). The sync form stays for non-async callers
204+
(CLI tools, batch eval drivers).
202205
"""
203206
import urllib.request
204207
call = CodecToolCall(
@@ -218,6 +221,24 @@ def dispatch_call(
218221
return decode_tool_result(body)
219222

220223

224+
async def dispatch_call_async(
225+
tool: RegisteredTool,
226+
arguments_json: str,
227+
call_id: str,
228+
) -> CodecToolResult:
229+
"""Async variant of :func:`dispatch_call`.
230+
231+
Runs the blocking urllib POST in a worker thread via
232+
`asyncio.to_thread` so it doesn't block the event loop. ASGI engines
233+
(vLLM's chat / completion serving paths) MUST use this variant —
234+
calling the sync `dispatch_call` from inside an `async def` request
235+
handler blocks every other in-flight request on the same uvicorn
236+
worker until the tool replies.
237+
"""
238+
import asyncio
239+
return await asyncio.to_thread(dispatch_call, tool, arguments_json, call_id)
240+
241+
221242
# ── Reinjection hook ───────────────────────────────────────────────────────
222243

223244

@@ -244,6 +265,7 @@ def reinject_ids_into_context(context_ids: list[int], reinject_ids: list[int]) -
244265
"ToolRegistry",
245266
"decode_tool_result",
246267
"dispatch_call",
268+
"dispatch_call_async",
247269
"encode_tool_call",
248270
"reinject_ids_into_context",
249271
]

vllm/entrypoints/codec_frame.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,15 @@ def _normalise_ids_to_list(ids: IdsLike) -> list[int]:
6363
if isinstance(ids, (bytes, bytearray, memoryview)):
6464
if _HAVE_NUMPY:
6565
return _np.frombuffer(bytes(ids), dtype="<u4").tolist()
66+
# Stdlib LE-uint32 unpack as the numpy-free fallback. `struct.unpack`
67+
# is ~10× faster than a Python list comprehension with
68+
# `int.from_bytes` — same shape, no per-element Python loop.
6669
b = bytes(ids)
67-
return [int.from_bytes(b[i:i + 4], "little") for i in range(0, len(b), 4)]
70+
if len(b) % 4 != 0:
71+
raise ValueError(
72+
f"codec_frame: bytes length {len(b)} is not a multiple of 4 (uint32 LE)"
73+
)
74+
return list(_struct.unpack(f"<{len(b) // 4}I", b))
6875
# Fall back to the Sequence path.
6976
return list(ids)
7077

@@ -121,14 +128,26 @@ def _varint(n: int) -> bytes:
121128

122129

123130
def _decode_varint(data: bytes, pos: int) -> tuple[int, int]:
131+
"""Decode a single protobuf varint at ``pos``.
132+
133+
Bounds-checked + shift-capped so malformed or malicious input fails
134+
fast with a ValueError instead of looping unbounded or producing a
135+
silently-wrong value. Cap is 35 bits — protobuf uint32 fits in <= 5
136+
bytes (5 * 7 = 35); a 6th continuation byte means the encoded value
137+
cannot represent a uint32 and we reject it.
138+
"""
124139
result = shift = 0
125140
while True:
141+
if pos >= len(data):
142+
raise ValueError("Codec: truncated varint in CodecRequest")
126143
b = data[pos]
127144
pos += 1
128145
result |= (b & 0x7F) << shift
129146
if not (b & 0x80):
130147
return result, pos
131148
shift += 7
149+
if shift > 35:
150+
raise ValueError("Codec: varint overflow in CodecRequest")
132151

133152

134153
def _encode_tool_call_msg(call: dict) -> bytes:

vllm/entrypoints/openai/chat_completion/serving.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
CODEC_BOLT_ON_DISPATCH,
2424
ToolRegistry,
2525
dispatch_call,
26+
dispatch_call_async,
2627
reinject_ids_into_context,
2728
)
2829
from vllm.entrypoints.codec_frame import encode_frame
@@ -188,6 +189,15 @@ def __init__(
188189
self.supports_code_interpreter = False
189190
self.python_tool = None
190191

192+
# v0.5 #87: lazy-cached Codec ToolRegistry. `ToolRegistry.from_env`
193+
# performs blocking HTTP fetches against manifest URLs; doing that
194+
# per-request blocks the event loop and adds startup latency to
195+
# every tool-watcher stream. Resolve once per process on first use
196+
# and reuse the registry thereafter. Sentinel = "not yet attempted";
197+
# None after load = attempted but env not configured.
198+
self._codec_dispatcher_registry: ToolRegistry | None = None
199+
self._codec_dispatcher_registry_loaded: bool = False
200+
191201
def warmup(self) -> None:
192202
self.renderer.warmup(
193203
ChatParams(
@@ -1081,8 +1091,19 @@ async def chat_completion_binary_stream_generator(
10811091
# tool's response_ids back into the stream.
10821092
dispatcher_registry: ToolRegistry | None = None
10831093
if CODEC_BOLT_ON_DISPATCH and watcher is not None:
1084-
tokenizer_hash = getattr(self, "_codec_tokenizer_map_hash", "")
1085-
dispatcher_registry = ToolRegistry.from_env(tokenizer_hash)
1094+
# Cached on the serving instance — ToolRegistry.from_env makes
1095+
# blocking HTTP fetches against manifest URLs, so it must NEVER
1096+
# run inside the async request loop. First request after process
1097+
# start pays the fetch cost (off-loop via to_thread); subsequent
1098+
# requests reuse the cached registry.
1099+
if not self._codec_dispatcher_registry_loaded:
1100+
tokenizer_hash = getattr(self, "_codec_tokenizer_map_hash", "")
1101+
import asyncio as _asyncio
1102+
self._codec_dispatcher_registry = await _asyncio.to_thread(
1103+
ToolRegistry.from_env, tokenizer_hash
1104+
)
1105+
self._codec_dispatcher_registry_loaded = True
1106+
dispatcher_registry = self._codec_dispatcher_registry
10861107

10871108
try:
10881109
async for res in result_generator:
@@ -1119,7 +1140,14 @@ async def chat_completion_binary_stream_generator(
11191140
tool = dispatcher_registry.get(ev.name)
11201141
if tool is not None and tool.mode == "dispatch":
11211142
try:
1122-
result = dispatch_call(
1143+
# Use the async variant — sync `dispatch_call`
1144+
# does a blocking urllib POST that would
1145+
# freeze the event loop if called from this
1146+
# `async def`. `dispatch_call_async` wraps
1147+
# it in asyncio.to_thread so other
1148+
# concurrent requests on the worker keep
1149+
# flowing while the tool reply is in flight.
1150+
result = await dispatch_call_async(
11231151
tool,
11241152
arguments_json=ev.arguments_json,
11251153
call_id=ev.id or make_call_id(next_call_seq),

0 commit comments

Comments
 (0)