Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func newInterceptionProcessor(p provider.Provider, cbs *circuitbreaker.ProviderC
log.Debug(ctx, "interception ended")
}

asyncRecorder.RecordInterceptionEnded(ctx, &recorder.InterceptionRecordEnded{ID: interceptor.ID().String()})
_ = asyncRecorder.RecordInterceptionEnded(ctx, &recorder.InterceptionRecordEnded{ID: interceptor.ID().String()})

// Ensure all recording have completed before completing request.
asyncRecorder.Wait()
Expand Down
18 changes: 9 additions & 9 deletions intercept/chatcompletions/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (i *interceptionBase) unmarshalArgs(in string) (args recorder.ToolArgs) {
}

// writeUpstreamError marshals and writes a given error.
func (i *interceptionBase) writeUpstreamError(w http.ResponseWriter, oaiErr *errorResponse) {
func (i *interceptionBase) writeUpstreamError(w http.ResponseWriter, oaiErr *chatCompletionResponseError) {
if oaiErr == nil {
return
}
Expand All @@ -183,7 +183,7 @@ func (i *interceptionBase) writeUpstreamError(w http.ResponseWriter, oaiErr *err

out, err := json.Marshal(oaiErr)
if err != nil {
i.logger.Warn(context.Background(), "failed to marshal upstream error", slog.Error(err), slog.F("error_payload", slog.F("%+v", oaiErr)))
i.logger.Warn(context.Background(), "failed to marshal upstream error", slog.Error(err), slog.F("error_payload", oaiErr))
// Response has to match expected format.
_, _ = w.Write([]byte(`{
"error": {
Expand Down Expand Up @@ -228,13 +228,13 @@ func calculateActualInputTokenUsage(in openai.CompletionUsage) int64 {
in.PromptTokensDetails.CachedTokens /* The aggregated number of text input tokens that has been cached from previous requests. */
}

func getErrorResponse(err error) *errorResponse {
func getErrorResponse(err error) *chatCompletionResponseError {
var apiErr *openai.Error
if !errors.As(err, &apiErr) {
return nil
}

return &errorResponse{
return &chatCompletionResponseError{
ErrorObject: &shared.ErrorObject{
Code: apiErr.Code,
Message: apiErr.Message,
Expand All @@ -244,15 +244,15 @@ func getErrorResponse(err error) *errorResponse {
}
}

var _ error = &errorResponse{}
var _ error = &chatCompletionResponseError{}

type errorResponse struct {
type chatCompletionResponseError struct {
ErrorObject *shared.ErrorObject `json:"error"`
StatusCode int `json:"-"`
}

func newErrorResponse(msg error) *errorResponse {
return &errorResponse{
func newErrorResponse(msg error) *chatCompletionResponseError {
return &chatCompletionResponseError{
ErrorObject: &shared.ErrorObject{
Code: "error",
Message: msg.Error(),
Expand All @@ -261,7 +261,7 @@ func newErrorResponse(msg error) *errorResponse {
}
}

func (a *errorResponse) Error() string {
func (a *chatCompletionResponseError) Error() string {
if a.ErrorObject == nil {
return ""
}
Expand Down
6 changes: 3 additions & 3 deletions intercept/chatcompletions/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
logger.Warn(ctx, "openai stream error", slog.Error(streamErr))
interceptionErr = oaiErr
} else {
logger.Warn(ctx, "unknown error", slog.Error(streamErr))
logger.Warn(ctx, "unknown stream error encountered", slog.Error(streamErr))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.Warn(ctx, "unknown stream error encountered", slog.Error(streamErr))
logger.Warn(ctx, "unknown stream error", slog.Error(streamErr))

// Unfortunately, the OpenAI SDK does not support parsing errors received in the stream
// into known types (i.e. [shared.OverloadedError]).
// See https://github.com/openai/openai-go/blob/v2.7.0/packages/ssestream/ssestream.go#L171
Expand All @@ -254,14 +254,14 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
}
} else if lastErr != nil {
// Otherwise check if any logical errors occurred during processing.
logger.Warn(ctx, "stream failed", slog.Error(lastErr))
logger.Warn(ctx, "stream processing failed", slog.Error(lastErr))
interceptionErr = newErrorResponse(xerrors.Errorf("processing error: %w", lastErr))
}

if interceptionErr != nil {
payload, err := i.marshalErr(interceptionErr)
if err != nil {
logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", slog.F("%+v", interceptionErr)))
logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", interceptionErr))
} else if err := events.Send(streamCtx, payload); err != nil {
logger.Warn(ctx, "failed to relay error", slog.Error(err), slog.F("payload", payload))
}
Expand Down
2 changes: 1 addition & 1 deletion intercept/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *EventStream) Start(w http.ResponseWriter, r *http.Request) {
return
}
if err := flush(w); err != nil {
s.logger.Warn(ctx, "failed to flush", slog.Error(err))
s.logger.Warn(ctx, "failed to flush event stream", slog.Error(err))
return
}

Expand Down
18 changes: 9 additions & 9 deletions intercept/messages/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func filterBedrockBetaFlags(headers http.Header, model string) {
}

// writeUpstreamError marshals and writes a given error.
func (i *interceptionBase) writeUpstreamError(w http.ResponseWriter, antErr *ErrorResponse) {
func (i *interceptionBase) writeUpstreamError(w http.ResponseWriter, antErr *messagesResponseError) {
if antErr == nil {
return
}
Expand All @@ -415,7 +415,7 @@ func (i *interceptionBase) writeUpstreamError(w http.ResponseWriter, antErr *Err

out, err := json.Marshal(antErr)
if err != nil {
i.logger.Warn(context.Background(), "failed to marshal upstream error", slog.Error(err), slog.F("error_payload", slog.F("%+v", antErr)))
i.logger.Warn(context.Background(), "failed to marshal upstream error", slog.Error(err), slog.F("error_payload", antErr))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I think the intention of this previous was to fmt.Sprintf("%+v", antErr); let's keep that.

// Response has to match expected format.
// See https://docs.claude.com/en/api/errors#error-shapes.
_, _ = w.Write([]byte(fmt.Sprintf(`{
Expand Down Expand Up @@ -487,7 +487,7 @@ func accumulateUsage(dest, src any) {
}
}

func getErrorResponse(err error) *ErrorResponse {
func getErrorResponse(err error) *messagesResponseError {
var apierr *anthropic.Error
if !errors.As(err, &apierr) {
return nil
Expand All @@ -505,7 +505,7 @@ func getErrorResponse(err error) *ErrorResponse {
typ = string(detail.Type)
}

return &ErrorResponse{
return &messagesResponseError{
ErrorResponse: &anthropic.ErrorResponse{
Error: anthropic.ErrorObjectUnion{
Message: msg,
Expand All @@ -517,16 +517,16 @@ func getErrorResponse(err error) *ErrorResponse {
}
}

var _ error = &ErrorResponse{}
var _ error = &messagesResponseError{}

type ErrorResponse struct {
type messagesResponseError struct {
*anthropic.ErrorResponse

StatusCode int `json:"-"`
}

func newErrorResponse(msg error) *ErrorResponse {
return &ErrorResponse{
func newErrorResponse(msg error) *messagesResponseError {
return &messagesResponseError{
ErrorResponse: &shared.ErrorResponse{
Error: shared.ErrorObjectUnion{
Message: msg.Error(),
Expand All @@ -536,7 +536,7 @@ func newErrorResponse(msg error) *ErrorResponse {
}
}

func (a *ErrorResponse) Error() string {
func (a *messagesResponseError) Error() string {
if a.ErrorResponse == nil {
return ""
}
Expand Down
12 changes: 6 additions & 6 deletions intercept/messages/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ newStream:
for {
// TODO add outer loop span (https://github.com/coder/aibridge/issues/67)
if err := streamCtx.Err(); err != nil {
lastErr = xerrors.Errorf("stream exit: %w", err)
interceptionErr = xerrors.Errorf("stream exit: %w", err)
break
}

Expand Down Expand Up @@ -474,8 +474,8 @@ newStream:
MsgID: message.ID,
Prompt: prompt,
})
prompt = ""
promptFound = false
prompt = "" //nolint:ineffassign // reset to prevent double-recording across newStream iterations
promptFound = false //nolint:ineffassign // reset to prevent double-recording across newStream iterations
}

if events.IsStreaming() {
Expand All @@ -488,7 +488,7 @@ newStream:
logger.Warn(ctx, "anthropic stream error", slog.Error(streamErr))
interceptionErr = antErr
} else {
logger.Warn(ctx, "unknown error", slog.Error(streamErr))
logger.Warn(ctx, "unknown stream error encountered", slog.Error(streamErr))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.Warn(ctx, "unknown stream error encountered", slog.Error(streamErr))
logger.Warn(ctx, "unknown stream error", slog.Error(streamErr))

// Unfortunately, the Anthropic SDK does not support parsing errors received in the stream
// into known types (i.e. [shared.OverloadedError]).
// See https://github.com/anthropics/anthropic-sdk-go/blob/v1.12.0/packages/ssestream/ssestream.go#L172-L174
Expand All @@ -497,14 +497,14 @@ newStream:
}
} else if lastErr != nil {
// Otherwise check if any logical errors occurred during processing.
logger.Warn(ctx, "stream failed", slog.Error(lastErr))
logger.Warn(ctx, "stream processing failed", slog.Error(lastErr))
interceptionErr = newErrorResponse(xerrors.Errorf("processing error: %w", lastErr))
}

if interceptionErr != nil {
payload, err := i.marshal(interceptionErr)
if err != nil {
logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", slog.F("%+v", interceptionErr)))
logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", interceptionErr))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

} else if err := events.Send(streamCtx, payload); err != nil {
logger.Warn(ctx, "failed to relay error", slog.Error(err), slog.F("payload", payload))
}
Expand Down
4 changes: 4 additions & 0 deletions internal/integrationtest/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,14 @@ func TestCircuitBreaker_FullRecoveryCycle(t *testing.T) {
resp := doRequest()
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
//nolint:gosec // G115: test constant, no overflow risk
assert.Equal(t, int32(cbConfig.FailureThreshold), upstreamCalls.Load())

// Phase 2: Verify circuit is open
// Request should be blocked by circuit breaker (no upstream call)
resp := doRequest()
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
//nolint:gosec // G115: test constant, no overflow risk
assert.Equal(t, int32(cbConfig.FailureThreshold), upstreamCalls.Load(), "No new upstream call when circuit is open")

// Verify metrics show circuit is open
Expand Down Expand Up @@ -570,11 +572,13 @@ func TestCircuitBreaker_PerModelIsolation(t *testing.T) {
resp := doRequest("claude-sonnet-4-20250514")
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
//nolint:gosec // G115: test constant, no overflow risk
assert.Equal(t, int32(cbConfig.FailureThreshold), sonnetCalls.Load())

// Verify sonnet circuit is open
resp := doRequest("claude-sonnet-4-20250514")
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode, "Sonnet circuit should be open")
//nolint:gosec // G115: test constant, no overflow risk
assert.Equal(t, int32(cbConfig.FailureThreshold), sonnetCalls.Load(), "No new sonnet calls when circuit is open")

// Verify sonnet metrics show circuit is open
Expand Down
2 changes: 1 addition & 1 deletion mcp/proxy_streamable_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (p *StreamableHTTPServerProxy) Init(ctx context.Context) (outErr error) {
return xerrors.Errorf("MCP version negotiation failed; requested %q, accepts %q, received %q", version, strings.Join(mcp.ValidProtocolVersions, ","), result.ProtocolVersion)
}

p.logger.Debug(ctx, "MCP client initialized", slog.F("name", result.ServerInfo.Name), slog.F("server_version", result.ServerInfo.Version))
p.logger.Debug(ctx, "mcp client initialized", slog.F("name", result.ServerInfo.Name), slog.F("server_version", result.ServerInfo.Version))

tools, err := p.fetchTools(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newPassthroughRouter(prov provider.Provider, logger slog.Logger, m *metrics
// Append the request path to the upstream base path.
reqPath, err := url.JoinPath(upURL.Path, r.URL.Path)
if err != nil {
logger.Warn(ctx, "failed to join upstream path", slog.Error(err), slog.F("upstreamPath", upURL.Path), slog.F("requestPath", r.URL.Path))
logger.Warn(ctx, "failed to join upstream path", slog.Error(err), slog.F("upstream_path", upURL.Path), slog.F("request_path", r.URL.Path))
http.Error(w, "failed to join upstream path", http.StatusInternalServerError)
span.SetStatus(codes.Error, "failed to join upstream path: "+err.Error())
return
Expand Down
2 changes: 1 addition & 1 deletion provider/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (p *Anthropic) CreateInterceptor(_ http.ResponseWriter, r *http.Request, tr
path := strings.TrimPrefix(r.URL.Path, p.RoutePrefix())
if path != routeMessages {
span.SetStatus(codes.Error, "unknown route: "+r.URL.Path)
return nil, UnknownRoute
return nil, ErrUnknownRoute
}

payload, err := io.ReadAll(r.Body)
Expand Down
4 changes: 2 additions & 2 deletions provider/anthropic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestAnthropic_CreateInterceptor(t *testing.T) {
assert.Empty(t, receivedHeaders.Get("Authorization"), "client Authorization header must not reach upstream")
})

t.Run("UnknownRoute", func(t *testing.T) {
t.Run("ErrUnknownRoute", func(t *testing.T) {
t.Parallel()

body := `{"model": "claude-opus-4-5", "max_tokens": 1024, "messages": [{"role": "user", "content": "hello"}]}`
Expand All @@ -156,7 +156,7 @@ func TestAnthropic_CreateInterceptor(t *testing.T) {

interceptor, err := provider.CreateInterceptor(w, req, testTracer)

require.ErrorIs(t, err, UnknownRoute)
require.ErrorIs(t, err, ErrUnknownRoute)
require.Nil(t, interceptor)
})
}
Expand Down
2 changes: 1 addition & 1 deletion provider/copilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (p *Copilot) CreateInterceptor(_ http.ResponseWriter, r *http.Request, trac

default:
span.SetStatus(codes.Error, "unknown route: "+r.URL.Path)
return nil, UnknownRoute
return nil, ErrUnknownRoute
}

span.SetAttributes(interceptor.TraceAttributes(r)...)
Expand Down
4 changes: 2 additions & 2 deletions provider/copilot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func TestCopilot_CreateInterceptor(t *testing.T) {
assert.Empty(t, receivedHeaders.Get("X-Api-Key"), "X-Api-Key must not be set upstream")
})

t.Run("UnknownRoute", func(t *testing.T) {
t.Run("ErrUnknownRoute", func(t *testing.T) {
t.Parallel()

body := `{"model": "gpt-4.1", "messages": [{"role": "user", "content": "hello"}]}`
Expand All @@ -311,7 +311,7 @@ func TestCopilot_CreateInterceptor(t *testing.T) {

interceptor, err := provider.CreateInterceptor(w, req, testTracer)

require.ErrorIs(t, err, UnknownRoute)
require.ErrorIs(t, err, ErrUnknownRoute)
require.Nil(t, interceptor)
})
}
Expand Down
2 changes: 1 addition & 1 deletion provider/openai.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (p *OpenAI) CreateInterceptor(_ http.ResponseWriter, r *http.Request, trace

default:
span.SetStatus(codes.Error, "unknown route: "+r.URL.Path)
return nil, UnknownRoute
return nil, ErrUnknownRoute
}
span.SetAttributes(interceptor.TraceAttributes(r)...)
return interceptor, nil
Expand Down
2 changes: 1 addition & 1 deletion provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/coder/aibridge/intercept"
)

var UnknownRoute = xerrors.New("unknown route")
var ErrUnknownRoute = xerrors.New("unknown route")

// Provider defines routes (bridged and passed through) for given provider.
// Bridged routes are processed by dedicated interceptors.
Expand Down
Loading