-
-
Notifications
You must be signed in to change notification settings - Fork 638
Feature/gemini api #112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Feature/gemini api #112
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
91460f6
feat: Gemini API translation and streaming integration (squashed)
cuipengfei fa6b612
lint
cuipengfei edd2875
feat: route in server
cuipengfei bf5188e
feat: add idle timeout configuration for server
cuipengfei e6af851
Revert "lint"
cuipengfei e3e4cf5
fix input_tokens adaptation error
caozhiyuan acb4cf3
feature claude count token
caozhiyuan 8b7d835
feat(start): add option to generate Claude Code environment variables
caozhiyuan ce6f058
fix: make usage property optional in AnthropicResponse and remove usa…
caozhiyuan e0c83ee
feature: token counting for different models
caozhiyuan 8f9f449
Add comprehensive tests for content generation and translation features
cuipengfei faf03e6
trigger
cuipengfei 8f21b6e
fix: update server import for fallback non-streaming in streaming tests
cuipengfei a57c238
feature gpt-5-codex responses api
caozhiyuan d9d9445
Merge pull request #8 from caozhiyuan/feature/count-token
cuipengfei 87899a1
feat: enhance output type for function call and add content conversio…
caozhiyuan 4fc0fa0
refactor: optimize content conversion logic in convertToolResultConte…
caozhiyuan 574d47a
refactor: remove claudeCodeEnv option and update environment variable…
caozhiyuan 2b9733b
refactor: remove unused function call output type and simplify respon…
caozhiyuan 1df7e89
Merge pull request #9 from caozhiyuan/feature/gpt-5-codex
cuipengfei 505f648
feat: add signature and reasoning handling to responses translation a…
caozhiyuan 9477b45
feat: add signature to thinking messages and enhance reasoning struct…
caozhiyuan 44551f9
refactor: remove summaryIndex from ResponsesStreamState and related h…
caozhiyuan 708ae33
feat: enhance streaming response handling with ping mechanism
caozhiyuan 1beeb72
feat: refactor tool translation and streaming handling for improved f…
cuipengfei 47fb3e4
feat: responses translation add cache_read_input_tokens
caozhiyuan de096e8
refactor: remove unused tool response deduplication function and clea…
cuipengfei a54b7a6
refactor: update comments in ToolCallAccumulator for clarity and cons…
cuipengfei fcd2154
refactor: improve comments for clarity in test files
cuipengfei b287a85
Merge pull request #10 from caozhiyuan/feature/count-token
cuipengfei 4a1f46f
Merge pull request #11 from caozhiyuan/feature/gpt-5-codex
cuipengfei 6975118
Merge branch 'dev' into feature/gemini-api
cuipengfei 4abb0a5
Update tests/generate-content/validation-and-routing.test.ts
cuipengfei 7b3db7a
fix: ensure newline at end of file in server.ts
cuipengfei 48a20c1
fix: handle missing model in token counting endpoint
cuipengfei File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,335 @@ | ||
| import type { Context } from "hono" | ||
| import type { SSEStreamingApi } from "hono/streaming" | ||
|
|
||
| import { streamSSE } from "hono/streaming" | ||
|
|
||
| import { awaitApproval } from "~/lib/approval" | ||
| import { checkRateLimit } from "~/lib/rate-limit" | ||
| import { state } from "~/lib/state" | ||
| import { getTokenCount } from "~/lib/tokenizer" | ||
| import { | ||
| createChatCompletions, | ||
| type ChatCompletionResponse, | ||
| type ChatCompletionChunk, | ||
| } from "~/services/copilot/create-chat-completions" | ||
|
|
||
| // Helper function to extract model from URL path | ||
| function extractModelFromUrl(url: string): string { | ||
| const match = url.match(/\/v1beta\/models\/([^:]+):/) | ||
| if (!match) { | ||
| throw new Error("Model name is required in URL path") | ||
| } | ||
| return match[1] | ||
| } | ||
|
|
||
| import { | ||
| translateGeminiToOpenAINonStream, | ||
| translateGeminiToOpenAIStream, | ||
| translateOpenAIToGemini, | ||
| translateGeminiCountTokensToOpenAI, | ||
| translateTokenCountToGemini, | ||
| translateOpenAIChunkToGemini, | ||
| } from "./translation" | ||
| import { | ||
| type GeminiRequest, | ||
| type GeminiCountTokensRequest, | ||
| type GeminiStreamResponse, | ||
| type GeminiResponse, | ||
| } from "./types" | ||
|
|
||
| // Standard generation endpoint | ||
| export async function handleGeminiGeneration(c: Context) { | ||
| const model = extractModelFromUrl(c.req.url) | ||
|
|
||
| if (!model) { | ||
| throw new Error("Model name is required in URL path") | ||
| } | ||
|
|
||
| await checkRateLimit(state) | ||
|
|
||
| const geminiPayload = await c.req.json<GeminiRequest>() | ||
|
|
||
| const openAIPayload = translateGeminiToOpenAINonStream(geminiPayload, model) | ||
|
|
||
| if (state.manualApprove) { | ||
| await awaitApproval() | ||
| } | ||
|
|
||
| const response = await createChatCompletions(openAIPayload) | ||
|
|
||
| if (isNonStreaming(response)) { | ||
| const geminiResponse = translateOpenAIToGemini(response) | ||
|
|
||
| return c.json(geminiResponse) | ||
| } | ||
|
|
||
| // This shouldn't happen for non-streaming endpoint | ||
| throw new Error("Unexpected streaming response for non-streaming endpoint") | ||
| } | ||
|
|
||
| // Helper function to handle non-streaming response conversion | ||
| function handleNonStreamingToStreaming( | ||
| c: Context, | ||
| geminiResponse: GeminiResponse, | ||
| ) { | ||
| return streamSSE(c, async (stream) => { | ||
| try { | ||
| const firstPart = geminiResponse.candidates[0]?.content?.parts?.[0] | ||
| // eslint-disable-next-line @typescript-eslint/no-unnecessary-condition | ||
| const hasTextContent = firstPart && "text" in firstPart | ||
|
|
||
| // eslint-disable-next-line unicorn/prefer-ternary | ||
| if (hasTextContent) { | ||
| await sendTextInChunks(stream, firstPart.text, geminiResponse) | ||
| } else { | ||
| await sendFallbackResponse(stream, geminiResponse) | ||
| } | ||
|
|
||
| // Add a small delay to ensure all data is flushed | ||
| await new Promise((resolve) => setTimeout(resolve, 50)) | ||
| } catch (error) { | ||
| console.error("[GEMINI_STREAM] Error in non-streaming conversion", error) | ||
| } finally { | ||
| try { | ||
| await stream.close() | ||
| } catch (closeError) { | ||
| console.error( | ||
| "[GEMINI_STREAM] Error closing non-streaming conversion stream", | ||
| closeError, | ||
| ) | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| // Helper function to send text in chunks with configuration object | ||
| async function sendTextInChunks( | ||
| stream: SSEStreamingApi, | ||
| text: string, | ||
| geminiResponse: GeminiResponse, | ||
| ) { | ||
| const chunkSize = Math.max(1, Math.min(50, text.length)) | ||
| let lastWritePromise: Promise<void> = Promise.resolve() | ||
|
|
||
| for (let i = 0; i < text.length; i += chunkSize) { | ||
| const chunk = text.slice(i, i + chunkSize) | ||
| const isLast = i + chunkSize >= text.length | ||
| const streamResponse: GeminiStreamResponse = { | ||
| candidates: [ | ||
| { | ||
| content: { | ||
| parts: [{ text: chunk }], | ||
| role: "model", | ||
| }, | ||
| finishReason: | ||
| isLast ? geminiResponse.candidates[0]?.finishReason : undefined, | ||
| index: 0, | ||
| }, | ||
| ], | ||
| ...(isLast && geminiResponse.usageMetadata ? | ||
| { usageMetadata: geminiResponse.usageMetadata } | ||
| : {}), | ||
| } | ||
|
|
||
| // Wait for previous write to complete before writing new chunk | ||
| await lastWritePromise | ||
| lastWritePromise = stream.writeSSE({ | ||
| data: JSON.stringify(streamResponse), | ||
| }) | ||
| } | ||
|
|
||
| // Wait for final write to complete | ||
| await lastWritePromise | ||
| } | ||
|
|
||
| // Helper function to send fallback response | ||
| async function sendFallbackResponse( | ||
| stream: SSEStreamingApi, | ||
| geminiResponse: GeminiResponse, | ||
| ) { | ||
| const streamResponse: GeminiStreamResponse = { | ||
| candidates: geminiResponse.candidates, | ||
| usageMetadata: geminiResponse.usageMetadata, | ||
| } | ||
|
|
||
| await stream.writeSSE({ data: JSON.stringify(streamResponse) }) | ||
| } | ||
|
|
||
| // Accumulative JSON parser for handling incomplete chunks (based on LiteLLM research) | ||
| class StreamingJSONParser { | ||
| private accumulatedData = "" | ||
| private parseMode: "direct" | "accumulated" = "direct" | ||
|
|
||
| parseChunk(rawData: string): unknown { | ||
| if (this.parseMode === "direct") { | ||
| try { | ||
| return JSON.parse(rawData) | ||
| } catch { | ||
| // Switch to accumulated mode on first failure (LiteLLM pattern) | ||
| this.parseMode = "accumulated" | ||
| this.accumulatedData = rawData | ||
| return null | ||
| } | ||
| } else { | ||
| // Accumulated mode - keep building until valid JSON | ||
| this.accumulatedData += rawData | ||
| try { | ||
| const result = JSON.parse(this.accumulatedData) as unknown | ||
| // Success - reset for next chunk | ||
| this.accumulatedData = "" | ||
| this.parseMode = "direct" // Can switch back to direct mode | ||
| return result | ||
| } catch { | ||
| // Continue accumulating | ||
| return null | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // Global parser instance for the stream | ||
| // let streamParser = new StreamingJSONParser() | ||
|
|
||
| // Helper function to process chunk and write to stream | ||
| async function processAndWriteChunk(params: { | ||
| rawEvent: { data?: string } | ||
| stream: SSEStreamingApi | ||
| lastWritePromise: Promise<void> | ||
| streamParser: StreamingJSONParser | ||
| }): Promise<{ newWritePromise: Promise<void>; hasFinishReason: boolean }> { | ||
| const { rawEvent, stream, lastWritePromise, streamParser } = params | ||
|
|
||
| if (!rawEvent.data) { | ||
| return { newWritePromise: lastWritePromise, hasFinishReason: false } | ||
| } | ||
|
|
||
| try { | ||
| const chunk = streamParser.parseChunk(rawEvent.data) | ||
|
|
||
| // If parser returns null, we're still accumulating | ||
| if (!chunk) { | ||
| return { newWritePromise: lastWritePromise, hasFinishReason: false } | ||
| } | ||
|
|
||
| const geminiChunk = translateOpenAIChunkToGemini( | ||
| chunk as ChatCompletionChunk, | ||
| ) | ||
|
|
||
| if (geminiChunk) { | ||
| // Check if this chunk contains a finish reason | ||
| const chunkHasFinishReason = geminiChunk.candidates.some( | ||
| (c) => c.finishReason && c.finishReason !== "FINISH_REASON_UNSPECIFIED", | ||
| ) | ||
|
|
||
| // Wait for previous write to complete before writing new chunk | ||
| await lastWritePromise | ||
| const newWritePromise = stream.writeSSE({ | ||
| data: JSON.stringify(geminiChunk), | ||
| }) | ||
|
|
||
| return { newWritePromise, hasFinishReason: chunkHasFinishReason } | ||
| } else { | ||
| return { newWritePromise: lastWritePromise, hasFinishReason: false } | ||
| } | ||
| } catch (parseError) { | ||
| console.error("[GEMINI_STREAM] Error parsing chunk", parseError) | ||
| return { newWritePromise: lastWritePromise, hasFinishReason: false } | ||
| } | ||
| } | ||
|
|
||
| // Helper function to handle streaming response processing | ||
| function handleStreamingResponse( | ||
| c: Context, | ||
| response: AsyncIterable<{ data?: string }>, | ||
| ) { | ||
| return streamSSE(c, async (stream) => { | ||
| // Create a parser instance for this stream (each request gets its own parser) | ||
| const streamParser = new StreamingJSONParser() | ||
| let lastWritePromise: Promise<void> = Promise.resolve() | ||
|
|
||
| try { | ||
| for await (const rawEvent of response) { | ||
| if (rawEvent.data === "[DONE]") { | ||
| break | ||
| } | ||
|
|
||
| const result = await processAndWriteChunk({ | ||
| rawEvent, | ||
| stream, | ||
| lastWritePromise, | ||
| streamParser, | ||
| }) | ||
| lastWritePromise = result.newWritePromise | ||
| } | ||
|
|
||
| // Wait for all writes to complete before closing | ||
| await lastWritePromise | ||
|
|
||
| // Add a small delay to ensure all data is flushed | ||
| await new Promise((resolve) => setTimeout(resolve, 50)) | ||
| } catch (error) { | ||
| console.error("[GEMINI_STREAM] Error in streaming processing", error) | ||
| // Ensure we don't leave the stream hanging | ||
| } finally { | ||
| // Always close the stream, but with proper cleanup | ||
| try { | ||
| await stream.close() | ||
| } catch (closeError) { | ||
| console.error("[GEMINI_STREAM] Error closing stream", closeError) | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| // Streaming generation endpoint | ||
| export async function handleGeminiStreamGeneration(c: Context) { | ||
| const model = extractModelFromUrl(c.req.url) | ||
|
|
||
| if (!model) { | ||
| throw new Error("Model name is required in URL path") | ||
| } | ||
|
|
||
| await checkRateLimit(state) | ||
|
|
||
| const geminiPayload = await c.req.json<GeminiRequest>() | ||
|
|
||
| const openAIPayload = translateGeminiToOpenAIStream(geminiPayload, model) | ||
|
|
||
| if (state.manualApprove) { | ||
| await awaitApproval() | ||
| } | ||
|
|
||
| const response = await createChatCompletions(openAIPayload) | ||
|
|
||
| if (isNonStreaming(response)) { | ||
| const geminiResponse = translateOpenAIToGemini(response) | ||
|
|
||
| return handleNonStreamingToStreaming(c, geminiResponse) | ||
| } | ||
|
|
||
| return handleStreamingResponse(c, response) | ||
| } | ||
|
|
||
| // Token counting endpoint | ||
| export async function handleGeminiCountTokens(c: Context) { | ||
| const model = extractModelFromUrl(c.req.url) | ||
|
|
||
| if (!model) { | ||
| throw new Error("Model name is required in URL path") | ||
| } | ||
|
|
||
| const geminiPayload = await c.req.json<GeminiCountTokensRequest>() | ||
|
|
||
| const openAIPayload = translateGeminiCountTokensToOpenAI(geminiPayload, model) | ||
|
|
||
| const tokenCounts = getTokenCount(openAIPayload.messages) | ||
|
|
||
| const totalTokens = tokenCounts.input + tokenCounts.output | ||
| const geminiResponse = translateTokenCountToGemini(totalTokens) | ||
|
|
||
| return c.json(geminiResponse) | ||
| } | ||
|
|
||
| const isNonStreaming = ( | ||
| response: Awaited<ReturnType<typeof createChatCompletions>>, | ||
| ): response is ChatCompletionResponse => "choices" in response | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| import { Hono } from "hono" | ||
|
|
||
| import { forwardError } from "~/lib/error" | ||
|
|
||
| import { | ||
| handleGeminiGeneration, | ||
| handleGeminiStreamGeneration, | ||
| handleGeminiCountTokens, | ||
| } from "./handler" | ||
|
|
||
| const router = new Hono() | ||
|
|
||
| // Streaming generation endpoint | ||
| // POST /v1beta/models/{model}:streamGenerateContent | ||
| router.post("/v1beta/models/*", async (c, next) => { | ||
| const url = c.req.url | ||
| if (url.includes(":streamGenerateContent")) { | ||
| try { | ||
| return await handleGeminiStreamGeneration(c) | ||
| } catch (error) { | ||
| return await forwardError(c, error) | ||
| } | ||
| } | ||
| await next() | ||
| }) | ||
|
|
||
| // Token counting endpoint | ||
| // POST /v1beta/models/{model}:countTokens | ||
| router.post("/v1beta/models/*", async (c, next) => { | ||
| const url = c.req.url | ||
| if (url.includes(":countTokens")) { | ||
| try { | ||
| return await handleGeminiCountTokens(c) | ||
| } catch (error) { | ||
| return await forwardError(c, error) | ||
| } | ||
| } | ||
| await next() | ||
| }) | ||
|
|
||
| // Standard generation endpoint | ||
| // POST /v1beta/models/{model}:generateContent | ||
| router.post("/v1beta/models/*", async (c, next) => { | ||
| const url = c.req.url | ||
| if ( | ||
| url.includes(":generateContent") | ||
| && !url.includes(":streamGenerateContent") | ||
| ) { | ||
| try { | ||
| return await handleGeminiGeneration(c) | ||
| } catch (error) { | ||
| return await forwardError(c, error) | ||
| } | ||
| } | ||
| await next() | ||
| }) | ||
|
|
||
| export { router as geminiRouter } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function definition is incomplete - it's missing the closing parenthesis and function body. This will cause a syntax error.