Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ megalinter-reports/
# Benchmarks
.asv/

# LLM Cache Files
.tiktoken_cache

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
7 changes: 7 additions & 0 deletions newrelic/common/llm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,19 @@ def __init__(self, wrapped, on_stop_iteration, on_error, on_stream_chunk=None):
self._nr_on_stream_chunk = on_stream_chunk or noop
# Track if we've sent the LLM events yet to avoid sending them multiple times
self._nr_closed = False
# Lazily established by __aiter__ or __anext__. With LangChain's
# astream_events, __anext__ may be called before __aiter__.
self._nr_wrapped_iter = None

def __aiter__(self):
self._nr_wrapped_iter = self.__wrapped__.__aiter__()
return self

async def __anext__(self):
# Lazily establish the wrapped iterator. With astream_events,
# __anext__ may be called before __aiter__.
if self._nr_wrapped_iter is None:
self._nr_wrapped_iter = self.__wrapped__.__aiter__()
try:
return_val = await self._nr_wrapped_iter.__anext__()
self._nr_on_stream_chunk(self, return_val)
Expand Down
27 changes: 27 additions & 0 deletions newrelic/hooks/mlmodel_langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,33 @@ def astream(self, *args, **kwargs):

return return_val

def astream_events(self, *args, **kwargs):
transaction = current_transaction()
if not transaction:
return self.__wrapped__.astream_events(*args, **kwargs)

agent_name = getattr(self.__wrapped__, "name", "agent")
agent_id = str(uuid.uuid4())
agent_event_dict = _construct_base_agent_event_dict(agent_name, agent_id, transaction)
function_trace_name = f"astream_events/{agent_name}"
agentic_subcomponent_data = {"type": "APM-AI_AGENT", "name": agent_name}

ft = FunctionTrace(name=function_trace_name, group="Llm/agent/LangChain")
ft.__enter__()
ft._add_agent_attribute("subcomponent", json.dumps(agentic_subcomponent_data))
try:
return_val = self.__wrapped__.astream_events(*args, **kwargs)
return_val = AsyncLLMStreamProxy(
return_val,
on_stop_iteration=self._nr_on_stop_iteration(ft, agent_event_dict),
on_error=self._nr_on_error(ft, agent_event_dict, agent_id),
)
except Exception:
self._nr_on_error(ft, agent_event_dict, agent_id)(transaction)
raise

return return_val

def transform(self, *args, **kwargs):
transaction = current_transaction()
if not transaction:
Expand Down
Loading
Loading