Skip to content

Commit 059c07e

Browse files
author
ccs-upstream-sync[bot]
committed
Merge remote-tracking branch 'upstream/main' into upstream-sync/20260504-0544
2 parents b054a09 + 17be644 commit 059c07e

13 files changed

Lines changed: 1483 additions & 95 deletions

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ ARG BUILD_DATE=unknown
1414

1515
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w -X 'main.Version=${VERSION}-plus' -X 'main.Commit=${COMMIT}' -X 'main.BuildDate=${BUILD_DATE}'" -o ./CLIProxyAPIPlus ./cmd/server/
1616

17-
FROM alpine:3.22.0
17+
FROM alpine:3.23
1818

1919
RUN apk add --no-cache tzdata
2020

@@ -32,4 +32,4 @@ ENV TZ=Asia/Shanghai
3232

3333
RUN cp /usr/share/zoneinfo/${TZ} /etc/localtime && echo "${TZ}" > /etc/timezone
3434

35-
CMD ["./CLIProxyAPIPlus"]
35+
CMD ["./CLIProxyAPI"]

internal/runtime/executor/claude_executor.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,10 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
285285
}
286286
helps.AppendAPIResponseChunk(ctx, e.cfg, data)
287287
if stream {
288+
if errValidate := validateClaudeStreamingResponse(data); errValidate != nil {
289+
helps.RecordAPIResponseError(ctx, e.cfg, errValidate)
290+
return resp, errValidate
291+
}
288292
lines := bytes.Split(data, []byte("\n"))
289293
for _, line := range lines {
290294
if detail, ok := helps.ParseClaudeStreamUsage(line); ok {
@@ -533,6 +537,64 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
533537
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
534538
}
535539

540+
func validateClaudeStreamingResponse(data []byte) error {
541+
scanner := bufio.NewScanner(bytes.NewReader(data))
542+
scanner.Buffer(nil, 52_428_800)
543+
544+
hasData := false
545+
hasMessageStart := false
546+
hasMessageDelta := false
547+
548+
for scanner.Scan() {
549+
line := bytes.TrimSpace(scanner.Bytes())
550+
if len(line) == 0 || !bytes.HasPrefix(line, []byte("data:")) {
551+
continue
552+
}
553+
payload := bytes.TrimSpace(line[len("data:"):])
554+
if len(payload) == 0 || bytes.Equal(payload, []byte("[DONE]")) {
555+
continue
556+
}
557+
hasData = true
558+
if !gjson.ValidBytes(payload) {
559+
return statusErr{code: http.StatusBadGateway, msg: "claude executor: upstream returned malformed stream data"}
560+
}
561+
562+
root := gjson.ParseBytes(payload)
563+
switch root.Get("type").String() {
564+
case "error":
565+
message := strings.TrimSpace(root.Get("error.message").String())
566+
if message == "" {
567+
message = strings.TrimSpace(root.Get("error.type").String())
568+
}
569+
if message == "" {
570+
message = "unknown upstream error"
571+
}
572+
return statusErr{code: http.StatusBadGateway, msg: "claude executor: upstream returned error event: " + message}
573+
case "message_start":
574+
message := root.Get("message")
575+
if strings.TrimSpace(message.Get("id").String()) == "" || strings.TrimSpace(message.Get("model").String()) == "" {
576+
return statusErr{code: http.StatusBadGateway, msg: "claude executor: upstream stream message_start is missing id or model"}
577+
}
578+
hasMessageStart = true
579+
case "message_delta":
580+
hasMessageDelta = true
581+
}
582+
}
583+
if errScan := scanner.Err(); errScan != nil {
584+
return errScan
585+
}
586+
if !hasData {
587+
return statusErr{code: http.StatusBadGateway, msg: "claude executor: upstream returned empty stream response"}
588+
}
589+
if !hasMessageStart {
590+
return statusErr{code: http.StatusBadGateway, msg: "claude executor: upstream stream response is missing message_start"}
591+
}
592+
if !hasMessageDelta {
593+
return statusErr{code: http.StatusBadGateway, msg: "claude executor: upstream stream response ended before message completion"}
594+
}
595+
return nil
596+
}
597+
536598
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
537599
baseModel := thinking.ParseSuffix(req.Model).ModelName
538600

internal/runtime/executor/claude_executor_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -936,6 +936,113 @@ func TestClaudeExecutor_GeneratesNewUserIDByDefault(t *testing.T) {
936936
}
937937
}
938938

939+
func TestClaudeExecutor_ExecuteOpenAINonStreamRejectsEmptyClaudeStream(t *testing.T) {
940+
_, err := executeOpenAIChatCompletionThroughClaude(t, "")
941+
if err == nil {
942+
t.Fatal("Execute error = nil, want empty stream error")
943+
}
944+
assertStatusErr(t, err, http.StatusBadGateway)
945+
if !strings.Contains(err.Error(), "empty stream response") {
946+
t.Fatalf("Execute error = %q, want empty stream response", err.Error())
947+
}
948+
}
949+
950+
func TestClaudeExecutor_ExecuteOpenAINonStreamRejectsClaudeErrorEvent(t *testing.T) {
951+
body := `data: {"type":"error","error":{"type":"overloaded_error","message":"upstream overloaded"}}` + "\n"
952+
_, err := executeOpenAIChatCompletionThroughClaude(t, body)
953+
if err == nil {
954+
t.Fatal("Execute error = nil, want upstream error event")
955+
}
956+
assertStatusErr(t, err, http.StatusBadGateway)
957+
if !strings.Contains(err.Error(), "upstream overloaded") {
958+
t.Fatalf("Execute error = %q, want upstream overloaded", err.Error())
959+
}
960+
}
961+
962+
func TestClaudeExecutor_ExecuteOpenAINonStreamRejectsIncompleteClaudeStream(t *testing.T) {
963+
body := strings.Join([]string{
964+
`data: {"type":"message_start","message":{"id":"msg_123","model":"claude-3-5-sonnet-20241022"}}`,
965+
`data: {"type":"message_stop"}`,
966+
``,
967+
}, "\n")
968+
969+
_, err := executeOpenAIChatCompletionThroughClaude(t, body)
970+
if err == nil {
971+
t.Fatal("Execute error = nil, want incomplete stream error")
972+
}
973+
assertStatusErr(t, err, http.StatusBadGateway)
974+
if !strings.Contains(err.Error(), "ended before message completion") {
975+
t.Fatalf("Execute error = %q, want incomplete stream error", err.Error())
976+
}
977+
}
978+
979+
func TestClaudeExecutor_ExecuteOpenAINonStreamConvertsValidClaudeStream(t *testing.T) {
980+
body := strings.Join([]string{
981+
`event: message_start`,
982+
`data: {"type":"message_start","message":{"id":"msg_123","model":"claude-3-5-sonnet-20241022"}}`,
983+
`event: content_block_delta`,
984+
`data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"ok"}}`,
985+
`event: message_delta`,
986+
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"input_tokens":2,"output_tokens":1}}`,
987+
`event: message_stop`,
988+
`data: {"type":"message_stop"}`,
989+
``,
990+
}, "\n")
991+
992+
resp, err := executeOpenAIChatCompletionThroughClaude(t, body)
993+
if err != nil {
994+
t.Fatalf("Execute error: %v", err)
995+
}
996+
if got := gjson.GetBytes(resp.Payload, "id").String(); got != "msg_123" {
997+
t.Fatalf("response id = %q, want msg_123; payload=%s", got, string(resp.Payload))
998+
}
999+
if got := gjson.GetBytes(resp.Payload, "model").String(); got != "claude-3-5-sonnet-20241022" {
1000+
t.Fatalf("response model = %q, want claude-3-5-sonnet-20241022", got)
1001+
}
1002+
if got := gjson.GetBytes(resp.Payload, "choices.0.message.content").String(); got != "ok" {
1003+
t.Fatalf("response content = %q, want ok", got)
1004+
}
1005+
if got := gjson.GetBytes(resp.Payload, "usage.total_tokens").Int(); got != 3 {
1006+
t.Fatalf("usage.total_tokens = %d, want 3", got)
1007+
}
1008+
}
1009+
1010+
func executeOpenAIChatCompletionThroughClaude(t *testing.T, upstreamBody string) (cliproxyexecutor.Response, error) {
1011+
t.Helper()
1012+
1013+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1014+
w.Header().Set("Content-Type", "text/event-stream")
1015+
_, _ = w.Write([]byte(upstreamBody))
1016+
}))
1017+
defer server.Close()
1018+
1019+
executor := NewClaudeExecutor(&config.Config{})
1020+
auth := &cliproxyauth.Auth{Attributes: map[string]string{
1021+
"api_key": "key-123",
1022+
"base_url": server.URL,
1023+
}}
1024+
payload := []byte(`{"model":"claude-3-5-sonnet-20241022","messages":[{"role":"user","content":"hi"}]}`)
1025+
1026+
return executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
1027+
Model: "claude-3-5-sonnet-20241022",
1028+
Payload: payload,
1029+
}, cliproxyexecutor.Options{
1030+
SourceFormat: sdktranslator.FromString("openai"),
1031+
})
1032+
}
1033+
1034+
func assertStatusErr(t *testing.T, err error, want int) {
1035+
t.Helper()
1036+
1037+
status, ok := err.(interface{ StatusCode() int })
1038+
if !ok {
1039+
t.Fatalf("error %T does not expose StatusCode", err)
1040+
}
1041+
if got := status.StatusCode(); got != want {
1042+
t.Fatalf("StatusCode() = %d, want %d", got, want)
1043+
}
1044+
}
1045+
9391046
func TestStripClaudeToolPrefixFromResponse_NestedToolReference(t *testing.T) {
9401047
input := []byte(`{"content":[{"type":"tool_result","tool_use_id":"toolu_123","content":[{"type":"tool_reference","tool_name":"proxy_mcp__nia__manage_resource"}]}]}`)
9411048
out := stripClaudeToolPrefixFromResponse(input, "proxy_")

internal/runtime/executor/kimi_executor.go

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,17 @@ func normalizeKimiToolMessageLinks(body []byte) ([]byte, error) {
322322
return body, nil
323323
}
324324

325-
out := body
325+
msgs := messages.Array()
326+
out, dropped, err := filterKimiEmptyAssistantMessages(body, msgs)
327+
if err != nil {
328+
return body, err
329+
}
330+
if dropped > 0 {
331+
log.WithField("dropped_assistant_messages", dropped).Debug("kimi executor: dropped empty assistant messages")
332+
}
333+
334+
messages = gjson.GetBytes(out, "messages")
335+
msgs = messages.Array()
326336
pending := make([]string, 0)
327337
patched := 0
328338
patchedReasoning := 0
@@ -340,7 +350,6 @@ func normalizeKimiToolMessageLinks(body []byte) ([]byte, error) {
340350
}
341351
}
342352

343-
msgs := messages.Array()
344353
for msgIdx := range msgs {
345354
msg := msgs[msgIdx]
346355
role := strings.TrimSpace(msg.Get("role").String())
@@ -428,6 +437,96 @@ func normalizeKimiToolMessageLinks(body []byte) ([]byte, error) {
428437
return out, nil
429438
}
430439

440+
func filterKimiEmptyAssistantMessages(body []byte, msgs []gjson.Result) ([]byte, int, error) {
441+
kept := make([]string, 0, len(msgs))
442+
dropped := 0
443+
for _, msg := range msgs {
444+
if shouldDropKimiAssistantMessage(msg) {
445+
dropped++
446+
continue
447+
}
448+
kept = append(kept, msg.Raw)
449+
}
450+
if dropped == 0 {
451+
return body, 0, nil
452+
}
453+
454+
rawMessages := []byte("[" + strings.Join(kept, ",") + "]")
455+
out, err := sjson.SetRawBytes(body, "messages", rawMessages)
456+
if err != nil {
457+
return body, 0, fmt.Errorf("kimi executor: failed to drop empty assistant messages: %w", err)
458+
}
459+
return out, dropped, nil
460+
}
461+
462+
func shouldDropKimiAssistantMessage(msg gjson.Result) bool {
463+
if strings.TrimSpace(msg.Get("role").String()) != "assistant" {
464+
return false
465+
}
466+
if hasKimiToolCalls(msg) || hasKimiLegacyFunctionCall(msg) || hasKimiAssistantReasoning(msg) {
467+
return false
468+
}
469+
return isKimiAssistantContentEmpty(msg.Get("content"))
470+
}
471+
472+
func hasKimiToolCalls(msg gjson.Result) bool {
473+
toolCalls := msg.Get("tool_calls")
474+
return toolCalls.Exists() && toolCalls.IsArray() && len(toolCalls.Array()) > 0
475+
}
476+
477+
func hasKimiLegacyFunctionCall(msg gjson.Result) bool {
478+
functionCall := msg.Get("function_call")
479+
if !functionCall.Exists() || functionCall.Type == gjson.Null {
480+
return false
481+
}
482+
if functionCall.IsObject() && strings.TrimSpace(functionCall.Raw) == "{}" {
483+
return false
484+
}
485+
return strings.TrimSpace(functionCall.Raw) != ""
486+
}
487+
488+
func hasKimiAssistantReasoning(msg gjson.Result) bool {
489+
reasoning := msg.Get("reasoning_content")
490+
return reasoning.Exists() && strings.TrimSpace(reasoning.String()) != ""
491+
}
492+
493+
func isKimiAssistantContentEmpty(content gjson.Result) bool {
494+
if !content.Exists() || content.Type == gjson.Null {
495+
return true
496+
}
497+
if content.Type == gjson.String {
498+
return strings.TrimSpace(content.String()) == ""
499+
}
500+
if !content.IsArray() {
501+
return false
502+
}
503+
for _, part := range content.Array() {
504+
if !isKimiAssistantContentPartEmpty(part) {
505+
return false
506+
}
507+
}
508+
return true
509+
}
510+
511+
func isKimiAssistantContentPartEmpty(part gjson.Result) bool {
512+
if !part.Exists() || part.Type == gjson.Null {
513+
return true
514+
}
515+
if part.Type == gjson.String {
516+
return strings.TrimSpace(part.String()) == ""
517+
}
518+
if !part.IsObject() {
519+
return false
520+
}
521+
if text := part.Get("text"); text.Exists() {
522+
return strings.TrimSpace(text.String()) == ""
523+
}
524+
if strings.TrimSpace(part.Get("type").String()) == "text" {
525+
return true
526+
}
527+
return strings.TrimSpace(part.Raw) == "{}"
528+
}
529+
431530
func fallbackAssistantReasoning(msg gjson.Result, hasLatest bool, latest string) string {
432531
if hasLatest && strings.TrimSpace(latest) != "" {
433532
return latest

0 commit comments

Comments
 (0)