Skip to content

Commit b79852d

Browse files
committed
fix(bfclv4): flatten multi-turn input/output and surface captured errors
The AGENT/ENTRY spans previously JSON-stringified BFCL's nested ``[[{...}],[{...}]]`` question/result structure into a single message content, producing the surprising "content has a serialised array inside it" pattern. Now flattens the structure one level so each role/content pair becomes its own ``{role, parts:[{type,content}]}`` message on both ``gen_ai.input.messages`` and ``gen_ai.output.messages``. Also surfaces BFCL-captured error strings (``Error during inference:``, ``Error during execution:``) and unhandled wrapped exceptions via ``span.record_exception`` so spans marked ERROR carry a visible exception event with the error message instead of just a status code. Change-Id: I372e87b683f907431889ac4d306bf6c235ec36ac Co-developed-by: Claude <noreply@anthropic.com>
1 parent bf5ed89 commit b79852d

1 file changed

Lines changed: 205 additions & 47 deletions

File tree

  • instrumentation-loongsuite/loongsuite-instrumentation-bfclv4/src/opentelemetry/instrumentation/bfclv4/internal

instrumentation-loongsuite/loongsuite-instrumentation-bfclv4/src/opentelemetry/instrumentation/bfclv4/internal/wrappers.py

Lines changed: 205 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def __call__(self, wrapped, instance, args, kwargs):
3535
import sys
3636
import time
3737
from contextvars import ContextVar
38-
from typing import Any, Callable, Iterable, List, Optional
38+
from typing import Any, Callable, Dict, Iterable, List, Optional
3939

4040
from opentelemetry.instrumentation.bfclv4.internal.attributes import (
4141
BFCL_NUM_THREADS,
@@ -84,6 +84,8 @@ def __call__(self, wrapped, instance, args, kwargs):
8484
from opentelemetry.util.genai.types import (
8585
FunctionToolDefinition,
8686
GenericToolDefinition,
87+
InputMessage,
88+
OutputMessage,
8789
Text,
8890
)
8991

@@ -173,6 +175,134 @@ def _system_instruction_dict(content: Any) -> dict:
173175
return {"type": "text", "content": truncate_text(_safe_str(content))}
174176

175177

178+
class _BFCLCapturedError(RuntimeError):
179+
"""Synthetic exception that surfaces BFCL-captured error strings on spans.
180+
181+
BFCL's outer ``multi_threaded_inference`` swallows real exceptions and
182+
converts them into ``"Error during inference: ..."`` strings; the same
183+
happens for tool execution errors. We wrap those strings in this class so
184+
that ``span.record_exception`` produces a real exception event with the
185+
error message visible to span consumers.
186+
"""
187+
188+
189+
def _record_span_error(
    span: Any,
    error_text: str,
    *,
    exc_type: type = _BFCLCapturedError,
    attributes: Optional[Dict[str, Any]] = None,
) -> None:
    """Attach an exception event and an ERROR status to ``span``.

    This is best-effort telemetry: every failure is swallowed (and logged at
    debug level where a logger call is reachable) so the instrumented BFCL
    call is never broken by span bookkeeping.

    Args:
        span: The span to annotate. ``None`` and non-recording spans are
            ignored.
        error_text: The error message. It becomes the exception message and,
            truncated to 200 characters, the status description.
        exc_type: Exception class instantiated with ``error_text`` for the
            exception event.
        attributes: Extra attributes for the exception event. An empty
            mapping is passed on as ``None``.
    """
    if span is None:
        return
    try:
        if not span.is_recording():
            return
    except Exception:  # noqa: BLE001
        return

    # A custom ``exc_type`` may not accept a single message argument; fall
    # back to the synthetic error class rather than raising into the caller.
    try:
        exc = exc_type(error_text)
    except Exception:  # noqa: BLE001
        logger.debug(
            "bfclv4: could not instantiate %r for span error",
            exc_type,
            exc_info=True,
        )
        exc = _BFCLCapturedError(error_text)

    try:
        span.record_exception(exc, attributes=attributes or None)
    except Exception:  # noqa: BLE001
        logger.debug("bfclv4: record_exception failed", exc_info=True)

    # The status import is only needed here, so a failure to import it must
    # not prevent the exception event above from being recorded.
    try:
        from opentelemetry.trace import Status, StatusCode

        span.set_status(Status(StatusCode.ERROR, error_text[:200]))
    except Exception:  # noqa: BLE001
        logger.debug("bfclv4: set_status ERROR failed", exc_info=True)
216+
217+
218+
def _normalise_role(value: Any, default: str) -> str:
219+
if value in (None, "", [], {}):
220+
return default
221+
role = str(value)
222+
return role or default
223+
224+
225+
def _normalise_message_dict(item: Any, *, default_role: str) -> Optional[dict]:
    """Convert a single BFCL message-like value to ``{role, parts:[{type,content}]}``.

    Args:
        item: A role/content dict, or any other value that is treated as the
            message text itself.
        default_role: Role used when ``item`` is not a dict or carries no
            usable ``"role"``.

    Returns:
        A message dict with exactly one text part, or ``None`` for empty
        values so callers can skip them.
    """
    if item in (None, "", [], {}):
        return None

    if not isinstance(item, dict):
        text = truncate_text(_safe_str(item))
        return {"role": default_role, "parts": [{"type": "text", "content": text}]}

    role = _normalise_role(item.get("role"), default_role)
    content = item.get("content")
    if content in (None, "", [], {}):
        # No usable content: fall back to whatever other payload keys the
        # dict carries. The empty "content" key itself must be excluded,
        # otherwise {"role": ..., "content": ""} would be reported as a
        # message whose text is the stringified {"content": ""}.
        extras = {
            key: val
            for key, val in item.items()
            if key not in {"role", "name", "tool_call_id", "content"}
        }
        if not extras:
            return None
        content = extras

    text = truncate_text(_safe_str(content))
    return {"role": role, "parts": [{"type": "text", "content": text}]}
248+
249+
250+
def _flatten_messages(value: Any, default_role: str = "user") -> List[dict]:
    """Flatten arbitrary BFCL question/answer structures into a list of message dicts.

    BFCL stores multi-turn questions as ``[[{...}, {...}], [{...}]]`` (list of
    turns, each turn a list of role/content dicts). Single-turn entries are
    ``[{...}]`` or even a bare dict/string. Lists and tuples are flattened
    recursively at every nesting depth, in order, so each role/content pair
    becomes its own ``{role, parts:[{type,content}]}`` message instead of the
    whole nested list being stringified into a single message's ``content``.

    Args:
        value: The BFCL structure to flatten.
        default_role: Role assigned to items that do not carry their own.

    Returns:
        The flattened messages. Empty items are dropped, so the list may be
        empty.
    """
    messages: List[dict] = []
    if value in (None, "", [], {}):
        return messages

    if isinstance(value, (list, tuple)):
        for item in value:
            messages.extend(_flatten_messages(item, default_role))
        return messages

    # Dicts and scalars take the same path: one value, at most one message.
    msg = _normalise_message_dict(value, default_role=default_role)
    if msg is not None:
        messages.append(msg)
    return messages
276+
277+
278+
def _messages_to_input(messages: List[dict]) -> List[InputMessage]:
    """Build ``InputMessage`` objects from ``{role, parts}`` message dicts.

    Each part's ``"content"`` (``""`` when absent) is wrapped in a ``Text``
    part. Messages without any parts are skipped, and a missing ``"role"``
    defaults to ``"user"``. A ``"parts"`` value of ``None`` is treated like
    an empty list instead of raising.
    """
    result: List[InputMessage] = []
    for msg in messages:
        parts = [
            Text(content=part.get("content", ""))
            for part in msg.get("parts") or []
        ]
        if parts:
            result.append(InputMessage(role=msg.get("role", "user"), parts=parts))
    return result
286+
287+
288+
def _messages_to_output(
    messages: List[dict], finish_reason: str = "stop"
) -> List[OutputMessage]:
    """Build ``OutputMessage`` objects from ``{role, parts}`` message dicts.

    Each part's ``"content"`` (``""`` when absent) is wrapped in a ``Text``
    part. Messages without any parts are skipped, and a missing ``"role"``
    defaults to ``"assistant"``. A ``"parts"`` value of ``None`` is treated
    like an empty list instead of raising.

    Args:
        messages: Message dicts to convert.
        finish_reason: Finish reason stamped on every produced message.
    """
    result: List[OutputMessage] = []
    for msg in messages:
        parts = [
            Text(content=part.get("content", ""))
            for part in msg.get("parts") or []
        ]
        if parts:
            result.append(
                OutputMessage(
                    role=msg.get("role", "assistant"),
                    parts=parts,
                    finish_reason=finish_reason,
                )
            )
    return result
304+
305+
176306
def _test_entry_to_messages(test_entry: Any):
177307
if not isinstance(test_entry, dict):
178308
return [], []
@@ -378,10 +508,10 @@ def _normalise_tool_arguments(arguments: Any) -> Any:
378508
def _extract_questions_from_cases(cases: Any) -> list:
    """Collect the flattened ``"question"`` messages of the first ten cases.

    Args:
        cases: A list or tuple of test-case dicts. Any other type yields an
            empty list.

    Returns:
        Message dicts with default role ``"user"``, one per role/content
        pair. Cases that are not dicts, or whose ``"question"`` is missing
        or ``None``, contribute nothing.
    """
    if not isinstance(cases, (list, tuple)):
        return []
    messages: list = []
    # Only the first 10 cases are inspected.
    for case in cases[:10]:
        if not isinstance(case, dict):
            continue
        question = case.get("question")
        if question is not None:
            messages.extend(_flatten_messages(question, "user"))
    return messages
386516

387517

@@ -600,7 +730,7 @@ def __call__(self, wrapped: Callable, instance: Any, args, kwargs): # noqa: D40
600730
entry_inv = EntryInvocation(session_id=session_id)
601731
entry_input_messages = _extract_questions_from_cases(test_cases_total)
602732
entry_system_instructions = _extract_tool_defs_from_cases(test_cases_total)
603-
entry_inv.input_messages = to_text_input("user", _safe_str(entry_input_messages))
733+
entry_inv.input_messages = _messages_to_input(entry_input_messages)
604734
handler = get_extended_telemetry_handler()
605735

606736
attributes = {GEN_AI_FRAMEWORK: FRAMEWORK_NAME}
@@ -635,7 +765,18 @@ def __call__(self, wrapped: Callable, instance: Any, args, kwargs): # noqa: D40
635765
)
636766
_set_json_span_attr(inv.span, GEN_AI_INPUT_MESSAGES_ATTR, entry_input_messages)
637767
_set_json_span_attr(inv.span, GEN_AI_SYSTEM_INSTRUCTIONS_ATTR, entry_system_instructions)
638-
result = wrapped(*args, **kwargs)
768+
try:
769+
result = wrapped(*args, **kwargs)
770+
except Exception as exc:
771+
if inv.span is not None and inv.span.is_recording():
772+
try:
773+
inv.span.record_exception(exc)
774+
except Exception: # noqa: BLE001
775+
logger.debug(
776+
"bfclv4 ENTRY: record_exception failed",
777+
exc_info=True,
778+
)
779+
raise
639780
if inv.span is not None and inv.span.is_recording():
640781
_set_json_span_attr(
641782
inv.span,
@@ -734,24 +875,45 @@ def __call__(self, wrapped: Callable, instance: Any, args, kwargs): # noqa: D40
734875
# content-capture mode, which makes K8s semantic validation opaque.
735876
question = test_entry.get("question")
736877
functions = test_entry.get("function")
737-
if question is not None:
738-
inv.input_messages = to_text_input(
739-
"user", truncate_text(_safe_str(question))
740-
)
878+
input_messages_dicts = _flatten_messages(question, "user")
879+
if input_messages_dicts:
880+
inv.input_messages = _messages_to_input(input_messages_dicts)
741881
if functions is not None:
742-
inv.system_instruction = to_text_input(
882+
system_inputs = to_text_input(
743883
"system", truncate_text(_safe_str(functions))
744-
)[0].parts if to_text_input("system", truncate_text(_safe_str(functions))) else []
884+
)
885+
inv.system_instruction = (
886+
system_inputs[0].parts if system_inputs else []
887+
)
745888
if inv.span is not None and inv.span.is_recording():
746-
_set_json_span_attr(inv.span, GEN_AI_INPUT_MESSAGES_ATTR, [_message_dict("user", question)])
747-
_set_json_span_attr(inv.span, GEN_AI_SYSTEM_INSTRUCTIONS_ATTR, [_system_instruction_dict(functions)])
889+
if input_messages_dicts:
890+
_set_json_span_attr(
891+
inv.span,
892+
GEN_AI_INPUT_MESSAGES_ATTR,
893+
input_messages_dicts,
894+
)
895+
if functions is not None:
896+
_set_json_span_attr(
897+
inv.span,
898+
GEN_AI_SYSTEM_INSTRUCTIONS_ATTR,
899+
[_system_instruction_dict(functions)],
900+
)
748901
# Run the original inference call.
749902
try:
750903
result = wrapped(*args, **kwargs)
751904
except Exception as exc:
752-
# The CM will mark the span as failed; we leave it to
753-
# the handler/CM to call ``fail_invoke_agent``.
754-
raise exc
905+
# The CM will mark the span as failed; record the
906+
# exception explicitly so the traceback/message is visible
907+
# on the span (util-genai's fail path only sets status).
908+
if inv.span is not None and inv.span.is_recording():
909+
try:
910+
inv.span.record_exception(exc)
911+
except Exception: # noqa: BLE001
912+
logger.debug(
913+
"bfclv4 AGENT: record_exception failed",
914+
exc_info=True,
915+
)
916+
raise
755917

756918
# Detect BFCL's own captured error path (no exception raised
757919
# but the returned result is the error string).
@@ -767,20 +929,12 @@ def __call__(self, wrapped: Callable, instance: Any, args, kwargs): # noqa: D40
767929
if (
768930
isinstance(result_payload, str)
769931
and result_payload.startswith(_BFCL_INFERENCE_ERROR_PREFIX)
770-
and inv.span is not None
771-
and inv.span.is_recording()
772932
):
773-
try:
774-
from opentelemetry.trace import Status, StatusCode
775-
776-
inv.span.set_status(
777-
Status(StatusCode.ERROR, result_payload[:200])
778-
)
779-
except Exception: # noqa: BLE001
780-
logger.debug(
781-
"bfclv4 AGENT: failed to set ERROR status",
782-
exc_info=True,
783-
)
933+
_record_span_error(
934+
inv.span,
935+
result_payload,
936+
attributes={"bfcl.error.captured": True},
937+
)
784938

785939
if isinstance(metadata_payload, dict):
786940
input_tokens = _flatten_tokens(
@@ -795,12 +949,22 @@ def __call__(self, wrapped: Callable, instance: Any, args, kwargs): # noqa: D40
795949
inv.output_tokens = output_tokens
796950

797951
if result_payload is not None:
798-
inv.output_messages = to_text_output(
799-
"assistant",
800-
truncate_text(_safe_str(result_payload)),
952+
output_messages_dicts = _flatten_messages(
953+
result_payload, "assistant"
954+
)
955+
if not output_messages_dicts:
956+
output_messages_dicts = [
957+
_message_dict("assistant", result_payload)
958+
]
959+
inv.output_messages = _messages_to_output(
960+
output_messages_dicts
801961
)
802962
if inv.span is not None and inv.span.is_recording():
803-
_set_json_span_attr(inv.span, GEN_AI_OUTPUT_MESSAGES_ATTR, [_message_dict("assistant", result_payload)])
963+
_set_json_span_attr(
964+
inv.span,
965+
GEN_AI_OUTPUT_MESSAGES_ATTR,
966+
output_messages_dicts,
967+
)
804968

805969
synthetic_tool_count = _emit_synthetic_tool_spans(
806970
result_payload,
@@ -1156,20 +1320,14 @@ def __call__(self, wrapped: Callable, instance: Any, args, kwargs): # noqa: D40
11561320
if isinstance(execution_result, str) and execution_result.startswith(
11571321
"Error during execution:"
11581322
):
1159-
try:
1160-
from opentelemetry.trace import (
1161-
Status,
1162-
StatusCode,
1163-
)
1164-
1165-
span.set_status(
1166-
Status(
1167-
StatusCode.ERROR,
1168-
execution_result[:200],
1169-
)
1170-
)
1171-
except Exception: # noqa: BLE001
1172-
pass
1323+
_record_span_error(
1324+
span,
1325+
execution_result,
1326+
attributes={
1327+
"bfcl.tool.error.captured": True,
1328+
BFCL_TOOL_INDEX: index,
1329+
},
1330+
)
11731331
# Approximate latency by sleeping the budgeted slice
11741332
# would distort BFCL execution; we instead rely on
11751333
# span start/end (currently both wall-clock-now).

0 commit comments

Comments
 (0)