diff --git a/index.ts b/index.ts index 4f7a4a59..ecbc5d5c 100644 --- a/index.ts +++ b/index.ts @@ -127,7 +127,7 @@ import { createCodexHeaders, extractRequestUrl, handleErrorResponse, - handleSuccessResponse, + handleSuccessResponseDetailed, getUnsupportedCodexModelInfo, resolveUnsupportedCodexFallbackModel, refreshAndUpdateToken, @@ -2184,20 +2184,22 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => { let restartAccountTraversalWithFallback = false; while (attempted.size < Math.max(1, accountCount)) { - const selectionExplainability = accountManager.getSelectionExplainability( + const selectionNow = Date.now(); + const selection = accountManager.getSelectionExplainabilityAndNextForFamilyHybrid( modelFamily, model, - Date.now(), + selectionNow, + { pidOffsetEnabled }, ); runtimeMetrics.lastSelectionSnapshot = { - timestamp: Date.now(), + timestamp: selectionNow, family: modelFamily, model: model ?? null, selectedAccountIndex: null, quotaKey, - explainability: selectionExplainability, + explainability: selection.explainability, }; - const account = accountManager.getCurrentOrNextForFamilyHybrid(modelFamily, model, { pidOffsetEnabled }); + const account = selection.account; if (!account || attempted.has(account.index)) { break; } @@ -2644,9 +2646,10 @@ while (attempted.size < Math.max(1, accountCount)) { resetRateLimitBackoff(account.index, quotaKey); runtimeMetrics.cumulativeLatencyMs += fetchLatencyMs; - const successResponse = await handleSuccessResponse(response, isStreaming, { + const successResult = await handleSuccessResponseDetailed(response, isStreaming, { streamStallTimeoutMs, }); + const successResponse = successResult.response; if (!successResponse.ok) { runtimeMetrics.failedRequests++; @@ -2656,10 +2659,12 @@ while (attempted.size < Math.max(1, accountCount)) { } if (!isStreaming && emptyResponseMaxRetries > 0) { - const clonedResponse = successResponse.clone(); try { - const bodyText = await clonedResponse.text(); - const 
parsedBody = bodyText ? JSON.parse(bodyText) as unknown : null; + let parsedBody: unknown = successResult.parsedJson; + if (parsedBody === undefined) { + const bodyText = await successResponse.clone().text(); + parsedBody = bodyText ? JSON.parse(bodyText) as unknown : null; + } if (isEmptyResponse(parsedBody)) { if ( emptyResponseRetries < emptyResponseMaxRetries && diff --git a/lib/accounts.ts b/lib/accounts.ts index c53804c4..083f7d15 100644 --- a/lib/accounts.ts +++ b/lib/accounts.ts @@ -16,6 +16,7 @@ import { getHealthTracker, getTokenTracker, selectHybridAccount, + DEFAULT_HYBRID_SELECTION_CONFIG, type AccountWithMetrics, type HybridSelectionOptions, } from "./rotation.js"; @@ -207,6 +208,11 @@ export interface AccountSelectionExplainability { lastUsed: number; } +export interface AccountSelectionResult { + explainability: AccountSelectionExplainability[]; + account: ManagedAccount | null; +} + export class AccountManager { private accounts: ManagedAccount[] = []; private cursorByFamily: Record = initFamilyState(0); @@ -480,6 +486,147 @@ export class AccountManager { }); } + getSelectionExplainabilityAndNextForFamilyHybrid( + family: ModelFamily, + model?: string | null, + now = nowMs(), + options?: HybridSelectionOptions, + ): AccountSelectionResult { + const count = this.accounts.length; + if (count === 0) { + return { + explainability: [], + account: null, + }; + } + + const quotaKey = model ? `${family}:${model}` : family; + const baseQuotaKey = getQuotaKey(family); + const modelQuotaKey = model ? getQuotaKey(family, model) : null; + const currentIndex = this.currentAccountIndexByFamily[family]; + const healthTracker = getHealthTracker(); + const tokenTracker = getTokenTracker(); + const cfg = DEFAULT_HYBRID_SELECTION_CONFIG; + const pidBonus = options?.pidOffsetEnabled ? 
(process.pid % 100) * 0.01 : 0; + + const explainability: AccountSelectionExplainability[] = []; + let selectedAccount: ManagedAccount | null = null; + let leastRecentlyUsedEnabled: ManagedAccount | null = null; + let availableCount = 0; + let bestScore = -Infinity; + let currentEligibleSelected = false; + + for (const account of this.accounts) { + clearExpiredRateLimits(account); + const enabled = account.enabled !== false; + const reasons: string[] = []; + let rateLimitedUntil: number | undefined; + const baseRateLimit = account.rateLimitResetTimes[baseQuotaKey]; + const modelRateLimit = modelQuotaKey ? account.rateLimitResetTimes[modelQuotaKey] : undefined; + if (typeof baseRateLimit === "number" && baseRateLimit > now) { + rateLimitedUntil = baseRateLimit; + } + if ( + typeof modelRateLimit === "number" && + modelRateLimit > now && + (rateLimitedUntil === undefined || modelRateLimit > rateLimitedUntil) + ) { + rateLimitedUntil = modelRateLimit; + } + + const coolingDownUntil = + typeof account.coolingDownUntil === "number" && account.coolingDownUntil > now + ? account.coolingDownUntil + : undefined; + + if (!enabled) reasons.push("disabled"); + if (rateLimitedUntil !== undefined) reasons.push("rate-limited"); + if (coolingDownUntil !== undefined) { + reasons.push( + account.cooldownReason ? 
`cooldown:${account.cooldownReason}` : "cooldown", + ); + } + + const tokensAvailable = tokenTracker.getTokens(account.index, quotaKey); + if (tokensAvailable < 1) reasons.push("token-bucket-empty"); + + const eligible = + enabled && + rateLimitedUntil === undefined && + coolingDownUntil === undefined && + tokensAvailable >= 1; + if (reasons.length === 0) reasons.push("eligible"); + + const healthScore = healthTracker.getScore(account.index, quotaKey); + explainability.push({ + index: account.index, + enabled, + isCurrentForFamily: currentIndex === account.index, + eligible, + reasons, + healthScore, + tokensAvailable, + rateLimitedUntil, + coolingDownUntil, + cooldownReason: coolingDownUntil !== undefined ? account.cooldownReason : undefined, + lastUsed: account.lastUsed, + }); + + if (!enabled) { + continue; + } + if (!leastRecentlyUsedEnabled || account.lastUsed < leastRecentlyUsedEnabled.lastUsed) { + leastRecentlyUsedEnabled = account; + } + if (!eligible) { + continue; + } + + if (account.index === currentIndex) { + selectedAccount = account; + currentEligibleSelected = true; + continue; + } + + if (currentEligibleSelected) { + continue; + } + + availableCount += 1; + const hoursSinceUsed = (now - account.lastUsed) / (1000 * 60 * 60); + let score = + healthScore * cfg.healthWeight + + tokensAvailable * cfg.tokenWeight + + hoursSinceUsed * cfg.freshnessWeight; + if (options?.pidOffsetEnabled) { + score += ((account.index * 0.131 + pidBonus) % 1) * cfg.freshnessWeight * 0.1; + } + if (availableCount === 1 || score > bestScore) { + bestScore = score; + selectedAccount = account; + } + } + + if (!selectedAccount) { + selectedAccount = availableCount === 0 ? 
leastRecentlyUsedEnabled : null; + } + + if (!selectedAccount) { + return { + explainability, + account: null, + }; + } + + this.currentAccountIndexByFamily[family] = selectedAccount.index; + this.cursorByFamily[family] = (selectedAccount.index + 1) % count; + selectedAccount.lastUsed = now; + return { + explainability, + account: selectedAccount, + }; + } + setActiveIndex(index: number): ManagedAccount | null { if (!Number.isFinite(index)) return null; if (index < 0 || index >= this.accounts.length) return null; diff --git a/lib/request/fetch-helpers.ts b/lib/request/fetch-helpers.ts index c004a531..2d87f35f 100644 --- a/lib/request/fetch-helpers.ts +++ b/lib/request/fetch-helpers.ts @@ -8,7 +8,7 @@ import { queuedRefresh } from "../refresh-queue.js"; import { logRequest, logError, logWarn } from "../logger.js"; import { getCodexInstructions, getModelFamily } from "../prompts/codex.js"; import { transformRequestBody, normalizeModel } from "./request-transformer.js"; -import { convertSseToJson, ensureContentType } from "./response-handler.js"; +import { convertSseToJsonDetailed, ensureContentType } from "./response-handler.js"; import type { UserConfig, RequestBody } from "../types.js"; import { CodexAuthError } from "../errors.js"; import { isRecord } from "../utils.js"; @@ -278,6 +278,11 @@ export interface ErrorDiagnostics { httpStatus?: number; } +export interface SuccessResponseDetails { + response: Response; + parsedJson?: unknown; +} + /** * Determines if the current auth token needs to be refreshed * @param auth - Current authentication state @@ -590,6 +595,15 @@ export async function handleSuccessResponse( isStreaming: boolean, options?: { streamStallTimeoutMs?: number }, ): Promise { + const details = await handleSuccessResponseDetailed(response, isStreaming, options); + return details.response; +} + +export async function handleSuccessResponseDetailed( + response: Response, + isStreaming: boolean, + options?: { streamStallTimeoutMs?: number }, +): 
Promise { // Check for deprecation headers (RFC 8594) const deprecation = response.headers.get("Deprecation"); const sunset = response.headers.get("Sunset"); @@ -601,15 +615,22 @@ export async function handleSuccessResponse( // For non-streaming requests (generateText), convert SSE to JSON if (!isStreaming) { - return await convertSseToJson(response, responseHeaders, options); + const converted = await convertSseToJsonDetailed(response, responseHeaders, options); + return { + response: converted.response, + parsedJson: converted.parsedResponse, + }; } // For streaming requests (streamText), return stream as-is - return new Response(response.body, { - status: response.status, - statusText: response.statusText, - headers: responseHeaders, - }); + return { + response: new Response(response.body, { + status: response.status, + statusText: response.statusText, + headers: responseHeaders, + }), + parsedJson: undefined, + }; } async function safeReadBody(response: Response): Promise { diff --git a/lib/request/helpers/tool-utils.ts b/lib/request/helpers/tool-utils.ts index 14212e44..78a86a5e 100644 --- a/lib/request/helpers/tool-utils.ts +++ b/lib/request/helpers/tool-utils.ts @@ -14,6 +14,45 @@ export interface Tool { function: ToolFunction; } +const cleanedToolCache = new WeakMap(); +const cleanedToolArrayCache = new WeakMap(); + +function cloneJsonLike(value: unknown): unknown { + if (value === null) return null; + if (value === undefined) return undefined; + if ( + typeof value === "string" || + typeof value === "number" || + typeof value === "boolean" + ) { + return value; + } + + if (Array.isArray(value)) { + return value.map((item) => { + const cloned = cloneJsonLike(item); + return cloned === undefined ? 
null : cloned; + }); + } + + if (typeof value === "object") { + const withJson = value as { toJSON?: () => unknown }; + if (typeof withJson.toJSON === "function") { + return cloneJsonLike(withJson.toJSON()); + } + const output: Record = {}; + for (const [key, item] of Object.entries(value as Record)) { + const cloned = cloneJsonLike(item); + if (cloned !== undefined) { + output[key] = cloned; + } + } + return output; + } + + return undefined; +} + /** * Cleans up tool definitions to ensure strict JSON Schema compliance. * @@ -30,19 +69,36 @@ export interface Tool { export function cleanupToolDefinitions(tools: unknown): unknown { if (!Array.isArray(tools)) return tools; - return tools.map((tool) => { + const cachedArray = cleanedToolArrayCache.get(tools); + if (cachedArray) { + return cachedArray; + } + + const cleaned = tools.map((tool) => { if (tool?.type !== "function" || !tool.function) { return tool; } + const cachedTool = cleanedToolCache.get(tool); + if (cachedTool) { + return cachedTool; + } + // Clone to avoid mutating original - const cleanedTool = JSON.parse(JSON.stringify(tool)); + const cloned = cloneJsonLike(tool); + if (!cloned || typeof cloned !== "object") { + return tool; + } + const cleanedTool = cloned as Tool; if (cleanedTool.function.parameters) { cleanupSchema(cleanedTool.function.parameters); } + cleanedToolCache.set(tool, cleanedTool); return cleanedTool; }); + cleanedToolArrayCache.set(tools, cleaned); + return cleaned; } /** diff --git a/lib/request/response-handler.ts b/lib/request/response-handler.ts index ff8f080a..58161125 100644 --- a/lib/request/response-handler.ts +++ b/lib/request/response-handler.ts @@ -7,6 +7,12 @@ const log = createLogger("response-handler"); const MAX_SSE_SIZE = 10 * 1024 * 1024; // 10MB limit to prevent memory exhaustion const DEFAULT_STREAM_STALL_TIMEOUT_MS = 45_000; const STREAM_ERROR_CODE = "stream_error"; +const TERMINAL_EVENT_TYPE_PATTERN = + 
/"type"\s*:\s*"(?:error|response\.error|response\.failed|response\.incomplete|response\.done|response\.completed)"/; +const TERMINAL_SSE_LINE_PATTERN = + /response\.(?:error|failed|incomplete|done|completed)|"type"\s*:\s*"error"/; +const RESPONSE_TYPE_PREFIX = "\"type\":\"response."; +const RESPONSE_TYPE_PREFIX_LENGTH = RESPONSE_TYPE_PREFIX.length; type ParsedSseResult = | { @@ -22,6 +28,11 @@ type ParsedSseResult = }; }; +export interface SseConversionResult { + response: Response; + parsedResponse?: unknown; +} + function toRecord(value: unknown): Record | null { if (value && typeof value === "object") { return value as Record; @@ -82,67 +93,173 @@ function extractResponseError(responseRecord: Record): { function parseDataPayload(line: string): string | null { if (!line.startsWith("data:")) return null; - const payload = line.slice(5).trimStart(); + let payloadStart = 5; + while (payloadStart < line.length) { + const code = line.charCodeAt(payloadStart); + if (code !== 32 && code !== 9) break; + payloadStart += 1; + } + const payload = line.slice(payloadStart); if (!payload || payload === "[DONE]") return null; return payload; } -/** +function parseSsePayload(payload: string): ParsedSseResult | null { + // Fast path: most SSE events are non-terminal deltas we can skip without JSON parsing. 
+ if (!TERMINAL_EVENT_TYPE_PATTERN.test(payload)) { + return null; + } - * Parse SSE stream to extract final response - * @param sseText - Complete SSE stream text - * @returns Final response object or null if not found - */ -function parseSseStream(sseText: string): ParsedSseResult | null { - const lines = sseText.split(/\r?\n/); + try { + const data = JSON.parse(payload) as SSEEventData; + const responseRecord = toRecord((data as { response?: unknown }).response); - for (const line of lines) { - const trimmedLine = line.trim(); - const payload = parseDataPayload(trimmedLine); - if (payload) { - try { - const data = JSON.parse(payload) as SSEEventData; - const responseRecord = toRecord((data as { response?: unknown }).response); + if (data.type === "error" || data.type === "response.error") { + const parsedError = extractStreamError(data); + log.error("SSE error event received", { error: parsedError }); + return { kind: "error", error: parsedError }; + } - if (data.type === "error" || data.type === "response.error") { - const parsedError = extractStreamError(data); - log.error("SSE error event received", { error: parsedError }); - return { kind: "error", error: parsedError }; - } + if (data.type === "response.failed" || data.type === "response.incomplete") { + const parsedError = + (responseRecord && extractResponseError(responseRecord)) ?? + extractStreamError(data); + log.error("SSE response terminal error event received", { + type: data.type, + error: parsedError, + }); + return { kind: "error", error: parsedError }; + } - if (data.type === "response.failed" || data.type === "response.incomplete") { - const parsedError = - (responseRecord && extractResponseError(responseRecord)) ?? 
- extractStreamError(data); - log.error("SSE response terminal error event received", { - type: data.type, + if (data.type === "response.done" || data.type === "response.completed") { + if (responseRecord) { + const parsedError = extractResponseError(responseRecord); + if (parsedError) { + log.error("SSE response completed with terminal error", { error: parsedError, + status: responseRecord.status, }); return { kind: "error", error: parsedError }; } - - if (data.type === "response.done" || data.type === "response.completed") { - if (responseRecord) { - const parsedError = extractResponseError(responseRecord); - if (parsedError) { - log.error("SSE response completed with terminal error", { - error: parsedError, - status: responseRecord.status, - }); - return { kind: "error", error: parsedError }; - } - } - return { kind: "response", response: data.response }; - } - } catch { - // Skip malformed JSON } + return { kind: "response", response: data.response }; } + } catch { + // Skip malformed JSON } return null; } +function isPotentialTerminalSseLine(line: string): boolean { + const responseTypePrefixIndex = line.indexOf(RESPONSE_TYPE_PREFIX); + if (responseTypePrefixIndex >= 0) { + const terminalInitial = line.charCodeAt( + responseTypePrefixIndex + RESPONSE_TYPE_PREFIX_LENGTH, + ); + // done/completed/failed/incomplete/error + return ( + terminalInitial === 100 || + terminalInitial === 99 || + terminalInitial === 102 || + terminalInitial === 105 || + terminalInitial === 101 + ); + } + + if (line.includes("\"type\":\"error\"") || line.includes("\"type\": \"error\"")) { + return true; + } + + return line.includes("\"type\"") && TERMINAL_SSE_LINE_PATTERN.test(line); +} + +function consumeSseBuffer( + buffer: string, + options?: { flush?: boolean }, +): { parsed: ParsedSseResult | null; remainder: string } { + let cursor = 0; + while (true) { + const lineEnd = buffer.indexOf("\n", cursor); + if (lineEnd < 0) break; + let line = buffer.slice(cursor, lineEnd); + if 
(line.endsWith("\r")) { + line = line.slice(0, -1); + } + if (!isPotentialTerminalSseLine(line)) { + cursor = lineEnd + 1; + continue; + } + const payload = parseDataPayload(line); + if (payload) { + const parsed = parseSsePayload(payload); + if (parsed) { + return { + parsed, + remainder: "", + }; + } + } + cursor = lineEnd + 1; + } + + const remainder = buffer.slice(cursor); + const trimmedRemainder = remainder.trim(); + if (options?.flush && trimmedRemainder.length > 0) { + if (!isPotentialTerminalSseLine(trimmedRemainder)) { + return { + parsed: null, + remainder: "", + }; + } + const payload = parseDataPayload(trimmedRemainder); + if (payload) { + const parsed = parseSsePayload(payload); + if (parsed) { + return { + parsed, + remainder: "", + }; + } + } + } + + return { + parsed: null, + remainder: options?.flush ? "" : remainder, + }; +} + +function buildErrorResponse( + parsedError: { message: string; type?: string; code?: string | number }, + response: Response, + headers: Headers, +): Response { + log.warn("SSE stream returned an error event", parsedError); + logRequest("stream-error", { + error: parsedError.message, + type: parsedError.type, + code: parsedError.code, + }); + + const jsonHeaders = new Headers(headers); + jsonHeaders.set("content-type", "application/json; charset=utf-8"); + const status = response.status >= 400 ? response.status : 502; + const payload = { + error: { + message: parsedError.message, + type: parsedError.type ?? STREAM_ERROR_CODE, + code: parsedError.code ?? STREAM_ERROR_CODE, + }, + }; + + return new Response(JSON.stringify(payload), { + status, + statusText: status === 502 ? 
"Bad Gateway" : response.statusText, + headers: jsonHeaders, + }); +} + /** * Convert SSE stream response to JSON for generateText() * @param response - Fetch response with SSE stream @@ -154,59 +271,64 @@ export async function convertSseToJson( headers: Headers, options?: { streamStallTimeoutMs?: number }, ): Promise { + const result = await convertSseToJsonDetailed(response, headers, options); + return result.response; +} + +export async function convertSseToJsonDetailed( + response: Response, + headers: Headers, + options?: { streamStallTimeoutMs?: number }, +): Promise { if (!response.body) { throw new Error('[openai-codex-plugin] Response has no body'); } const reader = response.body.getReader(); const decoder = new TextDecoder(); - let fullText = ''; + let fullText = ""; + let parseBuffer = ""; + let parsedResult: ParsedSseResult | null = null; const streamStallTimeoutMs = Math.max( 1_000, Math.floor(options?.streamStallTimeoutMs ?? DEFAULT_STREAM_STALL_TIMEOUT_MS), ); try { - // Consume the entire stream + // Consume stream incrementally and stop early once terminal event is parsed. while (true) { const { done, value } = await readWithTimeout(reader, streamStallTimeoutMs); - if (done) break; - fullText += decoder.decode(value, { stream: true }); + const chunkText = done + ? 
decoder.decode() + : decoder.decode(value, { stream: true }); + if (chunkText) { + fullText += chunkText; + parseBuffer += chunkText; + } if (fullText.length > MAX_SSE_SIZE) { throw new Error(`SSE response exceeds ${MAX_SSE_SIZE} bytes limit`); } + + const consumed = consumeSseBuffer(parseBuffer, { flush: done }); + parseBuffer = consumed.remainder; + if (consumed.parsed) { + parsedResult = consumed.parsed; + if (!done && typeof reader.cancel === "function") { + await reader.cancel("terminal SSE event parsed").catch(() => {}); + } + break; + } + if (done) break; } if (LOGGING_ENABLED) { logRequest("stream-full", { fullContent: fullText }); } - // Parse SSE events to extract the final response - const parsedResult = parseSseStream(fullText); - if (parsedResult?.kind === "error") { - log.warn("SSE stream returned an error event", parsedResult.error); - logRequest("stream-error", { - error: parsedResult.error.message, - type: parsedResult.error.type, - code: parsedResult.error.code, - }); - - const jsonHeaders = new Headers(headers); - jsonHeaders.set("content-type", "application/json; charset=utf-8"); - const status = response.status >= 400 ? response.status : 502; - const payload = { - error: { - message: parsedResult.error.message, - type: parsedResult.error.type ?? STREAM_ERROR_CODE, - code: parsedResult.error.code ?? STREAM_ERROR_CODE, - }, + return { + response: buildErrorResponse(parsedResult.error, response, headers), + parsedResponse: undefined, }; - - return new Response(JSON.stringify(payload), { - status, - statusText: status === 502 ? 
"Bad Gateway" : response.statusText, - headers: jsonHeaders, - }); } const finalResponse = @@ -218,22 +340,28 @@ export async function convertSseToJson( logRequest("stream-error", { error: "No response.done event found" }); // Return original stream if we can't parse - return new Response(fullText, { - status: response.status, - statusText: response.statusText, - headers: headers, - }); + return { + response: new Response(fullText, { + status: response.status, + statusText: response.statusText, + headers: headers, + }), + parsedResponse: undefined, + }; } // Return as plain JSON (not SSE) const jsonHeaders = new Headers(headers); jsonHeaders.set('content-type', 'application/json; charset=utf-8'); - return new Response(JSON.stringify(finalResponse), { - status: response.status, - statusText: response.statusText, - headers: jsonHeaders, - }); + return { + response: new Response(JSON.stringify(finalResponse), { + status: response.status, + statusText: response.statusText, + headers: jsonHeaders, + }), + parsedResponse: finalResponse, + }; } catch (error) { log.error("Error converting stream", { error: String(error) }); @@ -246,7 +374,6 @@ export async function convertSseToJson( // Release the reader lock to prevent resource leaks reader.releaseLock(); } - } /** diff --git a/package.json b/package.json index 99934cf4..c3590e80 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,8 @@ "test:ui": "vitest --ui", "test:coverage": "vitest run --coverage", "coverage": "vitest run --coverage", + "perf:bench:baseline": "npm run build && node scripts/perf-bench.mjs --output .omx/perf/baseline.json --baseline .omx/perf/baseline.json --write-baseline --gate-hotspot 0.4 --gate-nonhot 0.03", + "perf:bench": "npm run build && node scripts/perf-bench.mjs --output .omx/perf/current.json --baseline .omx/perf/baseline.json --gate-hotspot 0.4 --gate-nonhot 0.03", "audit:prod": "npm audit --omit=dev --audit-level=high", "audit:all": "npm audit --audit-level=high", 
"audit:dev:allowlist": "node scripts/audit-dev-allowlist.js", diff --git a/scripts/perf-bench.mjs b/scripts/perf-bench.mjs new file mode 100644 index 00000000..35210c25 --- /dev/null +++ b/scripts/perf-bench.mjs @@ -0,0 +1,460 @@ +#!/usr/bin/env node + +import { execSync } from "node:child_process"; +import { existsSync, mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { dirname, resolve } from "node:path"; +import { monitorEventLoopDelay, performance } from "node:perf_hooks"; +import { AccountManager } from "../dist/lib/accounts.js"; +import { convertSseToJson } from "../dist/lib/request/response-handler.js"; +import { cleanupToolDefinitions } from "../dist/lib/request/helpers/tool-utils.js"; + +const HOTSPOT_SCENARIOS = new Set([ + "selection_degraded_n200", + "sse_nonstream_large", + "tool_cleanup_n100", +]); + +const DEFAULT_HOTSPOT_TARGET = 0.4; +const DEFAULT_NONHOT_REGRESSION_LIMIT = 0.03; + +function parseArgs(argv) { + const parsed = { + output: ".omx/perf/current.json", + baseline: ".omx/perf/baseline.json", + writeBaseline: false, + hotspotTarget: DEFAULT_HOTSPOT_TARGET, + nonHotRegressionLimit: DEFAULT_NONHOT_REGRESSION_LIMIT, + }; + + for (let i = 0; i < argv.length; i += 1) { + const arg = argv[i]; + if (arg === "--output" && argv[i + 1]) { + parsed.output = argv[i + 1]; + i += 1; + continue; + } + if (arg === "--baseline" && argv[i + 1]) { + parsed.baseline = argv[i + 1]; + i += 1; + continue; + } + if (arg === "--gate-hotspot" && argv[i + 1]) { + parsed.hotspotTarget = Number(argv[i + 1]); + i += 1; + continue; + } + if (arg === "--gate-nonhot" && argv[i + 1]) { + parsed.nonHotRegressionLimit = Number(argv[i + 1]); + i += 1; + continue; + } + if (arg === "--write-baseline") { + parsed.writeBaseline = true; + } + } + + return parsed; +} + +function percentile(sortedValues, percentileValue) { + if (sortedValues.length === 0) return 0; + if (sortedValues.length === 1) return sortedValues[0] ?? 
0; + const index = Math.min( + sortedValues.length - 1, + Math.max(0, Math.ceil((percentileValue / 100) * sortedValues.length) - 1), + ); + return sortedValues[index] ?? 0; +} + +function summarize(values) { + if (values.length === 0) { + return { + min: 0, + max: 0, + mean: 0, + p50: 0, + p95: 0, + p99: 0, + }; + } + const sorted = [...values].sort((a, b) => a - b); + const total = sorted.reduce((sum, value) => sum + value, 0); + return { + min: sorted[0] ?? 0, + max: sorted[sorted.length - 1] ?? 0, + mean: total / sorted.length, + p50: percentile(sorted, 50), + p95: percentile(sorted, 95), + p99: percentile(sorted, 99), + }; +} + +function ensureParentDir(path) { + const parent = dirname(path); + if (!existsSync(parent)) { + mkdirSync(parent, { recursive: true }); + } +} + +function toAccountStorage(count) { + const now = Date.now(); + return { + version: 3, + activeIndex: 0, + activeIndexByFamily: {}, + accounts: Array.from({ length: count }, (_, index) => ({ + accountId: `acct-${index}`, + organizationId: `org-${Math.floor(index / 4)}`, + accountIdSource: "token", + accountLabel: `bench-${index}`, + email: `bench-${index}@example.com`, + refreshToken: `refresh-${index}`, + accessToken: `access-${index}`, + expiresAt: now + 60 * 60 * 1000, + enabled: true, + addedAt: now - index * 1000, + lastUsed: now - index * 3000, + rateLimitResetTimes: {}, + })), + }; +} + +function runSelectionTraversal(count, rounds) { + for (let round = 0; round < rounds; round += 1) { + const manager = new AccountManager(undefined, toAccountStorage(count)); + const attempted = new Set(); + while (attempted.size < Math.max(1, manager.getAccountCount())) { + let selected = null; + if (typeof manager.getSelectionExplainabilityAndNextForFamilyHybrid === "function") { + const selection = manager.getSelectionExplainabilityAndNextForFamilyHybrid( + "codex", + "gpt-5-codex", + Date.now(), + { pidOffsetEnabled: false }, + ); + selected = selection?.account ?? 
null; + } else { + manager.getSelectionExplainability("codex", "gpt-5-codex", Date.now()); + selected = manager.getCurrentOrNextForFamilyHybrid("codex", "gpt-5-codex", { + pidOffsetEnabled: false, + }); + } + if (!selected || attempted.has(selected.index)) break; + attempted.add(selected.index); + manager.markAccountCoolingDown(selected, 120_000, "auth-failure"); + manager.recordFailure(selected, "codex", "gpt-5-codex"); + } + } +} + +function createLargeSsePayload(deltaEvents) { + const parts = []; + for (let i = 0; i < deltaEvents; i += 1) { + parts.push( + `data: {"type":"response.output_text.delta","delta":"chunk-${i}-aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"}\n\n`, + ); + } + parts.push( + `data: {"type":"response.done","response":{"id":"resp-bench","object":"response","model":"gpt-5-codex","output":[{"type":"message","role":"assistant","content":[{"type":"output_text","text":"ok"}]}]}}\n\n`, + ); + parts.push("data: [DONE]\n\n"); + return parts.join(""); +} + +function streamFromString(value, chunkSize) { + const encoder = new TextEncoder(); + let offset = 0; + return new ReadableStream({ + pull(controller) { + if (offset >= value.length) { + controller.close(); + return; + } + const chunk = value.slice(offset, offset + chunkSize); + offset += chunkSize; + controller.enqueue(encoder.encode(chunk)); + }, + }); +} + +async function runSseConversion(deltaEvents, chunkSize, rounds) { + const payload = createLargeSsePayload(deltaEvents); + for (let i = 0; i < rounds; i += 1) { + const response = new Response(streamFromString(payload, chunkSize), { + headers: { + "content-type": "text/event-stream", + }, + }); + const converted = await convertSseToJson(response, new Headers(response.headers), { + streamStallTimeoutMs: 20_000, + }); + await converted.text(); + } +} + +function createToolFixture(toolCount) { + const tools = []; + for (let i = 0; i < toolCount; i += 1) { + tools.push({ + type: "function", + function: { + name: 
// ---------------------------------------------------------------------------
// Synthetic tool fixtures
// ---------------------------------------------------------------------------

// Builds `toolCount` synthetic OpenAI-style tool definitions for the cleanup
// benchmark. NOTE(review): the opening of this function is truncated in the
// source chunk; the signature and the `type: "function"` wrapper are
// reconstructed from the call site (runToolCleanup) and the visible closing
// braces — confirm against the full file.
function createToolFixture(toolCount) {
  const tools = [];
  for (let i = 0; i < toolCount; i += 1) {
    tools.push({
      type: "function",
      function: {
        name: `bench_tool_${i}`,
        description: "Synthetic benchmark tool",
        parameters: {
          type: "object",
          required: ["mode", "level", "phantom"],
          properties: {
            mode: {
              anyOf: [{ const: "fast" }, { const: "safe" }, { const: "balanced" }],
            },
            level: {
              type: ["string", "null"],
              description: "level",
            },
            payload: {
              type: "object",
              properties: {
                seed: { type: "number" },
                values: {
                  type: "array",
                  items: {
                    type: ["string", "null"],
                    description: "nested",
                  },
                },
              },
              additionalProperties: true,
            },
          },
          additionalProperties: true,
        },
      },
    });
  }
  return tools;
}

// Runs cleanupToolDefinitions over a fixed fixture `rounds` times; the return
// value is intentionally discarded — only the work matters for the benchmark.
function runToolCleanup(toolCount, rounds) {
  const tools = createToolFixture(toolCount);
  for (let i = 0; i < rounds; i += 1) {
    cleanupToolDefinitions(tools);
  }
}

// ---------------------------------------------------------------------------
// Measurement
// ---------------------------------------------------------------------------

/**
 * Executes one benchmark scenario: `config.warmup` unmeasured runs followed by
 * `config.iterations` measured runs, recording per-iteration wall time and
 * heap delta, plus mean event-loop delay over the measured phase.
 *
 * Heap deltas are taken without forcing GC, so individual samples are noisy;
 * the `summarize` aggregation is what downstream gating consumes.
 */
async function benchmarkScenario(config) {
  const durations = [];
  const heapDeltas = [];

  // Warm up BEFORE enabling the event-loop monitor: previously the monitor was
  // enabled first, so warmup jitter (JIT, first-touch allocation) contaminated
  // the reported delay statistic.
  for (let i = 0; i < config.warmup; i += 1) {
    await config.run();
  }

  const monitor = monitorEventLoopDelay({ resolution: 20 });
  monitor.enable();

  for (let i = 0; i < config.iterations; i += 1) {
    const heapBefore = process.memoryUsage().heapUsed;
    const started = performance.now();
    await config.run();
    const elapsed = performance.now() - started;
    const heapAfter = process.memoryUsage().heapUsed;
    durations.push(elapsed);
    heapDeltas.push(heapAfter - heapBefore);
  }

  monitor.disable();
  const latency = summarize(durations);
  const heap = summarize(heapDeltas);
  return {
    name: config.name,
    category: HOTSPOT_SCENARIOS.has(config.name) ? "hotspot" : "nonhot",
    iterations: config.iterations,
    warmup: config.warmup,
    latencyMs: latency,
    heapDeltaBytes: heap,
    // monitorEventLoopDelay reports nanoseconds; convert to ms. `mean` is NaN
    // when no samples were collected, hence the isFinite guard.
    eventLoopDelayMeanMs: Number.isFinite(monitor.mean) ? monitor.mean / 1_000_000 : 0,
  };
}

// Indexes a scenario list by name for O(1) baseline lookups.
function toScenarioMap(scenarios) {
  const map = new Map();
  for (const scenario of scenarios) {
    map.set(scenario.name, scenario);
  }
  return map;
}

// ---------------------------------------------------------------------------
// Gate evaluation
// ---------------------------------------------------------------------------

/**
 * Compares the current run against a baseline run using p95 latency.
 * Hotspot scenarios must IMPROVE by at least `hotspotTarget` (fraction);
 * non-hotspot scenarios must not REGRESS by more than `nonHotRegressionLimit`.
 *
 * @returns {{passed: boolean, reason: string, details: object[]}}
 */
function evaluateGate(currentRun, baselineRun, hotspotTarget, nonHotRegressionLimit) {
  if (!baselineRun) {
    return {
      passed: true,
      reason: "no-baseline",
      details: [],
    };
  }

  const baselineByName = toScenarioMap(baselineRun.scenarios);
  const details = [];
  let passed = true;

  for (const currentScenario of currentRun.scenarios) {
    const baselineScenario = baselineByName.get(currentScenario.name);
    if (!baselineScenario) {
      passed = false;
      details.push({
        name: currentScenario.name,
        status: "missing-baseline-scenario",
      });
      continue;
    }

    const baseP95 = baselineScenario.latencyMs.p95;
    const currP95 = currentScenario.latencyMs.p95;
    // Guard division: a zero/negative baseline p95 yields 0 for both ratios.
    const improvement = baseP95 > 0 ? (baseP95 - currP95) / baseP95 : 0;
    const regression = baseP95 > 0 ? (currP95 - baseP95) / baseP95 : 0;
    const isHotspot = currentScenario.category === "hotspot";

    if (isHotspot) {
      const ok = improvement >= hotspotTarget;
      if (!ok) passed = false;
      details.push({
        name: currentScenario.name,
        status: ok ? "pass" : "fail",
        requirement: `improvement>=${Math.round(hotspotTarget * 100)}%`,
        improvementPct: Number((improvement * 100).toFixed(2)),
        baselineP95Ms: Number(baseP95.toFixed(3)),
        currentP95Ms: Number(currP95.toFixed(3)),
      });
      continue;
    }

    const ok = regression <= nonHotRegressionLimit;
    if (!ok) passed = false;
    details.push({
      name: currentScenario.name,
      status: ok ? "pass" : "fail",
      requirement: `regression<=${Math.round(nonHotRegressionLimit * 100)}%`,
      regressionPct: Number((regression * 100).toFixed(2)),
      baselineP95Ms: Number(baseP95.toFixed(3)),
      currentP95Ms: Number(currP95.toFixed(3)),
    });
  }

  // Previously a scenario present in the baseline but dropped from the current
  // run escaped the gate entirely; flag it as a failure so coverage cannot
  // silently shrink.
  const currentNames = new Set(currentRun.scenarios.map((s) => s.name));
  for (const name of baselineByName.keys()) {
    if (!currentNames.has(name)) {
      passed = false;
      details.push({
        name,
        status: "missing-current-scenario",
      });
    }
  }

  return {
    passed,
    reason: passed ? "thresholds-satisfied" : "thresholds-failed",
    details,
  };
}

// ---------------------------------------------------------------------------
// Small I/O helpers
// ---------------------------------------------------------------------------

// Best-effort JSON read: returns null when the file is absent or unparseable
// (a corrupt baseline should degrade to "no-baseline", not crash the gate).
function safeReadJson(path) {
  if (!existsSync(path)) return null;
  try {
    return JSON.parse(readFileSync(path, "utf8"));
  } catch {
    return null;
  }
}

// Short git commit hash for run metadata; "unknown" outside a git checkout.
function getGitCommit() {
  try {
    return execSync("git rev-parse --short HEAD", { encoding: "utf8" }).trim();
  } catch {
    return "unknown";
  }
}

// ---------------------------------------------------------------------------
// Entry point
// ---------------------------------------------------------------------------

async function main() {
  const args = parseArgs(process.argv.slice(2));

  // The scenarios exercise compiled dist artifacts, so a build must exist.
  if (!existsSync(resolve("dist/lib/accounts.js"))) {
    console.error("dist build artifacts not found. Run `npm run build` first.");
    process.exit(1);
  }

  // Scenarios run sequentially (not Promise.all) on purpose: concurrent runs
  // would perturb each other's latency and event-loop measurements.
  const scenarios = [
    await benchmarkScenario({
      name: "selection_degraded_n50",
      iterations: 20,
      warmup: 4,
      run: () => runSelectionTraversal(50, 12),
    }),
    await benchmarkScenario({
      name: "selection_degraded_n200",
      iterations: 20,
      warmup: 4,
      run: () => runSelectionTraversal(200, 10),
    }),
    await benchmarkScenario({
      name: "sse_nonstream_small",
      iterations: 16,
      warmup: 3,
      run: () => runSseConversion(80, 2048, 2),
    }),
    await benchmarkScenario({
      name: "sse_nonstream_large",
      iterations: 16,
      warmup: 3,
      run: () => runSseConversion(1600, 512, 1),
    }),
    await benchmarkScenario({
      name: "tool_cleanup_n25",
      iterations: 30,
      warmup: 4,
      run: () => runToolCleanup(25, 10),
    }),
    await benchmarkScenario({
      name: "tool_cleanup_n100",
      iterations: 25,
      warmup: 4,
      run: () => runToolCleanup(100, 8),
    }),
  ];

  const baselinePath = resolve(args.baseline);
  const outputPath = resolve(args.output);
  // In --write-baseline mode the existing baseline is ignored so the gate
  // trivially passes with reason "no-baseline".
  const baselineRun = args.writeBaseline ? null : safeReadJson(baselinePath);
  const run = {
    meta: {
      timestamp: new Date().toISOString(),
      commit: getGitCommit(),
      node: process.version,
      platform: process.platform,
      arch: process.arch,
    },
    thresholds: {
      hotspotImprovementRequired: args.hotspotTarget,
      nonHotRegressionAllowed: args.nonHotRegressionLimit,
    },
    scenarios,
  };
  run.gate = evaluateGate(
    run,
    baselineRun,
    args.hotspotTarget,
    args.nonHotRegressionLimit,
  );

  ensureParentDir(outputPath);
  writeFileSync(outputPath, JSON.stringify(run, null, 2), "utf8");
  console.log(`Performance benchmark written to ${outputPath}`);

  if (args.writeBaseline) {
    ensureParentDir(baselinePath);
    writeFileSync(baselinePath, JSON.stringify(run, null, 2), "utf8");
    console.log(`Baseline captured at ${baselinePath}`);
    return;
  }

  console.log(`Gate status: ${run.gate.passed ? "PASS" : "FAIL"} (${run.gate.reason})`);
  for (const detail of run.gate.details) {
    console.log(JSON.stringify(detail));
  }
  // Non-zero exit makes the gate enforceable in CI.
  if (!run.gate.passed) {
    process.exit(1);
  }
}

main().catch((error) => {
  console.error(error);
  process.exit(1);
});
--git a/test/index.test.ts b/test/index.test.ts index 02e79061..e3c1d87b 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -190,6 +190,10 @@ vi.mock("../lib/request/rate-limit-backoff.js", () => ({ resolveUnsupportedCodexFallbackModel: vi.fn(() => undefined), shouldFallbackToGpt52OnUnsupportedGpt53: vi.fn(() => false), handleSuccessResponse: vi.fn(async (response: Response) => response), + handleSuccessResponseDetailed: vi.fn(async (response: Response) => ({ + response, + parsedJson: undefined, + })), })); const mockStorage = { @@ -305,6 +309,13 @@ vi.mock("../lib/accounts.js", () => { })); } + getSelectionExplainabilityAndNextForFamilyHybrid() { + return { + explainability: this.getSelectionExplainability(), + account: this.getCurrentOrNextForFamilyHybrid(), + }; + } + recordSuccess() {} recordRateLimit() {} recordFailure() {} @@ -1533,6 +1544,31 @@ describe("OpenAIOAuthPlugin fetch handler", () => { lastUsed: Date.now(), }, ], + getSelectionExplainabilityAndNextForFamilyHybrid: (_family: string, currentModel?: string) => ({ + explainability: [ + { + index: 0, + enabled: true, + isCurrentForFamily: true, + eligible: true, + reasons: ["eligible"], + healthScore: 100, + tokensAvailable: 50, + lastUsed: Date.now(), + }, + { + index: 1, + enabled: true, + isCurrentForFamily: false, + eligible: true, + reasons: ["eligible"], + healthScore: 100, + tokensAvailable: 50, + lastUsed: Date.now(), + }, + ], + account: customManager.getCurrentOrNextForFamilyHybrid(_family, currentModel), + }), toAuthDetails: (account: { accountId?: string }) => ({ type: "oauth" as const, access: `access-${account.accountId ?? "unknown"}`,