Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 134 additions & 4 deletions src/routes/messages/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,24 @@ import { streamSSE } from "hono/streaming"
import { awaitApproval } from "~/lib/approval"
import { checkRateLimit } from "~/lib/rate-limit"
import { state } from "~/lib/state"
import {
createResponsesStreamState,
translateResponsesStreamEvent,
} from "~/routes/messages/responses-stream-translation"
import {
translateAnthropicMessagesToResponsesPayload,
translateResponsesResultToAnthropic,
} from "~/routes/messages/responses-translation"
import { getResponsesRequestOptions } from "~/routes/responses/utils"
import {
createChatCompletions,
type ChatCompletionChunk,
type ChatCompletionResponse,
} from "~/services/copilot/create-chat-completions"
import {
createResponses,
type ResponsesResult,
} from "~/services/copilot/create-responses"

import {
type AnthropicMessagesPayload,
Expand All @@ -28,16 +41,31 @@ export async function handleCompletion(c: Context) {
const anthropicPayload = await c.req.json<AnthropicMessagesPayload>()
consola.debug("Anthropic request payload:", JSON.stringify(anthropicPayload))

const useResponsesApi = shouldUseResponsesApi(anthropicPayload.model)

if (state.manualApprove) {
await awaitApproval()
}

if (useResponsesApi) {
return await handleWithResponsesApi(c, anthropicPayload)
}

return await handleWithChatCompletions(c, anthropicPayload)
}

// Endpoint identifier that shouldUseResponsesApi looks for in a model's
// `supported_endpoints` list to decide whether to route via the Responses API.
const RESPONSES_ENDPOINT = "/responses"

const handleWithChatCompletions = async (
c: Context,
anthropicPayload: AnthropicMessagesPayload,
) => {
const openAIPayload = translateToOpenAI(anthropicPayload)
consola.debug(
"Translated OpenAI request payload:",
JSON.stringify(openAIPayload),
)

if (state.manualApprove) {
await awaitApproval()
}

const response = await createChatCompletions(openAIPayload)

if (isNonStreaming(response)) {
Expand Down Expand Up @@ -86,6 +114,108 @@ export async function handleCompletion(c: Context) {
})
}

/**
 * Serves an Anthropic Messages request through the Copilot Responses API.
 *
 * The Anthropic payload is translated into a Responses payload and sent
 * upstream. When the translated payload requests streaming and the upstream
 * result is an async iterable, every upstream event is translated and
 * forwarded as Anthropic SSE events. Otherwise the result is translated in
 * one piece and returned as JSON.
 *
 * @param c - Request context used to build the HTTP response.
 * @param anthropicPayload - The incoming Anthropic Messages request body.
 * @returns An SSE response when streaming, a JSON response otherwise.
 */
const handleWithResponsesApi = async (
  c: Context,
  anthropicPayload: AnthropicMessagesPayload,
) => {
  const payload = translateAnthropicMessagesToResponsesPayload(anthropicPayload)
  consola.debug("Translated Responses payload:", JSON.stringify(payload))

  const requestOptions = getResponsesRequestOptions(payload)
  const upstream = await createResponses(payload, {
    vision: requestOptions.vision,
    initiator: requestOptions.initiator,
  })

  if (payload.stream && isAsyncIterable(upstream)) {
    consola.debug("Streaming response from Copilot (Responses API)")
    return streamSSE(c, async (stream) => {
      const translationState = createResponsesStreamState()

      for await (const rawEvent of upstream) {
        consola.debug("Responses raw stream event:", JSON.stringify(rawEvent))

        // "ping" events are relayed to the client with an empty data field.
        if ((rawEvent as { event?: string }).event === "ping") {
          await stream.writeSSE({ event: "ping", data: "" })
          continue
        }

        // Events without a data field carry nothing to translate.
        const body = (rawEvent as { data?: string }).data
        if (!body) {
          continue
        }

        // The "[DONE]" sentinel stops consumption of the upstream iterable.
        if (body === "[DONE]") {
          break
        }

        // Unparseable chunks are skipped; safeJsonParse already logged them.
        const eventPayload = safeJsonParse(body)
        if (!eventPayload) {
          continue
        }

        const anthropicEvents = translateResponsesStreamEvent(
          eventPayload,
          translationState,
        )
        for (const anthropicEvent of anthropicEvents) {
          consola.debug(
            "Translated Anthropic event:",
            JSON.stringify(anthropicEvent),
          )
          await stream.writeSSE({
            event: anthropicEvent.type,
            data: JSON.stringify(anthropicEvent),
          })
        }
      }

      if (translationState.messageCompleted) {
        return
      }

      // The upstream stream ended before the translation state was marked
      // complete, so close the Anthropic stream explicitly.
      consola.warn(
        "Responses stream ended without completion; sending fallback message_stop",
      )
      const stopEvent = { type: "message_stop" as const }
      await stream.writeSSE({
        event: stopEvent.type,
        data: JSON.stringify(stopEvent),
      })
    })
  }

  // Only the last 400 characters of the serialized result are logged.
  const resultTail = JSON.stringify(upstream).slice(-400)
  consola.debug("Non-streaming Responses result:", resultTail)

  const anthropicResponse = translateResponsesResultToAnthropic(
    upstream as ResponsesResult,
  )
  consola.debug(
    "Translated Anthropic response:",
    JSON.stringify(anthropicResponse),
  )
  return c.json(anthropicResponse)
}

/**
 * Reports whether the given model should be served via the Responses API.
 *
 * Looks the model up by id in `state.models` and checks whether its
 * `supported_endpoints` list contains RESPONSES_ENDPOINT. Evaluates to
 * `false` when the model list, the model, or its endpoint list is missing.
 *
 * @param modelId - Model identifier taken from the request payload.
 * @returns `true` only when the model lists the Responses endpoint.
 */
const shouldUseResponsesApi = (modelId: string): boolean => {
  const endpoints = state.models?.data.find(
    ({ id }) => id === modelId,
  )?.supported_endpoints

  return endpoints?.includes(RESPONSES_ENDPOINT) ?? false
}

/**
 * Type guard that treats a chat-completions result as non-streaming when it
 * has an own `choices` property.
 *
 * @param response - Resolved result of `createChatCompletions`.
 * @returns `true` when `response` carries its own `choices` property.
 */
const isNonStreaming = (
  response: Awaited<ReturnType<typeof createChatCompletions>>,
): response is ChatCompletionResponse => {
  return Object.hasOwn(response, "choices")
}

/**
 * Type guard for values that can be consumed with `for await`.
 *
 * @param value - Any value.
 * @returns `true` when `value` is truthy and exposes a function under
 *   `Symbol.asyncIterator`.
 */
const isAsyncIterable = <T>(value: unknown): value is AsyncIterable<T> => {
  // Falsy values (null, undefined, 0, "", false) can never qualify.
  if (!value) {
    return false
  }

  const iteratorFactory = (value as AsyncIterable<T>)[Symbol.asyncIterator]
  return typeof iteratorFactory === "function"
}

/**
 * Parses the text of one Responses stream chunk as a JSON object.
 *
 * `JSON.parse` accepts any JSON value, so the result is validated before it
 * is returned: only non-null, non-array objects are handed back. Malformed
 * JSON, primitives, arrays and `null` are logged and reported as `undefined`
 * so the caller can skip the chunk.
 *
 * @param value - Raw text to parse.
 * @returns The parsed object, or `undefined` when the chunk is unusable.
 */
const safeJsonParse = (value: string): Record<string, unknown> | undefined => {
  let parsed: unknown
  try {
    parsed = JSON.parse(value)
  } catch (error) {
    consola.warn("Failed to parse Responses stream chunk:", value, error)
    return undefined
  }

  // Valid JSON is not necessarily an object (e.g. `42`, `"text"`, `[1, 2]`);
  // reject anything that does not match the declared return type.
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    consola.warn("Ignoring non-object Responses stream chunk:", value)
    return undefined
  }

  return parsed as Record<string, unknown>
}
Loading