Skip to content

Commit 2e8d5c3

Browse files
committed
feat(scheduler): strict priority-tier selection + state machine hardening
- selectByLoadBalance now picks only from the highest priority tier; lower-priority accounts are used only after higher tiers are exhausted.
- ToggleScheduling routes through ManualRecover/ManualDisable to ensure route cache invalidation (previously it bypassed the state machine).
- AccountDead always marks the account disabled, even for pool accounts (previously it was marked degraded, which auto-recovered and caused error loops).
- transitionActive protects the disabled state from being overridden by the Success callback.
- Probe applies only success judgments to the state machine and skips image models.
- Account extra field: full CRUD through API/store/frontend.
- Platform-level AccountFilterFunc hooks for model-specific filtering.
- EditGroupModal: two-column grid layout for toggle switches.
1 parent 1fd3cee commit 2e8d5c3

20 files changed

Lines changed: 205 additions & 69 deletions

File tree

backend/ent/account.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/ent/schema/account.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ func (Account) Fields() []ent.Field {
3838
field.String("error_msg").Default("").
3939
Comment("进入当前状态的原因(给运维看)"),
4040
field.Bool("upstream_is_pool").Default(false).
41-
Comment("上游是账号池:Dead 判决降级为 degraded,避免池抖动把本地账号永久标 disabled"),
41+
Comment("上游是账号池:UpstreamTransient 走软降级 degraded;AccountDead 仍标 disabled"),
4242
field.Time("last_used_at").Optional().Nillable(),
4343
field.JSON("extra", map[string]interface{}{}).Optional().Default(map[string]interface{}{}).
4444
Comment("扩展配置(max_rpm / max_window_cost / max_sessions 等)"),

backend/internal/app/account/service.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type StateWriter interface {
6060
ClearRateLimitMarkers(ctx context.Context, accountID int) int
6161
// MarkDisabled 永久禁用(凭证失效等,需要人工重新验证)。
6262
MarkDisabled(ctx context.Context, accountID int, reason string)
63+
// ManualRecover 手动恢复到 active 并清除路由缓存。
64+
ManualRecover(ctx context.Context, accountID int) error
65+
// ManualDisable 手动禁用并清除路由缓存。
66+
ManualDisable(ctx context.Context, accountID int, reason string) error
6367
}
6468

6569
type Service struct {
@@ -328,6 +332,9 @@ func (r *BulkResult) appendFailure(id int, err error) {
328332

329333
// ToggleScheduling 快速切换账号调度状态。active ↔ disabled。
330334
// 其它中间态(rate_limited / degraded)一律视为"非 disabled",切换后目标 = disabled。
335+
//
336+
// 通过 StateWriter 走状态机路径,确保路由缓存立即失效——
337+
// 旧实现直接写 repo 绕过状态机,导致缓存里的旧快照让账号"自己起来"。
331338
func (s *Service) ToggleScheduling(ctx context.Context, id int) (ToggleResult, error) {
332339
logger := sdk.LoggerFromContext(ctx)
333340
item, err := s.repo.FindByID(ctx, id, LoadOptions{})
@@ -338,23 +345,31 @@ func (s *Service) ToggleScheduling(ctx context.Context, id int) (ToggleResult, e
338345
return ToggleResult{}, err
339346
}
340347

341-
newState := "disabled"
348+
var newState string
342349
if item.State == "disabled" {
343350
newState = "active"
351+
if s.stateWriter != nil {
352+
if err := s.stateWriter.ManualRecover(ctx, id); err != nil {
353+
logger.Error("account_manual_recover_failed",
354+
sdk.LogFieldAccountID, id, sdk.LogFieldError, err)
355+
return ToggleResult{}, err
356+
}
357+
}
358+
} else {
359+
newState = "disabled"
360+
if s.stateWriter != nil {
361+
if err := s.stateWriter.ManualDisable(ctx, id, "管理员手动关闭调度"); err != nil {
362+
logger.Error("account_manual_disable_failed",
363+
sdk.LogFieldAccountID, id, sdk.LogFieldError, err)
364+
return ToggleResult{}, err
365+
}
366+
}
344367
}
345368

346-
updated, err := s.repo.Update(ctx, id, UpdateInput{State: &newState})
347-
if err != nil {
348-
logger.Error("account_credential_persist_failed",
349-
sdk.LogFieldAccountID, id,
350-
"op", "toggle_scheduling",
351-
sdk.LogFieldError, err)
352-
return ToggleResult{}, err
353-
}
354369
logger.Info("account_status_changed",
355370
sdk.LogFieldAccountID, id,
356-
"state", updated.State)
357-
return ToggleResult{ID: updated.ID, State: updated.State}, nil
371+
"state", newState)
372+
return ToggleResult{ID: id, State: newState}, nil
358373
}
359374

360375
// PrepareConnectivityTest 准备账号连通性测试。

backend/internal/app/account/service_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,15 @@ func (s *stubStateWriter) MarkDisabled(_ context.Context, accountID int, reason
208208
s.disabled[accountID] = reason
209209
}
210210

211+
func (s *stubStateWriter) ManualRecover(_ context.Context, _ int) error {
212+
return nil
213+
}
214+
215+
func (s *stubStateWriter) ManualDisable(_ context.Context, accountID int, reason string) error {
216+
s.disabled[accountID] = reason
217+
return nil
218+
}
219+
211220
type stubPluginCatalog struct {
212221
models []sdk.ModelInfo
213222
}

backend/internal/app/account/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ type CreateInput struct {
113113
RateMultiplier float64
114114
GroupIDs []int64
115115
UpstreamIsPool bool
116+
Extra map[string]any
116117
}
117118

118119
// UpdateInput 更新账号输入。
@@ -132,6 +133,8 @@ type UpdateInput struct {
132133
HasGroupIDs bool
133134
ProxyID *int64
134135
HasProxyID bool
136+
Extra map[string]any
137+
HasExtra bool
135138
}
136139

137140
// ToggleResult 快速切换调度状态结果。

backend/internal/infra/store/account_store.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ func (s *AccountStore) Create(ctx context.Context, input appaccount.CreateInput)
134134
SetRateMultiplier(input.RateMultiplier).
135135
SetUpstreamIsPool(input.UpstreamIsPool)
136136

137+
if input.Extra != nil {
138+
builder = builder.SetExtra(input.Extra)
139+
}
137140
if len(input.GroupIDs) > 0 {
138141
builder = builder.AddGroupIDs(toIntSlice(input.GroupIDs)...)
139142
}
@@ -190,6 +193,13 @@ func (s *AccountStore) Update(ctx context.Context, id int, input appaccount.Upda
190193
builder = builder.ClearProxy().SetProxyID(int(*input.ProxyID))
191194
}
192195
}
196+
if input.HasExtra {
197+
if input.Extra == nil {
198+
builder = builder.SetExtra(map[string]interface{}{})
199+
} else {
200+
builder = builder.SetExtra(input.Extra)
201+
}
202+
}
193203

194204
if _, err := builder.Save(ctx); err != nil {
195205
if ent.IsNotFound(err) {

backend/internal/plugin/host_service.go

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ func (h *HostService) probeForward(ctx context.Context, req *pb.HostProbeForward
296296
model := req.Model
297297
if model == "" {
298298
if models := h.manager.GetModels(g.Platform); len(models) > 0 {
299-
model = models[0].ID
299+
model = pickProbeModel(models)
300300
}
301301
}
302302
if model == "" {
@@ -333,9 +333,6 @@ func (h *HostService) probeForward(ctx context.Context, req *pb.HostProbeForward
333333
"max_tokens": 5,
334334
})
335335

336-
// X-Airgate-Internal 让下游网关(如 gateway-claude 的 claude_code_only 开关)
337-
// 识别这是 HostService 自家的黑盒探测流量,跳过面向外部客户端的身份闸。
338-
// 与 account.TestAccount 的管理后台测试走同一约定,插件侧统一用这一个 header 判。
339336
fwdReq := &sdk.ForwardRequest{
340337
Account: &sdk.Account{
341338
ID: int64(accFull.ID),
@@ -371,15 +368,17 @@ func (h *HostService) probeForward(ctx context.Context, req *pb.HostProbeForward
371368
return resp, nil
372369
}
373370

374-
// 探测的判决同样交给状态机(与真实流量同一入口),让探测信号驱动账号状态。
375-
h.scheduler.Apply(ctx, acc.ID, scheduler.Judgment{
376-
Kind: outcome.Kind,
377-
RetryAfter: outcome.RetryAfter,
378-
Reason: outcome.Reason,
379-
Duration: latency,
380-
IsPool: accFull.UpstreamIsPool,
381-
Family: scheduler.ModelFamily(accFull.Platform, model),
382-
})
371+
// 探测成功时通知状态机,让降级账号有机会恢复;探测失败时不触发降级,
372+
// 避免探测模型不可用(如上游缺通道)误伤整个账号的可调度性。
373+
// 失败信号由 health 插件自行记录到 group_health_probes,不经过账号状态机。
374+
if outcome.Kind.IsSuccess() {
375+
h.scheduler.Apply(ctx, acc.ID, scheduler.Judgment{
376+
Kind: outcome.Kind,
377+
Duration: latency,
378+
IsPool: accFull.UpstreamIsPool,
379+
Family: scheduler.ModelFamily(accFull.Platform, model),
380+
})
381+
}
383382

384383
switch outcome.Kind {
385384
case sdk.OutcomeSuccess:
@@ -1222,6 +1221,17 @@ func errProbeResp(kind, msg string, start time.Time) *pb.HostProbeForwardRespons
12221221
}
12231222
}
12241223

1224+
// pickProbeModel 从模型列表中选一个非图片模型用于探测。
1225+
// 图片模型探测需要实际生图(成本高),跳过;如果全是图片模型则返回空。
1226+
func pickProbeModel(models []sdk.ModelInfo) string {
1227+
for _, m := range models {
1228+
if !isImageModel(m.ID) {
1229+
return m.ID
1230+
}
1231+
}
1232+
return ""
1233+
}
1234+
12251235
// truncateProbeErr 限制 error_msg 长度,避免巨型上游错误体污染探测表。
12261236
func truncateProbeErr(s string) string {
12271237
const max = 512

backend/internal/scheduler/admin.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,34 @@ import (
1010
// 管理员 / 配额巡检的状态写入口。这些调用不经过 Apply —— 它们是"外部已知事实"
1111
// 的直接落库,不需要 RPM 回退、失败计数等逻辑。
1212

13-
// ManualRecover 运维手动把账号恢复到 active:清状态、清到期、清原因。
13+
// ManualRecover 运维手动把账号恢复到 active:清状态、清到期、清原因,并立即刷新路由缓存
1414
func (s *Scheduler) ManualRecover(ctx context.Context, accountID int) error {
1515
dbCtx, cancel := context.WithTimeout(ctx, dbTimeout)
1616
defer cancel()
17-
return s.db.Account.UpdateOneID(accountID).
17+
err := s.db.Account.UpdateOneID(accountID).
1818
SetState(account.StateActive).
1919
ClearStateUntil().
2020
SetErrorMsg("").
2121
Exec(dbCtx)
22+
if err == nil {
23+
s.routeCache.InvalidateAll()
24+
}
25+
return err
2226
}
2327

2428
// ManualDisable 运维手动禁用账号(语义等同自动 disabled,需要再次 ManualRecover 才能恢复)。
2529
func (s *Scheduler) ManualDisable(ctx context.Context, accountID int, reason string) error {
2630
dbCtx, cancel := context.WithTimeout(ctx, dbTimeout)
2731
defer cancel()
28-
return s.db.Account.UpdateOneID(accountID).
32+
err := s.db.Account.UpdateOneID(accountID).
2933
SetState(account.StateDisabled).
3034
ClearStateUntil().
3135
SetErrorMsg(truncateReason(reason)).
3236
Exec(dbCtx)
37+
if err == nil {
38+
s.routeCache.InvalidateAll()
39+
}
40+
return err
3341
}
3442

3543
// MarkRateLimited 配额巡检发现额度窗口已满时打入 rate_limited 直到 until。

backend/internal/scheduler/schedulability.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,16 @@ const (
1212
NotSchedulable
1313
)
1414

15+
// ExtraString 从 account.Extra 中安全提取 string 值
16+
func ExtraString(extra map[string]interface{}, key string) string {
17+
v, ok := extra[key]
18+
if !ok {
19+
return ""
20+
}
21+
s, _ := v.(string)
22+
return s
23+
}
24+
1525
// ExtraFloat64 从 account.Extra 中安全提取 float64 值
1626
func ExtraFloat64(extra map[string]interface{}, key string) float64 {
1727
v, ok := extra[key]

backend/internal/scheduler/scheduler.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ var (
2020
// dbTimeout 后台 DB 操作超时,防止 goroutine 泄漏。
2121
const dbTimeout = 10 * time.Second
2222

23+
// AccountFilterFunc 平台级账号过滤回调。
24+
// 在 SelectAccount 的模型路由之后、状态过滤之前执行,用于实现平台特有的账号筛选逻辑
25+
// (如 OpenAI 平台按 capability 区分生图/对话账号)。
26+
type AccountFilterFunc func(candidates []*ent.Account, model string) []*ent.Account
27+
2328
// Scheduler 账户调度器。
2429
//
2530
// 两层职责清晰分离:
@@ -39,6 +44,16 @@ type Scheduler struct {
3944
state *StateMachine
4045
familyCooldown *FamilyCooldown
4146
routeCache *routeCache
47+
accountFilters map[string]AccountFilterFunc
48+
}
49+
50+
// SetAccountFilter 注册平台级账号过滤函数。
51+
// 在 SelectAccount 选号管线中,模型路由之后、状态过滤之前执行。
52+
func (s *Scheduler) SetAccountFilter(platform string, fn AccountFilterFunc) {
53+
if s.accountFilters == nil {
54+
s.accountFilters = make(map[string]AccountFilterFunc)
55+
}
56+
s.accountFilters[platform] = fn
4257
}
4358

4459
// NewScheduler 构造调度器。

0 commit comments

Comments
 (0)