Skip to content

Commit 49eb8d6

Browse files
committed
Implement wrapping for on_messages and on_messages_stream with FunctionTrace for improved traceability
1 parent dadbd95 commit 49eb8d6

1 file changed

Lines changed: 96 additions & 4 deletions

File tree

newrelic/hooks/mlmodel_autogen.py

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import uuid
2020

2121
from newrelic.api.function_trace import FunctionTrace
22-
from newrelic.api.time_trace import current_trace, get_trace_linking_metadata
22+
from newrelic.api.time_trace import get_trace_linking_metadata
2323
from newrelic.api.transaction import current_transaction
2424
from newrelic.common.object_names import callable_name
2525
from newrelic.common.object_wrapper import wrap_function_wrapper
@@ -41,6 +41,10 @@
4141
# This allows nested agents created inside tools to find the parent trace.
4242
_nr_tool_parent_trace = contextvars.ContextVar("_nr_tool_parent_trace", default=None)
4343

44+
# Flag to indicate we're inside wrap_on_messages, so on_messages_stream can skip
45+
# creating a duplicate agent FT (on_messages internally calls on_messages_stream).
46+
_nr_in_on_messages = contextvars.ContextVar("_nr_in_on_messages", default=False)
47+
4448
RECORD_EVENTS_FAILURE_LOG_MESSAGE = "Exception occurred in Autogen instrumentation: Failed to record LLM events. Please report this issue to New Relic Support.\n%s"
4549

4650

@@ -60,6 +64,74 @@ async def wrap_from_server_params(wrapped, instance, args, kwargs):
6064
return await wrapped(*args, **kwargs)
6165

6266

67+
async def wrap_on_messages(wrapped, instance, args, kwargs):
    """Entry-point wrapper for on_messages (a regular async method).

    on_messages is called by run() and internally iterates on_messages_stream.
    Because on_messages is awaited rather than being an async generator, the
    agent FunctionTrace can stay open for the full execution, making tool FTs
    proper children of this agent FT.

    The instrumented path is taken when a transaction is active, or when a
    parent trace can be recovered from the ``_nr_tool_parent_trace``
    ContextVar. Otherwise the wrapped coroutine is awaited untouched.
    """
    if current_transaction():
        return await _on_messages_instrumented(wrapped, instance, args, kwargs)

    # When a tool calls an inner agent on a different thread, NR's thread-local context is lost.
    # The ContextVar is propagated by asyncio, so the parent trace can be recovered from it.
    recovered_parent = _nr_tool_parent_trace.get(None)
    if not recovered_parent:
        return await wrapped(*args, **kwargs)

    with ContextOf(trace=recovered_parent):
        return await _on_messages_instrumented(wrapped, instance, args, kwargs)
85+
86+
87+
async def _on_messages_instrumented(wrapped, instance, args, kwargs):
    """Await on_messages inside an agent FunctionTrace and record an LlmAgent event.

    Falls through to the wrapped coroutine untouched when there is no current
    transaction or when ``ai_monitoring.enabled`` is off in the settings.

    Args:
        wrapped: The original ``on_messages`` callable.
        instance: The agent instance. Its ``name`` attribute (default
            ``"agent"``) is used in the trace name and passed to the event
            builder.
        args: Positional arguments forwarded to ``wrapped``.
        kwargs: Keyword arguments forwarded to ``wrapped``.

    Returns:
        Whatever ``await wrapped(*args, **kwargs)`` returns.

    Raises:
        Anything raised by ``wrapped`` is re-raised after the FunctionTrace
        has been exited. ``Exception`` subclasses additionally produce an
        ``LlmAgent`` custom event flagged with ``"error": True``.
    """
    transaction = current_transaction()
    if not transaction:
        return await wrapped(*args, **kwargs)

    settings = transaction.settings or global_settings()
    if not settings.ai_monitoring.enabled:
        return await wrapped(*args, **kwargs)

    # Framework metric also used for entity tagging in the UI
    transaction.add_ml_model_info("Autogen", AUTOGEN_VERSION)
    transaction._add_agent_attribute("llm", True)

    agent_name = getattr(instance, "name", "agent")
    agent_id = str(uuid.uuid4())
    func_name = callable_name(wrapped)
    function_trace_name = f"{func_name}/{agent_name}"

    agentic_subcomponent_data = {"type": "APM-AI_AGENT", "name": agent_name}

    # The trace is entered/exited manually so its duration can be read after
    # exit and attached to the LlmAgent event.
    ft = FunctionTrace(name=function_trace_name, group="Llm/agent/Autogen")
    ft.__enter__()
    ft._add_agent_attribute("subcomponent", json.dumps(agentic_subcomponent_data))

    # Set flag so on_messages_stream (called internally) skips creating a duplicate agent FT.
    token = _nr_in_on_messages.set(True)

    try:
        return_val = await wrapped(*args, **kwargs)
    except Exception:
        ft.notice_error(attributes={"agent_id": agent_id})
        ft.__exit__(*sys.exc_info())
        agent_event_dict = _construct_base_agent_event_dict(agent_name, agent_id, transaction)
        agent_event_dict.update({"duration": ft.duration * 1000, "error": True})
        transaction.record_custom_event("LlmAgent", agent_event_dict)
        raise
    except BaseException:
        # asyncio.CancelledError (and KeyboardInterrupt/SystemExit) are not
        # Exception subclasses. Without this branch a cancelled on_messages
        # would propagate with the FunctionTrace still entered. Close the
        # trace the same way a ``with`` block would, then re-raise; no
        # LlmAgent error event is recorded for these exits.
        ft.__exit__(*sys.exc_info())
        raise
    finally:
        # Always restore the previous flag value, whatever the outcome.
        _nr_in_on_messages.reset(token)

    ft.__exit__(None, None, None)

    agent_event_dict = _construct_base_agent_event_dict(agent_name, agent_id, transaction)
    agent_event_dict["duration"] = ft.duration * 1000
    transaction.record_custom_event("LlmAgent", agent_event_dict)

    return return_val
133+
134+
63135
def wrap_on_messages_stream(wrapped, instance, args, kwargs):
64136
transaction = current_transaction()
65137
if not transaction:
@@ -75,6 +147,15 @@ def wrap_on_messages_stream(wrapped, instance, args, kwargs):
75147

76148

77149
def _on_messages_stream_instrumented(wrapped, instance, args, kwargs):
150+
"""Wrap on_messages_stream with an agent FT.
151+
152+
on_messages_stream returns an AsyncGenerator. When called from on_messages
153+
(the run() path), the agent FT is already created by wrap_on_messages, so
154+
we skip creating a duplicate here. When called directly (the run_stream()
155+
path), we create the agent FT with immediate exit since async generators
156+
cannot reliably keep the FT open (callers exit the loop early without
157+
closing the generator).
158+
"""
78159
transaction = current_transaction()
79160
if not transaction:
80161
return wrapped(*args, **kwargs)
@@ -83,18 +164,26 @@ def _on_messages_stream_instrumented(wrapped, instance, args, kwargs):
83164
if not settings.ai_monitoring.enabled:
84165
return wrapped(*args, **kwargs)
85166

167+
# If we're already inside wrap_on_messages, skip the agent FT here to avoid
168+
# a duplicate span. The on_messages wrapper owns the agent FT in that case.
169+
if _nr_in_on_messages.get(False):
170+
return wrapped(*args, **kwargs)
171+
86172
# Framework metric also used for entity tagging in the UI
87173
transaction.add_ml_model_info("Autogen", AUTOGEN_VERSION)
88174
transaction._add_agent_attribute("llm", True)
89175

90176
agent_name = getattr(instance, "name", "agent")
91177
agent_id = str(uuid.uuid4())
92-
agent_event_dict = _construct_base_agent_event_dict(agent_name, agent_id, transaction)
93178
func_name = callable_name(wrapped)
94179
function_trace_name = f"{func_name}/{agent_name}"
95180

96181
agentic_subcomponent_data = {"type": "APM-AI_AGENT", "name": agent_name}
97182

183+
# on_messages_stream returns an AsyncGenerator. The FT is entered and exited
184+
# immediately around the sync call to create the generator. AutoGen's callers
185+
# exit the async for loop early via return without closing the generator,
186+
# which would leave an open FT on the trace stack.
98187
ft = FunctionTrace(name=function_trace_name, group="Llm/agent/Autogen")
99188
ft.__enter__()
100189
ft._add_agent_attribute("subcomponent", json.dumps(agentic_subcomponent_data))
@@ -104,14 +193,15 @@ def _on_messages_stream_instrumented(wrapped, instance, args, kwargs):
104193
except Exception:
105194
ft.notice_error(attributes={"agent_id": agent_id})
106195
ft.__exit__(*sys.exc_info())
107-
# If we hit an exception, append the error attribute and duration from the exited function trace
196+
agent_event_dict = _construct_base_agent_event_dict(agent_name, agent_id, transaction)
108197
agent_event_dict.update({"duration": ft.duration * 1000, "error": True})
109198
transaction.record_custom_event("LlmAgent", agent_event_dict)
110199
raise
111200

112201
ft.__exit__(None, None, None)
113-
agent_event_dict.update({"duration": ft.duration * 1000})
114202

203+
agent_event_dict = _construct_base_agent_event_dict(agent_name, agent_id, transaction)
204+
agent_event_dict["duration"] = ft.duration * 1000
115205
transaction.record_custom_event("LlmAgent", agent_event_dict)
116206

117207
return return_val
@@ -241,6 +331,8 @@ async def wrap__execute_tool_call(wrapped, instance, args, kwargs):
241331

242332
def instrument_autogen_agentchat_agents__assistant_agent(module):
243333
if hasattr(module, "AssistantAgent"):
334+
if hasattr(module.AssistantAgent, "on_messages"):
335+
wrap_function_wrapper(module, "AssistantAgent.on_messages", wrap_on_messages)
244336
if hasattr(module.AssistantAgent, "on_messages_stream"):
245337
wrap_function_wrapper(module, "AssistantAgent.on_messages_stream", wrap_on_messages_stream)
246338
if hasattr(module.AssistantAgent, "_execute_tool_call"):

0 commit comments

Comments
 (0)