Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions backend/internal/handler/gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
zap.String("platform", platform),
zap.Error(err),
)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, platform, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts: "+err.Error())
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
}
action := fs.HandleSelectionExhausted(c.Request.Context())
Expand Down Expand Up @@ -357,7 +358,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
zap.String("model", reqModel),
zap.String("platform", platform),
)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, platform, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
}
accountWaitCounted := false
Expand Down Expand Up @@ -586,7 +588,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
zap.Bool("fallback_used", fallbackUsed),
zap.Error(err),
)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, platform, currentAPIKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts: "+err.Error())
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
}
action := fs.HandleSelectionExhausted(c.Request.Context())
Expand Down Expand Up @@ -645,7 +648,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
zap.String("model", reqModel),
zap.String("platform", platform),
)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, platform, currentAPIKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
}
accountWaitCounted := false
Expand Down Expand Up @@ -1786,7 +1790,8 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
if err != nil {
reqLog.Warn("gateway.count_tokens_select_account_failed", zap.Error(err))
markOpsRoutingCapacityLimitedIfNoAvailable(c, err)
h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable")
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKeyGroupPlatform(apiKey), apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "Service temporarily unavailable")
h.errorResponse(c, status, errType, message)
return
}
setOpsSelectedAccount(c, account.ID, account.Platform)
Expand Down
6 changes: 4 additions & 2 deletions backend/internal/handler/gateway_handler_chat_completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
if err != nil {
if len(fs.FailedAccountIDs) == 0 {
markOpsRoutingCapacityLimitedIfNoAvailable(c, err)
h.chatCompletionsErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error())
status, errType, message := h.openAIAccountSelectionErrorResponse(c, groupPlatform, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts: "+err.Error())
h.chatCompletionsErrorResponse(c, status, errType, message)
return
}
action := fs.HandleSelectionExhausted(c.Request.Context())
Expand All @@ -189,7 +190,8 @@ func (h *GatewayHandler) ChatCompletions(c *gin.Context) {
if !selection.Acquired {
if selection.WaitPlan == nil {
markOpsRoutingCapacityLimited(c)
h.chatCompletionsErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts")
status, errType, message := h.openAIAccountSelectionErrorResponse(c, groupPlatform, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.chatCompletionsErrorResponse(c, status, errType, message)
return
}
accountReleaseFunc, err = h.concurrencyHelper.AcquireAccountSlotWithWaitTimeout(
Expand Down
6 changes: 4 additions & 2 deletions backend/internal/handler/gateway_handler_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
if err != nil {
if len(fs.FailedAccountIDs) == 0 {
markOpsRoutingCapacityLimitedIfNoAvailable(c, err)
h.responsesErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error())
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKeyGroupPlatform(apiKey), apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts: "+err.Error())
h.responsesErrorResponse(c, status, errType, message)
return
}
action := fs.HandleSelectionExhausted(requestCtx)
Expand All @@ -187,7 +188,8 @@ func (h *GatewayHandler) Responses(c *gin.Context) {
if !selection.Acquired {
if selection.WaitPlan == nil {
markOpsRoutingCapacityLimited(c)
h.responsesErrorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts")
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKeyGroupPlatform(apiKey), apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.responsesErrorResponse(c, status, errType, message)
return
}
accountReleaseFunc, err = h.concurrencyHelper.AcquireAccountSlotWithWaitTimeout(
Expand Down
92 changes: 92 additions & 0 deletions backend/internal/handler/openai_account_selection_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package handler

import (
"context"
"net/http"

"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)

type accountSelectionErrorTemplateDataFunc func(context.Context, string, *int64) service.AccountSelectionErrorTemplateData

func openAIAccountSelectionErrorResponse(
c *gin.Context,
errorPassthroughService *service.ErrorPassthroughService,
templateData accountSelectionErrorTemplateDataFunc,
platform string,
groupID *int64,
matchMessage string,
defaultMessage string,
) (int, string, string) {
status := http.StatusServiceUnavailable
errType := "api_error"
message := defaultMessage
if errorPassthroughService == nil {
return status, errType, message
}

body := []byte(matchMessage)
if platform == "" {
platform = service.PlatformOpenAI
}
rule := errorPassthroughService.MatchRule(platform, service.OpenAIAccountSelectionNoAvailableStatus, body)
if rule == nil && platform != service.PlatformOpenAI {
rule = errorPassthroughService.MatchRule(service.PlatformOpenAI, service.OpenAIAccountSelectionNoAvailableStatus, body)
}
if rule == nil {
return status, errType, message
}

if !rule.PassthroughCode && rule.ResponseCode != nil {
status = *rule.ResponseCode
}
if !rule.PassthroughBody && rule.CustomMessage != nil {
data := service.AccountSelectionErrorTemplateData{}
if templateData != nil {
ctx := context.Background()
if c != nil && c.Request != nil {
ctx = c.Request.Context()
}
data = templateData(ctx, platform, groupID)
}
message = service.RenderAccountSelectionErrorMessage(*rule.CustomMessage, data)
} else {
message = matchMessage
}
if rule.SkipMonitoring && c != nil {
c.Set(service.OpsSkipPassthroughKey, true)
}
return status, "upstream_error", message
}

func (h *OpenAIGatewayHandler) openAIAccountSelectionErrorResponse(c *gin.Context, groupID *int64, matchMessage string, defaultMessage string) (int, string, string) {
var templateData accountSelectionErrorTemplateDataFunc
if h != nil && h.gatewayService != nil {
templateData = h.gatewayService.AccountSelectionErrorTemplateData
}
var errorPassthroughService *service.ErrorPassthroughService
if h != nil {
errorPassthroughService = h.errorPassthroughService
}
return openAIAccountSelectionErrorResponse(c, errorPassthroughService, templateData, service.PlatformOpenAI, groupID, matchMessage, defaultMessage)
}

func (h *GatewayHandler) openAIAccountSelectionErrorResponse(c *gin.Context, platform string, groupID *int64, matchMessage string, defaultMessage string) (int, string, string) {
var templateData accountSelectionErrorTemplateDataFunc
if h != nil && h.gatewayService != nil {
templateData = h.gatewayService.AccountSelectionErrorTemplateData
}
var errorPassthroughService *service.ErrorPassthroughService
if h != nil {
errorPassthroughService = h.errorPassthroughService
}
return openAIAccountSelectionErrorResponse(c, errorPassthroughService, templateData, platform, groupID, matchMessage, defaultMessage)
}

func apiKeyGroupPlatform(apiKey *service.APIKey) string {
if apiKey == nil || apiKey.Group == nil {
return ""
}
return apiKey.Group.Platform
}
92 changes: 92 additions & 0 deletions backend/internal/handler/openai_account_selection_error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//go:build unit

package handler

import (
"context"
"net/http"
"net/http/httptest"
"testing"

"github.com/Wei-Shaw/sub2api/internal/model"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
)

func TestOpenAIAccountSelectionErrorResponse_DefaultWithoutRule(t *testing.T) {
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodPost, "/responses", nil)
h := &OpenAIGatewayHandler{}

status, errType, message := h.openAIAccountSelectionErrorResponse(c, nil, service.OpenAIAccountSelectionNoAvailableMessage, "Service temporarily unavailable")

if status != http.StatusServiceUnavailable {
t.Fatalf("status=%d want %d", status, http.StatusServiceUnavailable)
}
if errType != "api_error" {
t.Fatalf("errType=%s want api_error", errType)
}
if message != "Service temporarily unavailable" {
t.Fatalf("message=%q", message)
}
}

func TestGatewayAccountSelectionErrorResponse_CustomRule(t *testing.T) {
c, _ := gin.CreateTestContext(httptest.NewRecorder())
c.Request = httptest.NewRequest(http.MethodPost, "/responses", nil)
responseCode := http.StatusTooManyRequests
customMessage := "账户额度已用完"
svc := service.NewErrorPassthroughService(&accountSelectionErrorRuleRepo{rules: []*model.ErrorPassthroughRule{
{
ID: 1,
Name: "OpenAI no available accounts",
Enabled: true,
Priority: 0,
ErrorCodes: []int{service.OpenAIAccountSelectionNoAvailableStatus},
Keywords: []string{service.OpenAIAccountSelectionNoAvailableMessage},
MatchMode: model.MatchModeAll,
Platforms: []string{model.PlatformOpenAI},
PassthroughCode: false,
ResponseCode: &responseCode,
PassthroughBody: false,
CustomMessage: &customMessage,
},
}}, nil)
h := &GatewayHandler{errorPassthroughService: svc}

status, errType, message := h.openAIAccountSelectionErrorResponse(c, service.PlatformGemini, nil, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts: no available accounts")

if status != http.StatusTooManyRequests {
t.Fatalf("status=%d want %d", status, http.StatusTooManyRequests)
}
if errType != "upstream_error" {
t.Fatalf("errType=%s want upstream_error", errType)
}
if message != customMessage {
t.Fatalf("message=%q want %q", message, customMessage)
}
}

type accountSelectionErrorRuleRepo struct {
rules []*model.ErrorPassthroughRule
}

func (r *accountSelectionErrorRuleRepo) List(context.Context) ([]*model.ErrorPassthroughRule, error) {
return r.rules, nil
}

func (r *accountSelectionErrorRuleRepo) GetByID(context.Context, int64) (*model.ErrorPassthroughRule, error) {
panic("unexpected GetByID")
}

func (r *accountSelectionErrorRuleRepo) Create(context.Context, *model.ErrorPassthroughRule) (*model.ErrorPassthroughRule, error) {
panic("unexpected Create")
}

func (r *accountSelectionErrorRuleRepo) Update(context.Context, *model.ErrorPassthroughRule) (*model.ErrorPassthroughRule, error) {
panic("unexpected Update")
}

func (r *accountSelectionErrorRuleRepo) Delete(context.Context, int64) error {
panic("unexpected Delete")
}
6 changes: 4 additions & 2 deletions backend/internal/handler/openai_chat_completions.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
)
if len(failedAccountIDs) == 0 {
markOpsRoutingCapacityLimitedIfNoAvailable(c, err)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "Service temporarily unavailable")
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
} else {
if lastFailoverErr != nil {
Expand All @@ -165,7 +166,8 @@ func (h *OpenAIGatewayHandler) ChatCompletions(c *gin.Context) {
}
if selection == nil || selection.Account == nil {
markOpsRoutingCapacityLimited(c)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
}
account := selection.Account
Expand Down
6 changes: 4 additions & 2 deletions backend/internal/handler/openai_embeddings.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func (h *OpenAIGatewayHandler) Embeddings(c *gin.Context) {
)
if len(failedAccountIDs) == 0 {
markOpsRoutingCapacityLimitedIfNoAvailable(c, err)
h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable")
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "Service temporarily unavailable")
h.errorResponse(c, status, errType, message)
return
}
if lastFailoverErr != nil {
Expand All @@ -137,7 +138,8 @@ func (h *OpenAIGatewayHandler) Embeddings(c *gin.Context) {
}
if selection == nil || selection.Account == nil {
markOpsRoutingCapacityLimited(c)
h.errorResponse(c, http.StatusServiceUnavailable, "api_error", "No available accounts")
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.errorResponse(c, status, errType, message)
return
}
account := selection.Account
Expand Down
18 changes: 12 additions & 6 deletions backend/internal/handler/openai_gateway_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "compact_not_supported", "No available OpenAI accounts support /responses/compact", streamStarted)
return
}
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "Service temporarily unavailable")
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
}
if lastFailoverErr != nil {
Expand All @@ -356,7 +357,8 @@ func (h *OpenAIGatewayHandler) Responses(c *gin.Context) {
}
if selection == nil || selection.Account == nil {
markOpsRoutingCapacityLimited(c)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.handleStreamingAwareError(c, status, errType, message, streamStarted)
return
}
if previousResponseID != "" && selection != nil && selection.Account != nil {
Expand Down Expand Up @@ -762,7 +764,8 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
if len(failedAccountIDs) == 0 {
if err != nil {
markOpsRoutingCapacityLimitedIfNoAvailable(c, err)
h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "Service temporarily unavailable", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "Service temporarily unavailable")
h.anthropicStreamingAwareError(c, status, errType, message, streamStarted)
return
}
} else {
Expand All @@ -776,7 +779,8 @@ func (h *OpenAIGatewayHandler) Messages(c *gin.Context) {
}
if selection == nil || selection.Account == nil {
markOpsRoutingCapacityLimited(c)
h.anthropicStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, apiKey.GroupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.anthropicStreamingAwareError(c, status, errType, message, streamStarted)
return
}
account := selection.Account
Expand Down Expand Up @@ -1061,7 +1065,8 @@ func (h *OpenAIGatewayHandler) acquireResponsesAccountSlot(
) (func(), bool) {
if selection == nil || selection.Account == nil {
markOpsRoutingCapacityLimited(c)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", *streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, groupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.handleStreamingAwareError(c, status, errType, message, *streamStarted)
return nil, false
}

Expand All @@ -1072,7 +1077,8 @@ func (h *OpenAIGatewayHandler) acquireResponsesAccountSlot(
}
if selection.WaitPlan == nil {
markOpsRoutingCapacityLimited(c)
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts", *streamStarted)
status, errType, message := h.openAIAccountSelectionErrorResponse(c, groupID, service.OpenAIAccountSelectionNoAvailableMessage, "No available accounts")
h.handleStreamingAwareError(c, status, errType, message, *streamStarted)
return nil, false
}

Expand Down
Loading
Loading