Skip to content

Commit f5e7578

Browse files
committed
chore: lint fix1
1 parent a599656 commit f5e7578

File tree

16 files changed

+156
-166
lines changed

16 files changed

+156
-166
lines changed

fixtures/fixtures.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ var (
9292
OaiResponsesBlockingConversation []byte
9393

9494
//go:embed openai/responses/blocking/http_error.txtar
95-
OaiResponsesBlockingHttpErr []byte
95+
OaiResponsesBlockingHTTPErr []byte
9696

9797
//go:embed openai/responses/blocking/prev_response_id.txtar
9898
OaiResponsesBlockingPrevResponseID []byte
@@ -139,7 +139,7 @@ var (
139139
OaiResponsesStreamingConversation []byte
140140

141141
//go:embed openai/responses/streaming/http_error.txtar
142-
OaiResponsesStreamingHttpErr []byte
142+
OaiResponsesStreamingHTTPErr []byte
143143

144144
//go:embed openai/responses/streaming/prev_response_id.txtar
145145
OaiResponsesStreamingPrevResponseID []byte

intercept/apidump/apidump.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -131,21 +131,21 @@ func (d *dumper) dumpResponse(resp *http.Response) error {
131131
return xerrors.Errorf("write response header terminator: %w", err)
132132
}
133133

134-
// Wrap the response body to capture it as it streams
135-
if resp.Body != nil {
136-
resp.Body = &streamingBodyDumper{
137-
body: resp.Body,
138-
dumpPath: dumpPath,
139-
headerData: headerBuf.Bytes(),
140-
logger: func(err error) {
141-
d.logger.Named("apidump").Warn(context.Background(), "failed to initialize response dump", slog.Error(err))
142-
},
143-
}
144-
} else {
134+
if resp.Body == nil {
145135
// No body, just write headers
146136
return os.WriteFile(dumpPath, headerBuf.Bytes(), 0o644)
147137
}
148138

139+
// Wrap the response body to capture it as it streams
140+
resp.Body = &streamingBodyDumper{
141+
body: resp.Body,
142+
dumpPath: dumpPath,
143+
headerData: headerBuf.Bytes(),
144+
logger: func(err error) {
145+
d.logger.Named("apidump").Warn(context.Background(), "failed to initialize response dump", slog.Error(err))
146+
},
147+
}
148+
149149
return nil
150150
}
151151

intercept/chatcompletions/base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (i *interceptionBase) Model() string {
115115
return "coder-aibridge-unknown"
116116
}
117117

118-
return string(i.req.Model)
118+
return i.req.Model
119119
}
120120

121121
func (i *interceptionBase) newErrorResponse(err error) map[string]any {

intercept/chatcompletions/paramswrap_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func generatePayload(messageCount int) []byte {
144144
}
145145
// Use realistic message content size
146146
content := fmt.Sprintf("This is message number %d with some realistic content that might appear in a conversation.", i+1)
147-
messages = append(messages, fmt.Sprintf(`{"role": "%s", "content": "%s"}`, role, content))
147+
messages = append(messages, fmt.Sprintf(`{"role": %q, "content": %q}`, role, content))
148148
}
149149

150150
return []byte(fmt.Sprintf(`{

intercept/chatcompletions/streaming.go

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -190,16 +190,14 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
190190
})
191191

192192
toolCall = nil
193-
} else {
193+
} else if stream.Err() == nil {
194194
// When the provider responds with only tool calls (no text content),
195195
// no chunks are relayed to the client, so the stream is not yet
196196
// initiated. Initiate it here so the SSE headers are sent and the
197197
// ping ticker is started, preventing client timeout during tool invocation.
198198
// Only initiate if no stream error, if there's an error, we'll return
199199
// an HTTP error response instead of starting an SSE stream.
200-
if stream.Err() == nil {
201-
events.InitiateStream(w)
202-
}
200+
events.InitiateStream(w)
203201
}
204202
}
205203

@@ -232,43 +230,43 @@ func (i *StreamingInterception) ProcessRequest(w http.ResponseWriter, r *http.Re
232230
})
233231
}
234232

235-
if events.IsStreaming() {
236-
// Check if the stream encountered any errors.
237-
if streamErr := stream.Err(); streamErr != nil {
238-
if eventstream.IsUnrecoverableError(streamErr) {
239-
logger.Debug(ctx, "stream terminated", slog.Error(streamErr))
240-
// We can't reflect an error back if there's a connection error or the request context was canceled.
241-
} else if oaiErr := getErrorResponse(streamErr); oaiErr != nil {
242-
logger.Warn(ctx, "openai stream error", slog.Error(streamErr))
243-
interceptionErr = oaiErr
244-
} else {
245-
logger.Warn(ctx, "unknown error", slog.Error(streamErr))
246-
// Unfortunately, the OpenAI SDK does not support parsing errors received in the stream
247-
// into known types (i.e. [shared.OverloadedError]).
248-
// See https://github.com/openai/openai-go/blob/v2.7.0/packages/ssestream/ssestream.go#L171
249-
// All it does is wrap the payload in an error - which is all we can return, currently.
250-
interceptionErr = newErrorResponse(xerrors.Errorf("unknown stream error: %w", streamErr))
251-
}
252-
} else if lastErr != nil {
253-
// Otherwise check if any logical errors occurred during processing.
254-
logger.Warn(ctx, "stream failed", slog.Error(lastErr))
255-
interceptionErr = newErrorResponse(xerrors.Errorf("processing error: %w", lastErr))
256-
}
257-
258-
if interceptionErr != nil {
259-
payload, err := i.marshalErr(interceptionErr)
260-
if err != nil {
261-
logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", slog.F("%+v", interceptionErr)))
262-
} else if err := events.Send(streamCtx, payload); err != nil {
263-
logger.Warn(ctx, "failed to relay error", slog.Error(err), slog.F("payload", payload))
264-
}
265-
}
266-
} else {
233+
if !events.IsStreaming() {
267234
// response/downstream Stream has not started yet; write error response and exit.
268235
i.writeUpstreamError(w, getErrorResponse(stream.Err()))
269236
return stream.Err()
270237
}
271238

239+
// Check if the stream encountered any errors.
240+
if streamErr := stream.Err(); streamErr != nil {
241+
if eventstream.IsUnrecoverableError(streamErr) {
242+
logger.Debug(ctx, "stream terminated", slog.Error(streamErr))
243+
// We can't reflect an error back if there's a connection error or the request context was canceled.
244+
} else if oaiErr := getErrorResponse(streamErr); oaiErr != nil {
245+
logger.Warn(ctx, "openai stream error", slog.Error(streamErr))
246+
interceptionErr = oaiErr
247+
} else {
248+
logger.Warn(ctx, "unknown error", slog.Error(streamErr))
249+
// Unfortunately, the OpenAI SDK does not support parsing errors received in the stream
250+
// into known types (i.e. [shared.OverloadedError]).
251+
// See https://github.com/openai/openai-go/blob/v2.7.0/packages/ssestream/ssestream.go#L171
252+
// All it does is wrap the payload in an error - which is all we can return, currently.
253+
interceptionErr = newErrorResponse(xerrors.Errorf("unknown stream error: %w", streamErr))
254+
}
255+
} else if lastErr != nil {
256+
// Otherwise check if any logical errors occurred during processing.
257+
logger.Warn(ctx, "stream failed", slog.Error(lastErr))
258+
interceptionErr = newErrorResponse(xerrors.Errorf("processing error: %w", lastErr))
259+
}
260+
261+
if interceptionErr != nil {
262+
payload, err := i.marshalErr(interceptionErr)
263+
if err != nil {
264+
logger.Warn(ctx, "failed to marshal error", slog.Error(err), slog.F("error_payload", slog.F("%+v", interceptionErr)))
265+
} else if err := events.Send(streamCtx, payload); err != nil {
266+
logger.Warn(ctx, "failed to relay error", slog.Error(err), slog.F("payload", payload))
267+
}
268+
}
269+
272270
// No tool call, nothing more to do.
273271
if toolCall == nil {
274272
break

intercept/eventstream/eventstream.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,7 @@ func flush(w http.ResponseWriter) (err error) {
240240
}
241241

242242
defer func() {
243-
if r := recover(); r != nil {
244-
// Likely a broken connection, don't spam the logs.
243+
if r := recover(); r != nil { //nolint:revive // Intentionally swallowed; likely a broken connection.
245244
}
246245
}()
247246

intercept/messages/base.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,17 +183,15 @@ func (i *interceptionBase) extractModelThoughts(msg *anthropic.Message) []*recor
183183

184184
var thoughtRecords []*recorder.ModelThoughtRecord
185185
for _, block := range msg.Content {
186-
switch variant := block.AsAny().(type) {
187-
case anthropic.ThinkingBlock:
188-
if variant.Thinking == "" {
189-
continue
190-
}
191-
thoughtRecords = append(thoughtRecords, &recorder.ModelThoughtRecord{
192-
Content: variant.Thinking,
193-
Metadata: recorder.Metadata{"source": recorder.ThoughtSourceThinking},
194-
})
195-
}
196186
// anthropic.RedactedThinkingBlock also exists, but there's nothing useful we can capture.
187+
variant, ok := block.AsAny().(anthropic.ThinkingBlock)
188+
if !ok || variant.Thinking == "" {
189+
continue
190+
}
191+
thoughtRecords = append(thoughtRecords, &recorder.ModelThoughtRecord{
192+
Content: variant.Thinking,
193+
Metadata: recorder.Metadata{"source": recorder.ThoughtSourceThinking},
194+
})
197195
}
198196
return thoughtRecords
199197
}

intercept/messages/streaming.go

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,7 @@ newStream:
179179
// Tool-related handling.
180180
switch event.Type {
181181
case string(constant.ValueOf[constant.ContentBlockStart]()):
182-
switch block := event.AsContentBlockStart().ContentBlock.AsAny().(type) {
183-
case anthropic.ToolUseBlock:
182+
if block, ok := event.AsContentBlockStart().ContentBlock.AsAny().(anthropic.ToolUseBlock); ok {
184183
lastToolName = block.Name
185184

186185
if i.mcpProxy != nil && i.mcpProxy.GetTool(block.Name) != nil {
@@ -307,8 +306,7 @@ newStream:
307306
foundTools int
308307
)
309308
for _, block := range message.Content {
310-
switch variant := block.AsAny().(type) {
311-
case anthropic.ToolUseBlock:
309+
if variant, ok := block.AsAny().(anthropic.ToolUseBlock); ok {
312310
foundTools++
313311
if variant.Name == name {
314312
input = variant.Input
@@ -431,24 +429,23 @@ newStream:
431429
// Causes a new stream to be run with updated messages.
432430
isFirst = false
433431
continue newStream
434-
} else {
435-
// Find all the non-injected tools and track their uses.
436-
for _, block := range message.Content {
437-
switch variant := block.AsAny().(type) {
438-
case anthropic.ToolUseBlock:
439-
if i.mcpProxy != nil && i.mcpProxy.GetTool(variant.Name) != nil {
440-
continue
441-
}
432+
}
442433

443-
_ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
444-
InterceptionID: i.ID().String(),
445-
MsgID: message.ID,
446-
ToolCallID: variant.ID,
447-
Tool: variant.Name,
448-
Args: variant.Input,
449-
Injected: false,
450-
})
434+
// Find all the non-injected tools and track their uses.
435+
for _, block := range message.Content {
436+
if variant, ok := block.AsAny().(anthropic.ToolUseBlock); ok {
437+
if i.mcpProxy != nil && i.mcpProxy.GetTool(variant.Name) != nil {
438+
continue
451439
}
440+
441+
_ = i.recorder.RecordToolUsage(streamCtx, &recorder.ToolUsageRecord{
442+
InterceptionID: i.ID().String(),
443+
MsgID: message.ID,
444+
ToolCallID: variant.ID,
445+
Tool: variant.Name,
446+
Args: variant.Input,
447+
Injected: false,
448+
})
452449
}
453450
}
454451
}
@@ -464,11 +461,10 @@ newStream:
464461
if eventstream.IsUnrecoverableError(err) {
465462
logger.Debug(ctx, "processing terminated", slog.Error(err))
466463
break // Stop processing if client disconnected or context canceled.
467-
} else {
468-
logger.Warn(ctx, "failed to relay event", slog.Error(err))
469-
lastErr = xerrors.Errorf("relay event: %w", err)
470-
break
471464
}
465+
logger.Warn(ctx, "failed to relay event", slog.Error(err))
466+
lastErr = xerrors.Errorf("relay event: %w", err)
467+
break
472468
}
473469
}
474470

intercept/responses/base.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -222,15 +222,15 @@ func (i *responsesInterceptionBase) recordNonInjectedToolUsage(ctx context.Conte
222222

223223
func (i *responsesInterceptionBase) parseFunctionCallJSONArgs(ctx context.Context, raw string) recorder.ToolArgs {
224224
trimmed := strings.TrimSpace(raw)
225-
if trimmed != "" {
226-
var args recorder.ToolArgs
227-
if err := json.Unmarshal([]byte(trimmed), &args); err != nil {
228-
i.logger.Warn(ctx, "failed to unmarshal tool args", slog.Error(err))
229-
} else {
230-
return args
231-
}
225+
if trimmed == "" {
226+
return trimmed
227+
}
228+
var args recorder.ToolArgs
229+
if err := json.Unmarshal([]byte(trimmed), &args); err != nil {
230+
i.logger.Warn(ctx, "failed to unmarshal tool args", slog.Error(err))
231+
return trimmed
232232
}
233-
return trimmed
233+
return args
234234
}
235235

236236
func (i *responsesInterceptionBase) recordTokenUsage(ctx context.Context, response *responses.Response) {

internal/integrationtest/circuit_breaker_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ const (
2828
)
2929

3030
func anthropicSuccessResponse(model string) string {
31-
return fmt.Sprintf(`{"id":"msg_01","type":"message","role":"assistant","content":[{"type":"text","text":"Hello!"}],"model":"%s","stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":5}}`, model)
31+
return fmt.Sprintf(`{"id":"msg_01","type":"message","role":"assistant","content":[{"type":"text","text":"Hello!"}],"model":%q,"stop_reason":"end_turn","usage":{"input_tokens":10,"output_tokens":5}}`, model)
3232
}
3333

3434
func openAISuccessResponse(model string) string {
35-
return fmt.Sprintf(`{"id":"chatcmpl-123","object":"chat.completion","created":1677652288,"model":"%s","choices":[{"index":0,"message":{"role":"assistant","content":"Hello!"},"finish_reason":"stop"}],"usage":{"prompt_tokens":9,"completion_tokens":12,"total_tokens":21}}`, model)
35+
return fmt.Sprintf(`{"id":"chatcmpl-123","object":"chat.completion","created":1677652288,"model":%q,"choices":[{"index":0,"message":{"role":"assistant","content":"Hello!"},"finish_reason":"stop"}],"usage":{"prompt_tokens":9,"completion_tokens":12,"total_tokens":21}}`, model)
3636
}
3737

3838
// TestCircuitBreaker_FullRecoveryCycle tests the complete circuit breaker lifecycle:
@@ -555,7 +555,7 @@ func TestCircuitBreaker_PerModelIsolation(t *testing.T) {
555555
)
556556

557557
doRequest := func(model string) *http.Response {
558-
body := fmt.Sprintf(`{"model":"%s","max_tokens":1024,"messages":[{"role":"user","content":"hi"}]}`, model)
558+
body := fmt.Sprintf(`{"model":%q,"max_tokens":1024,"messages":[{"role":"user","content":"hi"}]}`, model)
559559
resp := bridgeServer.makeRequest(t, http.MethodPost, pathAnthropicMessages, []byte(body), http.Header{
560560
"x-api-key": {"test"},
561561
"anthropic-version": {"2023-06-01"},

0 commit comments

Comments
 (0)