Skip to content

Commit 531b895

Browse files
committed
fix: preserve openai selection diagnostics in ops failures
1 parent 3f18299 commit 531b895

34 files changed

Lines changed: 1316 additions & 98 deletions

backend/internal/handler/gateway_handler.go

Lines changed: 10 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -352,7 +352,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
352352
}
353353
}
354354
account := selection.Account
355-
setOpsSelectedAccount(c, account.ID, account.Platform)
355+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
356356

357357
// 检查请求拦截(预热请求、SUGGESTION MODE等)
358358
if account.IsInterceptWarmupEnabled() {
@@ -594,7 +594,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
594594
}
595595
}
596596
account := selection.Account
597-
setOpsSelectedAccount(c, account.ID, account.Platform)
597+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
598598

599599
// [DEBUG-STICKY] 打印账号选择结果
600600
reqLog.Info("sticky.account_selected",
@@ -1312,6 +1312,10 @@ func (h *GatewayHandler) handleConcurrencyError(c *gin.Context, err error, slotT
13121312
func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) {
13131313
statusCode := failoverErr.StatusCode
13141314
responseBody := failoverErr.ResponseBody
1315+
upstreamMsg := service.ExtractUpstreamErrorMessage(responseBody)
1316+
1317+
service.SetOpsUpstreamError(c, statusCode, upstreamMsg, "")
1318+
ensureOpsUpstreamErrorEvent(c, statusCode, "failover", upstreamMsg)
13151319

13161320
// 先检查透传规则
13171321
if h.errorPassthroughService != nil && len(responseBody) > 0 {
@@ -1337,10 +1341,6 @@ func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *se
13371341
}
13381342
}
13391343

1340-
// 记录原始上游状态码,以便 ops 错误日志捕获真实的上游错误
1341-
upstreamMsg := service.ExtractUpstreamErrorMessage(responseBody)
1342-
service.SetOpsUpstreamError(c, statusCode, upstreamMsg, "")
1343-
13441344
// 使用默认的错误映射
13451345
status, errType, errMsg := h.mapUpstreamError(statusCode)
13461346
h.handleStreamingAwareError(c, status, errType, errMsg, streamStarted)
@@ -1350,6 +1350,7 @@ func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *se
13501350
func (h *GatewayHandler) handleFailoverExhaustedSimple(c *gin.Context, statusCode int, streamStarted bool) {
13511351
status, errType, errMsg := h.mapUpstreamError(statusCode)
13521352
service.SetOpsUpstreamError(c, statusCode, errMsg, "")
1353+
ensureOpsUpstreamErrorEvent(c, statusCode, "failover", errMsg)
13531354
h.handleStreamingAwareError(c, status, errType, errMsg, streamStarted)
13541355
}
13551356

@@ -1372,6 +1373,8 @@ func (h *GatewayHandler) mapUpstreamError(statusCode int) (int, string, string)
13721373

13731374
// handleStreamingAwareError handles errors that may occur after streaming has started
13741375
func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) {
1376+
message = decorateScheduledAccountErrorMessage(c, status, message)
1377+
13751378
if streamStarted {
13761379
// Stream already started, send error as SSE event then close
13771380
flusher, ok := c.Writer.(http.Flusher)
@@ -1545,7 +1548,7 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
15451548
h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable")
15461549
return
15471550
}
1548-
setOpsSelectedAccount(c, account.ID, account.Platform)
1551+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
15491552

15501553
// 转发请求(不记录使用量)
15511554
if err := h.gatewayService.ForwardCountTokens(c.Request.Context(), c, account, parsedReq); err != nil {

backend/internal/handler/gateway_handler_chat_completions.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -188,7 +188,7 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
188188
}
189189
}
190190
account := selection.Account
191-
setOpsSelectedAccount(c, account.ID, account.Platform)
191+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
192192

193193
// 4. Acquire account concurrency slot
194194
accountReleaseFunc := selection.ReleaseFunc

backend/internal/handler/gateway_handler_responses.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -193,7 +193,7 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
193193
}
194194
}
195195
account := selection.Account
196-
setOpsSelectedAccount(c, account.ID, account.Platform)
196+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
197197

198198
// 4. Acquire account concurrency slot
199199
accountReleaseFunc := selection.ReleaseFunc

backend/internal/handler/gemini_v1beta_handler.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -389,7 +389,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
389389
}
390390
}
391391
account := selection.Account
392-
setOpsSelectedAccount(c, account.ID, account.Platform)
392+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
393393

394394
// 检测账号切换:如果粘性会话绑定的账号与当前选择的账号不同,清除 thoughtSignature
395395
// 注意:Gemini 原生 API 的 thoughtSignature 与具体上游账号强相关;跨账号透传会导致 400。

backend/internal/handler/openai_chat_completions.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -143,7 +143,9 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
143143
zap.Int("excluded_account_count", len(failedAccountIDs)),
144144
)
145145
if len(failedAccountIDs) == 0 {
146-
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted)
146+
msg := selectionUnavailableMessage(err)
147+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, err, msg)
148+
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", msg, streamStarted)
147149
return
148150
} else {
149151
if lastFailoverErr != nil {
@@ -155,14 +157,15 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
155157
}
156158
}
157159
if selection == nil || selection.Account == nil {
160+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, nil, "No available accounts")
158161
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
159162
return
160163
}
161164
account := selection.Account
162165
sessionHash = ensureOpenAIPoolModeSessionHash(sessionHash, account)
163166
reqLog.Debug("openai_chat_completions.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name))
164167
_ = scheduleDecision
165-
setOpsSelectedAccount(c, account.ID, account.Platform)
168+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
166169

167170
accountReleaseFunc, acquired := h.acquireResponsesAccountSlot(c, apiKey.GroupID, sessionHash, selection, reqStream, &streamStarted, reqLog)
168171
if !acquired {

backend/internal/handler/openai_gateway_handler.go

Lines changed: 18 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -283,10 +283,13 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
283283
)
284284
if len(failedAccountIDs) == 0 {
285285
if errors.Is(err, service.ErrNoAvailableCompactAccounts) {
286+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, err, "No available OpenAI accounts support /responses/compact")
286287
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "compact_not_supported", "No available OpenAI accounts support /responses/compact", streamStarted)
287288
return
288289
}
289-
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted)
290+
msg := selectionUnavailableMessage(err)
291+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, err, msg)
292+
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", msg, streamStarted)
290293
return
291294
}
292295
if lastFailoverErr != nil {
@@ -297,6 +300,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
297300
return
298301
}
299302
if selection == nil || selection.Account == nil {
303+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, nil, "No available accounts")
300304
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
301305
return
302306
}
@@ -315,7 +319,7 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
315319
account := selection.Account
316320
sessionHash = ensureOpenAIPoolModeSessionHash(sessionHash, account)
317321
reqLog.Debug("openai.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name))
318-
setOpsSelectedAccount(c, account.ID, account.Platform)
322+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
319323

320324
accountReleaseFunc, acquired := h.acquireResponsesAccountSlot(c, apiKey.GroupID, sessionHash, selection, reqStream, &streamStarted, reqLog)
321325
if !acquired {
@@ -677,7 +681,9 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
677681
)
678682
if len(failedAccountIDs) == 0 {
679683
if err != nil {
680-
h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted)
684+
msg := selectionUnavailableMessage(err)
685+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, err, msg)
686+
h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", msg, streamStarted)
681687
return
682688
}
683689
} else {
@@ -690,14 +696,15 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
690696
}
691697
}
692698
if selection == nil || selection.Account == nil {
699+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, nil, "No available accounts")
693700
h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
694701
return
695702
}
696703
account := selection.Account
697704
sessionHash = ensureOpenAIPoolModeSessionHash(sessionHash, account)
698705
reqLog.Debug("openai_messages.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name))
699706
_ = scheduleDecision
700-
setOpsSelectedAccount(c, account.ID, account.Platform)
707+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
701708

702709
accountReleaseFunc, acquired := h.acquireResponsesAccountSlot(c, apiKey.GroupID, sessionHash, selection, reqStream, &streamStarted, reqLog)
703710
if !acquired {
@@ -1598,6 +1605,10 @@ func (h *OpenAIGatewayHandler) handleConcurrencyError(c *gin.Context, err error,
15981605
func (h *OpenAIGatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, streamStarted bool) {
15991606
statusCode := failoverErr.StatusCode
16001607
responseBody := failoverErr.ResponseBody
1608+
upstreamMsg := service.ExtractUpstreamErrorMessage(responseBody)
1609+
1610+
service.SetOpsUpstreamError(c, statusCode, upstreamMsg, "")
1611+
ensureOpsUpstreamErrorEvent(c, statusCode, "failover", upstreamMsg)
16011612

16021613
// 先检查透传规则
16031614
if h.errorPassthroughService != nil && len(responseBody) > 0 {
@@ -1623,10 +1634,6 @@ func (h *OpenAIGatewayHandler) handleFailoverExhausted(c *gin.Context, failoverE
16231634
}
16241635
}
16251636

1626-
// 记录原始上游状态码,以便 ops 错误日志捕获真实的上游错误
1627-
upstreamMsg := service.ExtractUpstreamErrorMessage(responseBody)
1628-
service.SetOpsUpstreamError(c, statusCode, upstreamMsg, "")
1629-
16301637
// 使用默认的错误映射
16311638
status, errType, errMsg := h.mapUpstreamError(statusCode)
16321639
h.handleStreamingAwareError(c, status, errType, errMsg, streamStarted)
@@ -1636,6 +1643,7 @@ func (h *OpenAIGatewayHandler) handleFailoverExhausted(c *gin.Context, failoverE
16361643
func (h *OpenAIGatewayHandler) handleFailoverExhaustedSimple(c *gin.Context, statusCode int, streamStarted bool) {
16371644
status, errType, errMsg := h.mapUpstreamError(statusCode)
16381645
service.SetOpsUpstreamError(c, statusCode, errMsg, "")
1646+
ensureOpsUpstreamErrorEvent(c, statusCode, "failover", errMsg)
16391647
h.handleStreamingAwareError(c, status, errType, errMsg, streamStarted)
16401648
}
16411649

@@ -1658,6 +1666,8 @@ func (h *OpenAIGatewayHandler) mapUpstreamError(statusCode int) (int, string, st
16581666

16591667
// handleStreamingAwareError handles errors that may occur after streaming has started
16601668
func (h *OpenAIGatewayHandler) handleStreamingAwareError(c *gin.Context, status int, errType, message string, streamStarted bool) {
1669+
message = decorateScheduledAccountErrorMessage(c, status, message)
1670+
16611671
if streamStarted {
16621672
// Stream already started, send error as SSE event then close
16631673
flusher, ok := c.Writer.(http.Flusher)

backend/internal/handler/openai_gateway_handler_test.go

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -132,6 +132,50 @@ func TestOpenAIHandleStreamingAwareError_NonStreaming(t *testing.T) {
132132
assert.Equal(t, "test error", errorObj["message"])
133133
}
134134

135+
func TestOpenAIHandleStreamingAwareError_NonStreaming503IncludesScheduledAccount(t *testing.T) {
136+
gin.SetMode(gin.TestMode)
137+
w := httptest.NewRecorder()
138+
c, _ := gin.CreateTestContext(w)
139+
c.Request = httptest.NewRequest(http.MethodGet, "/", nil)
140+
setOpsSelectedAccount(c, 42, "pool-account-42", service.PlatformOpenAI)
141+
142+
h := &OpenAIGatewayHandler{}
143+
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", false)
144+
145+
assert.Equal(t, http.StatusServiceUnavailable, w.Code)
146+
assert.Equal(t, "pool-account-42", w.Header().Get(scheduledAccountDebugHeader))
147+
148+
var parsed map[string]any
149+
err := json.Unmarshal(w.Body.Bytes(), &parsed)
150+
require.NoError(t, err)
151+
errorObj, ok := parsed["error"].(map[string]any)
152+
require.True(t, ok)
153+
assert.Equal(t, "api_error", errorObj["type"])
154+
assert.Equal(t, "Service temporarily unavailable [scheduled account: pool-account-42]", errorObj["message"])
155+
}
156+
157+
func TestOpenAIHandleFailoverExhaustedSimple_BackfillsUpstreamEvent(t *testing.T) {
158+
gin.SetMode(gin.TestMode)
159+
w := httptest.NewRecorder()
160+
c, _ := gin.CreateTestContext(w)
161+
c.Request = httptest.NewRequest(http.MethodGet, "/", nil)
162+
setOpsSelectedAccount(c, 7, "fallback-account", service.PlatformOpenAI)
163+
164+
h := &OpenAIGatewayHandler{}
165+
h.handleFailoverExhaustedSimple(c, http.StatusServiceUnavailable, false)
166+
167+
v, ok := c.Get(service.OpsUpstreamErrorsKey)
168+
require.True(t, ok)
169+
events, ok := v.([]*service.OpsUpstreamErrorEvent)
170+
require.True(t, ok)
171+
require.Len(t, events, 1)
172+
assert.Equal(t, int64(7), events[0].AccountID)
173+
assert.Equal(t, "fallback-account", events[0].AccountName)
174+
assert.Equal(t, http.StatusServiceUnavailable, events[0].UpstreamStatusCode)
175+
assert.Equal(t, "failover", events[0].Kind)
176+
assert.Contains(t, events[0].Message, "temporarily unavailable")
177+
}
178+
135179
func TestReadRequestBodyWithPrealloc(t *testing.T) {
136180
payload := `{"model":"gpt-5","input":"hello"}`
137181
req := httptest.NewRequest(http.MethodPost, "/v1/responses", strings.NewReader(payload))

backend/internal/handler/openai_images.go

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -157,7 +157,9 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
157157
zap.Int("excluded_account_count", len(failedAccountIDs)),
158158
)
159159
if len(failedAccountIDs) == 0 {
160-
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available compatible accounts", streamStarted)
160+
msg := selectionUnavailableMessage(err)
161+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, err, msg)
162+
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", msg, streamStarted)
161163
return
162164
}
163165
if lastFailoverErr != nil {
@@ -168,6 +170,7 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
168170
return
169171
}
170172
if selection == nil || selection.Account == nil {
173+
annotateOpenAISelectionFailure(c, h.gatewayService, apiKey.GroupID, sessionHash, nil, "No available compatible accounts")
171174
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available compatible accounts", streamStarted)
172175
return
173176
}
@@ -184,7 +187,7 @@ func (h *OpenAIGatewayHandler) Images(c *gin.Context) {
184187
account := selection.Account
185188
sessionHash = ensureOpenAIPoolModeSessionHash(sessionHash, account)
186189
reqLog.Debug("openai.images.account_selected", zap.Int64("account_id", account.ID), zap.String("account_name", account.Name))
187-
setOpsSelectedAccount(c, account.ID, account.Platform)
190+
setOpsSelectedAccount(c, account.ID, account.Name, account.Platform)
188191

189192
accountReleaseFunc, acquired := h.acquireResponsesAccountSlot(c, apiKey.GroupID, sessionHash, selection, parsed.Stream, &streamStarted, reqLog)
190193
if !acquired {

0 commit comments

Comments
 (0)