Commit f17d99f

localai-bot and mudler authored
fix(streaming/tools): stop healing-marker stubs from gating off content (#9999)
* fix(streaming/tools): stop healing-marker stubs from gating off content

When the C++ autoparser is in pure-content fallback mode (e.g. qwen3 without --jinja) and the model emits a tool call as JSON, the streaming worker calls ParseJSONIterative on each new chunk. parseJSONWithStack heals partial input like `{` into `{"<marker>":1}` where <marker> is a random integer. removeHealingMarkerFromJSON only stripped the marker from values, so the synthetic key survived and downstream callers saw a stub object with a random-looking key.

chat_stream_workers.go's JSON tool-call detector then bumped lastEmittedCount past the stub even though no real tool call was emitted, gating off ALL subsequent content chunks. The qwen3 + tools + streaming case ended up dribbling only the first `{"` to clients and then nothing, even when the model went on to call the noAction `answer({"message": "…"})` pseudo-tool.

Three changes, each with its own regression test:

* removeHealingMarkerFromJSON now strips the marker suffix from keys too, dropping the entry when the truncated key is empty. Inputs like `{` no longer leak `{"<marker>":1}` to callers; partial keys like `{ "code` still preserve the model-typed prefix `code`.
* ParseJSONIterative skips empty-after-healing maps so a healed `{` doesn't surface as a stub result.
* The streaming JSON detector now breaks (not continues) on entries without a usable `name`, and only bumps lastEmittedCount past successfully-emitted entries. Defense-in-depth against any future partial-parse shape.

The parser tests cover eight partial-JSON-prefix shapes and verify no marker characters leak into keys, plus the two early shapes (`{`, `{"`) that should not surface a stub at all.

Fixes #9988

Assisted-by: Claude:opus-4-7 [Read] [Edit] [Bash]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(streaming/tools): cover the autoparser-correctly-working path

Extract the JSON tool-call streaming emit loop into emitJSONToolCallDeltas and unit-test it against every shape that can hit the streaming worker:

* the bug case — a healing-marker stub at index 0 must NOT bump lastEmittedCount, so subsequent content chunks keep flowing;
* the autoparser-correctly-working case — empty jsonResults (because the C++ autoparser cleared the raw text and delivers tool calls via TokenUsage.ChatDeltas) is a no-op, leaving the deferred end-of-stream emitter to ship the autoparser's tool calls;
* a single complete tool call — emit one chunk, advance to 1;
* arguments arriving as a JSON-string vs as a nested object — both serialize to the wire as JSON-string arguments;
* multiple parallel tool calls — one chunk each;
* a real tool call followed by a partial stub — emit the real one, stop at the stub, resume on a later chunk once the stub completes.

Locks down the no-regression guarantee the user asked for: this PR's fix is scoped to the pure-content fallback path; when the autoparser actually classifies tool calls (jinja-recognized chat format with tool support), the helper is a no-op and nothing changes.

Assisted-by: Claude:opus-4-7 [Read] [Edit] [Bash] [Write]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
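
The gating failure described in the commit message can be reproduced in isolation. The sketch below is a simplified model, not the project's code: `advanceOld` and `advanceNew` are hypothetical names, and they return the emitted tool names plus the new counter instead of pushing SSE chunks onto a channel. It assumes only what the message states, namely that the old loop skipped nameless entries and then set the counter to the slice length, while the new loop stops at the first nameless entry.

```go
package main

import "fmt"

// advanceOld models the pre-fix loop shape (hypothetical helper): nameless
// entries are skipped with continue, and the counter jumps to len(results)
// regardless of what was emitted.
func advanceOld(results []map[string]any, last int) (emitted []string, next int) {
	if len(results) <= last {
		return nil, last
	}
	for i := last; i < len(results); i++ {
		name, ok := results[i]["name"].(string)
		if !ok || name == "" {
			continue
		}
		emitted = append(emitted, name)
	}
	return emitted, len(results)
}

// advanceNew models the post-fix loop shape (hypothetical helper): stop at
// the first nameless entry and count only entries that were emitted.
func advanceNew(results []map[string]any, last int) (emitted []string, next int) {
	next = last
	for i := last; i < len(results); i++ {
		name, ok := results[i]["name"].(string)
		if !ok || name == "" {
			break
		}
		emitted = append(emitted, name)
		next = i + 1
	}
	return emitted, next
}

func main() {
	// A healing-marker stub: one synthetic key, no "name".
	stub := []map[string]any{{"4310046988783340008": float64(1)}}

	_, oldNext := advanceOld(stub, 0)
	_, newNext := advanceNew(stub, 0)
	fmt.Println(oldNext, newNext) // prints "1 0"
}
```

With the old shape the counter becomes non-zero although nothing was emitted, which is the condition the commit message identifies as gating off later content chunks.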
1 parent 597daa9 commit f17d99f

5 files changed

Lines changed: 393 additions & 44 deletions

core/http/endpoints/openai/chat_stream_workers.go

Lines changed: 75 additions & 43 deletions
@@ -12,6 +12,77 @@ import (
 	"github.com/mudler/xlog"
 )
 
+// emitJSONToolCallDeltas iterates the JSON tool-call objects produced by the
+// streaming tool-call detector and emits SSE chunks for the ones the caller
+// hasn't already emitted. It returns the new lastEmittedCount.
+//
+// Semantics:
+//   - Skips entries before lastEmittedCount (already emitted).
+//   - Emits one tool_call chunk per consecutive entry that has a usable
+//     `name` string.
+//   - Stops at the first entry without a name (typically the partial-JSON
+//     tail or a healing-marker stub — see issue #9988) so the caller doesn't
+//     advance past it. Bumping lastEmittedCount past an unparsed stub
+//     permanently gates off content emission for the rest of the stream.
+//   - When jsonResults is empty (the autoparser-working case, where the raw
+//     text result is cleared and only ChatDeltas carry tool calls), this is
+//     a no-op and lastEmittedCount is returned unchanged.
+//
+// The autoparser-correctly-classifying-tool-calls path is unaffected: it
+// delivers tool calls via TokenUsage.ChatDeltas, and the deferred
+// end-of-stream block (ToolCallsFromChatDeltas → buildDeferredToolCallChunks)
+// emits them; this helper sees an empty jsonResults and emits nothing.
+func emitJSONToolCallDeltas(
+	jsonResults []map[string]any,
+	lastEmittedCount int,
+	id, model string,
+	created int,
+	responses chan<- schema.OpenAIResponse,
+) int {
+	for i := lastEmittedCount; i < len(jsonResults); i++ {
+		jsonObj := jsonResults[i]
+		name, ok := jsonObj["name"].(string)
+		if !ok || name == "" {
+			break
+		}
+		args := "{}"
+		if argsVal, ok := jsonObj["arguments"]; ok {
+			if argsStr, ok := argsVal.(string); ok {
+				args = argsStr
+			} else {
+				argsBytes, _ := json.Marshal(argsVal)
+				args = string(argsBytes)
+			}
+		}
+		responses <- schema.OpenAIResponse{
+			ID:      id,
+			Created: created,
+			Model:   model,
+			Choices: []schema.Choice{{
+				Delta: &schema.Message{
+					Role: "assistant",
+					ToolCalls: []schema.ToolCall{
+						{
+							Index: i,
+							ID:    id,
+							Type:  "function",
+							FunctionCall: schema.FunctionCall{
+								Name:      name,
+								Arguments: args,
+							},
+						},
+					},
+				},
+				Index:        0,
+				FinishReason: nil,
+			}},
+			Object: "chat.completion.chunk",
+		}
+		lastEmittedCount = i + 1
+	}
+	return lastEmittedCount
+}
+
 // processStream is the streaming worker for chat completions with no
 // tool/function calling involved. It pushes SSE-shaped chunks onto
 // `responses` and returns the authoritative cumulative TokenUsage from
@@ -279,49 +350,10 @@ func processStreamWithTools(
 			// Try JSON tool call parsing for streaming.
 			// Only emit NEW tool calls (same guard as XML parser above).
 			jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true)
-			if jsonErr == nil && len(jsonResults) > lastEmittedCount {
-				for i := lastEmittedCount; i < len(jsonResults); i++ {
-					jsonObj := jsonResults[i]
-					name, ok := jsonObj["name"].(string)
-					if !ok || name == "" {
-						continue
-					}
-					args := "{}"
-					if argsVal, ok := jsonObj["arguments"]; ok {
-						if argsStr, ok := argsVal.(string); ok {
-							args = argsStr
-						} else {
-							argsBytes, _ := json.Marshal(argsVal)
-							args = string(argsBytes)
-						}
-					}
-					initialMessage := schema.OpenAIResponse{
-						ID:      id,
-						Created: created,
-						Model:   req.Model,
-						Choices: []schema.Choice{{
-							Delta: &schema.Message{
-								Role: "assistant",
-								ToolCalls: []schema.ToolCall{
-									{
-										Index: i,
-										ID:    id,
-										Type:  "function",
-										FunctionCall: schema.FunctionCall{
-											Name:      name,
-											Arguments: args,
-										},
-									},
-								},
-							},
-							Index:        0,
-							FinishReason: nil,
-						}},
-						Object: "chat.completion.chunk",
-					}
-					responses <- initialMessage
-				}
-				lastEmittedCount = len(jsonResults)
+			if jsonErr == nil {
+				lastEmittedCount = emitJSONToolCallDeltas(
+					jsonResults, lastEmittedCount, id, req.Model, created, responses,
+				)
 			}
 		}
 		return true
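
The helper above accepts `arguments` either as an already-serialized JSON string or as a nested object, and puts a JSON string on the wire in both cases. A minimal sketch of that normalization step on its own follows. `normalizeArgs` is a hypothetical name, and the fallback to `"{}"` on a marshal error is an assumption of this sketch; the code in the diff discards that error instead.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// normalizeArgs (hypothetical helper) returns the tool-call arguments as a
// JSON string, whether the parser produced a string or a nested value.
func normalizeArgs(obj map[string]any) string {
	argsVal, ok := obj["arguments"]
	if !ok {
		return "{}" // no arguments key: default to an empty object
	}
	if s, ok := argsVal.(string); ok {
		return s // already serialized, pass through unchanged
	}
	b, err := json.Marshal(argsVal)
	if err != nil {
		return "{}" // assumption of this sketch, not the diff's behavior
	}
	return string(b)
}

func main() {
	nested := map[string]any{"name": "search", "arguments": map[string]any{"q": "hello"}}
	str := map[string]any{"name": "search", "arguments": `{"q":"hello"}`}
	none := map[string]any{"name": "search"}

	fmt.Println(normalizeArgs(nested))
	fmt.Println(normalizeArgs(str))
	fmt.Println(normalizeArgs(none))
}
```

Passing strings through untouched avoids double-encoding arguments that are already JSON text.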
Lines changed: 197 additions & 0 deletions
@@ -0,0 +1,197 @@
+package openai
+
+import (
+	"github.com/mudler/LocalAI/core/schema"
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+// drainChannel reads everything currently buffered on a channel without
+// blocking on close. The helper test channels are sized for the assertions.
+func drainChannel(ch <-chan schema.OpenAIResponse) []schema.OpenAIResponse {
+	var out []schema.OpenAIResponse
+	for {
+		select {
+		case r, ok := <-ch:
+			if !ok {
+				return out
+			}
+			out = append(out, r)
+		default:
+			return out
+		}
+	}
+}
+
+// nameOf returns the name of the first tool call carried on the choice's
+// delta, or "" if none.
+func nameOf(r schema.OpenAIResponse) string {
+	if len(r.Choices) == 0 || r.Choices[0].Delta == nil {
+		return ""
+	}
+	if len(r.Choices[0].Delta.ToolCalls) == 0 {
+		return ""
+	}
+	return r.Choices[0].Delta.ToolCalls[0].FunctionCall.Name
+}
+
+var _ = Describe("emitJSONToolCallDeltas", func() {
+	const (
+		id      = "test-stream"
+		model   = "test-model"
+		created = 1700000000
+	)
+
+	// The case that motivated this helper. With the previous version of
+	// the streaming worker, ParseJSONIterative would hand back a stub
+	// object like `{"4310046988783340008":1}` after the model had only
+	// emitted `{`. The worker bumped lastEmittedCount unconditionally,
+	// which permanently gated off content emission for the rest of the
+	// stream (qwen3-4b with stream:true + tools dribbled only `{"` to
+	// the client and then nothing). See issue #9988.
+	Context("partial stub without a usable name", func() {
+		It("does NOT bump lastEmittedCount and emits nothing", func() {
+			responses := make(chan schema.OpenAIResponse, 4)
+			// What ParseJSONIterative used to return for `{`:
+			stubResults := []map[string]any{
+				{"4310046988783340008": float64(1)},
+			}
+
+			next := emitJSONToolCallDeltas(stubResults, 0, id, model, created, responses)
+
+			Expect(next).To(Equal(0),
+				"lastEmittedCount must NOT advance past a stub without a name "+
+					"— otherwise content emission gets permanently gated off")
+			Expect(drainChannel(responses)).To(BeEmpty(),
+				"no tool_call chunk should be emitted for a stub without a name")
+		})
+	})
+
+	// No-regression #1: the autoparser-correctly-working path. When the
+	// C++ autoparser classifies tool calls itself, the raw text result is
+	// cleared and ParseJSONIterative on it returns no results — this
+	// helper must be a no-op so the deferred end-of-stream code can emit
+	// the tool calls from TokenUsage.ChatDeltas.
+	Context("empty jsonResults (autoparser-correctly-working path)", func() {
+		It("is a no-op and leaves lastEmittedCount unchanged", func() {
+			responses := make(chan schema.OpenAIResponse, 4)
+			next := emitJSONToolCallDeltas(nil, 0, id, model, created, responses)
+			Expect(next).To(Equal(0))
+			Expect(drainChannel(responses)).To(BeEmpty())
+		})
+
+		It("leaves a non-zero lastEmittedCount unchanged when later called with the same length", func() {
+			responses := make(chan schema.OpenAIResponse, 4)
+			results := []map[string]any{
+				{"name": "search", "arguments": map[string]any{"q": "hi"}},
+			}
+			// First call emits the one available tool call.
+			next := emitJSONToolCallDeltas(results, 0, id, model, created, responses)
+			Expect(next).To(Equal(1))
+			Expect(drainChannel(responses)).To(HaveLen(1))
+
+			// Subsequent chunks haven't grown the slice — must be a no-op.
+			next = emitJSONToolCallDeltas(results, next, id, model, created, responses)
+			Expect(next).To(Equal(1))
+			Expect(drainChannel(responses)).To(BeEmpty())
+		})
+	})
+
+	// No-regression #2: the normal completed-JSON path. When the model
+	// emits a real, complete tool call as JSON in raw content (e.g. qwen3
+	// without jinja but with tools), we should emit exactly one tool_call
+	// SSE chunk on the first call and become a no-op on later calls.
+	Context("single complete tool call", func() {
+		It("emits one tool_call chunk and bumps lastEmittedCount to 1", func() {
+			responses := make(chan schema.OpenAIResponse, 4)
+			results := []map[string]any{
+				{
+					"name": "search",
+					"arguments": map[string]any{
+						"q": "hello",
+					},
+				},
+			}
+
+			next := emitJSONToolCallDeltas(results, 0, id, model, created, responses)
+
+			Expect(next).To(Equal(1))
+			out := drainChannel(responses)
+			Expect(out).To(HaveLen(1))
+			Expect(nameOf(out[0])).To(Equal("search"))
+			Expect(out[0].Choices[0].Delta.ToolCalls[0].FunctionCall.Arguments).
+				To(ContainSubstring(`"q":"hello"`))
+		})
+
+		It("accepts arguments already serialized as a string", func() {
+			responses := make(chan schema.OpenAIResponse, 4)
+			results := []map[string]any{
+				{
+					"name":      "search",
+					"arguments": `{"q":"hello"}`,
+				},
+			}
+
+			emitJSONToolCallDeltas(results, 0, id, model, created, responses)
+
+			out := drainChannel(responses)
+			Expect(out).To(HaveLen(1))
+			Expect(out[0].Choices[0].Delta.ToolCalls[0].FunctionCall.Arguments).
+				To(Equal(`{"q":"hello"}`))
+		})
+	})
+
+	// No-regression #3: multiple tool calls (parallel tool calling).
+	// Both must be emitted, lastEmittedCount must end at 2.
+	Context("multiple complete tool calls", func() {
+		It("emits one chunk per tool call and bumps lastEmittedCount to len(results)", func() {
+			responses := make(chan schema.OpenAIResponse, 8)
+			results := []map[string]any{
+				{"name": "search", "arguments": map[string]any{"q": "a"}},
+				{"name": "browse", "arguments": map[string]any{"url": "b"}},
+			}
+
+			next := emitJSONToolCallDeltas(results, 0, id, model, created, responses)
+
+			Expect(next).To(Equal(2))
+			out := drainChannel(responses)
+			Expect(out).To(HaveLen(2))
+			Expect(nameOf(out[0])).To(Equal("search"))
+			Expect(nameOf(out[1])).To(Equal("browse"))
+		})
+	})
+
+	// The streaming-tail case: incremental chunks. First parse returns
+	// one complete tool call followed by a partial stub; later chunks
+	// complete the second tool call. We must emit the first immediately
+	// and the second on the later call — without ever bumping past the
+	// stub mid-stream.
+	Context("partial tail behind a real tool call", func() {
+		It("emits the complete entry, stops at the stub, and resumes once the tail completes", func() {
+			responses := make(chan schema.OpenAIResponse, 8)
+
+			// Chunk 1: one real call + a partial stub for the next.
+			chunk1 := []map[string]any{
+				{"name": "search", "arguments": map[string]any{"q": "a"}},
+				{"4310046988783340008": float64(1)},
+			}
+			next := emitJSONToolCallDeltas(chunk1, 0, id, model, created, responses)
+			Expect(next).To(Equal(1),
+				"must NOT advance to 2 — the stub at index 1 has no usable name")
+			out := drainChannel(responses)
+			Expect(out).To(HaveLen(1))
+			Expect(nameOf(out[0])).To(Equal("search"))
+
+			// Chunk 2: the stub completes into a real call.
+			chunk2 := []map[string]any{
+				{"name": "search", "arguments": map[string]any{"q": "a"}},
+				{"name": "browse", "arguments": map[string]any{"url": "b"}},
+			}
+			next = emitJSONToolCallDeltas(chunk2, next, id, model, created, responses)
+			Expect(next).To(Equal(2))
+			out = drainChannel(responses)
+			Expect(out).To(HaveLen(1))
+			Expect(nameOf(out[0])).To(Equal("browse"))
+		})
+	})
+})

pkg/functions/iterative_parser.go

Lines changed: 15 additions & 0 deletions
@@ -577,6 +577,21 @@ func trimPotentialPartialWord(content string, format *XMLToolCallFormat, startTh
 func removeHealingMarkerFromJSON(value map[string]any, marker string) map[string]any {
 	result := make(map[string]any)
 	for k, v := range value {
+		// Strip the healing marker from KEYS. parseJSONWithStack appends the
+		// marker to close a partial key (e.g. `{ "code` heals into
+		// `{"code<marker>":1}`); we want to preserve the prefix the model
+		// actually emitted. If the entire key was the marker (i.e. the input
+		// was just `{` heals into `{"<marker>":1}`), the truncated key is
+		// empty — drop the entry. Without this, downstream callers see a
+		// stub object with a random integer-looking key and treat it as a
+		// complete result, the shape that trips chat_stream_workers.go's
+		// streaming tool-call detector in issue #9988.
+		if idx := strings.Index(k, marker); idx != -1 {
+			k = k[:idx]
+			if k == "" {
+				continue
+			}
+		}
 		if str, ok := v.(string); ok {
 			if idx := strings.Index(str, marker); idx != -1 {
 				v = str[:idx]
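
The key-stripping rule in this hunk can be tried on its own. The sketch below is a simplified stand-in, not the real function: `stripMarkerFromKeys` is a hypothetical name, it handles keys only, and it ignores the value handling that the rest of `removeHealingMarkerFromJSON` performs outside this hunk. The marker value is the example integer used in the tests above.

```go
package main

import (
	"fmt"
	"strings"
)

// stripMarkerFromKeys (hypothetical helper) truncates each key at the
// healing marker and drops entries whose key was nothing but the marker.
func stripMarkerFromKeys(in map[string]any, marker string) map[string]any {
	out := make(map[string]any, len(in))
	for k, v := range in {
		if idx := strings.Index(k, marker); idx != -1 {
			k = k[:idx]
			if k == "" {
				continue // the whole key was synthetic: drop the entry
			}
		}
		out[k] = v
	}
	return out
}

func main() {
	const marker = "4310046988783340008"

	// Healed form of `{`: the only key is the marker itself.
	fmt.Println(len(stripMarkerFromKeys(map[string]any{marker: 1}, marker))) // prints 0

	// Healed form of `{ "code`: the model-typed prefix survives.
	fmt.Println(stripMarkerFromKeys(map[string]any{"code" + marker: 1}, marker)) // prints map[code:1]
}
```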

pkg/functions/parse.go

Lines changed: 11 additions & 1 deletion
@@ -325,7 +325,17 @@ func ParseJSONIterative(s string, isPartial bool) ([]map[string]any, error) {
 		if jsonValue != nil {
 			// Convert to map[string]any if it's an object, or handle arrays
 			if obj, ok := jsonValue.(map[string]any); ok {
-				results = append(results, obj)
+				// Skip stub objects that healed away to nothing. Partial inputs
+				// like `{`, `{"`, or `{"n` go through parseJSONWithStack and
+				// come back as `{"<marker>":1}`; after removeHealingMarkerFromJSON
+				// drops the marker key the map is empty. Returning it as a
+				// real result trips the streaming tool-call detector
+				// (chat_stream_workers.go) into thinking a tool call landed,
+				// gating off content emission for the rest of the stream
+				// (issue #9988).
+				if !(isPartialJSON && len(obj) == 0) {
+					results = append(results, obj)
+				}
 			} else if arr, ok := jsonValue.([]any); ok {
 				// Handle arrays: extract objects from array
 				for _, item := range arr {
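
The guard added in this hunk only suppresses empty maps that come from healed partial input. A minimal sketch of that decision follows, with `appendParsed` as a hypothetical helper and its `healed` flag standing in for `isPartialJSON`, whose definition is outside the hunk. The sketch assumes the flag is true when the object was produced by healing truncated input.

```go
package main

import "fmt"

// appendParsed (hypothetical helper) appends obj to results unless obj is an
// empty map that came from healed partial input. The healed parameter stands
// in for isPartialJSON, which is assumed here to mark healed objects.
func appendParsed(results []map[string]any, obj map[string]any, healed bool) []map[string]any {
	if healed && len(obj) == 0 {
		return results // stub that healed away to nothing: skip it
	}
	return append(results, obj)
}

func main() {
	var results []map[string]any

	// Healed `{` after marker removal: empty and partial, so skipped.
	results = appendParsed(results, map[string]any{}, true)
	fmt.Println(len(results)) // prints 0

	// A complete `{}` typed by the model is still a real result.
	results = appendParsed(results, map[string]any{}, false)
	fmt.Println(len(results)) // prints 1

	// A partial object with real content is kept as well.
	results = appendParsed(results, map[string]any{"name": "search"}, true)
	fmt.Println(len(results)) // prints 2
}
```

Keying the skip on both conditions keeps a literal `{}` in complete input visible to callers, so only healing artifacts are filtered.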
