Skip to content

Commit a304e11

Browse files
committed
fix(qwen-agent): improve token and nested agent tracing
1 parent de65b32 commit a304e11

4 files changed

Lines changed: 587 additions & 8 deletions

File tree

instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
### Fixed
11+
12+
- Record token usage from Qwen-Agent DashScope response metadata on streaming
13+
and non-streaming chat spans.
14+
- Roll up child LLM token usage to Qwen-Agent invoke-agent spans, preserve
15+
nested agent spans, and record only the final agent answer as output.
16+
1017
## Version 0.6.0 (2026-06-03)
1118

1219
There are no changelog entries for this release.

instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/patch.py

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
from opentelemetry.util.genai.types import Error
3838

3939
from .utils import (
40+
_apply_usage_to_llm_invocation,
41+
_convert_qwen_agent_final_output_messages,
4042
_convert_qwen_messages_to_output_messages,
4143
_create_agent_invocation,
4244
_create_llm_invocation,
@@ -56,11 +58,14 @@
5658
_react_step_counter: ContextVar[int] = ContextVar(
5759
"qwen_react_step_counter", default=0
5860
)
61+
_active_agent_invocations: ContextVar[tuple[Any, ...]] = ContextVar(
62+
"qwen_active_agent_invocations", default=()
63+
)
5964

6065
# Reentrancy guards to prevent duplicate spans when Agent/BaseChatModel
6166
# are abstract classes and subclass calls super() (Proxy/Wrapper scenarios).
62-
_in_agent_run: ContextVar[bool] = ContextVar(
63-
"_qwen_in_agent_run", default=False
67+
_agent_run_instance_stack: ContextVar[tuple[int, ...]] = ContextVar(
68+
"_qwen_agent_run_instance_stack", default=()
6469
)
6570
_in_chat: ContextVar[bool] = ContextVar("_qwen_in_chat", default=False)
6671
_in_call_tool: ContextVar[bool] = ContextVar(
@@ -79,6 +84,42 @@ def _close_active_react_step(handler: ExtendedTelemetryHandler) -> None:
7984
_react_step_invocation.set(None)
8085

8186

87+
def _accumulate_llm_usage_on_active_agents(invocation: Any) -> None:
    """Add one LLM call's token counts to every active agent invocation.

    Every invocation currently held in ``_active_agent_invocations`` receives
    the counts, so an outer agent also accumulates the usage of LLM calls
    issued while an inner agent is running. Because of that overlap, adding
    up the totals of several agent spans would count the same tokens more
    than once.

    Args:
        invocation: The LLM invocation whose usage attributes are read. An
            attribute that is missing or ``None`` is left out of the rollup.
    """
    usage_attrs = (
        "input_tokens",
        "output_tokens",
        "usage_cache_read_input_tokens",
        "usage_cache_creation_input_tokens",
    )

    # An empty tuple (no agent running) simply skips the loop.
    for agent_invocation in _active_agent_invocations.get():
        for attr in usage_attrs:
            delta = getattr(invocation, attr, None)
            if delta is None:
                continue
            # A still-unset (None) agent total counts as zero.
            running_total = getattr(agent_invocation, attr) or 0
            setattr(agent_invocation, attr, running_total + (delta or 0))
121+
122+
82123
def wrap_agent_run(
83124
wrapped, instance, args, kwargs, handler: ExtendedTelemetryHandler
84125
):
@@ -93,18 +134,20 @@ def wrap_agent_run(
93134
"""
94135
# Reentrancy guard: prevent duplicate spans in Proxy/Wrapper scenarios
95136
# where a subclass calls super().run().
96-
if _in_agent_run.get():
137+
run_stack = _agent_run_instance_stack.get()
138+
instance_id = id(instance)
139+
if instance_id in run_stack:
97140
yield from wrapped(*args, **kwargs)
98141
return
99-
run_token = _in_agent_run.set(True)
142+
run_token = _agent_run_instance_stack.set(run_stack + (instance_id,))
100143

101144
messages = args[0] if args else kwargs.get("messages", [])
102145

103146
try:
104147
invocation = _create_agent_invocation(instance, messages)
105148
except Exception as e:
106149
logger.debug(f"Failed to create agent invocation: {e}")
107-
_in_agent_run.reset(run_token)
150+
_agent_run_instance_stack.reset(run_token)
108151
yield from wrapped(*args, **kwargs)
109152
return
110153

@@ -113,6 +156,9 @@ def wrap_agent_run(
113156
mode_token = _react_mode.set(is_react)
114157
counter_token = _react_step_counter.set(0)
115158
step_token = _react_step_invocation.set(None)
159+
active_agent_token = _active_agent_invocations.set(
160+
_active_agent_invocations.get() + (invocation,)
161+
)
116162

117163
handler.start_invoke_agent(invocation)
118164

@@ -125,7 +171,7 @@ def wrap_agent_run(
125171
# Extract output from last yielded response
126172
if last_response:
127173
invocation.output_messages = (
128-
_convert_qwen_messages_to_output_messages(last_response)
174+
_convert_qwen_agent_final_output_messages(last_response)
129175
)
130176

131177
# Close the last react_step span before closing invoke_agent.
@@ -153,7 +199,8 @@ def wrap_agent_run(
153199
_react_step_counter.reset(counter_token)
154200
_react_step_invocation.reset(step_token)
155201
_react_mode.reset(mode_token)
156-
_in_agent_run.reset(run_token)
202+
_active_agent_invocations.reset(active_agent_token)
203+
_agent_run_instance_stack.reset(run_token)
157204

158205

159206
def wrap_chat_model_chat(
@@ -206,6 +253,7 @@ def wrap_chat_model_chat(
206253
else:
207254
# Non-streaming: result is List[Message]
208255
if result:
256+
_apply_usage_to_llm_invocation(invocation, result)
209257
invocation.output_messages = (
210258
_convert_qwen_messages_to_output_messages(result)
211259
)
@@ -225,6 +273,7 @@ def wrap_chat_model_chat(
225273
invocation.finish_reasons = ["tool_calls"]
226274
break
227275

276+
_accumulate_llm_usage_on_active_agents(invocation)
228277
handler.stop_llm(invocation)
229278
return result
230279

@@ -246,6 +295,7 @@ def _wrap_streaming_llm_response(
246295
if first_token:
247296
invocation.monotonic_first_token_s = timeit.default_timer()
248297
first_token = False
298+
_apply_usage_to_llm_invocation(invocation, response)
249299
last_response = response
250300
yield response
251301

@@ -269,6 +319,7 @@ def _wrap_streaming_llm_response(
269319
invocation.finish_reasons = ["tool_calls"]
270320
break
271321

322+
_accumulate_llm_usage_on_active_agents(invocation)
272323
handler.stop_llm(invocation)
273324

274325
except GeneratorExit as e:

instrumentation-loongsuite/loongsuite-instrumentation-qwen-agent/src/opentelemetry/instrumentation/qwen_agent/utils.py

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,183 @@ def _extract_content_text(content: Any) -> str:
102102
return str(content) if content else ""
103103

104104

105+
def _field_value(value: Any, *names: str) -> Any:
106+
"""Read the first present field from a mapping or SDK response object."""
107+
if value is None:
108+
return None
109+
110+
for name in names:
111+
if isinstance(value, dict):
112+
if name in value:
113+
return value[name]
114+
continue
115+
116+
try:
117+
attr_value = getattr(value, name)
118+
except Exception:
119+
attr_value = None
120+
if attr_value is not None:
121+
return attr_value
122+
123+
get_method = getattr(value, "get", None)
124+
if callable(get_method):
125+
try:
126+
got_value = get_method(name)
127+
except Exception:
128+
got_value = None
129+
if got_value is not None:
130+
return got_value
131+
132+
return None
133+
134+
135+
def _int_value(value: Any) -> Optional[int]:
136+
if value is None:
137+
return None
138+
try:
139+
return int(value)
140+
except (TypeError, ValueError):
141+
return None
142+
143+
144+
def _usage_token_values(usage: Any) -> Dict[str, int]:
    """Collect the token counters found directly on a usage-like object.

    Input and output counts are read under either of two aliases each
    (``input_tokens``/``prompt_tokens`` and
    ``output_tokens``/``completion_tokens``). The cache-read count is read
    from ``cache_read_input_tokens``/``cached_prompt_tokens`` and, failing
    that, from ``cached_tokens`` inside ``prompt_tokens_details`` or
    ``input_tokens_details``.

    Args:
        usage: A mapping or object that may carry token counters.

    Returns:
        A dict with any of the keys ``input_tokens``, ``output_tokens``,
        ``cache_read_input_tokens`` and ``cache_creation_input_tokens``.
        Cache counters are included only when greater than zero. The dict is
        empty when ``usage`` is ``None`` or nothing usable was found.
    """
    if usage is None:
        return {}

    prompt_count = _int_value(
        _field_value(usage, "input_tokens", "prompt_tokens")
    )
    completion_count = _int_value(
        _field_value(usage, "output_tokens", "completion_tokens")
    )
    cached_read_count = _int_value(
        _field_value(usage, "cache_read_input_tokens", "cached_prompt_tokens")
    )
    cached_write_count = _int_value(
        _field_value(usage, "cache_creation_input_tokens")
    )

    # Fall back to the nested details objects while no cache-read count
    # has been resolved yet.
    for details_field in ("prompt_tokens_details", "input_tokens_details"):
        token_details = _field_value(usage, details_field)
        if token_details is None or cached_read_count is not None:
            continue
        cached_read_count = _int_value(
            _field_value(token_details, "cached_tokens")
        )

    # (result key, parsed count, keep only when strictly positive)
    candidates = (
        ("input_tokens", prompt_count, False),
        ("output_tokens", completion_count, False),
        ("cache_read_input_tokens", cached_read_count, True),
        ("cache_creation_input_tokens", cached_write_count, True),
    )
    collected: Dict[str, int] = {}
    for key, count, positive_only in candidates:
        if count is None:
            continue
        if positive_only and count <= 0:
            continue
        collected[key] = count

    return collected
179+
180+
181+
def _usage_score(usage_values: Dict[str, int]) -> int:
182+
return (usage_values.get("input_tokens") or 0) + (
183+
usage_values.get("output_tokens") or 0
184+
)
185+
186+
187+
def _usage_sources(value: Any) -> List[Any]:
    """List the nested objects on ``value`` that may carry token usage.

    Candidates are returned in this order, each only when not ``None``:
    ``value.usage``; ``value.extra.usage`` (or ``usage_metadata``);
    ``value.extra.model_service_info``; ``value.model_service_info``.
    """
    found: List[Any] = []

    direct_usage = _field_value(value, "usage")
    if direct_usage is not None:
        found.append(direct_usage)

    extra_payload = _field_value(value, "extra")
    if extra_payload is not None:
        nested_candidates = (
            _field_value(extra_payload, "usage", "usage_metadata"),
            _field_value(extra_payload, "model_service_info"),
        )
        found.extend(
            candidate
            for candidate in nested_candidates
            if candidate is not None
        )

    top_level_info = _field_value(value, "model_service_info")
    if top_level_info is not None:
        found.append(top_level_info)

    return found
208+
209+
210+
def _extract_usage_values(value: Any, depth: int = 0) -> Dict[str, int]:
    """Search ``value`` recursively for the richest token-usage reading.

    The counters read directly from ``value`` are the starting candidate.
    For a list or tuple, each item (visited last to first) is searched;
    otherwise the objects returned by ``_usage_sources`` are searched. A
    nested result replaces the current candidate only when its input plus
    output total is strictly larger.

    Args:
        value: A message, list of messages, usage object, or similar.
        depth: Current recursion depth; the search stops beyond depth 4.

    Returns:
        The selected usage dict, or an empty dict when nothing was found.
    """
    if value is None or depth > 4:
        return {}

    winner: Dict[str, int] = _usage_token_values(value) or {}

    # Sequences are searched item by item and never probed for the
    # usage/extra/model_service_info fields.
    if isinstance(value, (list, tuple)):
        children = reversed(value)
    else:
        children = _usage_sources(value)

    for child in children:
        candidate = _extract_usage_values(child, depth + 1)
        if _usage_score(candidate) > _usage_score(winner):
            winner = candidate

    return winner
233+
234+
235+
def _apply_usage_to_llm_invocation(
    invocation: LLMInvocation, value: Any
) -> None:
    """Copy token usage found in ``value`` onto ``invocation``.

    Qwen-Agent stores DashScope responses under
    Message.extra["model_service_info"] for both streaming and non-streaming
    calls. Streaming chunks can carry cumulative usage, so a reading whose
    input plus output total is lower than what the invocation already holds
    is ignored; an equal or higher reading overwrites it.

    Args:
        invocation: The LLM invocation to update in place.
        value: A message, list of messages, or streaming chunk to inspect.
    """
    extracted = _extract_usage_values(value)
    if not extracted:
        return

    already_recorded = (invocation.input_tokens or 0) + (
        invocation.output_tokens or 0
    )
    if already_recorded and _usage_score(extracted) < already_recorded:
        return

    # (key in the extracted dict, attribute name on the invocation)
    attr_by_key = (
        ("input_tokens", "input_tokens"),
        ("output_tokens", "output_tokens"),
        ("cache_read_input_tokens", "usage_cache_read_input_tokens"),
        ("cache_creation_input_tokens", "usage_cache_creation_input_tokens"),
    )
    for key, attr in attr_by_key:
        if key in extracted:
            setattr(invocation, attr, extracted[key])
267+
268+
269+
def apply_token_usage_from_qwen_messages(
    invocation: LLMInvocation,
    messages: Any,
) -> None:
    """Populate ``invocation`` token usage from qwen-agent message metadata.

    Compatibility entrypoint kept under the previous helper name. It simply
    forwards to ``_apply_usage_to_llm_invocation``, which the instrumentation
    wrapper now calls directly so it can also handle single streaming chunks.
    """
    _apply_usage_to_llm_invocation(invocation=invocation, value=messages)
280+
281+
105282
def _convert_qwen_messages_to_input_messages(
106283
messages: Any,
107284
) -> List[InputMessage]:
@@ -291,6 +468,47 @@ def _convert_qwen_messages_to_output_messages(
291468
return output_messages
292469

293470

471+
def _convert_qwen_agent_final_output_messages(
    messages: Any,
) -> List[OutputMessage]:
    """Convert only the agent's final answer to GenAI OutputMessage format.

    Messages are scanned from last to first. The first one that is neither a
    function/tool message nor a function call, and that has non-empty text,
    is converted on its own. When no message qualifies, the whole input is
    converted instead.

    Args:
        messages: A list of messages, or a single message (wrapped in a
            list). Each may expose fields as attributes or through ``get``.

    Returns:
        The converted output messages; an empty list for empty input.
    """
    if not messages:
        return []

    candidates = messages if isinstance(messages, list) else [messages]

    def _read(message: Any, field: str, *fallback: Any) -> Any:
        # Prefer attribute access; otherwise fall back to mapping-style get.
        if hasattr(message, field):
            return getattr(message, field)
        return message.get(field, *fallback)

    for candidate in reversed(candidates):
        try:
            role = _read(candidate, "role", "assistant")
            function_call = _read(candidate, "function_call")
            content = _read(candidate, "content", "")

            is_tool_traffic = role in ("function", "tool") or function_call
            if not is_tool_traffic and _extract_content_text(content):
                return _convert_qwen_messages_to_output_messages([candidate])
        except Exception as e:
            # Best effort: a malformed message is skipped, not fatal.
            logger.debug(f"Error extracting final agent output message: {e}")

    return _convert_qwen_messages_to_output_messages(candidates)
510+
511+
294512
def _get_tool_definitions(
295513
functions: Optional[List[Dict]],
296514
) -> Optional[List[FunctionToolDefinition]]:

0 commit comments

Comments
 (0)