Skip to content

Commit a398da7

Browse files
feat: implement thought emission functions for enhanced feedback
- Added thought emission functions to enable custom thought display throughout function/tool execution.
- Integrated SPAN event handling in `StepAdaptor` to process custom thoughts.

Signed-off-by: Patrick Chin <8509935+thepatrickchin@users.noreply.github.com>
1 parent 7c8884d commit a398da7

File tree

4 files changed

+469
-5
lines changed

4 files changed

+469
-5
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import logging
17+
import typing
18+
import uuid
19+
20+
from nat.data_models.intermediate_step import IntermediateStepPayload
21+
from nat.data_models.intermediate_step import IntermediateStepType
22+
from nat.data_models.intermediate_step import StreamEventData
23+
24+
if typing.TYPE_CHECKING:
25+
from nat.builder.context import Context
26+
27+
logger = logging.getLogger(__name__)
28+
29+
30+
def emit_thought(context: "Context", thought_text: str, name: str | None = None) -> str:
    """Publish a complete custom thought to the UI's thought process display.

    Useful for surfacing discrete progress steps or status updates while a
    function or tool is executing. The thought is modeled as a SPAN that is
    opened and closed immediately, so it appears as a single finished entry.

    Args:
        context: The NAT context object (obtain via Context.get())
        thought_text: The thought text to display in the UI
        name: Optional name for the thought (defaults to "custom_thought")

    Returns:
        The UUID of the emitted thought
    """
    step_id = str(uuid.uuid4())
    step_name = name or "custom_thought"
    manager = context.intermediate_step_manager

    # Open the span...
    manager.push_intermediate_step(
        IntermediateStepPayload(UUID=step_id,
                                event_type=IntermediateStepType.SPAN_START,
                                name=step_name,
                                data=StreamEventData(input=None),
                                metadata={"thought_text": thought_text}))

    # ...and close it right away: the thought is already complete.
    manager.push_intermediate_step(
        IntermediateStepPayload(UUID=step_id,
                                event_type=IntermediateStepType.SPAN_END,
                                name=step_name,
                                data=StreamEventData(output=None),
                                metadata={"thought_text": thought_text}))

    return step_id
64+
65+
66+
def emit_thought_start(context: "Context", thought_text: str, name: str | None = None) -> str:
    """Open a streaming thought that can be progressively updated with chunks.

    Intended for long-running operations where the UI should show incremental
    progress. Pair with emit_thought_chunk() for interim updates and
    emit_thought_end() to finish the thought.

    Args:
        context: The NAT context object (obtain via Context.get())
        thought_text: The initial thought text to display
        name: Optional name for the thought (defaults to "custom_thought")

    Returns:
        The UUID of the started thought (use this for chunks and end)
    """
    step_id = str(uuid.uuid4())

    # Only the SPAN_START is emitted here; the caller is responsible for
    # eventually closing the span via emit_thought_end().
    payload = IntermediateStepPayload(UUID=step_id,
                                      event_type=IntermediateStepType.SPAN_START,
                                      name=name or "custom_thought",
                                      data=StreamEventData(input=None),
                                      metadata={"thought_text": thought_text})
    context.intermediate_step_manager.push_intermediate_step(payload)

    return step_id
91+
92+
93+
def emit_thought_chunk(context: "Context", thought_uuid: str, thought_text: str, name: str | None = None) -> None:
    """Emit an update to a streaming thought started with emit_thought_start().

    This updates the thought text in the UI, useful for showing progress updates.

    Args:
        context: The NAT context object (obtain via Context.get())
        thought_uuid: The UUID returned from emit_thought_start()
        thought_text: The updated thought text to display
        name: Optional name for the thought; pass the same name used with
            emit_thought_start() so chunk events match the opening span
            (defaults to "custom_thought" for backward compatibility)
    """
    # Previously the name was hard-coded to "custom_thought", which mismatched
    # spans opened with a custom name. The optional ``name`` parameter keeps the
    # old default while letting callers stay consistent with the START event.
    context.intermediate_step_manager.push_intermediate_step(
        IntermediateStepPayload(UUID=thought_uuid,
                                event_type=IntermediateStepType.SPAN_CHUNK,
                                name=name or "custom_thought",
                                data=StreamEventData(chunk=thought_text),
                                metadata={"thought_text": thought_text}))
109+
110+
111+
def emit_thought_end(context: "Context", thought_uuid: str, thought_text: str | None = None, name: str | None = None) -> None:
    """Complete a streaming thought started with emit_thought_start().

    Args:
        context: The NAT context object (obtain via Context.get())
        thought_uuid: The UUID returned from emit_thought_start()
        thought_text: Optional final thought text (if None, keeps the last chunk text)
        name: Optional name for the thought; pass the same name used with
            emit_thought_start() so the END event matches the opening span
            (defaults to "custom_thought" for backward compatibility)
    """
    # Use an explicit ``is not None`` check: a truthiness test would discard an
    # intentional empty-string final text, contradicting the documented
    # contract that only None means "keep the last chunk text".
    metadata = {"thought_text": thought_text} if thought_text is not None else {}
    context.intermediate_step_manager.push_intermediate_step(
        IntermediateStepPayload(UUID=thought_uuid,
                                event_type=IntermediateStepType.SPAN_END,
                                name=name or "custom_thought",
                                data=StreamEventData(output=None),
                                metadata=metadata))

packages/nvidia_nat_core/src/nat/front_ends/fastapi/step_adaptor.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,15 @@ def _step_matches_filter(self, step: IntermediateStep, config: StepAdaptorConfig
5757
return False
5858

5959
if config.mode == StepAdaptorMode.DEFAULT:
60-
# default existing behavior: show LLM events + TOOL_END + FUNCTION events
60+
# default existing behavior: show LLM events + TOOL_END + FUNCTION events + SPAN events
6161
if step.event_category == IntermediateStepCategory.LLM:
6262
return True
6363
if step.event_category == IntermediateStepCategory.TOOL:
6464
return True
6565
if step.event_category == IntermediateStepCategory.FUNCTION:
6666
return True
67+
if step.event_category == IntermediateStepCategory.SPAN:
68+
return True
6769
return False
6870

6971
if config.mode == StepAdaptorMode.CUSTOM:
@@ -322,6 +324,25 @@ def _handle_function(self, step: IntermediateStepPayload, ancestry: InvocationNo
322324

323325
return None
324326

327+
def _handle_span(self, step: IntermediateStepPayload, ancestry: InvocationNode) -> ResponseSerializable | None:
    """
    Handles SPAN events (SPAN_START, SPAN_CHUNK, SPAN_END) for custom thoughts.

    Spans that do not carry a "thought_text" metadata entry are ignored
    (returns None) so unrelated spans are not surfaced to the UI.
    """
    metadata = step.metadata

    # Only dict metadata with a non-empty "thought_text" qualifies as a
    # custom thought.
    if not isinstance(metadata, dict) or not metadata:
        return None

    text = metadata.get("thought_text")
    if not text:
        return None

    # START, CHUNK and END all map onto the same response shape.
    return ResponseIntermediateStep(id=step.UUID,
                                    name=step.name or "Span",
                                    payload="",
                                    parent_id=ancestry.function_id,
                                    thought_text=text)
345+
325346
def _handle_custom(self, payload: IntermediateStepPayload, ancestry: InvocationNode) -> ResponseSerializable | None:
326347
"""
327348
Handles the CUSTOM event
@@ -368,6 +389,9 @@ def process(self, step: IntermediateStep) -> ResponseSerializable | None:
368389
if step.event_category == IntermediateStepCategory.FUNCTION:
369390
return self._handle_function(payload, ancestry)
370391

392+
if step.event_category == IntermediateStepCategory.SPAN:
393+
return self._handle_span(payload, ancestry)
394+
371395
if step.event_category == IntermediateStepCategory.CUSTOM:
372396
return self._handle_custom(payload, ancestry)
373397

0 commit comments

Comments
 (0)