Skip to content

Commit c5706bc

Browse files
authored
Merge branch 'next' into chris-villegas/AGX1-241/implement-sdk-credential-abstraction
2 parents 841bcda + bd90a61 commit c5706bc

1 file changed

Lines changed: 36 additions & 3 deletions

File tree

  • examples/tutorials/10_async/10_temporal/130_langgraph/project

examples/tutorials/10_async/10_temporal/130_langgraph/project/graph.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@
1010
The router and tools are ``async`` so LangGraph awaits them directly — a sync
1111
callable would be offloaded via ``run_in_executor``, which Temporal's workflow
1212
event loop does not support.
13+
14+
The in-workflow ``tools`` node is a plain ``async`` function rather than
15+
LangGraph's ``ToolNode`` prebuilt on purpose. The plugin wraps an in-workflow
16+
node in ``wrap_workflow``, whose closure captures the wrapped object. When that
17+
object is itself a LangChain ``Runnable`` (as ``ToolNode`` is), LangGraph's
18+
``compile()`` subgraph detection (``find_subgraph_pregel`` →
19+
``get_function_nonlocals``) recurses through that wrapper without cycle
20+
detection and never terminates, tripping Temporal's deadlock detector. A plain
21+
function isn't a ``Runnable``, so compile stays trivial.
1322
"""
1423

1524
from __future__ import annotations
@@ -26,12 +35,14 @@
2635

2736
from langgraph.graph import END, START, StateGraph
2837
from langchain_openai import ChatOpenAI
29-
from langgraph.prebuilt import ToolNode
30-
from langchain_core.messages import SystemMessage
38+
from langchain_core.messages import ToolMessage, SystemMessage
3139
from langgraph.graph.message import add_messages
3240

3341
from project.tools import TOOLS
3442

43+
# Look up tools by name for the in-workflow tools node.
44+
_TOOLS_BY_NAME = {tool.name: tool for tool in TOOLS}
45+
3546
# Name this graph is registered under in the LangGraphPlugin (acp.py / run_worker.py).
3647
GRAPH_NAME = "at130-langgraph"
3748
MODEL_NAME = "gpt-4o"
@@ -58,6 +69,28 @@ async def agent_node(state: AgentState) -> dict[str, Any]:
5869
return {"messages": [await llm.ainvoke(messages)]}
5970

6071

72+
async def tools_node(state: AgentState) -> dict[str, Any]:
73+
"""Run the tool calls the model requested. Runs inline in the workflow.
74+
75+
A plain ``async`` function (not LangGraph's ``ToolNode``) — see the module
76+
docstring for why a ``Runnable`` tools node can't be compiled here.
77+
"""
78+
last = state["messages"][-1]
79+
results: list[Any] = []
80+
for call in getattr(last, "tool_calls", None) or []:
81+
tool = _TOOLS_BY_NAME.get(call["name"])
82+
# Mirror ToolNode: surface an unknown/hallucinated tool name as an error
83+
# ToolMessage so the graph keeps running instead of crashing the node.
84+
if tool is None:
85+
output = f"Error: unknown tool {call['name']!r}. Available: {list(_TOOLS_BY_NAME)}"
86+
else:
87+
output = await tool.ainvoke(call["args"])
88+
results.append(
89+
ToolMessage(content=str(output), tool_call_id=call["id"], name=call["name"])
90+
)
91+
return {"messages": results}
92+
93+
6194
async def route_after_agent(state: AgentState) -> str:
6295
"""Go to the tools node if the model requested tools, else finish (async router)."""
6396
last = state["messages"][-1]
@@ -72,7 +105,7 @@ def build_graph() -> StateGraph:
72105
agent_node,
73106
metadata={"execute_in": "activity", "start_to_close_timeout": timedelta(minutes=5)},
74107
)
75-
builder.add_node("tools", ToolNode(TOOLS), metadata={"execute_in": "workflow"})
108+
builder.add_node("tools", tools_node, metadata={"execute_in": "workflow"})
76109
builder.add_edge(START, "agent")
77110
builder.add_conditional_edges("agent", route_after_agent, {"tools": "tools", END: END})
78111
builder.add_edge("tools", "agent")

0 commit comments

Comments
 (0)