Skip to content

Commit e7f5f39

Browse files
committed
fix: remove per-message account lock
1 parent 68e7658 commit e7f5f39

3 files changed

Lines changed: 13 additions & 45 deletions

File tree

backend/internal/plugin/forwarder.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,8 @@ const queueMaxPollInterval = 2 * time.Second
7070
// 499 是 nginx 风格的 Client Closed Request,仅用于本地日志和状态归类。
7171
const statusClientClosedRequest = 499
7272

73-
// allRoutesFailedDefaultRetryAfter 客户端最终被拒时,若没有任何上游 RetryAfter 可参考
74-
// (比如 max_concurrency 打满、所有账号都在冷却但 state_until 没回填到这一层),
75-
// 给客户端一个保守的退避建议。1s 既能避免雪崩,又比 60s 更贴合"瞬时打满"的真实恢复节奏。
73+
// allRoutesFailedDefaultRetryAfter 客户端最终因真实上游限流被拒时,若没有任何上游 RetryAfter 可参考,
74+
// 给客户端一个保守的退避建议。1s 既能避免雪崩,又比 60s 更贴合"瞬时限流"的真实恢复节奏。
7675
const allRoutesFailedDefaultRetryAfter = time.Second
7776

7877
// Forward 入口。失败时自动 failover 到其它账号,最多 maxFailoverAttempts 次。
@@ -454,11 +453,10 @@ func selectAllRoutesFailureResponse(summary allRoutesFailureSummary) allRoutesFa
454453
}
455454
if summary.localCapacitySeen {
456455
return allRoutesFailureResponse{
457-
status: http.StatusTooManyRequests,
458-
errType: "rate_limit_error",
459-
code: "all_routes_capacity_exhausted",
460-
message: "上游容量暂时不足,请稍后重试",
461-
retryAfter: allRoutesFailedDefaultRetryAfter,
456+
status: http.StatusServiceUnavailable,
457+
errType: "server_error",
458+
code: "all_routes_failed",
459+
message: "请求暂时无法完成,请稍后重试",
462460
}
463461
}
464462
if summary.upstreamTimeoutSeen {

backend/internal/plugin/forwarder_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,8 +403,8 @@ func TestSelectAllRoutesFailureResponse(t *testing.T) {
403403
localCapacitySeen: true,
404404
upstreamFailureSeen: true,
405405
},
406-
wantStatus: http.StatusTooManyRequests,
407-
wantCode: "all_routes_capacity_exhausted",
406+
wantStatus: http.StatusServiceUnavailable,
407+
wantCode: "all_routes_failed",
408408
},
409409
{
410410
name: "upstream timeout",

backend/internal/plugin/quota.go

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,12 @@ func (f *Forwarder) pickAccount(c *gin.Context, state *forwardState, excludeIDs
121121
return scheduler.ErrNoAvailableAccount
122122
}
123123

124-
// acquireAccountSlot 获取账号级闸门:RPM 配额 + 真实用户消息串行锁 + 账号并发槽。
124+
// acquireAccountSlot 获取账号级闸门:RPM 配额 + 账号并发槽。
125125
// 返回 release func 与 ok 标记。ok=false 表示当前账号暂不可用(RPM 已满 / 并发已满),
126126
// 调用方应把本账号加入 excludeIDs 并 failover 到下一个账号。失败时不写客户端响应——
127127
// 由主循环在 failover 全部用尽时兜底写 503。
128128
//
129-
// 每次 failover attempt 都要重新 acquire。release 顺序和 acquire 顺序相反
129+
// 每次 failover attempt 都要重新 acquire。账号实际并发只由 MaxConcurrency 控制
130130
func (f *Forwarder) acquireAccountSlot(c *gin.Context, state *forwardState) (func(), bool) {
131131
ctx := c.Request.Context()
132132
releaseCtx := context.Background()
@@ -140,54 +140,24 @@ func (f *Forwarder) acquireAccountSlot(c *gin.Context, state *forwardState) (fun
140140
return nil, false
141141
}
142142

143-
// 2. 消息锁 + 均摊延迟(仅真实用户消息)
144-
releaseMsgLock := func() {}
145-
if scheduler.IsRealUserMessage(state.body) {
146-
acquired, err := f.scheduler.AcquireMessageLock(ctx, state.account.ID, state.requestID, state.account.Extra)
147-
if err != nil {
148-
releaseMsgLock()
149-
f.scheduler.DecrementRPM(ctx, state.account.ID)
150-
slog.Info("账号消息锁获取失败,尝试 failover",
151-
"account_id", state.account.ID,
152-
"error", err,
153-
)
154-
return nil, false
155-
}
156-
if !acquired {
157-
releaseMsgLock()
158-
f.scheduler.DecrementRPM(ctx, state.account.ID)
159-
slog.Info("账号消息锁排队已满,尝试 failover",
160-
"account_id", state.account.ID,
161-
"max_waiters", scheduler.ExtraInt(state.account.Extra, "msg_lock_max_waiters"),
162-
)
163-
return nil, false
164-
}
165-
releaseMsgLock = func() {
166-
f.scheduler.ReleaseMessageLock(releaseCtx, state.account.ID, state.requestID)
167-
}
168-
f.scheduler.EnforceMessageDelay(ctx, state.account.ID, state.account.Extra)
169-
}
170-
171-
// 3. 账号并发槽
143+
// 2. 账号并发槽
172144
maxConc := state.account.MaxConcurrency
173145
if maxConc <= 0 {
174146
maxConc = scheduler.DefaultAccountMaxConcurrency
175147
}
176148
slotTTL := time.Duration(scheduler.ExtraInt(state.account.Extra, "slot_ttl_seconds")) * time.Second
177149

178150
if err := f.concurrency.AcquireSlot(ctx, state.account.ID, state.requestID, maxConc, slotTTL); err != nil {
179-
releaseMsgLock()
180151
f.scheduler.DecrementRPM(ctx, state.account.ID)
181152
slog.Info("账号并发已满,尝试 failover",
182153
"account_id", state.account.ID, "max_concurrency", maxConc)
183154
return nil, false
184155
}
185156

186-
// 反向释放:slot → msg lock。RPM 不在 release 里回退——正常完成流程会通过
187-
// scheduler.Apply 决定是否 DecrementRPM(非 Success 判决都会回退)。
157+
// RPM 不在 release 里回退——正常完成流程会通过 scheduler.Apply 决定是否 DecrementRPM
158+
//(非 Success 判决都会回退)。
188159
return func() {
189160
f.concurrency.ReleaseSlot(releaseCtx, state.account.ID, state.requestID)
190-
releaseMsgLock()
191161
}, true
192162
}
193163

0 commit comments

Comments
 (0)