Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
91460f6
feat: Gemini API translation and streaming integration (squashed)
cuipengfei Sep 21, 2025
fa6b612
lint
cuipengfei Sep 21, 2025
edd2875
feat: route in server
cuipengfei Sep 21, 2025
bf5188e
feat: add idle timeout configuration for server
cuipengfei Sep 21, 2025
e6af851
Revert "lint"
cuipengfei Sep 21, 2025
e3e4cf5
fix input_tokens adaptation error
caozhiyuan Aug 28, 2025
acb4cf3
feature claude count token
caozhiyuan Aug 30, 2025
8b7d835
feat(start): add option to generate Claude Code environment variables
caozhiyuan Sep 2, 2025
ce6f058
fix: make usage property optional in AnthropicResponse and remove usa…
caozhiyuan Sep 24, 2025
e0c83ee
feature: token counting for different models
caozhiyuan Sep 10, 2025
8f9f449
Add comprehensive tests for content generation and translation features
cuipengfei Sep 26, 2025
faf03e6
trigger
cuipengfei Sep 26, 2025
8f21b6e
fix: update server import for fallback non-streaming in streaming tests
cuipengfei Sep 26, 2025
a57c238
feature gpt-5-codex responses api
caozhiyuan Sep 26, 2025
d9d9445
Merge pull request #8 from caozhiyuan/feature/count-token
cuipengfei Sep 28, 2025
87899a1
feat: enhance output type for function call and add content conversio…
caozhiyuan Sep 29, 2025
4fc0fa0
refactor: optimize content conversion logic in convertToolResultConte…
caozhiyuan Sep 29, 2025
574d47a
refactor: remove claudeCodeEnv option and update environment variable…
caozhiyuan Sep 30, 2025
2b9733b
refactor: remove unused function call output type and simplify respon…
caozhiyuan Sep 30, 2025
1df7e89
Merge pull request #9 from caozhiyuan/feature/gpt-5-codex
cuipengfei Sep 30, 2025
505f648
feat: add signature and reasoning handling to responses translation a…
caozhiyuan Sep 30, 2025
9477b45
feat: add signature to thinking messages and enhance reasoning struct…
caozhiyuan Sep 30, 2025
44551f9
refactor: remove summaryIndex from ResponsesStreamState and related h…
caozhiyuan Sep 30, 2025
708ae33
feat: enhance streaming response handling with ping mechanism
caozhiyuan Sep 30, 2025
1beeb72
feat: refactor tool translation and streaming handling for improved f…
cuipengfei Oct 1, 2025
47fb3e4
feat: responses translation add cache_read_input_tokens
caozhiyuan Oct 1, 2025
de096e8
refactor: remove unused tool response deduplication function and clea…
cuipengfei Oct 1, 2025
a54b7a6
refactor: update comments in ToolCallAccumulator for clarity and cons…
cuipengfei Oct 1, 2025
fcd2154
refactor: improve comments for clarity in test files
cuipengfei Oct 1, 2025
b287a85
Merge pull request #10 from caozhiyuan/feature/count-token
cuipengfei Oct 1, 2025
4a1f46f
Merge pull request #11 from caozhiyuan/feature/gpt-5-codex
cuipengfei Oct 1, 2025
6975118
Merge branch 'dev' into feature/gemini-api
cuipengfei Oct 1, 2025
4abb0a5
Update tests/generate-content/validation-and-routing.test.ts
cuipengfei Oct 1, 2025
7b3db7a
fix: ensure newline at end of file in server.ts
cuipengfei Oct 1, 2025
48a20c1
fix: handle missing model in token counting endpoint
cuipengfei Oct 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 335 additions & 0 deletions src/routes/generate-content/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
import type { Context } from "hono"
import type { SSEStreamingApi } from "hono/streaming"

import { streamSSE } from "hono/streaming"

import { awaitApproval } from "~/lib/approval"
import { checkRateLimit } from "~/lib/rate-limit"
import { state } from "~/lib/state"
import { getTokenCount } from "~/lib/tokenizer"
import {
createChatCompletions,
type ChatCompletionResponse,
type ChatCompletionChunk,
} from "~/services/copilot/create-chat-completions"

// Helper function to extract model from URL path
function extractModelFromUrl(url: string): string {
const match = url.match(/\/v1beta\/models\/([^:]+):/)
if (!match) {
throw new Error("Model name is required in URL path")
}
return match[1]
}

import {
translateGeminiToOpenAINonStream,
translateGeminiToOpenAIStream,
translateOpenAIToGemini,
translateGeminiCountTokensToOpenAI,
translateTokenCountToGemini,
translateOpenAIChunkToGemini,
} from "./translation"
import {
type GeminiRequest,
type GeminiCountTokensRequest,
type GeminiStreamResponse,
type GeminiResponse,
} from "./types"

// Standard generation endpoint
export async function handleGeminiGeneration(c: Context) {
const model = extractModelFromUrl(c.req.url)

if (!model) {
throw new Error("Model name is required in URL path")
}

await checkRateLimit(state)

const geminiPayload = await c.req.json<GeminiRequest>()

const openAIPayload = translateGeminiToOpenAINonStream(geminiPayload, model)

if (state.manualApprove) {
await awaitApproval()
}

const response = await createChatCompletions(openAIPayload)

if (isNonStreaming(response)) {
const geminiResponse = translateOpenAIToGemini(response)

return c.json(geminiResponse)
}

// This shouldn't happen for non-streaming endpoint
throw new Error("Unexpected streaming response for non-streaming endpoint")
}

// Helper function to handle non-streaming response conversion
function handleNonStreamingToStreaming(
c: Context,
geminiResponse: GeminiResponse,
) {
return streamSSE(c, async (stream) => {
try {
const firstPart = geminiResponse.candidates[0]?.content?.parts?.[0]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
const hasTextContent = firstPart && "text" in firstPart

// eslint-disable-next-line unicorn/prefer-ternary
if (hasTextContent) {
await sendTextInChunks(stream, firstPart.text, geminiResponse)
} else {
await sendFallbackResponse(stream, geminiResponse)
}

// Add a small delay to ensure all data is flushed
await new Promise((resolve) => setTimeout(resolve, 50))
} catch (error) {
console.error("[GEMINI_STREAM] Error in non-streaming conversion", error)
} finally {
try {
await stream.close()
} catch (closeError) {
console.error(
"[GEMINI_STREAM] Error closing non-streaming conversion stream",
closeError,
)
}
}
})
}

// Helper function to send text in chunks with configuration object
async function sendTextInChunks(
stream: SSEStreamingApi,
text: string,
geminiResponse: GeminiResponse,
) {
const chunkSize = Math.max(1, Math.min(50, text.length))
let lastWritePromise: Promise<void> = Promise.resolve()

for (let i = 0; i < text.length; i += chunkSize) {
const chunk = text.slice(i, i + chunkSize)
const isLast = i + chunkSize >= text.length
const streamResponse: GeminiStreamResponse = {
candidates: [
{
content: {
parts: [{ text: chunk }],
role: "model",
},
finishReason:
isLast ? geminiResponse.candidates[0]?.finishReason : undefined,
index: 0,
},
],
...(isLast && geminiResponse.usageMetadata ?
{ usageMetadata: geminiResponse.usageMetadata }
: {}),
}

// Wait for previous write to complete before writing new chunk
await lastWritePromise
lastWritePromise = stream.writeSSE({
data: JSON.stringify(streamResponse),
})
}

// Wait for final write to complete
await lastWritePromise
}

// Helper function to send fallback response
async function sendFallbackResponse(
stream: SSEStreamingApi,
geminiResponse: GeminiResponse,
) {
const streamResponse: GeminiStreamResponse = {
candidates: geminiResponse.candidates,
usageMetadata: geminiResponse.usageMetadata,
}

await stream.writeSSE({ data: JSON.stringify(streamResponse) })
}

// Accumulative JSON parser for handling incomplete chunks (based on LiteLLM research)
class StreamingJSONParser {
private accumulatedData = ""
private parseMode: "direct" | "accumulated" = "direct"

parseChunk(rawData: string): unknown {
if (this.parseMode === "direct") {
try {
return JSON.parse(rawData)
} catch {
// Switch to accumulated mode on first failure (LiteLLM pattern)
this.parseMode = "accumulated"
this.accumulatedData = rawData
return null
}
} else {
// Accumulated mode - keep building until valid JSON
this.accumulatedData += rawData
try {
const result = JSON.parse(this.accumulatedData) as unknown
// Success - reset for next chunk
this.accumulatedData = ""
this.parseMode = "direct" // Can switch back to direct mode
return result
} catch {
// Continue accumulating
return null
}
}
}
}

// Global parser instance for the stream
// let streamParser = new StreamingJSONParser()

// Helper function to process chunk and write to stream
async function processAndWriteChunk(params: {
rawEvent: { data?: string }
stream: SSEStreamingApi
lastWritePromise: Promise<void>
streamParser: StreamingJSONParser
}): Promise<{ newWritePromise: Promise<void>; hasFinishReason: boolean }> {
const { rawEvent, stream, lastWritePromise, streamParser } = params

if (!rawEvent.data) {
return { newWritePromise: lastWritePromise, hasFinishReason: false }
}

try {
const chunk = streamParser.parseChunk(rawEvent.data)

// If parser returns null, we're still accumulating
if (!chunk) {
return { newWritePromise: lastWritePromise, hasFinishReason: false }
}

const geminiChunk = translateOpenAIChunkToGemini(
chunk as ChatCompletionChunk,
)

if (geminiChunk) {
// Check if this chunk contains a finish reason
const chunkHasFinishReason = geminiChunk.candidates.some(
(c) => c.finishReason && c.finishReason !== "FINISH_REASON_UNSPECIFIED",
)

// Wait for previous write to complete before writing new chunk
await lastWritePromise
const newWritePromise = stream.writeSSE({
data: JSON.stringify(geminiChunk),
})

return { newWritePromise, hasFinishReason: chunkHasFinishReason }
} else {
return { newWritePromise: lastWritePromise, hasFinishReason: false }
}
} catch (parseError) {
console.error("[GEMINI_STREAM] Error parsing chunk", parseError)
return { newWritePromise: lastWritePromise, hasFinishReason: false }
}
}

// Helper function to handle streaming response processing
function handleStreamingResponse(
c: Context,
response: AsyncIterable<{ data?: string }>,
) {
return streamSSE(c, async (stream) => {
// Create a parser instance for this stream (each request gets its own parser)
const streamParser = new StreamingJSONParser()
let lastWritePromise: Promise<void> = Promise.resolve()

try {
for await (const rawEvent of response) {
if (rawEvent.data === "[DONE]") {
break
}

const result = await processAndWriteChunk({
rawEvent,
stream,
lastWritePromise,
streamParser,
})
lastWritePromise = result.newWritePromise
}

// Wait for all writes to complete before closing
await lastWritePromise

// Add a small delay to ensure all data is flushed
await new Promise((resolve) => setTimeout(resolve, 50))
} catch (error) {
console.error("[GEMINI_STREAM] Error in streaming processing", error)
// Ensure we don't leave the stream hanging
} finally {
// Always close the stream, but with proper cleanup
try {
await stream.close()
} catch (closeError) {
console.error("[GEMINI_STREAM] Error closing stream", closeError)
}
}
})
}

// Streaming generation endpoint
export async function handleGeminiStreamGeneration(c: Context) {
const model = extractModelFromUrl(c.req.url)

if (!model) {
throw new Error("Model name is required in URL path")
}

await checkRateLimit(state)

const geminiPayload = await c.req.json<GeminiRequest>()

const openAIPayload = translateGeminiToOpenAIStream(geminiPayload, model)

if (state.manualApprove) {
await awaitApproval()
}

const response = await createChatCompletions(openAIPayload)

if (isNonStreaming(response)) {
const geminiResponse = translateOpenAIToGemini(response)

return handleNonStreamingToStreaming(c, geminiResponse)
}

return handleStreamingResponse(c, response)
}

// Token counting endpoint
export async function handleGeminiCountTokens(c: Context) {
const model = extractModelFromUrl(c.req.url)

if (!model) {
throw new Error("Model name is required in URL path")
}

const geminiPayload = await c.req.json<GeminiCountTokensRequest>()

const openAIPayload = translateGeminiCountTokensToOpenAI(geminiPayload, model)

const tokenCounts = getTokenCount(openAIPayload.messages)

const totalTokens = tokenCounts.input + tokenCounts.output
const geminiResponse = translateTokenCountToGemini(totalTokens)

return c.json(geminiResponse)
}

const isNonStreaming = (
response: Awaited<ReturnType<typeof createChatCompletions>>,
): response is ChatCompletionResponse => "choices" in response
Comment on lines +320 to +322
Copy link

Copilot AI Sep 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function definition is incomplete - it's missing the closing parenthesis and function body. This will cause a syntax error.

Copilot uses AI. Check for mistakes.
58 changes: 58 additions & 0 deletions src/routes/generate-content/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { Hono } from "hono"

import { forwardError } from "~/lib/error"

import {
handleGeminiGeneration,
handleGeminiStreamGeneration,
handleGeminiCountTokens,
} from "./handler"

const router = new Hono()

// Streaming generation endpoint
// POST /v1beta/models/{model}:streamGenerateContent
router.post("/v1beta/models/*", async (c, next) => {
const url = c.req.url
if (url.includes(":streamGenerateContent")) {
try {
return await handleGeminiStreamGeneration(c)
} catch (error) {
return await forwardError(c, error)
}
}
await next()
})

// Token counting endpoint
// POST /v1beta/models/{model}:countTokens
router.post("/v1beta/models/*", async (c, next) => {
const url = c.req.url
if (url.includes(":countTokens")) {
try {
return await handleGeminiCountTokens(c)
} catch (error) {
return await forwardError(c, error)
}
}
await next()
})

// Standard generation endpoint
// POST /v1beta/models/{model}:generateContent
router.post("/v1beta/models/*", async (c, next) => {
const url = c.req.url
if (
url.includes(":generateContent")
&& !url.includes(":streamGenerateContent")
) {
try {
return await handleGeminiGeneration(c)
} catch (error) {
return await forwardError(c, error)
}
}
await next()
})

export { router as geminiRouter }
Loading