Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions backend/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,10 @@ type GatewayOpenAIWSSchedulerScoreWeights struct {
Reset float64 `mapstructure:"reset"`
// QuotaHeadroom 倾向 7d 剩余额度更健康的账号;默认 0(关闭,不改变原有行为)。
QuotaHeadroom float64 `mapstructure:"quota_headroom"`
// Quota5h is a soft Codex 5-hour quota headroom weight: lower 5h usage scores higher.
Quota5h float64 `mapstructure:"quota_5h"`
// Quota7d is a soft Codex 7-day quota headroom weight: lower 7d usage scores higher.
Quota7d float64 `mapstructure:"quota_7d"`
}

// GatewayOpenAISchedulerConfig OpenAI 高级调度器配置。
Expand Down Expand Up @@ -1887,6 +1891,8 @@ func setDefaults() {
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.ttft", 0.5)
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.reset", 0.0)
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.quota_headroom", 0.0)
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.quota_5h", 0.4)
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.quota_7d", 0.3)
// OpenAI HTTP upstream protocol strategy
viper.SetDefault("gateway.openai_http2.enabled", true)
viper.SetDefault("gateway.openai_http2.allow_proxy_fallback_to_http1", true)
Expand Down Expand Up @@ -2667,15 +2673,19 @@ func (c *Config) Validate() error {
c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.QuotaHeadroom < 0 {
c.Gateway.OpenAIWS.SchedulerScoreWeights.QuotaHeadroom < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h < 0 ||
c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d < 0 {
return fmt.Errorf("gateway.openai_ws.scheduler_score_weights.* must be non-negative")
}
weightSum := c.Gateway.OpenAIWS.SchedulerScoreWeights.Priority +
c.Gateway.OpenAIWS.SchedulerScoreWeights.Load +
c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue +
c.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate +
c.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT +
c.Gateway.OpenAIWS.SchedulerScoreWeights.QuotaHeadroom
c.Gateway.OpenAIWS.SchedulerScoreWeights.QuotaHeadroom +
c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h +
c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d
if weightSum <= 0 {
return fmt.Errorf("gateway.openai_ws.scheduler_score_weights must not all be zero")
}
Expand Down
8 changes: 8 additions & 0 deletions backend/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ func TestLoadDefaultOpenAIWSConfig(t *testing.T) {
if cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate != 0.5 {
t.Fatalf("Gateway.OpenAIScheduler.StickyEscapeErrorRate = %v, want 0.5", cfg.Gateway.OpenAIScheduler.StickyEscapeErrorRate)
}
if cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h != 0.4 {
t.Fatalf("Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h = %v, want 0.4", cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h)
}
if cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d != 0.3 {
t.Fatalf("Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d = %v, want 0.3", cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d)
}
if !cfg.Gateway.OpenAIWS.SessionHashReadOldFallback {
t.Fatalf("Gateway.OpenAIWS.SessionHashReadOldFallback = false, want true")
}
Expand Down Expand Up @@ -1733,6 +1739,8 @@ func TestValidateConfig_OpenAIWSRules(t *testing.T) {
c.Gateway.OpenAIWS.SchedulerScoreWeights.Queue = 0
c.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate = 0
c.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT = 0
c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h = 0
c.Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d = 0
},
wantErr: "gateway.openai_ws.scheduler_score_weights must not all be zero",
},
Expand Down
21 changes: 20 additions & 1 deletion backend/internal/service/openai_account_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,9 @@ func (s *defaultOpenAIAccountScheduler) buildOpenAIAccountLoadPlan(
weights.ErrorRate*errorFactor +
weights.TTFT*ttftFactor +
weights.Reset*resetFactor +
weights.QuotaHeadroom*quotaHeadroomFactor
weights.QuotaHeadroom*quotaHeadroomFactor +
weights.Quota5h*openAICodexQuotaHeadroomFactor(item.account, "5h", now) +
weights.Quota7d*openAICodexQuotaHeadroomFactor(item.account, "7d", now)
}
plan.candidates = candidates

Expand Down Expand Up @@ -1478,6 +1480,8 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul
TTFT: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT,
Reset: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Reset,
QuotaHeadroom: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.QuotaHeadroom,
Quota5h: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h,
Quota7d: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d,
}
}
return GatewayOpenAIWSSchedulerScoreWeightsView{
Expand All @@ -1488,6 +1492,8 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul
TTFT: 0.5,
Reset: 0.0,
QuotaHeadroom: 0.0,
Quota5h: 0.4,
Quota7d: 0.3,
}
}

Expand All @@ -1500,6 +1506,8 @@ type GatewayOpenAIWSSchedulerScoreWeightsView struct {
// Reset 倾向「会话窗口最早重置」的账号;0 表示关闭(默认)。
Reset float64
QuotaHeadroom float64
Quota5h float64
Quota7d float64
}

func openAIQuotaHeadroomFactor(account *Account, now time.Time) float64 {
Expand Down Expand Up @@ -1543,6 +1551,17 @@ func openAIQuotaWindowResetAny(extra map[string]any, now time.Time, windows ...s
return false
}

// openAICodexQuotaHeadroomFactor converts an account's quota utilization for the
// given window into a headroom score: the lower the utilization, the higher the
// score.
//
// A neutral 0.5 is returned when account is nil or when
// resolveOpenAIQuotaUtilization reports that no usable utilization exists for
// the window at time now. Otherwise the result is 1 minus the utilization after
// it has passed through clamp01 (presumably bounding it to [0, 1] — confirm
// against clamp01's definition).
func openAICodexQuotaHeadroomFactor(account *Account, window string, now time.Time) float64 {
	// Score used when there is no signal to judge the account by.
	const neutralFactor = 0.5

	if account != nil {
		if used, found := resolveOpenAIQuotaUtilization(account.Extra, window, now); found {
			return 1 - clamp01(used)
		}
	}
	return neutralFactor
}

func clamp01(value float64) float64 {
switch {
case value < 0:
Expand Down
160 changes: 160 additions & 0 deletions backend/internal/service/openai_account_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2365,6 +2365,8 @@ func TestOpenAIGatewayService_SchedulerWrappersAndDefaults(t *testing.T) {
require.Equal(t, 0.7, defaultWeights.Queue)
require.Equal(t, 0.8, defaultWeights.ErrorRate)
require.Equal(t, 0.5, defaultWeights.TTFT)
require.Equal(t, 0.4, defaultWeights.Quota5h)
require.Equal(t, 0.3, defaultWeights.Quota7d)

cfg := &config.Config{}
cfg.Gateway.OpenAIWS.LBTopK = 9
Expand All @@ -2374,6 +2376,8 @@ func TestOpenAIGatewayService_SchedulerWrappersAndDefaults(t *testing.T) {
cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Queue = 0.4
cfg.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate = 0.5
cfg.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT = 0.6
cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota5h = 0.7
cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Quota7d = 0.8
svcWithCfg := &OpenAIGatewayService{cfg: cfg}

require.Equal(t, 9, svcWithCfg.openAIWSLBTopK())
Expand All @@ -2384,6 +2388,162 @@ func TestOpenAIGatewayService_SchedulerWrappersAndDefaults(t *testing.T) {
require.Equal(t, 0.4, customWeights.Queue)
require.Equal(t, 0.5, customWeights.ErrorRate)
require.Equal(t, 0.6, customWeights.TTFT)
require.Equal(t, 0.7, customWeights.Quota5h)
require.Equal(t, 0.8, customWeights.Quota7d)
}

// TestOpenAIAccountScheduler_CodexQuotaWeightsPreferMoreHeadroom checks the
// ordering produced when Quota5h and Quota7d are the only non-zero scheduler
// weights (both set to 1): an account with a fresh low-usage snapshot must
// outscore an account carrying no quota data, which in turn must outscore an
// account with a fresh high-usage snapshot.
func TestOpenAIAccountScheduler_CodexQuotaWeightsPreferMoreHeadroom(t *testing.T) {
	resetOpenAIAdvancedSchedulerSettingCacheForTest()

	now := time.Now().UTC()

	cfg := &config.Config{}
	cfg.Gateway.OpenAIWS.LBTopK = 3
	scoreWeights := &cfg.Gateway.OpenAIWS.SchedulerScoreWeights
	scoreWeights.Priority = 0
	scoreWeights.Load = 0
	scoreWeights.Queue = 0
	scoreWeights.ErrorRate = 0
	scoreWeights.TTFT = 0
	scoreWeights.Quota5h = 1
	scoreWeights.Quota7d = 1

	// freshUsage builds a snapshot stamped at now whose 5h and 7d windows
	// reset in the future (2h and 48h from now respectively).
	freshUsage := func(used5h, used7d float64) map[string]any {
		return map[string]any{
			"codex_5h_used_percent":  used5h,
			"codex_7d_used_percent":  used7d,
			"codex_5h_reset_at":      now.Add(2 * time.Hour).Format(time.RFC3339),
			"codex_7d_reset_at":      now.Add(48 * time.Hour).Format(time.RFC3339),
			"codex_usage_updated_at": now.Format(time.RFC3339),
		}
	}
	// newAccount returns an active, schedulable OAuth account that differs
	// from its siblings only by ID and quota snapshot.
	newAccount := func(id int64, extra map[string]any) *Account {
		return &Account{
			ID:          id,
			Platform:    PlatformOpenAI,
			Type:        AccountTypeOAuth,
			Status:      StatusActive,
			Schedulable: true,
			Concurrency: 1,
			Priority:    0,
			Extra:       extra,
		}
	}

	highHeadroom := newAccount(9101, freshUsage(10.0, 20.0))
	lowHeadroom := newAccount(9102, freshUsage(70.0, 90.0))
	noSignal := newAccount(9103, nil) // no quota data at all

	svc := &OpenAIGatewayService{cfg: cfg}
	scheduler := newDefaultOpenAIAccountScheduler(svc, nil).(*defaultOpenAIAccountScheduler)
	plan := scheduler.buildOpenAIAccountLoadPlan(
		OpenAIAccountScheduleRequest{RequestedModel: "gpt-5.1"},
		[]*Account{lowHeadroom, noSignal, highHeadroom},
		map[int64]*AccountLoadInfo{},
	)

	// Index the computed scores by account ID for the assertions below.
	scores := make(map[int64]float64, len(plan.candidates))
	for i := range plan.candidates {
		entry := plan.candidates[i]
		scores[entry.account.ID] = entry.score
	}

	require.Greater(t, scores[9101], scores[9103], "fresh high headroom should outrank neutral no-signal accounts")
	require.Greater(t, scores[9103], scores[9102], "neutral no-signal accounts should outrank low-headroom accounts")
	// Expected totals: 0.9+0.8, 0.5+0.5 and 0.3+0.1 for the 5h and 7d terms.
	require.InDelta(t, 1.7, scores[9101], 1e-9)
	require.InDelta(t, 1.0, scores[9103], 1e-9)
	require.InDelta(t, 0.4, scores[9102], 1e-9)
}

// TestOpenAIAccountScheduler_CodexQuotaWeightsTreatStaleOrResetWindowsAsNeutral
// checks that a 90% usage reading only lowers the score while it is still
// usable. With Quota5h and Quota7d as the only non-zero weights, a fresh
// snapshot scores 0.2, whereas a snapshot whose windows reset a minute ago, or
// one last updated three hours ago, is expected to score the neutral 1.0.
func TestOpenAIAccountScheduler_CodexQuotaWeightsTreatStaleOrResetWindowsAsNeutral(t *testing.T) {
	resetOpenAIAdvancedSchedulerSettingCacheForTest()

	now := time.Now().UTC()

	cfg := &config.Config{}
	cfg.Gateway.OpenAIWS.LBTopK = 3
	scoreWeights := &cfg.Gateway.OpenAIWS.SchedulerScoreWeights
	scoreWeights.Priority = 0
	scoreWeights.Load = 0
	scoreWeights.Queue = 0
	scoreWeights.ErrorRate = 0
	scoreWeights.TTFT = 0
	scoreWeights.Quota5h = 1
	scoreWeights.Quota7d = 1

	// heavyUsage builds a 90%-used snapshot for both windows; each timestamp
	// is expressed as an offset from now.
	heavyUsage := func(reset5hOffset, reset7dOffset, updatedOffset time.Duration) map[string]any {
		return map[string]any{
			"codex_5h_used_percent":  90.0,
			"codex_7d_used_percent":  90.0,
			"codex_5h_reset_at":      now.Add(reset5hOffset).Format(time.RFC3339),
			"codex_7d_reset_at":      now.Add(reset7dOffset).Format(time.RFC3339),
			"codex_usage_updated_at": now.Add(updatedOffset).Format(time.RFC3339),
		}
	}
	// newAccount returns an active, schedulable OAuth account that differs
	// from its siblings only by ID and quota snapshot.
	newAccount := func(id int64, extra map[string]any) *Account {
		return &Account{
			ID:          id,
			Platform:    PlatformOpenAI,
			Type:        AccountTypeOAuth,
			Status:      StatusActive,
			Schedulable: true,
			Concurrency: 1,
			Extra:       extra,
		}
	}

	// Windows still open, snapshot taken now.
	freshLowHeadroom := newAccount(9201, heavyUsage(2*time.Hour, 48*time.Hour, 0))
	// Both windows' reset times are already one minute in the past.
	resetWindow := newAccount(9202, heavyUsage(-1*time.Minute, -1*time.Minute, 0))
	// Windows still open, but the snapshot is three hours old.
	staleSnapshot := newAccount(9203, heavyUsage(2*time.Hour, 48*time.Hour, -3*time.Hour))

	svc := &OpenAIGatewayService{cfg: cfg}
	scheduler := newDefaultOpenAIAccountScheduler(svc, nil).(*defaultOpenAIAccountScheduler)
	plan := scheduler.buildOpenAIAccountLoadPlan(
		OpenAIAccountScheduleRequest{RequestedModel: "gpt-5.1"},
		[]*Account{freshLowHeadroom, resetWindow, staleSnapshot},
		map[int64]*AccountLoadInfo{},
	)

	// Index the computed scores by account ID for the assertions below.
	scores := make(map[int64]float64, len(plan.candidates))
	for i := range plan.candidates {
		entry := plan.candidates[i]
		scores[entry.account.ID] = entry.score
	}

	require.InDelta(t, 0.2, scores[9201], 1e-9)
	require.InDelta(t, 1.0, scores[9202], 1e-9)
	require.InDelta(t, 1.0, scores[9203], 1e-9)
}

func TestDefaultOpenAIAccountScheduler_IsAccountTransportCompatible_Branches(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions deploy/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ gateway:
reset: 0.0
# 倾向 7d 剩余额度更健康的账号;0 表示关闭(默认,不改变原有行为),小流量灰度可设为 0.3。
quota_headroom: 0.0
# Codex 5h quota headroom weight. Lower 5h usage gets a higher scheduler score.
# Set to 0 to disable the short-window quota signal.
quota_5h: 0.4
# Codex 7d quota headroom weight. Lower 7d usage gets a higher scheduler score.
# Set to 0 to disable the weekly quota signal.
quota_7d: 0.3
# OpenAI 高级调度器补充配置
openai_scheduler:
# 是否允许 session_hash sticky 在账号健康度恶化时临时逃逸;false 可一键回退旧行为
Expand Down
Loading