Skip to content

Commit 4ff952e

Browse files
Python: Capture context provider instructions in agent telemetry (microsoft#6515)
* Fix agent instructions telemetry
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Simplify agent instructions telemetry guard
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
* Fix observability mypy cast
  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 7bf2d2a commit 4ff952e

2 files changed

Lines changed: 246 additions & 35 deletions

File tree

python/packages/core/agent_framework/observability.py

Lines changed: 127 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1488,14 +1488,21 @@ def get_response(
14881488
)
14891489

14901490
if stream:
1491+
agent_span = trace.get_current_span()
14911492
span = _start_streaming_span(attributes, OtelAttr.REQUEST_MODEL)
14921493

14931494
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages and span.is_recording():
1495+
system_instructions = _get_instructions_from_options(opts)
1496+
_capture_current_agent_system_instructions(
1497+
agent_span,
1498+
span,
1499+
system_instructions,
1500+
)
14941501
_capture_messages(
14951502
span=span,
14961503
provider_name=provider_name,
14971504
messages=messages,
1498-
system_instructions=opts.get("instructions"),
1505+
system_instructions=system_instructions,
14991506
)
15001507

15011508
span_state = {"closed": False}
@@ -1585,13 +1592,20 @@ async def _finalize_stream() -> None:
15851592
return wrapped_stream
15861593

15871594
async def _get_response() -> ChatResponse:
1595+
agent_span = trace.get_current_span()
15881596
with _get_span(attributes=attributes, span_name_attribute=OtelAttr.REQUEST_MODEL) as span:
15891597
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages and span.is_recording():
1598+
system_instructions = _get_instructions_from_options(opts)
1599+
_capture_current_agent_system_instructions(
1600+
agent_span,
1601+
span,
1602+
system_instructions,
1603+
)
15901604
_capture_messages(
15911605
span=span,
15921606
provider_name=provider_name,
15931607
messages=messages,
1594-
system_instructions=opts.get("instructions"),
1608+
system_instructions=system_instructions,
15951609
)
15961610
start_time_stamp = perf_counter()
15971611
try:
@@ -1761,7 +1775,6 @@ def _trace_agent_invocation(
17611775
inner_response_telemetry_captured_fields
17621776
)
17631777
inner_accumulated_usage_token = INNER_ACCUMULATED_USAGE.set({})
1764-
17651778
if stream:
17661779
span = _start_streaming_span(attributes, OtelAttr.AGENT_NAME)
17671780

@@ -1859,38 +1872,44 @@ async def _finalize_stream() -> None:
18591872
async def _run() -> AgentResponse[Any]:
18601873
try:
18611874
with _get_span(attributes=attributes, span_name_attribute=OtelAttr.AGENT_NAME) as span:
1862-
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages and span.is_recording():
1863-
_capture_messages(
1864-
span=span,
1865-
provider_name=provider_name,
1866-
messages=messages,
1867-
system_instructions=_get_instructions_from_options(dict(merged_options)),
1868-
)
1869-
start_time_stamp = perf_counter()
18701875
try:
1871-
response: AgentResponse[Any] = await execute()
1872-
except Exception as exception:
1873-
capture_exception(span=span, exception=exception, timestamp=time_ns())
1874-
raise
1875-
duration = perf_counter() - start_time_stamp
1876-
if response:
1877-
response_attributes = _get_response_attributes(
1878-
attributes,
1879-
response,
1880-
capture_response_id=INNER_RESPONSE_ID_CAPTURED_FIELD
1881-
not in inner_response_telemetry_captured_fields,
1882-
capture_usage=INNER_USAGE_CAPTURED_FIELD not in inner_response_telemetry_captured_fields,
1883-
)
1884-
_apply_accumulated_usage(response_attributes, inner_response_telemetry_captured_fields)
1885-
_capture_response(span=span, attributes=response_attributes, duration=duration)
1886-
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and response.messages and span.is_recording():
1876+
if OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED and messages and span.is_recording():
18871877
_capture_messages(
18881878
span=span,
18891879
provider_name=provider_name,
1890-
messages=response.messages,
1891-
output=True,
1880+
messages=messages,
1881+
system_instructions=_get_instructions_from_options(dict(merged_options)),
1882+
)
1883+
start_time_stamp = perf_counter()
1884+
response: AgentResponse[Any] = await execute()
1885+
duration = perf_counter() - start_time_stamp
1886+
if response:
1887+
response_attributes = _get_response_attributes(
1888+
attributes,
1889+
response,
1890+
capture_response_id=INNER_RESPONSE_ID_CAPTURED_FIELD
1891+
not in inner_response_telemetry_captured_fields,
1892+
capture_usage=(
1893+
INNER_USAGE_CAPTURED_FIELD not in inner_response_telemetry_captured_fields
1894+
),
18921895
)
1893-
return response # type: ignore[return-value,no-any-return]
1896+
_apply_accumulated_usage(response_attributes, inner_response_telemetry_captured_fields)
1897+
_capture_response(span=span, attributes=response_attributes, duration=duration)
1898+
if (
1899+
OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED
1900+
and response.messages
1901+
and span.is_recording()
1902+
):
1903+
_capture_messages(
1904+
span=span,
1905+
provider_name=provider_name,
1906+
messages=response.messages,
1907+
output=True,
1908+
)
1909+
return response # type: ignore[return-value,no-any-return]
1910+
except Exception as exception:
1911+
capture_exception(span=span, exception=exception, timestamp=time_ns())
1912+
raise
18941913
finally:
18951914
INNER_RESPONSE_TELEMETRY_CAPTURED_FIELDS.reset(inner_response_telemetry_captured_fields_token)
18961915
INNER_ACCUMULATED_USAGE.reset(inner_accumulated_usage_token)
@@ -2263,6 +2282,83 @@ def capture_exception(span: trace.Span, exception: Exception, timestamp: int | N
22632282
span.set_status(status=trace.StatusCode.ERROR, description=repr(exception))
22642283

22652284

2285+
def _capture_system_instructions(span: trace.Span, system_instructions: str | list[str] | None) -> None:
    """Record system instructions on ``span`` as a JSON-encoded attribute.

    Nothing is written when ``system_instructions`` is falsy (``None``, an empty
    string or an empty list). Otherwise every instruction becomes a
    ``{"type": "text", "content": ...}`` item and the list of items is serialized
    with ``ensure_ascii=False`` into the ``OtelAttr.SYSTEM_INSTRUCTIONS`` attribute.

    Args:
        span: The span that receives the attribute.
        system_instructions: A single instruction string, a list of instruction
            strings, or ``None``.
    """
    if system_instructions:
        text_items: list[dict[str, str]] = []
        for instruction_text in _normalize_instructions(system_instructions):
            text_items.append({"type": "text", "content": instruction_text})
        serialized_items = json.dumps(text_items, ensure_ascii=False)
        span.set_attribute(OtelAttr.SYSTEM_INSTRUCTIONS, serialized_items)
2293+
2294+
2295+
def _capture_current_agent_system_instructions(
    agent_span: trace.Span,
    chat_span: trace.Span,
    system_instructions: str | list[str] | None,
) -> None:
    """Copy the final chat instructions onto the agent span that owns the chat span.

    The instructions are written to ``agent_span`` only when every check passes:

    * ``system_instructions`` is truthy and ``agent_span`` is recording;
    * ``agent_span`` exposes an ``attributes`` mapping whose operation entry equals
      ``OtelAttr.AGENT_INVOKE_OPERATION``;
    * the new instructions preserve whatever instructions the agent span already
      carries (see ``_instructions_preserve_existing_agent_instructions``);
    * ``chat_span`` has a ``parent`` whose span id and trace id both match the
      span context of ``agent_span``.

    Args:
        agent_span: The span that was current before the chat span was started.
        chat_span: The chat span the instructions were captured for.
        system_instructions: The instructions sent with the chat request.
    """
    if not (system_instructions and agent_span.is_recording()):
        return

    # ``attributes`` is read via getattr, so spans that do not expose a mapping
    # under that name are skipped instead of raising.
    raw_attributes = getattr(agent_span, "attributes", None)
    if not isinstance(raw_attributes, Mapping):
        return
    span_attributes = cast(Mapping[str, Any], raw_attributes)

    recorded_operation = span_attributes.get(OtelAttr.OPERATION.value)
    if recorded_operation != OtelAttr.AGENT_INVOKE_OPERATION:
        return

    if not _instructions_preserve_existing_agent_instructions(span_attributes, system_instructions):
        return

    # Only a chat span that is a direct child of the agent span may update it.
    parent_context = getattr(chat_span, "parent", None)
    own_context = agent_span.get_span_context()
    parent_mismatch = (
        parent_context is None
        or parent_context.span_id != own_context.span_id
        or parent_context.trace_id != own_context.trace_id
    )
    if not parent_mismatch:
        _capture_system_instructions(agent_span, system_instructions)
2324+
2325+
2326+
def _normalize_instructions(system_instructions: str | list[str]) -> list[str]:
2327+
"""Normalize system instructions to telemetry text items."""
2328+
return system_instructions if isinstance(system_instructions, list) else [system_instructions]
2329+
2330+
2331+
def _instructions_preserve_existing_agent_instructions(
    agent_attributes: Mapping[str, Any],
    system_instructions: str | list[str],
) -> bool:
    """Tell whether new chat instructions keep the agent span's recorded instructions.

    Args:
        agent_attributes: Attributes currently set on the agent span.
        system_instructions: The instructions about to be recorded.

    Returns:
        ``True`` when no string value is stored under ``OtelAttr.SYSTEM_INSTRUCTIONS``,
        or when the newline-joined new instructions are equal to the newline-joined
        recorded instructions or start with them followed by a newline. ``False``
        when the stored value is not valid JSON, does not decode to a list, or the
        new text neither equals nor extends the recorded text.
    """
    recorded = agent_attributes.get(OtelAttr.SYSTEM_INSTRUCTIONS)
    if not isinstance(recorded, str):
        # No string-typed instructions recorded, so nothing can be lost.
        return True

    try:
        decoded = json.loads(recorded)
    except json.JSONDecodeError:
        return False
    if not isinstance(decoded, list):
        return False

    # Collect the string "content" of every mapping item; other items are ignored.
    recorded_parts: list[str] = []
    for entry in cast(list[object], decoded):
        text = cast(Mapping[str, Any], entry).get("content") if isinstance(entry, Mapping) else None
        if isinstance(text, str):
            recorded_parts.append(text)

    recorded_text = "\n".join(recorded_parts)
    candidate_text = "\n".join(_normalize_instructions(system_instructions))
    if candidate_text == recorded_text:
        return True
    return candidate_text.startswith(f"{recorded_text}\n")
2360+
2361+
22662362
def _capture_messages(
22672363
span: trace.Span,
22682364
provider_name: str,
@@ -2294,11 +2390,7 @@ def _capture_messages(
22942390
span.set_attribute(
22952391
OtelAttr.OUTPUT_MESSAGES if output else OtelAttr.INPUT_MESSAGES, json.dumps(otel_messages, ensure_ascii=False)
22962392
)
2297-
if system_instructions:
2298-
if not isinstance(system_instructions, list):
2299-
system_instructions = [system_instructions]
2300-
otel_sys_instructions = [{"type": "text", "content": instruction} for instruction in system_instructions]
2301-
span.set_attribute(OtelAttr.SYSTEM_INSTRUCTIONS, json.dumps(otel_sys_instructions, ensure_ascii=False))
2393+
_capture_system_instructions(span, system_instructions)
23022394

23032395

23042396
def _to_otel_message(message: Message) -> dict[str, Any]:

python/packages/core/tests/core/test_observability.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
ChatResponse,
1818
ChatResponseUpdate,
1919
Content,
20+
ContextProvider,
2021
Message,
2122
RawAgent,
2223
ResponseStream,
@@ -3647,6 +3648,124 @@ async def test_agent_streaming_instructions_merged_from_default_and_options(
36473648
assert "Stream override." in system_instructions[0]["content"]
36483649

36493650

3651+
@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True)
@pytest.mark.parametrize("stream", [False, True])
async def test_agent_instructions_include_context_provider_extensions(
    mock_chat_client,
    span_exporter: InMemorySpanExporter,
    enable_sensitive_data,
    stream: bool,
) -> None:
    """The agent span records base instructions plus those added by a context provider.

    Runs once through ``agent.run("Hello")`` and once through the streaming variant.
    """
    import json

    base_instructions = "You are a friendly assistant."
    provider_instructions = "The user's name is Alice."

    class UserMemoryProvider(ContextProvider):
        def __init__(self) -> None:
            super().__init__(source_id="user-memory")

        async def before_run(
            self,
            *,
            agent: Any,
            session: Any,
            context: Any,
            state: dict[str, Any],
        ) -> None:
            # Contribute one extra instruction under this provider's source id.
            context.extend_instructions(self.source_id, provider_instructions)

    memory_agent = Agent(
        client=mock_chat_client(),
        name="memory_agent",
        instructions=base_instructions,
        context_providers=[UserMemoryProvider()],
    )

    span_exporter.clear()
    if not stream:
        await memory_agent.run("Hello")
    else:
        updates = memory_agent.run("Hello", stream=True)
        # Drain the stream, then request the final response.
        async for _ in updates:
            pass
        await updates.get_final_response()

    invoke_spans = [
        finished
        for finished in span_exporter.get_finished_spans()
        if finished.attributes.get(OtelAttr.OPERATION.value) == OtelAttr.AGENT_INVOKE_OPERATION
    ]
    assert len(invoke_spans) == 1

    recorded_items = json.loads(invoke_spans[0].attributes[OtelAttr.SYSTEM_INSTRUCTIONS])
    recorded_texts = [entry["content"] for entry in recorded_items]
    assert any(base_instructions in text for text in recorded_texts)
    assert any(provider_instructions in text for text in recorded_texts)
3702+
3703+
3704+
@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True)
async def test_agent_instructions_not_overwritten_by_unrelated_nested_chat(
    mock_chat_client,
    span_exporter: InMemorySpanExporter,
    enable_sensitive_data,
) -> None:
    """A nested chat call made by a context provider must not replace agent span instructions.

    The provider extends the instructions in ``before_run`` and issues its own chat
    request, with different instructions, through a second client in ``after_run``.
    """
    import json

    base_instructions = "Base agent instructions."
    provider_instructions = "Context-provided instructions."
    nested_instructions = "Unrelated nested instructions."

    class NestedChatProvider(ContextProvider):
        def __init__(self, nested_client: BaseChatClient[Any]) -> None:
            super().__init__(source_id="nested-chat")
            self.nested_client = nested_client

        async def before_run(
            self,
            *,
            agent: Any,
            session: Any,
            context: Any,
            state: dict[str, Any],
        ) -> None:
            context.extend_instructions(self.source_id, provider_instructions)

        async def after_run(
            self,
            *,
            agent: Any,
            session: Any,
            context: Any,
            state: dict[str, Any],
        ) -> None:
            # Separate chat request whose instructions must stay off the agent span.
            await self.nested_client.get_response(
                messages=[Message(role="user", contents=["Nested request"])],
                options={"model": "NestedModel", "instructions": nested_instructions},
                client_kwargs={"session": session},
            )

    guarded_agent = Agent(
        client=mock_chat_client(),
        name="guarded_agent",
        instructions=base_instructions,
        context_providers=[NestedChatProvider(mock_chat_client())],
    )

    span_exporter.clear()
    await guarded_agent.run("Hello")

    finished_spans = span_exporter.get_finished_spans()
    invoke_spans = [
        finished
        for finished in finished_spans
        if finished.attributes.get(OtelAttr.OPERATION.value) == OtelAttr.AGENT_INVOKE_OPERATION
    ]
    assert len(invoke_spans) == 1
    completion_spans = [
        finished
        for finished in finished_spans
        if finished.attributes.get(OtelAttr.OPERATION.value) == OtelAttr.CHAT_COMPLETION_OPERATION
    ]
    # One chat span for the agent's own request, one for the nested request.
    assert len(completion_spans) == 2

    recorded_items = json.loads(invoke_spans[0].attributes[OtelAttr.SYSTEM_INSTRUCTIONS])
    recorded_texts = [entry["content"] for entry in recorded_items]
    assert any(base_instructions in text for text in recorded_texts)
    assert any(provider_instructions in text for text in recorded_texts)
    assert not any(nested_instructions in text for text in recorded_texts)
3767+
3768+
36503769
@pytest.mark.parametrize("enable_sensitive_data", [True], indirect=True)
36513770
async def test_agent_no_instructions_in_default_or_options(
36523771
mock_chat_agent, span_exporter: InMemorySpanExporter, enable_sensitive_data

0 commit comments

Comments
 (0)