Skip to content

Commit 5b66cef

Browse files
committed
fix(proxy): retry first-token timeouts by round
1 parent e68e06c commit 5b66cef

4 files changed

Lines changed: 244 additions & 53 deletions

File tree

proxy/handler.go

Lines changed: 81 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -1190,7 +1190,7 @@ func (h *Handler) Responses(c *gin.Context) {
11901190
rateLimitRetries := 0
11911191
var lastStatusCode int
11921192
var lastBody []byte
1193-
excludeAccounts := make(map[int64]bool) // 重试时排除已失败的账号
1193+
retryExclusions := newRetryAccountExclusions()
11941194
invalidEncryptedContentRetried := false
11951195

11961196
// 上游 ctx 生命周期:每次 attempt 开始前用新的 drainable ctx 替换,
@@ -1203,18 +1203,14 @@ func (h *Handler) Responses(c *gin.Context) {
12031203
}()
12041204

12051205
for attempt := 0; ; attempt++ {
1206-
account, stickyProxyURL := h.nextAccountForSessionWithFilter(affinityKey, apiKeyID, excludeAccounts, accountFilter)
1206+
account, stickyProxyURL := h.nextRetryAccountForSession(c.Request.Context(), affinityKey, apiKeyID, retryExclusions, accountFilter)
12071207
if account == nil {
1208-
// 排队等待可用账号(最多 30s)
1209-
account, stickyProxyURL = h.store.WaitForSessionAvailableWithFilter(c.Request.Context(), affinityKey, 30*time.Second, apiKeyID, excludeAccounts, accountFilter)
1210-
if account == nil {
1211-
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
1212-
h.sendFinalUpstreamError(c, lastStatusCode, lastBody)
1213-
return
1214-
}
1215-
c.JSON(http.StatusServiceUnavailable, noAvailableAccountError(effectiveModel))
1208+
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
1209+
h.sendFinalUpstreamError(c, lastStatusCode, lastBody)
12161210
return
12171211
}
1212+
c.JSON(http.StatusServiceUnavailable, noAvailableAccountError(effectiveModel))
1213+
return
12181214
}
12191215

12201216
start := time.Now()
@@ -1258,20 +1254,33 @@ func (h *Handler) Responses(c *gin.Context) {
12581254
if timedOut {
12591255
reqErr = firstTokenTimeoutError(currentFirstTokenTimeout())
12601256
}
1261-
if kind := classifyTransportFailure(reqErr); kind != "" {
1257+
kind := classifyTransportFailure(reqErr)
1258+
retryable := IsRetryableError(reqErr) || kind != ""
1259+
shouldRetry := false
1260+
if retryable {
1261+
shouldRetry = shouldRetryRequestError(reqErr, &generalRetries, maxRetries)
1262+
}
1263+
if kind != "" && !(timedOut && shouldRetry) {
12621264
h.store.ReportRequestFailure(account, kind, time.Duration(durationMs)*time.Millisecond)
12631265
}
12641266
h.store.Release(account)
12651267
h.store.UnbindSessionAffinity(affinityKey, account.ID())
1266-
excludeAccounts[account.ID()] = true
1268+
if timedOut && shouldRetry {
1269+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
1270+
log.Printf("OpenAI Responses 上游首字超时,断开并重试 (attempt %d/%d, account %d): %v", attempt+1, maxRetries+1, account.ID(), reqErr)
1271+
continue
1272+
}
1273+
if !timedOut {
1274+
retryExclusions.MarkHard(account.ID())
1275+
}
12671276

1268-
if !IsRetryableError(reqErr) && classifyTransportFailure(reqErr) == "" {
1277+
if !retryable {
12691278
ErrorToGinResponse(c, reqErr)
12701279
return
12711280
}
12721281

12731282
log.Printf("OpenAI Responses 上游请求失败 (attempt %d): %v", attempt+1, reqErr)
1274-
if shouldRetryRequestError(reqErr, &generalRetries, maxRetries) {
1283+
if shouldRetry {
12751284
continue
12761285
}
12771286
ErrorToGinResponse(c, reqErr)
@@ -1311,7 +1320,7 @@ func (h *Handler) Responses(c *gin.Context) {
13111320
}
13121321
h.store.Release(account)
13131322
h.store.UnbindSessionAffinity(affinityKey, account.ID())
1314-
excludeAccounts[account.ID()] = true
1323+
retryExclusions.MarkHard(account.ID())
13151324

13161325
log.Printf("OpenAI Responses 上游返回错误 (attempt %d, status %d): %s", attempt+1, resp.StatusCode, string(errBody))
13171326
logUpstreamError("/v1/responses", resp.StatusCode, model, account.ID(), errBody)
@@ -1461,7 +1470,11 @@ func (h *Handler) Responses(c *gin.Context) {
14611470
if shouldTransparentRetryStream(outcome, attempt, maxRetries, wroteAnyBody, c.Request.Context().Err(), writeErr) {
14621471
log.Printf("OpenAI Responses 上游流在首包前断开,重置连接并重试 (attempt %d/%d, account %d): %s", attempt+1, maxRetries+1, account.ID(), outcome.failureMessage)
14631472
recyclePooledClient(account, proxyURL)
1464-
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
1473+
if isFirstTokenTimeoutOutcome(outcome) {
1474+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
1475+
} else {
1476+
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
1477+
}
14651478
resp.Body.Close()
14661479
h.store.Release(account)
14671480
h.store.UnbindSessionAffinity(affinityKey, account.ID())
@@ -1553,21 +1566,34 @@ func (h *Handler) Responses(c *gin.Context) {
15531566
if timedOut {
15541567
reqErr = firstTokenTimeoutError(currentFirstTokenTimeout())
15551568
}
1556-
if kind := classifyTransportFailure(reqErr); kind != "" {
1569+
kind := classifyTransportFailure(reqErr)
1570+
retryable := IsRetryableError(reqErr) || kind != ""
1571+
shouldRetry := false
1572+
if retryable {
1573+
shouldRetry = shouldRetryRequestError(reqErr, &generalRetries, maxRetries)
1574+
}
1575+
if kind != "" && !(timedOut && shouldRetry) {
15571576
h.store.ReportRequestFailure(account, kind, time.Duration(durationMs)*time.Millisecond)
15581577
}
15591578
h.store.Release(account)
15601579
h.store.UnbindSessionAffinity(affinityKey, account.ID())
1561-
excludeAccounts[account.ID()] = true
1580+
if timedOut && shouldRetry {
1581+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
1582+
log.Printf("上游首字超时,断开并重试 (attempt %d/%d, account %d, /v1/responses): %v", attempt+1, maxRetries+1, account.ID(), reqErr)
1583+
continue
1584+
}
1585+
if !timedOut {
1586+
retryExclusions.MarkHard(account.ID())
1587+
}
15621588

15631589
// 不可重试的结构化错误直接返回
1564-
if !IsRetryableError(reqErr) && classifyTransportFailure(reqErr) == "" {
1590+
if !retryable {
15651591
ErrorToGinResponse(c, reqErr)
15661592
return
15671593
}
15681594

15691595
log.Printf("上游请求失败 (attempt %d): %v", attempt+1, reqErr)
1570-
if shouldRetryRequestError(reqErr, &generalRetries, maxRetries) {
1596+
if shouldRetry {
15711597
continue
15721598
}
15731599
ErrorToGinResponse(c, reqErr)
@@ -1605,7 +1631,7 @@ func (h *Handler) Responses(c *gin.Context) {
16051631
SyncCodexUsageState(h.store, account, resp)
16061632
h.store.Release(account)
16071633
h.store.UnbindSessionAffinity(affinityKey, account.ID())
1608-
excludeAccounts[account.ID()] = true
1634+
retryExclusions.MarkHard(account.ID())
16091635

16101636
log.Printf("上游返回错误 (attempt %d, status %d): %s", attempt+1, resp.StatusCode, string(errBody))
16111637
logUpstreamError("/v1/responses", resp.StatusCode, model, account.ID(), errBody)
@@ -1813,7 +1839,11 @@ func (h *Handler) Responses(c *gin.Context) {
18131839
log.Printf("上游流在首包前断开,重置连接并重试 (attempt %d/%d, account %d, /v1/responses): %s", attempt+1, maxRetries+1, account.ID(), outcome.failureMessage)
18141840
recyclePooledClient(account, proxyURL)
18151841
SyncCodexUsageState(h.store, account, resp)
1816-
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
1842+
if isFirstTokenTimeoutOutcome(outcome) {
1843+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
1844+
} else {
1845+
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
1846+
}
18171847
resp.Body.Close()
18181848
h.store.Release(account)
18191849
h.store.UnbindSessionAffinity(affinityKey, account.ID())
@@ -2217,7 +2247,7 @@ func (h *Handler) ChatCompletions(c *gin.Context) {
22172247
rateLimitRetries := 0
22182248
var lastStatusCode int
22192249
var lastBody []byte
2220-
excludeAccounts := make(map[int64]bool) // 重试时排除已失败的账号
2250+
retryExclusions := newRetryAccountExclusions()
22212251

22222252
// 上游 ctx 生命周期:每次 attempt 开始前用新的 drainable ctx 替换,
22232253
// defer 兜底确保函数退出时上游被释放。
@@ -2229,18 +2259,14 @@ func (h *Handler) ChatCompletions(c *gin.Context) {
22292259
}()
22302260

22312261
for attempt := 0; ; attempt++ {
2232-
account, stickyProxyURL := h.nextAccountForSessionWithFilter(affinityKey, apiKeyID, excludeAccounts, accountFilter)
2262+
account, stickyProxyURL := h.nextRetryAccountForSession(c.Request.Context(), affinityKey, apiKeyID, retryExclusions, accountFilter)
22332263
if account == nil {
2234-
// 排队等待可用账号(最多 30s)
2235-
account, stickyProxyURL = h.store.WaitForSessionAvailableWithFilter(c.Request.Context(), affinityKey, 30*time.Second, apiKeyID, excludeAccounts, accountFilter)
2236-
if account == nil {
2237-
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
2238-
h.sendFinalUpstreamError(c, lastStatusCode, lastBody)
2239-
return
2240-
}
2241-
c.JSON(http.StatusServiceUnavailable, noAvailableAccountError(effectiveModel))
2264+
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
2265+
h.sendFinalUpstreamError(c, lastStatusCode, lastBody)
22422266
return
22432267
}
2268+
c.JSON(http.StatusServiceUnavailable, noAvailableAccountError(effectiveModel))
2269+
return
22442270
}
22452271

22462272
start := time.Now()
@@ -2283,21 +2309,34 @@ func (h *Handler) ChatCompletions(c *gin.Context) {
22832309
if timedOut {
22842310
reqErr = firstTokenTimeoutError(currentFirstTokenTimeout())
22852311
}
2286-
if kind := classifyTransportFailure(reqErr); kind != "" {
2312+
kind := classifyTransportFailure(reqErr)
2313+
retryable := IsRetryableError(reqErr) || kind != ""
2314+
shouldRetry := false
2315+
if retryable {
2316+
shouldRetry = shouldRetryRequestError(reqErr, &generalRetries, maxRetries)
2317+
}
2318+
if kind != "" && !(timedOut && shouldRetry) {
22872319
h.store.ReportRequestFailure(account, kind, time.Duration(durationMs)*time.Millisecond)
22882320
}
22892321
h.store.Release(account)
22902322
h.store.UnbindSessionAffinity(affinityKey, account.ID())
2291-
excludeAccounts[account.ID()] = true
2323+
if timedOut && shouldRetry {
2324+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
2325+
log.Printf("上游首字超时,断开并重试 (attempt %d/%d, account %d, /v1/chat/completions): %v", attempt+1, maxRetries+1, account.ID(), reqErr)
2326+
continue
2327+
}
2328+
if !timedOut {
2329+
retryExclusions.MarkHard(account.ID())
2330+
}
22922331

22932332
// 不可重试的结构化错误直接返回
2294-
if !IsRetryableError(reqErr) && classifyTransportFailure(reqErr) == "" {
2333+
if !retryable {
22952334
ErrorToGinResponse(c, reqErr)
22962335
return
22972336
}
22982337

22992338
log.Printf("上游请求失败 (attempt %d): %v", attempt+1, reqErr)
2300-
if shouldRetryRequestError(reqErr, &generalRetries, maxRetries) {
2339+
if shouldRetry {
23012340
continue
23022341
}
23032342
ErrorToGinResponse(c, reqErr)
@@ -2314,7 +2353,7 @@ func (h *Handler) ChatCompletions(c *gin.Context) {
23142353
resp.Body.Close()
23152354
h.store.Release(account)
23162355
h.store.UnbindSessionAffinity(affinityKey, account.ID())
2317-
excludeAccounts[account.ID()] = true
2356+
retryExclusions.MarkHard(account.ID())
23182357

23192358
log.Printf("上游返回错误 (attempt %d, status %d): %s", attempt+1, resp.StatusCode, string(errBody))
23202359
logUpstreamError("/v1/chat/completions", resp.StatusCode, model, account.ID(), errBody)
@@ -2525,7 +2564,11 @@ func (h *Handler) ChatCompletions(c *gin.Context) {
25252564
log.Printf("上游流在首包前断开,重置连接并重试 (attempt %d/%d, account %d, /v1/chat/completions): %s", attempt+1, maxRetries+1, account.ID(), outcome.failureMessage)
25262565
recyclePooledClient(account, proxyURL)
25272566
SyncCodexUsageState(h.store, account, resp)
2528-
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
2567+
if isFirstTokenTimeoutOutcome(outcome) {
2568+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
2569+
} else {
2570+
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
2571+
}
25292572
resp.Body.Close()
25302573
h.store.Release(account)
25312574
h.store.UnbindSessionAffinity(affinityKey, account.ID())

proxy/handler_anthropic.go

Lines changed: 29 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -135,7 +135,7 @@ func (h *Handler) Messages(c *gin.Context) {
135135
rateLimitRetries := 0
136136
var lastStatusCode int
137137
var lastBody []byte
138-
excludeAccounts := make(map[int64]bool)
138+
retryExclusions := newRetryAccountExclusions()
139139

140140
var lastUpstreamCancel context.CancelFunc
141141
defer func() {
@@ -145,17 +145,14 @@ func (h *Handler) Messages(c *gin.Context) {
145145
}()
146146

147147
for attempt := 0; ; attempt++ {
148-
account, stickyProxyURL := h.nextAccountForSessionWithFilter(affinityKey, apiKeyID, excludeAccounts, accountFilter)
148+
account, stickyProxyURL := h.nextRetryAccountForSession(c.Request.Context(), affinityKey, apiKeyID, retryExclusions, accountFilter)
149149
if account == nil {
150-
account, stickyProxyURL = h.store.WaitForSessionAvailableWithFilter(c.Request.Context(), affinityKey, 30*time.Second, apiKeyID, excludeAccounts, accountFilter)
151-
if account == nil {
152-
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
153-
sendAnthropicError(c, http.StatusTooManyRequests, "rate_limit_error", "All accounts rate limited")
154-
return
155-
}
156-
sendAnthropicError(c, http.StatusServiceUnavailable, "overloaded_error", noAvailableAnthropicAccountMessage(effectiveModel))
150+
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
151+
sendAnthropicError(c, http.StatusTooManyRequests, "rate_limit_error", "All accounts rate limited")
157152
return
158153
}
154+
sendAnthropicError(c, http.StatusServiceUnavailable, "overloaded_error", noAvailableAnthropicAccountMessage(effectiveModel))
155+
return
159156
}
160157

161158
start := time.Now()
@@ -197,20 +194,33 @@ func (h *Handler) Messages(c *gin.Context) {
197194
if timedOut {
198195
reqErr = firstTokenTimeoutError(currentFirstTokenTimeout())
199196
}
200-
if kind := classifyTransportFailure(reqErr); kind != "" {
197+
kind := classifyTransportFailure(reqErr)
198+
retryable := IsRetryableError(reqErr) || kind != ""
199+
shouldRetry := false
200+
if retryable {
201+
shouldRetry = shouldRetryRequestError(reqErr, &generalRetries, maxRetries)
202+
}
203+
if kind != "" && !(timedOut && shouldRetry) {
201204
h.store.ReportRequestFailure(account, kind, time.Duration(durationMs)*time.Millisecond)
202205
}
203206
h.store.Release(account)
204207
h.store.UnbindSessionAffinity(affinityKey, account.ID())
205-
excludeAccounts[account.ID()] = true
208+
if timedOut && shouldRetry {
209+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
210+
log.Printf("上游首字超时,断开并重试 (attempt %d/%d, account %d, /v1/messages): %v", attempt+1, maxRetries+1, account.ID(), reqErr)
211+
continue
212+
}
213+
if !timedOut {
214+
retryExclusions.MarkHard(account.ID())
215+
}
206216

207-
if !IsRetryableError(reqErr) && classifyTransportFailure(reqErr) == "" {
217+
if !retryable {
208218
sendAnthropicError(c, http.StatusBadGateway, "api_error", "Upstream request failed")
209219
return
210220
}
211221

212222
log.Printf("上游请求失败 (attempt %d, /v1/messages): %v", attempt+1, reqErr)
213-
if shouldRetryRequestError(reqErr, &generalRetries, maxRetries) {
223+
if shouldRetry {
214224
continue
215225
}
216226
sendAnthropicError(c, http.StatusBadGateway, "api_error", "Upstream request failed")
@@ -229,7 +239,7 @@ func (h *Handler) Messages(c *gin.Context) {
229239
resp.Body.Close()
230240
h.store.Release(account)
231241
h.store.UnbindSessionAffinity(affinityKey, account.ID())
232-
excludeAccounts[account.ID()] = true
242+
retryExclusions.MarkHard(account.ID())
233243

234244
log.Printf("上游返回错误 (attempt %d, status %d, /v1/messages): %s", attempt+1, resp.StatusCode, string(errBody))
235245
logUpstreamError("/v1/messages", resp.StatusCode, model, account.ID(), errBody)
@@ -447,7 +457,11 @@ func (h *Handler) Messages(c *gin.Context) {
447457
if usagePct, ok := parseCodexUsageHeaders(resp, account); ok {
448458
h.store.PersistUsageSnapshot(account, usagePct)
449459
}
450-
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
460+
if isFirstTokenTimeoutOutcome(outcome) {
461+
retryExclusions.MarkSoftFirstTokenTimeout(account.ID())
462+
} else {
463+
h.store.ReportRequestFailure(account, outcome.failureKind, time.Duration(totalDuration)*time.Millisecond)
464+
}
451465
resp.Body.Close()
452466
h.store.Release(account)
453467
h.store.UnbindSessionAffinity(affinityKey, account.ID())

0 commit comments

Comments
 (0)