Skip to content

Commit d9c8dff

Browse files
committed
fix(deer-flow): address code review blockers
Blockers: - task_tool.py: initialize `result = None` before try, so capture-on + wrapped-raise no longer raises `UnboundLocalError` swallowing the original exception. - sandbox.py: `_ProviderAcquireAsyncWrapper` now awaits `wrapped` directly and manages the TASK span lifecycle inline, instead of routing through `asyncio.to_thread` (which returned a coroutine object). Non-blockers: - Attach all three manual TASK spans (task_tool / sandbox provider lifecycle / memory) to the OTel Context via `set_span_in_context` so the langchain TOOL span emitted inside `wrapped` becomes a child of the TASK span (execute.md §3.4.3). - `memory.py::_MemoryLoadSaveWrapper`: differentiate `load` vs `save` positional args — `save(memory_data, agent_name, *, user_id)` puts `agent_name` at `args[1]`, not `args[0]`. - entry.py: drop dead `_call_arg(args, kwargs, 5, "graph_input")` fallback — `graph_input` is kw-only. - README: document subagent AGENT span usage-metrics limitation and `graph_input` kw-only contract. Tests: three regression tests covering Blockers 1/2 and Non-Blocker 3.
1 parent 6fff239 commit d9c8dff

6 files changed

Lines changed: 297 additions & 11 deletions

File tree

instrumentation-loongsuite/loongsuite-instrumentation-deer-flow/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,13 @@ opentelemetry-instrument --instrumentation_modules deer_flow
6969
span (langchain instrumentation, child). This is intentional — the TASK
7070
span describes "dispatch to subagent" semantics while the TOOL span covers
7171
the LangChain `@tool` invocation mechanics.
72+
* **Subagent AGENT span does not carry usage metrics.** DeerFlow constructs
73+
`SubagentTokenCollector` locally inside `SubagentExecutor._aexecute`, so
74+
the wrapper cannot reach it via an instance attribute. Token usage is still
75+
emitted on the langchain LLM child span via `loongsuite-instrumentation-langchain`,
76+
but `gen_ai_llm_usage_tokens` will not be set on the AGENT (subagent) span.
77+
This is a known limitation of the v1 patch surface; a follow-up could
78+
expose the collector on the instance.
79+
* **`run_agent` `graph_input` is keyword-only.** The ENTRY span builder reads
80+
it from `kwargs` only; the previous positional fallback was dead code and
81+
has been removed.

instrumentation-loongsuite/loongsuite-instrumentation-deer-flow/src/opentelemetry/instrumentation/deer_flow/patches/entry.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ async def __call__(
9494
return await wrapped(*args, **kwargs)
9595

9696
record = _call_arg(args, kwargs, 2, "record")
97-
graph_input = kwargs.get("graph_input") or _call_arg(args, kwargs, 5, "graph_input")
97+
# ``graph_input`` is kw-only in ``run_agent``; the positional fallback
98+
# was dead code (the positional index would never contain it). Reading
99+
# it from kwargs only.
100+
graph_input = kwargs.get("graph_input")
98101
invocation = _safe_call(
99102
"build_entry_invocation", _build_entry_invocation, record, graph_input
100103
)

instrumentation-loongsuite/loongsuite-instrumentation-deer-flow/src/opentelemetry/instrumentation/deer_flow/patches/memory.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
_should_capture_content,
4242
_should_capture_memory_content,
4343
)
44-
from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer
44+
from opentelemetry import context as otel_context
45+
from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer, set_span_in_context
4546
from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler
4647
from opentelemetry.util.genai.extended_semconv.gen_ai_extended_attributes import (
4748
GEN_AI_SPAN_KIND,
@@ -92,16 +93,27 @@ def __call__(
9293
span.set_attribute(DEER_FLOW_COMPONENT, "memory")
9394
span.set_attribute(DEER_FLOW_TASK_NAME, task_name)
9495

96+
# ``FileMemoryStorage.load`` signature: ``load(agent_name=None, *,
97+
# user_id=None)`` — ``args[0]`` is ``agent_name``.
98+
# ``FileMemoryStorage.save`` signature: ``save(memory_data,
99+
# agent_name=None, *, user_id=None)`` — ``args[0]`` is ``memory_data``,
100+
# ``args[1]`` is ``agent_name``. Using ``args[0]`` for both would write
101+
# ``memory_data`` repr into ``gen_ai.agent.name``.
95102
agent_name = kwargs.get("agent_name") if kwargs else None
96103
if agent_name is None and args:
97-
agent_name = args[0]
104+
if method_name == "save":
105+
agent_name = args[1] if len(args) > 1 else None
106+
else:
107+
agent_name = args[0]
98108
if agent_name is not None:
99109
span.set_attribute("gen_ai.agent.name", str(agent_name))
100110

101111
user_id = kwargs.get("user_id") if kwargs else None
102112
if user_id is not None:
103113
span.set_attribute("gen_ai.user.id", str(user_id))
104114

115+
token = otel_context.attach(set_span_in_context(span))
116+
105117
if _capture_memory():
106118
_safe_call(
107119
"set_input_value",
@@ -112,6 +124,7 @@ def __call__(
112124
),
113125
)
114126

127+
result: Any = None
115128
try:
116129
result = wrapped(*args, **kwargs)
117130
except Exception as exc:
@@ -126,6 +139,7 @@ def __call__(
126139
"output.value", _serialize(result) or ""
127140
),
128141
)
142+
otel_context.detach(token)
129143
span.end()
130144
return result
131145

@@ -163,6 +177,8 @@ async def __call__(
163177
if agent_name is not None:
164178
span.set_attribute("gen_ai.agent.name", str(agent_name))
165179

180+
token = otel_context.attach(set_span_in_context(span))
181+
166182
if _capture_memory():
167183
_safe_call(
168184
"set_input_value",
@@ -173,6 +189,7 @@ async def __call__(
173189
),
174190
)
175191

192+
result: Any = None
176193
try:
177194
result = await wrapped(*args, **kwargs)
178195
except Exception as exc:
@@ -187,6 +204,7 @@ async def __call__(
187204
"output.value", _serialize(result) or ""
188205
),
189206
)
207+
otel_context.detach(token)
190208
span.end()
191209
return result
192210

instrumentation-loongsuite/loongsuite-instrumentation-deer-flow/src/opentelemetry/instrumentation/deer_flow/patches/sandbox.py

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
_safe_call,
3737
_should_capture_content,
3838
)
39-
from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer
39+
from opentelemetry import context as otel_context
40+
from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer, set_span_in_context
4041
from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler
4142
from opentelemetry.util.genai.extended_semconv.gen_ai_extended_attributes import (
4243
GEN_AI_SPAN_KIND,
@@ -176,7 +177,9 @@ class _ProviderLifecycleWrapper:
176177
def __init__(self, tracer: Any):
177178
self._tracer = tracer
178179

179-
def _run(self, wrapped: Any, instance: Any, args: Any, kwargs: Any, *, method_name: str) -> Any:
180+
def _start_span(
181+
self, wrapped: Any, args: Any, kwargs: Any, *, method_name: str
182+
) -> tuple[Any, Any]:
180183
task_name = f"sandbox.{method_name}"
181184
span_name = f"run_task {task_name}"
182185
span = self._tracer.start_span(name=span_name, kind=SpanKind.INTERNAL)
@@ -190,13 +193,24 @@ def _run(self, wrapped: Any, instance: Any, args: Any, kwargs: Any, *, method_na
190193
thread_id = args[0]
191194
if thread_id is not None:
192195
span.set_attribute("gen_ai.session.id", str(thread_id))
196+
# Attach so downstream spans (e.g. Sandbox TOOL spans created inside
197+
# ``acquire``) become children of this lifecycle TASK span.
198+
token = otel_context.attach(set_span_in_context(span))
199+
return span, token
200+
201+
def _run(self, wrapped: Any, instance: Any, args: Any, kwargs: Any, *, method_name: str) -> Any:
202+
span, token = self._start_span(
203+
wrapped, args, kwargs, method_name=method_name
204+
)
205+
result: Any = None
193206
try:
194207
result = wrapped(*args, **kwargs)
195208
except Exception as exc:
196209
span.record_exception(exc)
197210
span.set_status(Status(StatusCode.ERROR, str(exc)))
198211
raise
199212
finally:
213+
otel_context.detach(token)
200214
span.end()
201215
return result
202216

@@ -210,12 +224,24 @@ async def __call__(
210224
self, wrapped: Any, instance: Any, args: Any, kwargs: Any
211225
) -> Any:
212226
method_name = getattr(wrapped, "__name__", "acquire_async")
213-
# Run sync path in a thread so the span covers the actual work.
214-
import asyncio
215-
216-
return await asyncio.to_thread(
217-
self._run, wrapped, instance, args, kwargs, method_name=method_name
227+
# ``wrapped`` is a coroutine function; awaiting it directly (rather
228+
# than routing through ``asyncio.to_thread``) preserves the caller's
229+
# event loop and returns the awaited ``str`` sandbox id instead of a
230+
# coroutine object.
231+
span, token = self._start_span(
232+
wrapped, args, kwargs, method_name=method_name
218233
)
234+
result: Any = None
235+
try:
236+
result = await wrapped(*args, **kwargs)
237+
except Exception as exc:
238+
span.record_exception(exc)
239+
span.set_status(Status(StatusCode.ERROR, str(exc)))
240+
raise
241+
finally:
242+
otel_context.detach(token)
243+
span.end()
244+
return result
219245

220246

221247
def instrument(handler: ExtendedTelemetryHandler) -> list[tuple[str, str]]:

instrumentation-loongsuite/loongsuite-instrumentation-deer-flow/src/opentelemetry/instrumentation/deer_flow/patches/task_tool.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
_safe_call,
3737
_should_capture_content,
3838
)
39-
from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer
39+
from opentelemetry import context as otel_context
40+
from opentelemetry.trace import SpanKind, Status, StatusCode, get_tracer, set_span_in_context
4041
from opentelemetry.util.genai.extended_handler import ExtendedTelemetryHandler
4142
from opentelemetry.util.genai.extended_semconv.gen_ai_extended_attributes import (
4243
GEN_AI_SPAN_KIND,
@@ -92,6 +93,11 @@ async def __call__(
9293
span.set_attribute(DEER_FLOW_TASK_NAME, task_name)
9394
span.set_attribute(GenAI.GEN_AI_AGENT_NAME, f"subagent:{subagent_type}")
9495

96+
# Attach the TASK span to the OTel Context so the langchain TOOL span
97+
# emitted inside ``wrapped`` becomes a child of this TASK span (see
98+
# execute.md §3.4.3).
99+
token = otel_context.attach(set_span_in_context(span))
100+
95101
if _should_capture_content():
96102
try:
97103
span.set_attribute(
@@ -109,6 +115,7 @@ async def __call__(
109115
except Exception:
110116
pass
111117

118+
result: Any = None
112119
try:
113120
result = await wrapped(*args, **kwargs)
114121
except Exception as exc:
@@ -122,6 +129,7 @@ async def __call__(
122129
lambda r: span.set_attribute("output.value", str(r)),
123130
result,
124131
)
132+
otel_context.detach(token)
125133
span.end()
126134
return result
127135

0 commit comments

Comments
 (0)