Skip to content

Commit 03736fb

Browse files
committed
perf(redis): bounded Lua VM pool — r1 review fixes
Round-1 review fixes on commit 972e1da: 1. gemini medium @ redis_lua_pool.go get() — centralize binding/return Refactor the three-branch select (hit / nil-element fallback / empty pool) so the ctxBinding assignment and return happen exactly once after the select. Pure restructuring; behaviour and metrics are unchanged. 2. gemini medium @ redis_lua_pool.go put() — fast-path drop before reset Skip pls.reset() when len(p.idle) already equals maxIdle. The reset walks every globals / tables / metatables snapshot and is the most expensive part of put(); doing it for a state that will immediately be Close()'d in the channel-full branch is wasted work the EVAL hot path would feel under sustained saturation. Race-safety: len() on a buffered channel is an atomically-loaded snapshot, so the fast-path can lose a race with a concurrent recv (empty channel that we mistakenly skip-and-drop is fine — that's equivalent to picking up a tiny drop budget) or win a race with a concurrent send (full channel that we correctly drop — saves one reset). The existing select-based drop branch stays in place as the correctness fallback when the snapshot was stale. Caller audit (semantic change in put — drop happens before reset): only production caller is adapter/redis_lua.go:135 (`defer luaPool.put(pls)`). The caller does not touch pls after put; the externally visible behaviour (put returns, state may be reused later) is unchanged. No callers grep'd that depend on the reset-then-drop ordering. 3. golangci magic-number @ main.go — derive flag default from adapter.DefaultLuaPoolMaxIdle Export defaultLuaPoolMaxIdle → DefaultLuaPoolMaxIdle and use adapter.DefaultLuaPoolMaxIdle as the --redisLuaMaxIdleStates flag default. Single source of truth, removes the mnd lint. 4. golangci gci @ adapter/redis.go, main.go — auto-format `golangci-lint fmt` cleanup, no semantic change. Test: go test -race -count=1 -run "TestLua_Pool|TestLua_VMReuse" ./adapter -- green. golangci-lint run --config=.golangci.yaml ./adapter/ . -- 0 issues.
1 parent 972e1da commit 03736fb

4 files changed

Lines changed: 94 additions & 72 deletions

File tree

adapter/redis.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -171,22 +171,22 @@ var txnApplyHandlers = map[string]txnCommandHandler{
171171
// single source of truth.
172172

173173
type RedisServer struct {
174-
listen net.Listener
175-
store store.MVCCStore
176-
coordinator kv.Coordinator
177-
readTracker *kv.ActiveTimestampTracker
178-
redisTranscoder *redisTranscoder
179-
pubsub *redisPubSub
180-
scriptMu sync.RWMutex
181-
scriptCache map[string]string
182-
luaPool *luaStatePool
183-
luaPoolOnce sync.Once
174+
listen net.Listener
175+
store store.MVCCStore
176+
coordinator kv.Coordinator
177+
readTracker *kv.ActiveTimestampTracker
178+
redisTranscoder *redisTranscoder
179+
pubsub *redisPubSub
180+
scriptMu sync.RWMutex
181+
scriptCache map[string]string
182+
luaPool *luaStatePool
183+
luaPoolOnce sync.Once
184184
// luaPoolMaxIdle is the configured cap on idle pooled *lua.LStates.
185185
// Set via WithLuaPoolMaxIdle before NewRedisServer materializes the
186-
// pool; getLuaPool falls back to defaultLuaPoolMaxIdle when the
186+
// pool; getLuaPool falls back to DefaultLuaPoolMaxIdle when the
187187
// value is non-positive (covers test fixtures that bypass
188188
// NewRedisServer).
189-
luaPoolMaxIdle int
189+
luaPoolMaxIdle int
190190
traceCommands bool
191191
traceSeq atomic.Uint64
192192
redisAddr string
@@ -290,7 +290,7 @@ func WithLuaFastPathObserver(observer monitoring.LuaFastPathObserver) RedisServe
290290
// empirically ~200 KiB) without bounding throughput: get() falls
291291
// through to a fresh allocation when the pool is empty, and put()
292292
// drops a state to the GC when the pool is full. n <= 0 is clamped
293-
// to defaultLuaPoolMaxIdle, matching newLuaStatePoolWithMaxIdle.
293+
// to DefaultLuaPoolMaxIdle, matching newLuaStatePoolWithMaxIdle.
294294
//
295295
// Passing this option overrides the default. The option records the
296296
// requested cap on the RedisServer; the pool itself is constructed
@@ -424,10 +424,10 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
424424
// getLuaPool, which honors luaPoolMaxIdle the same way.
425425
luaPool: nil,
426426
traceCommands: os.Getenv("ELASTICKV_REDIS_TRACE") == "1",
427-
baseCtx: baseCtx,
428-
baseCancel: baseCancel,
429-
streamWaiters: newKeyWaiterRegistry(),
430-
zsetWaiters: newKeyWaiterRegistry(),
427+
baseCtx: baseCtx,
428+
baseCancel: baseCancel,
429+
streamWaiters: newKeyWaiterRegistry(),
430+
zsetWaiters: newKeyWaiterRegistry(),
431431
}
432432
r.relay.Bind(r.publishLocal)
433433

@@ -443,7 +443,7 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
443443
}
444444
// Materialize the Lua VM pool after option processing so
445445
// WithLuaPoolMaxIdle can choose the cap. newLuaStatePoolWithMaxIdle
446-
// clamps non-positive values to defaultLuaPoolMaxIdle, so callers
446+
// clamps non-positive values to DefaultLuaPoolMaxIdle, so callers
447447
// that omit the option still get a sensible default. The
448448
// luaPoolOnce barrier in getLuaPool keeps test fixtures that build
449449
// a bare &RedisServer{} literal (and never call NewRedisServer)

adapter/redis_lua_pool.go

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ const luaWhitelistedTableHint = 8
8888
// The registry-backed binding is also the reason redis.call is
8989
// lock-free in the hot path, unlike the first iteration which used
9090
// a package-level map guarded by sync.RWMutex.
91-
// defaultLuaPoolMaxIdle is the default upper bound on idle pooled
91+
// DefaultLuaPoolMaxIdle is the default upper bound on idle pooled
9292
// *lua.LState instances retained for reuse. Each pooled state holds
9393
// the base stdlib + redis/cjson/cmsgpack closures + per-state
9494
// snapshot tables (globals / tables / metatables); empirically ~200
@@ -103,7 +103,12 @@ const luaWhitelistedTableHint = 8
103103
// allocation (miss) and excess put() calls drop the state for the GC
104104
// (drop). The cap therefore controls memory floor, not throughput
105105
// ceiling.
106-
const defaultLuaPoolMaxIdle = 64
106+
//
107+
// Exported so main.go can use it as the default for the
108+
// --redisLuaMaxIdleStates flag instead of duplicating the literal
109+
// (which trips the mnd lint and creates a drift source if the
110+
// adapter-side default changes).
111+
const DefaultLuaPoolMaxIdle = 64
107112

108113
// luaStatePool pools *lua.LState instances to cut heap/GC pressure on
109114
// high-rate EVAL / EVALSHA workloads (e.g. BullMQ ~10 scripts/s, where
@@ -227,22 +232,22 @@ func (r *RedisServer) getLuaPool() *luaStatePool {
227232
return r.luaPool
228233
}
229234

230-
// newLuaStatePool returns a bounded pool sized at defaultLuaPoolMaxIdle.
235+
// newLuaStatePool returns a bounded pool sized at DefaultLuaPoolMaxIdle.
231236
// Used by test fixtures and any caller that does not thread a
232237
// configured cap through. Production wires the explicit cap via
233238
// newLuaStatePoolWithMaxIdle from NewRedisServer.
234239
func newLuaStatePool() *luaStatePool {
235-
return newLuaStatePoolWithMaxIdle(defaultLuaPoolMaxIdle)
240+
return newLuaStatePoolWithMaxIdle(DefaultLuaPoolMaxIdle)
236241
}
237242

238243
// newLuaStatePoolWithMaxIdle returns a pool whose idle backing
239244
// channel is sized at maxIdle. Non-positive values clamp to
240-
// defaultLuaPoolMaxIdle to keep get/put semantics well-defined
245+
// DefaultLuaPoolMaxIdle to keep get/put semantics well-defined
241246
// (cap=0 would make every put() drop, which is never what we want
242247
// — callers asking for "no pool" should bypass the pool entirely).
243248
func newLuaStatePoolWithMaxIdle(maxIdle int) *luaStatePool {
244249
if maxIdle < 1 {
245-
maxIdle = defaultLuaPoolMaxIdle
250+
maxIdle = DefaultLuaPoolMaxIdle
246251
}
247252
return &luaStatePool{
248253
idle: make(chan *pooledLuaState, maxIdle),
@@ -527,37 +532,51 @@ func resetTableContents(tbl *lua.LTable, originalFields map[lua.LValue]lua.LValu
527532
// pointer write to the state-local ctxBinding userdata -- no lock,
528533
// no global map.
529534
//
530-
// A non-blocking channel recv on an empty idle pool returns the
531-
// zero value (no element), which we treat as a miss. The defensive
532-
// nil guard preserves behaviour if a future refactor ever sends an
533-
// unexpected value through the channel.
535+
// A non-blocking recv that does not match either case fires the
536+
// default branch (allocation miss). The defensive nil guard
537+
// preserves behaviour if a future refactor ever sends an unexpected
538+
// value through the idle channel.
539+
//
540+
// The ctxBinding assignment and return are centralized after the
541+
// select so every path goes through one binding step. This was a
542+
// gemini r1 review nit: refactor for clarity rather than duplicate
543+
// the binding in each branch.
534544
func (p *luaStatePool) get(ctx *luaScriptContext) *pooledLuaState {
545+
var pls *pooledLuaState
535546
select {
536-
case pls := <-p.idle:
537-
if pls == nil {
547+
case pls = <-p.idle:
548+
if pls != nil {
549+
p.hits.Add(1)
550+
} else {
538551
// Defence in depth: a nil element on the channel is
539552
// treated as an allocation miss rather than a panic.
540553
p.misses.Add(1)
541554
pls = newPooledLuaState()
542-
pls.ctxBinding.Value = ctx
543-
return pls
544555
}
545-
p.hits.Add(1)
546-
pls.ctxBinding.Value = ctx
547-
return pls
548556
default:
549557
p.misses.Add(1)
550-
pls := newPooledLuaState()
551-
pls.ctxBinding.Value = ctx
552-
return pls
558+
pls = newPooledLuaState()
553559
}
560+
pls.ctxBinding.Value = ctx
561+
return pls
554562
}
555563

556564
// put resets the state and tries to return it to the pool. If the
557565
// state is closed (shouldn't happen on the happy path) or the idle
558566
// channel is full it is dropped so the GC can reclaim it; the drop
559567
// counter makes pool saturation observable so operators can tune
560568
// maxIdle.
569+
//
570+
// Fast-path: when len(p.idle) already equals maxIdle the put is
571+
// guaranteed to overflow, so skip the (non-trivial) pls.reset()
572+
// — restoring the globals / tables / metatables snapshots is
573+
// pointless work for a state that is about to be Close()'d. This is
574+
// a gemini r1 review optimisation; under saturation the EVAL hot
575+
// path spends much less CPU on doomed resets. The check is a racy
576+
// snapshot of len(); concurrent puts can still observe stale
577+
// "len < max" and race into the select below, where the same
578+
// channel-full guard catches them — so the fast-path is a strict
579+
// improvement, not a new correctness requirement.
561580
func (p *luaStatePool) put(pls *pooledLuaState) {
562581
if pls == nil || pls.state == nil {
563582
return
@@ -571,14 +590,17 @@ func (p *luaStatePool) put(pls *pooledLuaState) {
571590
if pls.state.IsClosed() {
572591
return
573592
}
593+
if len(p.idle) >= p.maxIdle {
594+
p.drops.Add(1)
595+
pls.state.Close()
596+
return
597+
}
574598
pls.reset()
575599
select {
576600
case p.idle <- pls:
577601
default:
578-
// Idle channel is full — drop the state. Close it so any
579-
// per-state resources are released eagerly; the channel
580-
// holds room only for steady-state reuse, not for warm
581-
// states left over from a burst peak.
602+
// Idle channel filled between the fast-path check and the
603+
// select (concurrent puts winning the race). Drop the state.
582604
p.drops.Add(1)
583605
pls.state.Close()
584606
}

adapter/redis_lua_pool_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ func TestLua_PoolBoundedOverflow(t *testing.T) {
689689
}
690690

691691
// TestLua_PoolBoundedClampsZeroAndNegative locks in the contract that
692-
// non-positive maxIdle values clamp to defaultLuaPoolMaxIdle rather
692+
// non-positive maxIdle values clamp to DefaultLuaPoolMaxIdle rather
693693
// than silently producing a cap=0 pool (which would make every put()
694694
// drop). Operators that misconfigure --redisLuaMaxIdleStates=0 get the
695695
// default behaviour, not a permanently disabled pool.
@@ -707,8 +707,8 @@ func TestLua_PoolBoundedClampsZeroAndNegative(t *testing.T) {
707707
t.Run(tc.name, func(t *testing.T) {
708708
t.Parallel()
709709
pool := newLuaStatePoolWithMaxIdle(tc.maxIdle)
710-
require.Equal(t, defaultLuaPoolMaxIdle, pool.MaxIdle(),
711-
"non-positive maxIdle must clamp to defaultLuaPoolMaxIdle")
710+
require.Equal(t, DefaultLuaPoolMaxIdle, pool.MaxIdle(),
711+
"non-positive maxIdle must clamp to DefaultLuaPoolMaxIdle")
712712
// And the pool must actually retain on put (proving cap > 0).
713713
pool.put(pool.get(nil))
714714
require.Equal(t, 1, pool.Idle(),

main.go

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -80,33 +80,33 @@ func durationToTicks(timeout time.Duration, tick time.Duration, min int) int {
8080
}
8181

8282
var (
83-
myAddr = flag.String("address", "localhost:50051", "TCP host+port for this node")
84-
redisAddr = flag.String("redisAddress", "localhost:6379", "TCP host+port for redis")
85-
dynamoAddr = flag.String("dynamoAddress", "localhost:8000", "TCP host+port for DynamoDB-compatible API")
86-
s3Addr = flag.String("s3Address", "", "TCP host+port for S3-compatible API; empty to disable")
87-
s3Region = flag.String("s3Region", "us-east-1", "S3 signing region")
88-
s3CredsFile = flag.String("s3CredentialsFile", "", "Path to a JSON file containing static S3 credentials")
89-
s3PathStyleOnly = flag.Bool("s3PathStyleOnly", true, "Only accept path-style S3 requests")
90-
sqsAddr = flag.String("sqsAddress", "", "TCP host+port for SQS-compatible API; empty to disable")
91-
sqsRegion = flag.String("sqsRegion", "us-east-1", "SQS signing region")
92-
sqsCredsFile = flag.String("sqsCredentialsFile", "", "Path to a JSON file containing static SQS credentials")
93-
metricsAddr = flag.String("metricsAddress", "localhost:9090", "TCP host+port for Prometheus metrics")
94-
metricsToken = flag.String("metricsToken", "", "Bearer token for Prometheus metrics; required for non-loopback metricsAddress")
95-
pprofAddr = flag.String("pprofAddress", "localhost:6060", "TCP host+port for pprof debug endpoints; empty to disable")
96-
pprofToken = flag.String("pprofToken", "", "Bearer token for pprof; required for non-loopback pprofAddress")
97-
raftId = flag.String("raftId", "", "Node id used by Raft")
98-
raftEngineName = flag.String("raftEngine", string(raftEngineEtcd), "Raft engine implementation (etcd|hashicorp)")
99-
raftDir = flag.String("raftDataDir", "data/", "Raft data dir")
100-
redisLuaMaxIdleStates = flag.Int("redisLuaMaxIdleStates", 64, "Maximum number of idle *lua.LState instances retained by the Redis Lua VM pool. Each state holds ~200 KiB; lower values reduce steady-state memory at the cost of more allocations under burst, higher values absorb bursts at the cost of memory floor. Non-positive values clamp to the default.")
101-
raftBootstrap = flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
102-
raftBootstrapMembers = flag.String("raftBootstrapMembers", "", "Comma-separated bootstrap raft members (raftID=host:port,...)")
103-
raftJoinAsLearner = flag.Bool("raftJoinAsLearner", false, "Local node expects to join an existing cluster as a learner; if a post-apply ConfState lists this node as a voter instead, an ERROR-level alarm fires (the node keeps running -- the flag is an operator alarm, not a consensus veto). See docs/design/2026_04_26_proposed_raft_learner.md §4.5.")
104-
raftGroups = flag.String("raftGroups", "", "Comma-separated raft groups (groupID=host:port,...)")
105-
shardRanges = flag.String("shardRanges", "", "Comma-separated shard ranges (start:end=groupID,...)")
106-
raftRedisMap = flag.String("raftRedisMap", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)")
107-
raftS3Map = flag.String("raftS3Map", "", "Map of Raft address to S3 address (raftAddr=s3Addr,...)")
108-
raftDynamoMap = flag.String("raftDynamoMap", "", "Map of Raft address to DynamoDB address (raftAddr=dynamoAddr,...)")
109-
raftSqsMap = flag.String("raftSqsMap", "", "Map of Raft address to SQS address (raftAddr=sqsAddr,...)")
83+
myAddr = flag.String("address", "localhost:50051", "TCP host+port for this node")
84+
redisAddr = flag.String("redisAddress", "localhost:6379", "TCP host+port for redis")
85+
dynamoAddr = flag.String("dynamoAddress", "localhost:8000", "TCP host+port for DynamoDB-compatible API")
86+
s3Addr = flag.String("s3Address", "", "TCP host+port for S3-compatible API; empty to disable")
87+
s3Region = flag.String("s3Region", "us-east-1", "S3 signing region")
88+
s3CredsFile = flag.String("s3CredentialsFile", "", "Path to a JSON file containing static S3 credentials")
89+
s3PathStyleOnly = flag.Bool("s3PathStyleOnly", true, "Only accept path-style S3 requests")
90+
sqsAddr = flag.String("sqsAddress", "", "TCP host+port for SQS-compatible API; empty to disable")
91+
sqsRegion = flag.String("sqsRegion", "us-east-1", "SQS signing region")
92+
sqsCredsFile = flag.String("sqsCredentialsFile", "", "Path to a JSON file containing static SQS credentials")
93+
metricsAddr = flag.String("metricsAddress", "localhost:9090", "TCP host+port for Prometheus metrics")
94+
metricsToken = flag.String("metricsToken", "", "Bearer token for Prometheus metrics; required for non-loopback metricsAddress")
95+
pprofAddr = flag.String("pprofAddress", "localhost:6060", "TCP host+port for pprof debug endpoints; empty to disable")
96+
pprofToken = flag.String("pprofToken", "", "Bearer token for pprof; required for non-loopback pprofAddress")
97+
raftId = flag.String("raftId", "", "Node id used by Raft")
98+
raftEngineName = flag.String("raftEngine", string(raftEngineEtcd), "Raft engine implementation (etcd|hashicorp)")
99+
raftDir = flag.String("raftDataDir", "data/", "Raft data dir")
100+
redisLuaMaxIdleStates = flag.Int("redisLuaMaxIdleStates", adapter.DefaultLuaPoolMaxIdle, "Maximum number of idle *lua.LState instances retained by the Redis Lua VM pool. Each state holds ~200 KiB; lower values reduce steady-state memory at the cost of more allocations under burst, higher values absorb bursts at the cost of memory floor. Non-positive values clamp to the default.")
101+
raftBootstrap = flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
102+
raftBootstrapMembers = flag.String("raftBootstrapMembers", "", "Comma-separated bootstrap raft members (raftID=host:port,...)")
103+
raftJoinAsLearner = flag.Bool("raftJoinAsLearner", false, "Local node expects to join an existing cluster as a learner; if a post-apply ConfState lists this node as a voter instead, an ERROR-level alarm fires (the node keeps running -- the flag is an operator alarm, not a consensus veto). See docs/design/2026_04_26_proposed_raft_learner.md §4.5.")
104+
raftGroups = flag.String("raftGroups", "", "Comma-separated raft groups (groupID=host:port,...)")
105+
shardRanges = flag.String("shardRanges", "", "Comma-separated shard ranges (start:end=groupID,...)")
106+
raftRedisMap = flag.String("raftRedisMap", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)")
107+
raftS3Map = flag.String("raftS3Map", "", "Map of Raft address to S3 address (raftAddr=s3Addr,...)")
108+
raftDynamoMap = flag.String("raftDynamoMap", "", "Map of Raft address to DynamoDB address (raftAddr=dynamoAddr,...)")
109+
raftSqsMap = flag.String("raftSqsMap", "", "Map of Raft address to SQS address (raftAddr=sqsAddr,...)")
110110
// HT-FIFO partition assignment (Phase 3.D §5). Distinct from
111111
// --raftSqsMap (which maps raftAddr=sqsAddr for the
112112
// proxyToLeader endpoint resolution). The grammar is

0 commit comments

Comments
 (0)