Skip to content

feat(adk): add eager tool execution middleware#934

Draft
shentongmartin wants to merge 63 commits into
alpha/09from
feat/eager_tool
Draft

feat(adk): add eager tool execution middleware#934
shentongmartin wants to merge 63 commits into
alpha/09from
feat/eager_tool

Conversation

@shentongmartin
Copy link
Copy Markdown
Contributor

@shentongmartin shentongmartin commented Apr 7, 2026

Eager Tool Execution

Problem

In the current ReAct loop, tools only start executing after the ChatModel finishes streaming its entire response. When the model produces multiple tool calls, this sequential wait adds unnecessary latency — the model might complete a tool call's arguments midway through the stream, but the tool sits idle until the stream ends.

Current:  Model streams ████████████ → ToolsNode executes tool1, tool2, tool3
Proposed: Model streams ██tool1▶██tool2▶██tool3▶█ → ToolsNode finds results ready

Solution

A fire-and-forget async middleware (eagerToolExecutorMiddleware) inside the model wrapper chain intercepts a copy of the model stream. As tool call arguments complete (validated by json.Valid), it immediately dispatches each tool via InternalStreamSingleToolCall. When the ToolsNode eventually runs, it finds pre-computed results via ToolExecutionProvider and skips re-execution.

The middleware is placed between eventSender and retryWrapper in the wrapper chain, receiving a Copy(2) of the stream:

eventSender → [Copy(2)] → copy[0] to retryWrapper (main path)
                        → copy[1] to eagerToolExecutor (async, fire-and-forget)

Why this placement works

  • Retry not blocked: The retryWrapper gets its own independent copy. If the model fails, retry happens immediately — the eager goroutine is just abandoned.
  • Cancel not blocked: StreamCanceledError appears on the eager copy. The executor sees it and aborts.
  • Interrupt deferred: If an eager tool interrupts, the result is stored as failed. The ToolsNode handles interrupt emission and resume as before — the eager executor is not involved in resume at all.

Decisions

Choice Alternative Rationale
Middleware, not graph node New graph node between Model and Tools A graph node would need native interrupt/resume support and would complicate the graph topology. The middleware approach is invisible to the graph.
Explicit opt-in (ToolsConfig.EagerExecution) Auto-enable when tools are present Eager execution changes observable behavior (tool callbacks fire earlier). Users should consciously enable it.
ToolsNode delegates ALL execution when ToolExecutionProvider is set ToolsNode checks pre-computed results per tool Full delegation avoids races between eager executor (still running) and ToolsNode (just started). The ToolsNode calls waitDone() and gets a complete picture.
eagerCoordHolder with sync.Mutex atomic.Pointer[T] atomic.Pointer[T] requires Go 1.19+; project targets Go 1.18. Mutex-based holder provides the same thread-safety.
Internal* prefix for non-public APIs Separate internal package InternalSetToolExecutionProvider, InternalStreamSingleToolCall follow Go ecosystem conventions (e.g., runtime.InternalXxx). Avoids package restructuring while clearly signaling "do not use directly."
Accumulator conflict detection aligned with concatToolCalls Silent overwrite on ID/Type/Name conflict concatToolCalls (schema/message.go) returns errors on conflicting ID/Type/Name. The accumulator follows the same semantics for consistency.
lastNilIdx for nil-Index chunk routing Always map nil-Index to accumulator[0] Models may emit multiple tool calls with Index: nil. When a chunk arrives with a new ID, it must start a new accumulator rather than corrupting the existing one.

Key Insight

Stream Copy(n) creates independent readers where closing one copy doesn't affect others. This makes the eager executor inherently compatible with retry: the retryWrapper closes its failed copy and retries, while the abandoned eager goroutine drains independently until it hits the same error. No coordination needed.

The accumulator's chunked streaming semantics must mirror concatToolCalls in schema/message.go — both are tool call merging implementations, and divergence between them would create subtle behavioral inconsistencies. Specifically: Extra field uses first-write-wins, and conflicting ID/Type/Name returns an error rather than silently overwriting.

Summary

Problem Solution
Tools wait for model to finish streaming before executing Eager middleware dispatches tools as soon as their JSON arguments complete in the stream
Eager execution must not break model retry Copy(2) gives retry its own stream; eager goroutine is abandoned on retry
Eager execution must not break cancel Eager goroutine detects StreamCanceledError and aborts
Eager execution must not break interrupt/resume Eager executor stores results; ToolsNode handles interrupt/resume as before
Multiple tool calls with nil-Index in chunked stream lastNilIdx tracking routes chunks to correct accumulator based on ID transitions
Incomplete JSON args dispatched after stream ends Post-loop isArgsComplete guard rejects accumulators with incomplete arguments
runToolCallTaskByStream crashes on pre-executed tasks Added task.executed early-return guard (matching existing runToolCallTaskByInvoke pattern)

工具急切执行(Eager Tool Execution)

问题

当前 ReAct 循环中,工具必须等到 ChatModel 完整输出流结束 后才开始执行。当模型产生多个工具调用时,这种串行等待增加了不必要的延迟——模型可能在流的中途就已经完成了某个工具的参数,但工具一直闲置到流结束。

当前:模型流式输出 ████████████ → ToolsNode 执行 tool1, tool2, tool3
优化:模型流式输出 ██tool1▶██tool2▶██tool3▶█ → ToolsNode 发现结果已就绪

解决方案

一个异步中间件(eagerToolExecutorMiddleware)在模型包装链中拦截模型流的副本。当工具调用参数完整时(通过 json.Valid 验证),立即通过 InternalStreamSingleToolCall 分发执行。当 ToolsNode 最终运行时,通过 ToolExecutionProvider 获取预计算结果,跳过重复执行。

中间件放置在 eventSenderretryWrapper 之间,接收 Copy(2) 的流副本:

eventSender → [Copy(2)] → copy[0] 给 retryWrapper(主路径)
                        → copy[1] 给 eagerToolExecutor(异步,即发即忘)

为什么这个位置有效

  • 不阻塞重试:retryWrapper 拥有独立副本。模型失败时立即重试——急切执行的 goroutine 直接被丢弃。
  • 不阻塞取消StreamCanceledError 出现在急切副本上。执行器看到后自行中止。
  • 中断延迟处理:如果急切执行的工具触发中断,结果被标记为失败存储。ToolsNode 像往常一样处理中断发射和恢复——急切执行器完全不参与恢复流程。

决策

选择 备选方案 原因
中间件,非图节点 Model 和 Tools 之间新增图节点 图节点需要原生中断/恢复支持,会使图拓扑复杂化。中间件方案对图透明。
显式启用(ToolsConfig.EagerExecution 有工具时自动启用 急切执行改变了可观察行为(工具回调更早触发)。用户应有意识地启用。
设置 ToolExecutionProvider 时 ToolsNode 委托全部执行 ToolsNode 逐工具检查预计算结果 全量委托避免急切执行器(仍在运行)与 ToolsNode(刚开始)之间的竞争。ToolsNode 调用 waitDone() 获取完整结果。
使用 eagerCoordHoldersync.Mutex atomic.Pointer[T] atomic.Pointer[T] 需要 Go 1.19+;项目目标为 Go 1.18。基于 Mutex 的 holder 提供相同的线程安全性。
Internal* 前缀标记非公开 API 独立的 internal InternalSetToolExecutionProviderInternalStreamSingleToolCall 遵循 Go 生态惯例(如 runtime.InternalXxx)。避免包结构重组,同时清晰标记"请勿直接使用"。
累加器冲突检测与 concatToolCalls 对齐 ID/Type/Name 冲突时静默覆盖 concatToolCalls(schema/message.go)在 ID/Type/Name 冲突时返回错误。累加器遵循相同语义以保持一致性。
lastNilIdx 处理 nil-Index 块路由 总是将 nil-Index 映射到 accumulator[0] 模型可能发出多个 Index: nil 的工具调用。当块携带新 ID 时,必须创建新累加器而非覆盖已有的。

关键洞察

流的 Copy(n) 创建独立读取器,关闭一个副本不影响其他副本。这使得急切执行器 天然兼容重试:retryWrapper 关闭失败副本并重试,被丢弃的急切 goroutine 独立消耗直到遇到相同错误。二者之间不需要协调。

累加器的分块流式合并语义必须与 schema/message.go 中的 concatToolCalls 保持一致——两者都是工具调用合并实现,行为分歧会导致微妙的不一致。具体来说:Extra 字段采用先写入者优先,ID/Type/Name 冲突返回错误而非静默覆盖。

总结

问题 解决方案
工具等待模型流结束后才执行 急切中间件在工具 JSON 参数完整时立即分发执行
急切执行不能破坏模型重试 Copy(2) 给重试独立流;急切 goroutine 在重试时被丢弃
急切执行不能破坏取消 急切 goroutine 检测到 StreamCanceledError 后自行中止
急切执行不能破坏中断/恢复 急切执行器存储结果;ToolsNode 像往常一样处理中断/恢复
分块流中多个 nil-Index 工具调用 lastNilIdx 跟踪机制根据 ID 变化将块路由到正确的累加器
流结束后不完整 JSON 参数被分发 后循环 isArgsComplete 守卫拒绝参数不完整的累加器
runToolCallTaskByStream 对预执行任务崩溃 添加 task.executed 提前返回守卫(与已有的 runToolCallTaskByInvoke 模式一致)

mrh997 and others added 30 commits March 30, 2026 09:49
feat(agentic_model):
- format print
- support agentic chat template
- support to compose agentic odel&agentic tools node
- support agentic tool node
- support agentic message concat
#882)

feat(adk): add agentmd middleware for auto-injecting Agents.md into model input

Change-Id: I34add4f925a23c6d6821925c482a21f6cddfddd4
…gModel

- AgenticModel is now BaseModel[*schema.AgenticMessage] instead of
  ToolCallingModel[*schema.AgenticMessage]. Tools are passed at request
  time via model.WithTools option.
- Remove generic ToolCallingModel[M] type. ToolCallingChatModel is
  restored as a concrete interface since it is the sole consumer.
- Remove baseModelAsAgenticAdapter and simplify agentic code path.

Change-Id: If0ab42b8d5d3db53defce5cbeeb9b8465a815bd5

refactor(adk): remove 4 trivially redundant From/Of convenience constructors

Remove NewAgentToolFrom, InterruptFrom, StatefulInterruptFrom, and
EventFromMessageOf. These were 1-line pass-throughs with identical
signatures to their Typed* counterparts, adding to the exported API
surface without providing any ergonomic benefit.

Change-Id: Iefe90a3222dbf0a0627f47d6f5f95dc4e34c663e

feat(adk): rename Generic* prefix to Typed* for succinct yet clear naming

Rename all exported Generic* types/functions to Typed* and all unexported
generic* to typed* for a more idiomatic Go naming convention:

- TypedAgent[M], TypedRunner[M], TypedAgentEvent[M], TypedAgentInput[M]
- TypedChatModelAgent[M], TypedChatModelAgentConfig[M]
- TypedChatModelAgentMiddleware[M], TypedBaseChatModelAgentMiddleware[M]
- TypedRunnerConfig[M], TypedState[M]
- TypedSetSubAgents[M], TypedSendEvent[M], TypedInterrupt[M], etc.

The Typed* prefix is shorter (5 vs 7 chars), semantically clear ("this is
the type-parameterized version"), and works uniformly on both types and
functions including long names.

Backward-compatible aliases (Agent, Runner, etc.) remain unchanged.
Gob registration strings preserved for checkpoint compatibility.

Change-Id: I9f746c2d1702d08ff826d50321bc08647e046cba

feat(adk): generify ADK to support *schema.AgenticMessage via Go generics

- Rename exported types from Agentic* to Generic*[M MessageType] pattern
- Add Generic* versions of all core types: GenericAgent, GenericRunner, etc.
- Add cascading *From/*Of constructors for type-inferred generic usage
- Generify deterministic_transfer.go with type-switched AgenticMessage support
- Add genAgenticTransferMessages using FunctionToolCall/FunctionToolResult
- Generify SendEvent to GenericSendEvent[M] for AgenticMessage middleware
- Fix processGenericState to use runContext.GenericRootInput detection
- Fix comment/type name mismatches on Generic* types
- Delete dead code and unexport 40+ internal-only generic symbols
- Move planexecute/utils.go test-only code to utils_test.go
- Register gob types for AgenticMessage variants

Change-Id: Ibd76de2d08acbdf3be84834b324a1209ebaf1184
- Add ComponentOfAgenticAgent constant for agentic agent callbacks
- Add AgenticAgentCallbackInput/Output types with conv and copy helpers
- Wire agentic callbacks in flow.go (Run, Resume, run, typedWrapIterWithOnEnd)
- Add AgenticAgentCallbackHandler to HandlerHelper in utils/callbacks
- Add copyAgenticAgentEvent to utils.go
- Add integration tests for agentic agent callbacks (OnStart, OnEnd, RunInfo, events, workflow)
- Add unit tests for conv/copy functions and template handler tests

Change-Id: I505f36ea6d8efaa1e06b26d77e8475c04ba6b246
…trical forbid

- Implement newAgenticReact graph: Init -> ChatModel -> Branch ->
  CancelCheck -> AgenticToolsNode -> AfterToolCalls -> CancelCheck -> loop/END
- Register *schema.AgenticMessage and []*schema.AgenticMessage for gob
  encoding (required for cancel/interrupt checkpoint serialization)
- Symmetrically forbid cross-type agent tools in both directions
  (ChatModelAgent <-> TypedChatModelAgent[*AgenticMessage])
- Move agenticReactRunInput to package level for Go 1.18 compat
- Add 12 comprehensive tests for agentic ReAct loop covering:
  basic invoke, multi-turn tool calling, streaming, max iterations,
  interrupt/resume round-tripping, gob state round-trip, cancel flows,
  double interrupt/resume, no-tools agent, tool argument passing
- Skip returnDirectly test (eventSenderToolHandler is Message-typed)

Change-Id: I758381b6fa7a7af8aca0e4e4b242414ae693ee0c
- Add case *schema.AgenticMessage branch in agent_tool.go
  fullChatHistoryAsInput type switch with getAgenticReactChatHistory
- Add M MessageType generic parameter to all TurnLoop types:
  TurnLoop, TurnLoopConfig, GenInputResult, GenResumeResult,
  TurnContext, TurnLoopExitState, PushOption and related functions
- Update all TurnLoop tests to use *schema.Message as second
  type parameter
- Add TestGetAgenticReactHistory and TestTurnLoopAgenticMessage

Change-Id: I2a94a98dd1aef31233190f2c7bb59fdb4f024c6a
OnError was declared but never fired by the ADK runtime for agent
callbacks, making it dead code. Remove the field, the Needed case,
the Handle dispatch, and the corresponding tests.

Change-Id: I02841d5be80c52c2d51db140b251ac4e760fc934
- Remove NewRunnerFrom, NewChatModelAgentFrom, NewSequentialAgentFrom,
  NewParallelAgentFrom, NewLoopAgentFrom convenience constructors
- Rename TypedState to typedState (unexported) since it has no
  external-facing usage beyond the State type alias
- Update all tests to use Typed constructors directly
- Add api-changes.md documenting all API surface changes

Change-Id: I78306f00ad91d1dda8c1db202d561c28c324c8a6
- AgenticAgentCallbackInput -> AgenticCallbackInput
- AgenticAgentCallbackOutput -> AgenticCallbackOutput
- ConvAgenticAgentCallbackInput -> ConvAgenticCallbackInput
- ConvAgenticAgentCallbackOutput -> ConvAgenticCallbackOutput
- AgenticAgentCallbackHandler -> AgenticCallbackHandler
- HandlerHelper.AgenticAgent() -> HandlerHelper.Agentic()

The 'AgenticAgent' naming was redundant and awkward. The new
'Agentic*' naming is symmetric with the existing 'Agent*' pattern:
AgentCallbackHandler vs AgenticCallbackHandler.

Change-Id: I46839edb3145f3bf1942a2af874d4ce6a83655a4
Change-Id: I4fe416b149b1739a2932323f205f54ac2013a08f
- Add EagerExecution option to ToolsConfig for explicit opt-in
- Implement eagerToolExecutorMiddleware as fire-and-forget async
  process inside model wrapper chain (between eventSender and retry)
- Add ToolExecutionProvider and InvokeSingleToolCall to ToolsNode
  for delegated execution with full config honor
- Honor model retry (abort on stream errors), cancel
  (abort on StreamCanceledError), and interrupt (defer to ToolsNode)
- Support concurrent and sequential eager tool dispatch
- Add unit tests and integration tests for eager execution

Change-Id: Ia73fbe843d51846972349eea703ca90af118bdd4
…compat

- atomic.Pointer[T] requires Go 1.19+, but go.mod specifies Go 1.18
- Replace with eagerCoordHolder using sync.Mutex for Load/Store

Change-Id: I76f64b52b60e38d3f8fab59c5c3901b2cd44d57f
- Extract setupEagerToolExecutionProvider and wireEagerToolExec helpers
- Reduces newReact function length below 200-line funlen limit
- Reuse helpers in both newReact and newAgenticReact

Change-Id: I5486b669bcd0c05e0cd6897f76e3ae6133868755
@codecov
Copy link
Copy Markdown

codecov Bot commented Apr 7, 2026

Codecov Report

❌ Patch coverage is 88.32808% with 37 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.81%. Comparing base (97cb63d) to head (a5b7ce5).

Files with missing lines Patch % Lines
adk/wrappers.go 7.69% 11 Missing and 1 partial ⚠️
compose/tool_node.go 80.95% 6 Missing and 6 partials ⚠️
adk/react.go 83.92% 7 Missing and 2 partials ⚠️
adk/eager_tool_exec.go 97.79% 3 Missing and 1 partial ⚠️
Additional details and impacted files
@@                 Coverage Diff                  @@
##           feat/agentic_adk     #934      +/-   ##
====================================================
+ Coverage             81.57%   81.81%   +0.23%     
====================================================
  Files                   157      158       +1     
  Lines                 20012    20292     +280     
====================================================
+ Hits                  16325    16601     +276     
- Misses                 2500     2502       +2     
- Partials               1187     1189       +2     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

- Add direct unit tests for runEager: dispatch, stream error, chunked
  accumulation, sequential mode, concurrent mode, no tool calls, unknown tool
- Add InvokeSingleToolCall tests in compose/tool_node_test.go
- Add eagerCoordHolder, getOrCreateAccumulator, isAborted unit tests
- Fix runEager to handle multiple complete tool calls in a single stream
  chunk by dispatching them directly (bypass accumulator)

Change-Id: I346955066c33da837664c2f0cc1d0475c407a713
Remove failedToolCallIDs from ToolExecutionProvider signature and
collectResults. When any tool fails during eager execution, the
provider now returns an error directly, causing ToolsNode to fail
as it would for a normal tool failure. This is consistent with the
full delegation design where ToolsNode does not re-execute failed
tool calls.

Change-Id: Ib73f7e232f7d0224d666bdb19cf7c08419570161
… bugs

- Eager Executor uses StreamSingleToolCall to preserve StreamableTool
  streaming behavior; ToolExecutionProvider returns StreamReader maps
- Remove ToolExecutionProvider from ToolsNodeConfig (exported field);
  use InternalSetToolExecutionProvider/InternalSetAgenticToolExecutionProvider
- Remove InvokeSingleToolCall; replace with InternalStreamSingleToolCall
  (package-level function, not method on ToolsNode)
- Fix nil-Index collision: track lastNilIdx for chunked tool calls
  with nil Index pointers to avoid merging different tool calls
- Fix post-loop dispatch: add isArgsComplete check before dispatching
  accumulated tool calls at stream EOF
- Fix accumulator Extra field: preserve first chunk's Extra, matching
  concatToolCalls behavior in schema/message.go
- Fix accumulator conflict detection: merge() returns error on
  conflicting ID/Type/Name, matching concatToolCalls semantics
- Add design decision comments for abort behavior, non-deterministic
  error reporting, and nil-Index routing strategy

Change-Id: I561cbd762b41840e42a006ab02a57ac3000dcd3c
Change-Id: I0798d3cbbe86b5ab53c04be2c849814f07fd54ec
…ng executed guard

- Add 12 tests for adk/eager_tool_exec.go: merge conflicts, Extra preservation,
  nil-Index routing, merge abort, post-loop flush, dispatch abort
- Add 10 tests for compose/tool_node.go: ToolExecutionProvider invoke/stream,
  enhanced provider, provider error, sequential execution, mixed pre-executed
- Fix bug: runToolCallTaskByStream missing task.executed early return guard
  (matched existing pattern in runToolCallTaskByInvoke)
- Coverage: runEager 78→96%, merge 65→100%, genToolCallTasks 61→86%,
  sequentialRunToolCall 0→100%, InternalSet* 0→100%, Invoke 55→92%

Change-Id: I4ca5dd0deb45c277d4af71602375dd9f44bf4f5e
@shentongmartin shentongmartin force-pushed the feat/agentic_adk branch 5 times, most recently from b060ded to 46e35e4 Compare April 15, 2026 12:22
@shentongmartin shentongmartin marked this pull request as draft April 16, 2026 02:00
@shentongmartin shentongmartin force-pushed the feat/agentic_adk branch 3 times, most recently from ada6749 to 3d8f620 Compare April 21, 2026 02:55
Base automatically changed from feat/agentic_adk to alpha/09 April 21, 2026 06:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

6 participants