Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions sdk/cliproxy/auth/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1295,13 +1295,14 @@ func (m *Manager) Execute(ctx context.Context, providers []string, req cliproxye
_, maxRetryCredentials, maxWait := m.retrySettings()

var lastErr error
var failedAuthID string
for attempt := 0; ; attempt++ {
resp, errExec := m.executeMixedOnce(ctx, normalized, req, opts, maxRetryCredentials)
resp, errExec := m.executeMixedOnce(ctx, normalized, req, opts, maxRetryCredentials, &failedAuthID)
if errExec == nil {
return resp, nil
}
lastErr = errExec
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait, failedAuthID)
if !shouldRetry {
break
}
Expand Down Expand Up @@ -1330,13 +1331,14 @@ func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req clip
_, maxRetryCredentials, maxWait := m.retrySettings()

var lastErr error
var failedAuthID string
for attempt := 0; ; attempt++ {
resp, errExec := m.executeCountMixedOnce(ctx, normalized, req, opts, maxRetryCredentials)
resp, errExec := m.executeCountMixedOnce(ctx, normalized, req, opts, maxRetryCredentials, &failedAuthID)
if errExec == nil {
return resp, nil
}
lastErr = errExec
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait, failedAuthID)
if !shouldRetry {
break
}
Expand All @@ -1361,13 +1363,14 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli
_, maxRetryCredentials, maxWait := m.retrySettings()

var lastErr error
var failedAuthID string
for attempt := 0; ; attempt++ {
result, errStream := m.executeStreamMixedOnce(ctx, normalized, req, opts, maxRetryCredentials)
result, errStream := m.executeStreamMixedOnce(ctx, normalized, req, opts, maxRetryCredentials, &failedAuthID)
if errStream == nil {
return result, nil
}
lastErr = errStream
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, normalized, req.Model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, normalized, req.Model, maxWait, failedAuthID)
if !shouldRetry {
break
}
Expand All @@ -1390,7 +1393,7 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli
return nil, &Error{Code: "auth_not_found", Message: "no auth available"}
}

func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int) (cliproxyexecutor.Response, error) {
func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int, failedAuthID *string) (cliproxyexecutor.Response, error) {
if len(providers) == 0 {
return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"}
}
Expand Down Expand Up @@ -1423,6 +1426,9 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
entry := logEntryWithRequestID(ctx)
debugLogAuthSelection(entry, auth, provider, req.Model)
publishSelectedAuthMetadata(opts.Metadata, auth.ID)
if failedAuthID != nil {
*failedAuthID = auth.ID
}

tried[auth.ID] = struct{}{}
execCtx := ctx
Expand Down Expand Up @@ -1489,7 +1495,7 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
}
}

func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int) (cliproxyexecutor.Response, error) {
func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int, failedAuthID *string) (cliproxyexecutor.Response, error) {
if len(providers) == 0 {
return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"}
}
Expand Down Expand Up @@ -1522,6 +1528,9 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
entry := logEntryWithRequestID(ctx)
debugLogAuthSelection(entry, auth, provider, req.Model)
publishSelectedAuthMetadata(opts.Metadata, auth.ID)
if failedAuthID != nil {
*failedAuthID = auth.ID
}

tried[auth.ID] = struct{}{}
execCtx := ctx
Expand Down Expand Up @@ -1588,7 +1597,7 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
}
}

func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int) (*cliproxyexecutor.StreamResult, error) {
func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int, failedAuthID *string) (*cliproxyexecutor.StreamResult, error) {
if len(providers) == 0 {
return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"}
}
Expand Down Expand Up @@ -1621,6 +1630,9 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
entry := logEntryWithRequestID(ctx)
debugLogAuthSelection(entry, auth, provider, req.Model)
publishSelectedAuthMetadata(opts.Metadata, auth.ID)
if failedAuthID != nil {
*failedAuthID = auth.ID
}

tried[auth.ID] = struct{}{}
execCtx := ctx
Expand Down Expand Up @@ -2305,7 +2317,46 @@ func (m *Manager) retryAllowed(attempt int, providers []string) bool {
return false
}

func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []string, model string, maxWait time.Duration) (time.Duration, bool) {
// eofRetryAllowed decides whether a statusless "unexpected EOF" may be retried.
// It honors the auth-file scoped request_retry override of the credential that
// actually failed (failedAuthID) so a credential that disabled retries is not
// retried just because another credential for the provider still allows it.
// When the failing auth is unknown it falls back to the provider-wide check.
func (m *Manager) eofRetryAllowed(attempt int, providers []string, failedAuthID string) bool {
if allowed, known := m.authRetryAllowed(attempt, failedAuthID); known {
return allowed
}
return m.retryAllowed(attempt, providers)
}

// authRetryAllowed reports whether the specific auth permits another attempt,
// honoring its auth-file scoped request_retry override. The second return value
// is false when the auth is unknown (empty ID or not registered).
func (m *Manager) authRetryAllowed(attempt int, authID string) (bool, bool) {
if m == nil || attempt < 0 {
return false, false
}
authID = strings.TrimSpace(authID)
if authID == "" {
return false, false
}
m.mu.RLock()
auth := m.auths[authID]
m.mu.RUnlock()
if auth == nil {
return false, false
}
effectiveRetry := int(m.requestRetry.Load())
if override, ok := auth.RequestRetryOverride(); ok {
effectiveRetry = override
}
if effectiveRetry < 0 {
effectiveRetry = 0
}
return attempt < effectiveRetry, true
}

func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []string, model string, maxWait time.Duration, failedAuthID string) (time.Duration, bool) {
if err == nil {
return 0, false
}
Expand All @@ -2327,9 +2378,10 @@ func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []stri
// body merely contains "unexpected EOF") has already had a cooldown applied
// by MarkResult, so it must fall through to the cooldown-aware path below
// rather than force an immediate retry that selection would reject with a
// model-cooldown error.
// model-cooldown error. eofRetryAllowed also honors the failed credential's
// auth-file scoped request_retry override.
if status == 0 && isUnexpectedEOFError(err) {
if !m.retryAllowed(attempt, providers) {
if !m.eofRetryAllowed(attempt, providers, failedAuthID) {
return 0, false
}
return 0, true
Expand Down
57 changes: 48 additions & 9 deletions sdk/cliproxy/auth/conductor_overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestManager_ShouldRetryAfterError_RespectsAuthRequestRetryOverride(t *testi
}

_, _, maxWait := m.retrySettings()
wait, shouldRetry := m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 0, []string{"claude"}, model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 0, []string{"claude"}, model, maxWait, "")
if shouldRetry {
t.Fatalf("expected shouldRetry=false for request_retry=0, got true (wait=%v)", wait)
}
Expand All @@ -53,15 +53,15 @@ func TestManager_ShouldRetryAfterError_RespectsAuthRequestRetryOverride(t *testi
t.Fatalf("update auth: %v", errUpdate)
}

wait, shouldRetry = m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 0, []string{"claude"}, model, maxWait)
wait, shouldRetry = m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 0, []string{"claude"}, model, maxWait, "")
if !shouldRetry {
t.Fatalf("expected shouldRetry=true for request_retry=1, got false")
}
if wait <= 0 {
t.Fatalf("expected wait > 0, got %v", wait)
}

_, shouldRetry = m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 1, []string{"claude"}, model, maxWait)
_, shouldRetry = m.shouldRetryAfterError(&Error{HTTPStatus: 500, Message: "boom"}, 1, []string{"claude"}, model, maxWait, "")
if shouldRetry {
t.Fatalf("expected shouldRetry=false on attempt=1 for request_retry=1, got true")
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestManager_ShouldRetryAfterError_UsesOAuthModelAliasForCooldown(t *testing
}

_, _, maxWait := m.retrySettings()
wait, shouldRetry := m.shouldRetryAfterError(&Error{HTTPStatus: 429, Message: "quota"}, 0, []string{"kimi"}, routeModel, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(&Error{HTTPStatus: 429, Message: "quota"}, 0, []string{"kimi"}, routeModel, maxWait, "")
if !shouldRetry {
t.Fatalf("expected shouldRetry=true, got false (wait=%v)", wait)
}
Expand All @@ -124,7 +124,7 @@ func TestManager_ShouldRetryAfterError_RetriesUnexpectedEOF(t *testing.T) {

// A statusless "unexpected EOF" error (as produced by a truncated upstream
// stream) must be retried immediately rather than surfaced to the client.
wait, shouldRetry := m.shouldRetryAfterError(&Error{Message: "unexpected EOF"}, 0, []string{"claude"}, model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(&Error{Message: "unexpected EOF"}, 0, []string{"claude"}, model, maxWait, "")
if !shouldRetry {
t.Fatalf("expected shouldRetry=true for unexpected EOF, got false")
}
Expand All @@ -134,12 +134,12 @@ func TestManager_ShouldRetryAfterError_RetriesUnexpectedEOF(t *testing.T) {

// io.ErrUnexpectedEOF (possibly wrapped) is detected as well.
wrapped := fmt.Errorf("read stream: %w", io.ErrUnexpectedEOF)
if _, shouldRetry = m.shouldRetryAfterError(wrapped, 0, []string{"claude"}, model, maxWait); !shouldRetry {
if _, shouldRetry = m.shouldRetryAfterError(wrapped, 0, []string{"claude"}, model, maxWait, ""); !shouldRetry {
t.Fatalf("expected shouldRetry=true for wrapped io.ErrUnexpectedEOF, got false")
}

// Retries stop once the configured request-retry count is exhausted.
if _, shouldRetry = m.shouldRetryAfterError(&Error{Message: "unexpected EOF"}, 3, []string{"claude"}, model, maxWait); shouldRetry {
if _, shouldRetry = m.shouldRetryAfterError(&Error{Message: "unexpected EOF"}, 3, []string{"claude"}, model, maxWait, ""); shouldRetry {
t.Fatalf("expected shouldRetry=false on attempt=3 for request_retry=3, got true")
}
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestManager_ShouldRetryAfterError_StatusBearingEOFUsesCooldownPath(t *testi
// immediate fast path; with a single cooled-down credential and a cooldown
// exceeding max-retry-interval, the cooldown-aware path declines the retry.
statusErr := &Error{HTTPStatus: 500, Message: "internal error: unexpected EOF"}
if wait, shouldRetry := m.shouldRetryAfterError(statusErr, 0, []string{"claude"}, model, maxWait); shouldRetry {
if wait, shouldRetry := m.shouldRetryAfterError(statusErr, 0, []string{"claude"}, model, maxWait, ""); shouldRetry {
t.Fatalf("expected shouldRetry=false for status-bearing EOF beyond cooldown, got true (wait=%v)", wait)
}
}
Expand All @@ -186,11 +186,50 @@ func TestManager_ShouldRetryAfterError_UnexpectedEOFRespectsRetryDisabled(t *tes
}

_, _, maxWait := m.retrySettings()
if _, shouldRetry := m.shouldRetryAfterError(&Error{Message: "unexpected EOF"}, 0, []string{"claude"}, "test-model", maxWait); shouldRetry {
if _, shouldRetry := m.shouldRetryAfterError(&Error{Message: "unexpected EOF"}, 0, []string{"claude"}, "test-model", maxWait, ""); shouldRetry {
t.Fatalf("expected shouldRetry=false when request-retry=0, got true")
}
}

func TestManager_ShouldRetryAfterError_UnexpectedEOFRespectsAuthRequestRetryOverride(t *testing.T) {
m := NewManager(nil, nil, nil)
m.SetRetryConfig(3, 30*time.Second, 0)

model := "test-model"
// The credential that fails has retries disabled via its auth-file override.
disabled := &Auth{
ID: "auth-disabled",
Provider: "claude",
Metadata: map[string]any{"request_retry": float64(0)},
}
// A sibling credential for the same provider still allows retries.
enabled := &Auth{ID: "auth-enabled", Provider: "claude"}
for _, a := range []*Auth{disabled, enabled} {
if _, errRegister := m.Register(context.Background(), a); errRegister != nil {
t.Fatalf("register %s: %v", a.ID, errRegister)
}
}

_, _, maxWait := m.retrySettings()
eof := &Error{Message: "unexpected EOF"}

// When the failing credential disabled retries, its statusless EOF must not
// be retried even though a sibling credential still allows retries.
if _, shouldRetry := m.shouldRetryAfterError(eof, 0, []string{"claude"}, model, maxWait, "auth-disabled"); shouldRetry {
t.Fatalf("expected shouldRetry=false for EOF on request_retry=0 credential, got true")
}

// The retry-enabled credential still retries its EOF.
if _, shouldRetry := m.shouldRetryAfterError(eof, 0, []string{"claude"}, model, maxWait, "auth-enabled"); !shouldRetry {
t.Fatalf("expected shouldRetry=true for EOF on retry-enabled credential, got false")
}

// An unknown failing auth falls back to the provider-wide retry check.
if _, shouldRetry := m.shouldRetryAfterError(eof, 0, []string{"claude"}, model, maxWait, ""); !shouldRetry {
t.Fatalf("expected shouldRetry=true for EOF with unknown auth fallback, got false")
}
}

type credentialRetryLimitExecutor struct {
id string

Expand Down