Skip to content

Commit 2c04e8a

Browse files
author
ccs-upstream-sync[bot]
committed
Merge remote-tracking branch 'upstream/main' into upstream-sync/20260507-0430
2 parents 0ef9c79 + 785b00c commit 2c04e8a

9 files changed

Lines changed: 247 additions & 1 deletion

File tree

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ Standalone persistence and visualization service for CLIProxyAPI, with periodic
110110

111111
Local-first usage and quota dashboard for CLIProxyAPI. It collects per-request token usage from the Redis-compatible usage queue into SQLite, visualizes daily and recent-window usage by account and model, and shows Codex 5h/7d quota remaining in a local web UI.
112112

113+
### [CPA-Manager](https://github.com/seakee/CPA-Manager)
114+
115+
Full CLIProxyAPI management center with request-level monitoring and cost estimates. CPA-Manager tracks collected requests by account, model, channel, latency, status, and token usage; estimates cost with editable model prices and one-click LiteLLM price sync; persists events in SQLite; and provides Codex account-pool operations with batch inspection, quota detection, unhealthy account discovery, cleanup suggestions, and one-click execution for day-to-day multi-account maintenance.
116+
113117
## Amp CLI Support
114118

115119
CLIProxyAPI includes integrated support for [Amp CLI](https://ampcode.com) and Amp IDE extensions, enabling you to use your Google/ChatGPT/Claude OAuth subscriptions with Amp's coding tools:

README_CN.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,10 @@ docker compose up -d
110110

111111
面向 CLIProxyAPI 的本地优先使用量与配额看板。它从 Redis 兼容使用量队列采集每次请求的 Token 消耗并写入 SQLite,按账号和模型可视化每日及最近时间窗口的用量,并在本地网页中显示 Codex 5h/7d 配额余量。
112112

113+
### [CPA-Manager](https://github.com/seakee/CPA-Manager)
114+
115+
面向 CLIProxyAPI 的完整管理中心,提供请求级监控和费用预估。CPA-Manager 可按账号、模型、渠道、延迟、状态和 token 用量追踪采集到的请求;支持可编辑模型价格与一键同步 LiteLLM 价格来估算费用;用 SQLite 持久化事件;并提供面向 Codex 账号池的批量巡检、配额识别、异常账号定位、清理建议与一键执行能力,适合多账号池的日常运维管理。
116+
113117
## Amp CLI 支持
114118

115119
CLIProxyAPI 已内置对 [Amp CLI](https://ampcode.com) 和 Amp IDE 扩展的支持,可让你使用自己的 Google/ChatGPT/Claude OAuth 订阅来配合 Amp 编码工具:

README_JA.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ CLIProxyAPI向けの独立した使用量永続化・可視化サービス。CLI
7878

7979
CLIProxyAPI向けのローカル優先の使用量・クォータダッシュボード。Redis互換の使用量キューからリクエストごとのToken使用量を収集してSQLiteに保存し、アカウント別・モデル別の日次および直近時間枠の使用量を可視化し、Codex 5h/7dクォータ残量をローカルWeb UIで表示します。
8080

81+
### [CPA-Manager](https://github.com/seakee/CPA-Manager)
82+
83+
リクエスト単位の監視とコスト推定を備えたCLIProxyAPI向けのフル管理センターです。CPA-Managerは、収集したリクエストをアカウント、モデル、チャネル、レイテンシ、ステータス、Token使用量ごとに追跡し、編集可能なモデル価格とLiteLLM価格のワンクリック同期でコストを推定します。SQLiteでイベントを永続化し、Codexアカウントプール向けに一括検査、クォータ判定、異常アカウント検出、クリーンアップ提案、ワンクリック実行を提供し、日常的なマルチアカウント運用に適しています。
84+
8185
## Amp CLIサポート
8286

8387
CLIProxyAPIは[Amp CLI](https://ampcode.com)およびAmp IDE拡張機能の統合サポートを含んでおり、Google/ChatGPT/ClaudeのOAuthサブスクリプションをAmpのコーディングツールで使用できます:

internal/api/modules/amp/routes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ func (m *AmpModule) registerManagementRoutes(engine *gin.Engine, baseHandler *ha
199199
ampAPI.Any("/telemetry/*path", proxyHandler)
200200
ampAPI.Any("/threads", proxyHandler)
201201
ampAPI.Any("/threads/*path", proxyHandler)
202+
ampAPI.Any("/thread-actors", proxyHandler)
202203
ampAPI.Any("/otel", proxyHandler)
203204
ampAPI.Any("/otel/*path", proxyHandler)
204205
ampAPI.Any("/tab", proxyHandler)

internal/api/modules/amp/routes_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func TestRegisterManagementRoutes(t *testing.T) {
4949
{"/api/meta", http.MethodGet},
5050
{"/api/telemetry", http.MethodGet},
5151
{"/api/threads", http.MethodGet},
52+
{"/api/thread-actors", http.MethodPost},
5253
{"/threads/", http.MethodGet},
5354
{"/threads.rss", http.MethodGet}, // Root-level route (no /api prefix)
5455
{"/api/otel", http.MethodGet},

internal/runtime/executor/codex_websockets_executor.go

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ type codexWebsocketSession struct {
7676
activeCancel context.CancelFunc
7777

7878
readerConn *websocket.Conn
79+
80+
upstreamDisconnectOnce sync.Once
81+
upstreamDisconnectCh chan error
7982
}
8083

8184
func NewCodexWebsocketsExecutor(cfg *config.Config) *CodexWebsocketsExecutor {
@@ -151,6 +154,22 @@ func (s *codexWebsocketSession) configureConn(conn *websocket.Conn) {
151154
})
152155
}
153156

157+
func (s *codexWebsocketSession) notifyUpstreamDisconnect(err error) {
158+
if s == nil {
159+
return
160+
}
161+
s.upstreamDisconnectOnce.Do(func() {
162+
if s.upstreamDisconnectCh == nil {
163+
return
164+
}
165+
select {
166+
case s.upstreamDisconnectCh <- err:
167+
default:
168+
}
169+
close(s.upstreamDisconnectCh)
170+
})
171+
}
172+
154173
func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) {
155174
if ctx == nil {
156175
ctx = context.Background()
@@ -1221,11 +1240,22 @@ func (e *CodexWebsocketsExecutor) getOrCreateSession(sessionID string) *codexWeb
12211240
if sess, ok := store.sessions[sessionID]; ok && sess != nil {
12221241
return sess
12231242
}
1224-
sess := &codexWebsocketSession{sessionID: sessionID}
1243+
sess := &codexWebsocketSession{
1244+
sessionID: sessionID,
1245+
upstreamDisconnectCh: make(chan error, 1),
1246+
}
12251247
store.sessions[sessionID] = sess
12261248
return sess
12271249
}
12281250

1251+
func (e *CodexWebsocketsExecutor) UpstreamDisconnectChan(sessionID string) <-chan error {
1252+
sess := e.getOrCreateSession(sessionID)
1253+
if sess == nil {
1254+
return nil
1255+
}
1256+
return sess.upstreamDisconnectCh
1257+
}
1258+
12291259
func (e *CodexWebsocketsExecutor) ensureUpstreamConn(ctx context.Context, auth *cliproxyauth.Auth, sess *codexWebsocketSession, authID string, wsURL string, headers http.Header) (*websocket.Conn, *http.Response, error) {
12301260
if sess == nil {
12311261
return e.dialCodexWebsocket(ctx, auth, wsURL, headers)
@@ -1354,6 +1384,7 @@ func (e *CodexWebsocketsExecutor) invalidateUpstreamConn(sess *codexWebsocketSes
13541384
sess.connMu.Unlock()
13551385

13561386
logCodexWebsocketDisconnected(sessionID, authID, wsURL, reason, err)
1387+
sess.notifyUpstreamDisconnect(err)
13571388
if errClose := conn.Close(); errClose != nil {
13581389
log.Errorf("codex websockets executor: close websocket error: %v", errClose)
13591390
}
@@ -1592,6 +1623,13 @@ func (e *CodexAutoExecutor) CloseExecutionSession(sessionID string) {
15921623
e.wsExec.CloseExecutionSession(sessionID)
15931624
}
15941625

1626+
func (e *CodexAutoExecutor) UpstreamDisconnectChan(sessionID string) <-chan error {
1627+
if e == nil || e.wsExec == nil {
1628+
return nil
1629+
}
1630+
return e.wsExec.UpstreamDisconnectChan(sessionID)
1631+
}
1632+
15951633
func codexWebsocketsEnabled(auth *cliproxyauth.Auth) bool {
15961634
if auth == nil {
15971635
return false

internal/runtime/executor/codex_websockets_executor_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package executor
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"net/http"
78
"net/http/httptest"
89
"strings"
@@ -92,6 +93,64 @@ func TestCodexWebsocketsExecutePreservesPreviousResponseIDUpstream(t *testing.T)
9293
}
9394
}
9495

96+
func TestCodexWebsocketsUpstreamDisconnectChanSignalsOnInvalidate(t *testing.T) {
97+
upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
98+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
99+
conn, err := upgrader.Upgrade(w, r, nil)
100+
if err != nil {
101+
t.Errorf("upgrade websocket: %v", err)
102+
return
103+
}
104+
defer func() { _ = conn.Close() }()
105+
for {
106+
if _, _, errRead := conn.ReadMessage(); errRead != nil {
107+
return
108+
}
109+
}
110+
}))
111+
defer server.Close()
112+
113+
wsURL := "ws" + strings.TrimPrefix(server.URL, "http")
114+
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
115+
if err != nil {
116+
t.Fatalf("dial websocket: %v", err)
117+
}
118+
defer func() { _ = conn.Close() }()
119+
120+
exec := NewCodexWebsocketsExecutor(&config.Config{})
121+
sessionID := "sess-1"
122+
disconnectCh := exec.UpstreamDisconnectChan(sessionID)
123+
if disconnectCh == nil {
124+
t.Fatal("expected disconnect channel")
125+
}
126+
127+
sess := exec.getOrCreateSession(sessionID)
128+
if sess == nil {
129+
t.Fatal("expected session")
130+
}
131+
sess.connMu.Lock()
132+
sess.conn = conn
133+
sess.authID = "auth-1"
134+
sess.wsURL = "ws://example.test/responses"
135+
sess.readerConn = conn
136+
sess.connMu.Unlock()
137+
138+
upstreamErr := errors.New("upstream gone")
139+
exec.invalidateUpstreamConn(sess, conn, "test_invalidate", upstreamErr)
140+
141+
select {
142+
case errRead, ok := <-disconnectCh:
143+
if !ok {
144+
t.Fatal("expected disconnect channel to deliver error before closing")
145+
}
146+
if errRead == nil || errRead.Error() != upstreamErr.Error() {
147+
t.Fatalf("disconnect error = %v, want %v", errRead, upstreamErr)
148+
}
149+
case <-time.After(5 * time.Second):
150+
t.Fatal("timed out waiting for disconnect signal")
151+
}
152+
}
153+
95154
func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) {
96155
headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "", nil)
97156

sdk/api/handlers/openai/openai_responses_websocket.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,31 @@ func (h *OpenAIResponsesAPIHandler) ResponsesWebsocket(c *gin.Context) {
5656
retainResponsesWebsocketToolCaches(downstreamSessionKey)
5757
clientIP := websocketClientAddress(c)
5858
log.Infof("responses websocket: client connected id=%s remote=%s", passthroughSessionID, clientIP)
59+
60+
wsDone := make(chan struct{})
61+
defer close(wsDone)
62+
63+
if h != nil && h.AuthManager != nil {
64+
if exec, ok := h.AuthManager.Executor("codex"); ok && exec != nil {
65+
type upstreamDisconnectSubscriber interface {
66+
UpstreamDisconnectChan(sessionID string) <-chan error
67+
}
68+
if subscriber, ok := exec.(upstreamDisconnectSubscriber); ok && subscriber != nil {
69+
disconnectCh := subscriber.UpstreamDisconnectChan(passthroughSessionID)
70+
if disconnectCh != nil {
71+
go func() {
72+
select {
73+
case <-wsDone:
74+
return
75+
case <-disconnectCh:
76+
_ = conn.Close()
77+
}
78+
}()
79+
}
80+
}
81+
}
82+
}
83+
5984
var wsTerminateErr error
6085
var wsTimelineLog strings.Builder
6186
defer func() {

sdk/api/handlers/openai/openai_responses_websocket_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,79 @@ func (e websocketPinnedFailoverStatusError) Error() string { return e.msg }
8585

8686
func (e websocketPinnedFailoverStatusError) StatusCode() int { return e.status }
8787

88+
type websocketUpstreamDisconnectExecutor struct {
89+
mu sync.Mutex
90+
subscribed chan string
91+
sessions map[string]chan error
92+
}
93+
94+
func (e *websocketUpstreamDisconnectExecutor) Identifier() string { return "codex" }
95+
96+
func (e *websocketUpstreamDisconnectExecutor) UpstreamDisconnectChan(sessionID string) <-chan error {
97+
sessionID = strings.TrimSpace(sessionID)
98+
if sessionID == "" {
99+
return nil
100+
}
101+
e.mu.Lock()
102+
if e.sessions == nil {
103+
e.sessions = make(map[string]chan error)
104+
}
105+
ch, ok := e.sessions[sessionID]
106+
if !ok {
107+
ch = make(chan error, 1)
108+
e.sessions[sessionID] = ch
109+
}
110+
subscribed := e.subscribed
111+
e.mu.Unlock()
112+
113+
if subscribed != nil {
114+
select {
115+
case subscribed <- sessionID:
116+
default:
117+
}
118+
}
119+
return ch
120+
}
121+
122+
func (e *websocketUpstreamDisconnectExecutor) TriggerDisconnect(sessionID string, err error) {
123+
sessionID = strings.TrimSpace(sessionID)
124+
if sessionID == "" {
125+
return
126+
}
127+
e.mu.Lock()
128+
ch := e.sessions[sessionID]
129+
delete(e.sessions, sessionID)
130+
e.mu.Unlock()
131+
if ch == nil {
132+
return
133+
}
134+
select {
135+
case ch <- err:
136+
default:
137+
}
138+
close(ch)
139+
}
140+
141+
func (e *websocketUpstreamDisconnectExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
142+
return coreexecutor.Response{}, errors.New("not implemented")
143+
}
144+
145+
func (e *websocketUpstreamDisconnectExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (*coreexecutor.StreamResult, error) {
146+
return nil, errors.New("not implemented")
147+
}
148+
149+
func (e *websocketUpstreamDisconnectExecutor) Refresh(_ context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
150+
return auth, nil
151+
}
152+
153+
func (e *websocketUpstreamDisconnectExecutor) CountTokens(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
154+
return coreexecutor.Response{}, errors.New("not implemented")
155+
}
156+
157+
func (e *websocketUpstreamDisconnectExecutor) HttpRequest(context.Context, *coreauth.Auth, *http.Request) (*http.Response, error) {
158+
return nil, errors.New("not implemented")
159+
}
160+
88161
func (e *websocketAuthCaptureExecutor) Identifier() string { return "test-provider" }
89162

90163
func (e *websocketAuthCaptureExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
@@ -934,6 +1007,43 @@ func TestResponsesWebsocketTimelineRecordsDisconnectEvent(t *testing.T) {
9341007
}
9351008
}
9361009

1010+
func TestResponsesWebsocketClosesOnCodexUpstreamDisconnect(t *testing.T) {
1011+
gin.SetMode(gin.TestMode)
1012+
1013+
executor := &websocketUpstreamDisconnectExecutor{subscribed: make(chan string, 1)}
1014+
manager := coreauth.NewManager(nil, nil, nil)
1015+
manager.RegisterExecutor(executor)
1016+
base := handlers.NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, manager)
1017+
h := NewOpenAIResponsesAPIHandler(base)
1018+
1019+
router := gin.New()
1020+
router.GET("/v1/responses/ws", h.ResponsesWebsocket)
1021+
server := httptest.NewServer(router)
1022+
defer server.Close()
1023+
1024+
wsURL := "ws" + strings.TrimPrefix(server.URL, "http") + "/v1/responses/ws"
1025+
conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
1026+
if err != nil {
1027+
t.Fatalf("dial websocket: %v", err)
1028+
}
1029+
defer func() { _ = conn.Close() }()
1030+
1031+
var sessionID string
1032+
select {
1033+
case sessionID = <-executor.subscribed:
1034+
case <-time.After(5 * time.Second):
1035+
t.Fatal("timed out waiting for upstream disconnect subscription")
1036+
}
1037+
1038+
executor.TriggerDisconnect(sessionID, errors.New("upstream disconnected"))
1039+
1040+
_ = conn.SetReadDeadline(time.Now().Add(2 * time.Second))
1041+
_, _, err = conn.ReadMessage()
1042+
if err == nil {
1043+
t.Fatalf("expected downstream websocket to close after upstream disconnect")
1044+
}
1045+
}
1046+
9371047
func TestWebsocketUpstreamSupportsIncrementalInputForModel(t *testing.T) {
9381048
manager := coreauth.NewManager(nil, nil, nil)
9391049
auth := &coreauth.Auth{

0 commit comments

Comments
 (0)