Skip to content

Commit 1568458

Browse files
committed
Review fixes: correctness, concurrency, performance, and test coverage
Correctness: - Remove redis.Nil double-wrapping in DualWriter methods - Add empty transaction result handling in execTxn - Add WATCH/UNWATCH to command table - Intercept HELLO to prevent RESP3 upgrade - Reject SELECT for non-configured DB - Fix UNSUBSCRIBE to emit per-argument replies Concurrency/distributed failures: - Split asyncSem into writeSem + shadowSem to prevent starvation - Add DualWriter.Close() with WaitGroup for graceful shutdown drain - Add AsyncDrops metric for semaphore backpressure visibility Performance: - Uppercase command name once in handleCommand, pass through to DualWriter - Remove channel label from PubSubShadowDivergences to prevent cardinality explosion Test coverage: - Add writeRedisValue/writeRedisError/writeResponse tests - Add forwardMessages message/pmessage branch tests - Add reenterPubSub error path tests - Add truncateValue type coverage tests - Add ClassifyCommand tests for new commands - Add Pipeline transport error test
1 parent 3cce409 commit 1568458

10 files changed

Lines changed: 638 additions & 182 deletions

File tree

cmd/redis-proxy/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func run() error {
8686
defer secondary.Close()
8787

8888
dual := proxy.NewDualWriter(primary, secondary, cfg, metrics, sentryReporter, logger)
89+
defer dual.Close() // wait for in-flight async goroutines
8990
srv := proxy.NewProxyServer(cfg, dual, metrics, sentryReporter, logger)
9091

9192
// Context for graceful shutdown

proxy/command.go

Lines changed: 189 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -8,128 +8,237 @@ type CommandCategory int
88
const (
99
CmdRead CommandCategory = iota // GET, HGET, LRANGE, ZRANGE, etc.
1010
CmdWrite // SET, DEL, HSET, LPUSH, ZADD, etc.
11-
CmdBlocking // BZPOPMIN, XREAD (with BLOCK)
11+
CmdBlocking // BLPOP, BRPOP, BZPOPMIN, XREAD (with BLOCK)
1212
CmdPubSub // SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBSUB (note: PUBLISH is CmdWrite)
1313
CmdAdmin // PING, INFO, CLIENT, SELECT, QUIT, DBSIZE, SCAN, AUTH
1414
CmdTxn // MULTI, EXEC, DISCARD
1515
CmdScript // EVAL, EVALSHA
1616
)
1717

1818
var commandTable = map[string]CommandCategory{
19-
// Read commands
20-
"GET": CmdRead,
21-
"GETDEL": CmdWrite, // read+write → write
22-
"HGET": CmdRead,
23-
"HGETALL": CmdRead,
24-
"HEXISTS": CmdRead,
25-
"HLEN": CmdRead,
26-
"HMGET": CmdRead,
27-
"EXISTS": CmdRead,
28-
"KEYS": CmdRead,
29-
"LINDEX": CmdRead,
30-
"LLEN": CmdRead,
31-
"LPOS": CmdRead,
32-
"LRANGE": CmdRead,
33-
"PTTL": CmdRead,
34-
"TTL": CmdRead,
35-
"TYPE": CmdRead,
36-
"SCARD": CmdRead,
37-
"SISMEMBER": CmdRead,
38-
"SMEMBERS": CmdRead,
39-
"XLEN": CmdRead,
40-
"XRANGE": CmdRead,
41-
"XREVRANGE": CmdRead,
42-
"ZCARD": CmdRead,
43-
"ZCOUNT": CmdRead,
44-
"ZRANGE": CmdRead,
45-
"ZRANGEBYSCORE": CmdRead,
46-
"ZREVRANGE": CmdRead,
47-
"ZREVRANGEBYSCORE": CmdRead,
48-
"ZSCORE": CmdRead,
49-
"PFCOUNT": CmdRead,
19+
// ---- Read commands ----
20+
"GET": CmdRead,
21+
"GETRANGE": CmdRead,
22+
"MGET": CmdRead,
23+
"STRLEN": CmdRead,
24+
"HGET": CmdRead,
25+
"HGETALL": CmdRead,
26+
"HEXISTS": CmdRead,
27+
"HLEN": CmdRead,
28+
"HMGET": CmdRead,
29+
"HKEYS": CmdRead,
30+
"HVALS": CmdRead,
31+
"HRANDFIELD": CmdRead,
32+
"HSCAN": CmdRead,
33+
"EXISTS": CmdRead,
34+
"KEYS": CmdRead,
35+
"RANDOMKEY": CmdRead,
36+
"LINDEX": CmdRead,
37+
"LLEN": CmdRead,
38+
"LPOS": CmdRead,
39+
"LRANGE": CmdRead,
40+
"PTTL": CmdRead,
41+
"TTL": CmdRead,
42+
"TYPE": CmdRead,
43+
"SCARD": CmdRead,
44+
"SISMEMBER": CmdRead,
45+
"SMISMEMBER": CmdRead,
46+
"SMEMBERS": CmdRead,
47+
"SRANDMEMBER": CmdRead,
48+
"SDIFF": CmdRead,
49+
"SINTER": CmdRead,
50+
"SINTERCARD": CmdRead,
51+
"SUNION": CmdRead,
52+
"SSCAN": CmdRead,
53+
"XLEN": CmdRead,
54+
"XRANGE": CmdRead,
55+
"XREVRANGE": CmdRead,
56+
"XINFO": CmdRead,
57+
"XPENDING": CmdRead,
58+
"ZCARD": CmdRead,
59+
"ZCOUNT": CmdRead,
60+
"ZLEXCOUNT": CmdRead,
61+
"ZMSCORE": CmdRead,
62+
"ZRANGE": CmdRead,
63+
"ZRANGEBYSCORE": CmdRead,
64+
"ZRANGEBYLEX": CmdRead,
65+
"ZREVRANGE": CmdRead,
66+
"ZREVRANGEBYSCORE": CmdRead,
67+
"ZREVRANGEBYLEX": CmdRead,
68+
"ZRANK": CmdRead,
69+
"ZREVRANK": CmdRead,
70+
"ZSCORE": CmdRead,
71+
"ZSCAN": CmdRead,
72+
"ZDIFF": CmdRead,
73+
"PFCOUNT": CmdRead,
74+
"TOUCH": CmdRead,
75+
"DUMP": CmdRead,
76+
"GEODIST": CmdRead,
77+
"GEOHASH": CmdRead,
78+
"GEOPOS": CmdRead,
79+
"GEOSEARCH": CmdRead,
80+
"GEORADIUS_RO": CmdRead,
81+
"GEORADIUSBYMEMBER_RO": CmdRead,
82+
"OBJECT": CmdRead,
83+
"SORT_RO": CmdRead,
84+
"SUBSTR": CmdRead,
5085

51-
// Write commands
52-
"SET": CmdWrite,
53-
"SETEX": CmdWrite,
54-
"SETNX": CmdWrite,
55-
"DEL": CmdWrite,
56-
"HSET": CmdWrite,
57-
"HMSET": CmdWrite,
58-
"HDEL": CmdWrite,
59-
"HINCRBY": CmdWrite,
60-
"INCR": CmdWrite,
61-
"LPUSH": CmdWrite,
62-
"LPOP": CmdWrite,
63-
"RPUSH": CmdWrite,
64-
"RPOP": CmdWrite,
65-
"RPOPLPUSH": CmdWrite,
66-
"LREM": CmdWrite,
67-
"LSET": CmdWrite,
68-
"LTRIM": CmdWrite,
69-
"SADD": CmdWrite,
70-
"SREM": CmdWrite,
71-
"EXPIRE": CmdWrite,
72-
"PEXPIRE": CmdWrite,
73-
"RENAME": CmdWrite,
74-
"XADD": CmdWrite,
75-
"XTRIM": CmdWrite,
76-
"ZADD": CmdWrite,
77-
"ZINCRBY": CmdWrite,
78-
"ZREM": CmdWrite,
79-
"ZREMRANGEBYSCORE": CmdWrite,
80-
"ZREMRANGEBYRANK": CmdWrite,
81-
"ZPOPMIN": CmdWrite,
82-
"PFADD": CmdWrite,
83-
"FLUSHALL": CmdWrite,
84-
"FLUSHDB": CmdWrite,
85-
"PUBLISH": CmdWrite, // write to both backends
86+
// ---- Write commands ----
87+
"SET": CmdWrite,
88+
"SETEX": CmdWrite,
89+
"PSETEX": CmdWrite,
90+
"SETNX": CmdWrite,
91+
"SETRANGE": CmdWrite,
92+
"MSET": CmdWrite,
93+
"MSETNX": CmdWrite,
94+
"APPEND": CmdWrite,
95+
"GETSET": CmdWrite,
96+
"GETEX": CmdWrite,
97+
"GETDEL": CmdWrite,
98+
"DEL": CmdWrite,
99+
"UNLINK": CmdWrite,
100+
"COPY": CmdWrite,
101+
"RENAME": CmdWrite,
102+
"RENAMENX": CmdWrite,
103+
"RESTORE": CmdWrite,
104+
"INCR": CmdWrite,
105+
"INCRBY": CmdWrite,
106+
"INCRBYFLOAT": CmdWrite,
107+
"DECR": CmdWrite,
108+
"DECRBY": CmdWrite,
109+
"HSET": CmdWrite,
110+
"HMSET": CmdWrite,
111+
"HDEL": CmdWrite,
112+
"HINCRBY": CmdWrite,
113+
"HINCRBYFLOAT": CmdWrite,
114+
"HSETNX": CmdWrite,
115+
"LPUSH": CmdWrite,
116+
"LPUSHX": CmdWrite,
117+
"LPOP": CmdWrite,
118+
"RPUSH": CmdWrite,
119+
"RPUSHX": CmdWrite,
120+
"RPOP": CmdWrite,
121+
"RPOPLPUSH": CmdWrite,
122+
"LMOVE": CmdWrite,
123+
"LREM": CmdWrite,
124+
"LSET": CmdWrite,
125+
"LTRIM": CmdWrite,
126+
"LINSERT": CmdWrite,
127+
"SADD": CmdWrite,
128+
"SREM": CmdWrite,
129+
"SPOP": CmdWrite,
130+
"SMOVE": CmdWrite,
131+
"SDIFFSTORE": CmdWrite,
132+
"SINTERSTORE": CmdWrite,
133+
"SUNIONSTORE": CmdWrite,
134+
"EXPIRE": CmdWrite,
135+
"PEXPIRE": CmdWrite,
136+
"EXPIREAT": CmdWrite,
137+
"PEXPIREAT": CmdWrite,
138+
"PERSIST": CmdWrite,
139+
"SORT": CmdWrite,
140+
"XADD": CmdWrite,
141+
"XTRIM": CmdWrite,
142+
"XACK": CmdWrite,
143+
"XDEL": CmdWrite,
144+
"XGROUP": CmdWrite,
145+
"XCLAIM": CmdWrite,
146+
"XAUTOCLAIM": CmdWrite,
147+
"ZADD": CmdWrite,
148+
"ZINCRBY": CmdWrite,
149+
"ZREM": CmdWrite,
150+
"ZREMRANGEBYSCORE": CmdWrite,
151+
"ZREMRANGEBYRANK": CmdWrite,
152+
"ZREMRANGEBYLEX": CmdWrite,
153+
"ZPOPMIN": CmdWrite,
154+
"ZPOPMAX": CmdWrite,
155+
"ZRANGESTORE": CmdWrite,
156+
"ZUNIONSTORE": CmdWrite,
157+
"ZINTERSTORE": CmdWrite,
158+
"ZDIFFSTORE": CmdWrite,
159+
"PFADD": CmdWrite,
160+
"PFMERGE": CmdWrite,
161+
"GEOADD": CmdWrite,
162+
"GEORADIUS": CmdWrite,
163+
"GEORADIUSBYMEMBER": CmdWrite,
164+
"GEOSEARCHSTORE": CmdWrite,
165+
"FLUSHALL": CmdWrite,
166+
"FLUSHDB": CmdWrite,
167+
"PUBLISH": CmdWrite, // write to both backends
86168

87-
// Blocking commands
88-
"BZPOPMIN": CmdBlocking,
89-
// XREAD is handled specially in ClassifyCommand (BLOCK arg check)
169+
// ---- Blocking commands ----
170+
"BLPOP": CmdBlocking,
171+
"BRPOP": CmdBlocking,
172+
"BRPOPLPUSH": CmdBlocking,
173+
"BLMOVE": CmdBlocking,
174+
"BLMPOP": CmdBlocking,
175+
"BZPOPMIN": CmdBlocking,
176+
"BZPOPMAX": CmdBlocking,
177+
// XREAD/XREADGROUP handled specially in ClassifyCommand (BLOCK arg check)
90178

91-
// PubSub commands
179+
// ---- PubSub commands ----
92180
"SUBSCRIBE": CmdPubSub,
93181
"UNSUBSCRIBE": CmdPubSub,
94182
"PSUBSCRIBE": CmdPubSub,
95183
"PUNSUBSCRIBE": CmdPubSub,
184+
"SSUBSCRIBE": CmdPubSub,
185+
"SUNSUBSCRIBE": CmdPubSub,
96186
"PUBSUB": CmdPubSub,
97187

98-
// Admin commands — forwarded to primary only
188+
// ---- Admin commands — forwarded to primary only ----
99189
"PING": CmdAdmin,
190+
"ECHO": CmdAdmin,
100191
"INFO": CmdAdmin,
101192
"CLIENT": CmdAdmin,
102193
"SELECT": CmdAdmin,
103194
"QUIT": CmdAdmin,
195+
"RESET": CmdAdmin,
104196
"DBSIZE": CmdAdmin,
105197
"SCAN": CmdAdmin,
106198
"AUTH": CmdAdmin,
107199
"HELLO": CmdAdmin,
108200
"WAIT": CmdAdmin,
109201
"CONFIG": CmdAdmin,
110-
"OBJECT": CmdAdmin,
111202
"DEBUG": CmdAdmin,
112203
"CLUSTER": CmdAdmin,
113204
"COMMAND": CmdAdmin,
205+
"TIME": CmdAdmin,
206+
"SLOWLOG": CmdAdmin,
207+
"MEMORY": CmdAdmin,
208+
"LATENCY": CmdAdmin,
209+
"MODULE": CmdAdmin,
210+
"ACL": CmdAdmin,
211+
"SWAPDB": CmdAdmin,
212+
// WATCH/UNWATCH: connection-scoped optimistic locking.
213+
// Forwarded to primary only since the proxy uses a shared connection pool
214+
// and per-connection WATCH state cannot be maintained.
215+
"WATCH": CmdAdmin,
216+
"UNWATCH": CmdAdmin,
114217

115-
// Transaction commands
218+
// ---- Transaction commands ----
116219
"MULTI": CmdTxn,
117220
"EXEC": CmdTxn,
118221
"DISCARD": CmdTxn,
119222

120-
// Script commands
121-
"EVAL": CmdScript,
122-
"EVALSHA": CmdScript,
223+
// ---- Script commands ----
224+
"EVAL": CmdScript,
225+
"EVALSHA": CmdScript,
226+
"EVALRO": CmdScript,
227+
"EVALSHAro": CmdScript,
228+
"SCRIPT": CmdScript,
229+
"FUNCTION": CmdScript,
230+
"FCALL": CmdScript,
231+
"FCALL_RO": CmdScript,
123232
}
124233

125234
// ClassifyCommand returns the category for a Redis command name.
126-
// XREAD is classified as CmdBlocking if args contain BLOCK, otherwise CmdRead.
235+
// XREAD/XREADGROUP is classified as CmdBlocking if args contain BLOCK, otherwise CmdRead.
127236
// Unknown commands default to CmdWrite (sent to both backends).
128237
func ClassifyCommand(name string, args [][]byte) CommandCategory {
129238
upper := strings.ToUpper(name)
130239

131-
// Special case: XREAD with BLOCK
132-
if upper == "XREAD" {
240+
// Special case: XREAD/XREADGROUP with BLOCK
241+
if upper == "XREAD" || upper == "XREADGROUP" {
133242
for _, arg := range args {
134243
if strings.ToUpper(string(arg)) == "BLOCK" {
135244
return CmdBlocking

0 commit comments

Comments
 (0)