Commit 5fd4239

feat: support streaming mode for tool calls in agent mode, fix interleaved thinking stream (#9023)
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
1 parent b203025 commit 5fd4239

File tree: 7 files changed, +234 −39 lines

core/http/endpoints/openai/chat.go

Lines changed: 105 additions & 31 deletions
```diff
@@ -174,8 +174,78 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
 
 	result := ""
 	lastEmittedCount := 0
+
+	// Track accumulated content for incremental reasoning and content extraction (mirrors process())
+	accumulatedContent := ""
+	lastEmittedReasoning := ""
+	lastEmittedCleanedContent := ""
+	sentInitialRole := false
+
 	_, tokenUsage, chatDeltas, err := ComputeChoices(req, prompt, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool {
 		result += s
+		accumulatedContent += s
+
+		// Incremental reasoning extraction — emit reasoning deltas in their own SSE chunks
+		// before any tool-call chunks (OpenAI spec: reasoning and tool_calls never share a delta)
+		currentReasoning, cleanedContent := reason.ExtractReasoningWithConfig(accumulatedContent, thinkingStartToken, config.ReasoningConfig)
+
+		var reasoningDelta *string
+		if currentReasoning != lastEmittedReasoning {
+			if len(currentReasoning) > len(lastEmittedReasoning) && strings.HasPrefix(currentReasoning, lastEmittedReasoning) {
+				newReasoning := currentReasoning[len(lastEmittedReasoning):]
+				reasoningDelta = &newReasoning
+				lastEmittedReasoning = currentReasoning
+			} else if currentReasoning != "" {
+				reasoningDelta = &currentReasoning
+				lastEmittedReasoning = currentReasoning
+			}
+		}
+
+		if reasoningDelta != nil && *reasoningDelta != "" {
+			responses <- schema.OpenAIResponse{
+				ID:      id,
+				Created: created,
+				Model:   req.Model,
+				Choices: []schema.Choice{{
+					Delta: &schema.Message{Reasoning: reasoningDelta},
+					Index: 0,
+				}},
+				Object: "chat.completion.chunk",
+			}
+		}
+
+		// Stream content deltas (cleaned of reasoning tags) while no tool calls
+		// have been detected. Once the incremental parser finds tool calls,
+		// content stops — per OpenAI spec, content and tool_calls don't mix.
+		if lastEmittedCount == 0 && cleanedContent != "" {
+			var deltaContent string
+			if len(cleanedContent) > len(lastEmittedCleanedContent) && strings.HasPrefix(cleanedContent, lastEmittedCleanedContent) {
+				deltaContent = cleanedContent[len(lastEmittedCleanedContent):]
+				lastEmittedCleanedContent = cleanedContent
+			} else if cleanedContent != lastEmittedCleanedContent {
+				deltaContent = cleanedContent
+				lastEmittedCleanedContent = cleanedContent
+			}
+			if deltaContent != "" {
+				if !sentInitialRole {
+					responses <- schema.OpenAIResponse{
+						ID: id, Created: created, Model: req.Model,
+						Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}},
+						Object:  "chat.completion.chunk",
+					}
+					sentInitialRole = true
+				}
+				responses <- schema.OpenAIResponse{
+					ID: id, Created: created, Model: req.Model,
+					Choices: []schema.Choice{{
+						Delta: &schema.Message{Content: &deltaContent},
+						Index: 0,
+					}},
+					Object: "chat.completion.chunk",
+				}
+			}
+		}
+
 		// Try incremental XML parsing for streaming support using iterative parser
 		// This allows emitting partial tool calls as they're being generated
 		cleanedResult := functions.CleanupLLMResult(result, config.FunctionsConfig)
```
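The hunk above never tracks token deltas directly: on every callback it re-runs reasoning extraction over the whole accumulated text, then emits only the suffix that extends what was already sent, or re-emits the current extraction when it no longer extends the previous one. A minimal, self-contained sketch of that prefix-delta technique, assuming nothing from LocalAI (Tracker and Delta are illustrative names):

```go
package main

import (
	"fmt"
	"strings"
)

// Tracker remembers what has already been streamed to the client.
type Tracker struct{ lastEmitted string }

// Delta returns the new suffix when current extends what was already
// emitted, the whole string when extraction restarted from a different
// shape, and "" when nothing changed.
func (t *Tracker) Delta(current string) string {
	if current == t.lastEmitted {
		return ""
	}
	if len(current) > len(t.lastEmitted) && strings.HasPrefix(current, t.lastEmitted) {
		d := current[len(t.lastEmitted):]
		t.lastEmitted = current
		return d
	}
	t.lastEmitted = current
	return current
}

func main() {
	var t Tracker
	// Each snapshot is the full extraction result after one more token.
	for _, snapshot := range []string{"Let", "Let me", "Let me think"} {
		if d := t.Delta(snapshot); d != "" {
			fmt.Printf("emit %q\n", d) // emits "Let", " me", " think"
		}
	}
}
```

The same comparison is applied twice in the hunk, once per stream, which is why it keeps separate lastEmittedReasoning and lastEmittedCleanedContent strings.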
```diff
@@ -306,20 +306,6 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
 
 		switch {
 		case noActionToRun:
-			initialMessage := schema.OpenAIResponse{
-				ID:      id,
-				Created: created,
-				Model:   req.Model, // we have to return what the user sent here, due to OpenAI spec.
-				Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0, FinishReason: nil}},
-				Object:  "chat.completion.chunk",
-			}
-			responses <- initialMessage
-
-			result, err := handleQuestion(config, functionResults, result, prompt)
-			if err != nil {
-				xlog.Error("error handling question", "error", err)
-				return err
-			}
 			usage := schema.OpenAIUsage{
 				PromptTokens:     tokenUsage.Prompt,
 				CompletionTokens: tokenUsage.Completion,
```
```diff
@@ -330,25 +386,43 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
 				usage.TimingPromptProcessing = tokenUsage.TimingPromptProcessing
 			}
 
-			var deltaReasoning *string
-			if reasoning != "" {
-				deltaReasoning = &reasoning
-			}
-			delta := &schema.Message{Content: &result}
-			if deltaReasoning != nil {
-				delta.Reasoning = deltaReasoning
-			}
+			if sentInitialRole {
+				// Content was already streamed during the callback — just emit usage.
+				delta := &schema.Message{}
+				if reasoning != "" && lastEmittedReasoning == "" {
+					delta.Reasoning = &reasoning
+				}
+				responses <- schema.OpenAIResponse{
+					ID: id, Created: created, Model: req.Model,
+					Choices: []schema.Choice{{Delta: delta, Index: 0}},
+					Object:  "chat.completion.chunk",
+					Usage:   usage,
+				}
+			} else {
+				// Content was NOT streamed — send everything at once (fallback).
+				responses <- schema.OpenAIResponse{
+					ID: id, Created: created, Model: req.Model,
+					Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}},
+					Object:  "chat.completion.chunk",
+				}
 
-			resp := schema.OpenAIResponse{
-				ID:      id,
-				Created: created,
-				Model:   req.Model, // we have to return what the user sent here, due to OpenAI spec.
-				Choices: []schema.Choice{{Delta: delta, Index: 0, FinishReason: nil}},
-				Object:  "chat.completion.chunk",
-				Usage:   usage,
-			}
+				result, err := handleQuestion(config, functionResults, result, prompt)
+				if err != nil {
+					xlog.Error("error handling question", "error", err)
+					return err
+				}
 
-			responses <- resp
+				delta := &schema.Message{Content: &result}
+				if reasoning != "" {
+					delta.Reasoning = &reasoning
+				}
+				responses <- schema.OpenAIResponse{
+					ID: id, Created: created, Model: req.Model,
+					Choices: []schema.Choice{{Delta: delta, Index: 0}},
+					Object:  "chat.completion.chunk",
+					Usage:   usage,
+				}
+			}
 
 		default:
 			for i, ss := range functionResults {
```
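With sentInitialRole, the noActionToRun branch only needs to know whether the role and content chunks already went out during the callback; if so, the closing chunk carries usage alone. A sketch of the resulting SSE wire sequence, using pared-down stand-in types rather than LocalAI's schema package:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in types loosely shaped like OpenAI streaming chunks; not the
// schema package used by LocalAI.
type delta struct {
	Role    string `json:"role,omitempty"`
	Content string `json:"content,omitempty"`
}

type usage struct {
	PromptTokens     int `json:"prompt_tokens"`
	CompletionTokens int `json:"completion_tokens"`
}

type chunk struct {
	Object string `json:"object"`
	Delta  delta  `json:"delta"`
	Usage  *usage `json:"usage,omitempty"`
}

func main() {
	// Role first, then content deltas, then a final usage-only chunk,
	// mirroring the sentInitialRole ordering in the diff above.
	seq := []chunk{
		{Object: "chat.completion.chunk", Delta: delta{Role: "assistant"}},
		{Object: "chat.completion.chunk", Delta: delta{Content: "Hello"}},
		{Object: "chat.completion.chunk", Usage: &usage{PromptTokens: 12, CompletionTokens: 5}},
	}
	for _, c := range seq {
		b, _ := json.Marshal(c)
		fmt.Printf("data: %s\n\n", b) // SSE frames each chunk as its own data: line
	}
}
```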

core/http/endpoints/openresponses/responses.go

Lines changed: 10 additions & 0 deletions
```diff
@@ -1737,6 +1737,16 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
 
 	for mcpStreamIter := 0; mcpStreamIter <= mcpStreamMaxIterations; mcpStreamIter++ {
 		if mcpStreamIter > 0 {
+			// Reset reasoning and tool-call state for re-inference so reasoning
+			// extraction runs again on subsequent iterations
+			inToolCallMode = false
+			accumulatedContent = ""
+			lastEmittedReasoning = ""
+			lastEmittedCleanedContent = ""
+			currentMessageID = ""
+			lastEmittedToolCallCount = 0
+			currentReasoningID = ""
+
 			predInput = evaluator.TemplateMessages(*openAIReq, openAIReq.Messages, cfg, funcs, shouldUseFn)
 			xlog.Debug("Open Responses stream MCP re-templating", "iteration", mcpStreamIter)
 			images = images[:0]
```
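The reset matters because the prefix-delta comparison is stateful: left over from a previous iteration, lastEmittedReasoning makes the next iteration's early output look like text the client has already received. A toy model of that comparison (the delta helper is illustrative, not the project's code):

```go
package main

import "fmt"

// delta mimics the streaming extractor's prefix check: it emits the suffix
// when current extends lastEmitted, else re-emits current wholesale.
func delta(lastEmitted, current string) string {
	if current == lastEmitted {
		return ""
	}
	if len(current) > len(lastEmitted) && current[:len(lastEmitted)] == lastEmitted {
		return current[len(lastEmitted):]
	}
	return current
}

func main() {
	stale := "I should call the weather tool" // left over from iteration 0
	// Without the reset, iteration 1 re-emits text the client already saw:
	fmt.Printf("no reset: %q\n", delta(stale, "I should"))
	// With the per-iteration reset from the diff, the first delta is clean:
	fmt.Printf("reset:    %q\n", delta("", "I should"))
}
```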

core/http/react-ui/src/pages/AgentChat.jsx

Lines changed: 81 additions & 2 deletions
```diff
@@ -104,6 +104,9 @@ export default function AgentChat() {
   const [editingName, setEditingName] = useState(null)
   const [editName, setEditName] = useState('')
   const [chatSearch, setChatSearch] = useState('')
+  const [streamContent, setStreamContent] = useState('')
+  const [streamReasoning, setStreamReasoning] = useState('')
+  const [streamToolCalls, setStreamToolCalls] = useState([])
   const messagesEndRef = useRef(null)
   const messagesRef = useRef(null)
   const textareaRef = useRef(null)
```
```diff
@@ -150,8 +153,41 @@ export default function AgentChat() {
         const data = JSON.parse(e.data)
         if (data.status === 'processing') {
           setProcessingChatId(activeIdRef.current)
+          setStreamContent('')
+          setStreamReasoning('')
+          setStreamToolCalls([])
         } else if (data.status === 'completed') {
           setProcessingChatId(null)
+          setStreamContent('')
+          setStreamReasoning('')
+          setStreamToolCalls([])
+        }
+      } catch (_err) {
+        // ignore
+      }
+    })
+
+    es.addEventListener('stream_event', (e) => {
+      try {
+        const data = JSON.parse(e.data)
+        if (data.type === 'reasoning') {
+          setStreamReasoning(prev => prev + (data.content || ''))
+        } else if (data.type === 'content') {
+          setStreamContent(prev => prev + (data.content || ''))
+        } else if (data.type === 'tool_call') {
+          const name = data.tool_name || ''
+          const args = data.tool_args || ''
+          setStreamToolCalls(prev => {
+            if (name) {
+              return [...prev, { name, args }]
+            }
+            if (prev.length === 0) return prev
+            const updated = [...prev]
+            updated[updated.length - 1] = { ...updated[updated.length - 1], args: updated[updated.length - 1].args + args }
+            return updated
+          })
+        } else if (data.type === 'done') {
+          // Content will be finalized by json_message event
         }
       } catch (_err) {
         // ignore
```
```diff
@@ -192,7 +228,7 @@ export default function AgentChat() {
   // Auto-scroll to bottom
   useEffect(() => {
     messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' })
-  }, [messages])
+  }, [messages, streamContent, streamReasoning, streamToolCalls])
 
   // Highlight code blocks
   useEffect(() => {
```
```diff
@@ -537,7 +573,50 @@ export default function AgentChat() {
             flushSystem('end')
             return elements
           })()}
-          {processing && (
+          {processing && (streamReasoning || streamContent || streamToolCalls.length > 0) && (
+            <div className="chat-message chat-message-assistant">
+              <div className="chat-message-avatar">
+                <i className="fas fa-robot" />
+              </div>
+              <div className="chat-message-bubble">
+                {streamReasoning && (
+                  <details className="chat-activity-group" open={!streamContent} style={{ marginBottom: streamContent ? 'var(--spacing-sm)' : 0 }}>
+                    <summary className="chat-activity-toggle" style={{ cursor: 'pointer' }}>
+                      <span className={`chat-activity-summary${!streamContent ? ' chat-activity-shimmer' : ''}`}>
+                        {streamContent ? 'Thinking' : 'Thinking...'}
+                      </span>
+                    </summary>
+                    <div className="chat-activity-details">
+                      <div className="chat-activity-item chat-activity-thinking">
+                        <div className="chat-activity-item-content chat-activity-live"
+                          dangerouslySetInnerHTML={{ __html: renderMarkdown(streamReasoning) }} />
+                      </div>
+                    </div>
+                  </details>
+                )}
+                {streamToolCalls.length > 0 && !streamContent && (
+                  <div className="chat-activity-group" style={{ marginBottom: 'var(--spacing-sm)' }}>
+                    {streamToolCalls.map((tc, idx) => (
+                      <div key={idx} className="chat-activity-item chat-activity-tool-call" style={{ padding: 'var(--spacing-xs) var(--spacing-sm)' }}>
+                        <span className="chat-activity-item-label">
+                          <i className="fas fa-bolt" style={{ marginRight: 'var(--spacing-xs)' }} />
+                          {tc.name}
+                        </span>
+                        <span style={{ opacity: 0.5, fontSize: '0.85em', marginLeft: 'var(--spacing-xs)' }}>calling...</span>
+                      </div>
+                    ))}
+                  </div>
+                )}
+                {streamContent && (
+                  <div className="chat-message-content">
+                    <span dangerouslySetInnerHTML={{ __html: renderMarkdown(streamContent) }} />
+                    <span className="chat-streaming-cursor" />
+                  </div>
+                )}
+              </div>
+            </div>
+          )}
+          {processing && !streamReasoning && !streamContent && streamToolCalls.length === 0 && (
             <div className="chat-message chat-message-assistant">
               <div className="chat-message-avatar" style={{ background: 'var(--color-bg-tertiary)', color: 'var(--color-text-muted)' }}>
                 <i className="fas fa-cogs" />
```
core/http/react-ui/src/pages/AgentJobDetails.jsx

Lines changed: 3 additions & 0 deletions
```diff
@@ -8,6 +8,9 @@ const traceColors = {
   tool_call: { bg: 'rgba(139,92,246,0.1)', border: 'rgba(139,92,246,0.3)', icon: 'fa-wrench', color: 'var(--color-accent)' },
   tool_result: { bg: 'rgba(34,197,94,0.1)', border: 'rgba(34,197,94,0.3)', icon: 'fa-check', color: 'var(--color-success)' },
   status: { bg: 'rgba(245,158,11,0.1)', border: 'rgba(245,158,11,0.3)', icon: 'fa-info-circle', color: 'var(--color-warning)' },
+  stream_reasoning: { bg: 'rgba(99,102,241,0.06)', border: 'rgba(99,102,241,0.2)', icon: 'fa-lightbulb', color: 'var(--color-primary)' },
+  stream_content: { bg: 'rgba(59,130,246,0.08)', border: 'rgba(59,130,246,0.25)', icon: 'fa-pen-nib', color: 'var(--color-info, #3b82f6)' },
+  stream_tool_call: { bg: 'rgba(139,92,246,0.06)', border: 'rgba(139,92,246,0.2)', icon: 'fa-bolt', color: 'var(--color-accent)' },
 }
 
 function TraceCard({ trace, index }) {
```

core/services/agent_jobs.go

Lines changed: 29 additions & 0 deletions
```diff
@@ -887,6 +887,35 @@ func (s *AgentJobService) executeJobInternal(job schema.Job, task schema.Task, c
 			job.Traces = append(job.Traces, trace)
 			s.jobs.Set(job.ID, job)
 		}),
+		cogito.WithStreamCallback(func(ev cogito.StreamEvent) {
+			switch ev.Type {
+			case cogito.StreamEventReasoning:
+				trace := schema.JobTrace{
+					Type:      "stream_reasoning",
+					Content:   ev.Content,
+					Timestamp: time.Now(),
+				}
+				job.Traces = append(job.Traces, trace)
+				s.jobs.Set(job.ID, job)
+			case cogito.StreamEventContent:
+				trace := schema.JobTrace{
+					Type:      "stream_content",
+					Content:   ev.Content,
+					Timestamp: time.Now(),
+				}
+				job.Traces = append(job.Traces, trace)
+				s.jobs.Set(job.ID, job)
+			case cogito.StreamEventToolCall:
+				trace := schema.JobTrace{
+					Type:      "stream_tool_call",
+					Content:   ev.ToolArgs,
+					ToolName:  ev.ToolName,
+					Timestamp: time.Now(),
+				}
+				job.Traces = append(job.Traces, trace)
+				s.jobs.Set(job.ID, job)
+			}
+		}),
 	)
 
 	// Execute tools
```
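The callback fans each cogito stream event out into the job's trace list, one trace type per event kind, which is what the new traceColors entries in AgentJobDetails.jsx render. A stand-in sketch of that mapping; only the identifiers visible in the diff (StreamEvent, its fields, and the three StreamEvent* constants) come from the source, the rest is illustrative:

```go
package main

import "fmt"

// Stand-ins for the cogito types used above, so the sketch is self-contained.
type StreamEventType int

const (
	StreamEventReasoning StreamEventType = iota
	StreamEventContent
	StreamEventToolCall
)

type StreamEvent struct {
	Type     StreamEventType
	Content  string
	ToolName string
	ToolArgs string
}

// traceType maps each event kind to the trace label the job-details UI
// colors on (stream_reasoning / stream_content / stream_tool_call).
func traceType(ev StreamEvent) string {
	switch ev.Type {
	case StreamEventReasoning:
		return "stream_reasoning"
	case StreamEventContent:
		return "stream_content"
	case StreamEventToolCall:
		return "stream_tool_call"
	}
	return ""
}

func main() {
	ev := StreamEvent{Type: StreamEventToolCall, ToolName: "search", ToolArgs: `{"q":"go"}`}
	fmt.Println(traceType(ev), ev.ToolName, ev.ToolArgs)
}
```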

go.mod

Lines changed: 2 additions & 2 deletions
```diff
@@ -31,7 +31,7 @@ require (
 	github.com/mholt/archiver/v3 v3.5.1
 	github.com/microcosm-cc/bluemonday v1.0.27
 	github.com/modelcontextprotocol/go-sdk v1.4.0
-	github.com/mudler/cogito v0.9.5-0.20260313170202-42271c7e1a6b
+	github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b
 	github.com/mudler/edgevpn v0.31.1
 	github.com/mudler/go-processmanager v0.1.0
 	github.com/mudler/memory v0.0.0-20251216220809-d1256471a6c2
@@ -128,7 +128,7 @@ require (
 	github.com/kevinburke/ssh_config v1.2.0 // indirect
 	github.com/labstack/gommon v0.4.2 // indirect
 	github.com/mschoch/smat v0.2.0 // indirect
-	github.com/mudler/LocalAGI v0.0.0-20260314222828-e38f13ab8cec
+	github.com/mudler/LocalAGI v0.0.0-20260315223407-da286065e126
 	github.com/mudler/localrecall v0.5.9-0.20260314221856-96d63875cc47 // indirect
 	github.com/mudler/skillserver v0.0.5-0.20260221145827-0639a82c8f49
 	github.com/olekukonko/tablewriter v0.0.5 // indirect
```

go.sum

Lines changed: 4 additions & 4 deletions
```diff
@@ -654,10 +654,10 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o=
 github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
 github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
 github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
-github.com/mudler/LocalAGI v0.0.0-20260314222828-e38f13ab8cec h1:Y6JYhfJidFktfmQC00SwHtQVh0lr0O52qihgTKddSNU=
-github.com/mudler/LocalAGI v0.0.0-20260314222828-e38f13ab8cec/go.mod h1:yf+IlZzQCGgKPGFn5yclzA2Dxxhy75K3YDubkjCub04=
-github.com/mudler/cogito v0.9.5-0.20260313170202-42271c7e1a6b h1:Hs2Byjnukgkwm5Vw7z5aSZEjznPHaxjr2vLc5Uu1fHQ=
-github.com/mudler/cogito v0.9.5-0.20260313170202-42271c7e1a6b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4=
+github.com/mudler/LocalAGI v0.0.0-20260315223407-da286065e126 h1:7owcRhvwMP5BDDPsoK8TLnOBY4khcGJXutMY7pe9lhc=
+github.com/mudler/LocalAGI v0.0.0-20260315223407-da286065e126/go.mod h1:w8kG2r/TlADJ4SnYemPNirW1pdHsqM/RAdCPk9r5Ll0=
+github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU=
+github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4=
 github.com/mudler/edgevpn v0.31.1 h1:7qegiDWd0kAg6ljhNHxqvp8hbo/6BbzSdbb7/2WZfiY=
 github.com/mudler/edgevpn v0.31.1/go.mod h1:ftV5B0nKFzm4R8vR80UYnCb2nf7lxCRgAALxUEEgCf8=
 github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX28EkpdAyL4yQVnJMrbquKospjHyA=
```
