Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fastdeploy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def __init__(
self.max_model_len = 0
self.dtype = "bfloat16"
self.enable_logprob = False
self.compute_logits_stats = False
self.max_logprobs = 20
self.logprobs_mode = "raw_logprobs"
self.redundant_experts_num = 0
Expand Down
12 changes: 12 additions & 0 deletions fastdeploy/engine/args_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,12 @@ class EngineArgs:
Must be explicitly enabled via the `--enable-logprob` startup parameter to output logprob values.
"""

compute_logits_stats: bool = False
"""
Flag to enable per-token logits statistics (min/max/mean/std) output.
Only effective when enable_logprob is True.
"""

max_logprobs: int = 20
"""
Maximum number of log probabilities to return when `enable_logprob` is True. The default value comes the default for the
Expand Down Expand Up @@ -893,6 +899,12 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
default=EngineArgs.enable_logprob,
help="Enable output of token-level log probabilities.",
)
model_group.add_argument(
"--compute-logits-stats",
action="store_true",
default=EngineArgs.compute_logits_stats,
help="Enable per-token logits statistics (min/max/mean/std) output.",
)
Comment on lines 899 to +907
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR 标题目前为“【TI-Consisent】...”,不符合仓库要求的 [CLASS]Title 格式(模板里给出的 tag 列表如 [Feature] / [BugFix] 等)。建议将标题改为类似 [Feature] Add logits_stats metric for ZMQ logprobs,并修正 Consisent 的拼写以便后续检索与自动化流程识别。

Copilot uses AI. Check for mistakes.
model_group.add_argument(
"--max-logprobs",
type=int,
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2451,6 +2451,7 @@ def _start_worker_service(self):
"use_internode_ll_two_stage": self.cfg.parallel_config.use_internode_ll_two_stage,
"disable_sequence_parallel_moe": self.cfg.parallel_config.disable_sequence_parallel_moe,
"enable_logprob": self.cfg.model_config.enable_logprob,
"compute_logits_stats": self.cfg.model_config.compute_logits_stats,
"lm_head_fp32": self.cfg.model_config.lm_head_fp32,
"moe_gate_fp32": self.cfg.model_config.moe_gate_fp32,
"enable_entropy": self.cfg.model_config.enable_entropy,
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,7 @@ def _start_worker_service(self):
"use_internode_ll_two_stage": self.cfg.parallel_config.use_internode_ll_two_stage,
"disable_sequence_parallel_moe": self.cfg.parallel_config.disable_sequence_parallel_moe,
"enable_logprob": self.cfg.model_config.enable_logprob,
"compute_logits_stats": self.cfg.model_config.compute_logits_stats,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

common_engine.py中也得加这个参数

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已添加

"lm_head_fp32": self.cfg.model_config.lm_head_fp32,
"moe_gate_fp32": self.cfg.model_config.moe_gate_fp32,
"shutdown_comm_group_if_worker_idle": self.cfg.parallel_config.shutdown_comm_group_if_worker_idle,
Expand Down
3 changes: 3 additions & 0 deletions fastdeploy/engine/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ class CompletionOutput:
delta_message: Optional[DeltaMessage] = None
multipart: Optional[list[Any]] = None
num_image_tokens: Optional[int] = None
logits_stats: Optional[dict[str, float]] = None

def to_dict(self):
"""
Expand All @@ -745,6 +746,7 @@ def to_dict(self):
"text": self.text,
"reasoning_content": self.reasoning_content,
"reasoning_token_num": self.reasoning_token_num,
"logits_stats": self.logits_stats,
}

@classmethod
Expand All @@ -770,6 +772,7 @@ def __repr__(self) -> str:
f"logprobs={self.logprobs}, "
f"top_logprobs={self.top_logprobs}, "
f"draft_top_logprobs={self.draft_top_logprobs}, "
f"logits_stats={self.logits_stats}, "
)

def get(self, key: str, default_value=None):
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Detokenize non-incrementally.
# Output is flat: [num_tok, num_lps] -> [num_tok * num_lps]
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/entrypoints/openai/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ class LogProbEntry(BaseModel):
logprob: float
bytes: Optional[List[int]] = None
top_logprobs: Optional[List[LogProbEntry]] = None
logits_stats: Optional[Dict[str, float]] = None


class LogProbs(BaseModel):
Expand Down
76 changes: 66 additions & 10 deletions fastdeploy/entrypoints/openai/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# limitations under the License.
"""

from __future__ import annotations

import asyncio
import itertools
import time
Expand Down Expand Up @@ -825,22 +827,74 @@ def _create_chat_logprobs(
request_decode_flag: Optional[bool] = True,
) -> Optional[LogProbs]:
"""Create OpenAI-style logprobs for chat completions."""
if output_top_logprobs is None or len(output_top_logprobs) < 3 or any(not lst for lst in output_top_logprobs):
if (
output_top_logprobs is None
or len(output_top_logprobs) < 3
or any(not lst for lst in output_top_logprobs[:3])
):
return None
logprobs_res: Optional[LogProbs] = None
for logprob_token_ids, logprobs, sampled_token_ranks in zip(
output_top_logprobs[0], output_top_logprobs[1], output_top_logprobs[2]
):
top_logprobs = LogprobsLists(
logprob_token_ids=[logprob_token_ids],
logprobs=[logprobs],
sampled_token_ranks=[sampled_token_ranks],
)

# Check if output_top_logprobs is a LogprobsLists object(NamedTuple) or a list
is_logprobslists = hasattr(output_top_logprobs, "logprob_token_ids")

# Extract logits stats if available
if is_logprobslists:
# output_top_logprobs is LogprobsLists namedtuple
has_logits_stats = output_top_logprobs.logits_min is not None
else:
# list from msgpack: [logprob_token_ids, logprobs, sampled_token_ranks, logits_min, logits_max, logits_mean, logits_std]
has_logits_stats = len(output_top_logprobs) >= 7 and output_top_logprobs[3] is not None

if is_logprobslists:
num_tokens = len(output_top_logprobs.logprobs)
_tk_ids = lambda idx: output_top_logprobs.logprob_token_ids[idx]
_lps = lambda idx: output_top_logprobs.logprobs[idx]
_ranks = lambda idx: output_top_logprobs.sampled_token_ranks[idx]
_lmin = lambda idx: output_top_logprobs.logits_min[idx]
_lmax = lambda idx: output_top_logprobs.logits_max[idx]
_lmean = lambda idx: output_top_logprobs.logits_mean[idx]
_lstd = lambda idx: output_top_logprobs.logits_std[idx]
else:
num_tokens = len(output_top_logprobs[1])
_tk_ids = lambda idx: output_top_logprobs[0][idx]
_lps = lambda idx: output_top_logprobs[1][idx]
_ranks = lambda idx: output_top_logprobs[2][idx]
_lmin = lambda idx: output_top_logprobs[3][idx]
_lmax = lambda idx: output_top_logprobs[4][idx]
_lmean = lambda idx: output_top_logprobs[5][idx]
_lstd = lambda idx: output_top_logprobs[6][idx]

for idx in range(num_tokens):
logits_stats = None
if has_logits_stats:
top_logprobs = LogprobsLists(
logprob_token_ids=[_tk_ids(idx)],
logprobs=[_lps(idx)],
sampled_token_ranks=[_ranks(idx)],
logits_min=[_lmin(idx)],
logits_max=[_lmax(idx)],
logits_mean=[_lmean(idx)],
logits_std=[_lstd(idx)],
)
logits_stats = {
"min": float(_lmin(idx)),
"max": float(_lmax(idx)),
"mean": float(_lmean(idx)),
"std": float(_lstd(idx)),
}
else:
top_logprobs = LogprobsLists(
logprob_token_ids=[_tk_ids(idx)],
logprobs=[_lps(idx)],
sampled_token_ranks=[_ranks(idx)],
)
step_logprobs_res = self._build_logprobs_response(
request_logprobs=request_logprobs,
response_logprobs=top_logprobs,
request_top_logprobs=request_top_logprobs,
request_decode_flag=request_decode_flag,
logits_stats=logits_stats,
)
if logprobs_res is None:
logprobs_res = step_logprobs_res
Expand All @@ -854,6 +908,7 @@ def _build_logprobs_response(
response_logprobs: Optional[LogprobsLists],
request_top_logprobs: int,
request_decode_flag: bool,
logits_stats: Optional[dict[str, float]] = None,
) -> Optional[LogProbs]:
Comment on lines 905 to 912
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_build_logprobs_response 的类型注解使用了 dict[str, float],但本文件未启用 from __future__ import annotations,在 Python 3.7/3.8(setup.py 标注支持 >=3.7)会在导入时报 TypeError: 'type' object is not subscriptable。建议改为 Optional[Dict[str, float]](并从 typing 引入 Dict),或在文件顶部添加 from __future__ import annotations 后再统一使用内置泛型。

Copilot uses AI. Check for mistakes.
"""
Construct a logprobs response object in line with the OpenAI style.
Expand Down Expand Up @@ -901,6 +956,7 @@ def _build_logprobs_response(
logprob=top_logprob_entries[0].logprob,
bytes=top_logprob_entries[0].bytes,
top_logprobs=top_logprob_entries[1:], # Here are the complete topk candidates
logits_stats=logits_stats,
)

return LogProbs(content=[sampled_entry])
Expand All @@ -922,7 +978,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Normalize to plain Python lists (support both Tensor and list inputs)
if hasattr(token_ids, "tolist"):
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/entrypoints/openai/serving_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ def _build_prompt_logprobs(
tensors.
"""

token_ids, logprobs, ranks = prompt_logprobs_tensors
token_ids, logprobs, ranks = prompt_logprobs_tensors[:3]

# Normalize to plain Python lists (support both Tensor and list inputs)
if hasattr(token_ids, "tolist"):
Expand Down
21 changes: 21 additions & 0 deletions fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def __init__(self, cfg, cached_generated_tokens, engine_worker_queue, split_conn

self.speculative_decoding = self.cfg.speculative_config.method is not None
self.use_logprobs = self.cfg.model_config.enable_logprob
self.compute_logits_stats = self.cfg.model_config.compute_logits_stats
self.enable_draft_logprob = self.cfg.speculative_config.enable_draft_logprob

if self.speculative_decoding:
Expand Down Expand Up @@ -350,6 +351,26 @@ def _process_batch_output_use_zmq(self, receive_datas):
logprobs_list: LogprobsLists = stream_data.logprobs.tolists()
result.outputs.logprob = float(logprobs_list.logprobs[0][0])
result.outputs.top_logprobs = logprobs_list
# Extract logits statistics if available
if self.compute_logits_stats:
assert (
logprobs_list.logits_min is not None
), "logits_min is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_max is not None
), "logits_max is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_mean is not None
), "logits_mean is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_std is not None
), "logits_std is None when compute_logits_stats is enabled"
Comment on lines +356 to +367
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里用 assert ... is not None 来保证 logits_* 存在:
1)在 python -O 下 assert 会被跳过,可能导致后续 float(None) 等异常;
2)assert 触发后会被外层 except 吞掉,只打 warning,最终静默缺失 logits_stats,与 --compute-logits-stats 的预期不一致。
建议改成显式的条件判断:若缺字段则记录更明确的错误并决定是否直接报错/降级关闭 logits_stats 输出。

Suggested change
assert (
logprobs_list.logits_min is not None
), "logits_min is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_max is not None
), "logits_max is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_mean is not None
), "logits_mean is None when compute_logits_stats is enabled"
assert (
logprobs_list.logits_std is not None
), "logits_std is None when compute_logits_stats is enabled"
missing_fields = []
if logprobs_list.logits_min is None:
missing_fields.append("logits_min")
if logprobs_list.logits_max is None:
missing_fields.append("logits_max")
if logprobs_list.logits_mean is None:
missing_fields.append("logits_mean")
if logprobs_list.logits_std is None:
missing_fields.append("logits_std")
if missing_fields:
# When compute_logits_stats is enabled, all logits_* fields must be present
raise ValueError(
"Missing logits stats fields when compute_logits_stats is enabled: "
+ ", ".join(missing_fields)
)

Copilot uses AI. Check for mistakes.
result.outputs.logits_stats = {
"min": float(logprobs_list.logits_min[0]),
"max": float(logprobs_list.logits_max[0]),
"mean": float(logprobs_list.logits_mean[0]),
"std": float(logprobs_list.logits_std[0]),
}
except Exception as e:
llm_logger.warning(f"Failed to parse logprobs from StreamTransferData: {e}")
if getattr(stream_data, "prompt_logprobs", None) is not None:
Expand Down
5 changes: 2 additions & 3 deletions fastdeploy/worker/gpu_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3052,9 +3052,8 @@ def _get_prompt_logprobs_list(
raw_logprobs = self.sampler.compute_logprobs(logits)
elif logprobs_mode == "raw_logits":
raw_logprobs = logits
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
# Synchronize before using token_ids, logprobs and ranks to ensure async copy are completed.
paddle.device.synchronize()
chunk_slice = slice(start_idx, start_idx + num_logits)
Expand Down
5 changes: 2 additions & 3 deletions fastdeploy/worker/metax_model_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2833,9 +2833,8 @@ def _get_prompt_logprobs_list(
raw_logprobs = self.sampler.compute_logprobs(logits)
elif logprobs_mode == "raw_logits":
raw_logprobs = logits
token_ids, logprobs, ranks = self.sampler.gather_logprobs(
raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor
)
gathered = self.sampler.gather_logprobs(raw_logprobs, num_prompt_logprobs, prompt_token_ids_tensor)
token_ids, logprobs, ranks = gathered.logprob_token_ids, gathered.logprobs, gathered.selected_token_ranks
chunk_slice = slice(start_idx, start_idx + num_logits)
logprobs_tensors.logprob_token_ids[chunk_slice].copy_(token_ids, False)
logprobs_tensors.logprobs[chunk_slice].copy_(logprobs, False)
Expand Down
50 changes: 50 additions & 0 deletions fastdeploy/worker/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class LogprobsLists(NamedTuple):
logprobs: list[list[float]]
# [num_reqs]
sampled_token_ranks: list[int]
# Logits statistics for each sequence (optional)
logits_min: Optional[list[float]] = None # [num_reqs]
logits_max: Optional[list[float]] = None # [num_reqs]
logits_mean: Optional[list[float]] = None # [num_reqs]
logits_std: Optional[list[float]] = None # [num_reqs]
Comment on lines 44 to +51
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

本文件未启用 from __future__ import annotations,但新增的 Optional[list[float]] / list[list[int]] 等内置泛型注解在 Python 3.7/3.8 下会导致导入时异常;同时 setup.py 仍声明 python_requires=">=3.7"。建议:1)在文件顶部增加 from __future__ import annotations;或 2)把这些新增注解改为 Optional[List[float]] 等 typing 形式并补充导入,以保持与声明的 Python 版本兼容。

Copilot uses AI. Check for mistakes.
Comment on lines +48 to +51
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR里没有这些参数的计算逻辑?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

暂时只监控eb5a3b,相关计算逻辑在EB5_serving中


def slice_columns(self, start: int, end: int):
"""
Expand All @@ -54,6 +59,10 @@ def slice_columns(self, start: int, end: int):
[row[start:end] for row in self.logprob_token_ids],
[row[start:end] for row in self.logprobs],
self.sampled_token_ranks, # unchanged
self.logits_min, # unchanged
self.logits_max, # unchanged
self.logits_mean, # unchanged
self.logits_std, # unchanged
Comment on lines 58 to +65
Copy link

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

slice_columns 里保留了被注释掉的 logits_* 切片代码(62-65 行),当前实现又选择“unchanged”透传这些字段,容易让人误解哪些字段需要随列切片。建议删除注释代码并在 docstring/注释里明确 logits_* 的维度语义(是否按 position/token 对齐,还是按 request 对齐)。

Copilot uses AI. Check for mistakes.
)

def slice_rows(self, start: int, end: int):
Expand All @@ -65,6 +74,10 @@ def slice_rows(self, start: int, end: int):
self.logprob_token_ids[start:end],
self.logprobs[start:end],
self.sampled_token_ranks[start:end],
self.logits_min[start:end] if self.logits_min is not None else None,
self.logits_max[start:end] if self.logits_max is not None else None,
self.logits_mean[start:end] if self.logits_mean is not None else None,
self.logits_std[start:end] if self.logits_std is not None else None,
)


Expand All @@ -77,13 +90,22 @@ class LogprobsTensors(NamedTuple):
logprobs: paddle.Tensor
# [num_reqs]
selected_token_ranks: paddle.Tensor
# Logits statistics for each sequence (optional)
logits_min: Optional[paddle.Tensor] = None # [num_reqs]
logits_max: Optional[paddle.Tensor] = None # [num_reqs]
logits_mean: Optional[paddle.Tensor] = None # [num_reqs]
logits_std: Optional[paddle.Tensor] = None

def tolists(self):
"""Convert to lists."""
return LogprobsLists(
self.logprob_token_ids.tolist(),
self.logprobs.tolist(),
self.selected_token_ranks.tolist(),
self.logits_min.tolist() if self.logits_min is not None else None,
self.logits_max.tolist() if self.logits_max is not None else None,
self.logits_mean.tolist() if self.logits_mean is not None else None,
self.logits_std.tolist() if self.logits_std is not None else None,
)

@staticmethod
Expand All @@ -97,6 +119,10 @@ def empty_cpu(num_positions: int, num_tokens_per_position: int) -> "LogprobsTens
logprob_token_ids=logprob_token_ids,
logprobs=logprobs,
selected_token_ranks=selected_token_ranks,
logits_min=None,
logits_max=None,
logits_mean=None,
logits_std=None,
)

@staticmethod
Expand All @@ -110,6 +136,10 @@ def empty(num_positions: int, num_tokens_per_position: int) -> "LogprobsTensors"
logprob_token_ids=logprob_token_ids,
logprobs=logprobs,
selected_token_ranks=selected_token_ranks,
logits_min=None,
logits_max=None,
logits_mean=None,
logits_std=None,
)

def slice_rows(self, start: int, end: int):
Expand All @@ -122,6 +152,26 @@ def slice_rows(self, start: int, end: int):
paddle.to_tensor(self.logprob_token_ids.cpu()[start:end], place="cpu"),
paddle.to_tensor(self.logprobs.cpu()[start:end], place="cpu"),
paddle.to_tensor(self.selected_token_ranks.cpu()[start:end], place="cpu"),
(
paddle.to_tensor(self.logits_min.cpu()[start:end], place="cpu")
if self.logits_min is not None
else None
),
(
paddle.to_tensor(self.logits_max.cpu()[start:end], place="cpu")
if self.logits_max is not None
else None
),
(
paddle.to_tensor(self.logits_mean.cpu()[start:end], place="cpu")
if self.logits_mean is not None
else None
),
(
paddle.to_tensor(self.logits_std.cpu()[start:end], place="cpu")
if self.logits_std is not None
else None
),
)


Expand Down
5 changes: 5 additions & 0 deletions fastdeploy/worker/worker_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,11 @@ def parse_args():
action="store_true",
help="Enable output of token-level log probabilities.",
)
parser.add_argument(
"--compute_logits_stats",
action="store_true",
help="Enable per-token logits statistics (min/max/mean/std) output.",
)
parser.add_argument(
"--max_logprobs",
type=int,
Expand Down
Loading
Loading