Skip to content

Commit 2f5bda5

Browse files
Merge upstream/main (auto-sync feat/copilot)
- 55440f0 feat(auth): add runtime auth removal and unscheduling logic - fd30944 feat(auth): add error event publishing and Redis queue integration
2 parents 22dbb38 + fd30944 commit 2f5bda5

13 files changed

Lines changed: 769 additions & 87 deletions

internal/api/handlers/management/auth_files.go

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ func (h *Handler) DeleteAuthFile(c *gin.Context) {
770770
return
771771
}
772772
deleted++
773-
h.disableAuth(ctx, full)
773+
h.removeAuth(ctx, full)
774774
}
775775
}
776776
c.JSON(200, gin.H{"status": "ok", "deleted": deleted})
@@ -976,9 +976,9 @@ func (h *Handler) deleteAuthFileByName(ctx context.Context, name string) (string
976976
return filepath.Base(name), http.StatusInternalServerError, errDeleteRecord
977977
}
978978
if targetID != "" {
979-
h.disableAuth(ctx, targetID)
979+
h.removeAuth(ctx, targetID)
980980
} else {
981-
h.disableAuth(ctx, targetPath)
981+
h.removeAuth(ctx, targetPath)
982982
}
983983
return filepath.Base(name), http.StatusOK, nil
984984
}
@@ -1558,33 +1558,23 @@ func syncAuthFileDisabledState(auth *coreauth.Auth) {
15581558
auth.StatusMessage = ""
15591559
}
15601560

1561-
func (h *Handler) disableAuth(ctx context.Context, id string) {
1561+
func (h *Handler) removeAuth(ctx context.Context, id string) {
15621562
if h == nil || h.authManager == nil {
15631563
return
15641564
}
15651565
id = strings.TrimSpace(id)
15661566
if id == "" {
15671567
return
15681568
}
1569-
if auth, ok := h.authManager.GetByID(id); ok {
1570-
auth.Disabled = true
1571-
auth.Status = coreauth.StatusDisabled
1572-
auth.StatusMessage = "removed via management API"
1573-
auth.UpdatedAt = time.Now()
1574-
_, _ = h.authManager.Update(ctx, auth)
1569+
if _, ok := h.authManager.GetByID(id); ok {
1570+
h.authManager.Remove(ctx, id)
15751571
return
15761572
}
15771573
authID := h.authIDForPath(id)
15781574
if authID == "" {
15791575
return
15801576
}
1581-
if auth, ok := h.authManager.GetByID(authID); ok {
1582-
auth.Disabled = true
1583-
auth.Status = coreauth.StatusDisabled
1584-
auth.StatusMessage = "removed via management API"
1585-
auth.UpdatedAt = time.Now()
1586-
_, _ = h.authManager.Update(ctx, auth)
1587-
}
1577+
h.authManager.Remove(ctx, authID)
15881578
}
15891579

15901580
func (h *Handler) deleteTokenRecord(ctx context.Context, path string) error {

internal/api/handlers/management/auth_files_delete_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,49 @@ func TestDeleteAuthFile_FallbackToAuthDirPath(t *testing.T) {
127127
t.Fatalf("expected auth file to be removed from auth dir, stat err: %v", errStat)
128128
}
129129
}
130+
131+
func TestDeleteAuthFile_RemovesRuntimeAuth(t *testing.T) {
132+
t.Setenv("MANAGEMENT_PASSWORD", "")
133+
gin.SetMode(gin.TestMode)
134+
135+
authDir := t.TempDir()
136+
fileName := "runtime-remove-user.json"
137+
filePath := filepath.Join(authDir, fileName)
138+
if errWrite := os.WriteFile(filePath, []byte(`{"type":"codex","email":"runtime@example.com"}`), 0o600); errWrite != nil {
139+
t.Fatalf("failed to write auth file: %v", errWrite)
140+
}
141+
142+
manager := coreauth.NewManager(nil, nil, nil)
143+
record := &coreauth.Auth{
144+
ID: "runtime-remove-auth",
145+
FileName: fileName,
146+
Provider: "codex",
147+
Status: coreauth.StatusActive,
148+
Attributes: map[string]string{
149+
"path": filePath,
150+
},
151+
Metadata: map[string]any{
152+
"type": "codex",
153+
"email": "runtime@example.com",
154+
},
155+
}
156+
if _, errRegister := manager.Register(context.Background(), record); errRegister != nil {
157+
t.Fatalf("failed to register auth record: %v", errRegister)
158+
}
159+
160+
h := NewHandlerWithoutConfigFilePath(&config.Config{AuthDir: authDir}, manager)
161+
h.tokenStore = &memoryAuthStore{}
162+
163+
deleteRec := httptest.NewRecorder()
164+
deleteCtx, _ := gin.CreateTestContext(deleteRec)
165+
deleteReq := httptest.NewRequest(http.MethodDelete, "/v0/management/auth-files?name="+url.QueryEscape(fileName), nil)
166+
deleteCtx.Request = deleteReq
167+
h.DeleteAuthFile(deleteCtx)
168+
169+
if deleteRec.Code != http.StatusOK {
170+
t.Fatalf("expected delete status %d, got %d with body %s", http.StatusOK, deleteRec.Code, deleteRec.Body.String())
171+
}
172+
if _, ok := manager.GetByID(record.ID); ok {
173+
t.Fatalf("expected runtime auth %q to be removed", record.ID)
174+
}
175+
}

internal/api/redis_queue_protocol.go

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
log "github.com/sirupsen/logrus"
1515
)
1616

17-
const redisUsageChannel = "usage"
17+
const (
18+
redisUsageChannel = "usage"
19+
redisErrorsChannel = "errors"
20+
)
1821

1922
type redisSubscriptionCommand struct {
2023
args []string
@@ -150,15 +153,15 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
150153
}
151154
continue
152155
}
153-
if !strings.EqualFold(channel, redisUsageChannel) {
156+
messages, unsubscribe, ok := subscribeRedisChannel(channel)
157+
if !ok {
154158
_ = writeRedisError(writer, fmt.Sprintf("ERR unsupported channel '%s'", channel))
155159
if !flush() {
156160
return
157161
}
158162
continue
159163
}
160-
messages, unsubscribe := redisqueue.SubscribeUsage()
161-
if errWrite := writeRedisPubSubSubscribe(writer, redisUsageChannel, 1); errWrite != nil {
164+
if errWrite := writeRedisPubSubSubscribe(writer, channel, 1); errWrite != nil {
162165
unsubscribe()
163166
log.Errorf("redis protocol subscribe response error: %v", errWrite)
164167
return
@@ -167,7 +170,7 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
167170
unsubscribe()
168171
return
169172
}
170-
s.streamRedisUsageSubscription(reader, writer, messages, unsubscribe)
173+
s.streamRedisSubscription(reader, writer, channel, messages, unsubscribe)
171174
return
172175
case "LPOP", "RPOP":
173176
count, hasCount, ok := parsePopCount(args)
@@ -185,7 +188,14 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
185188
}
186189
continue
187190
}
188-
items := redisqueue.PopOldest(count)
191+
items, ok := popRedisQueueItems(args[1], count)
192+
if !ok {
193+
_ = writeRedisError(writer, fmt.Sprintf("ERR unsupported channel '%s'", strings.TrimSpace(args[1])))
194+
if !flush() {
195+
return
196+
}
197+
continue
198+
}
189199
if hasCount {
190200
_ = writeRedisArrayOfBulkStrings(writer, items)
191201
if !flush() {
@@ -213,7 +223,29 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
213223
}
214224
}
215225

216-
func (s *Server) streamRedisUsageSubscription(reader *bufio.Reader, writer *bufio.Writer, messages <-chan []byte, unsubscribe func()) {
226+
func subscribeRedisChannel(channel string) (<-chan []byte, func(), bool) {
227+
switch strings.ToLower(strings.TrimSpace(channel)) {
228+
case redisUsageChannel:
229+
messages, unsubscribe := redisqueue.SubscribeUsage()
230+
return messages, unsubscribe, true
231+
case redisErrorsChannel:
232+
messages, unsubscribe := redisqueue.SubscribeErrors()
233+
return messages, unsubscribe, true
234+
default:
235+
return nil, nil, false
236+
}
237+
}
238+
239+
func popRedisQueueItems(channel string, count int) ([][]byte, bool) {
240+
switch strings.ToLower(strings.TrimSpace(channel)) {
241+
case redisUsageChannel:
242+
return redisqueue.PopOldest(count), true
243+
default:
244+
return nil, false
245+
}
246+
}
247+
248+
func (s *Server) streamRedisSubscription(reader *bufio.Reader, writer *bufio.Writer, channel string, messages <-chan []byte, unsubscribe func()) {
217249
if unsubscribe == nil {
218250
return
219251
}
@@ -231,7 +263,7 @@ func (s *Server) streamRedisUsageSubscription(reader *bufio.Reader, writer *bufi
231263
if !ok {
232264
return
233265
}
234-
if errWrite := writeRedisPubSubMessage(writer, redisUsageChannel, msg); errWrite != nil {
266+
if errWrite := writeRedisPubSubMessage(writer, channel, msg); errWrite != nil {
235267
log.Errorf("redis protocol publish message error: %v", errWrite)
236268
return
237269
}
@@ -243,7 +275,7 @@ func (s *Server) streamRedisUsageSubscription(reader *bufio.Reader, writer *bufi
243275
if !ok {
244276
return
245277
}
246-
keepOpen := handleRedisSubscriptionCommand(writer, command)
278+
keepOpen := handleRedisSubscriptionCommand(writer, channel, command)
247279
if errFlush := writer.Flush(); errFlush != nil {
248280
log.Errorf("redis protocol flush error: %v", errFlush)
249281
return
@@ -277,7 +309,7 @@ func readRedisSubscriptionCommands(reader *bufio.Reader, commands chan<- redisSu
277309
}
278310
}
279311

280-
func handleRedisSubscriptionCommand(writer *bufio.Writer, command redisSubscriptionCommand) bool {
312+
func handleRedisSubscriptionCommand(writer *bufio.Writer, channel string, command redisSubscriptionCommand) bool {
281313
if command.err != nil {
282314
_ = writeRedisError(writer, "ERR "+command.err.Error())
283315
return false
@@ -297,7 +329,7 @@ func handleRedisSubscriptionCommand(writer *bufio.Writer, command redisSubscript
297329
_ = writeRedisPubSubPong(writer, payload)
298330
return true
299331
case "UNSUBSCRIBE":
300-
_ = writeRedisPubSubUnsubscribe(writer, redisUsageChannel, 0)
332+
_ = writeRedisPubSubUnsubscribe(writer, channel, 0)
301333
return false
302334
case "QUIT":
303335
_ = writeRedisSimpleString(writer, "OK")

internal/api/redis_queue_protocol_integration_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,60 @@ func TestRedisProtocol_SUBSCRIBE_UsageSendsSupportRefresh(t *testing.T) {
359359
}
360360
}
361361

362+
func TestRedisProtocol_SUBSCRIBE_ErrorsReceivesErrorEvents(t *testing.T) {
363+
const managementPassword = "test-management-password"
364+
365+
t.Setenv("MANAGEMENT_PASSWORD", managementPassword)
366+
redisqueue.SetEnabled(false)
367+
t.Cleanup(func() { redisqueue.SetEnabled(false) })
368+
369+
server := newTestServer(t)
370+
if !server.managementRoutesEnabled.Load() {
371+
t.Fatalf("expected managementRoutesEnabled to be true")
372+
}
373+
374+
addr, stop := startRedisMuxListener(t, server)
375+
t.Cleanup(stop)
376+
377+
conn, errDial := net.DialTimeout("tcp", addr, time.Second)
378+
if errDial != nil {
379+
t.Fatalf("failed to dial redis listener: %v", errDial)
380+
}
381+
t.Cleanup(func() { _ = conn.Close() })
382+
383+
reader := bufio.NewReader(conn)
384+
_ = conn.SetDeadline(time.Now().Add(5 * time.Second))
385+
386+
if errWrite := writeTestRESPCommand(conn, "AUTH", managementPassword); errWrite != nil {
387+
t.Fatalf("failed to write AUTH command: %v", errWrite)
388+
}
389+
if msg, errRead := readTestRESPSimpleString(reader); errRead != nil {
390+
t.Fatalf("failed to read AUTH response: %v", errRead)
391+
} else if msg != "OK" {
392+
t.Fatalf("unexpected AUTH response: %q", msg)
393+
}
394+
395+
if errWrite := writeTestRESPCommand(conn, "SUBSCRIBE", "errors"); errWrite != nil {
396+
t.Fatalf("failed to write SUBSCRIBE command: %v", errWrite)
397+
}
398+
channel, subscriptions, errSubscribe := readTestRESPPubSubSubscribe(reader)
399+
if errSubscribe != nil {
400+
t.Fatalf("failed to read subscribe response: %v", errSubscribe)
401+
}
402+
if channel != "errors" || subscriptions != 1 {
403+
t.Fatalf("unexpected subscribe response channel=%q subscriptions=%d", channel, subscriptions)
404+
}
405+
406+
redisqueue.EnqueueError([]byte(`{"auth_index":"auth-1","status_code":401}`))
407+
channel, payload, errMessage := readTestRESPPubSubMessage(reader)
408+
if errMessage != nil {
409+
t.Fatalf("failed to read error message: %v", errMessage)
410+
}
411+
if channel != "errors" || string(payload) != `{"auth_index":"auth-1","status_code":401}` {
412+
t.Fatalf("unexpected error message channel=%q payload=%q", channel, string(payload))
413+
}
414+
}
415+
362416
func TestRedisProtocol_AUTH_And_PopContracts(t *testing.T) {
363417
const managementPassword = "test-management-password"
364418

@@ -450,4 +504,13 @@ func TestRedisProtocol_AUTH_And_PopContracts(t *testing.T) {
450504
if len(emptyItems) != 0 {
451505
t.Fatalf("expected empty array for empty queue with count, got %#v", emptyItems)
452506
}
507+
508+
if errWrite := writeTestRESPCommand(conn, "RPOP", "errors", "2"); errWrite != nil {
509+
t.Fatalf("failed to write RPOP errors count command: %v", errWrite)
510+
}
511+
if msg, errRead := readTestRESPError(reader); errRead != nil {
512+
t.Fatalf("failed to read RPOP errors response: %v", errRead)
513+
} else if msg != "ERR unsupported channel 'errors'" {
514+
t.Fatalf("unexpected RPOP errors response: %q", msg)
515+
}
453516
}

internal/redisqueue/queue.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
defaultRetentionSeconds int64 = 60
1111
maxRetentionSeconds int64 = 3600
1212
usageSubscriberBuffer = 256
13+
errorSubscriberBuffer = 256
1314

1415
usageSupportRefreshPayload = `{"support_refresh":true}`
1516
usageRefreshPayload = `{"refresh":true}`
@@ -32,6 +33,7 @@ var (
3233
enabled atomic.Bool
3334
retentionSeconds atomic.Int64
3435
global queue
36+
errorGlobal queue
3537
)
3638

3739
func init() {
@@ -42,6 +44,7 @@ func SetEnabled(value bool) {
4244
enabled.Store(value)
4345
if !value {
4446
global.clear()
47+
errorGlobal.clear()
4548
}
4649
}
4750

@@ -72,6 +75,16 @@ func Enqueue(payload []byte) {
7275
global.enqueue(payload)
7376
}
7477

78+
func EnqueueError(payload []byte) {
79+
if !Enabled() {
80+
return
81+
}
82+
if len(payload) == 0 {
83+
return
84+
}
85+
errorGlobal.publishToSubscribers(payload)
86+
}
87+
7588
func PopOldest(count int) [][]byte {
7689
if !Enabled() {
7790
return nil
@@ -83,7 +96,11 @@ func PopOldest(count int) [][]byte {
8396
}
8497

8598
func SubscribeUsage() (<-chan []byte, func()) {
86-
return global.subscribeUsage()
99+
return global.subscribe(usageSubscriberBuffer, []byte(usageSupportRefreshPayload))
100+
}
101+
102+
func SubscribeErrors() (<-chan []byte, func()) {
103+
return errorGlobal.subscribe(errorSubscriberBuffer, nil)
87104
}
88105

89106
func NotifyUsageRefresh() {
@@ -142,9 +159,11 @@ func (q *queue) publishToSubscribers(payload []byte) bool {
142159
return true
143160
}
144161

145-
func (q *queue) subscribeUsage() (<-chan []byte, func()) {
146-
subscriber := make(chan []byte, usageSubscriberBuffer)
147-
subscriber <- []byte(usageSupportRefreshPayload)
162+
func (q *queue) subscribe(buffer int, initialPayload []byte) (<-chan []byte, func()) {
163+
subscriber := make(chan []byte, buffer)
164+
if len(initialPayload) > 0 {
165+
subscriber <- append([]byte(nil), initialPayload...)
166+
}
148167

149168
q.mu.Lock()
150169
if q.subscribers == nil {
@@ -158,13 +177,13 @@ func (q *queue) subscribeUsage() (<-chan []byte, func()) {
158177
var once sync.Once
159178
unsubscribe := func() {
160179
once.Do(func() {
161-
q.unsubscribeUsage(id)
180+
q.unsubscribe(id)
162181
})
163182
}
164183
return subscriber, unsubscribe
165184
}
166185

167-
func (q *queue) unsubscribeUsage(id uint64) {
186+
func (q *queue) unsubscribe(id uint64) {
168187
q.mu.Lock()
169188
subscriber, ok := q.subscribers[id]
170189
if ok {

0 commit comments

Comments
 (0)