Skip to content

Commit 6cc55f2

Browse files
authored
Merge pull request opensandbox-group#986 from asiudgufgbukbsa/feat/pool-acquire-min-remaining-ttl
feat(sdks/kotlin): add acquireMinRemainingTtl to pool to skip near-ex…
2 parents 5a5043f + e5026d1 commit 6cc55f2

26 files changed

Lines changed: 2149 additions & 64 deletions

File tree

sdks/sandbox/kotlin/sandbox-pool-redis/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStore.kt

Lines changed: 96 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolStateStoreUnavailab
2020
import com.alibaba.opensandbox.sandbox.domain.pool.IdleEntry
2121
import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore
2222
import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters
23+
import com.alibaba.opensandbox.sandbox.domain.pool.TakeIdleResult
2324
import redis.clients.jedis.UnifiedJedis
2425
import redis.clients.jedis.params.SetParams
2526
import java.time.Duration
@@ -55,11 +56,49 @@ class RedisPoolStateStore(
5556
redis.eval(
5657
TAKE_IDLE_SCRIPT,
5758
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
58-
emptyList<String>(),
59+
listOf("0"),
5960
)
60-
result as? String
61+
decodeTakeIdleResult(result).sandboxId
6162
}
6263

64+
override fun tryTakeIdle(
65+
poolName: String,
66+
minRemainingTtl: Duration,
67+
): TakeIdleResult {
68+
if (minRemainingTtl.isNegative || minRemainingTtl.isZero) {
69+
return TakeIdleResult.of(tryTakeIdle(poolName))
70+
}
71+
val minRemainingTtlMs = minRemainingTtl.toMillis().coerceAtLeast(0).toString()
72+
return execute("tryTakeIdle", poolName) {
73+
val result =
74+
redis.eval(
75+
TAKE_IDLE_SCRIPT,
76+
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
77+
listOf(minRemainingTtlMs),
78+
)
79+
decodeTakeIdleResult(result)
80+
}
81+
}
82+
83+
/**
84+
* Decodes the Lua return value into [TakeIdleResult]. The script returns either nil (empty
85+
* pool with no discarded entries) or a two-element array whose first slot holds the taken
86+
* sandbox id (or an empty string when no entry satisfied the threshold but discarded-alive
87+
* ids still need to be reported) and whose second slot holds the discarded-alive list.
88+
* The empty-string sentinel is needed because Redis cannot return a nil literal inside an
89+
* array reliably across clients.
90+
*/
91+
@Suppress("UNCHECKED_CAST")
92+
private fun decodeTakeIdleResult(result: Any?): TakeIdleResult {
93+
if (result == null) return TakeIdleResult.EMPTY
94+
val list = result as? List<Any?> ?: return TakeIdleResult.of(result as? String)
95+
if (list.isEmpty()) return TakeIdleResult.EMPTY
96+
val takenRaw = list[0] as? String
97+
val taken = takenRaw?.takeIf { it.isNotEmpty() }
98+
val discarded = (list.getOrNull(1) as? List<Any?>)?.mapNotNull { it as? String } ?: emptyList()
99+
return TakeIdleResult(sandboxId = taken, discardedAliveSandboxIds = discarded)
100+
}
101+
63102
override fun putIdle(
64103
poolName: String,
65104
sandboxId: String,
@@ -132,15 +171,36 @@ class RedisPoolStateStore(
132171
poolName: String,
133172
now: Instant,
134173
) {
135-
execute("reapExpiredIdle", poolName) {
136-
redis.eval(
137-
REAP_EXPIRED_SCRIPT,
138-
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
139-
emptyList<String>(),
140-
)
174+
reapIdle(poolName, "0")
175+
}
176+
177+
override fun reapExpiredIdle(
178+
poolName: String,
179+
now: Instant,
180+
minRemainingTtl: Duration,
181+
): List<String> {
182+
if (minRemainingTtl.isNegative || minRemainingTtl.isZero) {
183+
reapIdle(poolName, "0")
184+
return emptyList()
141185
}
186+
return reapIdle(poolName, minRemainingTtl.toMillis().coerceAtLeast(0).toString())
142187
}
143188

189+
@Suppress("UNCHECKED_CAST")
190+
private fun reapIdle(
191+
poolName: String,
192+
minRemainingTtlMs: String,
193+
): List<String> =
194+
execute("reapExpiredIdle", poolName) {
195+
val result =
196+
redis.eval(
197+
REAP_EXPIRED_SCRIPT,
198+
listOf(idleListKey(poolName), idleExpiresKey(poolName)),
199+
listOf(minRemainingTtlMs),
200+
)
201+
(result as? List<Any?>)?.mapNotNull { it as? String } ?: emptyList()
202+
}
203+
144204
override fun snapshotCounters(poolName: String): StoreCounters =
145205
execute("snapshotCounters", poolName) {
146206
StoreCounters(redis.hlen(idleExpiresKey(poolName)).toInt())
@@ -240,16 +300,29 @@ class RedisPoolStateStore(
240300
"""
241301
local redis_time = redis.call('TIME')
242302
local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000)
303+
local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0
304+
local cutoff_ms = now_ms + min_remaining_ttl_ms
305+
-- Drop empty entries straight to nil so clients see the empty-pool case clearly.
306+
local discarded_alive = {}
243307
while true do
244308
local sandbox_id = redis.call('LPOP', KEYS[1])
245309
if not sandbox_id then
246-
return nil
310+
if #discarded_alive == 0 then
311+
return nil
312+
end
313+
return {'', discarded_alive}
247314
end
248315
local expires_at = redis.call('HGET', KEYS[2], sandbox_id)
249316
if expires_at then
250317
redis.call('HDEL', KEYS[2], sandbox_id)
251-
if tonumber(expires_at) > now_ms then
252-
return sandbox_id
318+
local exp = tonumber(expires_at)
319+
if exp > cutoff_ms then
320+
return {sandbox_id, discarded_alive}
321+
end
322+
-- Below threshold. Surface alive entries so the caller can kill them; drop
323+
-- already-expired ones silently — the server has reaped them.
324+
if exp > now_ms then
325+
table.insert(discarded_alive, sandbox_id)
253326
end
254327
end
255328
end
@@ -292,14 +365,22 @@ class RedisPoolStateStore(
292365
"""
293366
local redis_time = redis.call('TIME')
294367
local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000)
368+
local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0
369+
local cutoff_ms = now_ms + min_remaining_ttl_ms
370+
local discarded_alive = {}
295371
local entries = redis.call('HGETALL', KEYS[2])
296372
for i = 1, #entries, 2 do
297-
if tonumber(entries[i + 1]) <= now_ms then
298-
redis.call('HDEL', KEYS[2], entries[i])
299-
redis.call('LREM', KEYS[1], 0, entries[i])
373+
local sandbox_id = entries[i]
374+
local exp = tonumber(entries[i + 1])
375+
if exp <= cutoff_ms then
376+
redis.call('HDEL', KEYS[2], sandbox_id)
377+
redis.call('LREM', KEYS[1], 0, sandbox_id)
378+
if exp > now_ms then
379+
table.insert(discarded_alive, sandbox_id)
380+
end
300381
end
301382
end
302-
return 1
383+
return discarded_alive
303384
"""
304385
}
305386
}

sdks/sandbox/kotlin/sandbox-pool-redis/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStoreTest.kt

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,92 @@ class RedisPoolStateStoreTest {
136136
assertEquals(7, stateStore.getMaxIdle(poolName))
137137
}
138138

139+
@Test
140+
fun `tryTakeIdle surfaces alive entries below the threshold so callers can kill them`() {
141+
val stateStore = requireStore()
142+
143+
// 1s TTL: still alive (server-side TTL has not elapsed) but well below the 60s threshold.
144+
stateStore.setIdleEntryTtl(poolName, Duration.ofSeconds(1))
145+
stateStore.putIdle(poolName, "id-1")
146+
stateStore.putIdle(poolName, "id-2")
147+
148+
val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60))
149+
assertNull(result.sandboxId)
150+
assertEquals(setOf("id-1", "id-2"), result.discardedAliveSandboxIds.toSet())
151+
assertEquals(0, stateStore.snapshotCounters(poolName).idleCount)
152+
}
153+
154+
@Test
155+
fun `tryTakeIdle silently drops fully-expired entries`() {
156+
val stateStore = requireStore()
157+
158+
stateStore.setIdleEntryTtl(poolName, Duration.ofMillis(50))
159+
stateStore.putIdle(poolName, "expired")
160+
Thread.sleep(150)
161+
stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10))
162+
stateStore.putIdle(poolName, "alive")
163+
164+
val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60))
165+
// expired silently dropped (no kill needed); alive returned.
166+
assertEquals("alive", result.sandboxId)
167+
assertEquals(emptyList<String>(), result.discardedAliveSandboxIds)
168+
}
169+
170+
@Test
171+
fun `reapExpiredIdle with minRemainingTtl returns alive evicted entries`() {
172+
val stateStore = requireStore()
173+
174+
stateStore.setIdleEntryTtl(poolName, Duration.ofSeconds(1))
175+
stateStore.putIdle(poolName, "id-1")
176+
stateStore.putIdle(poolName, "id-2")
177+
178+
val discardedAlive =
179+
stateStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60))
180+
181+
assertEquals(setOf("id-1", "id-2"), discardedAlive.toSet())
182+
assertEquals(0, stateStore.snapshotCounters(poolName).idleCount)
183+
}
184+
185+
@Test
186+
fun `reapExpiredIdle with minRemainingTtl preserves entries above the threshold`() {
187+
val stateStore = requireStore()
188+
189+
stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10))
190+
stateStore.putIdle(poolName, "id-1")
191+
192+
val discardedAlive =
193+
stateStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60))
194+
195+
assertEquals(emptyList<String>(), discardedAlive)
196+
assertEquals(1, stateStore.snapshotCounters(poolName).idleCount)
197+
}
198+
199+
@Test
200+
fun `tryTakeIdle with minRemainingTtl returns entries that satisfy the threshold`() {
201+
val stateStore = requireStore()
202+
203+
stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10))
204+
stateStore.putIdle(poolName, "id-1")
205+
206+
val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60))
207+
assertEquals("id-1", result.sandboxId)
208+
assertEquals(emptyList<String>(), result.discardedAliveSandboxIds)
209+
}
210+
211+
@Test
212+
fun `tryTakeIdle with zero minRemainingTtl behaves like the base call`() {
213+
val stateStore = requireStore()
214+
215+
stateStore.putIdle(poolName, "id-1")
216+
val taken = stateStore.tryTakeIdle(poolName, Duration.ZERO)
217+
assertEquals("id-1", taken.sandboxId)
218+
assertEquals(emptyList<String>(), taken.discardedAliveSandboxIds)
219+
220+
val empty = stateStore.tryTakeIdle(poolName, Duration.ZERO)
221+
assertNull(empty.sandboxId)
222+
assertEquals(emptyList<String>(), empty.discardedAliveSandboxIds)
223+
}
224+
139225
@Test
140226
fun `setIdleEntryTtl validates positive duration`() {
141227
val stateStore = requireStore()

sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ import kotlin.math.ceil
4141
* ready (default: 200ms).
4242
* @property acquireHealthCheck Optional custom health check for sandboxes returned by acquire.
4343
* @property acquireSkipHealthCheck When true, skip readiness checks for sandboxes returned by acquire (default: false).
44+
* @property acquireMinRemainingTtl Minimum remaining TTL an idle sandbox must have to be returned
45+
* by acquire. Idle entries closer to expiry than this threshold are discarded so the subsequent
46+
* ready-check and any user-side renew have time to run before server-side expiry. Set to
47+
* [Duration.ZERO] to opt out and restore the pre-existing binary-expiry behavior.
48+
*
49+
* Default is auto-derived from [idleTimeout] so existing users with short idle timeouts are not
50+
* silently broken: 60s when [idleTimeout] > 60s, otherwise `idleTimeout / 2` (rounded down). The
51+
* resolved value is always strictly less than [idleTimeout]. Pass an explicit value to the builder
52+
* to override.
4453
* @property warmupReadyTimeout Max time to wait for a pool-created sandbox to become ready (default: 30s).
4554
* @property warmupHealthCheckPollingInterval Poll interval while waiting for a pool-created sandbox to become ready
4655
* (default: 200ms).
@@ -65,6 +74,7 @@ class PoolConfig private constructor(
6574
val acquireHealthCheckPollingInterval: Duration,
6675
val acquireHealthCheck: ((Sandbox) -> Boolean)?,
6776
val acquireSkipHealthCheck: Boolean,
77+
val acquireMinRemainingTtl: Duration,
6878
val warmupReadyTimeout: Duration,
6979
val warmupHealthCheckPollingInterval: Duration,
7080
val warmupHealthCheck: ((Sandbox) -> Boolean)?,
@@ -87,6 +97,11 @@ class PoolConfig private constructor(
8797
require(!acquireHealthCheckPollingInterval.isNegative && !acquireHealthCheckPollingInterval.isZero) {
8898
"acquireHealthCheckPollingInterval must be positive"
8999
}
100+
require(!acquireMinRemainingTtl.isNegative) { "acquireMinRemainingTtl must be non-negative" }
101+
require(acquireMinRemainingTtl < idleTimeout) {
102+
"acquireMinRemainingTtl ($acquireMinRemainingTtl) must be strictly less than " +
103+
"idleTimeout ($idleTimeout); otherwise every warmed idle entry would be rejected"
104+
}
90105
require(!warmupReadyTimeout.isNegative && !warmupReadyTimeout.isZero) { "warmupReadyTimeout must be positive" }
91106
require(!warmupHealthCheckPollingInterval.isNegative && !warmupHealthCheckPollingInterval.isZero) {
92107
"warmupHealthCheckPollingInterval must be positive"
@@ -101,13 +116,29 @@ class PoolConfig private constructor(
101116
private const val DEFAULT_DEGRADED_THRESHOLD = 3
102117
private val DEFAULT_ACQUIRE_READY_TIMEOUT = Duration.ofSeconds(30)
103118
private val DEFAULT_ACQUIRE_HEALTH_CHECK_POLLING_INTERVAL = Duration.ofMillis(200)
119+
private val DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP: Duration = Duration.ofSeconds(60)
104120
private val DEFAULT_WARMUP_READY_TIMEOUT = Duration.ofSeconds(30)
105121
private val DEFAULT_WARMUP_HEALTH_CHECK_POLLING_INTERVAL = Duration.ofMillis(200)
106122
private val DEFAULT_IDLE_TIMEOUT = Duration.ofHours(24)
107123
private val DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30)
108124

109125
@JvmStatic
110126
fun builder(): Builder = Builder()
127+
128+
/**
129+
* Resolves the default `acquireMinRemainingTtl` from the user's [idleTimeout]:
130+
* `min(60s, idleTimeout / 2)`. The result is always strictly less than [idleTimeout],
131+
* so users with short idle timeouts get an automatically scaled threshold instead of a
132+
* config-time error.
133+
*/
134+
internal fun defaultAcquireMinRemainingTtl(idleTimeout: Duration): Duration {
135+
val half = idleTimeout.dividedBy(2L)
136+
return if (DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP < half) {
137+
DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP
138+
} else {
139+
half
140+
}
141+
}
111142
}
112143

113144
internal fun withMaxIdle(maxIdle: Int): PoolConfig {
@@ -126,6 +157,7 @@ class PoolConfig private constructor(
126157
acquireHealthCheckPollingInterval = acquireHealthCheckPollingInterval,
127158
acquireHealthCheck = acquireHealthCheck,
128159
acquireSkipHealthCheck = acquireSkipHealthCheck,
160+
acquireMinRemainingTtl = acquireMinRemainingTtl,
129161
warmupReadyTimeout = warmupReadyTimeout,
130162
warmupHealthCheckPollingInterval = warmupHealthCheckPollingInterval,
131163
warmupHealthCheck = warmupHealthCheck,
@@ -151,6 +183,7 @@ class PoolConfig private constructor(
151183
private var acquireHealthCheckPollingInterval: Duration = DEFAULT_ACQUIRE_HEALTH_CHECK_POLLING_INTERVAL
152184
private var acquireHealthCheck: ((Sandbox) -> Boolean)? = null
153185
private var acquireSkipHealthCheck: Boolean = false
186+
private var acquireMinRemainingTtl: Duration? = null
154187
private var warmupReadyTimeout: Duration = DEFAULT_WARMUP_READY_TIMEOUT
155188
private var warmupHealthCheckPollingInterval: Duration = DEFAULT_WARMUP_HEALTH_CHECK_POLLING_INTERVAL
156189
private var warmupHealthCheck: ((Sandbox) -> Boolean)? = null
@@ -229,6 +262,21 @@ class PoolConfig private constructor(
229262
return this
230263
}
231264

265+
/**
266+
* Sets the minimum remaining TTL an idle sandbox must have to be returned by acquire.
267+
* Idle entries closer to expiry than [acquireMinRemainingTtl] are discarded so the
268+
* subsequent ready-check and any user-side renew have time to run before the server-side
269+
* expiry kicks in.
270+
*
271+
* Must be non-negative and strictly less than `idleTimeout`. If not set, the resolved
272+
* default is `min(60s, idleTimeout / 2)`. Pass [Duration.ZERO] to opt out and restore the
273+
* pre-existing binary-expiry behavior.
274+
*/
275+
fun acquireMinRemainingTtl(acquireMinRemainingTtl: Duration): Builder {
276+
this.acquireMinRemainingTtl = acquireMinRemainingTtl
277+
return this
278+
}
279+
232280
fun warmupReadyTimeout(warmupReadyTimeout: Duration): Builder {
233281
this.warmupReadyTimeout = warmupReadyTimeout
234282
return this
@@ -277,6 +325,8 @@ class PoolConfig private constructor(
277325
val spec = creationSpec ?: throw IllegalArgumentException("creationSpec is required")
278326

279327
val warmup = warmupConcurrency ?: ceil(max * 0.2).toInt().coerceAtLeast(1)
328+
val resolvedAcquireMinRemainingTtl =
329+
acquireMinRemainingTtl ?: defaultAcquireMinRemainingTtl(idleTimeout)
280330

281331
return PoolConfig(
282332
poolName = name,
@@ -293,6 +343,7 @@ class PoolConfig private constructor(
293343
acquireHealthCheckPollingInterval = acquireHealthCheckPollingInterval,
294344
acquireHealthCheck = acquireHealthCheck,
295345
acquireSkipHealthCheck = acquireSkipHealthCheck,
346+
acquireMinRemainingTtl = resolvedAcquireMinRemainingTtl,
296347
warmupReadyTimeout = warmupReadyTimeout,
297348
warmupHealthCheckPollingInterval = warmupHealthCheckPollingInterval,
298349
warmupHealthCheck = warmupHealthCheck,

0 commit comments

Comments
 (0)