Skip to content

Commit f389fc6

Browse files
committed
Fix Pipeline redis.Error wrapping and separate state from writeMu
- Pipeline now returns results with nil error for redis.Error/redis.Nil, only propagating transport/context errors. This preserves Redis transaction semantics in execTxn callers. - handleUnsub and writeUnsubAll now mutate goroutine-confined state and pre-compute counts before acquiring writeMu, keeping writeMu strictly for dconn write serialization.
1 parent 77ee8a1 commit f389fc6

2 files changed

Lines changed: 28 additions & 11 deletions

File tree

proxy/backend.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package proxy
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -92,6 +93,13 @@ func (b *RedisBackend) Pipeline(ctx context.Context, cmds [][]any) ([]*redis.Cmd
9293
}
9394
_, err := pipe.Exec(ctx)
9495
if err != nil {
96+
// go-redis pipelines return redis.Error for Redis reply errors (e.g., EXECABORT).
97+
// Return results with nil error so callers can read per-command results (especially EXEC).
98+
// Only propagate true transport/context errors.
99+
var redisErr redis.Error
100+
if errors.As(err, &redisErr) || errors.Is(err, redis.Nil) {
101+
return results, nil
102+
}
95103
return results, fmt.Errorf("pipeline exec: %w", err)
96104
}
97105
return results, nil

proxy/pubsub.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -496,18 +496,22 @@ func (s *pubsubSession) handleUnsub(args [][]byte, isPattern bool) {
496496
s.writeRedisError(err)
497497
return
498498
}
499-
// Update state then write replies.
500-
s.writeMu.Lock()
501-
for _, n := range names {
499+
// Update state (goroutine-confined) and pre-compute counts before taking writeMu.
500+
counts := make([]int, len(names))
501+
for i, n := range names {
502502
if isPattern {
503503
delete(s.patternSet, n)
504504
} else {
505505
delete(s.channelSet, n)
506506
}
507+
counts[i] = s.subCount()
508+
}
509+
s.writeMu.Lock()
510+
for i, n := range names {
507511
s.dconn.WriteArray(pubsubArrayReply)
508512
s.dconn.WriteBulkString(kind)
509513
s.dconn.WriteBulkString(n)
510-
s.dconn.WriteInt64(int64(s.subCount()))
514+
s.dconn.WriteInt64(int64(counts[i]))
511515
}
512516
_ = s.dconn.Flush()
513517
s.writeMu.Unlock()
@@ -521,37 +525,42 @@ func (s *pubsubSession) writeUnsubAll(kind string, isPattern bool) {
521525
set = s.patternSet
522526
}
523527

524-
s.writeMu.Lock()
525-
defer s.writeMu.Unlock()
526-
527528
if len(set) == 0 {
528529
// No subscriptions: single reply with null channel (matching Redis).
530+
s.writeMu.Lock()
529531
s.dconn.WriteArray(pubsubArrayReply)
530532
s.dconn.WriteBulkString(kind)
531533
s.dconn.WriteNull()
532534
s.dconn.WriteInt64(int64(s.subCount()))
533535
_ = s.dconn.Flush()
536+
s.writeMu.Unlock()
534537
return
535538
}
536539

537-
// Collect names, then remove one-by-one to decrement count per reply.
540+
// Collect names and pre-compute decreasing counts (state is goroutine-confined).
538541
names := make([]string, 0, len(set))
539542
for n := range set {
540543
names = append(names, n)
541544
}
542-
543-
for _, n := range names {
545+
counts := make([]int, len(names))
546+
for i, n := range names {
544547
if isPattern {
545548
delete(s.patternSet, n)
546549
} else {
547550
delete(s.channelSet, n)
548551
}
552+
counts[i] = s.subCount()
553+
}
554+
555+
s.writeMu.Lock()
556+
for i, n := range names {
549557
s.dconn.WriteArray(pubsubArrayReply)
550558
s.dconn.WriteBulkString(kind)
551559
s.dconn.WriteBulkString(n)
552-
s.dconn.WriteInt64(int64(s.subCount()))
560+
s.dconn.WriteInt64(int64(counts[i]))
553561
}
554562
_ = s.dconn.Flush()
563+
s.writeMu.Unlock()
555564
}
556565

557566
// --- Ping handlers ---

0 commit comments

Comments
 (0)