Skip to content

Commit 18e1aaa

Browse files
committed
Refactor error handling in pubsub session
1 parent 4308912 commit 18e1aaa

3 files changed

Lines changed: 16 additions & 37 deletions

File tree

proxy/proxy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func (p *ProxyServer) startPubSubSession(conn redcon.Conn, cmdName string, args
231231
}
232232
if err != nil {
233233
upstream.Close()
234-
conn.WriteError("ERR " + err.Error())
234+
writeRedisError(conn, err)
235235
return
236236
}
237237

proxy/pubsub.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package proxy
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"log/slog"
78
"strings"
@@ -326,7 +327,7 @@ func (s *pubsubSession) reenterPubSub(cmdName string, args [][]byte) {
326327
}
327328
if err != nil {
328329
upstream.Close()
329-
s.writeError("ERR " + err.Error())
330+
s.writeRedisError(err)
330331
return
331332
}
332333

@@ -401,6 +402,7 @@ func (s *pubsubSession) handleSubscribe(args [][]byte) {
401402
channels := byteSlicesToStrings(args[1:])
402403
if err := s.upstream.Subscribe(context.Background(), channels...); err != nil {
403404
s.logger.Warn("upstream subscribe failed", "err", err)
405+
s.writeRedisError(err)
404406
return
405407
}
406408
s.mu.Lock()
@@ -424,6 +426,7 @@ func (s *pubsubSession) handlePSubscribe(args [][]byte) {
424426
pats := byteSlicesToStrings(args[1:])
425427
if err := s.upstream.PSubscribe(context.Background(), pats...); err != nil {
426428
s.logger.Warn("upstream psubscribe failed", "err", err)
429+
s.writeRedisError(err)
427430
return
428431
}
429432
s.mu.Lock()
@@ -563,6 +566,17 @@ func (s *pubsubSession) writeError(msg string) {
563566
_ = s.dconn.Flush()
564567
}
565568

569+
// writeRedisError writes an upstream error, preserving redis.Error prefixes verbatim
570+
// and normalizing other errors to "ERR ..." (matching writeRedisError in proxy.go).
571+
func (s *pubsubSession) writeRedisError(err error) {
572+
var redisErr redis.Error
573+
if errors.As(err, &redisErr) {
574+
s.writeError(redisErr.Error())
575+
return
576+
}
577+
s.writeError("ERR " + err.Error())
578+
}
579+
566580
func (s *pubsubSession) writeString(msg string) {
567581
s.mu.Lock()
568582
defer s.mu.Unlock()

proxy/pubsub_test.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -484,41 +484,6 @@ func TestPubSub_SelectAuthSilentlyAccepted(t *testing.T) {
484484
}
485485
}
486486

487-
// TestPubSub_DuplicateSubscribeDoesNotOvercount verifies that subscribing to the
488-
// same channel multiple times doesn't inflate the subscription count.
489-
func TestPubSub_DuplicateSubscribeDoesNotOvercount(t *testing.T) {
490-
s := newTestSession(newMockDetachedConn())
491-
492-
// First subscribe
493-
s.channelSet["ch1"] = struct{}{}
494-
assert.Equal(t, 1, s.subCount())
495-
496-
// Duplicate subscribe (idempotent)
497-
s.channelSet["ch1"] = struct{}{}
498-
assert.Equal(t, 1, s.subCount(), "duplicate subscribe must not increase count")
499-
500-
// Add new channel
501-
s.channelSet["ch2"] = struct{}{}
502-
assert.Equal(t, 2, s.subCount())
503-
}
504-
505-
// TestPubSub_UnsubscribeNonExistent verifies that unsubscribing from a channel
506-
// that was never subscribed does not affect the count.
507-
func TestPubSub_UnsubscribeNonExistent(t *testing.T) {
508-
s := newTestSession(newMockDetachedConn())
509-
510-
s.channelSet["ch1"] = struct{}{}
511-
s.channelSet["ch2"] = struct{}{}
512-
513-
// Delete non-existent: no panic, no effect
514-
delete(s.channelSet, "never-subscribed")
515-
assert.Equal(t, 2, s.subCount())
516-
517-
// Delete existing
518-
delete(s.channelSet, "ch1")
519-
assert.Equal(t, 1, s.subCount())
520-
}
521-
522487
// TestPubSub_CleanupClosesUpstream verifies that cleanup closes upstream and dconn.
523488
func TestPubSub_CleanupClosesUpstream(t *testing.T) {
524489
dconn := newMockDetachedConn()

0 commit comments

Comments
 (0)