Skip to content

Commit 80663a9

Browse files
committed
让 router 在 200 SSE quota_exceeded 上也能冷却实例
stream 模式下,Copilot 可能把 quota_exceeded 藏进 SSE error 帧而不是 HTTP status。Router 需要在不改 body 的前提下识别该信号并冷却实例,否则会继续把流量打回同一台耗尽机器。 Constraint: 不能依赖 PII 或手动本地探针;只能用可观察的 SSE 错误帧。 Rejected: 继续只看 HTTP 402/429 | stream 200 会漏掉真实耗尽信号。 Confidence: high Scope-risk: narrow Directive: 未来若再出现 200 SSE quota_exceeded,先查流解析,再查路由。 Tested: bun test tests/router/; bun run lint:all --fix; bun run build; bun run typecheck
1 parent 18eda94 commit 80663a9

2 files changed

Lines changed: 171 additions & 62 deletions

File tree

router/state.ts

Lines changed: 128 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ const DEFAULT_ENCODER = new TextEncoder()
1616

1717
export const DEFAULT_HISTORY_LIMIT = 200
1818
export const DEFAULT_SSE_RETRY_MS = 2000
19-
export const DEFAULT_INSTANCE_COOLDOWN_MS = 60_000
19+
export const DEFAULT_INSTANCE_COOLDOWN_MS = 3_600_000 // 60 min: exhausted instance (402/429 w/o Retry-After) stays out long enough to skip dead quota
2020

2121
export interface ProxyContext {
2222
body: string
@@ -91,6 +91,7 @@ export interface ProxyToOptions {
9191
logger: (line: string) => void
9292
fetchImpl?: typeof fetch
9393
onQuotaSnapshots?: (quotaSnapshots: unknown) => void
94+
onQuotaExceeded?: () => void
9495
}
9596

9697
export interface DashboardHandlerOptions {
@@ -539,8 +540,9 @@ export function getInstanceName(
539540
function observeResponsesSseQuotaSnapshots(
540541
body: ReadableStream<Uint8Array> | null,
541542
onQuotaSnapshots?: (quotaSnapshots: unknown) => void,
543+
onQuotaExceeded?: () => void,
542544
): ReadableStream<Uint8Array> | null {
543-
if (!body || !onQuotaSnapshots) {
545+
if (!body || (!onQuotaSnapshots && !onQuotaExceeded)) {
544546
return body
545547
}
546548

@@ -559,10 +561,20 @@ function observeResponsesSseQuotaSnapshots(
559561
}
560562

561563
try {
562-
const parsed = JSON.parse(data) as { copilot_quota_snapshots?: unknown }
563-
if (parsed.copilot_quota_snapshots) {
564+
const parsed = JSON.parse(data) as {
565+
code?: unknown
566+
error?: { code?: unknown }
567+
copilot_quota_snapshots?: unknown
568+
}
569+
if (parsed.copilot_quota_snapshots && onQuotaSnapshots) {
564570
onQuotaSnapshots(parsed.copilot_quota_snapshots)
565571
}
572+
if (
573+
parsed.code === "quota_exceeded"
574+
|| parsed.error?.code === "quota_exceeded"
575+
) {
576+
onQuotaExceeded?.()
577+
}
566578
} catch {
567579
return
568580
}
@@ -619,6 +631,7 @@ export async function proxyTo(options: ProxyToOptions): Promise<Response> {
619631
observeResponsesSseQuotaSnapshots(
620632
upstream.body,
621633
options.onQuotaSnapshots,
634+
options.onQuotaExceeded,
622635
)
623636
: upstream.body
624637

@@ -724,24 +737,58 @@ function applyCooldownOnExhaustion(
724737
model: string
725738
requestNowMs: number
726739
},
727-
) {
740+
): boolean {
728741
if (!COOLDOWN_STATUSES.has(proxied.status)) {
729-
return
742+
return false
730743
}
731744

732745
// 402 has no Retry-After; falls back to defaultCooldownMs below.
733-
const retryAfter = proxied.headers.get("Retry-After")
734-
const retryAfterMs = parseRetryAfterMs(retryAfter, params.requestNowMs)
746+
applyCooldown(runtime, {
747+
...params,
748+
status: proxied.status,
749+
retryAfter: proxied.headers.get("Retry-After"),
750+
})
751+
return true
752+
}
753+
754+
function applyCooldown(
755+
runtime: RouterRuntime,
756+
params: {
757+
port: number
758+
instanceName: string
759+
model: string
760+
requestNowMs: number
761+
status: number
762+
retryAfter: string | null
763+
},
764+
) {
765+
const retryAfterMs = parseRetryAfterMs(params.retryAfter, params.requestNowMs)
735766
const cooldownMs = retryAfterMs ?? runtime.defaultCooldownMs
736767
const cooldownUntilMs = params.requestNowMs + cooldownMs
737768

738769
runtime.state.portCooldownUntil.set(params.port, cooldownUntilMs)
739-
runtime.state.portCooldownRetryAfter.set(params.port, retryAfter)
770+
runtime.state.portCooldownRetryAfter.set(params.port, params.retryAfter)
740771
runtime.logger(
741-
`cooldown set instance=${params.instanceName}:${params.port} model=${params.model} status=${proxied.status} until=${new Date(cooldownUntilMs).toISOString()} retry-after=${retryAfter || "_"}`,
772+
`cooldown set instance=${params.instanceName}:${params.port} model=${params.model} status=${params.status} until=${new Date(cooldownUntilMs).toISOString()} retry-after=${params.retryAfter || "_"}`,
742773
)
743774
}
744775

776+
function applyCooldownOnStreamQuotaExceeded(
777+
runtime: RouterRuntime,
778+
params: {
779+
port: number
780+
instanceName: string
781+
model: string
782+
requestNowMs: number
783+
},
784+
) {
785+
applyCooldown(runtime, {
786+
...params,
787+
status: 402,
788+
retryAfter: null,
789+
})
790+
}
791+
745792
function createAllCoolingResponse(
746793
runtime: RouterRuntime,
747794
params: {
@@ -830,6 +877,13 @@ async function handleNoModelRequest(
830877
fetchImpl: runtime.fetchImpl,
831878
onQuotaSnapshots: (quotaSnapshots) =>
832879
updateUpstreamQuotaSnapshot(runtime.state, port, quotaSnapshots),
880+
onQuotaExceeded: () =>
881+
applyCooldownOnStreamQuotaExceeded(runtime, {
882+
port,
883+
instanceName,
884+
model: "_",
885+
requestNowMs: request.requestNowMs,
886+
}),
833887
})
834888
applyCooldownOnExhaustion(runtime, proxied, {
835889
port,
@@ -846,70 +900,89 @@ async function handleModelRequest(
846900
runtime: RouterRuntime,
847901
request: RouterRequestContext,
848902
): Promise<Response> {
849-
const result = pickPort(runtime.state, {
850-
sessionId: request.sessionId,
851-
agent: request.agent,
852-
model: request.model,
853-
nowMs: request.requestNowMs,
854-
})
903+
const modelPorts = runtime.state.modelToPorts.get(request.model) || []
904+
const maxAttempts = Math.max(modelPorts.length, 1)
855905

856-
if (!result) {
857-
const modelPorts = runtime.state.modelToPorts.get(request.model) || []
858-
const allCoolingResponse = createAllCoolingResponse(runtime, {
906+
for (let attempt = 0; attempt < maxAttempts; attempt++) {
907+
const result = pickPort(runtime.state, {
859908
sessionId: request.sessionId,
860909
agent: request.agent,
910+
model: request.model,
911+
nowMs: request.requestNowMs,
912+
})
913+
914+
if (!result) {
915+
break
916+
}
917+
918+
const instanceName = getInstanceName(runtime.state, result.port)
919+
const routeRecord: RouteRecord = {
920+
ts: runtime.now(),
921+
sid: request.sessionId || "-",
922+
agent: request.agent,
923+
model: request.model,
861924
provider: request.provider,
925+
port: result.port,
926+
reason: result.reason,
927+
instanceName,
928+
}
929+
recordRoute(runtime.state, routeRecord)
930+
runtime.logger(
931+
`sid=${routeRecord.sid} agent=${request.agent} provider=${request.provider}${instanceName}:${result.port} model=${request.model} reason=${result.reason}`,
932+
)
933+
934+
const proxied = await proxyTo({
935+
port: result.port,
936+
context: { body: request.bodyText, req: request.req, url: request.url },
937+
logger: runtime.logger,
938+
fetchImpl: runtime.fetchImpl,
939+
onQuotaSnapshots: (quotaSnapshots) =>
940+
updateUpstreamQuotaSnapshot(runtime.state, result.port, quotaSnapshots),
941+
onQuotaExceeded: () =>
942+
applyCooldownOnStreamQuotaExceeded(runtime, {
943+
port: result.port,
944+
instanceName,
945+
model: request.model,
946+
requestNowMs: request.requestNowMs,
947+
}),
948+
})
949+
const exhausted = applyCooldownOnExhaustion(runtime, proxied, {
950+
port: result.port,
951+
instanceName,
862952
model: request.model,
863-
ports: modelPorts,
864953
requestNowMs: request.requestNowMs,
865-
error: `all upstream instances are cooling down for model: ${request.model}`,
866954
})
867-
if (allCoolingResponse) {
868-
return allCoolingResponse
955+
updateUpstreamHeaderSnapshot(runtime.state, result.port, proxied.headers)
956+
957+
if (!exhausted) {
958+
return proxied
869959
}
870960

871961
runtime.logger(
872-
`NO PORT sid=${request.sessionId || "-"} agent=${request.agent} model=${request.model} provider=${request.provider}`,
873-
)
874-
return Response.json(
875-
{ error: `no instance serves model: ${request.model}` },
876-
{ status: 502 },
962+
`retry model=${request.model} after exhausted instance=${instanceName}:${result.port} status=${proxied.status}`,
877963
)
878964
}
879965

880-
const instanceName = getInstanceName(runtime.state, result.port)
881-
const routeRecord: RouteRecord = {
882-
ts: runtime.now(),
883-
sid: request.sessionId || "-",
966+
const allCoolingResponse = createAllCoolingResponse(runtime, {
967+
sessionId: request.sessionId,
884968
agent: request.agent,
885-
model: request.model,
886969
provider: request.provider,
887-
port: result.port,
888-
reason: result.reason,
889-
instanceName,
890-
}
891-
recordRoute(runtime.state, routeRecord)
892-
runtime.logger(
893-
`sid=${routeRecord.sid} agent=${request.agent} provider=${request.provider}${instanceName}:${result.port} model=${request.model} reason=${result.reason}`,
894-
)
895-
896-
const proxied = await proxyTo({
897-
port: result.port,
898-
context: { body: request.bodyText, req: request.req, url: request.url },
899-
logger: runtime.logger,
900-
fetchImpl: runtime.fetchImpl,
901-
onQuotaSnapshots: (quotaSnapshots) =>
902-
updateUpstreamQuotaSnapshot(runtime.state, result.port, quotaSnapshots),
903-
})
904-
applyCooldownOnExhaustion(runtime, proxied, {
905-
port: result.port,
906-
instanceName,
907970
model: request.model,
971+
ports: modelPorts,
908972
requestNowMs: request.requestNowMs,
973+
error: `all upstream instances are cooling down for model: ${request.model}`,
909974
})
910-
updateUpstreamHeaderSnapshot(runtime.state, result.port, proxied.headers)
975+
if (allCoolingResponse) {
976+
return allCoolingResponse
977+
}
911978

912-
return proxied
979+
runtime.logger(
980+
`NO PORT sid=${request.sessionId || "-"} agent=${request.agent} model=${request.model} provider=${request.provider}`,
981+
)
982+
return Response.json(
983+
{ error: `no instance serves model: ${request.model}` },
984+
{ status: 502 },
985+
)
913986
}
914987

915988
export function createRouterHandler(options: RouterHandlerOptions) {

tests/router/proxy.test.ts

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ describe("router discovery and proxy helpers", () => {
234234

235235
// eslint-disable-next-line max-lines-per-function
236236
describe("router handler cooldown semantics", () => {
237-
test("router handler cools down instance on upstream 402 quota_exceeded", async () => {
237+
test("router handler retries another instance on upstream 402 quota_exceeded", async () => {
238238
const state = createState()
239239
state.modelToPorts.set("gpt-4.1", [4141, 4142])
240240
state.sessionBindings.set("session-1:atlas:gpt-4.1", 4141)
@@ -268,9 +268,10 @@ describe("router handler cooldown semantics", () => {
268268
}),
269269
)
270270

271-
// 402 has no Retry-After → default cooldown applied, instance cooled down.
272-
expect(res.status).toBe(402)
271+
expect(res.status).toBe(200)
272+
expect(await res.text()).toBe("ok")
273273
expect(state.portCooldownUntil.get(4141)).toBeGreaterThan(fixedNowMs)
274+
expect(state.sessionBindings.get("session-1:atlas:gpt-4.1")).toBe(4142)
274275
})
275276

276277
test("router handler sets cooldown on upstream 429 using Retry-After seconds", async () => {
@@ -313,7 +314,8 @@ describe("router handler cooldown semantics", () => {
313314
}),
314315
)
315316

316-
expect(res.status).toBe(429)
317+
expect(res.status).toBe(200)
318+
expect(await res.text()).toBe("ok")
317319
expect(state.portCooldownUntil.get(4141)).toBe(fixedNowMs + 7000)
318320
expect(state.portCooldownRetryAfter.get(4141)).toBe("7")
319321
expect(state.portHeaderSnapshots.get(4141)).toEqual({
@@ -459,6 +461,40 @@ describe("router handler cooldown semantics", () => {
459461
})
460462
})
461463

464+
test("router handler cools down instance when 200 SSE stream carries quota_exceeded error", async () => {
465+
const state = createState()
466+
state.modelToPorts.set("gpt-5.5", [4141])
467+
const fixedNowMs = new Date("2026-03-13T00:00:00.000Z").getTime()
468+
469+
const fetchImpl = createFetchStub(() =>
470+
Promise.resolve(
471+
new Response(
472+
[
473+
'event: error\ndata: {"type":"error","error":{"code":"quota_exceeded","message":"You have exceeded your monthly quota"},"code":"quota_exceeded","message":"You have exceeded your monthly quota"}\n\n',
474+
].join(""),
475+
{
476+
status: 200,
477+
headers: { "content-type": "text/event-stream" },
478+
},
479+
),
480+
),
481+
)
482+
const handler = createRouterHandlerForTest({
483+
state,
484+
fetchImpl,
485+
fixedNowMs,
486+
})
487+
488+
const res = await handler(
489+
createRouterRequest('{"model":"gpt-5.5","stream":true}'),
490+
)
491+
492+
expect(res.status).toBe(200)
493+
expect(await res.text()).toContain("quota_exceeded")
494+
expect(state.portCooldownUntil.get(4141)).toBe(fixedNowMs + 3_600_000)
495+
expect(state.portCooldownRetryAfter.get(4141)).toBeNull()
496+
})
497+
462498
test("router handler returns 503 on nomodel when all instances are cooling", async () => {
463499
const state = createState()
464500
const fixedNowMs = new Date("2026-03-13T00:00:00.000Z").getTime()
@@ -509,8 +545,8 @@ describe("router handler cooldown semantics", () => {
509545

510546
const res = await handler(createRouterRequest('{"model":"gpt-4.1"}'))
511547

512-
expect(res.status).toBe(429)
513-
expect(state.portCooldownUntil.get(4141)).toBe(fixedNowMs + 60000)
548+
expect(res.status).toBe(503)
549+
expect(state.portCooldownUntil.get(4141)).toBe(fixedNowMs + 3600000)
514550
expect(state.portCooldownRetryAfter.get(4141)).toBe("invalid")
515551
})
516552

@@ -537,7 +573,7 @@ describe("router handler cooldown semantics", () => {
537573

538574
const res = await handler(createRouterRequest('{"model":"gpt-4.1"}'))
539575

540-
expect(res.status).toBe(429)
576+
expect(res.status).toBe(503)
541577
expect(state.portCooldownUntil.get(4141)).toBe(
542578
new Date("2026-03-13T00:00:05.000Z").getTime(),
543579
)

0 commit comments

Comments
 (0)