这是 Claude Code 的心脏。
query.ts(67KB, 1730行) 实现了从用户输入到 AI 响应的完整闭环。 本章将深入每一个工程巧思,让你理解为什么这样设计。
- 理解 query loop 的状态机设计(为什么用
while(true)+ State 对象) - 理解 StreamingToolExecutor 的并发控制模型(sibling abort、progress wake-up)
- 理解多层压缩策略(snip → microcompact → context collapse → autocompact)
- 理解错误恢复的分层设计(escalate → multi-turn → reactive compact)
- 理解 prompt cache 优化的全链路设计
- 理解 token budget 的 diminishing returns 检测
- 新增 通过端到端请求追踪建立整体认知
- 新增 理解设计决策背后的方案对比(为什么不用其他方案)
- 新增 提炼可迁移到自己 Agent 系统中的设计模式
先建立直觉:想象你在和 AI 对话——你说一句,AI 回一句,如果 AI 需要执行某个操作(比如读文件、跑命令),它就去执行,然后把结果告诉你,再继续回答。这个"说→做→说→做"的过程就是 query loop 的一次次迭代。
而 State 对象就是这个循环的记事本——它记录了当前对话进行到哪了、之前出过什么错、尝试过哪些恢复手段。每次循环开始时,代码从这个记事本里读取状态;每次循环结束时,把新状态写回去。
query.ts 用 while(true) 无限循环(而非递归调用)来实现这个过程。下面是 State 的完整定义,字段按功能分为三组:
type State = {
// ===== 第一组:核心对话状态 =====
messages: Message[] // The conversation history (all messages so far)
toolUseContext: ToolUseContext // Shared context for tool execution (file cache, permissions, etc.)
turnCount: number // How many loop iterations have run
// ===== 第二组:压缩与恢复追踪 =====
// These fields track error recovery attempts to prevent infinite retry loops
autoCompactTracking: AutoCompactTrackingState | undefined // Tracks auto-compression state (undefined = not started)
maxOutputTokensRecoveryCount: number // How many times we've retried after output truncation
hasAttemptedReactiveCompact: boolean // Whether we've already tried emergency compression
maxOutputTokensOverride: number | undefined // Upgraded output limit (undefined = use default)
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> | undefined // Async summary being generated in background
// ===== 第三组:流程控制 =====
stopHookActive: boolean | undefined // Whether stop hooks are currently running
transition: Continue | undefined // WHY the last iteration continued (undefined on first iteration)
// The transition field is not just for debugging — it's a guard against recovery loops (see below)
}给非 TS 读者的提示:
X | undefined表示"这个值可能不存在",类似其他语言的null/None。Promise<X>表示"一个正在后台运行的异步操作,最终会产出 X 类型的结果"。
巧思1:transition 字段 — 防止恢复死循环
State 中最巧妙的是 transition 字段。它记录了"上一次迭代为什么选择了 continue"。这不只是调试用的,它还用于防止恢复循环——如果上次已经尝试过某种恢复手段但失败了,这次就不再重复尝试:
// 如果上一次已经做了 collapse_drain_retry,这次还是 413,
// 就不再尝试 drain,而是 fall through 到 reactive compact
if (state.transition?.reason !== 'collapse_drain_retry') {
const drained = contextCollapse.recoverFromOverflow(...)
// ...
}所有可能的 transition 原因:
| transition.reason | 含义 | 触发条件 |
|---|---|---|
next_turn |
正常的下一轮 | 工具执行完毕 |
max_output_tokens_recovery |
输出被截断恢复 | stop_reason = max_output_tokens |
max_output_tokens_escalate |
升级输出限制 | 从 8k 升级到 64k |
reactive_compact_retry |
响应式压缩后重试 | prompt_too_long 错误 |
collapse_drain_retry |
上下文折叠后重试 | prompt_too_long 错误 |
stop_hook_blocking |
Stop Hook 阻止停止 | Hook 返回 blocking error |
token_budget_continuation |
Token 预算未用完 | 还有预算继续 |
早期版本可能是递归的(注释中还有 query_recursive_call checkpoint),但改成了循环:
递归的问题:
1. 每次递归创建新的闭包 → 内存泄漏(长对话可能有 100+ 轮)
2. 递归深度受限于调用栈
3. 状态传递需要大量参数
4. 无法在中间状态做 GC
while(true) + State 的优势:
1. 单一闭包,内存恒定
2. 无栈深度限制
3. 所有状态集中在 State 对象中
4. continue 站点可以精确控制哪些状态重置
巧思2:continue 站点的状态重置策略
每个 continue 站点(即循环中决定"继续下一轮"的地方)都精确控制哪些状态重置、哪些保留。这就像医生在不同情况下开不同的处方——正常复诊时清零所有临时记录,但如果上次治疗出了问题,就要保留那次的记录以避免重蹈覆辙:
// max_output_tokens 恢复:保留 hasAttemptedReactiveCompact
const next: State = {
messages: [...messagesForQuery, ...assistantMessages, recoveryMessage],
maxOutputTokensRecoveryCount: maxOutputTokensRecoveryCount + 1, // 递增
hasAttemptedReactiveCompact, // ← 保留!不重置
// ...
}
// 正常下一轮:重置恢复计数器
const next: State = {
messages: [...messagesForQuery, ...assistantMessages, ...toolResults],
maxOutputTokensRecoveryCount: 0, // ← 重置
hasAttemptedReactiveCompact: false, // ← 重置
// ...
}
// stop_hook_blocking:保留 hasAttemptedReactiveCompact
// 注释解释了为什么:
// "Preserve the reactive compact guard — if compact already ran and
// couldn't recover from prompt-too-long, retrying after a stop-hook
// blocking error will produce the same result. Resetting to false
// here caused an infinite loop: compact → still too long → error →
// stop hook blocking → compact → … burning thousands of API calls."这个注释揭示了一个真实的生产 bug:重置 hasAttemptedReactiveCompact 导致了无限循环,
烧掉了数千次 API 调用。
┌─────────────────────────────────────────────────────────────────────┐
│ query() 主循环 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Phase 0: 预处理管线 (每次迭代) │ │
│ │ ├── applyToolResultBudget() → 大结果持久化到磁盘 │ │
│ │ ├── snipCompact() → 裁剪旧的工具结果 │ │
│ │ ├── microcompact() → 压缩中间工具调用 │ │
│ │ ├── contextCollapse() → 折叠已完成的子任务 │ │
│ │ └── autocompact() → 全量压缩(如果需要) │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ Phase 1: API 调用 │ │
│ │ ├── prependUserContext() → 注入 CLAUDE.md 等 │ │
│ │ ├── callModel() 流式返回 → 逐块接收响应 │ │
│ │ ├── StreamingToolExecutor → 边接收边执行工具 │ │
│ │ └── 错误处理 → fallback model / withhold │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────┴─────────┐ │
│ │ │ │
│ 有 tool_use? 无 tool_use │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────┐ ┌──────────────────────────────────┐ │
│ │ Phase 2: 工具执行 │ │ Phase 3: 停止判断 │ │
│ │ ├── 收集剩余结果 │ │ ├── 413 恢复 (collapse/compact) │ │
│ │ ├── 附件注入 │ │ ├── max_output_tokens 恢复 │ │
│ │ ├── 记忆预取消费 │ │ ├── Stop Hooks 执行 │ │
│ │ ├── 技能发现消费 │ │ ├── Token Budget 检查 │ │
│ │ └── 刷新工具列表 │ │ └── return Terminal │ │
│ └────────┬────────────┘ └──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Phase 4: 继续 │ │
│ │ state = next │ ──── continue ────→ 回到 Phase 0 │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
理解 query loop 最好的方式不是分模块看,而是跟着一个真实请求走完全程。
假设用户输入:帮我修复 src/auth.ts 中的空指针 bug
时间线 发生了什么 数据变化
─────────────────────────────────────────────────────────────────────────────────────
T+0ms 用户按回车
│
├── processUserInput() messages = [
│ 将用户输入包装为 Message { role: 'user',
│ type: 'user', content: [{ type: 'text', content: '帮我修复...' }
│ text: '帮我修复 src/auth.ts 中的空指针 bug' }] ]
│
├── query() 入口
│ ├── buildQueryConfig() → 快照不可变配置
│ ├── startRelevantMemoryPrefetch() → 后台预取记忆
│ └── 初始化 State 对象 (turnCount=0)
│
▼ ═══ 进入 while(true) 循环 ═══
T+1ms Phase 0: 预处理管线
│
├── applyToolResultBudget() → 无操作(第一轮没有工具结果)
├── snipCompact() → 无操作(消息太少)
├── microcompact() → 无操作
├── contextCollapse() → 无操作
└── autocompact() → 无操作
T+2ms Phase 1: 组装 API 请求
│
├── prependUserContext() API payload = {
│ 注入 CLAUDE.md 内容到 system prompt system: [系统提示 + CLAUDE.md],
│ 注入 git status 等上下文 tools: [Bash, Read, Edit, ...],
│ messages: [用户消息],
├── callModel() model: 'claude-sonnet-4-...',
│ 发送 HTTP POST 到 Anthropic API max_tokens: 8192
│ 开始接收 SSE 流 }
│
▼
T+500ms Phase 1: 流式接收(持续 3-8 秒)
│
│ ┌─ SSE chunk 1: thinking block (如果启用 extended thinking)
│ ├─ SSE chunk 2: text "让我先看看这个文件..."
│ ├─ SSE chunk 3: tool_use 开始 { name: 'Read', id: 'tu_001' }
│ │ └── StreamingToolExecutor.addTool('Read', 'tu_001')
│ │ canExecuteTool(true) → 立即开始执行!
│ │ Read('src/auth.ts') 在后台运行
│ │
│ ├─ SSE chunk 4: tool_use input 完成 { path: 'src/auth.ts' }
│ ├─ SSE chunk 5: tool_use 开始 { name: 'Grep', id: 'tu_002' }
│ │ └── StreamingToolExecutor.addTool('Grep', 'tu_002')
│ │ canExecuteTool(true) → Read 是 ConcurrencySafe,并行!
│ │ Grep('null.*pointer', 'src/') 在后台运行
│ │
│ └─ SSE chunk N: message_stop, stop_reason: 'tool_use'
│
▼
T+4000ms Phase 2: 收集工具结果
│
├── getRemainingResults() messages = [
│ Read 已完成 → yield tool_result 用户消息,
│ Grep 已完成 → yield tool_result assistant(text + 2个tool_use),
│ user(2个tool_result)
├── 消费 memoryPrefetch(如果已完成) ]
├── 消费 skillDiscovery(如果有)
└── needsFollowUp = true(有 tool_use)
Phase 4: 继续
state = { messages: [..., tool_results], turnCount: 1,
transition: { reason: 'next_turn' } }
continue → 回到 Phase 0
▼ ═══ 第二次迭代 ═══
T+4001ms Phase 0: 预处理管线
├── snipCompact() → 可能裁剪第一轮的 Read 结果(如果文件很大)
└── 其他 → 无操作
T+4002ms Phase 1: 第二次 API 调用
│ API payload = {
│ messages 现在包含完整的对话历史 messages: [
│ 前缀与上次相同 → prompt cache 命中! 用户消息, ← 缓存命中
│ 只需要为新增的 tool_result 付费 assistant(...), ← 缓存命中
│ user(tool_results) ← 新增
│ Claude 看到文件内容,生成修复方案 ]
│ 返回 tool_use: Edit('src/auth.ts', ...) }
│
▼
T+8000ms Phase 2: 执行 Edit
│
├── Edit 不是 ConcurrencySafe → 独占执行
├── 权限检查 → 弹出确认对话框(如果需要)
├── 用户确认 → 执行编辑
└── tool_result: { success: true }
Phase 4: 继续 → 第三次迭代
▼ ═══ 第三次迭代 ═══
T+9000ms Phase 1: 第三次 API 调用
│
│ Claude 看到 Edit 成功
│ 返回 text: "已修复空指针 bug,添加了 null check..."
│ stop_reason: 'end_turn'(没有 tool_use)
│
▼
T+12000ms Phase 3: 停止判断
│
├── 没有 413 错误 → 跳过
├── stop_reason != max_output_tokens → 跳过
├── handleStopHooks()
│ ├── 执行用户定义的 stop hooks(如 lint 检查)
│ ├── hooks 通过 → 继续停止流程
│ └── 后台: extractMemories(), promptSuggestion()
├── checkTokenBudget() → budget 为 null → stop
└── return { reason: 'completed' }
═══ query() 返回 ═══
总计: 3 次 API 调用,~12 秒,消耗 ~5000 input tokens + ~2000 output tokens
其中 prompt cache 节省了第 2、3 次调用的 ~80% input token 费用
关键观察:
- 第一次迭代的 Read 和 Grep 在 Claude 还在输出时就开始执行了(StreamingToolExecutor 的价值)
- 第二次 API 调用的前缀与第一次完全相同 → prompt cache 命中(只付增量 token 的钱)
- Edit 工具独占执行(不与其他工具并行),因为它修改文件
- 整个过程是 3 次
while(true)迭代,不是 3 次递归调用
先建立直觉:传统做法是等 AI 说完所有话,再去执行它要求的操作。这就像老板写完一整页待办事项交给你,你才开始逐条执行。而流式执行就像老板边说边写,你看到第一条就立刻去做,不等他写完——这样总耗时大幅缩短。
传统方式:等 Claude 返回完整响应 → 解析所有 tool_use → 执行工具
传统方式 (串行):
Claude 响应 [████████████████] 5s
解析 tool_use 0.1s
执行 Tool A [████] 2s
执行 Tool B [██████] 3s
执行 Tool C [███] 1.5s
总计: 5 + 0.1 + 2 + 3 + 1.5 = 11.6s
流式方式 (StreamingToolExecutor):
Claude 响应 [████████████████] 5s
Tool A [████] ← 在 Claude 还在输出时就开始了!
Tool B [██████] ← Tool A 完成前就开始了!
Tool C [███] ← 并行执行!
总计: 5 + max(2,3,1.5) = 8s 节省 31%
先建立直觉:当 AI 同时要求执行多个操作时,哪些可以并行、哪些必须排队?Claude Code 的答案很简单——读操作可以并行,写操作必须独占。这就像图书馆:多人可以同时看书(读),但只能一个人在书上做标注(写)。
在深入实现之前,先理解为什么选择这个方案:
┌─────────────────────────────────────────────────────────────────────────┐
│ 方案对比:Agent 工具编排的三种范式 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ 方案 A: Actor 模型 (Erlang/Elixir 风格) │
│ 每个工具一个 Actor,消息传递通信 │
│ 优点: 天然隔离,故障不传播 │
│ 缺点: JS 是单线程 → Actor 退化为 Promise │
│ progress 消息需要额外的 mailbox 机制 │
│ sibling abort 需要 supervisor 树 → 过度设计 │
│ 结论: 在 JS 中 Actor 模型没有真正的并发优势,反而增加复杂度 │
│ │
│ 方案 B: DAG 调度 (LangGraph 风格) │
│ 预先定义工具之间的依赖关系,拓扑排序执行 │
│ 优点: 依赖关系显式,可以做最优调度 │
│ 缺点: Claude 在运行时决定调用哪些工具 → 无法预先建 DAG │
│ 工具之间的依赖是隐式的(Bash A 的输出可能影响 Bash B) │
│ 需要等 Claude 完整响应才能建图 → 失去流式执行的优势 │
│ 结论: DAG 适合预定义工作流,不适合 LLM 动态决策的场景 │
│ │
│ 方案 C: 读写锁 + 流式添加 (Claude Code 的选择) ✓ │
│ ConcurrencySafe = 读锁,非安全 = 写锁 │
│ 优点: 简单直觉(读可并行,写必独占) │
│ 支持流式添加(不需要等完整响应) │
│ sibling abort 自然融入(共享 AbortController) │
│ 缺点: 不能表达细粒度依赖(如 "Read A 必须在 Edit B 之前") │
│ 结论: 在 LLM Agent 场景下,简单 > 精确 │
│ │
│ 核心洞察: LLM 决定工具调用顺序,而 LLM 通常会按逻辑顺序输出 │
│ (先 Read 再 Edit),所以简单的读写锁就够了。 │
│ 如果 LLM 输出了错误的顺序,Bash 错误的 sibling abort 会兜底。 │
└─────────────────────────────────────────────────────────────────────────┘
StreamingToolExecutor 使用了一个精巧的并发控制模型。核心逻辑只有一个判断:新工具能否与正在执行的工具并行? 规则是:如果大家都是"并发安全"的(即只读操作),就可以并行;否则必须等待:
// 核心判断:这个工具能否与其他工具并行执行?
private canExecuteTool(isConcurrencySafe: boolean): boolean {
const executingTools = this.tools.filter(t => t.status === 'executing')
return (
executingTools.length === 0 || // 没有正在执行的工具
(isConcurrencySafe && executingTools.every(t => t.isConcurrencySafe))
// 或者:自己是安全的 AND 所有正在执行的也是安全的
)
}这实现了一个读写锁语义:
┌─────────────────────────────────────────────────────────┐
│ 并发安全矩阵 │
├─────────────────────────────────────────────────────────┤
│ │
│ 正在执行 \ 新工具 ConcurrencySafe NOT Safe │
│ ───────────────────────────────────────────── │
│ 无 立即执行 立即执行 │
│ ConcurrencySafe 并行执行 等待 │
│ NOT Safe 等待 等待 │
│ │
│ 类比: │
│ ConcurrencySafe = 读锁 (多个读可以并行) │
│ NOT Safe = 写锁 (写必须独占) │
│ │
└─────────────────────────────────────────────────────────┘
先建立直觉:假设 AI 同时让你执行三个 shell 命令:安装依赖、编译代码、运行测试。如果"安装依赖"失败了,后面的"编译"和"测试"显然也没意义了。Sibling Abort 就是这个逻辑——一个 Bash 命令失败时,自动取消同批次的其他 Bash 命令。但注意:只有 Bash 命令之间才有这种级联关系,读文件失败不会影响其他操作。
这是一个非常精妙的设计。当多个 Bash 命令并行执行时,如果一个失败了, 其他的通常也没有意义了(因为 Bash 命令经常有隐式依赖链):
// 创建一个子 AbortController,不会影响父级
private siblingAbortController: AbortController
constructor(toolDefinitions, canUseTool, toolUseContext) {
// siblingAbortController 是 toolUseContext.abortController 的子控制器
// 取消它不会取消父级 → query loop 不会结束这个 turn
this.siblingAbortController = createChildAbortController(
toolUseContext.abortController,
)
}
// 在工具执行中:
if (isErrorResult) {
// 只有 Bash 错误才取消兄弟工具!
// Read/WebFetch 等是独立的 — 一个失败不应该影响其他
if (tool.block.name === BASH_TOOL_NAME) {
this.hasErrored = true
this.erroredToolDescription = this.getToolDescription(tool)
this.siblingAbortController.abort('sibling_error')
}
}巧思3:三层 AbortController 层级
toolUseContext.abortController (用户按 Ctrl+C)
└── siblingAbortController (Bash 错误级联)
├── toolAbortController[0] (Tool A 的独立控制器)
├── toolAbortController[1] (Tool B 的独立控制器)
└── toolAbortController[2] (Tool C 的独立控制器)
用户 Ctrl+C → 所有工具停止,query loop 结束
Bash A 失败 → B、C 停止,但 query loop 继续(收集错误结果)
权限拒绝 → 该工具停止,且冒泡到 query loop(结束 turn)
注意 toolAbortController 的 abort 事件监听器中的冒泡逻辑:
toolAbortController.signal.addEventListener('abort', () => {
// 如果不是 sibling_error,且父级没有 abort,且没有 discard
// → 冒泡到父级(例如权限拒绝需要结束整个 turn)
if (
toolAbortController.signal.reason !== 'sibling_error' &&
!this.toolUseContext.abortController.signal.aborted &&
!this.discarded
) {
this.toolUseContext.abortController.abort(
toolAbortController.signal.reason,
)
}
}, { once: true })工具执行过程中会产生 progress 消息(如 Bash 的实时输出)。 这些消息需要立即传递给 UI,而不是等工具完成:
// Progress 消息存储在单独的队列中
type TrackedTool = {
// ...
results?: Message[] // 最终结果(等工具完成后才 yield)
pendingProgress: Message[] // Progress 消息(立即 yield)
}
// 当有新的 progress 消息时,唤醒等待中的 getRemainingResults
if (update.message.type === 'progress') {
tool.pendingProgress.push(update.message)
// 唤醒!
if (this.progressAvailableResolve) {
this.progressAvailableResolve()
this.progressAvailableResolve = undefined
}
}
// getRemainingResults 中的等待逻辑:
async *getRemainingResults() {
while (this.hasUnfinishedTools()) {
// 先 yield 已完成的结果和 progress
for (const result of this.getCompletedResults()) {
yield result
}
// 如果还有执行中的工具,等待任一完成 OR progress 到达
if (this.hasExecutingTools() && !this.hasCompletedResults()) {
const progressPromise = new Promise<void>(resolve => {
this.progressAvailableResolve = resolve // 注册唤醒回调
})
await Promise.race([...executingPromises, progressPromise])
}
}
}巧思4:Promise.race 实现非阻塞等待
这是一个经典的"多信号等待"模式:
- 任何工具完成 → 唤醒 → yield 结果
- 任何 progress 到达 → 唤醒 → yield progress
- 两者都没有 → 继续等待
当 API 在流式传输过程中触发 fallback(切换到备用模型)时, 已经开始执行的工具需要被丢弃:
if (streamingFallbackOccured) {
// 1. 为已 yield 的 assistant 消息发送 tombstone(从 UI 和 transcript 中移除)
for (const msg of assistantMessages) {
yield { type: 'tombstone', message: msg }
}
// 2. 清空所有状态
assistantMessages.length = 0
toolResults.length = 0
toolUseBlocks.length = 0
needsFollowUp = false
// 3. 丢弃旧的 executor,创建新的
if (streamingToolExecutor) {
streamingToolExecutor.discard() // 标记为 discarded
streamingToolExecutor = new StreamingToolExecutor(...) // 全新的
}
}为什么需要 tombstone?
因为 partial 消息(特别是 thinking blocks)有无效的签名, 如果保留在消息历史中,会导致 API 返回 "thinking blocks cannot be modified" 错误。
先建立直觉:AI 对话越长,消耗的 token 越多,成本越高,速度越慢。当对话历史太长时,就需要"压缩"——把旧的、不太重要的内容精简掉。Claude Code 设计了 5 层压缩策略,就像整理房间:先扔掉明显的垃圾(零成本),再整理抽屉(低成本),最后才请搬家公司来大扫除(高成本)。
Claude Code 有 5 层上下文压缩策略,按执行顺序排列:
┌─────────────────────────────────────────────────────────────┐
│ 压缩管线 (每次迭代) │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. applyToolResultBudget() │
│ └── 超大的 tool_result → 持久化到磁盘,替换为引用 │
│ └── 在 microcompact 之前运行(MC 按 tool_use_id 操作) │
│ │
│ 2. snipCompact() [feature: HISTORY_SNIP] │
│ └── 裁剪旧的工具结果(保留结构,删除内容) │
│ └── 返回 tokensFreed 给 autocompact 参考 │
│ │
│ 3. microcompact() │
│ └── 压缩中间的工具调用(保留首尾,压缩中间) │
│ └── 支持 cached microcompact(利用 API 缓存删除) │
│ │
│ 4. contextCollapse() [feature: CONTEXT_COLLAPSE] │
│ └── 折叠已完成的子任务为摘要 │
│ └── 在 autocompact 之前运行 → 如果折叠够了就不需要全量 │
│ └── 是"读时投影"而非"写时修改" │
│ │
│ 5. autocompact() │
│ └── 全量压缩:调用 Claude 生成摘要 │
│ └── 最后手段,成本最高 │
│ │
└─────────────────────────────────────────────────────────────┘
巧思5:执行顺序的精心设计
toolResultBudget → snip → microcompact → collapse → autocompact
便宜 便宜 中等 中等 昂贵
本地 本地 本地/API 本地 API调用
便宜的先做,如果够了就不需要做昂贵的。特别是 contextCollapse 在 autocompact 之前:
// 注释原文:
// Runs BEFORE autocompact so that if collapse gets us under the
// autocompact threshold, autocompact is a no-op and we keep granular
// context instead of a single summary.折叠保留了更细粒度的上下文(每个子任务的摘要),而全量压缩会把所有东西压成一个大摘要。
先建立直觉:Context Collapse 不会修改原始对话记录——它更像是给一本厚书加了一个"精华摘要目录"。原书还在,但每次翻阅时先看目录,只有需要细节时才翻到原文。这样既节省了 token,又不会丢失任何原始信息。
这是一个特别优雅的设计模式:
// 注释原文:
// Nothing is yielded — the collapsed view is a read-time projection
// over the REPL's full history. Summary messages live in the collapse
// store, not the REPL array. This is what makes collapses persist
// across turns: projectView() replays the commit log on every entry.REPL 消息数组 (完整历史,从不修改):
[msg1, msg2, msg3, msg4, msg5, msg6, msg7, msg8]
Collapse Store (摘要存储):
commit1: { archived: [msg1, msg2, msg3], summary: "..." }
commit2: { archived: [msg4, msg5], summary: "..." }
projectView() 的输出 (每次迭代重新计算):
[summary1, summary2, msg6, msg7, msg8]
优势:
- REPL 数组从不被修改 → 不会丢失原始数据
- 摘要可以随时重新生成
- 类似 Git 的 commit log → 可以"回放"折叠历史
先建立直觉:AI 对话中会遇到两类常见错误——"AI 说到一半被截断了"和"对话太长 API 拒绝了"。Claude Code 对每种错误都设计了三阶段恢复策略:先尝试最简单的修复,不行再升级,最后才放弃。就像汽车抛锚:先试试重新打火(免费),不行就叫拖车(花钱),实在不行才叫救护车(昂贵)。
当 Claude 的输出被截断时(说到一半被强制停止),恢复策略分三个阶段:
阶段1: Escalate (升级限制)
条件: 使用了默认的 8k 限制 && 没有用户覆盖
动作: 重试同一请求,但 max_output_tokens = 64k
特点: 无额外消息,无多轮开销
阶段2: Multi-turn Recovery (多轮恢复)
条件: 升级后仍然被截断 || 已有用户覆盖
动作: 注入恢复消息,让 Claude 继续
消息: "Output token limit hit. Resume directly — no apology,
no recap. Pick up mid-thought if that is where the cut
happened. Break remaining work into smaller pieces."
最多: 3 次
阶段3: 放弃
条件: 3 次恢复都失败
动作: 显示被截断的消息
巧思6:恢复消息的措辞
注意恢复消息的精心措辞:
- "no apology" — 防止 Claude 浪费 token 道歉
- "no recap" — 防止 Claude 重复已说过的内容
- "Pick up mid-thought" — 允许从句子中间继续
- "Break remaining work into smaller pieces" — 引导 Claude 分块输出
阶段1: Context Collapse Drain
条件: 有待提交的折叠 && 上次不是 collapse_drain_retry
动作: 提交所有待处理的折叠
成本: 零(纯本地操作)
阶段2: Reactive Compact
条件: drain 不够 || 没有 collapse
动作: 调用 Claude 生成摘要
成本: 一次 API 调用
特点: 只尝试一次(hasAttemptedReactiveCompact 守卫)
阶段3: 放弃
条件: reactive compact 也不够
动作: 显示错误消息
关键: 不进入 Stop Hooks!
原因: "hooks have nothing meaningful to evaluate. Running stop hooks
on prompt-too-long creates a death spiral: error → hook blocking
→ retry → error → …"
先建立直觉:当出现可恢复的错误时,Claude Code 不会立刻把错误展示给用户(或 SDK 消费者),而是先"扣住"这个错误,在后台尝试恢复。只有恢复失败了,才把错误暴露出来。这就像快递丢件——快递公司不会第一时间告诉你"丢了",而是先内部查找、补发,实在找不到才通知你。
这是一个关键的设计模式:在流式循环中,可恢复的错误被"扣留"而不是立即 yield:
let withheld = false
if (contextCollapse?.isWithheldPromptTooLong(message, ...)) withheld = true
if (reactiveCompact?.isWithheldPromptTooLong(message)) withheld = true
if (reactiveCompact?.isWithheldMediaSizeError(message)) withheld = true
if (isWithheldMaxOutputTokens(message)) withheld = true
if (!withheld) yield yieldMessage // 只 yield 非扣留的消息为什么要 withhold?
如果立即 yield 错误消息,SDK 消费者(如 Desktop App)会看到错误并终止会话。 但恢复循环还在运行 — 没人在听了。Withhold 让恢复有机会成功, 只有恢复失败时才显示错误。
巧思7:hoisted gate 防止 withhold/recover 不一致
// 在流式循环之前就确定 mediaRecoveryEnabled
const mediaRecoveryEnabled =
reactiveCompact?.isReactiveCompactEnabled() ?? false
// 为什么?因为 CACHED_MAY_BE_STALE 可能在 5-30s 的流式传输中翻转
// 如果 withhold 时是 true,recover 时变成 false → 消息丢失!先建立直觉:每次调用 AI API 都要发送完整的对话历史,这很贵。但如果两次调用的前半部分完全相同(比如系统提示词 + 前几轮对话),API 可以缓存这部分,只对新增内容收费。Claude Code 的很多设计都是为了最大化这个缓存命中率——确保每次请求的前缀尽可能与上次相同。
Anthropic API 的 prompt cache key 由以下部分组成:
cache_key = hash(
system_prompt, ← 必须完全相同
tools, ← 必须完全相同(包括顺序)
model, ← 必须完全相同
messages[0..N-1], ← 前缀必须完全相同(字节级)
thinking_config, ← 必须完全相同
)
手段1:工具列表排序稳定
// tools.ts 中对工具排序,确保每次请求的工具定义前缀不变
// 即使 MCP 工具动态加载,排序也是确定性的手段2:不修改已发送的消息
// backfillObservableInput 只在 yield 的克隆上操作
// 原始 message 保持不变 → 下次请求的前缀字节相同
let yieldMessage: typeof message = message
if (addedFields) {
clonedContent ??= [...message.message.content]
clonedContent[i] = { ...block, input: inputCopy }
yieldMessage = { ...message, message: { ...message.message, content: clonedContent } }
}
// 注释:"The original `message` is left untouched for assistantMessages.push
// below — it flows back to the API and mutating it would break prompt caching
// (byte mismatch)."手段3:Fork 子 Agent 共享父级缓存
// CacheSafeParams — 必须与父级完全相同的参数
export type CacheSafeParams = {
systemPrompt: SystemPrompt
userContext: { [k: string]: string }
systemContext: { [k: string]: string }
toolUseContext: ToolUseContext
forkContextMessages: Message[] // 父级的消息历史
}
// Fork 子 Agent 使用父级的消息作为前缀
// → 前缀完全相同 → 缓存命中
// → Fork 很便宜(只付增量 token 的钱)手段4:contentReplacementState 克隆
// 注释原文:
// Clone by default (not fresh): cache-sharing forks process parent
// messages containing parent tool_use_ids. A fresh state would see
// them as unseen and make divergent replacement decisions → wire
// prefix differs → cache miss.手段5:dumpPromptsFetch 单例
// 每次 query session 只创建一个 fetch wrapper
// 避免每次迭代创建新闭包 → 只保留最新的请求体 (~700KB)
// 而不是所有请求体 (~500MB for long sessions)
const dumpPromptsFetch = config.gates.isAnt
? createDumpPromptsFetch(toolUseContext.agentId ?? config.sessionId)
: undefined先建立直觉:有时候你希望 AI 在一次对话中做更多事情(比如"帮我重构整个模块"),而不是每做一步就停下来等你确认。Token Budget 就是给 AI 一个"预算"——在预算内它可以持续工作,用完了才停。但如果 AI 开始"摸鱼"(每轮只产出很少内容),系统会提前叫停,避免浪费。
Token Budget 是一个让 Claude 在单次 turn 中使用更多 token 的机制:
export function checkTokenBudget(
tracker: BudgetTracker,
agentId: string | undefined,
budget: number | null,
globalTurnTokens: number,
): TokenBudgetDecision {
// 子 Agent 不参与 budget(只有主线程)
if (agentId || budget === null || budget <= 0) {
return { action: 'stop', completionEvent: null }
}
const pct = Math.round((turnTokens / budget) * 100)
const deltaSinceLastCheck = globalTurnTokens - tracker.lastGlobalTurnTokens
// 巧思8:Diminishing Returns 检测
const isDiminishing =
tracker.continuationCount >= 3 && // 至少继续了 3 次
deltaSinceLastCheck < DIMINISHING_THRESHOLD && // 这次增量 < 500 tokens
tracker.lastDeltaTokens < DIMINISHING_THRESHOLD // 上次增量也 < 500
// 如果没有 diminishing 且未达到 90% → 继续
if (!isDiminishing && turnTokens < budget * COMPLETION_THRESHOLD) {
return { action: 'continue', nudgeMessage: '...' }
}
// 否则停止
return { action: 'stop', completionEvent: { diminishingReturns: isDiminishing } }
}为什么需要 Diminishing Returns 检测?
场景:用户设置了 500k token 预算
没有 DR 检测:
Turn 1: 50k tokens (有用的工作)
Turn 2: 30k tokens (有用的工作)
Turn 3: 20k tokens (有用的工作)
Turn 4: 200 tokens (Claude 说 "I think we're done")
Turn 5: 150 tokens (Claude 说 "Is there anything else?")
Turn 6: 100 tokens (Claude 说 "Let me know if you need more")
... 继续浪费 token 直到 500k ...
有 DR 检测:
Turn 1-3: 同上
Turn 4: 200 tokens → delta < 500 → 记录
Turn 5: 150 tokens → 连续两次 delta < 500 → 停止!
节省了大量无意义的 token
消息队列是进程全局的单例,但每个 Agent 只消费属于自己的消息:
// 主线程只消费 agentId === undefined 的消息
// 子 Agent 只消费自己 agentId 的 task-notification
const queuedCommandsSnapshot = getCommandsByMaxPriority(
sleepRan ? 'later' : 'next',
).filter(cmd => {
if (isSlashCommand(cmd)) return false // slash 命令不在这里处理
if (isMainThread) return cmd.agentId === undefined
// 子 Agent 只消费 task-notification,不消费用户 prompt
return cmd.mode === 'task-notification' && cmd.agentId === currentAgentId
})巧思9:Sleep 工具触发更深的队列消费
const sleepRan = toolUseBlocks.some(b => b.name === SLEEP_TOOL_NAME)
// sleepRan ? 'later' : 'next'当 Claude 主动调用 Sleep 工具时,它在等待某些异步任务完成。 此时消费 'later' 优先级的消息(包括其他 Agent 的完成通知), 而正常情况下只消费 'next' 优先级的消息。
// 在 query() 入口处启动预取(只触发一次)
using pendingMemoryPrefetch = startRelevantMemoryPrefetch(
state.messages, state.toolUseContext,
)
// 在每次迭代的工具执行后尝试消费
if (
pendingMemoryPrefetch &&
pendingMemoryPrefetch.settledAt !== null && // 已完成
pendingMemoryPrefetch.consumedOnIteration === -1 // 还没消费过
) {
const memoryAttachments = filterDuplicateMemoryAttachments(
await pendingMemoryPrefetch.promise,
toolUseContext.readFileState, // 过滤已读过的文件
)
// ...
pendingMemoryPrefetch.consumedOnIteration = turnCount - 1
}设计要点:
using关键字确保在所有退出路径上清理(dispose)- 预取在 model 流式传输期间并行运行
- 如果预取还没完成,跳过(零等待),下次迭代再试
readFileState过滤确保不会注入 Claude 已经读过的文件
Stop Hooks 不只是简单的"检查是否停止",它是一个完整的可编程系统:
Claude 说 "我完成了"
│
▼
handleStopHooks()
│
├── 保存 CacheSafeParams (供 /btw 和 side_question 使用)
│
├── 执行 Stop Hooks (用户定义的 shell 命令)
│ ├── blocking error → 注入消息,继续循环
│ ├── preventContinuation → 强制停止
│ └── 通过 → 继续检查
│
├── 如果是 Teammate:
│ ├── 执行 TaskCompleted Hooks
│ └── 执行 TeammateIdle Hooks
│
├── 后台任务 (fire-and-forget):
│ ├── executePromptSuggestion() → 生成后续问题建议
│ ├── executeExtractMemories() → 提取记忆
│ └── executeAutoDream() → 自动梦境(反思)
│
└── 清理:
└── cleanupComputerUseAfterTurn() → 释放 CU 锁
巧思10:API 错误时跳过 Stop Hooks
// 如果最后一条消息是 API 错误,跳过 Stop Hooks
if (lastMessage?.isApiErrorMessage) {
void executeStopFailureHooks(lastMessage, toolUseContext)
return { reason: 'completed' }
}为什么?因为 Stop Hooks 评估的是 Claude 的响应。如果 Claude 没有产生有效响应 (API 错误),Hooks 评估它会创建死亡螺旋: error → hook blocking → retry → error → …
Claude Code 有两种工具编排模式,通过 feature gate 切换:
- 边接收 API 响应边执行工具
- 使用 addTool() 逐个添加
- 自动管理并发
- 支持 progress wake-up
- 支持 sibling abort
- 等 API 响应完成后批量执行
- 使用 partitionToolCalls() 分批
// partitionToolCalls 的分批策略:
// 连续的 ConcurrencySafe 工具合并为一批并行执行
// 非安全工具单独一批串行执行
// 输入: [Glob, Grep, Read, Edit, Glob, Read]
// 分批: [[Glob, Grep, Read], [Edit], [Glob, Read]]
// ↑ 并行执行 ↑ 串行 ↑ 并行执行
function partitionToolCalls(toolUseMessages, toolUseContext): Batch[] {
return toolUseMessages.reduce((acc, toolUse) => {
const isConcurrencySafe = /* ... */
// 如果当前工具安全 AND 上一批也是安全的 → 合并
if (isConcurrencySafe && acc[acc.length - 1]?.isConcurrencySafe) {
acc[acc.length - 1].blocks.push(toolUse)
} else {
acc.push({ isConcurrencySafe, blocks: [toolUse] })
}
return acc
}, [])
}并发限制:
function getMaxToolUseConcurrency(): number {
return parseInt(process.env.CLAUDE_CODE_MAX_TOOL_USE_CONCURRENCY || '', 10) || 10
}默认最多 10 个工具并行,可通过环境变量覆盖。
// 在 query() 入口处快照一次,整个循环期间不变
const config = buildQueryConfig()
export type QueryConfig = {
sessionId: SessionId
gates: {
streamingToolExecution: boolean // 是否启用流式工具执行
emitToolUseSummaries: boolean // 是否生成工具使用摘要
isAnt: boolean // 是否是 Anthropic 内部用户
fastModeEnabled: boolean // 是否启用快速模式
}
}巧思11:为什么要快照?
注释解释了:
// Immutable values snapshotted once at query() entry. Separating these from
// the per-iteration State struct and the mutable ToolUseContext makes future
// step() extraction tractable — a pure reducer can take (state, event, config)
// where config is plain data.这是为未来的架构重构做准备:
- State = 可变状态
- Config = 不可变配置
- Event = 输入事件
- → 纯函数 reducer:
(state, event, config) → newState
这是 Redux/Elm 架构的影子,说明团队在考虑将 query loop 重构为更可测试的纯函数。
工具使用摘要是给移动端 UI 看的(移动端不显示完整的工具调用):
// 在工具执行完成后,异步生成摘要(不阻塞下一次 API 调用)
nextPendingToolUseSummary = generateToolUseSummary({
tools: toolInfoForSummary,
signal: toolUseContext.abortController.signal,
isNonInteractiveSession: ...,
lastAssistantText,
}).then(summary => summary ? createToolUseSummaryMessage(summary, toolUseIds) : null)
.catch(() => null) // 失败静默
// 在下一次迭代开始时消费(Haiku ~1s,model streaming 5-30s)
if (pendingToolUseSummary) {
const summary = await pendingToolUseSummary
if (summary) yield summary
}设计要点:
- 使用 Haiku 模型(快速、便宜)
- 异步执行,不阻塞主循环
- 只在主线程生成(子 Agent 不需要)
- 在下一次迭代消费(此时 Haiku 早已完成)
| # | 巧思 | 解决的问题 |
|---|---|---|
| 1 | transition 字段 | 防止恢复循环,让测试可断言 |
| 2 | continue 站点精确重置 | 防止无限循环(真实 bug) |
| 3 | 三层 AbortController | Bash 级联取消 vs 用户中断 vs 权限拒绝 |
| 4 | Promise.race 多信号等待 | Progress 实时传递 |
| 5 | 5 层压缩管线 | 便宜的先做,贵的后做 |
| 6 | 恢复消息措辞 | 防止 Claude 浪费 token |
| 7 | hoisted gate | 防止 withhold/recover 不一致 |
| 8 | Diminishing Returns | 防止浪费 token 在无意义的继续上 |
| 9 | Sleep 触发深队列消费 | 让等待中的 Agent 能收到通知 |
| 10 | API 错误跳过 Hooks | 防止死亡螺旋 |
| 11 | Config 快照 | 为纯函数 reducer 重构做准备 |
以上 11 个巧思是 Claude Code 特有的,但它们背后有通用的设计模式, 可以直接迁移到任何 Agent 系统中:
在 Claude Code 中:withhold 模式 — 扣留错误消息,尝试恢复,恢复失败再暴露。
通用形式:
收到错误 → 不立即传播 → 尝试恢复策略 A → 失败 → 尝试策略 B → 失败 → 传播错误
适用场景:
- API 网关的重试机制(502 不立即返回给客户端,先重试其他后端)
- 数据库连接池(连接断开不立即报错,先尝试重连)
- 分布式系统的 leader 选举(leader 失联不立即告知客户端,先选新 leader)
关键约束:withhold 和 recover 必须使用同一个 gate 变量(hoisted gate), 否则在 withhold 和 recover 之间条件翻转会导致消息丢失。
在 Claude Code 中:5 层压缩管线 — 便宜的先做,贵的后做。
通用形式:
问题发生 → 尝试零成本方案 → 不够 → 尝试低成本方案 → 不够 → 尝试高成本方案
适用场景:
- CDN 缓存策略(内存缓存 → 磁盘缓存 → 回源)
- 搜索引擎降级(精确匹配 → 模糊匹配 → 语义搜索)
- 负载均衡(本地实例 → 同区域 → 跨区域)
关键约束:每层的执行顺序必须固定,且前一层的结果要能被后一层感知 (如 snipCompact 返回 tokensFreed 给 autocompact 参考)。
在 Claude Code 中:while(true) + transition 字段 — 记录为什么继续,防止恢复循环。
通用形式:
while (true) {
state = process(state)
state.transition = { reason, timestamp }
if (shouldStop(state)) break
if (isRecoveryLoop(state.transition, previousTransition)) break // 防止无限循环
}
适用场景:
- 工作流引擎(记录每步为什么跳转,防止循环审批)
- 游戏 AI 状态机(记录状态转换原因,防止行为抖动)
- 编译器优化 pass(记录每个 pass 的效果,防止优化循环)
关键约束:transition 记录不只是调试用的 — 它是防止无限循环的守卫条件。
在 Claude Code 中:ConcurrencySafe = 读锁,非安全 = 写锁。
通用形式:
操作分为两类:
- 只读操作(可并行):多个只读操作可以同时执行
- 写操作(独占):写操作执行时,其他所有操作等待
适用场景:
- 数据库的 MVCC(读不阻塞读,写阻塞一切)
- 文件系统的 flock(LOCK_SH vs LOCK_EX)
- Kubernetes 的 admission webhook(多个 mutating webhook 串行,validating 并行)
关键约束:在 LLM Agent 场景下,工具的读写属性是静态声明的(工具定义时确定), 不是运行时推断的。这比数据库简单得多,但也意味着不能表达细粒度依赖。
在 Claude Code 中:buildQueryConfig() 在入口处快照一次,整个循环不变。
通用形式:
const config = snapshot(mutableGlobalConfig) // 入口处快照
for (const event of events) {
state = reducer(state, event, config) // config 是不可变的
}
适用场景:
- React/Redux 的 store 设计(action → reducer → new state)
- 游戏引擎的帧更新(每帧开始时快照输入状态)
- 微服务的配置热更新(请求开始时快照配置,请求内不变)
关键约束:这是为可测试性服务的 — 纯函数 (state, event, config) → newState
比闭包捕获全局变量容易测试得多。
在 Claude Code 中:三层 AbortController — 用户中断取消一切,Bash 错误只取消兄弟,权限拒绝冒泡到 turn。
通用形式:
Root AbortController (最高级别的取消)
└── Group AbortController (组级别的取消)
├── Task A AbortController
├── Task B AbortController
└── Task C AbortController
取消传播规则:
- Root 取消 → 所有 Group 和 Task 取消
- Group 取消 → 该组的所有 Task 取消,Root 不受影响
- Task 取消 → 根据原因决定是否冒泡到 Group
适用场景:
- 微服务的请求取消传播(gateway timeout → 取消所有下游调用)
- CI/CD 的 job 取消(取消 pipeline → 取消所有 stage → 取消所有 step)
- 浏览器的 fetch 取消(页面导航 → 取消所有进行中的请求)
关键约束:取消原因(abort reason)必须携带语义信息,
让子级能根据原因决定是否冒泡。不是所有取消都应该传播到父级。
第5章将深入 多 Agent 系统——理解 AgentTool 如何创建和管理子 Agent。