diff --git a/tests/entrypoints/openai/test_codec_compression.py b/tests/entrypoints/openai/test_codec_compression.py new file mode 100644 index 000000000000..3c0b2ce83c07 --- /dev/null +++ b/tests/entrypoints/openai/test_codec_compression.py @@ -0,0 +1,152 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Tests for codec_compression.py — the negotiator + stream-compressor helpers. + +These tests cover regression classes discovered during the v0.4.1 cohort +bench: (a) brotli's per-chunk flush() inflating small streams, (b) silent +fall-through when zstd/brotli modules are missing in the image, (c) +preference-order violations. +""" + +from __future__ import annotations + +import asyncio +import gzip +import io +from collections.abc import AsyncIterable + +import pytest + +from vllm.entrypoints import codec_compression as cc + + +async def _from_chunks(chunks: list[bytes]) -> AsyncIterable[bytes]: + for c in chunks: + yield c + + +def _collect(stream_factory) -> bytes: + async def run() -> bytes: + out = bytearray() + async for chunk in stream_factory(): + out.extend(chunk) + return bytes(out) + + return asyncio.run(run()) + + +# Representative Codec msgpack-stream content: a sequence of small token-ID +# frames. The bench's 64-token msgpack-identity cell measures ~975 bytes; we +# synthesize a comparable payload so the inflation check is meaningful. 
def _msgpack_like_stream(n_chunks: int = 12, chunk_bytes: int = 80) -> list[bytes]:
    """Synthesize ``n_chunks`` deterministic frames of ``chunk_bytes`` bytes.

    Each frame is a window into a fixed 1 KB payload of varied byte values.
    The window start advances by ``chunk_bytes`` per frame and wraps so that
    every slice stays fully inside the payload.

    Raises:
        ValueError: if ``chunk_bytes`` is negative or not smaller than the
            payload. The wrap-around modulus would otherwise be zero
            (ZeroDivisionError) or negative, and the frames would not have
            the requested size.
    """
    payload = bytes(range(256)) * 4  # 1 KB of varied byte values
    if not 0 <= chunk_bytes < len(payload):
        raise ValueError(
            f"chunk_bytes must be in [0, {len(payload)}), got {chunk_bytes}"
        )
    span = len(payload) - chunk_bytes
    starts = ((i * chunk_bytes) % span for i in range(n_chunks))
    return [payload[start : start + chunk_bytes] for start in starts]


def test_negotiate_prefers_zstd_with_dict():
    """Spec §Transport-Compression: preference order zstd > br > gzip > identity."""
    if not cc._ZSTD_AVAILABLE or not cc._BROTLI_AVAILABLE:
        pytest.skip(
            "zstandard or brotli not installed — would be caught by "
            "supervisor startup check"
        )
    cc.set_zstd_dict("msgpack", b"\x00" * 16384)
    try:
        assert (
            cc.negotiate_encoding("zstd, br, gzip, identity", stream_format="msgpack")
            == "zstd"
        )
    finally:
        # The registry is module-global; always clean up so later tests start
        # from the empty-registry default.
        cc.clear_zstd_dicts()


def test_negotiate_falls_through_to_gzip_when_zstd_dict_missing():
    """No dict registered for the requested format → zstd skipped, picker falls
    through preference order to gzip (NOT to identity)."""
    cc.clear_zstd_dicts()
    if cc._BROTLI_AVAILABLE:
        # br is next in preference order before gzip
        assert cc.negotiate_encoding("zstd, br, gzip", stream_format="msgpack") == "br"
    assert cc.negotiate_encoding("zstd, gzip", stream_format="msgpack") == "gzip"


def test_negotiate_identity_only_returns_none():
    """An Accept-Encoding listing only identity (or empty) returns None."""
    assert cc.negotiate_encoding("identity") is None
    assert cc.negotiate_encoding("") is None


def test_brotli_does_not_inflate_repeated_payload():
    """Regression for the per-chunk flush() bug discovered during v0.4.1 bench.

    Before the fix, calling compressor.flush() on every chunk made small
    streams *larger* than identity (64-token msgpack cell: 1159 B vs 975 B
    identity; 512-token: 9013 B vs 7616 B). After dropping the per-chunk
    flush, brotli compresses or matches identity on any payload > ~200 B.
    """
    if not cc._BROTLI_AVAILABLE:
        pytest.skip("brotli not installed")
    chunks = _msgpack_like_stream()
    identity = b"".join(chunks)
    compressed = _collect(lambda: cc._compress_brotli(_from_chunks(chunks)))
    assert len(compressed) <= len(identity), (
        f"brotli inflated a {len(identity)}-byte stream to {len(compressed)} bytes "
        f"— the per-chunk flush() regression is back"
    )


def test_gzip_round_trips():
    """gzip output must decompress back to the exact concatenated input."""
    chunks = _msgpack_like_stream()
    expected = b"".join(chunks)
    compressed = _collect(lambda: cc._compress_gzip(_from_chunks(chunks)))
    assert gzip.decompress(compressed) == expected


def test_brotli_round_trips():
    """brotli output must decompress back to the exact concatenated input."""
    if not cc._BROTLI_AVAILABLE:
        pytest.skip("brotli not installed")
    import brotli as _brotli  # type: ignore

    chunks = _msgpack_like_stream()
    expected = b"".join(chunks)
    compressed = _collect(lambda: cc._compress_brotli(_from_chunks(chunks)))
    assert _brotli.decompress(compressed) == expected


def test_zstd_with_dict_round_trips():
    """When a dict is registered, zstd compresses+decompresses byte-identically
    using the same dict — the contract clients rely on."""
    if not cc._ZSTD_AVAILABLE:
        pytest.skip("zstandard not installed")
    import zstandard as _zstd  # type: ignore

    # Synthetic 16 KiB dictionary. A real trained dictionary has a non-trivial
    # header; this buffer relies on zstandard accepting arbitrary bytes as a
    # raw-content dictionary (NOTE(review): confirm against the pinned
    # zstandard version).
    dict_bytes = b"\x37\x00\x00\x00" + b"\x00" * 16380
    # If registering the synthetic dict errors, skip rather than fail the
    # test (env-specific).
    try:
        cc.set_zstd_dict("msgpack", dict_bytes)
    except Exception:
        pytest.skip(
            "zstandard rejected synthetic dict; use a real trained "
            "dict via integration test"
        )
    try:
        chunks = _msgpack_like_stream()
        expected = b"".join(chunks)
        compressed = _collect(
            lambda: cc._compress_zstd(_from_chunks(chunks), dict_bytes=dict_bytes)
        )
        zdict = _zstd.ZstdCompressionDict(dict_bytes)
        with _zstd.ZstdDecompressor(dict_data=zdict).stream_reader(
            io.BytesIO(compressed)
        ) as r:
            assert r.read() == expected
    finally:
        cc.clear_zstd_dicts()
# ---------------------------------------------------------------------------
# Tool-call data model (mirrors openai-style { id, name, arguments } shape)
# ---------------------------------------------------------------------------


@dataclass
class ToolCallEvent:
    """One tool call detected in the model's output stream.

    `arguments_json` is the raw JSON string between the start/end markers.
    `name` is parsed from that JSON when the model uses the standard
    `{"name": "...", "arguments": {...}}` shape; otherwise None.
    """

    name: str | None
    arguments_json: str
    id: str | None = None  # server-generated, see make_call_id()

    def to_wire_dict(self) -> dict:
        """Serialise to the dict shape encoded into msgpack frames and
        the protobuf ToolCall message.

        ``arguments_json`` is always present; ``name`` and ``id`` are
        emitted only when they are not None.
        """
        out: dict = {"arguments_json": self.arguments_json}
        if self.name is not None:
            out["name"] = self.name
        if self.id is not None:
            out["id"] = self.id
        return out


# ---------------------------------------------------------------------------
# Watcher state machine
# ---------------------------------------------------------------------------


@dataclass
class _WatcherState:
    """Minimal per-request state. Cheap to instantiate; one per stream."""

    start_id: int
    end_id: int
    inside: bool = False
    region_ids: list[int] = field(default_factory=list)


class ToolWatcher:
    """Stateful detector for delimited regions in a token-ID stream.

    The hot path is `feed(ids)` — a single linear pass that:
      - emits passthrough IDs (everything outside a region) untouched
      - on region close, returns the buffered body IDs for downstream
        parsing
      - never invokes the tokenizer

    This mirrors codec_tool_watcher (libcodec) and ToolWatcher
    (@codecai/web, codecai, Codec.Net). Same state-machine semantics
    so client-side and server-side detection produce identical results.

    Edge cases (matched bit-for-bit with the other implementations):
      - stray end marker: passes through as a regular ID
      - nested start marker: inner ignored, outer end closes the region
      - region split across feeds: body buffered, emitted on close
    """

    def __init__(self, start_id: int, end_id: int) -> None:
        self._st = _WatcherState(start_id=start_id, end_id=end_id)

    @property
    def inside(self) -> bool:
        """True while a start marker has been seen without its end marker."""
        return self._st.inside

    def reset(self) -> None:
        """Forget any partially buffered region and return to the outside state."""
        self._st.inside = False
        self._st.region_ids = []

    def feed(self, ids: list[int]) -> tuple[list[int], list[list[int]]]:
        """Process a batch of newly-emitted token IDs.

        Returns:
            passthrough_ids: IDs that should be forwarded as the frame's
                `ids` field (markers consumed; region body IDs withheld
                until the region closes).
            completed_regions: list of region bodies (each a list of
                uint32s, markers excluded) that closed during this feed.

        Both are returned per-feed so the caller can attach completed
        regions to the same frame whose passthrough IDs come from this
        feed — keeps tool-call surfaces aligned with their stream
        position.
        """
        out_ids: list[int] = []
        completed: list[list[int]] = []
        st = self._st
        for tok in ids:
            if not st.inside:
                if tok == st.start_id:
                    # Marker itself is NOT forwarded — orchestrators
                    # don't want the "begin tool call" token in the
                    # outbound stream.
                    st.inside = True
                    st.region_ids = []
                else:
                    out_ids.append(tok)
            elif tok == st.end_id:
                # End marker also withheld. The buffer is handed over and
                # replaced with a fresh list so the returned body is never
                # mutated by later feeds.
                completed.append(st.region_ids)
                st.region_ids = []
                st.inside = False
            elif tok != st.start_id:
                # Anything except a nested start marker is body content;
                # a nested start is ignored (same as the other ports).
                st.region_ids.append(tok)
        return out_ids, completed


# ---------------------------------------------------------------------------
# Body → ToolCallEvent
# ---------------------------------------------------------------------------


def parse_tool_call(
    region_body_text: str, *, call_id: str | None = None
) -> ToolCallEvent:
    """Parse the body of a tool-call region (already detokenized) into
    a structured event.

    The convention every chat-tuned model in current use follows:
        { "name": "", "arguments": { ... } }

    We accept both pretty-printed and compact JSON. If parsing fails
    (malformed body, partial JSON, pathologically deep nesting, etc.) we
    still return an event with name=None and arguments_json set to the
    raw body — the caller can surface that to the client so it can return
    an "invalid_arguments" error to the model.

    Empty / whitespace-only bodies produce an event with name=None
    and arguments_json="" — same shape, distinguishable downstream.
    """
    body = region_body_text.strip()
    if not body:
        return ToolCallEvent(name=None, arguments_json="", id=call_id)

    name: str | None = None
    try:
        parsed: Any = json.loads(body)
    except (json.JSONDecodeError, RecursionError):
        # Keep the raw body so the caller can decide how to handle it.
        # RecursionError is caught as well: the body is model output, and a
        # runaway sequence of "[" / "{" must not take down the stream.
        parsed = None
    if isinstance(parsed, dict):
        candidate = parsed.get("name")
        if isinstance(candidate, str):
            name = candidate

    return ToolCallEvent(name=name, arguments_json=body, id=call_id)


# ---------------------------------------------------------------------------
# Helpers for the serving layer
# ---------------------------------------------------------------------------


def detokenize_region(tokenizer, region_ids: list[int]) -> str:
    """Convenience wrapper around the tokenizer's decode that skips
    special tokens — tool-call body text is pure JSON, no chat
    template chrome.

    Tokenizer compatibility: works with any tokenizer exposing a
    .decode(ids, skip_special_tokens=bool) method (HF AutoTokenizer,
    vLLM's AnyTokenizer, the MistralTokenizer wrapper). We don't import
    transformers directly to keep this module dependency-free —
    duck-typing on .decode() is enough.
    """
    return tokenizer.decode(region_ids, skip_special_tokens=True)


def make_call_id(seq_no: int) -> str:
    """Server-generated tool call id. Stable shape; sequence-numbered
    rather than UUID so test fixtures stay deterministic."""
    return f"tc_{seq_no:08x}"


# ---------------------------------------------------------------------------
# Marker resolution helpers (vLLM-specific, not present in sglang)
# ---------------------------------------------------------------------------


def resolve_marker_id(tokenizer, marker: str) -> int | None:
    """Resolve a special-token string to its single integer ID in the
    loaded tokenizer's vocab.

    Returns None if the marker doesn't exist as a single token — the
    caller should disable the watcher in that case (the model can't
    emit a single-token boundary, so ID-level detection is impossible).

    Tries three lookup paths in order, since vLLM ships several
    tokenizer flavours and they don't all expose the same surface:
      1. ``added_tokens_encoder`` (HF fast / slow)
      2. ``get_vocab()`` returning a dict-like mapping
      3. ``encode(marker, add_special_tokens=False)`` returning a
         sequence whose length must be 1 and whose single ID is not the
         tokenizer's ``unk_token_id``

    Lookup failures on paths 2 and 3 are deliberately swallowed: this is a
    best-effort probe across heterogeneous tokenizer APIs, and "cannot
    resolve" is reported uniformly as None.
    """
    enc = getattr(tokenizer, "added_tokens_encoder", None)
    if isinstance(enc, dict) and marker in enc:
        return int(enc[marker])

    get_vocab = getattr(tokenizer, "get_vocab", None)
    if callable(get_vocab):
        try:
            vocab = get_vocab()
            if marker in vocab:
                return int(vocab[marker])
        except Exception:
            pass

    encode = getattr(tokenizer, "encode", None)
    if callable(encode):
        try:
            ids = encode(marker, add_special_tokens=False)
            # Some tokenizers return tensors; coerce to list.
            ids = list(ids) if not isinstance(ids, list) else ids
            if len(ids) == 1:
                token_id = int(ids[0])
                # A marker missing from the vocab can encode to the single
                # unknown-token ID; accepting it would make the watcher
                # fire on every unknown token.
                if token_id != getattr(tokenizer, "unk_token_id", None):
                    return token_id
        except Exception:
            pass

    return None
Slightly better ratio than gzip, similar speed + at quality 4-6. Universal browser support (Chrome 50+, + Firefox 44+, Safari 11+). Requires the optional + ``brotli`` package; gracefully skipped if absent. + 3. ``gzip`` — Universal fallback. Pure stdlib, always available. + Supported in 100% of browsers and Node 18+ via fetch. + 4. ``identity`` — No compression. Always available, used when none of + the above appears in ``Accept-Encoding``. + +Usage: + + from vllm.entrypoints.codec_compression import wrap_streaming_response + + async def codec_handler(request: Request): + gen = build_codec_stream(...) # AsyncIterable[bytes] + return wrap_streaming_response( + request.headers.get("accept-encoding", ""), + gen, + background=..., + ) +""" + +from __future__ import annotations + +import hashlib +import zlib +from collections.abc import AsyncIterable + +from fastapi import BackgroundTasks +from fastapi.responses import StreamingResponse + +# Soft deps — both are graceful no-ops if the package isn't installed. +try: + import zstandard as zstd + + _ZSTD_AVAILABLE = True +except ImportError: + _ZSTD_AVAILABLE = False + +try: + import brotli + + _BROTLI_AVAILABLE = True +except ImportError: + _BROTLI_AVAILABLE = False + + +# ── Pre-trained ZSTD dictionary registry ───────────────────────────────────── +# +# Per the Codec protocol (spec/PROTOCOL.md "Pre-trained ZSTD dictionaries"), +# **the dict is the precondition for using zstd at all**, not an optimization +# layered on top. Without a matching pre-trained dict, no-dict zstd's wire-byte +# advantage over gzip is essentially zero on Codec streams (RESULTS.md §1f: +# 3.4 B/token vs 3.4 B/token within noise) but its TTFB cost on the shipped +# buffered middleware is catastrophic (§1d: 334× at 2K tokens). So no-dict +# zstd is the *worst of both worlds* — same bytes as gzip, much worse TTFB. 
+# +# The dict registry is keyed by ``stream_format`` because zstd dictionaries +# are not interchangeable across formats — a dict trained on msgpack Codec +# frames captures a different byte distribution than one trained on protobuf. +# Operators load the appropriate dict at server start (e.g. fetched from the +# tokenizer map's ``zstd_dictionaries[]`` entry whose ``format`` matches), and +# the negotiator then unlocks zstd for that format only. +# +# Default state: empty registry → zstd never selected → server falls through +# to gzip on every request that advertises zstd. This is the correct default: +# gzip works on every middleware stack, ships in stdlib, and matches the wire +# performance of no-dict zstd. + +_ZSTD_DICTS: dict[str, bytes] = {} +# Parallel registry of hashes — sha256(dict_bytes) computed once at +# registration so emit-time is a constant-time map lookup. Goes onto every +# zstd response as the Codec-Zstd-Dict header so clients can validate (or +# fetch) the right dict before decompressing. See spec/PROTOCOL.md +# "Codec-Zstd-Dict response header". +_ZSTD_DICT_HASHES: dict[str, str] = {} + + +def _hash_dict(dict_bytes: bytes) -> str: + """sha256 hex digest of the dict, prefixed `sha256:` so the value is + self-describing and matches the `hash` shape in tokenizer-map + `zstd_dictionaries[]` entries.""" + return "sha256:" + hashlib.sha256(dict_bytes).hexdigest() + + +def set_zstd_dict(stream_format: str, dict_bytes: bytes) -> None: + """Register a pre-trained zstd dictionary for ``stream_format``. + + ``stream_format`` is one of ``"msgpack"`` or ``"protobuf"`` (matches the + Codec request's ``stream_format`` field). ``dict_bytes`` is the raw + bytes of a zstd dictionary as produced by ``zstd --train`` or + ``packages/bench/scripts/train-zstd-dict.py``. + + Replaces any previously-registered dict for that format. 
Call once at + server startup, e.g.:: + + with open("qwen2.5-msgpack-v1.dict", "rb") as f: + set_zstd_dict("msgpack", f.read()) + with open("qwen2.5-protobuf-v1.dict", "rb") as f: + set_zstd_dict("protobuf", f.read()) + + No-op if the ``zstandard`` package isn't installed — the negotiator + won't pick zstd in that case anyway. + """ + if not _ZSTD_AVAILABLE: + return + _ZSTD_DICTS[stream_format] = dict_bytes + _ZSTD_DICT_HASHES[stream_format] = _hash_dict(dict_bytes) + + +def clear_zstd_dicts() -> None: + """Drop all registered dictionaries. Mostly for tests.""" + _ZSTD_DICTS.clear() + _ZSTD_DICT_HASHES.clear() + + +def has_zstd_dict(stream_format: str | None) -> bool: + """Is there a registered dict for ``stream_format``? + + Returns False when ``stream_format`` is None — callers that don't know + the response format (e.g. legacy code paths) can't safely use a + format-keyed dict, so we drop them off the zstd path. + """ + return bool(stream_format) and stream_format in _ZSTD_DICTS + + +def get_zstd_dict_hash(stream_format: str | None) -> str | None: + """sha256 hex digest of the registered dict for ``stream_format``, + formatted as ``sha256:`` for the ``Codec-Zstd-Dict`` response + header. Returns None when no dict is registered.""" + if not stream_format: + return None + return _ZSTD_DICT_HASHES.get(stream_format) + + +def _parse_accept_encoding(header: str) -> list[str]: + """Return the encodings the client lists, in the order they appear. + + We don't bother with q-values for now — clients that explicitly want + a non-default ordering are rare, and the server preference order + (zstd, gzip) covers the realistic case. Identity is always implicitly + acceptable per RFC 9110 §12.5.3. 
+ """ + if not header: + return [] + parts = [] + for part in header.split(","): + name = part.strip().split(";", 1)[0].strip().lower() + if name: + parts.append(name) + return parts + + +def negotiate_encoding( + accept_encoding: str, + *, + stream_format: str | None = None, +) -> str | None: + """Pick the best encoding both sides can speak. + + Returns one of ``"zstd"``, ``"br"``, ``"gzip"``, or ``None`` (identity). + Order of preference: zstd > br > gzip > identity. ``"*"`` in + Accept-Encoding is treated as accepting any encoding the server has. + + **zstd is gated on a pre-trained dict being registered for the request's + ``stream_format``.** Without a dict, this falls through to gzip even + when the client advertises zstd — see the dict registry comment above + and spec/PROTOCOL.md "Pre-trained ZSTD dictionaries" for the rationale. + ``stream_format`` defaults to None, which always disables zstd — keeps + legacy callers safe. + """ + encs = _parse_accept_encoding(accept_encoding) + if not encs: + return None + has_wildcard = "*" in encs + + if ( + _ZSTD_AVAILABLE + and has_zstd_dict(stream_format) + and ("zstd" in encs or has_wildcard) + ): + return "zstd" + if _BROTLI_AVAILABLE and ("br" in encs or has_wildcard): + return "br" + if "gzip" in encs or has_wildcard: + return "gzip" + return None + + +async def _compress_zstd( + stream: AsyncIterable[bytes], + *, + dict_bytes: bytes, +) -> AsyncIterable[bytes]: + """Stream-compress with Zstandard, using a pre-trained dict. + + Per the Codec protocol the encoder MUST load the dict; ``negotiate_encoding`` + only selects zstd when a dict is registered, so we pass the bytes through + here rather than re-looking it up from the registry (avoids a TOCTOU + where the dict gets cleared mid-request). 
+ """ + zdict = zstd.ZstdCompressionDict(dict_bytes) + cctx = zstd.ZstdCompressor(level=3, dict_data=zdict) + chunker = cctx.chunker(chunk_size=16384) + async for chunk in stream: + for out in chunker.compress(chunk): + yield out + for out in chunker.finish(): + yield out + + +async def _compress_gzip(stream: AsyncIterable[bytes]) -> AsyncIterable[bytes]: + """Stream-compress with gzip. wbits=31 = gzip wrapper (vs raw deflate).""" + compressor = zlib.compressobj(level=6, wbits=31) + async for chunk in stream: + out = compressor.compress(chunk) + if out: + yield out + final = compressor.flush(zlib.Z_FINISH) + if final: + yield final + + +async def _compress_brotli(stream: AsyncIterable[bytes]) -> AsyncIterable[bytes]: + """Stream-compress with Brotli. quality=4 balances speed/ratio for + server-side dynamic compression — ratio close to the default quality 11 + but at gzip-level CPU cost (default 11 is 10-50x slower for streams). + + Per-chunk flush() was removed after the v0.4.1 bench showed it inflated + small streams (each flush emits a complete brotli block + header, + forfeiting between-chunk dictionary sharing). The remaining finish() + closes the stream once at end-of-input.""" + compressor = brotli.Compressor(quality=4, mode=brotli.MODE_GENERIC, lgwin=22) + async for chunk in stream: + out = compressor.process(chunk) + if out: + yield out + final = compressor.finish() + if final: + yield final + + +def wrap_streaming_response( + accept_encoding: str, + body_stream: AsyncIterable[bytes], + *, + media_type: str, + background: BackgroundTasks | None = None, + extra_headers: dict[str, str] | None = None, + stream_format: str | None = None, + client_version: str | None = None, +) -> StreamingResponse: + """Build a StreamingResponse with the right compression based on the + client's Accept-Encoding header. + + The Codec frame format is unchanged — compression is purely transport. 
+ Clients that don't include zstd/gzip in Accept-Encoding receive an + uncompressed (identity-encoded) stream, which is the previous behavior. + + ``stream_format`` is the request's ``stream_format`` field + (``"msgpack"`` / ``"protobuf"`` / ``"json"``) and gates the zstd path + via the dict registry — see ``negotiate_encoding``. + + ``client_version`` is the request's ``Codec-Client-Version``. When set, + Codec-* response headers are filtered to that version's floor per + `spec/versions/v0.4.md § Graceful downgrade`. + """ + from vllm.entrypoints.codec_version import filter_codec_headers + + encoding = negotiate_encoding(accept_encoding, stream_format=stream_format) + headers: dict[str, str] = {"Vary": "Accept-Encoding"} + if extra_headers: + headers.update(extra_headers) + + if encoding == "zstd": + # has_zstd_dict() was checked inside negotiate_encoding, so the + # lookup here always hits — but assert defensively in case of a + # registry mutation between negotiation and use. + dict_bytes = _ZSTD_DICTS.get(stream_format or "") + dict_hash = _ZSTD_DICT_HASHES.get(stream_format or "") + if dict_bytes is None or dict_hash is None: + # Registry was cleared mid-request — fall through to gzip. + body = _compress_gzip(body_stream) + headers["Content-Encoding"] = "gzip" + else: + body = _compress_zstd(body_stream, dict_bytes=dict_bytes) + headers["Content-Encoding"] = "zstd" + # Tell the client which dict we used so it can pick the + # matching one before decompressing. See spec/PROTOCOL.md + # "Codec-Zstd-Dict response header". + headers["Codec-Zstd-Dict"] = dict_hash + elif encoding == "br": + body = _compress_brotli(body_stream) + headers["Content-Encoding"] = "br" + elif encoding == "gzip": + body = _compress_gzip(body_stream) + headers["Content-Encoding"] = "gzip" + else: + body = body_stream + + # Graceful downgrade — strip v0.4+ headers for older clients. 
+ if client_version is not None: + headers = filter_codec_headers(headers, client_version) + + return StreamingResponse( + body, + media_type=media_type, + headers=headers, + background=background, + ) diff --git a/vllm/entrypoints/codec_dispatcher.py b/vllm/entrypoints/codec_dispatcher.py new file mode 100644 index 000000000000..28915ee20e66 --- /dev/null +++ b/vllm/entrypoints/codec_dispatcher.py @@ -0,0 +1,302 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +""" +Codec v0.5 #87: bolt-on tool dispatcher. + +Closes the loop on @codecai/tool-kit (shipped at v0.4.1). The SDK lets +operators publish a tool manifest at build time; this module lets the +engine dispatch to those tools without round-tripping through detokenize +→ JSON → re-tokenize. + +Three pieces (contract in spec/versions/v0.5.md § "v0.5-5"): + +1. Tool registry — reads CODEC_TOOL_MANIFEST_URLS at startup, fetches + each manifest, validates `tokenizerHash` against the active model's + tokenizer SHA-256. Mismatched tools load in `text-fallback` mode + (the engine still surfaces them but doesn't dispatch). + +2. MCP-style HTTP client — when ToolWatcher detects a `` + region, POST `CodecToolCall` (msgpack-framed) to the tool's + `/codec/tool/v1/call` endpoint. + +3. Reinjection path — `CodecToolResult.response_ids` get inserted into + the model's generation context where `` was detected. + Model continues generation without detokenize/retokenize on the + response. + +Gated behind CODEC_BOLT_ON_DISPATCH=1, default off. When off, the +engine surfaces `` regions to the client unchanged +(existing v0.2-v0.4 behaviour). + +This module ships the contract + helper primitives. Wiring it into +serving_completions._generate_binary_stream is a follow-up integration +pass: that change touches the model's KV cache reinjection path and +needs review against SGLang's batching semantics. 
log = logging.getLogger(__name__)

# Environment switches, read once at import time.
CODEC_BOLT_ON_DISPATCH = os.environ.get("CODEC_BOLT_ON_DISPATCH", "0") == "1"
CODEC_TOOL_MANIFEST_URLS = os.environ.get("CODEC_TOOL_MANIFEST_URLS", "").strip()
CODEC_TOOL_MANIFEST_REQUIRED = (
    os.environ.get("CODEC_TOOL_MANIFEST_REQUIRED", "0") == "1"
)


# ── Wire types (mirror @codecai/tool-kit's CodecToolCall / Result shapes) ──


@dataclass
class CodecToolCall:
    """Outgoing call to a Codec-aware tool endpoint."""

    tool_name: str
    arguments_json: str
    call_id: str
    tokenizer_hash: str


@dataclass
class CodecToolResult:
    """Response from a Codec-aware tool endpoint."""

    response_ids: list[int]
    call_id: str
    is_error: bool = False
    error_message: str | None = None


# msgspec encoders for the wire shape.
_call_encoder = msgspec.msgpack.Encoder()
_result_decoder = msgspec.msgpack.Decoder()


def encode_tool_call(call: CodecToolCall) -> bytes:
    """Encode a tool call to msgpack bytes for POSTing to a tool endpoint."""
    return _call_encoder.encode(
        {
            "tool_name": call.tool_name,
            "arguments_json": call.arguments_json,
            "call_id": call.call_id,
            "tokenizer_hash": call.tokenizer_hash,
        }
    )


def decode_tool_result(data: bytes) -> CodecToolResult:
    """Decode a tool endpoint's response into a CodecToolResult.

    Missing fields fall back to their defaults (empty ``response_ids``,
    empty ``call_id``, ``is_error=False``, no error message). A nil
    ``response_ids`` is treated as empty.

    Raises:
        ValueError: if the payload does not decode to a msgpack map. The
            endpoint is a remote service, so its response shape is
            validated rather than trusted.
    """
    parsed = _result_decoder.decode(data)
    if not isinstance(parsed, dict):
        raise ValueError(
            f"tool result must be a msgpack map, got {type(parsed).__name__}"
        )
    return CodecToolResult(
        response_ids=list(parsed.get("response_ids") or []),
        call_id=str(parsed.get("call_id", "")),
        is_error=bool(parsed.get("is_error", False)),
        error_message=parsed.get("error_message"),
    )


# ── Manifest registry ──────────────────────────────────────────────────────


@dataclass
class RegisteredTool:
    """One entry in the engine's tool registry."""

    manifest_url: str
    name: str
    endpoint: str
    # The tokenizer the tool was built against (sha256 of the tokenizer
    # map bytes). MUST match the active model's tokenizer for dispatch to
    # fire; mismatched tools load in text-fallback mode.
    tokenizer_hash: str
    mode: str  # "dispatch" | "text-fallback"


class ToolRegistry:
    """Engine-side tool registry. Loaded once at boot from
    CODEC_TOOL_MANIFEST_URLS; mutable thereafter only via
    register_tool() (operator-side admin path)."""

    def __init__(self) -> None:
        self._tools: dict[str, RegisteredTool] = {}

    def register_tool(self, tool: RegisteredTool) -> None:
        """Add ``tool``, replacing any existing entry with the same name."""
        self._tools[tool.name] = tool

    def get(self, name: str) -> RegisteredTool | None:
        """Return the tool registered under ``name``, or None."""
        return self._tools.get(name)

    def all(self) -> list[RegisteredTool]:
        """Return a snapshot list of every registered tool."""
        return list(self._tools.values())

    @classmethod
    def from_env(cls, active_tokenizer_hash: str) -> ToolRegistry:
        """Load tools from CODEC_TOOL_MANIFEST_URLS env var.

        ``active_tokenizer_hash`` is the sha256 of the model's tokenizer
        map bytes — used to decide which tools load in dispatch mode vs
        text-fallback mode. An empty manifest hash never counts as a
        match, so such tools load in text-fallback mode.

        Raises:
            RuntimeError: if a manifest fails to load and
                CODEC_TOOL_MANIFEST_REQUIRED is set. Otherwise the failing
                manifest is logged and skipped.
        """
        registry = cls()
        if not CODEC_TOOL_MANIFEST_URLS:
            return registry
        urls = [u.strip() for u in CODEC_TOOL_MANIFEST_URLS.split(",") if u.strip()]
        for url in urls:
            try:
                manifest = _fetch_manifest(url)
                tool_hash = manifest.get("tokenizerHash", "")
                mode = (
                    "dispatch"
                    if tool_hash and tool_hash == active_tokenizer_hash
                    else "text-fallback"
                )
                registry.register_tool(
                    RegisteredTool(
                        manifest_url=url,
                        name=manifest["name"],
                        endpoint=manifest["endpoint"],
                        tokenizer_hash=tool_hash,
                        mode=mode,
                    )
                )
                log.info(
                    "codec_dispatcher: registered tool %s (mode=%s) from %s",
                    manifest["name"],
                    mode,
                    url,
                )
            except Exception as e:
                # Boundary handler: one bad manifest must not prevent the
                # remaining tools from loading unless the operator opted in.
                if CODEC_TOOL_MANIFEST_REQUIRED:
                    raise RuntimeError(
                        f"codec_dispatcher: required manifest {url} failed to load: {e}"
                    ) from e
                log.warning(
                    "codec_dispatcher: dropping tool from %s "
                    "(manifest load failed): %s",
                    url,
                    e,
                )
        return registry


def _fetch_manifest(url: str) -> dict:
    """Fetch a tool manifest JSON from the URL. Stdlib-only to keep the
    engine startup path free of heavy HTTP deps.

    Synchronous on purpose — it blocks for up to 30 seconds, so callers on
    an event loop should run it in a worker thread (see
    `dispatch_call_async` for the runtime-request equivalent).

    Raises:
        ValueError: if the body is not a JSON object or lacks one of the
            required fields ``name``, ``endpoint``, ``tokenizerHash``.
    """
    import urllib.request

    with urllib.request.urlopen(url, timeout=30) as resp:
        body = resp.read()
    parsed = json.loads(body)
    if not isinstance(parsed, dict):
        # Defend against the URL returning a JSON list/scalar — the
        # field-presence check below would give misleading results (or
        # raise TypeError) on a non-dict.
        raise ValueError(
            f"manifest at {url!r} must be a JSON object, got {type(parsed).__name__}"
        )
    # Minimum required fields per @codecai/tool-kit's manifest.json shape.
    for required in ("name", "endpoint", "tokenizerHash"):
        if required not in parsed:
            raise ValueError(f"manifest missing required field {required!r}")
    return parsed


# ── Dispatch (MCP-style HTTP client) ───────────────────────────────────────


def dispatch_call(
    tool: RegisteredTool,
    arguments_json: str,
    call_id: str,
) -> CodecToolResult:
    """POST a CodecToolCall to ``tool.endpoint`` and decode the response.

    Synchronous + stdlib-only. ASGI engines should NOT call this
    directly from inside the async request loop — `urlopen` blocks the
    event loop. Use `dispatch_call_async` instead (it wraps this in
    `asyncio.to_thread`). The sync form stays for non-async callers
    (CLI tools, batch eval drivers).
    """
    import urllib.request

    call = CodecToolCall(
        tool_name=tool.name,
        arguments_json=arguments_json,
        call_id=call_id,
        tokenizer_hash=tool.tokenizer_hash,
    )
    req = urllib.request.Request(
        tool.endpoint,
        data=encode_tool_call(call),
        headers={"Content-Type": "application/x-msgpack"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        body = resp.read()
    return decode_tool_result(body)


async def dispatch_call_async(
    tool: RegisteredTool,
    arguments_json: str,
    call_id: str,
) -> CodecToolResult:
    """Async variant of :func:`dispatch_call`.

    Runs the blocking urllib POST in a worker thread via
    `asyncio.to_thread` so it doesn't block the event loop. ASGI engines
    (vLLM's chat / completion serving paths) MUST use this variant —
    calling the sync `dispatch_call` from inside an `async def` request
    handler blocks every other in-flight request on the same uvicorn
    worker until the tool replies.
    """
    import asyncio

    return await asyncio.to_thread(dispatch_call, tool, arguments_json, call_id)


# ── Reinjection hook ───────────────────────────────────────────────────────


def reinject_ids_into_context(
    context_ids: list[int], reinject_ids: list[int]
) -> list[int]:
    """Return a new list with ``reinject_ids`` appended to ``context_ids``.

    This is the simplest reinjection model — equivalent to the tool
    response being the next chunk the model "reads". Engines with KV-
    cache-aware models MAY do something smarter (insert at the position
    where the tool-call region was detected), but append is the
    correct fallback and what the SGLang ToolWatcher integration
    currently uses. Neither input list is mutated.
    """
    return context_ids + reinject_ids


__all__ = [
    "CODEC_BOLT_ON_DISPATCH",
    "CODEC_TOOL_MANIFEST_URLS",
    "CODEC_TOOL_MANIFEST_REQUIRED",
    "CodecToolCall",
    "CodecToolResult",
    "RegisteredTool",
    "ToolRegistry",
    "decode_tool_result",
    "dispatch_call",
    "dispatch_call_async",
    "encode_tool_call",
    "reinject_ids_into_context",
]
+ +protobuf (Content-Type: application/x-protobuf): + Each frame is a 4-byte big-endian length prefix followed by the raw + protobuf bytes for CodecFrame / CodecRequest (see PROTO_SCHEMA below). + +Proto schema (for client-side code generation) +----------------------------------------------- +""" + +import array +import os +import struct as _struct +from collections.abc import Sequence +from typing import Union + +import msgspec as _msgspec + +# v0.5 #77 (T1.4 OpenAI-bypass): when CODEC_OPENAI_BYPASS=1, the encoder +# accepts numpy / array.array / bytes ids directly so the upstream +# OpenAI-JSON-SSE PyLong-boxing path can be elided. Wire bytes are +# byte-identical to the default path. Mirrors the sglang fork's change. +_OPENAI_BYPASS = os.environ.get("CODEC_OPENAI_BYPASS", "0") == "1" + +try: + import numpy as _np # type: ignore[import-untyped] + + _HAVE_NUMPY = True +except ImportError: # pragma: no cover + _np = None + _HAVE_NUMPY = False + +IdsLike = Union[Sequence[int], "array.array", bytes, "_np.ndarray"] + +# ── msgspec msgpack ──────────────────────────────────────────────────────────── + +_mp_encoder = _msgspec.msgpack.Encoder() +_mp_decoder = _msgspec.msgpack.Decoder() + + +def _normalise_ids_to_list(ids: IdsLike) -> list[int]: + """Coerce IdsLike to a plain list[int]. See sglang fork for rationale.""" + if isinstance(ids, list): + return ids + if _HAVE_NUMPY and isinstance(ids, _np.ndarray): + return ids.tolist() + if isinstance(ids, array.array): + return ids.tolist() + if isinstance(ids, (bytes, bytearray, memoryview)): + if _HAVE_NUMPY: + return _np.frombuffer(bytes(ids), dtype=" bytes: + """Encode a CodecFrame as msgpack. + + `tool_calls` is the sglang-compatible list shape — each entry a dict + of {"arguments_json": str, "name"?: str, "id"?: str}. Emitted under + the "tool_calls" map key only when non-empty so frames without a + detected tool call stay byte-identical to the pre-watcher path. 
def _varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint.

    Raises:
        ValueError: ``n`` is negative. Python's arithmetic right shift
            never brings a negative int to zero, so without this guard
            the loop below would append bytes forever.
    """
    if n < 0:
        raise ValueError(f"Codec: cannot varint-encode negative value {n}")
    out: list[int] = []
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more groups follow: set continuation bit
        else:
            out.append(bits)
            break
    return bytes(out)


def _decode_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode a single protobuf varint at ``pos``.

    Returns ``(value, next_pos)``.

    Bounds-checked + length-capped so malformed or malicious input fails
    fast with a ValueError instead of looping unbounded or producing a
    silently-wrong value. Cap is 35 bits — protobuf uint32 fits in <= 5
    bytes (5 * 7 = 35); a fifth byte that still carries the continuation
    bit means the encoded value cannot represent a uint32 and is rejected.

    Raises:
        ValueError: the buffer ends mid-varint, or the varint is longer
            than 5 bytes.
    """
    result = 0
    shift = 0
    while True:
        if pos >= len(data):
            raise ValueError("Codec: truncated varint in CodecRequest")
        b = data[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            return result, pos
        shift += 7
        # After five 7-bit groups shift == 35; a sixth byte would land
        # beyond the documented cap, so stop before reading it.
        if shift >= 35:
            raise ValueError("Codec: varint overflow in CodecRequest")


def _encode_tool_call_msg(call: dict) -> bytes:
    """Encode a single ToolCall sub-message. No length prefix — caller
    wraps it as a length-delimited field 4 of CodecFrame.

    Wire shape (matches sglang's _encode_tool_call_msg + the libcodec
    pb_encode_tool_call):
        tag 0x0a  field 1 string  name             (optional)
        tag 0x12  field 2 string  arguments_json   (required)
        tag 0x1a  field 3 string  id               (optional)
    """
    parts: list[bytes] = []
    name = call.get("name")
    if name:
        b = name.encode()
        parts += [b"\x0a", _varint(len(b)), b]
    # arguments_json is always emitted, as an empty string when absent.
    args = call.get("arguments_json", "")
    bargs = args.encode()
    parts += [b"\x12", _varint(len(bargs)), bargs]
    cid = call.get("id")
    if cid:
        b = cid.encode()
        parts += [b"\x1a", _varint(len(b)), b]
    return b"".join(parts)


def encode_protobuf_frame(
    ids: IdsLike,
    done: bool,
    finish_reason: str | None = None,
    tool_calls: Sequence[dict] | None = None,
) -> bytes:
    """Raw protobuf bytes for a CodecFrame (no length prefix).

    v0.5: ids accepts numpy / array.array / bytes (LE uint32) in addition
    to Sequence[int]. Mirrors encode_msgpack.

    Raises:
        ValueError: an id is negative (propagated from ``_varint``).
    """
    parts: list[bytes] = []
    ids_list = ids if isinstance(ids, list) else _normalise_ids_to_list(ids)
    if ids_list:
        packed = b"".join(_varint(i) for i in ids_list)
        parts += [b"\x0a", _varint(len(packed)), packed]  # field 1, wt=2
    parts += [b"\x10", b"\x01" if done else b"\x00"]  # field 2, wt=0
    if finish_reason is not None:
        enc = finish_reason.encode()
        parts += [b"\x1a", _varint(len(enc)), enc]  # field 3, wt=2
    if tool_calls:
        # Field 4: repeated ToolCall — each a length-delimited sub-message
        # tagged 0x22 = (4 << 3) | 2.
        for call in tool_calls:
            sub = _encode_tool_call_msg(call)
            parts += [b"\x22", _varint(len(sub)), sub]
    return b"".join(parts)


def encode_protobuf(
    ids: IdsLike,
    done: bool,
    finish_reason: str | None = None,
    tool_calls: Sequence[dict] | None = None,
) -> bytes:
    """4-byte big-endian length-prefixed CodecFrame."""
    payload = encode_protobuf_frame(ids, done, finish_reason, tool_calls)
    return _struct.pack(">I", len(payload)) + payload
+message CodecFrame { + repeated uint32 ids = 1 [packed = true]; + bool done = 2; + optional string finish_reason = 3; + // Server-side tool-call detection (opt-in via request.tool_watcher). + // When the model emits a complete .. region in this chunk, + // the parsed result rides on the same frame whose `ids` come from + // immediately after the region. Multiple tool calls in one frame + // surface as a list. + repeated ToolCall tool_calls = 4; +} + +message ToolCall { + optional string name = 1; // parsed from JSON body when shape matches + string arguments_json = 2; // raw JSON body between markers + optional string id = 3; // server-generated, e.g. "tc_" +} + +// Binary request body for POST /v1/completions/codec +// Content-Type: application/x-msgpack → same keys as JSON dict below +// Content-Type: application/x-protobuf → CodecRequest message +message CodecRequest { + repeated uint32 prompt_ids = 1 [packed = true]; + uint32 max_tokens = 2; + float temperature = 3; + repeated string stop = 4; + string stream_format = 5; // "msgpack" or "protobuf" +} +""" diff --git a/vllm/entrypoints/codec_version.py b/vllm/entrypoints/codec_version.py new file mode 100644 index 000000000000..53a443e13709 --- /dev/null +++ b/vllm/entrypoints/codec_version.py @@ -0,0 +1,240 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Codec version negotiation — opt-on, two-stage, graceful downgrade. + +Implements the rules in `spec/versions/v0.4.md`: + + - § Capabilities are opt-on at the server (two-stage) + - § Graceful downgrade (response shaping) + - § Version Compatibility Signaling (Codec-Client-Version, 426 path) + +Two-stage model per capability, both off by default: + + - ENABLE: capability is *available*. Server emits the v0.X-specific + headers to v0.X+ clients; v0.(X-1) clients see v0.(X-1) wire. + - ENFORCE: capability is *mandatory*. v0.(X-1) clients get 426 + Upgrade Required with structured upgrade prompt. 
# ── Version comparison ───────────────────────────────────────────────────────


def _parse(v: str) -> tuple[int, int]:
    """Parse "0.X" or "0.X.Y" → (0, X). Patch part is ignored for wire
    decisions — only minor matters per the versioning policy.

    A leading "v"/"V" is tolerated and a major-only string ("1") parses
    as (1, 0). Anything non-numeric falls back to (0, 0), i.e. it is
    treated as older than every published version.
    """
    parts = v.strip().lstrip("vV").split(".")
    try:
        major = int(parts[0])
        minor = int(parts[1]) if len(parts) > 1 else 0
    except ValueError:
        return (0, 0)
    return (major, minor)


def version_ge(a: str, b: str) -> bool:
    """True iff version a >= version b. Used to gate header emission."""
    return _parse(a) >= _parse(b)


def version_lt(a: str, b: str) -> bool:
    """True iff version a < version b (the negation of version_ge)."""
    return not version_ge(a, b)


# ── Request-side ─────────────────────────────────────────────────────────────


# Per spec § Version Compatibility Signaling: a request without the header
# is treated as v0.2 (the oldest published version) — the most conservative
# choice. v0.4+ clients always send the header.
DEFAULT_CLIENT_VERSION = "0.2"


def parse_client_version(request: Request) -> str:
    """Return the client's advertised Codec version, or DEFAULT_CLIENT_VERSION
    if the header is absent.

    Lower-cased lookup because Starlette normalizes header keys.
    """
    raw = request.headers.get("codec-client-version", "").strip()
    if not raw:
        return DEFAULT_CLIENT_VERSION
    # Tolerate a leading "v"/"V": "v0.4" → "0.4".
    return raw.lstrip("vV")


# ── Header version-introduced floor ──────────────────────────────────────────


# Mirrors `spec/versions/v0.4.md` § Graceful downgrade § reference floor table.
# Server MUST suppress a header when client_version < the floor.
HEADER_VERSION_INTRODUCED: dict[str, str] = {
    "Codec-Tokenizer-Map": "0.2",
    "Codec-Zstd-Dict": "0.2",
    "Codec-Latent-Map": "0.3",
    "Codec-Map": "0.3",  # modality-agnostic alias
    "Codec-Safety-Policy": "0.4",
    "Codec-Safety-Policy-Hash": "0.4",
    # Codec-Min-Version and Codec-Required-Features are v0.4 but emitted
    # ONLY on 426 responses — see make_426_response() — so they're not
    # gated through this table for normal 2xx flow.
}


def should_emit_header(header_name: str, client_version: str) -> bool:
    """True iff the server may emit this header to a client speaking
    `client_version`. Headers not in the registry default to True
    (assumed v0.2 baseline or non-Codec headers like Content-Encoding).

    The registry lookup ignores case: HTTP field names are
    case-insensitive, so "codec-safety-policy" must be gated exactly
    like "Codec-Safety-Policy".
    """
    wanted = header_name.lower()
    for known, floor in HEADER_VERSION_INTRODUCED.items():
        if known.lower() == wanted:
            return version_ge(client_version, floor)
    return True


def filter_codec_headers(headers: dict, client_version: str) -> dict:
    """Strip any Codec-* header whose version-introduced floor exceeds
    the client's advertised version. Pass-through for non-Codec headers
    (Content-Type, Vary, Content-Encoding, etc.).

    Used at the response-build seam so callers don't have to inline
    a per-header check. Returns a new dict; ``headers`` is not mutated.
    """
    return {k: v for k, v in headers.items() if should_emit_header(k, client_version)}


# ── Capability config (stage-1: enable; stage-2: enforce) ────────────────────


def safety_policy_enabled() -> bool:
    """Stage-1: capability is available. v0.4+ clients see the headers."""
    return bool(os.environ.get("CODEC_SAFETY_POLICY", "").strip())


def safety_policy_required() -> bool:
    """Stage-2: mandatory. v0.3- clients get 426.

    Only meaningful when stage-1 is enabled. The flag is compared
    case-insensitively so "TRUE" / "Yes" do not silently leave
    enforcement switched off.
    """
    if not safety_policy_enabled():
        return False
    flag = os.environ.get("CODEC_SAFETY_POLICY_REQUIRED", "").strip().lower()
    return flag in ("1", "true", "yes")


def version_policy_mode() -> str:
    """One of: "off" (default), "advisory" (header set but no 426),
    "strict" (emit 426 when client < 0.4 and any capability is required).

    Unrecognised values fall back to "off".
    """
    mode = os.environ.get("CODEC_VERSION_POLICY", "off").strip().lower()
    return mode if mode in ("off", "advisory", "strict") else "off"


def any_v04_mandatory() -> bool:
    """True iff any v0.4 capability is set to ENFORCE."""
    return safety_policy_required() or version_policy_mode() == "strict"


# ── 426 builder ──────────────────────────────────────────────────────────────


def collect_required_features() -> list[str]:
    """Per spec § Required-features registry: which feature names should
    appear on a 426 right now, based on the server's enforce-stage
    config."""
    out: list[str] = []
    if safety_policy_required():
        out.append("safety-policy-enforcement")
    # Future: mandatory-classifier when that capability lands.
    return out


def needs_upgrade(client_version: str, min_version: str = "0.4") -> bool:
    """True iff the deployment mandates `min_version` and the client's
    advertised version is below it."""
    if not any_v04_mandatory():
        return False
    return version_lt(client_version, min_version)


def make_426_response(
    *,
    client_version: str,
    min_version: str = "0.4",
    deployment_id: str | None = None,
) -> Response:
    """Build the structured 426 Upgrade Required response.

    See `spec/versions/v0.4.md § Version Compatibility Signaling` —
    HTTP-transport shape. JSON body degrades gracefully for older
    clients that don't know the field shape (`error` is a string they
    can render).

    When ``deployment_id`` is None it is read from CODEC_DEPLOYMENT_ID;
    the field is omitted from the body if it ends up empty.
    """
    required = collect_required_features()
    body = {
        "error": "codec_version_required",
        "minimum_version": min_version,
        "required_features": required,
        "client_version": client_version,
        "docs_url": "https://codecai.net/docs/version-negotiation/",
    }
    if deployment_id is None:
        deployment_id = os.environ.get("CODEC_DEPLOYMENT_ID", "").strip()
    if deployment_id:
        body["deployment_id"] = deployment_id

    headers = {"Codec-Min-Version": min_version}
    if required:
        # Don't ship an empty header value.
        headers["Codec-Required-Features"] = ", ".join(required)

    return Response(
        content=json.dumps(body),
        status_code=426,
        media_type="application/json",
        headers=headers,
    )


# ── Well-known version-policy descriptor ─────────────────────────────────────


def version_policy_document() -> dict | None:
    """Return the `.well-known/codec/version-policy.json` content for the
    current server state, or None when no v0.4 capability is mandatory
    (the spec says deployments without mandatory features SHOULD NOT
    publish this).
    """
    if not any_v04_mandatory():
        return None
    return {
        "minimum_version": "0.4",
        "required_features": collect_required_features(),
        "deployment_id": os.environ.get("CODEC_DEPLOYMENT_ID", "").strip() or None,
        "docs_url": "https://codecai.net/docs/version-negotiation/",
    }
+ from vllm.entrypoints.codec_version import ( + make_426_response, + needs_upgrade, + parse_client_version, + ) + + client_version = parse_client_version(raw_request) + if needs_upgrade(client_version): + return make_426_response(client_version=client_version) + metrics_header_format = raw_request.headers.get( ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, "" ) @@ -71,6 +84,18 @@ async def create_chat_completion(request: ChatCompletionRequest, raw_request: Re headers=metrics_header(metrics_header_format), ) + if request.stream_format != "json": + media_type = CONTENT_TYPE.get( + request.stream_format, "application/x-msgpack" + ) + return wrap_streaming_response( + raw_request.headers.get("accept-encoding", ""), + generator, + media_type=media_type, + stream_format=request.stream_format, + client_version=client_version, + ) + return StreamingResponse(content=generator, media_type="text/event-stream") diff --git a/vllm/entrypoints/openai/chat_completion/protocol.py b/vllm/entrypoints/openai/chat_completion/protocol.py index 73ecb3f35a1c..3d4663b92c1b 100644 --- a/vllm/entrypoints/openai/chat_completion/protocol.py +++ b/vllm/entrypoints/openai/chat_completion/protocol.py @@ -203,6 +203,49 @@ class ChatCompletionRequest(OpenAIBaseModel): stop: str | list[str] | None = [] stream: bool | None = False stream_options: StreamOptions | None = None + stream_format: Literal["json", "msgpack", "protobuf"] = Field( + default="json", + description=( + "Binary wire format for streaming token output (Codec protocol). " + "'json' (default) uses the standard SSE/JSON path with full chat " + "structure (assistant role, tool calls, finish_reason). " + "'msgpack' / 'protobuf' stream raw token IDs as Codec frames — " + "no role headers, no tool-call parsing, no detokenization. " + "Setting a binary format implies stream=True and rejects n > 1. " + "The client is responsible for any chat-protocol decoding it needs " + "(e.g. running its own tool-call parser over the decoded text). 
" + "See GET /codec/schema for the protobuf schema." + ), + ) + tool_watcher: bool = Field( + default=False, + description=( + "Codec server-side ToolWatcher (PR #24557 equivalent). When true, " + "the server runs a uint32-compare state machine over the outbound " + "token stream and surfaces completed `..` regions as " + "structured `tool_calls` on the matching CodecFrame. The marker " + "tokens themselves are consumed and not forwarded to the client. " + "Requires a binary stream_format (msgpack/protobuf) — has no " + "effect on the JSON/SSE path. Falls back silently when the marker " + "strings don't resolve to single tokens in the loaded vocab." + ), + ) + tool_watcher_start: str | None = Field( + default=None, + description=( + "Override for the watcher's start marker. Defaults to " + "'' (Qwen 2.5+). Use '<|python_tag|>' for Llama 3.1+, " + "'<|tool|>' for Phi-4, '[TOOL_CALLS]' for Mistral-Nemo, etc. " + "Must resolve to exactly one special token in the loaded vocab." + ), + ) + tool_watcher_end: str | None = Field( + default=None, + description=( + "Override for the watcher's end marker. Defaults to " + "'' (Qwen 2.5+)." + ), + ) temperature: float | None = None top_p: float | None = None tools: list[ChatCompletionToolsParam] | None = None @@ -447,6 +490,25 @@ def _normalize_messages_before(cls, data: Any) -> Any: msg["reasoning"] = reasoning_content return data + @model_validator(mode="before") + @classmethod + def validate_stream_format(cls, data): + # Mirror CompletionRequest.validate_stream_format. See completion/protocol.py. + if not isinstance(data, dict): + return data + fmt = data.get("stream_format", "json") + if fmt != "json": + data["stream"] = True + n = data.get("n", 1) + if isinstance(n, int) and n > 1: + raise VLLMValidationError( + f"stream_format='{fmt}' does not support n > 1. " + "Binary CodecFrame has no choice index field; multiple " + "completion sequences cannot be demultiplexed by the client. " + "Use n=1 or stream_format='json'." 
+ ) + return data + @model_validator(mode="after") def _materialize_tool_calls_after(self) -> "ChatCompletionRequest": """Convert Pydantic ValidatorIterator wrappers back to lists. diff --git a/vllm/entrypoints/openai/chat_completion/serving.py b/vllm/entrypoints/openai/chat_completion/serving.py index 92ffc141548b..5ab0df0185ba 100644 --- a/vllm/entrypoints/openai/chat_completion/serving.py +++ b/vllm/entrypoints/openai/chat_completion/serving.py @@ -15,6 +15,21 @@ from fastapi import Request from vllm.engine.protocol import EngineClient +from vllm.entrypoints.codec_agent import ( + ToolWatcher, + detokenize_region, + make_call_id, + parse_tool_call, + resolve_marker_id, +) +from vllm.entrypoints.codec_dispatcher import ( + CODEC_BOLT_ON_DISPATCH, + ToolRegistry, + dispatch_call, + dispatch_call_async, + reinject_ids_into_context, +) +from vllm.entrypoints.codec_frame import encode_frame from vllm.entrypoints.chat_utils import ( ChatTemplateContentFormatOption, ConversationMessage, @@ -169,6 +184,15 @@ def __init__( self.supports_code_interpreter = False self.python_tool = None + # v0.5 #87: lazy-cached Codec ToolRegistry. `ToolRegistry.from_env` + # performs blocking HTTP fetches against manifest URLs; doing that + # per-request blocks the event loop and adds startup latency to + # every tool-watcher stream. Resolve once per process on first use + # and reuse the registry thereafter. Sentinel = "not yet attempted"; + # None after load = attempted but env not configured. + self._codec_dispatcher_registry: ToolRegistry | None = None + self._codec_dispatcher_registry_loaded: bool = False + def warmup(self) -> None: self.renderer.warmup( ChatParams( @@ -361,6 +385,17 @@ async def _create_chat_completion( (result_generator,) = generators if request.stream: + if request.stream_format != "json": + # Binary Codec path: emit raw token IDs only. No role headers, + # no tool-call parsing, no reasoning-content split. 
The client + # owns any chat-protocol decoding it wants to do over the + # decoded text. (When `tool_watcher` is set, the server runs + # an O(n) ID-compare state machine over the stream and + # surfaces structured tool_calls on the matching frame — + # mirrors sglang PR #24557.) + return self.chat_completion_binary_stream_generator( + request, result_generator, tokenizer + ) return self.chat_completion_stream_generator( request, result_generator, @@ -995,6 +1030,153 @@ async def chat_completion_stream_generator( # Send the final done message after all response.n are finished yield "data: [DONE]\n\n" + async def chat_completion_binary_stream_generator( + self, + request: ChatCompletionRequest, + result_generator: AsyncIterator[RequestOutput], + tokenizer: TokenizerLike | None = None, + ) -> AsyncIterator[bytes]: + """Stream raw token IDs as Codec binary frames for chat completions. + + Symmetric with the completion endpoint's binary path: drop role + headers, finish_reason structures, tool-call parsing, and reasoning + splits — emit only `CodecFrame { ids, done, finish_reason? }` per + chunk. The client (e.g. @codecai/web) decodes the IDs and runs any + chat-protocol parsing it wants over the decoded text. + + When `request.tool_watcher` is true and the marker strings resolve + to single-token IDs in the loaded vocab, this also runs a uint32- + compare state machine over the outbound stream — completed + `..` regions surface as structured `tool_calls` on the + matching frame, with the marker tokens themselves consumed (not + forwarded to the client). Mirrors sglang PR #24557 / the libcodec + ToolWatcher / @codecai/web's ToolWatcher bit-for-bit. + + Validation in `ChatCompletionRequest.validate_stream_format` already + forced `stream=True` and rejected `n > 1`, so we expect exactly one + choice per chunk. 
+ """ + # ── ToolWatcher setup ──────────────────────────────────────────── + watcher: ToolWatcher | None = None + if getattr(request, "tool_watcher", False) and tokenizer is not None: + start_marker = ( + getattr(request, "tool_watcher_start", None) or "" + ) + end_marker = ( + getattr(request, "tool_watcher_end", None) or "" + ) + start_id = resolve_marker_id(tokenizer, start_marker) + end_id = resolve_marker_id(tokenizer, end_marker) + if start_id is not None and end_id is not None: + watcher = ToolWatcher(start_id=start_id, end_id=end_id) + else: + logger.warning( + "Codec ToolWatcher: markers %r/%r do not resolve to single " + "tokens in this model's vocab. Falling back to plain Codec " + "streaming.", + start_marker, + end_marker, + ) + + next_call_seq = 1 + + # v0.5 #87: bolt-on tool dispatcher. When the flag is set AND a + # ToolWatcher is active, load the tool registry once per stream + # and dispatch detected tool calls in-engine, reinjecting the + # tool's response_ids back into the stream. + dispatcher_registry: ToolRegistry | None = None + if CODEC_BOLT_ON_DISPATCH and watcher is not None: + # Cached on the serving instance — ToolRegistry.from_env makes + # blocking HTTP fetches against manifest URLs, so it must NEVER + # run inside the async request loop. First request after process + # start pays the fetch cost (off-loop via to_thread); subsequent + # requests reuse the cached registry. 
+ if not self._codec_dispatcher_registry_loaded: + tokenizer_hash = getattr(self, "_codec_tokenizer_map_hash", "") + import asyncio as _asyncio + self._codec_dispatcher_registry = await _asyncio.to_thread( + ToolRegistry.from_env, tokenizer_hash + ) + self._codec_dispatcher_registry_loaded = True + dispatcher_registry = self._codec_dispatcher_registry + + try: + async for res in result_generator: + if not res.outputs: + continue + output = res.outputs[0] + ids = list(output.token_ids) if output.token_ids else [] + finish_reason = output.finish_reason + done = finish_reason is not None + + # Run the watcher if active. Otherwise pass through unchanged. + if watcher is not None and ids: + passthrough_ids, completed_regions = watcher.feed(ids) + else: + passthrough_ids, completed_regions = ids, [] + + tool_calls_wire: list[dict] | None = None + if completed_regions: + wire: list[dict] = [] + for region in completed_regions: + body_text = ( + detokenize_region(tokenizer, region) + if tokenizer is not None + else "" + ) + ev = parse_tool_call( + body_text, call_id=make_call_id(next_call_seq) + ) + next_call_seq += 1 + wire.append(ev.to_wire_dict()) + + # v0.5 #87: in-engine dispatch when registered. + if dispatcher_registry is not None and ev.name: + tool = dispatcher_registry.get(ev.name) + if tool is not None and tool.mode == "dispatch": + try: + # Use the async variant — sync `dispatch_call` + # does a blocking urllib POST that would + # freeze the event loop if called from this + # `async def`. `dispatch_call_async` wraps + # it in asyncio.to_thread so other + # concurrent requests on the worker keep + # flowing while the tool reply is in flight. 
+ result = await dispatch_call_async( + tool, + arguments_json=ev.arguments_json, + call_id=ev.id or make_call_id(next_call_seq), + ) + if not result.is_error and result.response_ids: + passthrough_ids = reinject_ids_into_context( + passthrough_ids, result.response_ids, + ) + except Exception as e: + logger.warning( + "codec_dispatcher: dispatch_call(%s) failed: %s", + ev.name, e, + ) + tool_calls_wire = wire + + yield encode_frame( + request.stream_format, + passthrough_ids, + done=done, + finish_reason=finish_reason, + tool_calls=tool_calls_wire, + ) + if done: + return + except GenerationError: + yield encode_frame( + request.stream_format, [], done=True, finish_reason="error" + ) + except Exception: + logger.exception("Error in chat completion binary stream generator.") + yield encode_frame( + request.stream_format, [], done=True, finish_reason="error" + ) + async def chat_completion_full_generator( self, request: ChatCompletionRequest, diff --git a/vllm/entrypoints/openai/completion/api_router.py b/vllm/entrypoints/openai/completion/api_router.py index 4d8e0f885837..08151da0583a 100644 --- a/vllm/entrypoints/openai/completion/api_router.py +++ b/vllm/entrypoints/openai/completion/api_router.py @@ -5,8 +5,15 @@ from http import HTTPStatus from fastapi import APIRouter, Depends, FastAPI, Request -from fastapi.responses import JSONResponse, StreamingResponse +from fastapi.responses import JSONResponse, PlainTextResponse, StreamingResponse +from vllm.entrypoints.codec_compression import wrap_streaming_response +from vllm.entrypoints.codec_frame import ( + CONTENT_TYPE, + PROTO_SCHEMA, + decode_msgpack, + decode_protobuf_request, +) from vllm.entrypoints.openai.completion.protocol import ( CompletionRequest, CompletionResponse, @@ -44,6 +51,19 @@ def completion(request: Request) -> OpenAIServingCompletion | None: @with_cancellation @load_aware_call async def create_completion(request: CompletionRequest, raw_request: Request): + # Codec v0.4 version-negotiation 
gate. Default-off; only fires when + # the deployment has CODEC_*_REQUIRED set and the client speaks below + # the floor. See spec/versions/v0.4.md § Version Compatibility Signaling. + from vllm.entrypoints.codec_version import ( + make_426_response, + needs_upgrade, + parse_client_version, + ) + + client_version = parse_client_version(raw_request) + if needs_upgrade(client_version): + return make_426_response(client_version=client_version) + metrics_header_format = raw_request.headers.get( ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, "" ) @@ -63,7 +83,176 @@ async def create_completion(request: CompletionRequest, raw_request: Request): headers=metrics_header(metrics_header_format), ) - return StreamingResponse(content=generator, media_type="text/event-stream") + media_type = CONTENT_TYPE.get(request.stream_format, "text/event-stream") + # Negotiated transport compression for binary streams. JSON SSE keeps + # whatever compression is applied higher up the stack (proxies / FastAPI + # middleware) and is unaffected by this codepath. + if request.stream_format != "json": + return wrap_streaming_response( + raw_request.headers.get("accept-encoding", ""), + generator, + media_type=media_type, + stream_format=request.stream_format, + client_version=client_version, + ) + return StreamingResponse(content=generator, media_type=media_type) + +
+def _codec_bad_request(message: str) -> JSONResponse:
+    """Return an HTTP 400 JSON error for a malformed Codec request body."""
+    return JSONResponse(
+        content={
+            "error": {
+                "message": message,
+                "type": "BadRequestError",
+                "code": HTTPStatus.BAD_REQUEST.value,
+            }
+        },
+        status_code=HTTPStatus.BAD_REQUEST.value,
+    )
+
+
+@router.post(
+    "/v1/completions/codec",
+    summary="Token-native binary completions (Codec protocol)",
+    description=(
+        "Bidirectional token-native endpoint. Submit prompt token IDs as a "
+        "binary body (msgpack or protobuf) and receive a binary stream of "
+        "CodecFrame messages containing generated token IDs — no text "
+        "conversion at any point.\n\n"
+        "Content-Type of request body sets the input format:\n"
+        " application/x-msgpack → msgpack dict with keys: prompt_ids, "
+        "max_tokens, temperature, stop, stream_format\n"
+        " application/x-protobuf → CodecRequest proto message\n\n"
+        "stream_format in the body determines response encoding "
+        "('msgpack' or 'protobuf')."
+    ),
+    responses={
+        HTTPStatus.OK.value: {
+            "content": {
+                "application/x-msgpack": {},
+                "application/x-protobuf": {},
+            }
+        },
+    },
+)
+@with_cancellation
+@load_aware_call
+async def create_completion_codec(raw_request: Request):
+    """Handle a token-native Codec completion request.
+
+    The body is decoded as protobuf when the Content-Type mentions
+    "protobuf" and as msgpack otherwise, converted into a streaming
+    ``CompletionRequest`` and answered through ``wrap_streaming_response``.
+
+    Returns the 426 response when ``needs_upgrade`` rejects the client
+    version, a 400 JSON error when the body cannot be decoded or carries no
+    ``prompt_ids`` / a non-binary ``stream_format``, the handler's error as
+    JSON when it returns an ``ErrorResponse``, and the binary stream otherwise.
+
+    Raises:
+        NotImplementedError: if the model does not support the Completions API.
+    """
+    # Codec v0.4 version-negotiation gate.
+    from vllm.entrypoints.codec_version import (
+        make_426_response,
+        needs_upgrade,
+        parse_client_version,
+    )
+
+    client_version = parse_client_version(raw_request)
+    if needs_upgrade(client_version):
+        return make_426_response(client_version=client_version)
+
+    handler = completion(raw_request)
+    if handler is None:
+        raise NotImplementedError("The model does not support Completions API")
+
+    content_type = raw_request.headers.get("content-type", "")
+    body = await raw_request.body()
+
+    # The body is untrusted client input. The decoders' exception types are
+    # not visible from this module, so any decode failure is reported to the
+    # client as a 400 instead of escaping as an unhandled 500.
+    try:
+        if "protobuf" in content_type.lower():
+            params = decode_protobuf_request(body)
+        else:
+            params = decode_msgpack(body)
+    except Exception as e:
+        return _codec_bad_request(f"Could not decode Codec request body: {e}")
+    # msgpack can legally decode to a list/scalar; everything below needs the
+    # mapping interface.
+    if not callable(getattr(params, "get", None)):
+        return _codec_bad_request("Codec request body must decode to a mapping.")
+
+    prompt_ids = params.get("prompt_ids")
+    if not prompt_ids:
+        return _codec_bad_request("'prompt_ids' must be a non-empty token ID list.")
+    # This endpoint only speaks binary frames; accepting 'json' would send the
+    # SSE text stream through wrap_streaming_response as if it were binary.
+    stream_format = params.get("stream_format") or "msgpack"
+    if stream_format not in ("msgpack", "protobuf"):
+        return _codec_bad_request("'stream_format' must be 'msgpack' or 'protobuf'.")
+
+    request = CompletionRequest(
+        prompt=prompt_ids,
+        max_tokens=params.get("max_tokens", 256),
+        temperature=params.get("temperature"),
+        stop=params.get("stop"),
+        stream=True,
+        stream_format=stream_format,
+    )
+
+    generator = await handler.create_completion(request, raw_request)
+
+    if isinstance(generator, ErrorResponse):
+        return JSONResponse(
+            content=generator.model_dump(), status_code=generator.error.code
+        )
+
+    media_type = CONTENT_TYPE.get(stream_format, "application/x-msgpack")
+    return wrap_streaming_response(
+        raw_request.headers.get("accept-encoding", ""),
+        generator,
+        media_type=media_type,
+        stream_format=stream_format,
+        client_version=client_version,
+    )
+
+
+@router.get(
+    "/codec/schema",
+    summary="Codec protobuf schema",
+    description="Returns the .proto schema for CodecFrame and CodecRequest.",
+)
+async def codec_schema():
+    """Serve ``PROTO_SCHEMA`` as a plain-text response."""
+    return PlainTextResponse(content=PROTO_SCHEMA, media_type="text/plain")
+
+
+@router.get(
+    "/.well-known/codec/version-policy.json",
+    summary="Codec v0.4 deployment minimum-version policy",
+    description=(
+        "Pre-flight discovery doc per spec/WELL_KNOWN_DISCOVERY.md § "
+        "Version policy (v0.4+). Returns 404 when the deployment does "
+        "NOT mandate any v0.4 feature — that's the normal state for an "
+        "unrestricted deployment. Returns 200 with the policy doc when "
+        "any CODEC_*_REQUIRED env var is set."
+    ),
+)
+async def well_known_version_policy():
+    """Serve the version-policy document, or 404 when none is defined."""
+    # JSONResponse is already imported at module level; only Response is new.
+    from fastapi.responses import Response as FastResponse
+
+    from vllm.entrypoints.codec_version import version_policy_document
+
+    doc = version_policy_document()
+    if doc is None:
+        return FastResponse(status_code=404)
+    return JSONResponse(doc)
 def attach_router(app: FastAPI): diff --git a/vllm/entrypoints/openai/completion/protocol.py b/vllm/entrypoints/openai/completion/protocol.py index cb793a415633..e372561dd250 100644 --- a/vllm/entrypoints/openai/completion/protocol.py +++ b/vllm/entrypoints/openai/completion/protocol.py @@ -151,6 +151,21 @@ class CompletionRequest(OpenAIBaseModel): ), ) + stream_format: Literal["json", "msgpack", "protobuf"] = Field( + default="json", + description=( + "Binary wire format for streaming token output. " + "'json' (default) uses the standard SSE/JSON path. " + "'msgpack' streams raw token IDs as msgpack-encoded frames " + "(Content-Type: application/x-msgpack). 
" + "'protobuf' streams length-prefixed protobuf CodecFrame messages " + "(Content-Type: application/x-protobuf). " + "Both binary formats set detokenize=False internally and return " + "only token IDs — no text is produced. " + "See GET /codec/schema for the protobuf schema." + ), + ) + cache_salt: str | None = Field( default=None, description=( @@ -312,6 +327,7 @@ def to_sampling_params( if self.kv_transfer_params: # Pass in kv_transfer_params via extra_args extra_args["kv_transfer_params"] = self.kv_transfer_params + binary_stream = self.stream_format != "json" return SamplingParams.from_optional( n=self.n, presence_penalty=self.presence_penalty, @@ -322,7 +338,8 @@ def to_sampling_params( top_k=top_k, min_p=min_p, seed=self.seed, - stop=self.stop, + # stop strings require detokenization — silently drop when binary + stop=self.stop if not binary_stream else None, stop_token_ids=self.stop_token_ids, logprobs=self.logprobs, ignore_eos=self.ignore_eos, @@ -341,9 +358,49 @@ def to_sampling_params( extra_args=extra_args or None, skip_clone=True, # Created fresh per request, safe to skip clone repetition_detection=self.repetition_detection, + detokenize=not binary_stream, thinking_token_budget=self.thinking_token_budget, )
+    @model_validator(mode="before")
+    @classmethod
+    def validate_stream_format(cls, data):
+        """Apply the request constraints implied by a binary ``stream_format``.
+
+        Non-dict input is passed through untouched. For any format other than
+        "json" the ``stream`` flag is forced to True and ``n`` greater than one
+        is rejected.
+
+        Raises:
+            VLLMValidationError: if a binary format is combined with n > 1.
+        """
+        if not isinstance(data, dict):
+            return data
+        fmt = data.get("stream_format", "json")
+        if fmt == "json":
+            return data
+        # Binary formats require streaming — force it on silently.
+        data["stream"] = True
+        # n > 1 is not supported for binary formats: CodecFrame has no
+        # choice index, so multiple sequences would be interleaved with no
+        # way for the client to demultiplex them.
+        # This hook runs on the raw input (mode="before"), so ``n`` may still
+        # be a value such as "2" or 2.0 that field validation would presumably
+        # coerce to an int later; normalise it here so it cannot slip past the
+        # guard. Values that are not numeric are left for the field to reject.
+        try:
+            requested_n = int(data.get("n") or 1)
+        except (TypeError, ValueError, OverflowError):
+            requested_n = 1
+        if requested_n > 1:
+            raise VLLMValidationError(
+                f"stream_format='{fmt}' does not support n > 1. "
+                "Binary CodecFrame has no choice index field; multiple "
+                "completion sequences cannot be demultiplexed by the client. "
+                "Use n=1 or stream_format='json'."
+            )
+        return data
+
 @model_validator(mode="before") @classmethod def validate_response_format(cls, data): diff --git a/vllm/entrypoints/openai/completion/serving.py b/vllm/entrypoints/openai/completion/serving.py index f393954e2a05..6c6f7505d009 100644 --- a/vllm/entrypoints/openai/completion/serving.py +++ b/vllm/entrypoints/openai/completion/serving.py @@ -8,6 +8,8 @@ from collections.abc import Sequence as GenericSequence from typing import TYPE_CHECKING, cast +from vllm.entrypoints.codec_frame import encode_frame + import numpy as np import pybase64 as base64 from fastapi import Request @@ -287,7 +289,7 @@ async def completion_stream_generator( num_prompts: int, tokenizer: TokenizerLike | None, request_metadata: RequestResponseMetadata, - ) -> AsyncGenerator[str, None]: + ) -> AsyncGenerator[str | bytes, None]: num_choices = 1 if request.n is None else request.n previous_text_lens = [0] * num_choices * num_prompts previous_num_tokens = [0] * num_choices * num_prompts @@ -410,6 +412,7 @@ async def completion_stream_generator( token_ids=( as_list(output.token_ids) if request.return_token_ids + or request.stream_format != "json" else None ), ) @@ -432,8 +435,19 @@ async def completion_stream_generator( total_tokens=prompt_tokens + completion_tokens, ) - response_json = chunk.model_dump_json(exclude_unset=True) - yield f"data: {response_json}\n\n" + if request.stream_format != "json": + # Binary path: emit a compact frame of token IDs only. + # Text fields in `chunk` are empty because detokenize=False. 
+ ids = chunk.choices[0].token_ids or [] + yield encode_frame( + request.stream_format, + ids, + done=finish_reason is not None, + finish_reason=finish_reason, + ) + else: + response_json = chunk.model_dump_json(exclude_unset=True) + yield f"data: {response_json}\n\n" total_prompt_tokens = sum(num_prompt_tokens) total_completion_tokens = sum(previous_num_tokens) @@ -448,7 +462,7 @@ async def completion_stream_generator( cached_tokens=num_cached_tokens ) - if include_usage: + if include_usage and request.stream_format == "json": final_usage_chunk = CompletionStreamResponse( id=request_id, created=created_time, @@ -466,12 +480,24 @@ async def completion_stream_generator( request_metadata.final_usage_info = final_usage_info except GenerationError as e: - yield f"data: {self._convert_generation_error_to_streaming_response(e)}\n\n" + if request.stream_format == "json": + yield f"data: {self._convert_generation_error_to_streaming_response(e)}\n\n" + else: + # Binary path: send a terminal frame so the client knows the + # stream is over. Without this the client cannot distinguish a + # truncated stream from a server error. 
+ yield encode_frame(request.stream_format, [], done=True, + finish_reason="error") except Exception as e: logger.exception("Error in completion stream generator.") - data = self.create_streaming_error_response(e) - yield f"data: {data}\n\n" - yield "data: [DONE]\n\n" + if request.stream_format == "json": + data = self.create_streaming_error_response(e) + yield f"data: {data}\n\n" + else: + yield encode_frame(request.stream_format, [], done=True, + finish_reason="error") + if request.stream_format == "json": + yield "data: [DONE]\n\n" def request_output_to_completion_response( self, diff --git a/vllm/entrypoints/openai/server_utils.py b/vllm/entrypoints/openai/server_utils.py index 269c33549e84..a6bfc95ad581 100644 --- a/vllm/entrypoints/openai/server_utils.py +++ b/vllm/entrypoints/openai/server_utils.py @@ -35,9 +35,6 @@ logger = init_logger("vllm.entrypoints.openai.server_utils") -GUARDED_PREFIX = ("/v1", "/v2", "/inference") - - class AuthenticationMiddleware: """ Pure ASGI middleware that authenticates each request by checking @@ -47,7 +44,7 @@ class AuthenticationMiddleware: ----- There are two cases in which authentication is skipped: 1. The HTTP method is OPTIONS. - 2. The request path doesn't start with GUARDED_PREFIX (e.g. /health). + 2. The request path doesn't start with /v1, /v2 or /inference (e.g. /health). """ def __init__(self, app: ASGIApp, tokens: list[str]) -> None: @@ -83,7 +80,7 @@ def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None url_path = scope["path"].removeprefix(root_path) headers = Headers(scope=scope) # Type narrow to satisfy mypy. 
- if url_path.startswith(GUARDED_PREFIX) and not self.verify_token(headers): + if url_path.startswith(("/v1", "/v2", "/inference")) and not self.verify_token(headers): response = JSONResponse(content={"error": "Unauthorized"}, status_code=401) return response(scope, receive, send) return self.app(scope, receive, send) @@ -449,6 +446,61 @@ async def validation_exception_handler(req: Request, exc: RequestValidationError @asynccontextmanager async def lifespan(app: FastAPI): try:
+        # ── Codec: load any pre-trained zstd dictionaries declared via env ─────
+        # Per spec/PROTOCOL.md "Pre-trained ZSTD dictionaries", a server MUST
+        # load a matching dict before the negotiator can pick zstd. Env vars
+        # let operators wire dict files into the boot sequence without a
+        # supervisor admin call:
+        #
+        #   CODEC_ZSTD_DICT_MSGPACK_PATH=/opt/codec/dicts/qwen2.5-msgpack-v1.dict
+        #   CODEC_ZSTD_DICT_PROTOBUF_PATH=/opt/codec/dicts/qwen2.5-protobuf-v1.dict
+        #
+        # Both are optional. Missing or unreadable files are logged and
+        # skipped; the server continues without that format's dict and
+        # the negotiator falls through to gzip/br for those requests.
+        import logging as _logging
+        import os as _os
+
+        _codec_log = _logging.getLogger("vllm.codec")
+        try:
+            # Guard only the import: an ImportError here means the codec
+            # module itself is absent from this build.
+            from vllm.entrypoints import codec_compression as _codec_comp
+        except ImportError:
+            _codec_comp = None
+        for _fmt, _env in (
+            ("msgpack", "CODEC_ZSTD_DICT_MSGPACK_PATH"),
+            ("protobuf", "CODEC_ZSTD_DICT_PROTOBUF_PATH"),
+        ):
+            _path = _os.environ.get(_env)
+            if not _path:
+                continue
+            if _codec_comp is None:
+                # The operator asked for a dict that cannot be honoured; say so
+                # instead of silently serving without zstd.
+                _codec_log.warning(
+                    "codec: %s=%s ignored — codec_compression is not "
+                    "available in this build",
+                    _env, _path,
+                )
+                continue
+            try:
+                with open(_path, "rb") as _f:
+                    _bytes = _f.read()
+                _codec_comp.set_zstd_dict(_fmt, _bytes)
+                _hash = _codec_comp.get_zstd_dict_hash(_fmt) or "(unknown)"
+                _codec_log.info(
+                    "codec: loaded zstd dict for %s from %s (%s, %d bytes)",
+                    _fmt, _path, _hash, len(_bytes),
+                )
+            except (OSError, ImportError) as _e:
+                # An ImportError raised while registering the dict stays
+                # non-fatal, but is logged rather than silently swallowed.
+                _codec_log.warning(
+                    "codec: failed to load %s from %s: %s — falling back to gzip for %s",
+                    _env, _path, _e, _fmt,
+                )
+
 if app.state.log_stats: engine_client: EngineClient = app.state.engine_client