Skip to content

Commit 3af65e2

Browse files
committed
merge(local): Phase A — subagent hang hardening (chunkTimeout + retry budget)
Merges local/timeout-and-retry into local/integration. Phase A addresses root causes #1 and #3 of subagent hangs: #1 No default chunk timeout on SSE streams #3 SessionRetry.retryable didn't match SSE/transport errors Commits (10 total, each diamond-reviewed by default + codex-5.3): 2463766 config(provider): chunkTimeout=false support 90682b8 feat(message-v2): SSEStallError tagged error + cause-chain detection ae30ab3 fix(message-v2): throw SSEStallError class at provider SSE timeout site 04c345a test(provider): failing tests for resolveChunkTimeout (RED) 65917b4 test(provider): A.1.3 review fix — extended-thinking cross tests + tighter stall 2d6ed17 feat(provider): resolveChunkTimeout + wrapSSE wiring (GREEN) 1b8f62d test(retry): failing tests for transport retries + budget cap (RED) 1ec5a36 test(retry): A.2.1 review fix — verify terminal Exit is Cause.done 2176730 feat(retry): transport error matching + retry budget cap (GREEN) facd7c1 fix(retry): use plain Cause.done + Pull.isDoneCause (review fix) Changes: - chunkTimeout config accepts number | false (parallel to timeout) - DEFAULT_CHUNK_TIMEOUT_MS=120_000 applied automatically - EXTENDED_THINKING_CHUNK_TIMEOUT_MS=600_000 for anthropic, google-vertex-anthropic, amazon-bedrock - SSEStallError class in provider.ts; NamedError schema in message-v2.ts - hasSSEStallCause walks error cause chain (depth 8) and is checked before APICallError in MessageV2.fromError - SessionRetry.retryable matches: ETIMEDOUT, ECONNRESET, ECONNREFUSED, EAI_AGAIN, socket hang up, and SSEStallError instances - TRANSPORT_RETRY_CAP=5 enforces a hard stop on transport-class non-API errors via Cause.done(meta.attempt) Tests (all live-clock, no TestClock deadlock): 54 pass, 0 fail. Typecheck clean from packages/opencode. Phase-final review verdict: spec APPROVED (codex-5.3), quality APPROVED (default/Opus). Non-blocking followups tracked: - openrouter→anthropic routing gets 120s default (widening logic or docs) - SDK types.gen.ts:1333 noisy duplicate union member - no end-to-end integration test (planned for Phase C) - TRANSPORT_RETRY_CAP not yet user-configurable - docs at packages/web/src/content/docs/config.mdx not yet updated - AbortSignal.any reason-fidelity needs Bun/Node version matrix test No push. Branch stays local.
2 parents 33b2795 + facd7c1 commit 3af65e2

8 files changed

Lines changed: 313 additions & 11 deletions

File tree

packages/opencode/src/config/provider.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,14 @@ export class Info extends Schema.Class<Info>("ProviderConfig")({
9898
description:
9999
"Timeout in milliseconds for requests to this provider. Default is 300000 (5 minutes). Set to false to disable timeout.",
100100
}),
101-
chunkTimeout: Schema.optional(PositiveInt).annotate({
101+
chunkTimeout: Schema.optional(
102+
Schema.Union([PositiveInt, Schema.Literal(false)]).annotate({
103+
description:
104+
"Timeout in milliseconds between streamed SSE chunks for this provider. If no chunk arrives within this window, the request is aborted. Defaults to 120000 (120s) for most providers, 600000 (10min) for Anthropic-family providers (to accommodate extended thinking). Set to false to disable.",
105+
}),
106+
).annotate({
102107
description:
103-
"Timeout in milliseconds between streamed SSE chunks for this provider. If no chunk arrives within this window, the request is aborted.",
108+
"Timeout in milliseconds between streamed SSE chunks for this provider. If no chunk arrives within this window, the request is aborted. Defaults to 120000 (120s) for most providers, 600000 (10min) for Anthropic-family providers (to accommodate extended thinking). Set to false to disable.",
104109
}),
105110
}),
106111
[Schema.Record(Schema.String, Schema.Any)],

packages/opencode/src/provider/provider.ts

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,41 @@ import { ModelID, ProviderID } from "./schema"
3131

3232
const log = Log.create({ service: "provider" })
3333

34+
export class SSEStallError extends Error {
35+
readonly _tag = "SSEStallError"
36+
constructor(message: string) {
37+
super(message)
38+
this.name = "SSEStallError"
39+
}
40+
}
41+
3442
function shouldUseCopilotResponsesApi(modelID: string): boolean {
3543
const match = /^gpt-(\d+)/.exec(modelID)
3644
if (!match) return false
3745
return Number(match[1]) >= 5 && !modelID.startsWith("gpt-5-mini")
3846
}
3947

40-
function wrapSSE(res: Response, ms: number, ctl: AbortController) {
48+
const DEFAULT_CHUNK_TIMEOUT_MS = 120_000
49+
const EXTENDED_THINKING_CHUNK_TIMEOUT_MS = 600_000
50+
const EXTENDED_THINKING_PROVIDERS: ReadonlySet<string> = new Set([
51+
"anthropic",
52+
"google-vertex-anthropic",
53+
"amazon-bedrock",
54+
])
55+
56+
export function resolveChunkTimeout(providerID: string, value: unknown): number {
57+
if (value === false) return 0
58+
if (typeof value === "number") {
59+
if (!Number.isFinite(value) || value <= 0) return 0
60+
return value
61+
}
62+
if (value !== undefined) log.warn("unrecognized chunkTimeout value, using provider default", { providerID, value })
63+
return EXTENDED_THINKING_PROVIDERS.has(providerID)
64+
? EXTENDED_THINKING_CHUNK_TIMEOUT_MS
65+
: DEFAULT_CHUNK_TIMEOUT_MS
66+
}
67+
68+
export function wrapSSE(res: Response, ms: number, ctl: AbortController) {
4169
if (typeof ms !== "number" || ms <= 0) return res
4270
if (!res.body) return res
4371
if (!res.headers.get("content-type")?.includes("text/event-stream")) return res
@@ -47,7 +75,7 @@ function wrapSSE(res: Response, ms: number, ctl: AbortController) {
4775
async pull(ctrl) {
4876
const part = await new Promise<Awaited<ReturnType<typeof reader.read>>>((resolve, reject) => {
4977
const id = setTimeout(() => {
50-
const err = new Error("SSE read timed out")
78+
const err = new SSEStallError(`SSE read timed out after ${ms}ms`)
5179
ctl.abort(err)
5280
void reader.cancel(err)
5381
reject(err)
@@ -1429,13 +1457,13 @@ const layer: Layer.Layer<
14291457
if (existing) return existing
14301458

14311459
const customFetch = options["fetch"]
1432-
const chunkTimeout = options["chunkTimeout"]
1460+
const resolvedChunkTimeout = resolveChunkTimeout(model.providerID, options["chunkTimeout"])
14331461
delete options["chunkTimeout"]
14341462

14351463
options["fetch"] = async (input: any, init?: BunFetchRequestInit) => {
14361464
const fetchFn = customFetch ?? fetch
14371465
const opts = init ?? {}
1438-
const chunkAbortCtl = typeof chunkTimeout === "number" && chunkTimeout > 0 ? new AbortController() : undefined
1466+
const chunkAbortCtl = resolvedChunkTimeout > 0 ? new AbortController() : undefined
14391467
const signals: AbortSignal[] = []
14401468

14411469
if (opts.signal) signals.push(opts.signal)
@@ -1468,7 +1496,7 @@ const layer: Layer.Layer<
14681496
})
14691497

14701498
if (!chunkAbortCtl) return res
1471-
return wrapSSE(res, chunkTimeout, chunkAbortCtl)
1499+
return wrapSSE(res, resolvedChunkTimeout, chunkAbortCtl)
14721500
}
14731501

14741502
const bundledLoader = BUNDLED_PROVIDERS[model.api.npm]

packages/opencode/src/session/message-v2.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ export const APIError = NamedError.create(
5656
}),
5757
)
5858
export type APIError = z.infer<typeof APIError.Schema>
59+
export const SSEStallError = NamedError.create(
60+
"SSEStallError",
61+
z.object({ message: z.string() }),
62+
)
63+
export type SSEStallError = z.infer<typeof SSEStallError.Schema>
5964
export const ContextOverflowError = NamedError.create(
6065
"ContextOverflowError",
6166
z.object({ message: z.string(), responseBody: z.string().optional() }),
@@ -415,6 +420,7 @@ export const Assistant = Base.extend({
415420
StructuredOutputError.Schema,
416421
ContextOverflowError.Schema,
417422
APIError.Schema,
423+
SSEStallError.Schema,
418424
])
419425
.optional(),
420426
parentID: MessageID.zod,
@@ -938,6 +944,20 @@ export const filterCompactedEffect = Effect.fnUntraced(function* (sessionID: Ses
938944
return filterCompacted(stream(sessionID))
939945
})
940946

947+
// Legacy substring fallback for rare paths where the tagged class gets stripped
948+
// during cross-realm rethrow. Primary signal is `name`/`_tag` set by SSEStallError.
949+
const SSE_STALL_MESSAGE_RE = /SSE (read|chunk) time(d out|out)/
950+
951+
function hasSSEStallCause(e: unknown, depth = 0): boolean {
952+
if (depth > 8) return false
953+
if (!e || typeof e !== "object") return false
954+
const err = e as { name?: string; _tag?: string; message?: string; cause?: unknown }
955+
if (err.name === "SSEStallError" || err._tag === "SSEStallError") return true
956+
if (typeof err.message === "string" && SSE_STALL_MESSAGE_RE.test(err.message)) return true
957+
if (err.cause) return hasSSEStallCause(err.cause, depth + 1)
958+
return false
959+
}
960+
941961
export function fromError(
942962
e: unknown,
943963
ctx: { providerID: ProviderID; aborted?: boolean },
@@ -988,6 +1008,11 @@ export function fromError(
9881008
},
9891009
{ cause: e },
9901010
).toObject()
1011+
case hasSSEStallCause(e):
1012+
return new SSEStallError(
1013+
{ message: e instanceof Error ? e.message : String(e) },
1014+
{ cause: e instanceof Error ? e : undefined },
1015+
).toObject()
9911016
case APICallError.isInstance(e):
9921017
const parsed = ProviderError.parseAPICallError({
9931018
providerID: ctx.providerID,

packages/opencode/src/session/retry.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,22 @@ export const RETRY_INITIAL_DELAY = 2000
1313
export const RETRY_BACKOFF_FACTOR = 2
1414
export const RETRY_MAX_DELAY_NO_HEADERS = 30_000 // 30 seconds
1515
export const RETRY_MAX_DELAY = 2_147_483_647 // max 32-bit signed integer for setTimeout
16+
export const TRANSPORT_RETRY_CAP = 5
17+
18+
const TRANSPORT_PATTERNS = ["ETIMEDOUT", "ECONNRESET", "ECONNREFUSED", "EAI_AGAIN", "socket hang up"]
1619

1720
function cap(ms: number) {
1821
return Math.min(ms, RETRY_MAX_DELAY)
1922
}
2023

24+
function transportMessage(error: Err) {
25+
if (MessageV2.SSEStallError.isInstance(error)) return error.data.message
26+
const msg = error.data?.message
27+
if (typeof msg !== "string") return undefined
28+
if (!TRANSPORT_PATTERNS.some((pattern) => msg.includes(pattern))) return undefined
29+
return msg
30+
}
31+
2132
export function delay(attempt: number, error?: MessageV2.APIError) {
2233
if (error) {
2334
const headers = error.data.responseHeaders
@@ -76,6 +87,9 @@ export function retryable(error: Err) {
7687
}
7788
}
7889

90+
const transport = transportMessage(error)
91+
if (transport) return transport
92+
7993
const json = iife(() => {
8094
try {
8195
if (typeof error.data?.message === "string") {
@@ -111,7 +125,11 @@ export function policy(opts: {
111125
Effect.succeed((meta: Schedule.InputMetadata<unknown>) => {
112126
const error = opts.parse(meta.input)
113127
const message = retryable(error)
128+
const transport = transportMessage(error)
114129
if (!message) return Cause.done(meta.attempt)
130+
if (transport && !MessageV2.APIError.isInstance(error) && meta.attempt > TRANSPORT_RETRY_CAP) {
131+
return Cause.done(meta.attempt)
132+
}
115133
return Effect.gen(function* () {
116134
const wait = delay(meta.attempt, MessageV2.APIError.isInstance(error) ? error : undefined)
117135
const now = yield* Clock.currentTimeMillis
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { describe, expect, test } from "bun:test"
2+
import { resolveChunkTimeout, SSEStallError, wrapSSE } from "../../src/provider/provider"
3+
4+
describe("provider.resolveChunkTimeout", () => {
5+
test("returns 120s default when undefined for generic provider", () => {
6+
expect(resolveChunkTimeout("github-copilot", undefined)).toBe(120_000)
7+
})
8+
9+
test("returns 600s default for Anthropic", () => {
10+
expect(resolveChunkTimeout("anthropic", undefined)).toBe(600_000)
11+
})
12+
13+
test("returns 600s default for google-vertex-anthropic", () => {
14+
expect(resolveChunkTimeout("google-vertex-anthropic", undefined)).toBe(600_000)
15+
})
16+
17+
test("returns 600s default for amazon-bedrock", () => {
18+
expect(resolveChunkTimeout("amazon-bedrock", undefined)).toBe(600_000)
19+
})
20+
21+
test("returns 0 when explicitly disabled with false", () => {
22+
expect(resolveChunkTimeout("github-copilot", false)).toBe(0)
23+
})
24+
25+
test("returns the user value when a positive number", () => {
26+
expect(resolveChunkTimeout("github-copilot", 60_000)).toBe(60_000)
27+
})
28+
29+
test("explicit positive number wins over extended-thinking default", () => {
30+
expect(resolveChunkTimeout("anthropic", 30_000)).toBe(30_000)
31+
})
32+
33+
test("false wins over extended-thinking default (returns 0)", () => {
34+
expect(resolveChunkTimeout("anthropic", false)).toBe(0)
35+
})
36+
37+
test("falls back to provider default for non-numeric junk", () => {
38+
// Defensive branch — config schema prevents this, but runtime check guards misconfig.
39+
expect(resolveChunkTimeout("github-copilot", "not-a-number" as never)).toBe(120_000)
40+
})
41+
})
42+
43+
describe("provider.wrapSSE — SSEStallError integration", () => {
44+
test("throws SSEStallError when chunk read exceeds timeout", async () => {
45+
const stream = new ReadableStream<Uint8Array>({
46+
pull() {
47+
return new Promise<void>(() => {}) // never resolves — forces stall
48+
},
49+
})
50+
const res = new Response(stream, { headers: { "content-type": "text/event-stream" } })
51+
const ctl = new AbortController()
52+
const wrapped = wrapSSE(res, 2, ctl)
53+
const reader = wrapped.body!.getReader()
54+
55+
await expect(reader.read()).rejects.toBeInstanceOf(SSEStallError)
56+
})
57+
58+
test("does not wrap non-SSE responses", () => {
59+
const res = new Response("hello", { headers: { "content-type": "text/plain" } })
60+
const ctl = new AbortController()
61+
expect(wrapSSE(res, 50, ctl)).toBe(res)
62+
})
63+
64+
test("returns original response when ms <= 0", () => {
65+
const res = new Response(new ReadableStream(), { headers: { "content-type": "text/event-stream" } })
66+
const ctl = new AbortController()
67+
expect(wrapSSE(res, 0, ctl)).toBe(res)
68+
expect(wrapSSE(res, -1, ctl)).toBe(res)
69+
})
70+
})
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { describe, expect, test } from "bun:test"
2+
import { APICallError } from "ai"
3+
import { SSEStallError } from "../../src/provider/provider"
4+
import { ProviderID } from "../../src/provider/schema"
5+
import { MessageV2 } from "../../src/session/message-v2"
6+
7+
const providerID = ProviderID.make("github-copilot")
8+
9+
describe("session.message-v2.fromError — SSEStallError", () => {
10+
test("preserves SSEStallError for plain Error with SSEStallError name", () => {
11+
const error = new Error("SSE read timed out")
12+
error.name = "SSEStallError"
13+
14+
const result = MessageV2.fromError(error, { providerID })
15+
16+
expect(result.name).toBe("SSEStallError")
17+
expect(result.data.message).toBe("SSE read timed out")
18+
})
19+
20+
test("MessageV2.SSEStallError.isInstance returns true", () => {
21+
const error = new Error("SSE read timed out")
22+
error.name = "SSEStallError"
23+
24+
const result = MessageV2.fromError(error, { providerID })
25+
26+
expect(MessageV2.SSEStallError.isInstance(result)).toBe(true)
27+
})
28+
29+
test("detects SSE stall by timeout message without SSEStallError name", () => {
30+
const error = new Error("SSE chunk timeout after 120000ms")
31+
32+
const result = MessageV2.fromError(error, { providerID })
33+
34+
expect(result.name).toBe("SSEStallError")
35+
expect(result.data.message).toBe("SSE chunk timeout after 120000ms")
36+
})
37+
38+
test("detects SSE stall through two-deep cause chain", () => {
39+
const stall = new SSEStallError("SSE read timed out")
40+
const middle = new Error("middle")
41+
middle.cause = stall
42+
const outer = new Error("outer")
43+
outer.cause = middle
44+
45+
const result = MessageV2.fromError(outer, { providerID })
46+
47+
expect(result.name).toBe("SSEStallError")
48+
})
49+
50+
test("detects SSE stall when APICallError wraps SSEStallError", () => {
51+
const stall = new SSEStallError("SSE read timed out")
52+
const apiError = new APICallError({
53+
message: "stream error",
54+
url: "https://api.githubcopilot.com/chat/completions",
55+
requestBodyValues: {},
56+
cause: stall,
57+
})
58+
59+
const result = MessageV2.fromError(apiError, { providerID })
60+
61+
expect(result.name).toBe("SSEStallError")
62+
expect(MessageV2.APIError.isInstance(result)).toBe(false)
63+
})
64+
})

0 commit comments

Comments
 (0)