diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index e12b208e372..a4776ca5c9b 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -38,9 +38,10 @@ const ( // ChatGPT internal API for OAuth accounts chatgptCodexURL = "https://chatgpt.com/backend-api/codex/responses" // OpenAI Platform API for API Key accounts (fallback) - openaiPlatformAPIURL = "https://api.openai.com/v1/responses" - openaiStickySessionTTL = time.Hour // 粘性会话TTL - codexCLIUserAgent = "codex_cli_rs/0.125.0" + openaiPlatformAPIURL = "https://api.openai.com/v1/responses" + openaiStickySessionTTL = time.Hour // 粘性会话TTL + codexCLIUserAgent = "codex_cli_rs/0.125.0" + codexRemoteCompactionV2Feature = "remote_compaction_v2" // codex_cli_only 拒绝时单个请求头日志长度上限(字符) codexCLIOnlyHeaderValueMaxBytes = 256 @@ -61,29 +62,39 @@ const ( // OpenAI allowed headers whitelist (for non-passthrough). var openaiAllowedHeaders = map[string]bool{ - "accept-language": true, - "content-type": true, - "conversation_id": true, - "user-agent": true, - "originator": true, - "session_id": true, - "x-codex-turn-state": true, - "x-codex-turn-metadata": true, + "accept-language": true, + "content-type": true, + "conversation_id": true, + "user-agent": true, + "version": true, + "originator": true, + "session_id": true, + "x-codex-beta-features": true, + "x-codex-installation-id": true, + "x-codex-parent-thread-id": true, + "x-codex-turn-state": true, + "x-codex-turn-metadata": true, + "x-codex-window-id": true, } // OpenAI passthrough allowed headers whitelist. // 透传模式下仅放行这些低风险请求头,避免将非标准/环境噪声头传给上游触发风控。 var openaiPassthroughAllowedHeaders = map[string]bool{ - "accept": true, - "accept-language": true, - "content-type": true, - "conversation_id": true, - "openai-beta": true, - "user-agent": true, - "originator": true, - "session_id": true, - "x-codex-turn-state": true, - "x-codex-turn-metadata": true, + "accept": true, + "accept-language": true, + "content-type": true, + "conversation_id": true, + "openai-beta": true, + "user-agent": true, + "version": true, + "originator": true, + "session_id": true, + "x-codex-beta-features": true, + "x-codex-installation-id": true, + "x-codex-parent-thread-id": true, + "x-codex-turn-state": true, + "x-codex-turn-metadata": true, + "x-codex-window-id": true, } // codex_cli_only 拒绝时记录的请求头白名单(仅用于诊断日志,不参与上游透传) @@ -96,12 +107,70 @@ var codexCLIOnlyDebugHeaderWhitelist = []string{ "Originator", "Session_ID", "Conversation_ID", + "X-Codex-Beta-Features", + "X-Codex-Installation-Id", + "X-Codex-Parent-Thread-Id", + "X-Codex-Turn-State", + "X-Codex-Turn-Metadata", + "X-Codex-Window-Id", "X-Request-ID", "X-Client-Request-ID", "X-Forwarded-For", "X-Real-IP", } +func ensureCodexContextCompactionBetaHeader(header http.Header, body []byte) { + if !openAIRequestBodyHasContextCompaction(body) { + return + } + + existing := strings.TrimSpace(header.Get("X-Codex-Beta-Features")) + if existing == "" { + header.Set("X-Codex-Beta-Features", codexRemoteCompactionV2Feature) + return + } + + for _, token := range strings.Split(existing, ",") { + if strings.TrimSpace(token) == codexRemoteCompactionV2Feature { + return + } + } + header.Set("X-Codex-Beta-Features", existing+","+codexRemoteCompactionV2Feature) +} + +func openAIRequestBodyHasContextCompaction(body []byte) bool { + if !bytes.Contains(body, []byte("context_compaction")) { + return false + } + + var payload any + if err := json.Unmarshal(body, &payload); err != nil { + return false + } + return jsonValueHasType(payload, "context_compaction") +} + +func jsonValueHasType(value any, itemType string) bool { + switch typed := value.(type) { + case map[string]any: + if actual, ok := typed["type"].(string); ok && actual == itemType { + return true + } + for _, child := range typed { + if jsonValueHasType(child, itemType) { + return true + } + } + case []any: + for _, child := range typed { + if jsonValueHasType(child, itemType) { + return true + } + } + } + return false +} + // OpenAICodexUsageSnapshot represents Codex API usage limits from response headers type OpenAICodexUsageSnapshot struct { PrimaryUsedPercent *float64 `json:"primary_used_percent,omitempty"` @@ -3149,6 +3218,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough( req.Header.Del("x-api-key") req.Header.Del("x-goog-api-key") req.Header.Set("authorization", "Bearer "+token) + ensureCodexContextCompactionBetaHeader(req.Header, body) // OAuth 透传到 ChatGPT internal API 时补齐必要头。 if account.Type == AccountTypeOAuth { @@ -3178,6 +3248,9 @@ func (s *OpenAIGatewayService) buildUpstreamRequestOpenAIPassthrough( if req.Header.Get("originator") == "" { req.Header.Set("originator", "codex_cli_rs") } + if req.Header.Get("version") == "" { + req.Header.Set("version", codexCLIVersion) + } // 用隔离后的 session 标识符覆盖客户端透传值,防止跨用户会话碰撞。 if clientSessionID == "" { clientSessionID = promptCacheKey @@ -3799,6 +3872,7 @@ func writeOpenAIPassthroughResponseHeaders(dst http.Header, src http.Header, fil "x-codex-secondary-reset-after-seconds", "x-codex-secondary-window-minutes", "x-codex-primary-over-secondary-limit-percent", + "x-codex-turn-state", } { vals := getCaseInsensitiveValues(src, rawKey) if len(vals) == 0 { @@ -3864,6 +3938,7 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin. } } } + ensureCodexContextCompactionBetaHeader(req.Header, body) if account.Type == AccountTypeOAuth { compatMessagesBridge := isOpenAICompatMessagesBridgeContext(c) || isOpenAICompatMessagesBridgeBody(body) // 清除客户端透传的 session 头,后续用隔离后的值重新设置,防止跨用户会话碰撞。 @@ -3881,14 +3956,14 @@ func (s *OpenAIGatewayService) buildUpstreamRequest(ctx context.Context, c *gin. apiKeyID := getAPIKeyIDFromContext(c) if isOpenAIResponsesCompactPath(c) { req.Header.Set("accept", "application/json") - if req.Header.Get("version") == "" { - req.Header.Set("version", codexCLIVersion) - } compactSession := resolveOpenAICompactSessionID(c) req.Header.Set("session_id", isolateOpenAISessionID(apiKeyID, compactSession)) } else { req.Header.Set("accept", "text/event-stream") } + if req.Header.Get("version") == "" { + req.Header.Set("version", codexCLIVersion) + } if promptCacheKey != "" { isolated := isolateOpenAISessionID(apiKeyID, promptCacheKey) req.Header.Set("session_id", isolated) diff --git a/backend/internal/service/openai_gateway_service_test.go b/backend/internal/service/openai_gateway_service_test.go index 84a2fe714eb..58142f14dbf 100644 --- a/backend/internal/service/openai_gateway_service_test.go +++ b/backend/internal/service/openai_gateway_service_test.go @@ -1558,7 +1558,7 @@ func TestOpenAIStreamingHeadersOverride(t *testing.T) { MaxLineSize: defaultMaxLineSize, }, } - svc := &OpenAIGatewayService{cfg: cfg} + svc := &OpenAIGatewayService{cfg: cfg, responseHeaderFilter: compileResponseHeaderFilter(cfg)} rec := httptest.NewRecorder() c, _ := gin.CreateTestContext(rec) @@ -1569,9 +1569,10 @@ func TestOpenAIStreamingHeadersOverride(t *testing.T) { StatusCode: http.StatusOK, Body: pr, Header: http.Header{ - "Cache-Control": []string{"upstream"}, - "X-Request-Id": []string{"req-123"}, - "Content-Type": []string{"application/custom"}, + "Cache-Control": []string{"upstream"}, + "X-Request-Id": []string{"req-123"}, + "X-Codex-Turn-State": []string{"turn-state-123"}, + "Content-Type": []string{"application/custom"}, }, } @@ -1595,6 +1596,45 @@ func TestOpenAIStreamingHeadersOverride(t *testing.T) { if rec.Header().Get("X-Request-Id") != "req-123" { t.Fatalf("expected X-Request-Id passthrough, got %q", rec.Header().Get("X-Request-Id")) } + if rec.Header().Get("X-Codex-Turn-State") != "turn-state-123" { + t.Fatalf("expected X-Codex-Turn-State passthrough, got %q", rec.Header().Get("X-Codex-Turn-State")) + } +} + +func TestOpenAIStreamingPreservesContextCompactionItem(t *testing.T) { + gin.SetMode(gin.TestMode) + cfg := &config.Config{ + Gateway: config.GatewayConfig{ + StreamDataIntervalTimeout: 0, + StreamKeepaliveInterval: 0, + MaxLineSize: defaultMaxLineSize, + }, + } + svc := &OpenAIGatewayService{cfg: cfg} + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/", nil) + + pr, pw := io.Pipe() + resp := &http.Response{ + StatusCode: http.StatusOK, + Body: pr, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + } + + go func() { + defer func() { _ = pw.Close() }() + _, _ = pw.Write([]byte("data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"context_compaction\",\"encrypted_content\":\"enc-123\"}}\n\n")) + _, _ = pw.Write([]byte("data: {\"type\":\"response.completed\",\"response\":{\"usage\":{\"input_tokens\":3,\"output_tokens\":1}}}\n\n")) + }() + + result, err := svc.handleStreamingResponse(c.Request.Context(), resp, c, &Account{ID: 1}, time.Now(), "model", "model") + _ = pr.Close() + require.NoError(t, err) + require.NotNil(t, result) + require.Contains(t, rec.Body.String(), `"type":"context_compaction"`) + require.Contains(t, rec.Body.String(), `"encrypted_content":"enc-123"`) } func TestOpenAIStreamingReuseScannerBufferAndStillWorks(t *testing.T) { @@ -1822,6 +1862,117 @@ func TestOpenAIBuildUpstreamRequestCompactForcesJSONAcceptForOAuth(t *testing.T) require.NotEmpty(t, req.Header.Get("Session_Id")) } +func TestOpenAIBuildUpstreamRequestPreservesCodexBetaFeatures(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body)) + c.Request.Header.Set("X-Codex-Beta-Features", "remote_compaction_v2") + c.Request.Header.Set("X-Codex-Installation-Id", "install-123") + c.Request.Header.Set("X-Codex-Parent-Thread-Id", "parent-123") + c.Request.Header.Set("X-Codex-Turn-Metadata", `{"turn_id":"turn-123"}`) + c.Request.Header.Set("X-Codex-Window-Id", "window-123") + c.Request.Header.Set("Version", "0.999.0") + + svc := &OpenAIGatewayService{} + account := &Account{ + Type: AccountTypeOAuth, + Credentials: map[string]any{"chatgpt_account_id": "chatgpt-acc"}, + } + + req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, body, "token", false, "", true) + + require.NoError(t, err) + require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features")) + require.Equal(t, "install-123", req.Header.Get("X-Codex-Installation-Id")) + require.Equal(t, "parent-123", req.Header.Get("X-Codex-Parent-Thread-Id")) + require.Equal(t, `{"turn_id":"turn-123"}`, req.Header.Get("X-Codex-Turn-Metadata")) + require.Equal(t, "window-123", req.Header.Get("X-Codex-Window-Id")) + require.Equal(t, "0.999.0", req.Header.Get("Version")) +} + +func TestOpenAIBuildUpstreamRequestInfersCodexBetaFeaturesForContextCompaction(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.5","input":[{"type":"message","content":"hi"},{"type":"context_compaction"}]}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body)) + + svc := &OpenAIGatewayService{} + account := &Account{Type: AccountTypeOAuth} + + req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, body, "token", true, "", true) + + require.NoError(t, err) + require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features")) + require.Equal(t, codexCLIVersion, req.Header.Get("Version")) +} + +func TestOpenAIBuildUpstreamRequestAppendsCodexBetaFeaturesForContextCompaction(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body)) + c.Request.Header.Set("X-Codex-Beta-Features", "some_other_feature") + + svc := &OpenAIGatewayService{} + account := &Account{Type: AccountTypeOAuth} + + req, err := svc.buildUpstreamRequest(c.Request.Context(), c, account, body, "token", true, "", true) + + require.NoError(t, err) + require.Equal(t, "some_other_feature,remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features")) +} + +func TestOpenAIBuildUpstreamRequestOpenAIPassthroughPreservesCodexBetaFeatures(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body)) + c.Request.Header.Set("X-Codex-Beta-Features", "remote_compaction_v2") + c.Request.Header.Set("X-Codex-Installation-Id", "install-123") + c.Request.Header.Set("X-Codex-Parent-Thread-Id", "parent-123") + c.Request.Header.Set("X-Codex-Turn-Metadata", `{"turn_id":"turn-123"}`) + c.Request.Header.Set("X-Codex-Window-Id", "window-123") + c.Request.Header.Set("Version", "0.999.0") + + svc := &OpenAIGatewayService{} + account := &Account{ + Type: AccountTypeOAuth, + Credentials: map[string]any{"chatgpt_account_id": "chatgpt-acc"}, + } + + req, err := svc.buildUpstreamRequestOpenAIPassthrough(c.Request.Context(), c, account, body, "token") + + require.NoError(t, err) + require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features")) + require.Equal(t, "install-123", req.Header.Get("X-Codex-Installation-Id")) + require.Equal(t, "parent-123", req.Header.Get("X-Codex-Parent-Thread-Id")) + require.Equal(t, `{"turn_id":"turn-123"}`, req.Header.Get("X-Codex-Turn-Metadata")) + require.Equal(t, "window-123", req.Header.Get("X-Codex-Window-Id")) + require.Equal(t, "0.999.0", req.Header.Get("Version")) +} + +func TestOpenAIBuildUpstreamRequestOpenAIPassthroughInfersCodexBetaFeaturesForContextCompaction(t *testing.T) { + gin.SetMode(gin.TestMode) + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + body := []byte(`{"model":"gpt-5.5","input":[{"type":"context_compaction"}]}`) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", bytes.NewReader(body)) + + svc := &OpenAIGatewayService{} + account := &Account{Type: AccountTypeOAuth} + + req, err := svc.buildUpstreamRequestOpenAIPassthrough(c.Request.Context(), c, account, body, "token") + + require.NoError(t, err) + require.Equal(t, "remote_compaction_v2", req.Header.Get("X-Codex-Beta-Features")) + require.Equal(t, codexCLIVersion, req.Header.Get("Version")) +} + func TestOpenAIBuildUpstreamRequestOAuthMessagesBridgeUsesSessionOnly(t *testing.T) { gin.SetMode(gin.TestMode) rec := httptest.NewRecorder() diff --git a/backend/internal/util/responseheaders/responseheaders.go b/backend/internal/util/responseheaders/responseheaders.go index 7f7baca658c..62361a39260 100644 --- a/backend/internal/util/responseheaders/responseheaders.go +++ b/backend/internal/util/responseheaders/responseheaders.go @@ -32,6 +32,7 @@ var defaultAllowed = map[string]struct{}{ "retry-after": {}, "location": {}, "www-authenticate": {}, + "x-codex-turn-state": {}, } // hopByHopHeaders 是跳过的 hop-by-hop 头部,这些头部由 HTTP 库自动处理 diff --git a/backend/internal/util/responseheaders/responseheaders_test.go b/backend/internal/util/responseheaders/responseheaders_test.go index d817559e613..514fcb6b96d 100644 --- a/backend/internal/util/responseheaders/responseheaders_test.go +++ b/backend/internal/util/responseheaders/responseheaders_test.go @@ -11,6 +11,7 @@ func TestFilterHeadersDisabledUsesDefaultAllowlist(t *testing.T) { src := http.Header{} src.Add("Content-Type", "application/json") src.Add("X-Request-Id", "req-123") + src.Add("X-Codex-Turn-State", "turn-state-123") src.Add("X-Test", "ok") src.Add("Connection", "keep-alive") src.Add("Content-Length", "123") @@ -27,6 +28,9 @@ func TestFilterHeadersDisabledUsesDefaultAllowlist(t *testing.T) { if filtered.Get("X-Request-Id") != "req-123" { t.Fatalf("expected X-Request-Id allowed, got %q", filtered.Get("X-Request-Id")) } + if filtered.Get("X-Codex-Turn-State") != "turn-state-123" { + t.Fatalf("expected X-Codex-Turn-State allowed, got %q", filtered.Get("X-Codex-Turn-State")) + } if filtered.Get("X-Test") != "" { t.Fatalf("expected X-Test removed, got %q", filtered.Get("X-Test")) }