11package plugin
22
33import (
4+ "context"
5+ "errors"
46 "net/http"
57 "strings"
68 "time"
@@ -126,11 +128,7 @@ func (f *Forwarder) Forward(c *gin.Context) {
126128 startedAt := state .startedAt
127129 totalAttempts := 0
128130
129- // rateLimited* 跟踪本次请求最近一次被上游限流时的退避建议。最终走到 all_routes_failed
130- // 时用来给客户端回 429 + Retry-After,而不是无信号的 503,让 SDK 能正确退避。
131- // 多次命中限流时取最小值(最早恢复的账号决定何时重试最合理)。
132- rateLimitedSeen := false
133- rateLimitedRetryAfter := time .Duration (0 )
131+ failureSummary := allRoutesFailureSummary {}
134132
135133 for _ , route := range routes {
136134 state .selectedRoute = route
@@ -146,6 +144,7 @@ func (f *Forwarder) Forward(c *gin.Context) {
146144 exclude = append (exclude , softExclude ... )
147145
148146 if err := f .pickAccount (c , state , exclude ... ); err != nil {
147+ failureSummary .recordPickAccountError (err )
149148 if len (softExclude ) > 0 && time .Now ().Before (queueDeadline ) {
150149 softExclude = nil
151150 select {
@@ -171,6 +170,7 @@ func (f *Forwarder) Forward(c *gin.Context) {
171170 attemptLogger := logger .With (sdk .LogFieldAccountID , accountID )
172171 releaseAccountSlot , ok := f .acquireAccountSlot (c , state )
173172 if ! ok {
173+ failureSummary .recordLocalCapacityFailure ()
174174 softExclude = append (softExclude , accountID )
175175 continue
176176 }
@@ -191,6 +191,7 @@ func (f *Forwarder) Forward(c *gin.Context) {
191191 totalAttempts ++
192192
193193 if f .canFailover (c , state , execution ) {
194+ failureSummary .recordExecution (execution )
194195 attrs := []any {
195196 "attempt" , attempt ,
196197 "kind" , execution .outcome .Kind ,
@@ -207,14 +208,6 @@ func (f *Forwarder) Forward(c *gin.Context) {
207208 releaseAccountSlot ()
208209 f .applyOutcome (ctx , state , execution )
209210
210- if execution .outcome .Kind == sdk .OutcomeAccountRateLimited {
211- ra := execution .outcome .RetryAfter
212- if ! rateLimitedSeen || (ra > 0 && (rateLimitedRetryAfter == 0 || ra < rateLimitedRetryAfter )) {
213- rateLimitedSeen = true
214- rateLimitedRetryAfter = ra
215- }
216- }
217-
218211 if execution .outcome .Kind .IsAccountFault () {
219212 hardExclude = append (hardExclude , accountID )
220213 } else {
@@ -257,21 +250,146 @@ func (f *Forwarder) Forward(c *gin.Context) {
257250 if len (hardExclude ) > 0 {
258251 failAttrs = append (failAttrs , "tried_accounts" , hardExclude )
259252 }
260- if rateLimitedSeen {
261- failAttrs = append (failAttrs , "rate_limited_retry_after_ms" , rateLimitedRetryAfter .Milliseconds ())
253+ if failureSummary . rateLimitedSeen {
254+ failAttrs = append (failAttrs , "rate_limited_retry_after_ms" , failureSummary . rateLimitedRetryAfter .Milliseconds ())
262255 }
263256 logger .Error ("forward_request_failed" , failAttrs ... )
264257
265- // 走到这里都是"上游容量不足"——上游 429、家族冷却中、并发槽满 + 排队超时,
266- // 客户端视角统一归为可重试的限流,回 429 + Retry-After 让 SDK 自动退避。
267- // 真正的"无候选分组 / 配置错"已经在最前面 routes 为空时回了 no_available_route,
268- // 不会走到这里;这里再回 503 只会让客户端拿到无信号的失败,触发更猛的重试。
269- retryAfter := rateLimitedRetryAfter
258+ writeAllRoutesFailed (c , failureSummary )
259+ }
260+
// allRoutesFailureSummary accumulates which kinds of failure were observed
// while Forward tried every route, so that a single response can be selected
// once all routes have failed. The zero value means "nothing observed yet".
type allRoutesFailureSummary struct {
	// rateLimitedSeen is set when an attempt ended with
	// sdk.OutcomeAccountRateLimited.
	rateLimitedSeen bool
	// rateLimitedRetryAfter is the smallest positive RetryAfter recorded via
	// recordRetryAfter; it stays zero when no positive hint was recorded.
	rateLimitedRetryAfter time.Duration
	// localCapacitySeen is set by recordLocalCapacityFailure, which Forward
	// calls when acquireAccountSlot does not succeed.
	localCapacitySeen bool
	// accountUnavailable is set by recordPickAccountError, which Forward calls
	// when pickAccount returns an error.
	accountUnavailable bool
	// accountDeadSeen is set when an attempt ended with sdk.OutcomeAccountDead.
	accountDeadSeen bool
	// upstreamTimeoutSeen is set for an sdk.OutcomeUpstreamTransient attempt
	// that isTimeoutFailure classifies as a timeout.
	upstreamTimeoutSeen bool
	// upstreamFailureSeen is set for any other sdk.OutcomeUpstreamTransient
	// attempt, and for sdk.OutcomeUnknown attempts that carry a non-nil error.
	upstreamFailureSeen bool
}
270+
271+ func (s * allRoutesFailureSummary ) recordExecution (execution forwardExecution ) {
272+ switch execution .outcome .Kind {
273+ case sdk .OutcomeAccountRateLimited :
274+ s .rateLimitedSeen = true
275+ s .recordRetryAfter (execution .outcome .RetryAfter )
276+ case sdk .OutcomeAccountDead :
277+ s .accountDeadSeen = true
278+ case sdk .OutcomeUpstreamTransient :
279+ if isTimeoutFailure (execution ) {
280+ s .upstreamTimeoutSeen = true
281+ return
282+ }
283+ s .upstreamFailureSeen = true
284+ case sdk .OutcomeUnknown :
285+ if execution .err != nil {
286+ s .upstreamFailureSeen = true
287+ }
288+ }
289+ }
290+
291+ func (s * allRoutesFailureSummary ) recordRetryAfter (retryAfter time.Duration ) {
270292 if retryAfter <= 0 {
271- retryAfter = allRoutesFailedDefaultRetryAfter
293+ return
294+ }
295+ if s .rateLimitedRetryAfter == 0 || retryAfter < s .rateLimitedRetryAfter {
296+ s .rateLimitedRetryAfter = retryAfter
297+ }
298+ }
299+
300+ func (s * allRoutesFailureSummary ) recordPickAccountError (error ) {
301+ s .accountUnavailable = true
302+ }
303+
304+ func (s * allRoutesFailureSummary ) recordLocalCapacityFailure () {
305+ s .localCapacitySeen = true
306+ }
307+
// allRoutesFailureResponse describes the error response chosen by
// selectAllRoutesFailureResponse and written by writeAllRoutesFailed.
type allRoutesFailureResponse struct {
	// status is the HTTP status code handed to the error writer.
	status int
	// errType is the error type string; writeAllRoutesFailed forwards it only
	// on the non-429 path (openAIError).
	errType string
	// code is the error code string placed in the response.
	code string
	// message is the human-readable error message.
	message string
	// retryAfter is forwarded only on the 429 path (openAIRateLimitError).
	retryAfter time.Duration
}
315+
316+ func writeAllRoutesFailed (c * gin.Context , summary allRoutesFailureSummary ) {
317+ response := selectAllRoutesFailureResponse (summary )
318+ if response .status == http .StatusTooManyRequests {
319+ openAIRateLimitError (c , response .status , response .code , response .message , response .retryAfter )
320+ return
321+ }
322+ openAIError (c , response .status , response .errType , response .code , response .message )
323+ }
324+
325+ func selectAllRoutesFailureResponse (summary allRoutesFailureSummary ) allRoutesFailureResponse {
326+ if summary .rateLimitedSeen {
327+ retryAfter := summary .rateLimitedRetryAfter
328+ if retryAfter <= 0 {
329+ retryAfter = allRoutesFailedDefaultRetryAfter
330+ }
331+ return allRoutesFailureResponse {
332+ status : http .StatusTooManyRequests ,
333+ errType : "rate_limit_error" ,
334+ code : "all_routes_rate_limited" ,
335+ message : "上游账号当前被限流,请稍后重试" ,
336+ retryAfter : retryAfter ,
337+ }
338+ }
339+ if summary .localCapacitySeen {
340+ return allRoutesFailureResponse {
341+ status : http .StatusTooManyRequests ,
342+ errType : "rate_limit_error" ,
343+ code : "all_routes_capacity_exhausted" ,
344+ message : "上游容量暂时不足,请稍后重试" ,
345+ retryAfter : allRoutesFailedDefaultRetryAfter ,
346+ }
347+ }
348+ if summary .upstreamTimeoutSeen {
349+ return allRoutesFailureResponse {
350+ status : http .StatusGatewayTimeout ,
351+ errType : "server_error" ,
352+ code : "upstream_timeout" ,
353+ message : "上游请求超时,请稍后重试" ,
354+ }
355+ }
356+ if summary .upstreamFailureSeen {
357+ return allRoutesFailureResponse {
358+ status : http .StatusBadGateway ,
359+ errType : "server_error" ,
360+ code : "upstream_error" ,
361+ message : "上游服务暂不可用,请稍后重试" ,
362+ }
363+ }
364+ if summary .accountDeadSeen || summary .accountUnavailable {
365+ return allRoutesFailureResponse {
366+ status : http .StatusServiceUnavailable ,
367+ errType : "server_error" ,
368+ code : "no_available_account" ,
369+ message : "暂无可用上游账号,请稍后重试" ,
370+ }
371+ }
372+ return allRoutesFailureResponse {
373+ status : http .StatusServiceUnavailable ,
374+ errType : "server_error" ,
375+ code : "all_routes_failed" ,
376+ message : "请求暂时无法完成,请稍后重试" ,
377+ }
378+ }
379+
380+ func isTimeoutFailure (execution forwardExecution ) bool {
381+ if execution .outcome .Upstream .StatusCode == http .StatusGatewayTimeout {
382+ return true
383+ }
384+ if errors .Is (execution .err , context .DeadlineExceeded ) {
385+ return true
386+ }
387+ var timeoutErr interface { Timeout () bool }
388+ if errors .As (execution .err , & timeoutErr ) && timeoutErr .Timeout () {
389+ return true
272390 }
273- openAIRateLimitError ( c , http . StatusTooManyRequests , "all_routes_failed" ,
274- "上游容量暂时不足,请稍后重试" , retryAfter )
391+ reason := strings . ToLower ( judgmentReason ( execution ))
392+ return strings . Contains ( reason , "timeout" ) || strings . Contains ( reason , "timed out" ) || strings . Contains ( reason , "deadline exceeded" )
275393}
276394
277395func routesForAPIKey (state * forwardState , requirements routing.Requirements ) []routing.Candidate {
0 commit comments