Skip to content

Commit 813ed4f

Browse files
Merge upstream/main (auto-sync feat/copilot)
- 959067e feat(usage): introduce executor type tracking in usage reporting - f353979 feat(watcher, redisqueue): add usage refresh notification support
2 parents 861b009 + f353979 commit 813ed4f

22 files changed

Lines changed: 333 additions & 54 deletions

internal/api/redis_queue_protocol_integration_test.go

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,68 @@ func readRESPArrayOfBulkStrings(r *bufio.Reader) ([][]byte, error) {
159159
return out, nil
160160
}
161161

162+
func readTestRESPPubSubSubscribe(r *bufio.Reader) (string, int, error) {
163+
prefix, errRead := r.ReadByte()
164+
if errRead != nil {
165+
return "", 0, errRead
166+
}
167+
if prefix != '*' {
168+
return "", 0, fmt.Errorf("expected array prefix '*', got %q", prefix)
169+
}
170+
line, errLine := readTestRESPLine(r)
171+
if errLine != nil {
172+
return "", 0, errLine
173+
}
174+
count, errParse := strconv.Atoi(line)
175+
if errParse != nil {
176+
return "", 0, fmt.Errorf("invalid array length %q: %v", line, errParse)
177+
}
178+
if count != 3 {
179+
return "", 0, fmt.Errorf("subscribe ack length = %d, want 3", count)
180+
}
181+
kind, errKind := readTestRESPBulkString(r)
182+
if errKind != nil {
183+
return "", 0, errKind
184+
}
185+
if string(kind) != "subscribe" {
186+
return "", 0, fmt.Errorf("subscribe ack kind = %q", string(kind))
187+
}
188+
channel, errChannel := readTestRESPBulkString(r)
189+
if errChannel != nil {
190+
return "", 0, errChannel
191+
}
192+
prefix, errRead = r.ReadByte()
193+
if errRead != nil {
194+
return "", 0, errRead
195+
}
196+
if prefix != ':' {
197+
return "", 0, fmt.Errorf("expected integer prefix ':', got %q", prefix)
198+
}
199+
line, errLine = readTestRESPLine(r)
200+
if errLine != nil {
201+
return "", 0, errLine
202+
}
203+
subscriptions, errParse := strconv.Atoi(line)
204+
if errParse != nil {
205+
return "", 0, fmt.Errorf("invalid subscription count %q: %v", line, errParse)
206+
}
207+
return string(channel), subscriptions, nil
208+
}
209+
210+
func readTestRESPPubSubMessage(r *bufio.Reader) (string, []byte, error) {
211+
items, errItems := readRESPArrayOfBulkStrings(r)
212+
if errItems != nil {
213+
return "", nil, errItems
214+
}
215+
if len(items) != 3 {
216+
return "", nil, fmt.Errorf("pubsub message length = %d, want 3", len(items))
217+
}
218+
if string(items[0]) != "message" {
219+
return "", nil, fmt.Errorf("pubsub message kind = %q", string(items[0]))
220+
}
221+
return string(items[1]), items[2], nil
222+
}
223+
162224
func TestRedisProtocol_ManagementDisabled_RejectsConnection(t *testing.T) {
163225
t.Setenv("MANAGEMENT_PASSWORD", "")
164226
redisqueue.SetEnabled(false)
@@ -235,6 +297,68 @@ func TestRedisProtocol_HomeEnabled_DisablesConnection(t *testing.T) {
235297
}
236298
}
237299

300+
func TestRedisProtocol_SUBSCRIBE_UsageSendsSupportRefresh(t *testing.T) {
301+
const managementPassword = "test-management-password"
302+
303+
t.Setenv("MANAGEMENT_PASSWORD", managementPassword)
304+
redisqueue.SetEnabled(false)
305+
t.Cleanup(func() { redisqueue.SetEnabled(false) })
306+
307+
server := newTestServer(t)
308+
if !server.managementRoutesEnabled.Load() {
309+
t.Fatalf("expected managementRoutesEnabled to be true")
310+
}
311+
312+
addr, stop := startRedisMuxListener(t, server)
313+
t.Cleanup(stop)
314+
315+
conn, errDial := net.DialTimeout("tcp", addr, time.Second)
316+
if errDial != nil {
317+
t.Fatalf("failed to dial redis listener: %v", errDial)
318+
}
319+
t.Cleanup(func() { _ = conn.Close() })
320+
321+
reader := bufio.NewReader(conn)
322+
_ = conn.SetDeadline(time.Now().Add(5 * time.Second))
323+
324+
if errWrite := writeTestRESPCommand(conn, "AUTH", managementPassword); errWrite != nil {
325+
t.Fatalf("failed to write AUTH command: %v", errWrite)
326+
}
327+
if msg, errRead := readTestRESPSimpleString(reader); errRead != nil {
328+
t.Fatalf("failed to read AUTH response: %v", errRead)
329+
} else if msg != "OK" {
330+
t.Fatalf("unexpected AUTH response: %q", msg)
331+
}
332+
333+
if errWrite := writeTestRESPCommand(conn, "SUBSCRIBE", "usage"); errWrite != nil {
334+
t.Fatalf("failed to write SUBSCRIBE command: %v", errWrite)
335+
}
336+
channel, subscriptions, errSubscribe := readTestRESPPubSubSubscribe(reader)
337+
if errSubscribe != nil {
338+
t.Fatalf("failed to read subscribe response: %v", errSubscribe)
339+
}
340+
if channel != "usage" || subscriptions != 1 {
341+
t.Fatalf("unexpected subscribe response channel=%q subscriptions=%d", channel, subscriptions)
342+
}
343+
344+
channel, payload, errMessage := readTestRESPPubSubMessage(reader)
345+
if errMessage != nil {
346+
t.Fatalf("failed to read support refresh message: %v", errMessage)
347+
}
348+
if channel != "usage" || string(payload) != `{"support_refresh":true}` {
349+
t.Fatalf("unexpected support refresh message channel=%q payload=%q", channel, string(payload))
350+
}
351+
352+
redisqueue.Enqueue([]byte(`{"id":1}`))
353+
channel, payload, errMessage = readTestRESPPubSubMessage(reader)
354+
if errMessage != nil {
355+
t.Fatalf("failed to read usage message: %v", errMessage)
356+
}
357+
if channel != "usage" || string(payload) != `{"id":1}` {
358+
t.Fatalf("unexpected usage message channel=%q payload=%q", channel, string(payload))
359+
}
360+
}
361+
238362
func TestRedisProtocol_AUTH_And_PopContracts(t *testing.T) {
239363
const managementPassword = "test-management-password"
240364

internal/redisqueue/plugin.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec
4242
if provider == "" {
4343
provider = "unknown"
4444
}
45+
executorType := strings.TrimSpace(record.ExecutorType)
46+
if executorType == "" {
47+
executorType = "unknown"
48+
}
4549
authType := strings.TrimSpace(record.AuthType)
4650
if authType == "" {
4751
authType = "unknown"
@@ -94,6 +98,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec
9498
payload, err := json.Marshal(queuedUsageDetail{
9599
requestDetail: detail,
96100
Provider: provider,
101+
ExecutorType: executorType,
97102
Model: modelName,
98103
Alias: aliasName,
99104
Endpoint: resolveEndpoint(ctx),
@@ -112,6 +117,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec
112117
type queuedUsageDetail struct {
113118
requestDetail
114119
Provider string `json:"provider"`
120+
ExecutorType string `json:"executor_type"`
115121
Model string `json:"model"`
116122
Alias string `json:"alias"`
117123
Endpoint string `json:"endpoint"`

internal/redisqueue/plugin_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) {
2626
plugin := &usageQueuePlugin{}
2727
plugin.HandleUsage(ctx, coreusage.Record{
2828
Provider: "openai",
29+
ExecutorType: "KimiExecutor",
2930
Model: "gpt-5.4",
3031
Alias: "client-gpt",
3132
APIKey: "test-key",
@@ -47,6 +48,7 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) {
4748

4849
payload := popSinglePayload(t)
4950
requireStringField(t, payload, "provider", "openai")
51+
requireStringField(t, payload, "executor_type", "KimiExecutor")
5052
requireStringField(t, payload, "model", "gpt-5.4")
5153
requireStringField(t, payload, "alias", "client-gpt")
5254
requireStringField(t, payload, "endpoint", "POST /v1/chat/completions")

internal/redisqueue/queue.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ const (
1010
defaultRetentionSeconds int64 = 60
1111
maxRetentionSeconds int64 = 3600
1212
usageSubscriberBuffer = 256
13+
14+
usageSupportRefreshPayload = `{"support_refresh":true}`
15+
usageRefreshPayload = `{"refresh":true}`
1316
)
1417

1518
type queueItem struct {
@@ -83,6 +86,10 @@ func SubscribeUsage() (<-chan []byte, func()) {
8386
return global.subscribeUsage()
8487
}
8588

89+
func NotifyUsageRefresh() {
90+
global.publishToSubscribers([]byte(usageRefreshPayload))
91+
}
92+
8693
func (q *queue) clear() {
8794
q.mu.Lock()
8895

@@ -137,6 +144,7 @@ func (q *queue) publishToSubscribers(payload []byte) bool {
137144

138145
func (q *queue) subscribeUsage() (<-chan []byte, func()) {
139146
subscriber := make(chan []byte, usageSubscriberBuffer)
147+
subscriber <- []byte(usageSupportRefreshPayload)
140148

141149
q.mu.Lock()
142150
if q.subscribers == nil {

internal/redisqueue/queue_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ func TestEnqueueBroadcastsToUsageSubscribersAndSkipsQueue(t *testing.T) {
1212
second, unsubscribeSecond := SubscribeUsage()
1313
defer unsubscribeSecond()
1414

15+
requireUsageSubscriberPayload(t, first, usageSupportRefreshPayload)
16+
requireUsageSubscriberPayload(t, second, usageSupportRefreshPayload)
17+
1518
Enqueue([]byte("usage-record"))
1619

1720
requireUsageSubscriberPayload(t, first, "usage-record")
@@ -37,6 +40,8 @@ func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) {
3740
subscriber, unsubscribe := SubscribeUsage()
3841
defer unsubscribe()
3942

43+
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
44+
4045
SetEnabled(false)
4146

4247
select {
@@ -50,6 +55,24 @@ func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) {
5055
})
5156
}
5257

58+
func TestNotifyUsageRefreshBroadcastsOnlyToUsageSubscribers(t *testing.T) {
59+
withEnabledQueue(t, func() {
60+
subscriber, unsubscribe := SubscribeUsage()
61+
defer unsubscribe()
62+
63+
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
64+
65+
NotifyUsageRefresh()
66+
requireUsageSubscriberPayload(t, subscriber, usageRefreshPayload)
67+
68+
unsubscribe()
69+
NotifyUsageRefresh()
70+
if items := PopOldest(1); len(items) != 0 {
71+
t.Fatalf("PopOldest() items = %q, want empty after refresh notification without subscribers", items)
72+
}
73+
})
74+
}
75+
5376
func requireUsageSubscriberPayload(t *testing.T, subscriber <-chan []byte, want string) {
5477
t.Helper()
5578

internal/runtime/executor/aistudio_executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (e *AIStudioExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth,
128128
return resp, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
129129
}
130130
baseModel := thinking.ParseSuffix(req.Model).ModelName
131-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
131+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
132132
defer reporter.TrackFailure(ctx, &err)
133133

134134
translatedReq, body, err := e.translateRequest(req, opts, false)
@@ -196,7 +196,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
196196
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
197197
}
198198
baseModel := thinking.ParseSuffix(req.Model).ModelName
199-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
199+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
200200
defer reporter.TrackFailure(ctx, &err)
201201

202202
translatedReq, body, err := e.translateRequest(req, opts, true)

internal/runtime/executor/antigravity_executor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au
530530
return e.executeClaudeNonStream(ctx, auth, req, opts)
531531
}
532532

533-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
533+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
534534
defer reporter.TrackFailure(ctx, &err)
535535

536536
from := opts.SourceFormat
@@ -730,7 +730,7 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth *
730730
return resp, statusErr{code: http.StatusTooManyRequests, msg: fmt.Sprintf("auth in short cooldown, %s remaining", remaining), retryAfter: &d}
731731
}
732732

733-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
733+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
734734
defer reporter.TrackFailure(ctx, &err)
735735

736736
from := opts.SourceFormat
@@ -1192,7 +1192,7 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya
11921192
return nil, statusErr{code: http.StatusTooManyRequests, msg: fmt.Sprintf("auth in short cooldown, %s remaining", remaining), retryAfter: &d}
11931193
}
11941194

1195-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
1195+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
11961196
defer reporter.TrackFailure(ctx, &err)
11971197

11981198
from := opts.SourceFormat

internal/runtime/executor/claude_executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
171171
baseURL = "https://api.anthropic.com"
172172
}
173173

174-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
174+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
175175
defer reporter.TrackFailure(ctx, &err)
176176
from := opts.SourceFormat
177177
to := sdktranslator.FromString("claude")
@@ -354,7 +354,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
354354
baseURL = "https://api.anthropic.com"
355355
}
356356

357-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
357+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
358358
defer reporter.TrackFailure(ctx, &err)
359359
from := opts.SourceFormat
360360
to := sdktranslator.FromString("claude")

internal/runtime/executor/codex_executor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
264264
baseURL = "https://chatgpt.com/backend-api/codex"
265265
}
266266

267-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
267+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
268268
defer reporter.TrackFailure(ctx, &err)
269269

270270
from := opts.SourceFormat
@@ -431,7 +431,7 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A
431431
baseURL = "https://chatgpt.com/backend-api/codex"
432432
}
433433

434-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
434+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
435435
defer reporter.TrackFailure(ctx, &err)
436436

437437
from := opts.SourceFormat
@@ -536,7 +536,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
536536
baseURL = "https://chatgpt.com/backend-api/codex"
537537
}
538538

539-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
539+
reporter := helps.NewExecutorUsageReporter(ctx, e, baseModel, auth)
540540
defer reporter.TrackFailure(ctx, &err)
541541

542542
from := opts.SourceFormat

internal/runtime/executor/codex_openai_images.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func (e *CodexExecutor) executeOpenAIImage(ctx context.Context, auth *cliproxyau
8989
}
9090

9191
mainModel := e.resolveGPTImage2BaseModel()
92-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), mainModel, auth)
92+
reporter := helps.NewExecutorUsageReporter(ctx, e, mainModel, auth)
9393
defer reporter.TrackFailure(ctx, &err)
9494

9595
body, errBuild := e.prepareCodexOpenAIImageBody(prepared.Body, req, opts, mainModel)
@@ -182,7 +182,7 @@ func (e *CodexExecutor) executeOpenAIImageStream(ctx context.Context, auth *clip
182182
}
183183

184184
mainModel := e.resolveGPTImage2BaseModel()
185-
reporter := helps.NewUsageReporter(ctx, e.Identifier(), mainModel, auth)
185+
reporter := helps.NewExecutorUsageReporter(ctx, e, mainModel, auth)
186186
defer reporter.TrackFailure(ctx, &err)
187187

188188
body, errBuild := e.prepareCodexOpenAIImageBody(prepared.Body, req, opts, mainModel)

0 commit comments

Comments
 (0)