Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 99 additions & 24 deletions backend/internal/service/openai_gateway_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ const (
// ChatGPT internal API for OAuth accounts
chatgptCodexURL = "https://chatgpt.com/backend-api/codex/responses"
// OpenAI Platform API for API Key accounts (fallback)
openaiPlatformAPIURL = "https://api.openai.com/v1/responses"
openaiStickySessionTTL = time.Hour // 粘性会话TTL
codexCLIUserAgent = "codex_cli_rs/0.125.0"
openaiPlatformAPIURL = "https://api.openai.com/v1/responses"
openaiStickySessionTTL = time.Hour // 粘性会话TTL
codexCLIUserAgent = "codex_cli_rs/0.125.0"
codexRemoteCompactionV2Feature = "remote_compaction_v2"
// codex_cli_only 拒绝时单个请求头日志长度上限(字符)
codexCLIOnlyHeaderValueMaxBytes = 256

Expand All @@ -61,29 +62,39 @@ const (

// OpenAI allowed headers whitelist (for non-passthrough).
var openaiAllowedHeaders = map[string]bool{
"accept-language": true,
"content-type": true,
"conversation_id": true,
"user-agent": true,
"originator": true,
"session_id": true,
"x-codex-turn-state": true,
"x-codex-turn-metadata": true,
"accept-language": true,
"content-type": true,
"conversation_id": true,
"user-agent": true,
"version": true,
"originator": true,
"session_id": true,
"x-codex-beta-features": true,
"x-codex-installation-id": true,
"x-codex-parent-thread-id": true,
"x-codex-turn-state": true,
"x-codex-turn-metadata": true,
"x-codex-window-id": true,
}

// OpenAI passthrough allowed headers whitelist.
// 透传模式下仅放行这些低风险请求头,避免将非标准/环境噪声头传给上游触发风控。
var openaiPassthroughAllowedHeaders = map[string]bool{
"accept": true,
"accept-language": true,
"content-type": true,
"conversation_id": true,
"openai-beta": true,
"user-agent": true,
"originator": true,
"session_id": true,
"x-codex-turn-state": true,
"x-codex-turn-metadata": true,
"accept": true,
"accept-language": true,
"content-type": true,
"conversation_id": true,
"openai-beta": true,
"user-agent": true,
"version": true,
"originator": true,
"session_id": true,
"x-codex-beta-features": true,
"x-codex-installation-id": true,
"x-codex-parent-thread-id": true,
"x-codex-turn-state": true,
"x-codex-turn-metadata": true,
"x-codex-window-id": true,
}

// codex_cli_only 拒绝时记录的请求头白名单(仅用于诊断日志,不参与上游透传)
Expand All @@ -96,12 +107,70 @@ var codexCLIOnlyDebugHeaderWhitelist = []string{
"Originator",
"Session_ID",
"Conversation_ID",
"X-Codex-Beta-Features",
"X-Codex-Installation-Id",
"X-Codex-Parent-Thread-Id",
"X-Codex-Turn-State",
"X-Codex-Turn-Metadata",
"X-Codex-Window-Id",
"X-Request-ID",
"X-Client-Request-ID",
"X-Forwarded-For",
"X-Real-IP",
}

func ensureCodexContextCompactionBetaHeader(header http.Header, body []byte) {
if !openAIRequestBodyHasContextCompaction(body) {
return
}

existing := strings.TrimSpace(header.Get("X-Codex-Beta-Features"))
if existing == "" {
header.Set("X-Codex-Beta-Features", codexRemoteCompactionV2Feature)
return
}

for _, token := range strings.Split(existing, ",") {
if strings.TrimSpace(token) == codexRemoteCompactionV2Feature {
return
}
}
header.Set("X-Codex-Beta-Features", existing+","+codexRemoteCompactionV2Feature)
}

func openAIRequestBodyHasContextCompaction(body []byte) bool {
if !bytes.Contains(body, []byte("context_compaction")) {
return false
}

var payload any
if err := json.Unmarshal(body, &payload); err != nil {
return false
}
return jsonValueHasType(payload, "context_compaction")
}

func jsonValueHasType(value any, itemType string) bool {
switch typed := value.(type) {
case map[string]any:
if actual, ok := typed["type"].(string); ok && actual == itemType {
return true
}
for _, child := range typed {
if jsonValueHasType(child, itemType) {
return true
}
}
case []any:
for _, child := range typed {
if jsonValueHasType(child, itemType) {
return true
}
}
}
return false
}

// OpenAICodexUsageSnapshot represents Codex API usage limits from response headers
type OpenAICodexUsageSnapshot struct {
PrimaryUsedPercent *float64 `json:"primary_used_percent,omitempty"`
Expand Down Expand Up @@ -3149,6 +3218,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough(
req.Header.Del("x-api-key")
req.Header.Del("x-goog-api-key")
req.Header.Set("authorization", "Bearer "+token)
ensureCodexContextCompactionBetaHeader(req.Header, body)

// OAuth 透传到 ChatGPT internal API 时补齐必要头。
if account.Type == AccountTypeOAuth {
Expand Down Expand Up @@ -3178,6 +3248,9 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough(
if req.Header.Get("originator") == "" {
req.Header.Set("originator", "codex_cli_rs")
}
if req.Header.Get("version") == "" {
req.Header.Set("version", codexCLIVersion)
}
// 用隔离后的 session 标识符覆盖客户端透传值,防止跨用户会话碰撞。
if clientSessionID == "" {
clientSessionID = promptCacheKey
Expand Down Expand Up @@ -3799,6 +3872,7 @@ func writeOpenAIPassthroughResponseHeaders(dst http.Header, src http.Header, fil
"x-codex-secondary-reset-after-seconds",
"x-codex-secondary-window-minutes",
"x-codex-primary-over-secondary-limit-percent",
"x-codex-turn-state",
} {
vals := getCaseInsensitiveValues(src, rawKey)
if len(vals) == 0 {
Expand Down Expand Up @@ -3864,6 +3938,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin.
}
}
}
ensureCodexContextCompactionBetaHeader(req.Header, body)
if account.Type == AccountTypeOAuth {
compatMessagesBridge := isOpenAICompatMessagesBridgeContext(c) || isOpenAICompatMessagesBridgeBody(body)
// 清除客户端透传的 session 头,后续用隔离后的值重新设置,防止跨用户会话碰撞。
Expand All @@ -3881,14 +3956,14 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin.
apiKeyID := getAPIKeyIDFromContext(c)
if isOpenAIResponsesCompactPath(c) {
req.Header.Set("accept", "application/json")
if req.Header.Get("version") == "" {
req.Header.Set("version", codexCLIVersion)
}
compactSession := resolveOpenAICompactSessionID(c)
req.Header.Set("session_id", isolateOpenAISessionID(apiKeyID, compactSession))
} else {
req.Header.Set("accept", "text/event-stream")
}
if req.Header.Get("version") == "" {
req.Header.Set("version", codexCLIVersion)
}
if promptCacheKey != "" {
isolated := isolateOpenAISessionID(apiKeyID, promptCacheKey)
req.Header.Set("session_id", isolated)
Expand Down
159 changes: 155 additions & 4 deletions backend/internal/service/openai_gateway_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@ func TestOpenAIStreamingHeadersOverride(t *testing.T) {
MaxLineSize: defaultMaxLineSize,
},
}
svc := &OpenAIGatewayService{cfg: cfg}
svc := &OpenAIGatewayService{cfg: cfg, responseHeaderFilter: compileResponseHeaderFilter(cfg)}

rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
Expand All @@ -1569,9 +1569,10 @@ func TestOpenAIStreamingHeadersOverride(t *testing.T) {
StatusCode: http.StatusOK,
Body: pr,
Header: http.Header{
"Cache-Control": []string{"upstream"},
"X-Request-Id": []string{"req-123"},
"Content-Type": []string{"application/custom"},
"Cache-Control": []string{"upstream"},
"X-Request-Id": []string{"req-123"},
"X-Codex-Turn-State": []string{"turn-state-123"},
"Content-Type": []string{"application/custom"},
},
}

Expand All @@ -1595,6 +1596,45 @@ func TestOpenAIStreamingHeadersOverride(t *testing.T) {
if rec.Header().Get("X-Request-Id") != "req-123" {
t.Fatalf("expected X-Request-Id passthrough, got %q", rec.Header().Get("X-Request-Id"))
}
if rec.Header().Get("X-Codex-Turn-State") != "turn-state-123" {
t.Fatalf("expected X-Codex-Turn-State passthrough, got %q", rec.Header().Get("X-Codex-Turn-State"))
}
}

func TestOpenAIStreamingPreservesContextCompactionItem(t *testing.T) {
gin.SetMode(gin.TestMode)
cfg := &config.Config{
Gateway: config.GatewayConfig{
StreamDataIntervalTimeout: 0,
StreamKeepaliveInterval: 0,
MaxLineSize: defaultMaxLineSize,
},
}
svc := &OpenAIGatewayService{cfg: cfg}

rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodPost, "/", nil)

pr, pw := io.Pipe()
resp := &http.Response{
StatusCode: http.StatusOK,
Body: pr,
Header: http.Header{"Content-Type": []string{"text/event-stream"}},
}

go func() {
defer func() { _ = pw.Close() }()
_, _ = pw.Write([]byte("data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"context_compaction\",\"encrypted_content\":\"enc-123\"}}\n\n"))
_, _ = pw.Write([]byte("data: {\"type\":\"response.completed\",\"response\":{\"usage\":{\"input_tokens\":3,\"output_tokens\":1}}}\n\n"))
}()

result, err := svc.handleStreamingResponse(c.Request.Context(), resp, c, &Account{ID: 1}, time.Now(), "model", "model")
_ = pr.Close()
require.NoError(t, err)
require.NotNil(t, result)
require.Contains(t, rec.Body.String(), `"type":"context_compaction"`)
require.Contains(t, rec.Body.String(), `"encrypted_content":"enc-123"`)
}

func TestOpenAIStreamingReuseScannerBufferAndStillWorks(t *testing.T) {
Expand Down Expand Up @@ -1822,6 +1862,117 @@ func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T)
require.NotEmpty(t, req.Header.Get("Session_Id"))
}

func TestOpenAIBuildUpstreamRequestPreservesCodexBetaFeatures(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body))
c.Request.Header.Set("X-Codex-Beta-Features", "remote_compaction_v2")
c.Request.Header.Set("X-Codex-Installation-Id", "install-123")
c.Request.Header.Set("X-Codex-Parent-Thread-Id", "parent-123")
c.Request.Header.Set("X-Codex-Turn-Metadata", `{"turn_id":"turn-123"}`)
c.Request.Header.Set("X-Codex-Window-Id", "window-123")
c.Request.Header.Set("Version", "0.999.0")

svc := &OpenAIGatewayService{}
account := &Account{
Type: AccountTypeOAuth,
Credentials: map[string]any{"chatgpt_account_id": "chatgpt-acc"},
}

req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, body, "token", false, "", true)

require.NoError(t, err)
require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features"))
require.Equal(t, "install-123", req.Header.Get("X-Codex-Installation-Id"))
require.Equal(t, "parent-123", req.Header.Get("X-Codex-Parent-Thread-Id"))
require.Equal(t, `{"turn_id":"turn-123"}`, req.Header.Get("X-Codex-Turn-Metadata"))
require.Equal(t, "window-123", req.Header.Get("X-Codex-Window-Id"))
require.Equal(t, "0.999.0", req.Header.Get("Version"))
}

func TestOpenAIBuildUpstreamRequestInfersCodexBetaFeaturesForContextCompaction(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.5","input":[{"type":"message","content":"hi"},{"type":"context_compaction"}]}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body))

svc := &OpenAIGatewayService{}
account := &Account{Type: AccountTypeOAuth}

req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, body, "token", true, "", true)

require.NoError(t, err)
require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features"))
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
}

func TestOpenAIBuildUpstreamRequestAppendsCodexBetaFeaturesForContextCompaction(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body))
c.Request.Header.Set("X-Codex-Beta-Features", "some_other_feature")

svc := &OpenAIGatewayService{}
account := &Account{Type: AccountTypeOAuth}

req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, body, "token", true, "", true)

require.NoError(t, err)
require.Equal(t, "some_other_feature,remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features"))
}

func TestOpenAIBuildUpstreamRequestOpenAIPassthroughPreservesCodexBetaFeatures(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body))
c.Request.Header.Set("X-Codex-Beta-Features", "remote_compaction_v2")
c.Request.Header.Set("X-Codex-Installation-Id", "install-123")
c.Request.Header.Set("X-Codex-Parent-Thread-Id", "parent-123")
c.Request.Header.Set("X-Codex-Turn-Metadata", `{"turn_id":"turn-123"}`)
c.Request.Header.Set("X-Codex-Window-Id", "window-123")
c.Request.Header.Set("Version", "0.999.0")

svc := &OpenAIGatewayService{}
account := &Account{
Type: AccountTypeOAuth,
Credentials: map[string]any{"chatgpt_account_id": "chatgpt-acc"},
}

req, err := svc.buildUpstreamRequestOpenAIPassthrough(c.Request.Context(), c, account, body, "token")

require.NoError(t, err)
require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features"))
require.Equal(t, "install-123", req.Header.Get("X-Codex-Installation-Id"))
require.Equal(t, "parent-123", req.Header.Get("X-Codex-Parent-Thread-Id"))
require.Equal(t, `{"turn_id":"turn-123"}`, req.Header.Get("X-Codex-Turn-Metadata"))
require.Equal(t, "window-123", req.Header.Get("X-Codex-Window-Id"))
require.Equal(t, "0.999.0", req.Header.Get("Version"))
}

func TestOpenAIBuildUpstreamRequestOpenAIPassthroughInfersCodexBetaFeaturesForContextCompaction(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`)
c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body))

svc := &OpenAIGatewayService{}
account := &Account{Type: AccountTypeOAuth}

req, err := svc.buildUpstreamRequestOpenAIPassthrough(c.Request.Context(), c, account, body, "token")

require.NoError(t, err)
require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features"))
require.Equal(t, codexCLIVersion, req.Header.Get("Version"))
}

func TestOpenAIBuildUpstreamRequestOAuthMessagesBridgeUsesSessionOnly(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
Expand Down
1 change: 1 addition & 0 deletions backend/internal/util/responseheaders/responseheaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var defaultAllowed = map[string]struct{}{
"retry-after": {},
"location": {},
"www-authenticate": {},
"x-codex-turn-state": {},
}

// hopByHopHeaders 是跳过的 hop-by-hop 头部,这些头部由 HTTP 库自动处理
Expand Down
Loading
Loading