Skip to content

Commit fd30944

Browse files
committed
feat(auth): add error event publishing and Redis queue integration
- Introduced `publishErrorEvent` in `Manager` to publish error events to Redis. - Implemented error event structure to capture authentication errors with detailed metadata. - Added test cases for error event publishing, subscription, and Redis protocol handling. - Enhanced error and usage queue handling with `SubscribeErrors` and `EnqueueError`. Closes: router-for-me#3701
1 parent 55440f0 commit fd30944

7 files changed

Lines changed: 501 additions & 17 deletions

File tree

internal/api/redis_queue_protocol.go

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ import (
1414
log "github.com/sirupsen/logrus"
1515
)
1616

17-
const redisUsageChannel = "usage"
17+
const (
18+
redisUsageChannel = "usage"
19+
redisErrorsChannel = "errors"
20+
)
1821

1922
type redisSubscriptionCommand struct {
2023
args []string
@@ -150,15 +153,15 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
150153
}
151154
continue
152155
}
153-
if !strings.EqualFold(channel, redisUsageChannel) {
156+
messages, unsubscribe, ok := subscribeRedisChannel(channel)
157+
if !ok {
154158
_ = writeRedisError(writer, fmt.Sprintf("ERR unsupported channel '%s'", channel))
155159
if !flush() {
156160
return
157161
}
158162
continue
159163
}
160-
messages, unsubscribe := redisqueue.SubscribeUsage()
161-
if errWrite := writeRedisPubSubSubscribe(writer, redisUsageChannel, 1); errWrite != nil {
164+
if errWrite := writeRedisPubSubSubscribe(writer, channel, 1); errWrite != nil {
162165
unsubscribe()
163166
log.Errorf("redis protocol subscribe response error: %v", errWrite)
164167
return
@@ -167,7 +170,7 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
167170
unsubscribe()
168171
return
169172
}
170-
s.streamRedisUsageSubscription(reader, writer, messages, unsubscribe)
173+
s.streamRedisSubscription(reader, writer, channel, messages, unsubscribe)
171174
return
172175
case "LPOP", "RPOP":
173176
count, hasCount, ok := parsePopCount(args)
@@ -185,7 +188,14 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
185188
}
186189
continue
187190
}
188-
items := redisqueue.PopOldest(count)
191+
items, ok := popRedisQueueItems(args[1], count)
192+
if !ok {
193+
_ = writeRedisError(writer, fmt.Sprintf("ERR unsupported channel '%s'", strings.TrimSpace(args[1])))
194+
if !flush() {
195+
return
196+
}
197+
continue
198+
}
189199
if hasCount {
190200
_ = writeRedisArrayOfBulkStrings(writer, items)
191201
if !flush() {
@@ -213,7 +223,29 @@ func (s *Server) handleRedisConnection(conn net.Conn, reader *bufio.Reader) {
213223
}
214224
}
215225

216-
func (s *Server) streamRedisUsageSubscription(reader *bufio.Reader, writer *bufio.Writer, messages <-chan []byte, unsubscribe func()) {
226+
func subscribeRedisChannel(channel string) (<-chan []byte, func(), bool) {
227+
switch strings.ToLower(strings.TrimSpace(channel)) {
228+
case redisUsageChannel:
229+
messages, unsubscribe := redisqueue.SubscribeUsage()
230+
return messages, unsubscribe, true
231+
case redisErrorsChannel:
232+
messages, unsubscribe := redisqueue.SubscribeErrors()
233+
return messages, unsubscribe, true
234+
default:
235+
return nil, nil, false
236+
}
237+
}
238+
239+
func popRedisQueueItems(channel string, count int) ([][]byte, bool) {
240+
switch strings.ToLower(strings.TrimSpace(channel)) {
241+
case redisUsageChannel:
242+
return redisqueue.PopOldest(count), true
243+
default:
244+
return nil, false
245+
}
246+
}
247+
248+
func (s *Server) streamRedisSubscription(reader *bufio.Reader, writer *bufio.Writer, channel string, messages <-chan []byte, unsubscribe func()) {
217249
if unsubscribe == nil {
218250
return
219251
}
@@ -231,7 +263,7 @@ func (s *Server) streamRedisUsageSubscription(reader *bufio.Reader, writer *bufi
231263
if !ok {
232264
return
233265
}
234-
if errWrite := writeRedisPubSubMessage(writer, redisUsageChannel, msg); errWrite != nil {
266+
if errWrite := writeRedisPubSubMessage(writer, channel, msg); errWrite != nil {
235267
log.Errorf("redis protocol publish message error: %v", errWrite)
236268
return
237269
}
@@ -243,7 +275,7 @@ func (s *Server) streamRedisUsageSubscription(reader *bufio.Reader, writer *bufi
243275
if !ok {
244276
return
245277
}
246-
keepOpen := handleRedisSubscriptionCommand(writer, command)
278+
keepOpen := handleRedisSubscriptionCommand(writer, channel, command)
247279
if errFlush := writer.Flush(); errFlush != nil {
248280
log.Errorf("redis protocol flush error: %v", errFlush)
249281
return
@@ -277,7 +309,7 @@ func readRedisSubscriptionCommands(reader *bufio.Reader, commands chan<- redisSu
277309
}
278310
}
279311

280-
func handleRedisSubscriptionCommand(writer *bufio.Writer, command redisSubscriptionCommand) bool {
312+
func handleRedisSubscriptionCommand(writer *bufio.Writer, channel string, command redisSubscriptionCommand) bool {
281313
if command.err != nil {
282314
_ = writeRedisError(writer, "ERR "+command.err.Error())
283315
return false
@@ -297,7 +329,7 @@ func handleRedisSubscriptionCommand(writer *bufio.Writer, command redisSubscript
297329
_ = writeRedisPubSubPong(writer, payload)
298330
return true
299331
case "UNSUBSCRIBE":
300-
_ = writeRedisPubSubUnsubscribe(writer, redisUsageChannel, 0)
332+
_ = writeRedisPubSubUnsubscribe(writer, channel, 0)
301333
return false
302334
case "QUIT":
303335
_ = writeRedisSimpleString(writer, "OK")

internal/api/redis_queue_protocol_integration_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,60 @@ func TestRedisProtocol_SUBSCRIBE_UsageSendsSupportRefresh(t *testing.T) {
359359
}
360360
}
361361

362+
func TestRedisProtocol_SUBSCRIBE_ErrorsReceivesErrorEvents(t *testing.T) {
363+
const managementPassword = "test-management-password"
364+
365+
t.Setenv("MANAGEMENT_PASSWORD", managementPassword)
366+
redisqueue.SetEnabled(false)
367+
t.Cleanup(func() { redisqueue.SetEnabled(false) })
368+
369+
server := newTestServer(t)
370+
if !server.managementRoutesEnabled.Load() {
371+
t.Fatalf("expected managementRoutesEnabled to be true")
372+
}
373+
374+
addr, stop := startRedisMuxListener(t, server)
375+
t.Cleanup(stop)
376+
377+
conn, errDial := net.DialTimeout("tcp", addr, time.Second)
378+
if errDial != nil {
379+
t.Fatalf("failed to dial redis listener: %v", errDial)
380+
}
381+
t.Cleanup(func() { _ = conn.Close() })
382+
383+
reader := bufio.NewReader(conn)
384+
_ = conn.SetDeadline(time.Now().Add(5 * time.Second))
385+
386+
if errWrite := writeTestRESPCommand(conn, "AUTH", managementPassword); errWrite != nil {
387+
t.Fatalf("failed to write AUTH command: %v", errWrite)
388+
}
389+
if msg, errRead := readTestRESPSimpleString(reader); errRead != nil {
390+
t.Fatalf("failed to read AUTH response: %v", errRead)
391+
} else if msg != "OK" {
392+
t.Fatalf("unexpected AUTH response: %q", msg)
393+
}
394+
395+
if errWrite := writeTestRESPCommand(conn, "SUBSCRIBE", "errors"); errWrite != nil {
396+
t.Fatalf("failed to write SUBSCRIBE command: %v", errWrite)
397+
}
398+
channel, subscriptions, errSubscribe := readTestRESPPubSubSubscribe(reader)
399+
if errSubscribe != nil {
400+
t.Fatalf("failed to read subscribe response: %v", errSubscribe)
401+
}
402+
if channel != "errors" || subscriptions != 1 {
403+
t.Fatalf("unexpected subscribe response channel=%q subscriptions=%d", channel, subscriptions)
404+
}
405+
406+
redisqueue.EnqueueError([]byte(`{"auth_index":"auth-1","status_code":401}`))
407+
channel, payload, errMessage := readTestRESPPubSubMessage(reader)
408+
if errMessage != nil {
409+
t.Fatalf("failed to read error message: %v", errMessage)
410+
}
411+
if channel != "errors" || string(payload) != `{"auth_index":"auth-1","status_code":401}` {
412+
t.Fatalf("unexpected error message channel=%q payload=%q", channel, string(payload))
413+
}
414+
}
415+
362416
func TestRedisProtocol_AUTH_And_PopContracts(t *testing.T) {
363417
const managementPassword = "test-management-password"
364418

@@ -450,4 +504,13 @@ func TestRedisProtocol_AUTH_And_PopContracts(t *testing.T) {
450504
if len(emptyItems) != 0 {
451505
t.Fatalf("expected empty array for empty queue with count, got %#v", emptyItems)
452506
}
507+
508+
if errWrite := writeTestRESPCommand(conn, "RPOP", "errors", "2"); errWrite != nil {
509+
t.Fatalf("failed to write RPOP errors count command: %v", errWrite)
510+
}
511+
if msg, errRead := readTestRESPError(reader); errRead != nil {
512+
t.Fatalf("failed to read RPOP errors response: %v", errRead)
513+
} else if msg != "ERR unsupported channel 'errors'" {
514+
t.Fatalf("unexpected RPOP errors response: %q", msg)
515+
}
453516
}

internal/redisqueue/queue.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const (
1010
defaultRetentionSeconds int64 = 60
1111
maxRetentionSeconds int64 = 3600
1212
usageSubscriberBuffer = 256
13+
errorSubscriberBuffer = 256
1314

1415
usageSupportRefreshPayload = `{"support_refresh":true}`
1516
usageRefreshPayload = `{"refresh":true}`
@@ -32,6 +33,7 @@ var (
3233
enabled atomic.Bool
3334
retentionSeconds atomic.Int64
3435
global queue
36+
errorGlobal queue
3537
)
3638

3739
func init() {
@@ -42,6 +44,7 @@ func SetEnabled(value bool) {
4244
enabled.Store(value)
4345
if !value {
4446
global.clear()
47+
errorGlobal.clear()
4548
}
4649
}
4750

@@ -72,6 +75,16 @@ func Enqueue(payload []byte) {
7275
global.enqueue(payload)
7376
}
7477

78+
func EnqueueError(payload []byte) {
79+
if !Enabled() {
80+
return
81+
}
82+
if len(payload) == 0 {
83+
return
84+
}
85+
errorGlobal.publishToSubscribers(payload)
86+
}
87+
7588
func PopOldest(count int) [][]byte {
7689
if !Enabled() {
7790
return nil
@@ -83,7 +96,11 @@ func PopOldest(count int) [][]byte {
8396
}
8497

8598
func SubscribeUsage() (<-chan []byte, func()) {
86-
return global.subscribeUsage()
99+
return global.subscribe(usageSubscriberBuffer, []byte(usageSupportRefreshPayload))
100+
}
101+
102+
func SubscribeErrors() (<-chan []byte, func()) {
103+
return errorGlobal.subscribe(errorSubscriberBuffer, nil)
87104
}
88105

89106
func NotifyUsageRefresh() {
@@ -142,9 +159,11 @@ func (q *queue) publishToSubscribers(payload []byte) bool {
142159
return true
143160
}
144161

145-
func (q *queue) subscribeUsage() (<-chan []byte, func()) {
146-
subscriber := make(chan []byte, usageSubscriberBuffer)
147-
subscriber <- []byte(usageSupportRefreshPayload)
162+
func (q *queue) subscribe(buffer int, initialPayload []byte) (<-chan []byte, func()) {
163+
subscriber := make(chan []byte, buffer)
164+
if len(initialPayload) > 0 {
165+
subscriber <- append([]byte(nil), initialPayload...)
166+
}
148167

149168
q.mu.Lock()
150169
if q.subscribers == nil {
@@ -158,13 +177,13 @@ func (q *queue) subscribeUsage() (<-chan []byte, func()) {
158177
var once sync.Once
159178
unsubscribe := func() {
160179
once.Do(func() {
161-
q.unsubscribeUsage(id)
180+
q.unsubscribe(id)
162181
})
163182
}
164183
return subscriber, unsubscribe
165184
}
166185

167-
func (q *queue) unsubscribeUsage(id uint64) {
186+
func (q *queue) unsubscribe(id uint64) {
168187
q.mu.Lock()
169188
subscriber, ok := q.subscribers[id]
170189
if ok {

internal/redisqueue/queue_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) {
3939
withEnabledQueue(t, func() {
4040
subscriber, unsubscribe := SubscribeUsage()
4141
defer unsubscribe()
42+
errorSubscriber, unsubscribeErrors := SubscribeErrors()
43+
defer unsubscribeErrors()
4244

4345
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
4446

@@ -52,19 +54,51 @@ func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) {
5254
case <-time.After(time.Second):
5355
t.Fatalf("timeout waiting for subscriber close")
5456
}
57+
58+
select {
59+
case _, ok := <-errorSubscriber:
60+
if ok {
61+
t.Fatalf("error subscriber channel remained open after SetEnabled(false)")
62+
}
63+
case <-time.After(time.Second):
64+
t.Fatalf("timeout waiting for error subscriber close")
65+
}
66+
})
67+
}
68+
69+
func TestEnqueueErrorBroadcastsToErrorSubscribersAndDiscardsWithoutSubscribers(t *testing.T) {
70+
withEnabledQueue(t, func() {
71+
subscriber, unsubscribe := SubscribeErrors()
72+
defer unsubscribe()
73+
74+
EnqueueError([]byte("error-record"))
75+
requireUsageSubscriberPayload(t, subscriber, "error-record")
76+
77+
unsubscribe()
78+
79+
EnqueueError([]byte("discarded-error"))
80+
requireErrorQueueEmpty(t)
5581
})
5682
}
5783

5884
func TestNotifyUsageRefreshBroadcastsOnlyToUsageSubscribers(t *testing.T) {
5985
withEnabledQueue(t, func() {
6086
subscriber, unsubscribe := SubscribeUsage()
6187
defer unsubscribe()
88+
errorSubscriber, unsubscribeErrors := SubscribeErrors()
89+
defer unsubscribeErrors()
6290

6391
requireUsageSubscriberPayload(t, subscriber, usageSupportRefreshPayload)
6492

6593
NotifyUsageRefresh()
6694
requireUsageSubscriberPayload(t, subscriber, usageRefreshPayload)
6795

96+
select {
97+
case got := <-errorSubscriber:
98+
t.Fatalf("error subscriber received usage refresh payload %q", string(got))
99+
default:
100+
}
101+
68102
unsubscribe()
69103
NotifyUsageRefresh()
70104
if items := PopOldest(1); len(items) != 0 {
@@ -88,3 +122,14 @@ func requireUsageSubscriberPayload(t *testing.T, subscriber <-chan []byte, want
88122
t.Fatalf("timeout waiting for subscriber payload %q", want)
89123
}
90124
}
125+
126+
func requireErrorQueueEmpty(t *testing.T) {
127+
t.Helper()
128+
129+
errorGlobal.mu.Lock()
130+
defer errorGlobal.mu.Unlock()
131+
132+
if len(errorGlobal.items)-errorGlobal.head != 0 {
133+
t.Fatalf("error queue retained %d item(s), want none", len(errorGlobal.items)-errorGlobal.head)
134+
}
135+
}

sdk/cliproxy/auth/conductor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2523,6 +2523,7 @@ func (m *Manager) MarkResult(ctx context.Context, result Result) {
25232523
}
25242524

25252525
m.hook.OnResult(ctx, result)
2526+
m.publishErrorEvent(result, authSnapshot)
25262527
}
25272528

25282529
func ensureModelState(auth *Auth, model string) *ModelState {

0 commit comments

Comments
 (0)