diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dc5b52..dcc5be1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## [Unreleased] + +### Features +* allow configuring request and stream idle timeouts via `ANTHROPIC_REQUEST_TIMEOUT_MS` and `ANTHROPIC_STREAM_IDLE_TIMEOUT_MS` + +### Bug Fixes +* surface generic API rate limits (429) and server overloads (503, 529) immediately to enable upstream fallback +* bound hung outbound authentication fetches with a 30s default timeout +* abort stalled successful SSE streams after a 15s idle period to prevent infinite "thinking" loops ## [1.4.9](https://github.com/griffinmartin/opencode-claude-auth/compare/v1.4.8...v1.4.9) (2026-04-08) diff --git a/README.md b/README.md index 54ebbcf..83393ec 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ If only one account is found, the switcher is hidden and the plugin uses it dire | Keychain access denied | Grant access when macOS prompts you | | Keychain read timed out | Restart Keychain Access (can happen on macOS Tahoe) | | "Credentials are unavailable or expired" | Run `claude` to refresh your Claude Code credentials | +| API rate or usage limits (429/503/529) | These errors are surfaced immediately so OpenCode can fall back to other providers. Check your Anthropic usage. | | "Extra usage is required for long context requests" | Your conversation exceeded 200k tokens. See [Long context (1M)](#long-context-1m) below | ### Diagnostic logging @@ -179,13 +180,15 @@ This reads your stored credentials, calls Anthropic's OAuth token endpoint, and All configurable parameters can be overridden via environment variables. If Anthropic changes something before we publish an update, set an env var and keep working: -| Variable | Description | Default | -| ----------------------------- | -------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------- | -| `ANTHROPIC_CLI_VERSION` | Claude CLI version for user-agent and billing headers | `2.1.80` | -| `ANTHROPIC_USER_AGENT` | Full User-Agent string (overrides CLI version) | `claude-cli/{version} (external, cli)` | -| `ANTHROPIC_BETA_FLAGS` | Comma-separated beta feature flags | `claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14,prompt-caching-scope-2026-01-05` | -| `ANTHROPIC_ENABLE_1M_CONTEXT` | Enable 1M token context window for 4.6+ models (requires Max subscription) | `false` | -| `CLAUDE_AUTH_DEBUG` | Enable diagnostic logging (`1` for default path, or a custom file path) | disabled | +| Variable | Description | Default | +| ---------------------------------- | -------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------- | +| `ANTHROPIC_CLI_VERSION` | Claude CLI version for user-agent and billing headers | `2.1.80` | +| `ANTHROPIC_USER_AGENT` | Full User-Agent string (overrides CLI version) | `claude-cli/{version} (external, cli)` | +| `ANTHROPIC_BETA_FLAGS` | Comma-separated beta feature flags | `claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14,prompt-caching-scope-2026-01-05` | +| `ANTHROPIC_ENABLE_1M_CONTEXT` | Enable 1M token context window for 4.6+ models (requires Max subscription) | `false` | +| `ANTHROPIC_REQUEST_TIMEOUT_MS` | Timeout for outbound authentication and metadata fetches | `30000` (30s) | +| `ANTHROPIC_STREAM_IDLE_TIMEOUT_MS` | Timeout for stalled response streams (SSE) | `15000` (15s) | +| `CLAUDE_AUTH_DEBUG` | Enable diagnostic logging (`1` for default path, or a custom file path) | disabled | Example: @@ -206,7 +209,12 @@ export ANTHROPIC_ENABLE_1M_CONTEXT=true # requires Claude Max - Provides an account switcher via `opencode auth login` when multiple accounts are found; persists selection to `~/.local/share/opencode/claude-account-source.txt` - Syncs credentials to `auth.json` on startup and every 5 minutes as a fallback (sync never triggers refresh; refresh is lazy, only on API requests) - On Windows, writes to both `%USERPROFILE%\.local\share\opencode\auth.json` and `%LOCALAPPDATA%\opencode\auth.json` -- Retries API requests on 429 (rate limit) and 529 (overloaded) with exponential backoff, respecting `retry-after` headers +- Surfaces API rate limits (429) and server overloads (503, 529) immediately to OpenCode to allow for quick fallback to other configured providers +- Bounded internal retries are limited to: + - One `401 Unauthorized` attempt after an in-place token refresh + - Long-context beta exclusion for canonical `context_too_long` errors (up to 3 attempts) +- Aborts hung outbound requests within `ANTHROPIC_REQUEST_TIMEOUT_MS` (default 30s) +- Terminates stalled successful response streams within `ANTHROPIC_STREAM_IDLE_TIMEOUT_MS` (default 15s) - When a token is within 60 seconds of expiry, refreshes directly via `POST https://claude.ai/v1/oauth/token` (no LLM tokens consumed). Falls back to `claude` CLI if the direct refresh fails. New tokens are written back to Keychain (macOS) or credentials file (Linux/Windows) to keep stored credentials in sync with rotated refresh tokens - If credentials aren't OAuth-based, the auth loader returns `{}` and falls through to API key auth - If credentials are unavailable or unreadable, the plugin disables itself and OpenCode continues without Claude auth diff --git a/src/credentials.ts b/src/credentials.ts index 31f8305..337c43a 100644 --- a/src/credentials.ts +++ b/src/credentials.ts @@ -328,13 +328,16 @@ export function getCredentialsForSync(): ClaudeCredentials | null { return null } -export function getCachedCredentials(): ClaudeCredentials | null { +export function getCachedCredentials( + forceRefresh = false, +): ClaudeCredentials | null { const account = getActiveAccount() if (!account) return null const now = Date.now() const cached = accountCacheMap.get(account.source) if ( + !forceRefresh && cached && now - cached.cachedAt < CREDENTIAL_CACHE_TTL_MS && cached.creds.expiresAt > now + 60_000 @@ -346,9 +349,25 @@ export function getCachedCredentials(): ClaudeCredentials | null { return cached.creds } + if (forceRefresh) { + log("cache_bypass", { + source: account.source, + reason: "forced_reload", + }) + accountCacheMap.delete(account.source) + const reloaded = refreshAccount(account.source) + if (reloaded) { + account.credentials = reloaded + } + } + log("cache_miss", { source: account.source, - reason: cached ? "stale or expiring" : "empty", + reason: forceRefresh + ? "forced_reload" + : cached + ? "stale or expiring" + : "empty", }) const fresh = refreshIfNeeded(account) diff --git a/src/index.test.ts b/src/index.test.ts index 84756d2..a14725b 100644 --- a/src/index.test.ts +++ b/src/index.test.ts @@ -146,7 +146,10 @@ async function loadHelpersWithCountingKeychain( initialExpiresAt: number, ): Promise<{ helpersModule: typeof import("./index.ts") - keychainModule: { __getReadCount: () => number } + keychainModule: { + __getReadCount: () => number + __setCredentials: (overrides: Partial) => void + } }> { const tempDir = await mkdtemp(join(tmpdir(), "opencode-claude-auth-cache-")) const tempKeychain = join(tempDir, "keychain.ts") @@ -155,20 +158,24 @@ async function loadHelpersWithCountingKeychain( await writeFile( tempKeychain, `let readCount = 0 -let credentials = { +let storedCredentials = { accessToken: "token", refreshToken: "refresh", expiresAt: ${initialExpiresAt} } +function cloneCredentials() { + return { ...storedCredentials } +} + export function readAllClaudeAccounts() { readCount += 1 - return [{ label: "Account 1", source: "Claude Code-credentials", credentials }] + return [{ label: "Account 1", source: "Claude Code-credentials", credentials: cloneCredentials() }] } export function refreshAccount(source) { readCount += 1 - return credentials + return cloneCredentials() } export function writeBackCredentials() { return true } @@ -180,6 +187,10 @@ export function buildAccountLabels(creds) { export function __getReadCount() { return readCount } + +export function __setCredentials(overrides) { + Object.assign(storedCredentials, overrides) +} `, "utf8", ) @@ -191,10 +202,53 @@ export function __getReadCount() { return { helpersModule, - keychainModule: keychainModule as { __getReadCount: () => number }, + keychainModule: keychainModule as { + __getReadCount: () => number + __setCredentials: (overrides: Partial) => void + }, + } +} + +async function loadAuthLoader(initialExpiresAt: number): Promise<{ + helpersModule: typeof import("./index.ts") + keychainModule: { + __getReadCount: () => number + __setCredentials: (overrides: Partial) => void + } + authFetch: (input: RequestInfo | URL, init?: RequestInit) => Promise +}> { + const { helpersModule, keychainModule } = + await loadHelpersWithCountingKeychain(initialExpiresAt) + const plugin = await helpersModule.default({} as never) + const typedPlugin = plugin as { auth?: { loader?: TestAuthLoader } } + assert.equal(typeof typedPlugin.auth?.loader, "function") + + const authConfig = await typedPlugin.auth!.loader!( + async () => ({ + type: "oauth", + refresh: "refresh", + access: "access", + expires: Date.now() + 60_000, + }), + { models: {} }, + ) + + return { + helpersModule, + keychainModule, + authFetch: authConfig.fetch, } } +async function loadAuthLoaderFetch( + initialExpiresAt: number, +): Promise< + (input: RequestInfo | URL, init?: RequestInit) => Promise +> { + const { authFetch } = await loadAuthLoader(initialExpiresAt) + return authFetch +} + function makeCreds(overrides?: Partial): ClaudeCredentials { return { accessToken: "sk-ant-test-access", @@ -528,6 +582,23 @@ export function buildAccountLabels(creds) { return creds.map((_, i) => \`Account ) }) + it("getRequestTimeoutMs falls back to default for invalid env values", () => { + assert.equal(helpers.DEFAULT_REQUEST_TIMEOUT_MS, 30_000) + assert.equal( + helpers.getRequestTimeoutMs({ ANTHROPIC_REQUEST_TIMEOUT_MS: "25" }), + 25, + ) + + for (const invalidValue of ["0", "-1", "1.5", "abc", "10ms"]) { + assert.equal( + helpers.getRequestTimeoutMs({ + ANTHROPIC_REQUEST_TIMEOUT_MS: invalidValue, + }), + helpers.DEFAULT_REQUEST_TIMEOUT_MS, + ) + } + }) + it("system transform does not inject when system already contains prefix", async () => { const originalSetInterval = globalThis.setInterval const originalHome = process.env.HOME @@ -689,6 +760,427 @@ export function buildAccountLabels(creds) { return creds.map((_, i) => \`Account } } }) + + it("auth loader aborts hung outbound fetch within request timeout", async () => { + const originalNow = Date.now + const originalSetInterval = globalThis.setInterval + const originalHome = process.env.HOME + const originalFetch = globalThis.fetch + const originalRequestTimeout = process.env.ANTHROPIC_REQUEST_TIMEOUT_MS + const tempHome = await mkdtemp(join(tmpdir(), "opencode-claude-auth-home-")) + process.env.HOME = tempHome + process.env.ANTHROPIC_REQUEST_TIMEOUT_MS = "10" + Date.now = () => 1_700_000_000_000 + globalThis.setInterval = (() => ({ + unref() {}, + })) as unknown as typeof setInterval + + let callCount = 0 + + try { + const authFetch = await loadAuthLoaderFetch(Date.now() + 10 * 60_000) + globalThis.fetch = ((_input: RequestInfo | URL, init?: RequestInit) => { + callCount += 1 + + return new Promise((_resolve, reject) => { + const signal = init?.signal + if (!signal) { + reject( + new Error("Expected auth loader to forward a timeout signal"), + ) + return + } + + if (signal.aborted) { + reject(signal.reason) + return + } + + signal.addEventListener("abort", () => reject(signal.reason), { + once: true, + }) + }) + }) as typeof fetch + + await assert.rejects( + authFetch("https://api.anthropic.com/v1/messages", { + method: "POST", + body: JSON.stringify({ model: "claude-haiku-4-5", messages: [] }), + }), + (error: unknown) => { + assert.equal((error as Error).name, "TimeoutError") + assert.match((error as Error).message, /10ms/) + return true + }, + ) + assert.equal(callCount, 1) + } finally { + Date.now = originalNow + globalThis.setInterval = originalSetInterval + globalThis.fetch = originalFetch + if (typeof originalHome === "string") { + process.env.HOME = originalHome + } else { + delete process.env.HOME + } + if (typeof originalRequestTimeout === "string") { + process.env.ANTHROPIC_REQUEST_TIMEOUT_MS = originalRequestTimeout + } else { + delete process.env.ANTHROPIC_REQUEST_TIMEOUT_MS + } + } + }) + + it("auth loader preserves caller abort signal over request timeout", async () => { + const originalNow = Date.now + const originalSetInterval = globalThis.setInterval + const originalHome = process.env.HOME + const originalFetch = globalThis.fetch + const originalRequestTimeout = process.env.ANTHROPIC_REQUEST_TIMEOUT_MS + const tempHome = await mkdtemp(join(tmpdir(), "opencode-claude-auth-home-")) + process.env.HOME = tempHome + process.env.ANTHROPIC_REQUEST_TIMEOUT_MS = "50" + Date.now = () => 1_700_000_000_000 + globalThis.setInterval = (() => ({ + unref() {}, + })) as unknown as typeof setInterval + + let callCount = 0 + const callerAbort = new Error("caller aborted") + callerAbort.name = "AbortError" + const callerController = new AbortController() + + try { + const authFetch = await loadAuthLoaderFetch(Date.now() + 10 * 60_000) + globalThis.fetch = ((_input: RequestInfo | URL, init?: RequestInit) => { + callCount += 1 + + return new Promise((_resolve, reject) => { + const signal = init?.signal + if (!signal) { + reject( + new Error("Expected auth loader to forward a combined signal"), + ) + return + } + + if (signal.aborted) { + reject(signal.reason) + return + } + + signal.addEventListener("abort", () => reject(signal.reason), { + once: true, + }) + }) + }) as typeof fetch + + const pendingResponse = authFetch( + "https://api.anthropic.com/v1/messages", + { + method: "POST", + body: JSON.stringify({ model: "claude-haiku-4-5", messages: [] }), + signal: callerController.signal, + }, + ) + + setTimeout(() => callerController.abort(callerAbort), 0) + + await assert.rejects(pendingResponse, (error: unknown) => { + assert.strictEqual(error, callerAbort) + return true + }) + assert.equal(callCount, 1) + } finally { + Date.now = originalNow + globalThis.setInterval = originalSetInterval + globalThis.fetch = originalFetch + if (typeof originalHome === "string") { + process.env.HOME = originalHome + } else { + delete process.env.HOME + } + if (typeof originalRequestTimeout === "string") { + process.env.ANTHROPIC_REQUEST_TIMEOUT_MS = originalRequestTimeout + } else { + delete process.env.ANTHROPIC_REQUEST_TIMEOUT_MS + } + } + }) + + it("auth loader retries once on 401 before surfacing terminal failure", async () => { + const originalNow = Date.now + const originalSetInterval = globalThis.setInterval + const originalHome = process.env.HOME + const originalFetch = globalThis.fetch + const tempHome = await mkdtemp(join(tmpdir(), "opencode-claude-auth-home-")) + process.env.HOME = tempHome + Date.now = () => 1_700_000_000_000 + globalThis.setInterval = (() => ({ + unref() {}, + })) as unknown as typeof setInterval + + let callCount = 0 + const seenAuthorizationHeaders: string[] = [] + const responseBody = JSON.stringify({ error: { message: "unauthorized" } }) + + try { + const { authFetch, keychainModule } = await loadAuthLoader( + Date.now() + 10 * 60_000, + ) + globalThis.fetch = (async ( + _input: RequestInfo | URL, + init?: RequestInit, + ) => { + callCount += 1 + seenAuthorizationHeaders.push( + new Headers(init?.headers).get("authorization") ?? "", + ) + + if (callCount === 1) { + keychainModule.__setCredentials({ accessToken: "token-refreshed" }) + } + + return new Response(responseBody, { + status: 401, + headers: { "content-type": "application/json" }, + }) + }) as typeof fetch + + const response = await authFetch( + "https://api.anthropic.com/v1/messages", + { + method: "POST", + body: JSON.stringify({ model: "claude-haiku-4-5", messages: [] }), + }, + ) + + assert.equal(response.status, 401) + assert.equal(await response.text(), responseBody) + assert.equal(callCount, 2) + assert.deepEqual(seenAuthorizationHeaders, [ + "Bearer token", + "Bearer token-refreshed", + ]) + } finally { + Date.now = originalNow + globalThis.setInterval = originalSetInterval + globalThis.fetch = originalFetch + if (typeof originalHome === "string") { + process.env.HOME = originalHome + } else { + delete process.env.HOME + } + } + }) + + it("auth loader surfaces generic 429 immediately for upstream fallback", async () => { + const originalNow = Date.now + const originalSetInterval = globalThis.setInterval + const originalHome = process.env.HOME + const originalFetch = globalThis.fetch + const tempHome = await mkdtemp(join(tmpdir(), "opencode-claude-auth-home-")) + process.env.HOME = tempHome + Date.now = () => 1_700_000_000_000 + globalThis.setInterval = (() => ({ + unref() {}, + })) as unknown as typeof setInterval + + let callCount = 0 + const responseBody = JSON.stringify({ error: { message: "rate limited" } }) + + try { + const { authFetch, helpersModule } = await loadAuthLoader( + Date.now() + 10 * 60_000, + ) + helpersModule.resetExcludedBetas() + globalThis.fetch = (async () => { + callCount += 1 + return new Response(responseBody, { + status: 429, + headers: { "content-type": "application/json" }, + }) + }) as typeof fetch + + const response = await authFetch( + "https://api.anthropic.com/v1/messages", + { + method: "POST", + body: JSON.stringify({ model: "claude-haiku-4-5", messages: [] }), + }, + ) + + assert.equal(response.status, 429) + assert.equal(await response.text(), responseBody) + assert.equal(callCount, 1) + assert.equal(helpersModule.getExcludedBetas("claude-haiku-4-5").size, 0) + } finally { + Date.now = originalNow + globalThis.setInterval = originalSetInterval + globalThis.fetch = originalFetch + if (typeof originalHome === "string") { + process.env.HOME = originalHome + } else { + delete process.env.HOME + } + } + }) + + it("auth loader retries long-context errors only while exclusions remain", async () => { + const originalNow = Date.now + const originalSetInterval = globalThis.setInterval + const originalHome = process.env.HOME + const originalFetch = globalThis.fetch + const tempHome = await mkdtemp(join(tmpdir(), "opencode-claude-auth-home-")) + process.env.HOME = tempHome + Date.now = () => 1_700_000_000_000 + globalThis.setInterval = (() => ({ + unref() {}, + })) as unknown as typeof setInterval + + let callCount = 0 + const responseBody = JSON.stringify({ + error: { message: "Extra usage is required for long context requests" }, + }) + const modelId = "claude-sonnet-4-6" + + try { + const { authFetch, helpersModule } = await loadAuthLoader( + Date.now() + 10 * 60_000, + ) + helpersModule.resetExcludedBetas() + globalThis.fetch = (async () => { + callCount += 1 + return new Response(responseBody, { + status: 429, + headers: { "content-type": "application/json" }, + }) + }) as typeof fetch + + const response = await authFetch( + "https://api.anthropic.com/v1/messages", + { + method: "POST", + body: JSON.stringify({ model: modelId, messages: [] }), + }, + ) + + assert.equal(response.status, 429) + assert.equal(await response.text(), responseBody) + assert.equal(callCount, helpersModule.LONG_CONTEXT_BETAS.length + 1) + assert.equal( + helpersModule.getExcludedBetas(modelId).size, + helpersModule.LONG_CONTEXT_BETAS.length, + ) + } finally { + Date.now = originalNow + globalThis.setInterval = originalSetInterval + globalThis.fetch = originalFetch + if (typeof originalHome === "string") { + process.env.HOME = originalHome + } else { + delete process.env.HOME + } + } + }) + + it("auth loader surfaces 503 immediately for upstream fallback", async () => { + const originalNow = Date.now + const originalSetInterval = globalThis.setInterval + const originalHome = process.env.HOME + const originalFetch = globalThis.fetch + const tempHome = await mkdtemp(join(tmpdir(), "opencode-claude-auth-home-")) + process.env.HOME = tempHome + Date.now = () => 1_700_000_000_000 + globalThis.setInterval = (() => ({ + unref() {}, + })) as unknown as typeof setInterval + + let callCount = 0 + const responseBody = JSON.stringify({ + error: { message: "service unavailable" }, + }) + + try { + const authFetch = await loadAuthLoaderFetch(Date.now() + 10 * 60_000) + globalThis.fetch = (async () => { + callCount += 1 + return new Response(responseBody, { + status: 503, + headers: { "content-type": "application/json" }, + }) + }) as typeof fetch + + const response = await authFetch( + "https://api.anthropic.com/v1/messages", + { + method: "POST", + body: JSON.stringify({ model: "claude-haiku-4-5", messages: [] }), + }, + ) + + assert.equal(response.status, 503) + assert.equal(await response.text(), responseBody) + assert.equal(callCount, 1) + } finally { + Date.now = originalNow + globalThis.setInterval = originalSetInterval + globalThis.fetch = originalFetch + if (typeof originalHome === "string") { + process.env.HOME = originalHome + } else { + delete process.env.HOME + } + } + }) + + it("auth loader surfaces 529 immediately for upstream fallback", async () => { + const originalNow = Date.now + const originalSetInterval = globalThis.setInterval + const originalHome = process.env.HOME + const originalFetch = globalThis.fetch + const tempHome = await mkdtemp(join(tmpdir(), "opencode-claude-auth-home-")) + process.env.HOME = tempHome + Date.now = () => 1_700_000_000_000 + globalThis.setInterval = (() => ({ + unref() {}, + })) as unknown as typeof setInterval + + let callCount = 0 + const responseBody = JSON.stringify({ error: { message: "overloaded" } }) + + try { + const authFetch = await loadAuthLoaderFetch(Date.now() + 10 * 60_000) + globalThis.fetch = (async () => { + callCount += 1 + return new Response(responseBody, { + status: 529, + headers: { "content-type": "application/json" }, + }) + }) as typeof fetch + + const response = await authFetch( + "https://api.anthropic.com/v1/messages", + { + method: "POST", + body: JSON.stringify({ model: "claude-haiku-4-5", messages: [] }), + }, + ) + + assert.equal(response.status, 529) + assert.equal(await response.text(), responseBody) + assert.equal(callCount, 1) + } finally { + Date.now = originalNow + globalThis.setInterval = originalSetInterval + globalThis.fetch = originalFetch + if (typeof originalHome === "string") { + process.env.HOME = originalHome + } else { + delete process.env.HOME + } + } + }) }) describe("auth hook — account resolution", () => { diff --git a/src/index.ts b/src/index.ts index 3edda7f..51856c6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -72,6 +72,114 @@ const sessionId = crypto.randomUUID() type FetchFn = typeof fetch +export const DEFAULT_REQUEST_TIMEOUT_MS = 30_000 + +function createAbortError( + message: string, + name: "AbortError" | "TimeoutError", +): Error { + const error = new Error(message) + error.name = name + return error +} + +export function getRequestTimeoutMs( + env: Record = process.env, +): number { + const timeoutValue = env.ANTHROPIC_REQUEST_TIMEOUT_MS + if (!timeoutValue || !/^\d+$/.test(timeoutValue)) { + return DEFAULT_REQUEST_TIMEOUT_MS + } + + const parsedTimeout = Number.parseInt(timeoutValue, 10) + return parsedTimeout > 0 ? parsedTimeout : DEFAULT_REQUEST_TIMEOUT_MS +} + +function buildRequestTimeoutSignal( + signal: AbortSignal | null | undefined, + timeoutMs: number, +): { signal: AbortSignal; cleanup: () => void } { + const timeoutController = new AbortController() + const timeoutId = setTimeout(() => { + timeoutController.abort( + createAbortError( + `Anthropic request timed out after ${timeoutMs}ms`, + "TimeoutError", + ), + ) + }, timeoutMs) as ReturnType & { + unref?: () => void + } + const clearTimeoutSignal = () => clearTimeout(timeoutId) + + if (!signal) { + return { + signal: timeoutController.signal, + cleanup: clearTimeoutSignal, + } + } + + const combinedController = new AbortController() + const abortFromTimeout = () => { + cleanup() + combinedController.abort( + timeoutController.signal.reason ?? + createAbortError( + `Anthropic request timed out after ${timeoutMs}ms`, + "TimeoutError", + ), + ) + } + const abortFromCaller = () => { + cleanup() + combinedController.abort( + signal.reason ?? + createAbortError("Anthropic request was aborted", "AbortError"), + ) + } + const cleanup = () => { + clearTimeoutSignal() + timeoutController.signal.removeEventListener("abort", abortFromTimeout) + signal.removeEventListener("abort", abortFromCaller) + } + + if (signal.aborted) { + abortFromCaller() + return { + signal: combinedController.signal, + cleanup, + } + } + + timeoutController.signal.addEventListener("abort", abortFromTimeout, { + once: true, + }) + signal.addEventListener("abort", abortFromCaller, { once: true }) + + return { + signal: combinedController.signal, + cleanup, + } +} + +async function fetchWithRequestTimeout( + input: RequestInfo | URL, + init: RequestInit, + fetchImpl: FetchFn = fetch, +): Promise { + const timeoutMs = getRequestTimeoutMs() + const { signal, cleanup } = buildRequestTimeoutSignal(init.signal, timeoutMs) + + try { + return await fetchImpl(input, { + ...init, + signal, + }) + } finally { + cleanup() + } +} + export async function fetchWithRetry( input: RequestInfo | URL, init?: RequestInit, @@ -306,7 +414,7 @@ const plugin: Plugin = async () => { .filter(Boolean) log("fetch_headers_built", { headerKeys, betas, modelId }) - let response = await fetchWithRetry(input, { + let response = await fetchWithRequestTimeout(input, { ...requestInit, body, headers, @@ -318,29 +426,26 @@ const plugin: Plugin = async () => { retryAttempt: 0, }) - // On 401, force a credential refresh and retry once. - // This handles the common case of token expiry mid-session. if (response.status === 401) { log("fetch_401_retry", { modelId }) - const refreshed = getCachedCredentials() - if (refreshed && refreshed.accessToken !== latest.accessToken) { - const retryHeaders = buildRequestHeaders( - input, - requestInit, - refreshed.accessToken, - modelId, - excluded, - ) - response = await fetchWithRetry(input, { - ...requestInit, - body, - headers: retryHeaders, - }) - log("fetch_401_retry_result", { - status: response.status, - modelId, - }) - } + const refreshed = getCachedCredentials(true) + const retryToken = refreshed?.accessToken ?? latest.accessToken + const retryHeaders = buildRequestHeaders( + input, + requestInit, + retryToken, + modelId, + excluded, + ) + response = await fetchWithRequestTimeout(input, { + ...requestInit, + body, + headers: retryHeaders, + }) + log("fetch_401_retry_result", { + status: response.status, + modelId, + }) } // Check for long-context beta errors and retry with betas excluded @@ -384,7 +489,7 @@ const plugin: Plugin = async () => { newExcluded, ) - response = await fetchWithRetry(input, { + response = await fetchWithRequestTimeout(input, { ...requestInit, body, headers: newHeaders, diff --git a/src/transforms.test.ts b/src/transforms.test.ts index 763a5b5..df8de55 100644 --- a/src/transforms.test.ts +++ b/src/transforms.test.ts @@ -5,6 +5,8 @@ import { stripToolPrefix, transformBody, transformResponseStream, + getStreamIdleTimeoutMs, + DEFAULT_STREAM_IDLE_TIMEOUT_MS, } from "./transforms.ts" describe("transforms", () => { @@ -779,6 +781,109 @@ describe("transforms", () => { ) }) + it("transformResponseStream errors stalled SSE stream after idle timeout", async () => { + const encoder = new TextEncoder() + + const source = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('data: {"partial":"chunk"}')) + }, + }) + + const saved = process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = "50" + try { + const response = new Response(source, { status: 200 }) + const transformed = transformResponseStream(response) + const reader = transformed.body!.getReader() + + await assert.rejects(reader.read(), (err: Error) => { + assert.ok( + err.message.includes("SSE stream idle timeout"), + `Expected idle timeout error, got: ${err.message}`, + ) + return true + }) + } finally { + if (saved === undefined) + delete process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + else process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = saved + } + }) + + it("transformResponseStream continues streaming when boundaries arrive before timeout", async () => { + const encoder = new TextEncoder() + let enqueueChunk: ((chunk: string) => void) | undefined + let closeStream: (() => void) | undefined + + const source = new ReadableStream({ + start(controller) { + enqueueChunk = (chunk: string) => + controller.enqueue(encoder.encode(chunk)) + closeStream = () => controller.close() + }, + }) + + const saved = process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = "200" + try { + const response = new Response(source, { status: 200 }) + const transformed = transformResponseStream(response) + const reader = transformed.body!.getReader() + const decoder = new TextDecoder() + + enqueueChunk!('data: {"event":"one"}\n\n') + const first = await reader.read() + assert.equal(first.done, false) + assert.ok(decoder.decode(first.value).includes('"event":"one"')) + + await new Promise((r) => setTimeout(r, 50)) + enqueueChunk!('data: {"event":"two"}\n\n') + const second = await reader.read() + assert.equal(second.done, false) + assert.ok(decoder.decode(second.value).includes('"event":"two"')) + + closeStream!() + const final = await reader.read() + assert.equal(final.done, true) + } finally { + if (saved === undefined) + delete process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + else process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = saved + } + }) + + it("getStreamIdleTimeoutMs returns default for missing env", () => { + const saved = process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + delete process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + try { + assert.equal(getStreamIdleTimeoutMs(), DEFAULT_STREAM_IDLE_TIMEOUT_MS) + } finally { + if (saved !== undefined) + process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = saved + } + }) + + it("getStreamIdleTimeoutMs rejects non-positive and non-integer values", () => { + const saved = process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + try { + for (const bad of ["0", "-1", "3.5", "abc", ""]) { + process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = bad + assert.equal( + getStreamIdleTimeoutMs(), + DEFAULT_STREAM_IDLE_TIMEOUT_MS, + `Expected default for "${bad}"`, + ) + } + process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = "5000" + assert.equal(getStreamIdleTimeoutMs(), 5000) + } finally { + if (saved === undefined) + delete process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + else process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS = saved + } + }) + it("transformResponseStream flushes remaining buffered data on stream end", async () => { const encoder = new TextEncoder() const chunk1 = 'data: {"name":"mcp_alpha"}\n\n' diff --git a/src/transforms.ts b/src/transforms.ts index acf5b00..e3bb032 100644 --- a/src/transforms.ts +++ b/src/transforms.ts @@ -249,6 +249,17 @@ export function stripToolPrefix(text: string): string { return text.replace(/"name"\s*:\s*"mcp_([^"]+)"/g, '"name": "$1"') } +export const DEFAULT_STREAM_IDLE_TIMEOUT_MS = 15_000 + +export function getStreamIdleTimeoutMs(): number { + const raw = process.env.ANTHROPIC_STREAM_IDLE_TIMEOUT_MS + if (raw === undefined) return DEFAULT_STREAM_IDLE_TIMEOUT_MS + const parsed = Number(raw) + if (!Number.isInteger(parsed) || parsed <= 0) + return DEFAULT_STREAM_IDLE_TIMEOUT_MS + return parsed +} + export function transformResponseStream(response: Response): Response { if (!response.body) { return response @@ -285,6 +296,8 @@ export function transformResponseStream(response: Response): Response { const decoder = new TextDecoder() const encoder = new TextEncoder() let buffer = "" + const idleMs = getStreamIdleTimeoutMs() + let idleTimer: ReturnType | undefined const stream = new ReadableStream({ async pull(controller) { @@ -293,11 +306,39 @@ export function transformResponseStream(response: Response): Response { if (boundary !== -1) { const completeEvent = buffer.slice(0, boundary + 2) buffer = buffer.slice(boundary + 2) + // Reset idle timer after emitting a complete SSE event + if (idleTimer !== undefined) clearTimeout(idleTimer) + idleTimer = undefined controller.enqueue(encoder.encode(stripToolPrefix(completeEvent))) return } - const { done, value } = await reader.read() + // Race reader.read() against idle timeout + const idlePromise = new Promise<{ idle: true }>((resolve) => { + idleTimer = setTimeout(() => resolve({ idle: true }), idleMs) + }) + + const result = await Promise.race([ + reader.read().then((r) => ({ idle: false as const, ...r })), + idlePromise, + ]) + + if ("idle" in result && result.idle) { + // Idle timeout fired — cancel upstream and error the stream + reader.cancel().catch(() => {}) + controller.error( + new Error( + `SSE stream idle timeout: no data received for ${idleMs}ms`, + ), + ) + return + } + + // Clear the idle timer after a successful raw read + if (idleTimer !== undefined) clearTimeout(idleTimer) + idleTimer = undefined + + const { done, value } = result as ReadableStreamReadResult if (done) { if (buffer) { @@ -311,6 +352,10 @@ export function transformResponseStream(response: Response): Response { buffer += decoder.decode(value, { stream: true }) } }, + cancel() { + if (idleTimer !== undefined) clearTimeout(idleTimer) + reader.cancel().catch(() => {}) + }, }) return new Response(stream, {