Commit 972e1da

perf(redis): bounded Lua VM pool with configurable max-idle cap
Replace the sync.Pool-backed luaStatePool with a buffered-channel pool sized at a configurable cap. The previous implementation had two operational issues visible from the production 5-node cluster:

- No operator-tunable upper bound. sync.Pool's only governor was the Go GC; under steady high-rate EVAL load the pool retained however many states the GC happened not to clear that cycle, so the memory floor moved with allocation rate and GOGC.
- No saturation signal. There was no way to observe whether the pool was right-sized for the workload: Hits()/Misses() showed the reuse rate but never said "this many puts were rejected because the pool was at capacity."

New pool:

- Backing: buffered chan *pooledLuaState of capacity maxIdle.
- Default cap: 64. Empirically each pooled state holds ~200 KiB of long-lived heap (base stdlib + redis/cjson/cmsgpack closures + per-state snapshot tables); 64 covers redcon's typical per-connection EVAL concurrency without retaining a long warm tail after bursts.
- get(): non-blocking recv; on an empty channel it falls through to a fresh allocation (records a Miss). Throughput is therefore not capped; only the steady-state idle count is.
- put(): non-blocking send; on a full channel it drops the state, closes it eagerly, and increments Drops. Drops > 0 is the "raise maxIdle" diagnostic.
- Idle() and MaxIdle() are exposed for metrics / admin diagnostics.

Configuration:

- main.go: new --redisLuaMaxIdleStates int flag (default 64).
- adapter.WithLuaPoolMaxIdle(n int) RedisServerOption records the cap; NewRedisServer materializes the pool AFTER the option loop so the cap takes effect. Test fixtures that bypass NewRedisServer get the default via getLuaPool's lazy init.
- Non-positive values clamp to defaultLuaPoolMaxIdle so a misconfigured --redisLuaMaxIdleStates=0 yields the default rather than a permanently disabled pool (which would silently drop every put).

Caller audit (semantic change: sync.Pool → bounded channel):

- All in-tree callers of newLuaStatePool / *luaStatePool are in adapter/redis*.go and adapter/redis_lua*_test.go. The pool's get/put surface is unchanged; the new accessors (Drops, Idle, MaxIdle) are additive.
- newLuaStatePool() keeps its no-arg signature for test fixtures; its behaviour is unchanged (it defaults to defaultLuaPoolMaxIdle, which is large enough to retain anything the existing test suite puts back).
- The production hot path (adapter/redis_lua.go runLuaScript) only calls get/put, so there is no behavioural change.
- Behavioural difference vs sync.Pool that callers MAY observe: the bounded pool will not retain states beyond maxIdle. The existing TestLua_PoolRecordsReuseVsAllocation tolerates GC-driven drops, so it stays green. No test asserted "the pool retains states past GC," which would have been incorrect under sync.Pool's documented contract anyway.

Tests:

- All existing TestLua_VMReuse* / TestLua_PoolSerial* / TestLua_PoolRecordsReuseVsAllocation continue to pass.
- New TestLua_PoolBoundedOverflow locks in the cap invariant: 2*maxIdle puts retain exactly maxIdle, the surplus increments Drops, and draining produces exactly maxIdle hits.
- New TestLua_PoolBoundedClampsZeroAndNegative locks in the non-positive→default clamp so a misconfigured CLI doesn't silently disable pooling.

Self-review (CLAUDE.md 5 lenses):

1. Data loss: N/A. The pool drops only reset/closed *lua.LStates; no committed state lives there.
2. Concurrency: channel ops are O(1) and briefly take the channel's internal runtime lock; no new application-level locks are introduced. Race tests pass.
3. Performance: a single shared channel contends more than sync.Pool's per-P caches under high concurrency, but that cost is well below a single Lua eval; the change does not alter the hot path's allocation pattern. Pool saturation now produces an eager state.Close() on overflow, freeing per-state resources sooner than GC would.
4. Data consistency: N/A.
5. Test coverage: two new tests (one of them table-driven) cover the new bounded invariants; all existing pool tests are retained.

Test command:

go test -race -count=1 -short ./adapter -- 564s, all green.
go vet ./adapter/ -- clean.
go build ./... -- clean.
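
The get/put behaviour described in the commit message can be illustrated with a minimal, self-contained sketch. This is not the code from adapter/redis_lua_pool.go: the `state` type, `boundedPool`, `newBoundedPool`, and `defaultMaxIdle` are hypothetical stand-ins chosen for illustration, and the real pool also resets each state and binds a script context, which is omitted here.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// state is a hypothetical stand-in for *pooledLuaState.
type state struct{ closed bool }

// Close marks the state as released (the real code closes a *lua.LState).
func (s *state) Close() { s.closed = true }

// defaultMaxIdle mirrors the default of 64 quoted in the commit message.
const defaultMaxIdle = 64

// boundedPool keeps at most cap(idle) states for reuse.
type boundedPool struct {
	idle                chan *state
	hits, misses, drops atomic.Uint64
}

// newBoundedPool clamps non-positive caps to the default so the pool
// can never be created with a zero-capacity channel.
func newBoundedPool(maxIdle int) *boundedPool {
	if maxIdle < 1 {
		maxIdle = defaultMaxIdle
	}
	return &boundedPool{idle: make(chan *state, maxIdle)}
}

// get never blocks: an empty pool takes the default branch and
// allocates a fresh state, which is counted as a miss.
func (p *boundedPool) get() *state {
	select {
	case s := <-p.idle:
		p.hits.Add(1)
		return s
	default:
		p.misses.Add(1)
		return &state{}
	}
}

// put never blocks: a full pool takes the default branch, counts a
// drop, and closes the surplus state eagerly.
func (p *boundedPool) put(s *state) {
	select {
	case p.idle <- s:
	default:
		p.drops.Add(1)
		s.Close()
	}
}

func main() {
	p := newBoundedPool(2)
	held := []*state{p.get(), p.get(), p.get()} // three misses
	for _, s := range held {
		p.put(s) // the third put overflows the cap of 2
	}
	fmt.Println(len(p.idle), p.hits.Load(), p.misses.Load(), p.drops.Load())
	// prints: 2 0 3 1
}
```

Because both operations use `select` with a `default` branch, the cap limits only how many idle states are retained; callers are never made to wait.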
1 parent 12ec8ba commit 972e1da

4 files changed

Lines changed: 230 additions & 41 deletions

adapter/redis.go

Lines changed: 37 additions & 2 deletions
@@ -181,6 +181,12 @@ type RedisServer struct {
 	scriptCache map[string]string
 	luaPool *luaStatePool
 	luaPoolOnce sync.Once
+	// luaPoolMaxIdle is the configured cap on idle pooled *lua.LStates.
+	// Set via WithLuaPoolMaxIdle before NewRedisServer materializes the
+	// pool; getLuaPool falls back to defaultLuaPoolMaxIdle when the
+	// value is non-positive (covers test fixtures that bypass
+	// NewRedisServer).
+	luaPoolMaxIdle int
 	traceCommands bool
 	traceSeq atomic.Uint64
 	redisAddr string
@@ -278,6 +284,23 @@ func WithLuaFastPathObserver(observer monitoring.LuaFastPathObserver) RedisServe
 	}
 }
 
+// WithLuaPoolMaxIdle caps the number of idle *lua.LState instances
+// the Lua VM pool retains between EVALs. The cap controls the steady-
+// state memory floor of the pool (maxIdle * per-state footprint —
+// empirically ~200 KiB) without bounding throughput: get() falls
+// through to a fresh allocation when the pool is empty, and put()
+// drops a state to the GC when the pool is full. n <= 0 is clamped
+// to defaultLuaPoolMaxIdle, matching newLuaStatePoolWithMaxIdle.
+//
+// Passing this option overrides the default. The option records the
+// requested cap on the RedisServer; the pool itself is constructed
+// after all options are applied so the recorded cap takes effect.
+func WithLuaPoolMaxIdle(n int) RedisServerOption {
+	return func(r *RedisServer) {
+		r.luaPoolMaxIdle = n
+	}
+}
+
 // luaFastPathCmdZRangeByScore is the shared label for ZRANGEBYSCORE
 // and ZREVRANGEBYSCORE fast-path outcomes. Both directions take the
 // same branch through zsetRangeByScoreFast so sharing one label
@@ -395,8 +418,12 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
 		leaderClients: make(map[string]*redis.Client),
 		pubsub: newRedisPubSub(),
 		scriptCache: map[string]string{},
-		luaPool: newLuaStatePool(),
-		traceCommands: os.Getenv("ELASTICKV_REDIS_TRACE") == "1",
+		// luaPool is materialized after the option loop so
+		// WithLuaPoolMaxIdle can influence its sizing. Test fixtures
+		// that bypass NewRedisServer construct the pool lazily via
+		// getLuaPool, which honors luaPoolMaxIdle the same way.
+		luaPool: nil,
+		traceCommands: os.Getenv("ELASTICKV_REDIS_TRACE") == "1",
 		baseCtx: baseCtx,
 		baseCancel: baseCancel,
 		streamWaiters: newKeyWaiterRegistry(),
@@ -414,6 +441,14 @@ func NewRedisServer(listen net.Listener, redisAddr string, store store.MVCCStore
 			opt(r)
 		}
 	}
+	// Materialize the Lua VM pool after option processing so
+	// WithLuaPoolMaxIdle can choose the cap. newLuaStatePoolWithMaxIdle
+	// clamps non-positive values to defaultLuaPoolMaxIdle, so callers
+	// that omit the option still get a sensible default. The
+	// luaPoolOnce barrier in getLuaPool keeps test fixtures that build
+	// a bare &RedisServer{} literal (and never call NewRedisServer)
+	// from racing on the same field.
+	r.luaPool = newLuaStatePoolWithMaxIdle(r.luaPoolMaxIdle)
 
 	return r
 }
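
The ordering this file's hunks rely on (options only record the cap, and the pool is built once the option loop has finished) can be sketched in isolation. The names `server`, `option`, `withMaxIdle`, and `newServer` below are hypothetical simplifications, not the adapter package's API; a bare `chan struct{}` stands in for the pool.

```go
package main

import "fmt"

// defaultMaxIdle mirrors the default of 64 used by the commit.
const defaultMaxIdle = 64

// server is a hypothetical, stripped-down stand-in for RedisServer.
type server struct {
	maxIdle int           // recorded by the option, not yet applied
	idle    chan struct{} // stands in for the pool's backing channel
}

type option func(*server)

// withMaxIdle only records the requested cap; it does not build the pool.
func withMaxIdle(n int) option {
	return func(s *server) { s.maxIdle = n }
}

// newServer applies every option first and only then sizes the pool,
// so the recorded cap is visible at construction time.
func newServer(opts ...option) *server {
	s := &server{}
	for _, opt := range opts {
		if opt != nil {
			opt(s)
		}
	}
	capacity := s.maxIdle
	if capacity < 1 {
		capacity = defaultMaxIdle // clamp non-positive values
	}
	s.idle = make(chan struct{}, capacity)
	return s
}

func main() {
	fmt.Println(cap(newServer().idle))               // 64: option omitted
	fmt.Println(cap(newServer(withMaxIdle(8)).idle)) // 8: option honoured
	fmt.Println(cap(newServer(withMaxIdle(0)).idle)) // 64: clamped
}
```

Had the pool been constructed inside the struct literal, as before this commit, the option would run too late to influence the channel's capacity.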

adapter/redis_lua_pool.go

Lines changed: 102 additions & 39 deletions
@@ -1,7 +1,6 @@
 package adapter
 
 import (
-	"sync"
 	"sync/atomic"
 
 	lua "github.com/yuin/gopher-lua"
@@ -89,12 +88,46 @@ const luaWhitelistedTableHint = 8
 // The registry-backed binding is also the reason redis.call is
 // lock-free in the hot path, unlike the first iteration which used
 // a package-level map guarded by sync.RWMutex.
+// defaultLuaPoolMaxIdle is the default upper bound on idle pooled
+// *lua.LState instances retained for reuse. Each pooled state holds
+// the base stdlib + redis/cjson/cmsgpack closures + per-state
+// snapshot tables (globals / tables / metatables); empirically ~200
+// KiB of long-lived heap per state. 64 is sized to comfortably cover
+// typical Redis-side EVAL/EVALSHA concurrency (one in-flight script
+// per connection up to redcon's default worker pool) without
+// retaining a long tail of warm states after a burst subsides.
+//
+// Operators expecting sustained higher concurrency can raise the cap
+// with --redisLuaMaxIdleStates; concurrency that exceeds the cap
+// still works correctly — excess get() calls fall through to a fresh
+// allocation (miss) and excess put() calls drop the state for the GC
+// (drop). The cap therefore controls memory floor, not throughput
+// ceiling.
+const defaultLuaPoolMaxIdle = 64
+
+// luaStatePool pools *lua.LState instances to cut heap/GC pressure on
+// high-rate EVAL / EVALSHA workloads (e.g. BullMQ ~10 scripts/s, where
+// each fresh state allocs ~34% of in-use heap via newFuncContext,
+// newRegistry, newFunctionProto).
+//
+// Internal storage is a buffered channel of capacity maxIdle. We
+// previously used sync.Pool, which GC-clears on every cycle and has
+// no operator-tunable capacity; the bounded channel gives a
+// predictable memory floor (maxIdle * per-state footprint) and an
+// observable "states dropped because the pool was full" counter that
+// makes mis-sizing visible. The channel ops are O(1) but serialize
+// on the channel's internal runtime lock, so under high concurrency
+// they contend more than sync.Pool's per-P caches; that cost is still
+// well below a single Lua eval. See TestLua_PoolBoundedOverflow for
+// the invariants.
 type luaStatePool struct {
-	pool sync.Pool
+	idle chan *pooledLuaState
+	maxIdle int
 
-	// hits / misses are exposed for tests and metrics.
+	// hits / misses / drops are exposed for tests and metrics.
 	hits atomic.Uint64
 	misses atomic.Uint64
+	drops atomic.Uint64
 }
 
 // pooledLuaState wraps a *lua.LState plus the immutable snapshot of
@@ -188,23 +221,33 @@ func luaLookupContext(state *lua.LState) (*luaScriptContext, bool) {
 func (r *RedisServer) getLuaPool() *luaStatePool {
 	r.luaPoolOnce.Do(func() {
 		if r.luaPool == nil {
-			r.luaPool = newLuaStatePool()
+			r.luaPool = newLuaStatePoolWithMaxIdle(r.luaPoolMaxIdle)
 		}
 	})
 	return r.luaPool
 }
 
-// newLuaStatePool returns a pool that lazily allocates
-// *pooledLuaState instances on demand. The pool deliberately does NOT
-// set sync.Pool.New: if it did, p.pool.Get() would auto-invoke the
-// constructor on an empty pool and we could not distinguish a fresh
-// allocation from a reused instance. Instead, get() inspects the
-// result of p.pool.Get() -- a nil return signals an empty pool and
-// drives the miss counter plus an explicit newPooledLuaState() call.
-// This keeps the hit/miss metrics honest, which is what the serial
-// reuse tests and the observability counters rely on.
+// newLuaStatePool returns a bounded pool sized at defaultLuaPoolMaxIdle.
+// Used by test fixtures and any caller that does not thread a
+// configured cap through. Production wires the explicit cap via
+// newLuaStatePoolWithMaxIdle from NewRedisServer.
 func newLuaStatePool() *luaStatePool {
-	return &luaStatePool{}
+	return newLuaStatePoolWithMaxIdle(defaultLuaPoolMaxIdle)
+}
+
+// newLuaStatePoolWithMaxIdle returns a pool whose idle backing
+// channel is sized at maxIdle. Non-positive values clamp to
+// defaultLuaPoolMaxIdle to keep get/put semantics well-defined
+// (cap=0 would make every put() drop, which is never what we want
+// — callers asking for "no pool" should bypass the pool entirely).
+func newLuaStatePoolWithMaxIdle(maxIdle int) *luaStatePool {
+	if maxIdle < 1 {
+		maxIdle = defaultLuaPoolMaxIdle
+	}
+	return &luaStatePool{
+		idle: make(chan *pooledLuaState, maxIdle),
+		maxIdle: maxIdle,
+	}
 }
 
 // newPooledLuaState builds a fresh pooled state: base libs, dangerous
@@ -484,36 +527,37 @@ func resetTableContents(tbl *lua.LTable, originalFields map[lua.LValue]lua.LValu
 // pointer write to the state-local ctxBinding userdata -- no lock,
 // no global map.
 //
-// Because newLuaStatePool does NOT set sync.Pool.New, p.pool.Get()
-// returns nil when the pool is empty; that is the signal for a miss
-// (fresh allocation). A non-nil return is a genuine reuse and counts
-// as a hit. The defensive type-assertion guard preserves behaviour if
-// a future refactor ever puts something unexpected into the pool.
+// A non-blocking recv on an empty idle channel falls through to the
+// select's default branch, which we treat as a miss. The defensive
+// nil guard preserves behaviour if a future refactor ever sends an
+// unexpected value through the channel.
 func (p *luaStatePool) get(ctx *luaScriptContext) *pooledLuaState {
-	v := p.pool.Get()
-	if v == nil {
-		p.misses.Add(1)
-		pls := newPooledLuaState()
+	select {
+	case pls := <-p.idle:
+		if pls == nil {
+			// Defence in depth: a nil element on the channel is
+			// treated as an allocation miss rather than a panic.
+			p.misses.Add(1)
+			pls = newPooledLuaState()
+			pls.ctxBinding.Value = ctx
+			return pls
+		}
+		p.hits.Add(1)
 		pls.ctxBinding.Value = ctx
 		return pls
-	}
-	pls, ok := v.(*pooledLuaState)
-	if !ok || pls == nil {
-		// Defence in depth: anything other than a *pooledLuaState is
-		// treated as an allocation miss rather than a silent hit.
+	default:
 		p.misses.Add(1)
-		pls = newPooledLuaState()
+		pls := newPooledLuaState()
 		pls.ctxBinding.Value = ctx
 		return pls
 	}
-	p.hits.Add(1)
-	pls.ctxBinding.Value = ctx
-	return pls
 }
 
-// put resets the state and returns it to the pool. If the state is
-// somehow closed (shouldn't happen on the happy path), it is dropped
-// so a dead VM is never handed out again.
+// put resets the state and tries to return it to the pool. If the
+// state is closed (shouldn't happen on the happy path) or the idle
+// channel is full it is dropped so the GC can reclaim it; the drop
+// counter makes pool saturation observable so operators can tune
+// maxIdle.
 func (p *luaStatePool) put(pls *pooledLuaState) {
 	if pls == nil || pls.state == nil {
 		return
@@ -528,14 +572,33 @@ func (p *luaStatePool) put(pls *pooledLuaState) {
 		return
 	}
 	pls.reset()
-	p.pool.Put(pls)
+	select {
+	case p.idle <- pls:
+	default:
+		// Idle channel is full — drop the state. Close it so any
+		// per-state resources are released eagerly; the channel
+		// holds room only for steady-state reuse, not for warm
+		// states left over from a burst peak.
+		p.drops.Add(1)
+		pls.state.Close()
+	}
 }
 
-// Hits / Misses are test hooks. They count Get() outcomes, not
-// allocations proper, but in practice they track allocation avoidance
-// well enough for the "is the pool actually being used?" test.
+// Hits / Misses / Drops are test hooks. They count Get/Put outcomes,
+// not allocations proper, but in practice they track allocation
+// avoidance well enough for the "is the pool actually being used?"
+// test and the "is maxIdle too low for the workload?" diagnostic.
 func (p *luaStatePool) Hits() uint64 { return p.hits.Load() }
 func (p *luaStatePool) Misses() uint64 { return p.misses.Load() }
+func (p *luaStatePool) Drops() uint64 { return p.drops.Load() }
+
+// Idle reports the number of states currently sitting in the pool.
+// Useful for metrics gauges and for tests asserting bounded retention.
+func (p *luaStatePool) Idle() int { return len(p.idle) }
+
+// MaxIdle reports the configured cap. Exposed for diagnostics so
+// /admin can surface "(idle / maxIdle)" to operators.
+func (p *luaStatePool) MaxIdle() int { return p.maxIdle }
 
 // registerPooledRedisModule installs redis.call / redis.pcall /
 // redis.sha1hex / redis.status_reply / redis.error_reply where the
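
As a rough illustration of how the Hits / Misses / Drops counters and the Idle / MaxIdle gauges might be combined into an operator-facing line, here is a small sketch. The `poolStats` type and its `hitRatio` and `summary` methods are hypothetical and do not exist in the commit; the only behaviour taken from the source is that Drops > 0 is read as the "raise maxIdle" signal.

```go
package main

import "fmt"

// poolStats is a hypothetical snapshot of the pool's accessors
// (Hits, Misses, Drops, Idle, MaxIdle) taken at one point in time.
type poolStats struct {
	Hits, Misses, Drops uint64
	Idle, MaxIdle       int
}

// hitRatio is the fraction of get() calls served from the pool.
// It returns 0 when no get() has been recorded yet.
func (s poolStats) hitRatio() float64 {
	total := s.Hits + s.Misses
	if total == 0 {
		return 0
	}
	return float64(s.Hits) / float64(total)
}

// summary renders the "(idle / maxIdle)" view plus a sizing hint.
// Any drop means at least one put() found the pool full.
func (s poolStats) summary() string {
	hint := "ok"
	if s.Drops > 0 {
		hint = "consider raising maxIdle"
	}
	return fmt.Sprintf("idle=%d/%d hit_ratio=%.2f drops=%d (%s)",
		s.Idle, s.MaxIdle, s.hitRatio(), s.Drops, hint)
}

func main() {
	s := poolStats{Hits: 90, Misses: 10, Drops: 3, Idle: 64, MaxIdle: 64}
	fmt.Println(s.summary())
	// prints: idle=64/64 hit_ratio=0.90 drops=3 (consider raising maxIdle)
}
```

A low hit ratio with zero drops would point at a workload that rarely returns states, while a high ratio with growing drops suggests bursts larger than the cap.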

adapter/redis_lua_pool_test.go

Lines changed: 89 additions & 0 deletions
@@ -627,3 +627,92 @@ func TestLua_VMReuseClearsContext(t *testing.T) {
 			"pooled LState leaked the original request ctx across put")
 	}
 }
+
+// TestLua_PoolBoundedOverflow is the load-bearing invariant for the
+// channel-backed bounded pool. With sync.Pool the upper bound on
+// retained states was implicit (GC-driven, varies per cycle); the new
+// implementation must:
+//
+//  1. Retain exactly maxIdle put() arrivals; the (maxIdle+1)th arrival
+//     drops cleanly and is reflected in Drops().
+//  2. Continue to serve subsequent get() calls from the retained pool
+//     until the pool is drained.
+//  3. Never panic or leak a half-bound state when a put() overflows.
+func TestLua_PoolBoundedOverflow(t *testing.T) {
+	t.Parallel()
+
+	const maxIdle = 4
+	pool := newLuaStatePoolWithMaxIdle(maxIdle)
+	require.Equal(t, maxIdle, pool.MaxIdle(),
+		"MaxIdle must echo the configured cap so operators can verify it via metrics")
+
+	// Acquire 2*maxIdle distinct states. Because the empty pool
+	// allocates a fresh state on every get(), these are all misses
+	// and all references are independent.
+	const acquired = 2 * maxIdle
+	plsList := make([]*pooledLuaState, 0, acquired)
+	for i := 0; i < acquired; i++ {
+		plsList = append(plsList, pool.get(nil))
+	}
+	require.Equal(t, uint64(0), pool.Hits(), "no put() yet, so no get() can be a hit")
+	require.Equal(t, uint64(acquired), pool.Misses(),
+		"every empty-pool get() must record a miss")
+	require.Equal(t, 0, pool.Idle(), "idle count must be zero before any put()")
+
+	// Release all 2*maxIdle back. Only maxIdle should survive in the
+	// pool; the rest drop and increment the drop counter.
+	for _, pls := range plsList {
+		pool.put(pls)
+	}
+	require.Equal(t, maxIdle, pool.Idle(),
+		"pool must retain exactly maxIdle states after %d puts on a cap=%d pool", acquired, maxIdle)
+	require.Equal(t, uint64(acquired-maxIdle), pool.Drops(),
+		"overflow puts must increment the drop counter so saturation is observable")
+
+	// Draining the pool must produce maxIdle hits and then fall through
+	// to misses, proving the surviving states are usable, not corrupted
+	// by the overflow path.
+	hitsBefore := pool.Hits()
+	for i := 0; i < maxIdle; i++ {
+		drained := pool.get(nil)
+		require.NotNil(t, drained, "drain get must return a usable state")
+	}
+	require.Equal(t, uint64(maxIdle), pool.Hits()-hitsBefore,
+		"draining the pool must produce exactly maxIdle hits")
+	require.Equal(t, 0, pool.Idle(), "pool must be empty after draining maxIdle states")
+
+	// One more get must miss (allocate fresh), confirming the bounded
+	// pool does not silently grow above maxIdle.
+	pool.get(nil)
+	require.Equal(t, uint64(acquired+1), pool.Misses(),
+		"first get() after full drain must record a miss")
+}
+
+// TestLua_PoolBoundedClampsZeroAndNegative locks in the contract that
+// non-positive maxIdle values clamp to defaultLuaPoolMaxIdle rather
+// than silently producing a cap=0 pool (which would make every put()
+// drop). Operators that misconfigure --redisLuaMaxIdleStates=0 get the
+// default behaviour, not a permanently disabled pool.
+func TestLua_PoolBoundedClampsZeroAndNegative(t *testing.T) {
+	t.Parallel()
+
+	cases := []struct {
+		name string
+		maxIdle int
+	}{
+		{"zero clamps to default", 0},
+		{"negative clamps to default", -7},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			pool := newLuaStatePoolWithMaxIdle(tc.maxIdle)
+			require.Equal(t, defaultLuaPoolMaxIdle, pool.MaxIdle(),
+				"non-positive maxIdle must clamp to defaultLuaPoolMaxIdle")
+			// And the pool must actually retain on put (proving cap > 0).
+			pool.put(pool.get(nil))
+			require.Equal(t, 1, pool.Idle(),
+				"clamped pool must retain put() arrivals, not silently drop them")
+		})
+	}
+}

main.go

Lines changed: 2 additions & 0 deletions
@@ -97,6 +97,7 @@ var (
 	raftId = flag.String("raftId", "", "Node id used by Raft")
 	raftEngineName = flag.String("raftEngine", string(raftEngineEtcd), "Raft engine implementation (etcd|hashicorp)")
 	raftDir = flag.String("raftDataDir", "data/", "Raft data dir")
+	redisLuaMaxIdleStates = flag.Int("redisLuaMaxIdleStates", 64, "Maximum number of idle *lua.LState instances retained by the Redis Lua VM pool. Each state holds ~200 KiB; lower values reduce steady-state memory at the cost of more allocations under burst, higher values absorb bursts at the cost of memory floor. Non-positive values clamp to the default.")
 	raftBootstrap = flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster")
 	raftBootstrapMembers = flag.String("raftBootstrapMembers", "", "Comma-separated bootstrap raft members (raftID=host:port,...)")
 	raftJoinAsLearner = flag.Bool("raftJoinAsLearner", false, "Local node expects to join an existing cluster as a learner; if a post-apply ConfState lists this node as a voter instead, an ERROR-level alarm fires (the node keeps running -- the flag is an operator alarm, not a consensus veto). See docs/design/2026_04_26_proposed_raft_learner.md §4.5.")
@@ -1524,6 +1525,7 @@ func startRedisServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Gr
 		adapter.WithLuaObserver(metricsRegistry.LuaObserver()),
 		adapter.WithLuaFastPathObserver(metricsRegistry.LuaFastPathObserver()),
 		adapter.WithRedisCompactor(deltaCompactor),
+		adapter.WithLuaPoolMaxIdle(*redisLuaMaxIdleStates),
 	)
 	eg.Go(func() error {
 		defer redisServer.Stop()
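
To make the flag's memory trade-off concrete, the sketch below parses a flag of the same name on a private FlagSet, applies the non-positive clamp, and multiplies the effective cap by the ~200 KiB per-state figure quoted in the flag's help text. The helpers `effectiveMaxIdle` and `memoryFloorKiB` are hypothetical, and 200 KiB is the commit's empirical estimate rather than a measured constant, so the result is an approximation of the idle-state floor only.

```go
package main

import (
	"flag"
	"fmt"
)

const (
	defaultMaxIdle = 64  // default quoted by the commit
	perStateKiB    = 200 // approximate per-state footprint from the commit
)

// effectiveMaxIdle applies the same non-positive clamp the pool uses.
func effectiveMaxIdle(n int) int {
	if n < 1 {
		return defaultMaxIdle
	}
	return n
}

// memoryFloorKiB estimates the heap retained by a completely full pool.
func memoryFloorKiB(maxIdle int) int {
	return effectiveMaxIdle(maxIdle) * perStateKiB
}

func main() {
	// A private FlagSet keeps the sketch independent of os.Args.
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	maxIdle := fs.Int("redisLuaMaxIdleStates", defaultMaxIdle, "cap on idle Lua states")
	if err := fs.Parse([]string{"-redisLuaMaxIdleStates=0"}); err != nil {
		panic(err)
	}
	// A misconfigured 0 clamps to 64, i.e. 64 * 200 KiB = 12800 KiB (12.5 MiB).
	fmt.Println(effectiveMaxIdle(*maxIdle), memoryFloorKiB(*maxIdle))
	// prints: 64 12800
}
```

Doubling the cap to 128 would raise the estimated floor to 25600 KiB (25 MiB), which is the cost side of absorbing larger bursts.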
