Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ import {
createCodexHeaders,
extractRequestUrl,
handleErrorResponse,
handleSuccessResponse,
handleSuccessResponseDetailed,
getUnsupportedCodexModelInfo,
resolveUnsupportedCodexFallbackModel,
refreshAndUpdateToken,
Expand Down Expand Up @@ -2184,20 +2184,22 @@ export const OpenAIOAuthPlugin: Plugin = async ({ client }: PluginInput) => {
let restartAccountTraversalWithFallback = false;

while (attempted.size < Math.max(1, accountCount)) {
const selectionExplainability = accountManager.getSelectionExplainability(
const selectionNow = Date.now();
const selection = accountManager.getSelectionExplainabilityAndNextForFamilyHybrid(
modelFamily,
model,
Date.now(),
selectionNow,
{ pidOffsetEnabled },
);
runtimeMetrics.lastSelectionSnapshot = {
timestamp: Date.now(),
timestamp: selectionNow,
family: modelFamily,
model: model ?? null,
selectedAccountIndex: null,
quotaKey,
explainability: selectionExplainability,
explainability: selection.explainability,
};
const account = accountManager.getCurrentOrNextForFamilyHybrid(modelFamily, model, { pidOffsetEnabled });
const account = selection.account;
if (!account || attempted.has(account.index)) {
break;
}
Expand Down Expand Up @@ -2644,9 +2646,10 @@ while (attempted.size < Math.max(1, accountCount)) {

resetRateLimitBackoff(account.index, quotaKey);
runtimeMetrics.cumulativeLatencyMs += fetchLatencyMs;
const successResponse = await handleSuccessResponse(response, isStreaming, {
const successResult = await handleSuccessResponseDetailed(response, isStreaming, {
streamStallTimeoutMs,
});
const successResponse = successResult.response;

if (!successResponse.ok) {
runtimeMetrics.failedRequests++;
Expand All @@ -2656,10 +2659,12 @@ while (attempted.size < Math.max(1, accountCount)) {
}

if (!isStreaming && emptyResponseMaxRetries > 0) {
const clonedResponse = successResponse.clone();
try {
const bodyText = await clonedResponse.text();
const parsedBody = bodyText ? JSON.parse(bodyText) as unknown : null;
let parsedBody: unknown = successResult.parsedJson;
if (parsedBody === undefined) {
const bodyText = await successResponse.clone().text();
parsedBody = bodyText ? JSON.parse(bodyText) as unknown : null;
}
if (isEmptyResponse(parsedBody)) {
if (
emptyResponseRetries < emptyResponseMaxRetries &&
Expand Down
147 changes: 147 additions & 0 deletions lib/accounts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
getHealthTracker,
getTokenTracker,
selectHybridAccount,
DEFAULT_HYBRID_SELECTION_CONFIG,
type AccountWithMetrics,
type HybridSelectionOptions,
} from "./rotation.js";
Expand Down Expand Up @@ -207,6 +208,11 @@ export interface AccountSelectionExplainability {
lastUsed: number;
}

/**
 * Combined outcome of one account-selection pass: the per-account
 * explainability report together with the account that was picked.
 */
export interface AccountSelectionResult {
/** One entry per account examined during the pass; empty when no accounts are configured. */
explainability: AccountSelectionExplainability[];
/** The account chosen by the pass, or null when nothing could be selected. */
account: ManagedAccount | null;
}

export class AccountManager {
private accounts: ManagedAccount[] = [];
private cursorByFamily: Record<ModelFamily, number> = initFamilyState(0);
Expand Down Expand Up @@ -480,6 +486,147 @@ export class AccountManager {
});
}

/**
 * Produces the per-account explainability report for a model family and,
 * in the same pass over the accounts, picks the account to use next.
 *
 * Selection order:
 *  1. the family's current account, whenever it is eligible;
 *  2. otherwise the eligible account with the highest weighted score
 *     (health, available tokens, hours since last use, optional PID jitter);
 *  3. otherwise the least recently used enabled account, even though it is
 *     not eligible right now.
 *
 * Side effects when an account is picked: the family's current index and
 * cursor are updated and the account's `lastUsed` is set to `now`. The
 * report entry for that account still carries the previous `lastUsed`.
 *
 * @param family - Model family whose rotation state is consulted and updated.
 * @param model - Optional model; narrows the quota key and adds a model-specific rate-limit lookup.
 * @param now - Timestamp used for every expiry comparison and for `lastUsed`.
 * @param options - Only `pidOffsetEnabled` is read here.
 * @returns The report plus the picked account (null when there are no accounts or none is enabled).
 */
getSelectionExplainabilityAndNextForFamilyHybrid(
  family: ModelFamily,
  model?: string | null,
  now = nowMs(),
  options?: HybridSelectionOptions,
): AccountSelectionResult {
  const report: AccountSelectionExplainability[] = [];
  const total = this.accounts.length;
  if (total === 0) {
    return { explainability: report, account: null };
  }

  const quotaKey = model ? `${family}:${model}` : family;
  const familyLimitKey = getQuotaKey(family);
  const modelLimitKey = model ? getQuotaKey(family, model) : null;
  const activeIndex = this.currentAccountIndexByFamily[family];
  const health = getHealthTracker();
  const buckets = getTokenTracker();
  const weights = DEFAULT_HYBRID_SELECTION_CONFIG;
  const usePidOffset = Boolean(options?.pidOffsetEnabled);
  const pidBonus = usePidOffset ? (process.pid % 100) * 0.01 : 0;
  const msPerHour = 1000 * 60 * 60;

  let chosen: ManagedAccount | null = null;
  let lruEnabled: ManagedAccount | null = null;
  let scoredCount = 0;
  let topScore = -Infinity;
  let stickyChosen = false;

  for (const candidate of this.accounts) {
    clearExpiredRateLimits(candidate);
    const enabled = candidate.enabled !== false;

    // The effective rate limit is the later of the still-active family-wide
    // and model-specific reset times.
    const familyLimit = candidate.rateLimitResetTimes[familyLimitKey];
    const modelLimit = modelLimitKey
      ? candidate.rateLimitResetTimes[modelLimitKey]
      : undefined;
    const activeLimits = [familyLimit, modelLimit].filter(
      (until): until is number => typeof until === "number" && until > now,
    );
    const rateLimitedUntil =
      activeLimits.length > 0 ? Math.max(...activeLimits) : undefined;

    let coolingDownUntil: number | undefined;
    if (
      typeof candidate.coolingDownUntil === "number" &&
      candidate.coolingDownUntil > now
    ) {
      coolingDownUntil = candidate.coolingDownUntil;
    }

    const reasons: string[] = [];
    if (!enabled) {
      reasons.push("disabled");
    }
    if (rateLimitedUntil !== undefined) {
      reasons.push("rate-limited");
    }
    if (coolingDownUntil !== undefined) {
      reasons.push(
        candidate.cooldownReason ? `cooldown:${candidate.cooldownReason}` : "cooldown",
      );
    }

    const tokensAvailable = buckets.getTokens(candidate.index, quotaKey);
    if (tokensAvailable < 1) {
      reasons.push("token-bucket-empty");
    }

    const blocked =
      !enabled || rateLimitedUntil !== undefined || coolingDownUntil !== undefined;
    const eligible = !blocked && tokensAvailable >= 1;
    if (reasons.length === 0) {
      reasons.push("eligible");
    }

    const healthScore = health.getScore(candidate.index, quotaKey);
    report.push({
      index: candidate.index,
      enabled,
      isCurrentForFamily: activeIndex === candidate.index,
      eligible,
      reasons,
      healthScore,
      tokensAvailable,
      rateLimitedUntil,
      coolingDownUntil,
      cooldownReason: coolingDownUntil !== undefined ? candidate.cooldownReason : undefined,
      lastUsed: candidate.lastUsed,
    });

    if (enabled) {
      // Track the fallback candidate among all enabled accounts, eligible or not.
      if (lruEnabled === null || candidate.lastUsed < lruEnabled.lastUsed) {
        lruEnabled = candidate;
      }

      if (eligible) {
        if (candidate.index === activeIndex) {
          // The current account is sticky: once seen eligible it overrides any
          // earlier scored pick and stops further scoring.
          chosen = candidate;
          stickyChosen = true;
        } else if (!stickyChosen) {
          scoredCount += 1;
          const hoursSinceUsed = (now - candidate.lastUsed) / msPerHour;
          let score =
            healthScore * weights.healthWeight +
            tokensAvailable * weights.tokenWeight +
            hoursSinceUsed * weights.freshnessWeight;
          if (usePidOffset) {
            score +=
              ((candidate.index * 0.131 + pidBonus) % 1) * weights.freshnessWeight * 0.1;
          }
          // The first scored candidate is always taken; later ones must beat it strictly.
          if (scoredCount === 1 || score > topScore) {
            topScore = score;
            chosen = candidate;
          }
        }
      }
    }
  }

  // Nothing eligible at all: fall back to the least recently used enabled account.
  if (chosen === null && scoredCount === 0) {
    chosen = lruEnabled;
  }

  if (chosen === null) {
    return { explainability: report, account: null };
  }

  this.currentAccountIndexByFamily[family] = chosen.index;
  this.cursorByFamily[family] = (chosen.index + 1) % total;
  chosen.lastUsed = now;
  return { explainability: report, account: chosen };
}

setActiveIndex(index: number): ManagedAccount | null {
if (!Number.isFinite(index)) return null;
if (index < 0 || index >= this.accounts.length) return null;
Expand Down
35 changes: 28 additions & 7 deletions lib/request/fetch-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { queuedRefresh } from "../refresh-queue.js";
import { logRequest, logError, logWarn } from "../logger.js";
import { getCodexInstructions, getModelFamily } from "../prompts/codex.js";
import { transformRequestBody, normalizeModel } from "./request-transformer.js";
import { convertSseToJson, ensureContentType } from "./response-handler.js";
import { convertSseToJsonDetailed, ensureContentType } from "./response-handler.js";
import type { UserConfig, RequestBody } from "../types.js";
import { CodexAuthError } from "../errors.js";
import { isRecord } from "../utils.js";
Expand Down Expand Up @@ -278,6 +278,11 @@ export interface ErrorDiagnostics {
httpStatus?: number;
}

/**
 * Return shape of handleSuccessResponseDetailed: the response to hand back
 * plus, when one was produced, the already-parsed body.
 */
export interface SuccessResponseDetails {
/** The response produced for the caller (converted for non-streaming requests, re-wrapped stream otherwise). */
response: Response;
/**
 * Parsed body reported by the SSE-to-JSON conversion on the non-streaming path;
 * undefined on the streaming path. Callers should fall back to reading the
 * response body themselves when this is undefined.
 */
parsedJson?: unknown;
}

/**
* Determines if the current auth token needs to be refreshed
* @param auth - Current authentication state
Expand Down Expand Up @@ -590,6 +595,15 @@ export async function handleSuccessResponse(
isStreaming: boolean,
options?: { streamStallTimeoutMs?: number },
): Promise<Response> {
const details = await handleSuccessResponseDetailed(response, isStreaming, options);
return details.response;
}

export async function handleSuccessResponseDetailed(
response: Response,
isStreaming: boolean,
options?: { streamStallTimeoutMs?: number },
): Promise<SuccessResponseDetails> {
// Check for deprecation headers (RFC 8594)
const deprecation = response.headers.get("Deprecation");
const sunset = response.headers.get("Sunset");
Expand All @@ -601,15 +615,22 @@ export async function handleSuccessResponse(

// For non-streaming requests (generateText), convert SSE to JSON
if (!isStreaming) {
return await convertSseToJson(response, responseHeaders, options);
const converted = await convertSseToJsonDetailed(response, responseHeaders, options);
return {
response: converted.response,
parsedJson: converted.parsedResponse,
};
}

// For streaming requests (streamText), return stream as-is
return new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers: responseHeaders,
});
return {
response: new Response(response.body, {
status: response.status,
statusText: response.statusText,
headers: responseHeaders,
}),
parsedJson: undefined,
};
}

async function safeReadBody(response: Response): Promise<string> {
Expand Down
60 changes: 58 additions & 2 deletions lib/request/helpers/tool-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,45 @@ export interface Tool {
function: ToolFunction;
}

// Cleaned copy of each tool, keyed by the original tool object. A WeakMap is
// used so cached entries do not keep the original objects alive.
const cleanedToolCache = new WeakMap<object, Tool>();
// Cleaned result for a whole tools array, keyed by the array instance itself.
const cleanedToolArrayCache = new WeakMap<readonly unknown[], unknown>();

/**
 * Deep-copies a value the way a `JSON.parse(JSON.stringify(value))` round-trip
 * would, without building the intermediate string.
 *
 * - `null`, strings, booleans and finite numbers are returned as-is.
 * - Non-finite numbers (`NaN`, `±Infinity`) become `null`, as in JSON.
 * - Array holes / `undefined` / unsupported items become `null`.
 * - Objects exposing `toJSON()` are replaced by the clone of its result.
 * - Object properties whose value is not representable are omitted.
 * - Anything else (functions, symbols, bigint, top-level `undefined`) yields `undefined`.
 *
 * @param value - Arbitrary input, typically a tool definition.
 * @returns An independent JSON-compatible copy, or `undefined` when the value
 *          itself has no JSON representation.
 */
function cloneJsonLike(value: unknown): unknown {
  if (value === null) return null;
  if (value === undefined) return undefined;

  if (typeof value === "number") {
    // JSON has no representation for NaN/Infinity; serialisation emits null.
    return Number.isFinite(value) ? value : null;
  }
  if (typeof value === "string" || typeof value === "boolean") {
    return value;
  }

  if (Array.isArray(value)) {
    return value.map((item) => {
      const cloned = cloneJsonLike(item);
      // Keep positions stable: unrepresentable items turn into null.
      return cloned === undefined ? null : cloned;
    });
  }

  if (typeof value === "object") {
    const withJson = value as { toJSON?: () => unknown };
    if (typeof withJson.toJSON === "function") {
      return cloneJsonLike(withJson.toJSON());
    }

    const output: Record<string, unknown> = {};
    for (const [key, item] of Object.entries(value as Record<string, unknown>)) {
      const cloned = cloneJsonLike(item);
      if (cloned === undefined) continue;
      // Define the property instead of assigning it: a plain assignment to an
      // own "__proto__" key (which JSON.parse can produce) would hit the
      // Object.prototype.__proto__ setter, replacing the clone's prototype and
      // dropping the key.
      Object.defineProperty(output, key, {
        value: cloned,
        writable: true,
        enumerable: true,
        configurable: true,
      });
    }
    return output;
  }

  return undefined;
}

/**
* Cleans up tool definitions to ensure strict JSON Schema compliance.
*
Expand All @@ -30,19 +69,36 @@ export interface Tool {
// NOTE(review): this span comes from a diff rendering, so the two replaced
// lines (the bare `return tools.map(...)` and the JSON.parse/JSON.stringify
// clone) appear next to their replacements — confirm against the real file.
export function cleanupToolDefinitions(tools: unknown): unknown {
// Anything that is not an array is handed back unchanged.
if (!Array.isArray(tools)) return tools;

return tools.map((tool) => {
// Reuse the result previously stored for this exact array instance.
// NOTE(review): the lookup is by array identity, so later in-place changes to
// `tools` would not be reflected, and the cached array is returned by
// reference — confirm callers neither mutate the input nor the result.
const cachedArray = cleanedToolArrayCache.get(tools);
if (cachedArray) {
return cachedArray;
}

const cleaned = tools.map((tool) => {
// Entries that are not function tools (or lack a `function` payload) pass through as-is.
if (tool?.type !== "function" || !tool.function) {
return tool;
}

// Reuse the cleaned copy previously stored for this tool object.
const cachedTool = cleanedToolCache.get(tool);
if (cachedTool) {
return cachedTool;
}

// Clone to avoid mutating original
const cleanedTool = JSON.parse(JSON.stringify(tool));
const cloned = cloneJsonLike(tool);
// If the clone is not an object, fall back to returning the original tool untouched.
if (!cloned || typeof cloned !== "object") {
return tool;
}
const cleanedTool = cloned as Tool;
// cleanupSchema's return value is ignored, so it presumably rewrites the cloned
// schema in place — verify against its definition.
if (cleanedTool.function.parameters) {
cleanupSchema(cleanedTool.function.parameters);
}
// Remember the cleaned copy for this tool object.
cleanedToolCache.set(tool, cleanedTool);

return cleanedTool;
});
// Remember the whole cleaned array for this input array instance.
cleanedToolArrayCache.set(tools, cleaned);
return cleaned;
}

/**
Expand Down
Loading