Skip to content

Commit dcd1aff

Browse files
committed
feat: refactor rate limit logging and implement provider responses handler
1 parent 8e73c65 commit dcd1aff

5 files changed

Lines changed: 196 additions & 380 deletions

File tree

src/lib/codex-rate-limit.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,14 @@ export const logCodexRateLimitsEvent = (event: unknown): void => {
5656
}
5757
summary.push(
5858
`used=${window.used_percent}%`,
59-
`window=${window.window_minutes}m`,
6059
`reset_at=${formatCodexRateLimitResetAt(window.reset_at)}`,
6160
)
6261

6362
const label =
6463
planType ?
6564
`Codex ${scope} rate limit (${planType})`
6665
: `Codex ${scope} rate limit`
67-
consola.info(`${label}: ${summary.join(", ")}`)
66+
consola.log(`${label}: ${summary.join(", ")}`)
6867
}
6968
}
7069

src/lib/copilot-rate-limit.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ export const logCopilotQuotaSnapshots = (
136136
const logCopilotRateLimitUsage = (usage: CopilotRateLimitUsage): void => {
137137
const d = new Date(usage.resetAt)
138138
const dateStr = Number.isNaN(d.getTime()) ? usage.resetAt : d.toLocaleString()
139-
consola.info(
139+
consola.log(
140140
`Copilot ${usage.type} quota remaining: ${usage.remaining}, resets at: ${dateStr}`,
141141
)
142142
}
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import type { Context } from "hono"
2+
3+
import { events } from "fetch-event-stream"
4+
5+
import { HTTPError } from "~/lib/error"
6+
import { createHandlerLogger, debugJson } from "~/lib/logger"
7+
import { resolveProviderConfig } from "~/lib/provider-resolver"
8+
import { requestContext } from "~/lib/request-context"
9+
import {
10+
createProviderTokenUsageRecorder,
11+
normalizeResponsesUsage,
12+
type UsageTokens,
13+
} from "~/lib/token-usage"
14+
import { applyResponsesApiContextManagement } from "~/routes/responses/utils"
15+
import type {
16+
ResponsesPayload,
17+
ResponsesResult,
18+
ResponseStreamEvent,
19+
} from "~/services/copilot/create-responses"
20+
import {
21+
createStandardizedCodexResponsesEventStream,
22+
forwardCodexResponses,
23+
normalizeCodexResponsesEvent,
24+
} from "~/services/codex/create-responses"
25+
import { getModels as getCodexModels } from "~/services/codex/get-models"
26+
import {
27+
createProviderProxyResponse,
28+
forwardProviderResponses,
29+
} from "~/services/providers/provider-proxy"
30+
31+
const logger = createHandlerLogger("provider-responses-handler")
32+
33+
export async function handleProviderResponsesForProvider(
34+
c: Context,
35+
options: {
36+
payload: ResponsesPayload
37+
provider: string
38+
},
39+
): Promise<Response> {
40+
const { payload, provider } = options
41+
const providerConfig = await resolveProviderConfig(provider)
42+
if (providerConfig?.type !== "openai-responses") {
43+
return c.json(
44+
{
45+
error: {
46+
message: `Provider '${provider}' does not support the /v1/responses endpoint`,
47+
type: "invalid_request_error",
48+
},
49+
},
50+
400,
51+
)
52+
}
53+
54+
const selectedModel =
55+
providerConfig.name === "codex" ?
56+
getCodexModels().data.find((model) => model.id === payload.model)
57+
: undefined
58+
59+
applyResponsesApiContextManagement(
60+
payload,
61+
selectedModel?.capabilities.limits.max_prompt_tokens,
62+
)
63+
64+
const upstreamResponse =
65+
providerConfig.name === "codex" ?
66+
await forwardCodexResponses(
67+
payload,
68+
c.req.raw.headers,
69+
providerConfig.baseUrl,
70+
)
71+
: await forwardProviderResponses(providerConfig, payload, c.req.raw.headers)
72+
73+
if (!upstreamResponse.ok) {
74+
throw new HTTPError(
75+
`Failed to create ${provider} responses`,
76+
upstreamResponse,
77+
)
78+
}
79+
80+
const recordUsage = createProviderResponsesUsageRecorder(payload, provider)
81+
82+
if (payload.stream) {
83+
void recordProviderResponsesStreamUsage(upstreamResponse.clone(), {
84+
normalizeCodex: providerConfig.name === "codex",
85+
provider,
86+
recordUsage,
87+
}).catch((error) => {
88+
logger.warn("provider.responses.usage_stream_error", {
89+
provider,
90+
error: getErrorMessage(error),
91+
})
92+
})
93+
} else {
94+
const responseBody = (await upstreamResponse
95+
.clone()
96+
.json()) as ResponsesResult
97+
recordUsage(normalizeResponsesUsage(responseBody.usage))
98+
}
99+
100+
if (providerConfig.name === "codex" && payload.stream) {
101+
return createProviderProxyResponse(
102+
upstreamResponse,
103+
createStandardizedCodexResponsesEventStream(
104+
getResponsesEvents(upstreamResponse),
105+
),
106+
)
107+
}
108+
109+
return createProviderProxyResponse(upstreamResponse)
110+
}
111+
112+
const getErrorMessage = (error: unknown): string => {
113+
if (error instanceof Error && error.message) {
114+
return error.message
115+
}
116+
117+
return String(error)
118+
}
119+
120+
const createProviderResponsesUsageRecorder = (
121+
payload: ResponsesPayload,
122+
provider: string,
123+
): ((usage: UsageTokens) => void) => {
124+
const sessionAffinity =
125+
requestContext.getStore()?.sessionAffinity?.trim() || null
126+
127+
return createProviderTokenUsageRecorder({
128+
endpoint: "responses",
129+
model: payload.model,
130+
providerName: provider,
131+
sessionId: sessionAffinity ?? "",
132+
})
133+
}
134+
135+
const recordProviderResponsesStreamUsage = async (
136+
upstreamResponse: unknown,
137+
options: {
138+
normalizeCodex: boolean
139+
provider: string
140+
recordUsage: (usage: UsageTokens) => void
141+
},
142+
): Promise<void> => {
143+
let usage: UsageTokens = {}
144+
145+
try {
146+
for await (const chunk of getResponsesEvents(upstreamResponse)) {
147+
debugJson(logger, "Responses stream chunk:", chunk)
148+
if (!chunk.data || chunk.data === "[DONE]") {
149+
continue
150+
}
151+
152+
const parsed = parseProviderResponsesStreamEvent(chunk.data, {
153+
normalizeCodex: options.normalizeCodex,
154+
provider: options.provider,
155+
})
156+
if (
157+
parsed?.type === "response.completed"
158+
|| parsed?.type === "response.failed"
159+
|| parsed?.type === "response.incomplete"
160+
) {
161+
usage = normalizeResponsesUsage(parsed.response.usage)
162+
}
163+
}
164+
} finally {
165+
options.recordUsage(usage)
166+
}
167+
}
168+
169+
const parseProviderResponsesStreamEvent = (
170+
data: string,
171+
options: {
172+
normalizeCodex: boolean
173+
provider: string
174+
},
175+
): ResponseStreamEvent | null => {
176+
try {
177+
const parsed = JSON.parse(data) as ResponseStreamEvent
178+
return options.normalizeCodex ?
179+
normalizeCodexResponsesEvent(parsed)
180+
: parsed
181+
} catch (error) {
182+
logger.error("provider.responses.parse_chunk_error", {
183+
provider: options.provider,
184+
data,
185+
error,
186+
})
187+
return null
188+
}
189+
}
190+
191+
const getResponsesEvents = (response: unknown) => {
192+
return events(response as Parameters<typeof events>[0])
193+
}

0 commit comments

Comments
 (0)