Skip to content

Commit 6056430

Browse files
committed
feat(scheduler): prefer accounts nearing 7d reset
Prioritize paid accounts whose 7d quota reset is within the urgency window while preserving remaining-quota safeguards. Fixes #122
1 parent e8342cc commit 6056430

10 files changed

Lines changed: 225 additions & 67 deletions

File tree

admin/handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ type schedulerBreakdownResponse struct {
450450
SuccessBonus float64 `json:"success_bonus"`
451451
UsagePenalty7d float64 `json:"usage_penalty_7d"`
452452
UsageUrgencyBonus5h float64 `json:"usage_urgency_bonus_5h"`
453+
UsageUrgencyBonus7d float64 `json:"usage_urgency_bonus_7d"`
453454
LatencyPenalty float64 `json:"latency_penalty"`
454455
SuccessRatePenalty float64 `json:"success_rate_penalty"`
455456
}
@@ -538,6 +539,7 @@ func (h *Handler) ListAccounts(c *gin.Context) {
538539
SuccessBonus: debug.Breakdown.SuccessBonus,
539540
UsagePenalty7d: debug.Breakdown.UsagePenalty7d,
540541
UsageUrgencyBonus5h: debug.Breakdown.UsageUrgencyBonus5h,
542+
UsageUrgencyBonus7d: debug.Breakdown.UsageUrgencyBonus7d,
541543
LatencyPenalty: debug.Breakdown.LatencyPenalty,
542544
SuccessRatePenalty: debug.Breakdown.SuccessRatePenalty,
543545
}

admin/responses_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,7 @@ func TestSchedulerBreakdownResponse(t *testing.T) {
436436
SuccessBonus: 4.0,
437437
UsagePenalty7d: 8.0,
438438
UsageUrgencyBonus5h: 6.0,
439+
UsageUrgencyBonus7d: 7.0,
439440
LatencyPenalty: 2.5,
440441
SuccessRatePenalty: 1.5,
441442
}
@@ -449,6 +450,9 @@ func TestSchedulerBreakdownResponse(t *testing.T) {
449450
if resp.UsageUrgencyBonus5h != 6.0 {
450451
t.Errorf("UsageUrgencyBonus5h = %v, want 6.0", resp.UsageUrgencyBonus5h)
451452
}
453+
if resp.UsageUrgencyBonus7d != 7.0 {
454+
t.Errorf("UsageUrgencyBonus7d = %v, want 7.0", resp.UsageUrgencyBonus7d)
455+
}
452456
}
453457

454458
// ==================== Usage Logs Response Tests ====================

auth/fast_scheduler.go

Lines changed: 16 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,14 @@ type fastSchedulerPosition struct {
3030
// 它不在请求热路径内重算全量 score,而是直接复用 Account 上已缓存的
3131
// HealthTier / DispatchScore / DynamicConcurrencyLimit。
3232
//
33-
// 调度策略:两阶段扫描
34-
// 1. 优先在验证过的账号(TotalRequests > 10,排在桶前部)中 round-robin
35-
// 2. 验证账号全忙时,回退到全量 round-robin
33+
// 调度策略:按健康层级分桶,桶内按调度分排序后 round-robin。
34+
// 验证过的账号只作为同分 tie-breaker,避免历史请求量盖过额度快重置优先级。
3635
type FastScheduler struct {
37-
mu sync.RWMutex
38-
baseLimit int64
39-
buckets map[AccountHealthTier][]fastSchedulerEntry
40-
positions map[int64]fastSchedulerPosition
41-
cursors [3]atomic.Uint64
42-
provenBounds [3]int // 每个 tier 桶中验证过的账号数量(排在前面)
43-
provenCurs [3]atomic.Uint64 // 验证账号专用 round-robin 游标
36+
mu sync.RWMutex
37+
baseLimit int64
38+
buckets map[AccountHealthTier][]fastSchedulerEntry
39+
positions map[int64]fastSchedulerPosition
40+
cursors [3]atomic.Uint64
4441
}
4542

4643
func NewFastScheduler(baseLimit int64) *FastScheduler {
@@ -112,24 +109,22 @@ func (s *FastScheduler) Rebuild(accounts []*Account) {
112109
}
113110

114111
// 每个桶只排序一次 + 重建位置索引
115-
for tierIdx, tier := range fastSchedulerTierOrder {
112+
for _, tier := range fastSchedulerTierOrder {
116113
entries := s.buckets[tier]
117114
if len(entries) == 0 {
118-
s.provenBounds[tierIdx] = 0
119115
continue
120116
}
121117
sort.SliceStable(entries, func(i, j int) bool {
122-
if entries[i].proven != entries[j].proven {
123-
return entries[i].proven
124-
}
125118
if entries[i].dispatchScore == entries[j].dispatchScore {
119+
if entries[i].proven != entries[j].proven {
120+
return entries[i].proven
121+
}
126122
return entries[i].dbID < entries[j].dbID
127123
}
128124
return entries[i].dispatchScore > entries[j].dispatchScore
129125
})
130126
s.buckets[tier] = entries
131127
s.rebuildPositionsLocked(tier)
132-
s.provenBounds[tierIdx] = countProvenEntries(entries)
133128
}
134129
}
135130

@@ -172,7 +167,6 @@ func (s *FastScheduler) Acquire() *Account {
172167
}
173168

174169
// AcquireExcluding 获取下一个可用账号,排除指定的账号 ID 集合
175-
// 两阶段调度:优先在验证过的账号中选取,全忙时回退到全量扫描
176170
func (s *FastScheduler) AcquireExcluding(apiKeyID int64, exclude map[int64]bool) *Account {
177171
return s.AcquireExcludingWithFilter(apiKeyID, exclude, nil)
178172
}
@@ -197,20 +191,6 @@ func (s *FastScheduler) AcquireExcludingWithFilter(apiKeyID int64, exclude map[i
197191
continue
198192
}
199193

200-
// 阶段 1:优先在验证过的账号(桶前部 provenBound 个)中 round-robin
201-
provenBound := s.provenBounds[tierIdx]
202-
if provenBound > 0 {
203-
acc, stale := s.scanRangeLocked(tier, 0, provenBound, &s.provenCurs[tierIdx], baseLimit, now, apiKeyID, exclude, filter)
204-
if acc != nil {
205-
return acc
206-
}
207-
if stale {
208-
changed = true
209-
break
210-
}
211-
}
212-
213-
// 阶段 2:回退到全量 round-robin
214194
acc, stale := s.scanRangeLocked(tier, 0, len(bucket), &s.cursors[tierIdx], baseLimit, now, apiKeyID, exclude, filter)
215195
if acc != nil {
216196
return acc
@@ -309,23 +289,16 @@ func (s *FastScheduler) insertLocked(acc *Account, now time.Time) {
309289
proven: proven,
310290
})
311291
sort.SliceStable(entries, func(i, j int) bool {
312-
if entries[i].proven != entries[j].proven {
313-
return entries[i].proven
314-
}
315292
if entries[i].dispatchScore == entries[j].dispatchScore {
293+
if entries[i].proven != entries[j].proven {
294+
return entries[i].proven
295+
}
316296
return entries[i].dbID < entries[j].dbID
317297
}
318298
return entries[i].dispatchScore > entries[j].dispatchScore
319299
})
320300
s.buckets[tier] = entries
321301
s.rebuildPositionsLocked(tier)
322-
// 更新该 tier 的验证账号边界
323-
for tierIdx, t := range fastSchedulerTierOrder {
324-
if t == tier {
325-
s.provenBounds[tierIdx] = countProvenEntries(entries)
326-
break
327-
}
328-
}
329302
}
330303

331304
func (s *FastScheduler) removeLocked(dbID int64) {
@@ -345,23 +318,6 @@ func (s *FastScheduler) removeLocked(dbID int64) {
345318
s.buckets[pos.tier] = entries
346319
delete(s.positions, dbID)
347320
s.rebuildPositionsLocked(pos.tier)
348-
// 更新该 tier 的验证账号边界
349-
for tierIdx, t := range fastSchedulerTierOrder {
350-
if t == pos.tier {
351-
s.provenBounds[tierIdx] = countProvenEntries(entries)
352-
break
353-
}
354-
}
355-
}
356-
357-
// countProvenEntries 统计桶中验证过的账号数量(TotalRequests > 10,排在前面)
358-
func countProvenEntries(entries []fastSchedulerEntry) int {
359-
for i, e := range entries {
360-
if !e.proven {
361-
return i
362-
}
363-
}
364-
return len(entries) // 全部都是验证过的
365321
}
366322

367323
func (s *FastScheduler) rebuildPositionsLocked(tier AccountHealthTier) {
@@ -377,7 +333,8 @@ func (a *Account) fastSchedulerSnapshot(baseLimit int64, now time.Time) (Account
377333
a.mu.Lock()
378334
defer a.mu.Unlock()
379335

380-
if isPremium5hPlan(a.PlanType) && a.UsagePercent5hValid {
336+
if (isPremium5hPlan(a.PlanType) && a.UsagePercent5hValid) ||
337+
(IsPlusOrHigherPlan(a.PlanType) && a.UsagePercent7dValid) {
381338
a.recomputeSchedulerLocked(baseLimit)
382339
}
383340

auth/fast_scheduler_test.go

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -362,15 +362,36 @@ func TestBuildFastSchedulerFromStore(t *testing.T) {
362362
}
363363
}
364364

365-
func TestFastSchedulerProvenPhaseUsesTotalRequestsOnly(t *testing.T) {
366-
premium := newFastSchedulerTestAccount(1, HealthTierHealthy, 150, 1)
367-
atomic.StoreInt64(&premium.TotalRequests, 0)
365+
func TestFastSchedulerDispatchScoreOutranksProvenHistory(t *testing.T) {
366+
highScore := newFastSchedulerTestAccount(1, HealthTierHealthy, 150, 1)
367+
atomic.StoreInt64(&highScore.TotalRequests, 0)
368368

369369
proven := newFastSchedulerTestAccount(2, HealthTierHealthy, 100, 1)
370370
atomic.StoreInt64(&proven.TotalRequests, 11)
371371

372372
scheduler := NewFastScheduler(1)
373-
scheduler.Rebuild([]*Account{premium, proven})
373+
scheduler.Rebuild([]*Account{highScore, proven})
374+
375+
got := scheduler.Acquire()
376+
if got == nil {
377+
t.Fatal("Acquire() returned nil")
378+
}
379+
defer scheduler.Release(got)
380+
381+
if got.DBID != highScore.DBID {
382+
t.Fatalf("Acquire() picked dbID=%d, want high-score account %d", got.DBID, highScore.DBID)
383+
}
384+
}
385+
386+
func TestFastSchedulerProvenHistoryBreaksDispatchScoreTies(t *testing.T) {
387+
unproven := newFastSchedulerTestAccount(1, HealthTierHealthy, 100, 1)
388+
atomic.StoreInt64(&unproven.TotalRequests, 0)
389+
390+
proven := newFastSchedulerTestAccount(2, HealthTierHealthy, 100, 1)
391+
atomic.StoreInt64(&proven.TotalRequests, 11)
392+
393+
scheduler := NewFastScheduler(1)
394+
scheduler.Rebuild([]*Account{unproven, proven})
374395

375396
got := scheduler.Acquire()
376397
if got == nil {
@@ -379,7 +400,36 @@ func TestFastSchedulerProvenPhaseUsesTotalRequestsOnly(t *testing.T) {
379400
defer scheduler.Release(got)
380401

381402
if got.DBID != proven.DBID {
382-
t.Fatalf("Acquire() picked dbID=%d, want proven account %d", got.DBID, proven.DBID)
403+
t.Fatalf("Acquire() picked dbID=%d, want proven tie-breaker account %d", got.DBID, proven.DBID)
404+
}
405+
}
406+
407+
func TestFastSchedulerPrefersPremium7dResetSoonOverProvenAccount(t *testing.T) {
408+
now := time.Now()
409+
later := newFastSchedulerTestAccount(1, HealthTierHealthy, 150, 1)
410+
later.PlanType = "plus"
411+
later.UsagePercent7d = 68
412+
later.UsagePercent7dValid = true
413+
later.Reset7dAt = now.Add(5 * 24 * time.Hour)
414+
atomic.StoreInt64(&later.TotalRequests, 450)
415+
416+
soon := newFastSchedulerTestAccount(2, HealthTierHealthy, 150, 1)
417+
soon.PlanType = "plus"
418+
soon.UsagePercent7d = 63
419+
soon.UsagePercent7dValid = true
420+
soon.Reset7dAt = now.Add(36 * time.Hour)
421+
422+
scheduler := NewFastScheduler(1)
423+
scheduler.Rebuild([]*Account{later, soon})
424+
425+
got := scheduler.Acquire()
426+
if got == nil {
427+
t.Fatal("Acquire() returned nil")
428+
}
429+
defer scheduler.Release(got)
430+
431+
if got.DBID != soon.DBID {
432+
t.Fatalf("Acquire() picked dbID=%d, want 7d reset-soon account %d", got.DBID, soon.DBID)
383433
}
384434
}
385435

auth/store.go

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ const (
134134
premium5hUrgencyMaxBonus = 25.0
135135
premium5hUrgencyMinRemainingPct = 5.0
136136
premium5hUrgencyFullRemainingPct = 50.0
137+
premium7dUrgencyWindow = 72 * time.Hour
138+
premium7dUrgencyMaxBonus = 80.0
139+
premium7dUrgencyMinRemainingPct = 5.0
140+
premium7dUrgencyFullRemainingPct = 70.0
137141
)
138142

139143
// SchedulerBreakdown 调度评分拆解
@@ -147,6 +151,7 @@ type SchedulerBreakdown struct {
147151
ProvenBonus float64 // 经过验证的账号(TotalRequests > 10)加分
148152
UsagePenalty7d float64
149153
UsageUrgencyBonus5h float64
154+
UsageUrgencyBonus7d float64
150155
LatencyPenalty float64
151156
SuccessRatePenalty float64 // 滑动窗口成功率惩罚
152157
}
@@ -680,6 +685,49 @@ func (a *Account) premium5hUsageUrgencyBonusLocked(now time.Time) float64 {
680685
return premium5hUrgencyMaxBonus * timeFactor * quotaFactor
681686
}
682687

688+
func (a *Account) premium7dUsageUrgencyBonusLocked(now time.Time) float64 {
689+
if !IsPlusOrHigherPlan(a.PlanType) {
690+
return 0
691+
}
692+
if !a.UsagePercent7dValid || a.Reset7dAt.IsZero() {
693+
return 0
694+
}
695+
if a.UsagePercent7d >= 100 {
696+
return 0
697+
}
698+
if a.AccessToken == "" || a.Status == StatusError || a.HealthTier == HealthTierBanned {
699+
return 0
700+
}
701+
if atomic.LoadInt32(&a.DispatchPaused) != 0 {
702+
return 0
703+
}
704+
if a.Status == StatusCooldown && now.Before(a.CooldownUtil) {
705+
return 0
706+
}
707+
708+
timeRemaining := a.Reset7dAt.Sub(now)
709+
if timeRemaining <= 0 || timeRemaining > premium7dUrgencyWindow {
710+
return 0
711+
}
712+
713+
quotaRemaining := 100 - a.UsagePercent7d
714+
if quotaRemaining <= premium7dUrgencyMinRemainingPct {
715+
return 0
716+
}
717+
718+
timeFactor := 1 - float64(timeRemaining)/float64(premium7dUrgencyWindow)
719+
quotaFactor := quotaRemaining / premium7dUrgencyFullRemainingPct
720+
if quotaFactor > 1 {
721+
quotaFactor = 1
722+
}
723+
if quotaFactor < 0 {
724+
quotaFactor = 0
725+
}
726+
weightedQuotaFactor := 0.6 + 0.4*quotaFactor
727+
728+
return premium7dUrgencyMaxBonus * timeFactor * weightedQuotaFactor
729+
}
730+
683731
func (a *Account) effectiveBaseConcurrencyLocked(storeBaseLimit int64) int64 {
684732
if a.BaseConcurrencyOverride != nil && *a.BaseConcurrencyOverride > 0 {
685733
return *a.BaseConcurrencyOverride
@@ -770,8 +818,9 @@ func (a *Account) recomputeSchedulerLocked(baseLimit int64) {
770818
scoreBiasEffective := a.effectiveScoreBiasLocked(now, tier)
771819
if a.dispatchBonusEligibleLocked(now, tier) {
772820
breakdown.UsageUrgencyBonus5h = a.premium5hUsageUrgencyBonusLocked(now)
821+
breakdown.UsageUrgencyBonus7d = a.premium7dUsageUrgencyBonusLocked(now)
773822
}
774-
dispatchScore := score + float64(scoreBiasEffective) + breakdown.UsageUrgencyBonus5h
823+
dispatchScore := score + float64(scoreBiasEffective) + breakdown.UsageUrgencyBonus5h + breakdown.UsageUrgencyBonus7d
775824

776825
a.HealthTier = tier
777826
a.SchedulerScore = score
@@ -1274,6 +1323,7 @@ func (a *Account) GetSchedulerDebugSnapshot(baseLimit int64) SchedulerDebugSnaps
12741323
breakdown := a.schedulerBreakdownLocked(now)
12751324
if a.dispatchBonusEligibleLocked(now, a.HealthTier) {
12761325
breakdown.UsageUrgencyBonus5h = a.premium5hUsageUrgencyBonusLocked(now)
1326+
breakdown.UsageUrgencyBonus7d = a.premium7dUsageUrgencyBonusLocked(now)
12771327
}
12781328
return SchedulerDebugSnapshot{
12791329
HealthTier: string(a.HealthTier),

0 commit comments

Comments
 (0)