Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 96 additions & 31 deletions backend/cpp/llama-cpp/grpc-server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1309,6 +1309,7 @@ class BackendServiceImpl final : public backend::Backend::Service {

body_json["messages"] = messages_json;
body_json["stream"] = true; // PredictStream is always streaming
body_json["stream_options"] = {{"include_usage", true}}; // Ensure token counts in final chunk

// Check if grammar is provided from Go layer (NoGrammar=false)
// If grammar is provided, we must use it and NOT let template generate grammar from tools
Expand Down Expand Up @@ -1616,8 +1617,11 @@ class BackendServiceImpl final : public backend::Backend::Service {
data);
task.id_slot = json_value(data, "id_slot", -1);

// OAI-compat
task.params.res_type = TASK_RESPONSE_TYPE_NONE;
// OAI-compat: enable autoparser (PEG-based chat parsing) so that
// reasoning, tool calls, and content are classified into ChatDeltas.
// Without this, the PEG parser never produces diffs and the Go side
// cannot detect tool calls or separate reasoning from content.
task.params.res_type = TASK_RESPONSE_TYPE_OAI_CHAT;
task.params.oaicompat_cmpl_id = completion_id;
// oaicompat_model is already populated by params_from_json_cmpl

Expand All @@ -1642,19 +1646,47 @@ class BackendServiceImpl final : public backend::Backend::Service {
return grpc::Status(grpc::StatusCode::INTERNAL, error_json.value("message", "Error occurred"));
}

// Lambda to build a Reply from JSON + attach chat deltas from a result
// Lambda to build a Reply from JSON + attach chat deltas from a result.
// Handles both native format ({"content": "..."}) and OAI chat format
// ({"choices": [{"delta": {"content": "...", "reasoning": "..."}}]}).
auto build_reply_from_json = [](const json & res_json, server_task_result * raw_result) -> backend::Reply {
backend::Reply reply;
std::string completion_text = res_json.value("content", "");
std::string completion_text;

if (res_json.contains("choices")) {
// OAI chat format — extract content from choices[0].delta
const auto & choices = res_json.at("choices");
if (!choices.empty()) {
const auto & delta = choices[0].value("delta", json::object());
if (delta.contains("content") && !delta.at("content").is_null()) {
completion_text = delta.at("content").get<std::string>();
}
}
} else {
// Native llama.cpp format
completion_text = res_json.value("content", "");
}

reply.set_message(completion_text);
reply.set_tokens(res_json.value("tokens_predicted", 0));
reply.set_prompt_tokens(res_json.value("tokens_evaluated", 0));

// Token counts: native format has top-level fields,
// OAI format has them in "usage" (final chunk only)
if (res_json.contains("usage")) {
const auto & usage = res_json.at("usage");
reply.set_tokens(usage.value("completion_tokens", 0));
reply.set_prompt_tokens(usage.value("prompt_tokens", 0));
} else {
reply.set_tokens(res_json.value("tokens_predicted", 0));
reply.set_prompt_tokens(res_json.value("tokens_evaluated", 0));
}

// Timings: present as top-level "timings" in both formats
if (res_json.contains("timings")) {
reply.set_timing_prompt_processing(res_json.at("timings").value("prompt_ms", 0.0));
reply.set_timing_token_generation(res_json.at("timings").value("predicted_ms", 0.0));
}

// Logprobs: extract_logprobs_from_json handles both formats
json logprobs_json = extract_logprobs_from_json(res_json);
if (!logprobs_json.empty() && !logprobs_json.is_null()) {
reply.set_logprobs(logprobs_json.dump());
Expand All @@ -1663,21 +1695,17 @@ class BackendServiceImpl final : public backend::Backend::Service {
return reply;
};

// Attach chat deltas from the autoparser to a Reply.
// When diffs are available, populate ChatDeltas on the reply.
// The raw message is always preserved so the Go side can use it
// for reasoning extraction and tool call parsing as a fallback
// (important in distributed mode where ChatDeltas may not be
// the primary parsing path).
auto attach_chat_deltas = [](backend::Reply & reply, server_task_result * raw_result) {
// Try streaming partial result first
auto* partial = dynamic_cast<server_task_result_cmpl_partial*>(raw_result);
if (partial) {
if (!partial->oaicompat_msg_diffs.empty()) {
populate_chat_deltas_from_diffs(reply, partial->oaicompat_msg_diffs);
} else if (partial->is_updated) {
// Autoparser is active but hasn't classified this chunk yet
// (PEG parser warming up). Clear the raw message so the Go
// side doesn't try to parse partial tag tokens (e.g. "<|channel>"
// before the full "<|channel>thought\n" is received).
// This matches llama.cpp server behavior which only emits SSE
// chunks when the parser produces diffs.
reply.set_message("");
}
if (partial && !partial->oaicompat_msg_diffs.empty()) {
populate_chat_deltas_from_diffs(reply, partial->oaicompat_msg_diffs);
return;
}
// Try final result
Expand Down Expand Up @@ -2357,8 +2385,9 @@ class BackendServiceImpl final : public backend::Backend::Service {
data);
task.id_slot = json_value(data, "id_slot", -1);

// OAI-compat
task.params.res_type = TASK_RESPONSE_TYPE_NONE;
// OAI-compat: enable autoparser (PEG-based chat parsing) so that
// reasoning, tool calls, and content are classified into ChatDeltas.
task.params.res_type = TASK_RESPONSE_TYPE_OAI_CHAT;
task.params.oaicompat_cmpl_id = completion_id;
// oaicompat_model is already populated by params_from_json_cmpl

Expand Down Expand Up @@ -2389,25 +2418,48 @@ class BackendServiceImpl final : public backend::Backend::Service {
auto* final_res = dynamic_cast<server_task_result_cmpl_final*>(all_results.results[0].get());
GGML_ASSERT(final_res != nullptr);
json result_json = all_results.results[0]->to_json();
reply->set_message(result_json.value("content", ""));

int32_t tokens_predicted = result_json.value("tokens_predicted", 0);
// Handle both native format ({"content": "...", "tokens_predicted": N})
// and OAI chat format ({"choices": [{"message": {"content": "..."}}],
// "usage": {"completion_tokens": N, "prompt_tokens": N}}).
std::string completion_text;
int32_t tokens_predicted = 0;
int32_t tokens_evaluated = 0;

if (result_json.contains("choices")) {
// OAI chat format
const auto & choices = result_json.at("choices");
if (!choices.empty()) {
const auto & msg = choices[0].value("message", json::object());
if (msg.contains("content") && !msg.at("content").is_null()) {
completion_text = msg.at("content").get<std::string>();
}
}
if (result_json.contains("usage")) {
const auto & usage = result_json.at("usage");
tokens_predicted = usage.value("completion_tokens", 0);
tokens_evaluated = usage.value("prompt_tokens", 0);
}
} else {
// Native llama.cpp format
completion_text = result_json.value("content", "");
tokens_predicted = result_json.value("tokens_predicted", 0);
tokens_evaluated = result_json.value("tokens_evaluated", 0);
}
reply->set_message(completion_text);
reply->set_tokens(tokens_predicted);
int32_t tokens_evaluated = result_json.value("tokens_evaluated", 0);
reply->set_prompt_tokens(tokens_evaluated);

// Timings: present in both formats as a top-level "timings" object
if (result_json.contains("timings")) {
double timing_prompt_processing = result_json.at("timings").value("prompt_ms", 0.0);
reply->set_timing_prompt_processing(timing_prompt_processing);
double timing_token_generation = result_json.at("timings").value("predicted_ms", 0.0);
reply->set_timing_token_generation(timing_token_generation);
reply->set_timing_prompt_processing(result_json.at("timings").value("prompt_ms", 0.0));
reply->set_timing_token_generation(result_json.at("timings").value("predicted_ms", 0.0));
}

// Extract and set logprobs if present
// Logprobs: extract_logprobs_from_json handles both formats
json logprobs_json = extract_logprobs_from_json(result_json);
if (!logprobs_json.empty() && !logprobs_json.is_null()) {
std::string logprobs_str = logprobs_json.dump();
reply->set_logprobs(logprobs_str);
reply->set_logprobs(logprobs_json.dump());
}

// Populate chat deltas from the autoparser's final parsed message
Expand All @@ -2423,7 +2475,20 @@ class BackendServiceImpl final : public backend::Backend::Service {
for (auto & res : all_results.results) {
GGML_ASSERT(dynamic_cast<server_task_result_cmpl_final*>(res.get()) != nullptr);
json res_json = res->to_json();
arr.push_back(res_json.value("content", ""));
// Handle both native and OAI chat formats
std::string result_content;
if (res_json.contains("choices")) {
const auto & choices = res_json.at("choices");
if (!choices.empty()) {
const auto & msg = choices[0].value("message", json::object());
if (msg.contains("content") && !msg.at("content").is_null()) {
result_content = msg.at("content").get<std::string>();
}
}
} else {
result_content = res_json.value("content", "");
}
arr.push_back(result_content);

// Extract logprobs for each result
json logprobs_json = extract_logprobs_from_json(res_json);
Expand Down
26 changes: 23 additions & 3 deletions core/http/endpoints/openai/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,23 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
result := ""
lastEmittedCount := 0
sentInitialRole := false
hasChatDeltaToolCalls := false
hasChatDeltaContent := false

_, tokenUsage, chatDeltas, err := ComputeChoices(req, prompt, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool {
result += s

// Track whether ChatDeltas from the C++ autoparser contain
// tool calls or content, so the retry decision can account for them.
for _, d := range usage.ChatDeltas {
if len(d.ToolCalls) > 0 {
hasChatDeltaToolCalls = true
}
if d.Content != "" {
hasChatDeltaContent = true
}
}

var reasoningDelta, contentDelta string

goReasoning, goContent := extractor.ProcessToken(s)
Expand Down Expand Up @@ -309,15 +322,22 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
// After streaming completes: check if we got actionable content
cleaned := extractor.CleanedContent()
// Check for tool calls from chat deltas (will be re-checked after ComputeChoices,
// but we need to know here whether to retry)
hasToolCalls := lastEmittedCount > 0
if cleaned == "" && !hasToolCalls {
// but we need to know here whether to retry).
// Also check ChatDelta flags — when the C++ autoparser is active,
// tool calls and content are delivered via ChatDeltas while the
// raw message is cleared. Without this check, we'd retry
// unnecessarily, losing valid results and concatenating output.
hasToolCalls := lastEmittedCount > 0 || hasChatDeltaToolCalls
hasContent := cleaned != "" || hasChatDeltaContent
if !hasContent && !hasToolCalls {
xlog.Warn("Streaming: backend produced only reasoning, retrying",
"reasoning_len", len(extractor.Reasoning()), "attempt", attempt+1)
extractor.ResetAndSuppressReasoning()
result = ""
lastEmittedCount = 0
sentInitialRole = false
hasChatDeltaToolCalls = false
hasChatDeltaContent = false
return true
}
return false
Expand Down
37 changes: 31 additions & 6 deletions core/http/endpoints/openai/inference.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,23 @@ func ComputeChoices(
}
prediction = p

// Built-in: retry on truly empty response (no tokens at all)
// Built-in: retry on truly empty response (no tokens at all).
// However, when the C++ autoparser is active, it clears the raw
// message and delivers content via ChatDeltas instead. Do NOT
// retry if ChatDeltas contain tool calls or content.
if strings.TrimSpace(prediction.Response) == "" && attempt < maxRetries {
xlog.Warn("Backend returned empty response, retrying",
"attempt", attempt+1, "maxRetries", maxRetries)
continue
hasChatDeltaData := false
for _, d := range prediction.ChatDeltas {
if d.Content != "" || len(d.ToolCalls) > 0 {
hasChatDeltaData = true
break
}
}
if !hasChatDeltaData {
xlog.Warn("Backend returned empty response, retrying",
"attempt", attempt+1, "maxRetries", maxRetries)
continue
}
}

tokenUsage.Prompt = prediction.Usage.Prompt
Expand All @@ -130,8 +142,21 @@ func ComputeChoices(
finetunedResponse := backend.Finetune(*config, predInput, prediction.Response)
cb(finetunedResponse, &result)

// Caller-driven retry (tool parsing, reasoning-only, etc.)
if shouldRetryFn != nil && shouldRetryFn(attempt) && attempt < maxRetries {
// Caller-driven retry (tool parsing, reasoning-only, etc.).
// When the C++ autoparser is active, it clears the raw response
// and delivers data via ChatDeltas. If the response is empty but
// ChatDeltas contain actionable data, skip the caller retry —
// the autoparser already parsed the response successfully.
skipCallerRetry := false
if strings.TrimSpace(prediction.Response) == "" && len(prediction.ChatDeltas) > 0 {
for _, d := range prediction.ChatDeltas {
if d.Content != "" || len(d.ToolCalls) > 0 {
skipCallerRetry = true
break
}
}
}
if shouldRetryFn != nil && !skipCallerRetry && shouldRetryFn(attempt) && attempt < maxRetries {
// Caller has already reset its state inside shouldRetry
result = result[:0]
allChatDeltas = nil
Expand Down
19 changes: 19 additions & 0 deletions tests/e2e/e2e_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,25 @@ var _ = BeforeSuite(func() {
Expect(err).ToNot(HaveOccurred())
Expect(os.WriteFile(configPath, configYAML, 0644)).To(Succeed())

// Create model config for autoparser tests (NoGrammar so tool calls
// are driven entirely by the backend's ChatDeltas, not grammar enforcement)
autoparserConfig := map[string]any{
"name": "mock-model-autoparser",
"backend": "mock-backend",
"parameters": map[string]any{
"model": "mock-model.bin",
},
"function": map[string]any{
"grammar": map[string]any{
"disable": true,
},
},
}
autoparserPath := filepath.Join(modelsPath, "mock-model-autoparser.yaml")
autoparserYAML, err := yaml.Marshal(autoparserConfig)
Expect(err).ToNot(HaveOccurred())
Expect(os.WriteFile(autoparserPath, autoparserYAML, 0644)).To(Succeed())

// Start mock MCP server and create MCP-enabled model config
mcpServerURL, mcpServerShutdown = startMockMCPServer()
mcpConfig := mcpModelConfig(mcpServerURL)
Expand Down
Loading
Loading