Skip to content

Commit fb21a17

Browse files
test(auth): preserve session affinity while waiting
1 parent a104a4e commit fb21a17

4 files changed

Lines changed: 94 additions & 10 deletions

File tree

auth/session_affinity_test.go

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package auth
22

3-
import "testing"
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
48

59
func TestNextForSessionPrefersBoundAccountAndProxy(t *testing.T) {
610
store := &Store{
@@ -45,3 +49,77 @@ func TestNextForSessionFallsBackWhenBoundAccountExcluded(t *testing.T) {
4549
t.Fatalf("proxyURL = %q, want empty fallback proxy", proxyURL)
4650
}
4751
}
52+
53+
func TestWaitForSessionAvailableReturnsBoundAccount(t *testing.T) {
54+
store := &Store{
55+
accounts: []*Account{
56+
{DBID: 1, AccessToken: "tok-1"},
57+
{DBID: 2, AccessToken: "tok-2"},
58+
},
59+
maxConcurrency: 1,
60+
}
61+
store.bindSessionAffinity("session-1", store.accounts[1], "http://proxy-2")
62+
63+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
64+
defer cancel()
65+
66+
acc, proxyURL := store.WaitForSessionAvailable(ctx, "session-1", 50*time.Millisecond, nil)
67+
if acc == nil {
68+
t.Fatal("expected bound account")
69+
}
70+
if acc.DBID != 2 {
71+
t.Fatalf("account DBID = %d, want %d", acc.DBID, 2)
72+
}
73+
if proxyURL != "http://proxy-2" {
74+
t.Fatalf("proxyURL = %q, want %q", proxyURL, "http://proxy-2")
75+
}
76+
}
77+
78+
func TestWaitForSessionAvailableFallsBackWhenBindingExpired(t *testing.T) {
79+
store := &Store{
80+
accounts: []*Account{
81+
{DBID: 1, AccessToken: "tok-1"},
82+
},
83+
maxConcurrency: 1,
84+
sessionBindings: map[string]sessionAffinity{"session-1": {accountID: 99, proxyURL: "http://stale", expiresAt: time.Now().Add(-time.Minute)}},
85+
}
86+
87+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
88+
defer cancel()
89+
90+
acc, proxyURL := store.WaitForSessionAvailable(ctx, "session-1", 50*time.Millisecond, nil)
91+
if acc == nil {
92+
t.Fatal("expected fallback account")
93+
}
94+
if acc.DBID != 1 {
95+
t.Fatalf("account DBID = %d, want %d", acc.DBID, 1)
96+
}
97+
if proxyURL != "" {
98+
t.Fatalf("proxyURL = %q, want empty fallback proxy", proxyURL)
99+
}
100+
}
101+
102+
func TestWaitForSessionAvailableRespectsExcludeSet(t *testing.T) {
103+
store := &Store{
104+
accounts: []*Account{
105+
{DBID: 1, AccessToken: "tok-1"},
106+
{DBID: 2, AccessToken: "tok-2"},
107+
},
108+
maxConcurrency: 1,
109+
}
110+
store.bindSessionAffinity("session-1", store.accounts[1], "http://proxy-2")
111+
112+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
113+
defer cancel()
114+
115+
acc, proxyURL := store.WaitForSessionAvailable(ctx, "session-1", 50*time.Millisecond, map[int64]bool{2: true})
116+
if acc == nil {
117+
t.Fatal("expected fallback account")
118+
}
119+
if acc.DBID != 1 {
120+
t.Fatalf("account DBID = %d, want %d", acc.DBID, 1)
121+
}
122+
if proxyURL != "" {
123+
t.Fatalf("proxyURL = %q, want empty fallback proxy", proxyURL)
124+
}
125+
}

auth/store.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,6 +1378,12 @@ func (s *Store) takeByIDExcluding(id int64, exclude map[int64]bool) *Account {
13781378

13791379
// WaitForAvailable 等待可用账号(带超时的请求排队)
13801380
func (s *Store) WaitForAvailable(ctx context.Context, timeout time.Duration) *Account {
1381+
acc, _ := s.WaitForSessionAvailable(ctx, "", timeout, nil)
1382+
return acc
1383+
}
1384+
1385+
// WaitForSessionAvailable waits for a session-preferred account and proxy pair.
1386+
func (s *Store) WaitForSessionAvailable(ctx context.Context, key string, timeout time.Duration, exclude map[int64]bool) (*Account, string) {
13811387
deadline := time.NewTimer(timeout)
13821388
defer deadline.Stop()
13831389

@@ -1388,13 +1394,13 @@ func (s *Store) WaitForAvailable(ctx context.Context, timeout time.Duration) *Ac
13881394
for {
13891395
select {
13901396
case <-ctx.Done():
1391-
return nil
1397+
return nil, ""
13921398
case <-deadline.C:
1393-
return nil
1399+
return nil, ""
13941400
default:
1395-
acc := s.Next()
1401+
acc, proxyURL := s.NextForSession(key, exclude)
13961402
if acc != nil {
1397-
return acc
1403+
return acc, proxyURL
13981404
}
13991405
// 等待一下再重试(指数退避,最大 500ms)
14001406
backoffTimer.Reset(backoff)
@@ -1404,9 +1410,9 @@ func (s *Store) WaitForAvailable(ctx context.Context, timeout time.Duration) *Ac
14041410
backoff *= 2
14051411
}
14061412
case <-ctx.Done():
1407-
return nil
1413+
return nil, ""
14081414
case <-deadline.C:
1409-
return nil
1415+
return nil, ""
14101416
}
14111417
}
14121418
}

proxy/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ func (h *Handler) Responses(c *gin.Context) {
454454
account, stickyProxyURL := h.nextAccountForSession(sessionID, excludeAccounts)
455455
if account == nil {
456456
// 排队等待可用账号(最多 30s)
457-
account = h.store.WaitForAvailable(c.Request.Context(), 30*time.Second)
457+
account, stickyProxyURL = h.store.WaitForSessionAvailable(c.Request.Context(), sessionID, 30*time.Second, excludeAccounts)
458458
if account == nil {
459459
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
460460
h.sendFinalUpstreamError(c, lastStatusCode, lastBody)
@@ -823,7 +823,7 @@ func (h *Handler) ChatCompletions(c *gin.Context) {
823823
account, stickyProxyURL := h.nextAccountForSession(sessionID, excludeAccounts)
824824
if account == nil {
825825
// 排队等待可用账号(最多 30s)
826-
account = h.store.WaitForAvailable(c.Request.Context(), 30*time.Second)
826+
account, stickyProxyURL = h.store.WaitForSessionAvailable(c.Request.Context(), sessionID, 30*time.Second, excludeAccounts)
827827
if account == nil {
828828
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
829829
h.sendFinalUpstreamError(c, lastStatusCode, lastBody)

proxy/handler_anthropic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func (h *Handler) Messages(c *gin.Context) {
121121
for attempt := 0; attempt <= maxRetries; attempt++ {
122122
account, stickyProxyURL := h.nextAccountForSession(sessionID, excludeAccounts)
123123
if account == nil {
124-
account = h.store.WaitForAvailable(c.Request.Context(), 30*time.Second)
124+
account, stickyProxyURL = h.store.WaitForSessionAvailable(c.Request.Context(), sessionID, 30*time.Second, excludeAccounts)
125125
if account == nil {
126126
if lastStatusCode == http.StatusTooManyRequests && len(lastBody) > 0 {
127127
sendAnthropicError(c, http.StatusTooManyRequests, "rate_limit_error", "All accounts rate limited")

0 commit comments

Comments
 (0)