Skip to content

Commit 8de95ec

Browse files
committed
Preserve Copilot quota snapshots for router dashboard
Keep router usage totals populated when Copilot surfaces quota data through streaming terminal events or websocket Responses completions instead of HTTP headers. Preserve last-known-good snapshot fields so later empty headers do not erase valid dashboard data. Constraint: Dashboard must show practical used/total quota for all routed instances after live SSE traffic. Rejected: UI-only placeholder handling | It would mask missing upstream quota propagation instead of fixing the data path. Confidence: high Scope-risk: moderate Directive: Keep quota snapshot parsing centralized in router/lib.ts and do not clear good fields with empty upstream responses. Tested: bun run lint:all --fix; bun run build; bun test; bun run typecheck; live 4140 requests verified 6/6 dashboard instances have u/t values. Not-tested: none
1 parent 64c2b00 commit 8de95ec

12 files changed

Lines changed: 748 additions & 46 deletions

router/lib.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,93 @@ export function parseUpstreamHeaderSnapshot(
206206
}
207207
}
208208

209+
interface CopilotQuotaSnapshotFields {
210+
entitlement: number
211+
overage: number
212+
remainingPercent: number
213+
resetAt: string
214+
}
215+
216+
const parseCopilotQuotaSnapshot = (
217+
value: unknown,
218+
): CopilotQuotaSnapshotFields | null => {
219+
if (!isRecord(value)) {
220+
return null
221+
}
222+
223+
const entitlement =
224+
typeof value.entitlement === "string" ?
225+
Number(value.entitlement)
226+
: Number.NaN
227+
const remainingPercent =
228+
typeof value.percent_remaining === "number" ?
229+
value.percent_remaining
230+
: Number.NaN
231+
const overage =
232+
typeof value.overage_count === "number" ? value.overage_count : Number.NaN
233+
const resetAt = typeof value.reset_date === "string" ? value.reset_date : null
234+
235+
if (
236+
!Number.isFinite(entitlement)
237+
|| !Number.isFinite(remainingPercent)
238+
|| !Number.isFinite(overage)
239+
|| entitlement < 0
240+
|| remainingPercent < 0
241+
|| remainingPercent > 100
242+
|| overage < 0
243+
|| !resetAt
244+
|| Number.isNaN(Date.parse(resetAt))
245+
) {
246+
return null
247+
}
248+
249+
return { entitlement, overage, remainingPercent, resetAt }
250+
}
251+
252+
export function parseUpstreamQuotaSnapshots(
253+
value: unknown,
254+
): UpstreamHeaderSnapshot {
255+
const snapshot: UpstreamHeaderSnapshot = {
256+
premiumUsage: null,
257+
sessionRateLimit: null,
258+
weeklyRateLimit: null,
259+
}
260+
261+
if (!isRecord(value)) {
262+
return snapshot
263+
}
264+
265+
const premium = parseCopilotQuotaSnapshot(value.premium_interactions)
266+
if (premium) {
267+
const used =
268+
premium.overage > 0 ?
269+
premium.entitlement + premium.overage
270+
: premium.entitlement
271+
- (premium.entitlement * premium.remainingPercent) / 100
272+
if (Number.isFinite(used) && used >= 0) {
273+
snapshot.premiumUsage = { used, total: premium.entitlement }
274+
}
275+
}
276+
277+
const session = parseCopilotQuotaSnapshot(value["5Hour-Session-RateLimits"])
278+
if (session) {
279+
snapshot.sessionRateLimit = {
280+
remaining: session.remainingPercent,
281+
resetAt: session.resetAt,
282+
}
283+
}
284+
285+
const weekly = parseCopilotQuotaSnapshot(value["Weekly-Session-RateLimits"])
286+
if (weekly) {
287+
snapshot.weeklyRateLimit = {
288+
remaining: weekly.remainingPercent,
289+
resetAt: weekly.resetAt,
290+
}
291+
}
292+
293+
return snapshot
294+
}
295+
209296
export function getBindingKey(
210297
sessionId: string | null,
211298
agent: string,

router/state.ts

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
getHeaderValue,
77
isRecord,
88
parseUpstreamHeaderSnapshot,
9+
parseUpstreamQuotaSnapshots,
910
parseModelFromBody,
1011
parseModelIds,
1112
parseModelObjects,
@@ -89,6 +90,7 @@ export interface ProxyToOptions {
8990
context: ProxyContext
9091
logger: (line: string) => void
9192
fetchImpl?: typeof fetch
93+
onQuotaSnapshots?: (quotaSnapshots: unknown) => void
9294
}
9395

9496
export interface DashboardHandlerOptions {
@@ -128,12 +130,52 @@ const EMPTY_UPSTREAM_HEADER_SNAPSHOT: UpstreamHeaderSnapshot = {
128130
weeklyRateLimit: null,
129131
}
130132

133+
export function mergeUpstreamHeaderSnapshot(
134+
previous: UpstreamHeaderSnapshot | undefined,
135+
next: UpstreamHeaderSnapshot,
136+
): UpstreamHeaderSnapshot {
137+
if (!previous) {
138+
return next
139+
}
140+
141+
return {
142+
premiumUsage: next.premiumUsage ?? previous.premiumUsage,
143+
sessionRateLimit: next.sessionRateLimit ?? previous.sessionRateLimit,
144+
weeklyRateLimit: next.weeklyRateLimit ?? previous.weeklyRateLimit,
145+
}
146+
}
147+
131148
export function updateUpstreamHeaderSnapshot(
132149
state: StickyRouterState,
133150
port: number,
134151
headers: Headers,
135152
) {
136-
state.portHeaderSnapshots.set(port, parseUpstreamHeaderSnapshot(headers))
153+
const next = parseUpstreamHeaderSnapshot(headers)
154+
updateUpstreamSnapshot(state, port, next)
155+
}
156+
157+
export function updateUpstreamSnapshot(
158+
state: StickyRouterState,
159+
port: number,
160+
next: UpstreamHeaderSnapshot,
161+
) {
162+
const previous = state.portHeaderSnapshots.get(port)
163+
state.portHeaderSnapshots.set(
164+
port,
165+
mergeUpstreamHeaderSnapshot(previous, next),
166+
)
167+
}
168+
169+
export function updateUpstreamQuotaSnapshot(
170+
state: StickyRouterState,
171+
port: number,
172+
quotaSnapshots: unknown,
173+
) {
174+
updateUpstreamSnapshot(
175+
state,
176+
port,
177+
parseUpstreamQuotaSnapshots(quotaSnapshots),
178+
)
137179
}
138180

139181
export function getRemainingCooldownMs(
@@ -494,6 +536,69 @@ export function getInstanceName(
494536
return state.portToInstance.get(port)?.name || `:${port}`
495537
}
496538

539+
function observeResponsesSseQuotaSnapshots(
540+
body: ReadableStream<Uint8Array> | null,
541+
onQuotaSnapshots?: (quotaSnapshots: unknown) => void,
542+
): ReadableStream<Uint8Array> | null {
543+
if (!body || !onQuotaSnapshots) {
544+
return body
545+
}
546+
547+
const decoder = new TextDecoder()
548+
let buffered = ""
549+
550+
const inspectEvent = (eventText: string) => {
551+
const data = eventText
552+
.split(/\r?\n/u)
553+
.filter((line) => line.startsWith("data:"))
554+
.map((line) => line.slice("data:".length).trimStart())
555+
.join("\n")
556+
557+
if (!data || data === "[DONE]") {
558+
return
559+
}
560+
561+
try {
562+
const parsed = JSON.parse(data) as { copilot_quota_snapshots?: unknown }
563+
if (parsed.copilot_quota_snapshots) {
564+
onQuotaSnapshots(parsed.copilot_quota_snapshots)
565+
}
566+
} catch {
567+
return
568+
}
569+
}
570+
571+
const inspectText = (text: string, flush = false) => {
572+
buffered += text
573+
574+
for (;;) {
575+
const separator = buffered.search(/\r?\n\r?\n/u)
576+
if (separator === -1) break
577+
const eventText = buffered.slice(0, separator)
578+
const separatorLength = buffered[separator] === "\r" ? 4 : 2
579+
buffered = buffered.slice(separator + separatorLength)
580+
inspectEvent(eventText)
581+
}
582+
583+
if (flush && buffered.trim()) {
584+
inspectEvent(buffered)
585+
buffered = ""
586+
}
587+
}
588+
589+
return body.pipeThrough(
590+
new TransformStream<Uint8Array, Uint8Array>({
591+
transform(chunk, controller) {
592+
inspectText(decoder.decode(chunk, { stream: true }))
593+
controller.enqueue(chunk)
594+
},
595+
flush() {
596+
inspectText(decoder.decode(), true)
597+
},
598+
}),
599+
)
600+
}
601+
497602
export async function proxyTo(options: ProxyToOptions): Promise<Response> {
498603
const { port, context, logger } = options
499604
const fetchImpl = options.fetchImpl ?? fetch
@@ -509,7 +614,15 @@ export async function proxyTo(options: ProxyToOptions): Promise<Response> {
509614
body: req.method !== "GET" && req.method !== "HEAD" ? body : undefined,
510615
})
511616

512-
return new Response(upstream.body, {
617+
const responseBody =
618+
upstream.headers.get("content-type")?.includes("text/event-stream") ?
619+
observeResponsesSseQuotaSnapshots(
620+
upstream.body,
621+
options.onQuotaSnapshots,
622+
)
623+
: upstream.body
624+
625+
return new Response(responseBody, {
513626
status: upstream.status,
514627
headers: upstream.headers,
515628
})
@@ -709,6 +822,8 @@ async function handleNoModelRequest(
709822
context: { body: request.bodyText, req: request.req, url: request.url },
710823
logger: runtime.logger,
711824
fetchImpl: runtime.fetchImpl,
825+
onQuotaSnapshots: (quotaSnapshots) =>
826+
updateUpstreamQuotaSnapshot(runtime.state, port, quotaSnapshots),
712827
})
713828
applyCooldownOn429(runtime, proxied, {
714829
port,
@@ -777,6 +892,8 @@ async function handleModelRequest(
777892
context: { body: request.bodyText, req: request.req, url: request.url },
778893
logger: runtime.logger,
779894
fetchImpl: runtime.fetchImpl,
895+
onQuotaSnapshots: (quotaSnapshots) =>
896+
updateUpstreamQuotaSnapshot(runtime.state, result.port, quotaSnapshots),
780897
})
781898
applyCooldownOn429(runtime, proxied, {
782899
port: result.port,

src/routes/messages/anthropic-types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ export interface AnthropicMessageDeltaEvent {
206206
stop_reason?: AnthropicResponse["stop_reason"]
207207
stop_sequence?: string | null
208208
}
209+
copilot_quota_snapshots?: unknown
209210
usage?: {
210211
input_tokens?: number
211212
output_tokens: number

src/routes/messages/responses-stream-translation.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -505,17 +505,20 @@ const handleResponseCompleted = (
505505
hasToolCall: state.hasToolCall,
506506
toolSearchName: state.toolSearchName,
507507
})
508-
events.push(
509-
{
510-
type: "message_delta",
511-
delta: {
512-
stop_reason: anthropic.stop_reason,
513-
stop_sequence: anthropic.stop_sequence,
514-
},
515-
usage: anthropic.usage,
508+
const messageDelta: AnthropicStreamEventData = {
509+
type: "message_delta",
510+
delta: {
511+
stop_reason: anthropic.stop_reason,
512+
stop_sequence: anthropic.stop_sequence,
516513
},
517-
{ type: "message_stop" },
518-
)
514+
usage: anthropic.usage,
515+
}
516+
517+
if ("copilot_quota_snapshots" in rawEvent) {
518+
messageDelta.copilot_quota_snapshots = rawEvent.copilot_quota_snapshots
519+
}
520+
521+
events.push(messageDelta, { type: "message_stop" })
519522
state.messageCompleted = true
520523
return events
521524
}

0 commit comments

Comments
 (0)