Skip to content

Commit a560d2f

Browse files
authored
Merge pull request #3363 from kangjwme/feat/prefer-soonest-reset-scheduling
feat(scheduling): opt-in "prefer soonest reset" account selection
2 parents d2ff9c0 + 510adf7 commit a560d2f

6 files changed

Lines changed: 261 additions & 4 deletions

File tree

backend/internal/config/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,9 @@ type GatewayOpenAIWSSchedulerScoreWeights struct {
957957
Queue float64 `mapstructure:"queue"`
958958
ErrorRate float64 `mapstructure:"error_rate"`
959959
TTFT float64 `mapstructure:"ttft"`
960+
// Reset 倾向「会话窗口最早重置」的账号(use-it-or-lose-it)。
961+
// >0 时,剩余重置时间越短的账号得分越高,从而被优先用尽。默认 0(关闭,不改变原有行为)。
962+
Reset float64 `mapstructure:"reset"`
960963
}
961964

962965
// GatewayOpenAISchedulerConfig OpenAI 高级调度器配置。
@@ -1055,6 +1058,11 @@ type GatewaySchedulingConfig struct {
10551058
// 兜底层账户选择策略: "last_used"(按最后使用时间排序,默认) 或 "random"(随机)
10561059
FallbackSelectionMode string `mapstructure:"fallback_selection_mode"`
10571060

1061+
// PreferSoonestReset 开启后,负载感知选择会优先选用「会话窗口最早重置」的账号
1062+
// (use-it-or-lose-it:先用尽即将重置的账号,保留重置时间还很久的账号)。
1063+
// 默认 false,保持原有「优先级 → 负载率 → LRU」行为不变。
1064+
PreferSoonestReset bool `mapstructure:"prefer_soonest_reset"`
1065+
10581066
// 负载计算
10591067
LoadBatchEnabled bool `mapstructure:"load_batch_enabled"`
10601068
LoadBatchCacheTTLMS int `mapstructure:"load_batch_cache_ttl_ms"`
@@ -1870,6 +1878,7 @@ func setDefaults() {
18701878
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.queue", 0.7)
18711879
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.error_rate", 0.8)
18721880
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.ttft", 0.5)
1881+
viper.SetDefault("gateway.openai_ws.scheduler_score_weights.reset", 0.0)
18731882
// OpenAI HTTP upstream protocol strategy
18741883
viper.SetDefault("gateway.openai_http2.enabled", true)
18751884
viper.SetDefault("gateway.openai_http2.allow_proxy_fallback_to_http1", true)
@@ -1906,6 +1915,7 @@ func setDefaults() {
19061915
viper.SetDefault("gateway.scheduling.fallback_wait_timeout", 30*time.Second)
19071916
viper.SetDefault("gateway.scheduling.fallback_max_waiting", 100)
19081917
viper.SetDefault("gateway.scheduling.fallback_selection_mode", "last_used")
1918+
viper.SetDefault("gateway.scheduling.prefer_soonest_reset", false)
19091919
viper.SetDefault("gateway.scheduling.load_batch_enabled", true)
19101920
viper.SetDefault("gateway.scheduling.load_batch_cache_ttl_ms", 200)
19111921
viper.SetDefault("gateway.scheduling.snapshot_mget_chunk_size", 128)

backend/internal/service/gateway_service.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2218,13 +2218,17 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
22182218
}
22192219
}
22202220

2221-
// 分层过滤选择:优先级 → 负载率 → LRU
2221+
// 分层过滤选择:优先级 →(可选)最早重置 → 负载率 → LRU
22222222
for len(available) > 0 {
22232223
// 1. 取优先级最小的集合
22242224
candidates := filterByMinPriority(available)
2225-
// 2. 取负载率最低的集合
2225+
// 2. (可选)use-it-or-lose-it:优先选用会话窗口最早重置的账号
2226+
if cfg.PreferSoonestReset {
2227+
candidates = filterBySoonestReset(candidates)
2228+
}
2229+
// 3. 取负载率最低的集合
22262230
candidates = filterByMinLoadRate(candidates)
2227-
// 3. LRU 选择最久未用的账号
2231+
// 4. LRU 选择最久未用的账号
22282232
selected := selectByLRU(candidates, preferOAuth)
22292233
if selected == nil {
22302234
break
@@ -2983,6 +2987,39 @@ func filterByMinLoadRate(accounts []accountWithLoad) []accountWithLoad {
29832987
return result
29842988
}
29852989

2990+
// filterBySoonestReset 过滤出「会话窗口最早重置」的账号集合(use-it-or-lose-it)。
2991+
// 仅保留拥有未来重置时间(SessionWindowEnd 在当前时间之后)且最早的账号;
2992+
// 窗口为空或已过期的账号视为无活跃窗口、优先级最低。
2993+
// 当所有账号都没有活跃窗口时,返回原集合(不改变后续 LRU 选择)。
2994+
func filterBySoonestReset(accounts []accountWithLoad) []accountWithLoad {
2995+
if len(accounts) <= 1 {
2996+
return accounts
2997+
}
2998+
now := time.Now()
2999+
var minEnd *time.Time
3000+
for _, acc := range accounts {
3001+
end := acc.account.SessionWindowEnd
3002+
if end == nil || !now.Before(*end) {
3003+
continue
3004+
}
3005+
if minEnd == nil || end.Before(*minEnd) {
3006+
minEnd = end
3007+
}
3008+
}
3009+
if minEnd == nil {
3010+
// 没有任何账号拥有活跃窗口,保持原集合
3011+
return accounts
3012+
}
3013+
result := make([]accountWithLoad, 0, len(accounts))
3014+
for _, acc := range accounts {
3015+
end := acc.account.SessionWindowEnd
3016+
if end != nil && now.Before(*end) && end.Equal(*minEnd) {
3017+
result = append(result, acc)
3018+
}
3019+
}
3020+
return result
3021+
}
3022+
29863023
// selectByLRU 从集合中选择最久未用的账号
29873024
// 如果有多个账号具有相同的最小 LastUsedAt,则随机选择一个
29883025
func selectByLRU(accounts []accountWithLoad, preferOAuth bool) *accountWithLoad {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//go:build unit
2+
3+
package service
4+
5+
import (
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func accWithWindowEnd(id int64, end *time.Time) accountWithLoad {
13+
return accountWithLoad{
14+
account: &Account{
15+
ID: id,
16+
Schedulable: true,
17+
Status: StatusActive,
18+
SessionWindowEnd: end,
19+
},
20+
loadInfo: &AccountLoadInfo{AccountID: id},
21+
}
22+
}
23+
24+
func TestFilterBySoonestReset_PicksSoonestFutureWindow(t *testing.T) {
25+
now := time.Now()
26+
soon := now.Add(1 * time.Hour)
27+
later := now.Add(24 * time.Hour)
28+
accounts := []accountWithLoad{
29+
accWithWindowEnd(1, testTimePtr(later)),
30+
accWithWindowEnd(2, testTimePtr(soon)),
31+
accWithWindowEnd(3, testTimePtr(later)),
32+
}
33+
got := filterBySoonestReset(accounts)
34+
require.Len(t, got, 1)
35+
require.Equal(t, int64(2), got[0].account.ID, "重置时间最早的账号被选中")
36+
}
37+
38+
func TestFilterBySoonestReset_IgnoresNilAndExpiredWindows(t *testing.T) {
39+
now := time.Now()
40+
expired := now.Add(-1 * time.Hour)
41+
active := now.Add(2 * time.Hour)
42+
accounts := []accountWithLoad{
43+
accWithWindowEnd(1, nil), // 无活跃窗口
44+
accWithWindowEnd(2, testTimePtr(expired)), // 已过期,视为无活跃窗口
45+
accWithWindowEnd(3, testTimePtr(active)), // 唯一活跃窗口
46+
}
47+
got := filterBySoonestReset(accounts)
48+
require.Len(t, got, 1)
49+
require.Equal(t, int64(3), got[0].account.ID, "仅保留拥有未来重置时间的账号")
50+
}
51+
52+
func TestFilterBySoonestReset_NoActiveWindowReturnsAll(t *testing.T) {
53+
now := time.Now()
54+
expired := now.Add(-30 * time.Minute)
55+
accounts := []accountWithLoad{
56+
accWithWindowEnd(1, nil),
57+
accWithWindowEnd(2, testTimePtr(expired)),
58+
}
59+
got := filterBySoonestReset(accounts)
60+
require.Len(t, got, 2, "没有任何账号拥有活跃窗口时,返回原集合不做过滤")
61+
}
62+
63+
func TestFilterBySoonestReset_TiedSoonestKeepsAll(t *testing.T) {
64+
now := time.Now()
65+
end := now.Add(90 * time.Minute)
66+
accounts := []accountWithLoad{
67+
accWithWindowEnd(1, testTimePtr(end)),
68+
accWithWindowEnd(2, testTimePtr(end)),
69+
accWithWindowEnd(3, testTimePtr(now.Add(5*time.Hour))),
70+
}
71+
got := filterBySoonestReset(accounts)
72+
require.Len(t, got, 2, "并列最早重置的账号都保留,交由后续 LRU 决定")
73+
ids := map[int64]bool{got[0].account.ID: true, got[1].account.ID: true}
74+
require.True(t, ids[1] && ids[2])
75+
}
76+
77+
func TestFilterBySoonestReset_SingleOrEmptyUnchanged(t *testing.T) {
78+
require.Empty(t, filterBySoonestReset(nil))
79+
single := []accountWithLoad{accWithWindowEnd(1, nil)}
80+
require.Len(t, filterBySoonestReset(single), 1)
81+
}

backend/internal/service/openai_account_scheduler.go

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -745,6 +745,35 @@ func (s *defaultOpenAIAccountScheduler) buildOpenAIAccountLoadPlan(
745745
plan.loadSkew = calcLoadSkewByMoments(loadRateSum, loadRateSumSquares, len(candidates))
746746

747747
weights := s.service.openAIWSSchedulerWeights()
748+
749+
// Reset 因子(use-it-or-lose-it):在拥有「未来会话窗口结束时间」的账号中,
750+
// 剩余时间越短 → 因子越接近 1(越早重置越优先用尽)。无活跃窗口的账号因子为 0。
751+
// 仅在 weights.Reset > 0 时计算,默认关闭不影响原有行为。
752+
minResetRemaining, maxResetRemaining := 0.0, 0.0
753+
hasResetSample := false
754+
if weights.Reset > 0 {
755+
now := time.Now()
756+
for _, candidate := range candidates {
757+
end := candidate.account.SessionWindowEnd
758+
if end == nil || !now.Before(*end) {
759+
continue
760+
}
761+
remaining := end.Sub(now).Seconds()
762+
if !hasResetSample {
763+
minResetRemaining, maxResetRemaining = remaining, remaining
764+
hasResetSample = true
765+
continue
766+
}
767+
if remaining < minResetRemaining {
768+
minResetRemaining = remaining
769+
}
770+
if remaining > maxResetRemaining {
771+
maxResetRemaining = remaining
772+
}
773+
}
774+
}
775+
776+
now := time.Now()
748777
for i := range candidates {
749778
item := &candidates[i]
750779
priorityFactor := 1.0
@@ -758,12 +787,24 @@ func (s *defaultOpenAIAccountScheduler) buildOpenAIAccountLoadPlan(
758787
if item.hasTTFT && hasTTFTSample && maxTTFT > minTTFT {
759788
ttftFactor = 1 - clamp01((item.ttft-minTTFT)/(maxTTFT-minTTFT))
760789
}
790+
resetFactor := 0.0
791+
if weights.Reset > 0 && hasResetSample {
792+
if end := item.account.SessionWindowEnd; end != nil && now.Before(*end) {
793+
if maxResetRemaining > minResetRemaining {
794+
resetFactor = 1 - clamp01((end.Sub(now).Seconds()-minResetRemaining)/(maxResetRemaining-minResetRemaining))
795+
} else {
796+
// 所有有窗口的账号剩余时间相同:一律给满分,让其优于无窗口账号。
797+
resetFactor = 1
798+
}
799+
}
800+
}
761801

762802
item.score = weights.Priority*priorityFactor +
763803
weights.Load*loadFactor +
764804
weights.Queue*queueFactor +
765805
weights.ErrorRate*errorFactor +
766-
weights.TTFT*ttftFactor
806+
weights.TTFT*ttftFactor +
807+
weights.Reset*resetFactor
767808
}
768809
plan.candidates = candidates
769810

@@ -1415,6 +1456,7 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul
14151456
Queue: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Queue,
14161457
ErrorRate: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.ErrorRate,
14171458
TTFT: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.TTFT,
1459+
Reset: s.cfg.Gateway.OpenAIWS.SchedulerScoreWeights.Reset,
14181460
}
14191461
}
14201462
return GatewayOpenAIWSSchedulerScoreWeightsView{
@@ -1423,6 +1465,7 @@ func (s *OpenAIGatewayService) openAIWSSchedulerWeights() GatewayOpenAIWSSchedul
14231465
Queue: 0.7,
14241466
ErrorRate: 0.8,
14251467
TTFT: 0.5,
1468+
Reset: 0.0,
14261469
}
14271470
}
14281471

@@ -1432,6 +1475,8 @@ type GatewayOpenAIWSSchedulerScoreWeightsView struct {
14321475
Queue float64
14331476
ErrorRate float64
14341477
TTFT float64
1478+
// Reset 倾向「会话窗口最早重置」的账号;0 表示关闭(默认)。
1479+
Reset float64
14351480
}
14361481

14371482
func clamp01(value float64) float64 {
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package service
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/Wei-Shaw/sub2api/internal/config"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func openAIResetTestScheduler(reset float64) *defaultOpenAIAccountScheduler {
12+
cfg := &config.Config{}
13+
cfg.Gateway.OpenAIWS.SchedulerScoreWeights = config.GatewayOpenAIWSSchedulerScoreWeights{
14+
Priority: 1.0,
15+
Load: 1.0,
16+
Queue: 0.7,
17+
ErrorRate: 0.8,
18+
TTFT: 0.5,
19+
Reset: reset,
20+
}
21+
return &defaultOpenAIAccountScheduler{service: &OpenAIGatewayService{cfg: cfg}}
22+
}
23+
24+
func openAIPlanScores(plan openAIAccountLoadPlan) map[int64]float64 {
25+
scores := make(map[int64]float64, len(plan.candidates))
26+
for _, c := range plan.candidates {
27+
scores[c.account.ID] = c.score
28+
}
29+
return scores
30+
}
31+
32+
// Reset 权重 > 0 时,会话窗口最早重置的账号应获得更高分。
33+
func TestBuildOpenAIAccountLoadPlan_ResetWeightPrefersSoonestReset(t *testing.T) {
34+
now := time.Now()
35+
soon := now.Add(1 * time.Hour)
36+
later := now.Add(20 * time.Hour)
37+
filtered := []*Account{
38+
{ID: 1, Priority: 0, SessionWindowEnd: &later},
39+
{ID: 2, Priority: 0, SessionWindowEnd: &soon},
40+
}
41+
sched := openAIResetTestScheduler(5.0)
42+
43+
plan := sched.buildOpenAIAccountLoadPlan(OpenAIAccountScheduleRequest{}, filtered, map[int64]*AccountLoadInfo{})
44+
scores := openAIPlanScores(plan)
45+
require.Greater(t, scores[2], scores[1], "重置时间最早的账号(ID=2)得分更高")
46+
}
47+
48+
// Reset 权重为 0(默认)时,窗口重置时间不应影响打分,保持原有行为。
49+
func TestBuildOpenAIAccountLoadPlan_ResetWeightZeroNoEffect(t *testing.T) {
50+
now := time.Now()
51+
soon := now.Add(1 * time.Hour)
52+
later := now.Add(20 * time.Hour)
53+
filtered := []*Account{
54+
{ID: 1, Priority: 0, SessionWindowEnd: &later},
55+
{ID: 2, Priority: 0, SessionWindowEnd: &soon},
56+
}
57+
sched := openAIResetTestScheduler(0.0)
58+
59+
plan := sched.buildOpenAIAccountLoadPlan(OpenAIAccountScheduleRequest{}, filtered, map[int64]*AccountLoadInfo{})
60+
scores := openAIPlanScores(plan)
61+
require.Equal(t, scores[1], scores[2], "Reset 权重为 0 时两账号得分相同")
62+
}
63+
64+
// 无活跃窗口的账号 reset 因子为 0,应低于拥有未来窗口的账号。
65+
func TestBuildOpenAIAccountLoadPlan_ResetWeightIgnoresNilWindow(t *testing.T) {
66+
now := time.Now()
67+
soon := now.Add(2 * time.Hour)
68+
filtered := []*Account{
69+
{ID: 1, Priority: 0, SessionWindowEnd: nil},
70+
{ID: 2, Priority: 0, SessionWindowEnd: &soon},
71+
}
72+
sched := openAIResetTestScheduler(5.0)
73+
74+
plan := sched.buildOpenAIAccountLoadPlan(OpenAIAccountScheduleRequest{}, filtered, map[int64]*AccountLoadInfo{})
75+
scores := openAIPlanScores(plan)
76+
require.Greater(t, scores[2], scores[1], "拥有活跃窗口的账号得分高于无窗口账号")
77+
}

deploy/config.example.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,9 @@ gateway:
320320
queue: 0.7
321321
error_rate: 0.8
322322
ttft: 0.5
323+
# use-it-or-lose-it:倾向「会话窗口最早重置」的账号,剩余重置时间越短得分越高。
324+
# 0 表示关闭(默认,不改变原有行为);调大可让即将重置的账号被优先用尽。
325+
reset: 0.0
323326
# OpenAI 高级调度器补充配置
324327
openai_scheduler:
325328
# 是否允许 session_hash sticky 在账号健康度恶化时临时逃逸;false 可一键回退旧行为
@@ -421,6 +424,10 @@ gateway:
421424
# Fallback max waiting queue size
422425
# 兜底最大排队长度
423426
fallback_max_waiting: 100
427+
# Prefer the account whose session window resets soonest (use-it-or-lose-it).
428+
# 负载感知选择时优先用尽「会话窗口最早重置」的账号;false 保持
429+
# 原有「优先级 → 负载率 → LRU」行为(默认)。
430+
prefer_soonest_reset: false
424431
# Enable batch load calculation for scheduling
425432
# 启用调度批量负载计算
426433
load_batch_enabled: true

0 commit comments

Comments
 (0)