Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 109 additions & 13 deletions sdk/cliproxy/auth/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1559,13 +1559,14 @@ func (m *Manager) Execute(ctx context.Context, providers []string, req cliproxye
_, maxRetryCredentials, maxWait := m.retrySettings()

var lastErr error
var failedAuthID string
for attempt := 0; ; attempt++ {
resp, errExec := m.executeMixedOnce(ctx, normalized, req, opts, maxRetryCredentials)
resp, errExec := m.executeMixedOnce(ctx, normalized, req, opts, maxRetryCredentials, &failedAuthID)
if errExec == nil {
return resp, nil
}
lastErr = errExec
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait, failedAuthID)
if !shouldRetry {
break
}
Expand Down Expand Up @@ -1594,13 +1595,14 @@ func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req clip
_, maxRetryCredentials, maxWait := m.retrySettings()

var lastErr error
var failedAuthID string
for attempt := 0; ; attempt++ {
resp, errExec := m.executeCountMixedOnce(ctx, normalized, req, opts, maxRetryCredentials)
resp, errExec := m.executeCountMixedOnce(ctx, normalized, req, opts, maxRetryCredentials, &failedAuthID)
if errExec == nil {
return resp, nil
}
lastErr = errExec
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(errExec, attempt, normalized, req.Model, maxWait, failedAuthID)
if !shouldRetry {
break
}
Expand All @@ -1625,13 +1627,14 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli
_, maxRetryCredentials, maxWait := m.retrySettings()

var lastErr error
var failedAuthID string
for attempt := 0; ; attempt++ {
result, errStream := m.executeStreamMixedOnce(ctx, normalized, req, opts, maxRetryCredentials)
result, errStream := m.executeStreamMixedOnce(ctx, normalized, req, opts, maxRetryCredentials, &failedAuthID)
if errStream == nil {
return result, nil
}
lastErr = errStream
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, normalized, req.Model, maxWait)
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, normalized, req.Model, maxWait, failedAuthID)
if !shouldRetry {
break
}
Expand All @@ -1654,7 +1657,7 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli
return nil, &Error{Code: "auth_not_found", Message: "no auth available"}
}

func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int) (cliproxyexecutor.Response, error) {
func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int, failedAuthID *string) (cliproxyexecutor.Response, error) {
if len(providers) == 0 {
return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"}
}
Expand Down Expand Up @@ -1709,6 +1712,9 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
result.Error.HTTPStatus = se.StatusCode()
}
m.MarkResult(execCtx, result)
if failedAuthID != nil {
*failedAuthID = auth.ID
}
lastErr = errPrepare
continue
}
Expand Down Expand Up @@ -1744,6 +1750,9 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
if isRequestInvalidError(authErr) {
return cliproxyexecutor.Response{}, authErr
}
if failedAuthID != nil {
*failedAuthID = auth.ID
}
lastErr = authErr
if homeMode {
homeAuthCount++
Expand All @@ -1753,7 +1762,7 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
}
}

func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int) (cliproxyexecutor.Response, error) {
func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int, failedAuthID *string) (cliproxyexecutor.Response, error) {
if len(providers) == 0 {
return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"}
}
Expand Down Expand Up @@ -1808,6 +1817,9 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
result.Error.HTTPStatus = se.StatusCode()
}
m.MarkResult(execCtx, result)
if failedAuthID != nil {
*failedAuthID = auth.ID
}
lastErr = errPrepare
continue
}
Expand Down Expand Up @@ -1843,6 +1855,9 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
if isRequestInvalidError(authErr) {
return cliproxyexecutor.Response{}, authErr
}
if failedAuthID != nil {
*failedAuthID = auth.ID
}
lastErr = authErr
if homeMode {
homeAuthCount++
Expand All @@ -1852,7 +1867,7 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
}
}

func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int) (*cliproxyexecutor.StreamResult, error) {
func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, maxRetryCredentials int, failedAuthID *string) (*cliproxyexecutor.StreamResult, error) {
if len(providers) == 0 {
return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"}
}
Expand Down Expand Up @@ -1905,6 +1920,9 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
result.Error.HTTPStatus = se.StatusCode()
}
m.MarkResult(execCtx, result)
if failedAuthID != nil {
*failedAuthID = auth.ID
}
lastErr = errPrepare
continue
}
Expand All @@ -1917,6 +1935,9 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
if isRequestInvalidError(errStream) {
return nil, errStream
}
if failedAuthID != nil {
*failedAuthID = auth.ID
}
lastErr = errStream
if homeMode {
homeAuthCount++
Expand Down Expand Up @@ -2569,11 +2590,47 @@ func (m *Manager) retryAllowed(attempt int, providers []string) bool {
return false
}

func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []string, model string, maxWait time.Duration) (time.Duration, bool) {
if err == nil {
return 0, false
// eofRetryAllowed decides whether a statusless "unexpected EOF" may be retried.
// It honors the auth-file scoped request_retry override of the credential that
// actually failed (failedAuthID) so a credential that disabled retries is not
// retried just because another credential for the provider still allows it.
// When the failing auth is unknown it falls back to the provider-wide check.
func (m *Manager) eofRetryAllowed(attempt int, providers []string, failedAuthID string) bool {
if allowed, known := m.authRetryAllowed(attempt, failedAuthID); known {
return allowed
}
if maxWait <= 0 {
return m.retryAllowed(attempt, providers)
}

// authRetryAllowed reports whether the specific auth permits another attempt,
// honoring its auth-file scoped request_retry override. The second return value
// is false when the auth is unknown (empty ID or not registered).
func (m *Manager) authRetryAllowed(attempt int, authID string) (bool, bool) {
if m == nil || attempt < 0 {
return false, false
}
authID = strings.TrimSpace(authID)
if authID == "" {
return false, false
}
m.mu.RLock()
auth := m.auths[authID]
m.mu.RUnlock()
if auth == nil {
return false, false
}
effectiveRetry := int(m.requestRetry.Load())
if override, ok := auth.RequestRetryOverride(); ok {
effectiveRetry = override
}
if effectiveRetry < 0 {
effectiveRetry = 0
}
return attempt < effectiveRetry, true
}

func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []string, model string, maxWait time.Duration, failedAuthID string) (time.Duration, bool) {
if err == nil {
return 0, false
}
status := statusCodeFromError(err)
Expand All @@ -2583,6 +2640,31 @@ func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []stri
if isRequestInvalidError(err) {
return 0, false
}
// Treat transient, statusless upstream stream interruptions (e.g.
// "unexpected EOF") like a normal retryable API error instead of surfacing
// them to the client. These carry no HTTP status, so they would otherwise
// fall through the 429-only retry path below and abort the request. The
// status==0 guard is important: a status-bearing error (e.g. a 5xx whose
// body merely contains "unexpected EOF") has already had a cooldown applied
// by MarkResult, so it must fall through to the cooldown-aware path below
// rather than force an immediate retry that selection would reject with a
// model-cooldown error. eofRetryAllowed also honors the failed credential's
// auth-file scoped request_retry override.
//
// This is intentionally checked before the maxWait gate below: an EOF retry
// is immediate (wait=0) and needs no cooldown budget, so it must remain
// gated by the retry count alone and stay available even when
// max-retry-interval is 0 (which only caps how long we wait on cooled-down
// credentials).
if status == 0 && isUnexpectedEOFError(err) {
if !m.eofRetryAllowed(attempt, providers, failedAuthID) {
return 0, false
}
return 0, true
Comment thread
hex-ci marked this conversation as resolved.
}
if maxWait <= 0 {
return 0, false
}
wait, found := m.closestCooldownWait(providers, model, attempt)
if found {
if wait > maxWait {
Expand Down Expand Up @@ -2969,6 +3051,20 @@ func errorString(err error) string {
return err.Error()
}

// isUnexpectedEOFError reports whether err represents an "unexpected EOF"
// surfaced when an upstream connection or stream is cut off mid-response.
// These are transient connection failures rather than genuine API errors, so
// callers should retry them instead of returning them to the client.
func isUnexpectedEOFError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, io.ErrUnexpectedEOF) {
return true
}
return strings.Contains(strings.ToLower(err.Error()), "unexpected eof")
}

func statusCodeFromError(err error) int {
if err == nil {
return 0
Expand Down
Loading
Loading