diff --git a/sdks/sandbox/kotlin/sandbox-pool-redis/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStore.kt b/sdks/sandbox/kotlin/sandbox-pool-redis/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStore.kt index d2b454427..193f22637 100644 --- a/sdks/sandbox/kotlin/sandbox-pool-redis/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStore.kt +++ b/sdks/sandbox/kotlin/sandbox-pool-redis/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStore.kt @@ -20,6 +20,7 @@ import com.alibaba.opensandbox.sandbox.domain.exceptions.PoolStateStoreUnavailab import com.alibaba.opensandbox.sandbox.domain.pool.IdleEntry import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters +import com.alibaba.opensandbox.sandbox.domain.pool.TakeIdleResult import redis.clients.jedis.UnifiedJedis import redis.clients.jedis.params.SetParams import java.time.Duration @@ -55,11 +56,49 @@ class RedisPoolStateStore( redis.eval( TAKE_IDLE_SCRIPT, listOf(idleListKey(poolName), idleExpiresKey(poolName)), - emptyList(), + listOf("0"), ) - result as? String + decodeTakeIdleResult(result).sandboxId } + override fun tryTakeIdle( + poolName: String, + minRemainingTtl: Duration, + ): TakeIdleResult { + if (minRemainingTtl.isNegative || minRemainingTtl.isZero) { + return TakeIdleResult.of(tryTakeIdle(poolName)) + } + val minRemainingTtlMs = minRemainingTtl.toMillis().coerceAtLeast(0).toString() + return execute("tryTakeIdle", poolName) { + val result = + redis.eval( + TAKE_IDLE_SCRIPT, + listOf(idleListKey(poolName), idleExpiresKey(poolName)), + listOf(minRemainingTtlMs), + ) + decodeTakeIdleResult(result) + } + } + + /** + * Decodes the Lua return value into [TakeIdleResult]. The script returns either nil (empty + * pool with no discarded entries) or a two-element array whose first slot holds the taken + * sandbox id (or an empty string when no entry satisfied the threshold but discarded-alive + * ids still need to be reported) and whose second slot holds the discarded-alive list. + * The empty-string sentinel is needed because Redis cannot return a nil literal inside an + * array reliably across clients. + */ + @Suppress("UNCHECKED_CAST") + private fun decodeTakeIdleResult(result: Any?): TakeIdleResult { + if (result == null) return TakeIdleResult.EMPTY + val list = result as? List ?: return TakeIdleResult.of(result as? String) + if (list.isEmpty()) return TakeIdleResult.EMPTY + val takenRaw = list[0] as? String + val taken = takenRaw?.takeIf { it.isNotEmpty() } + val discarded = (list.getOrNull(1) as? List)?.mapNotNull { it as? String } ?: emptyList() + return TakeIdleResult(sandboxId = taken, discardedAliveSandboxIds = discarded) + } + override fun putIdle( poolName: String, sandboxId: String, @@ -132,15 +171,36 @@ class RedisPoolStateStore( poolName: String, now: Instant, ) { - execute("reapExpiredIdle", poolName) { - redis.eval( - REAP_EXPIRED_SCRIPT, - listOf(idleListKey(poolName), idleExpiresKey(poolName)), - emptyList(), - ) + reapIdle(poolName, "0") + } + + override fun reapExpiredIdle( + poolName: String, + now: Instant, + minRemainingTtl: Duration, + ): List { + if (minRemainingTtl.isNegative || minRemainingTtl.isZero) { + reapIdle(poolName, "0") + return emptyList() } + return reapIdle(poolName, minRemainingTtl.toMillis().coerceAtLeast(0).toString()) } + @Suppress("UNCHECKED_CAST") + private fun reapIdle( + poolName: String, + minRemainingTtlMs: String, + ): List = + execute("reapExpiredIdle", poolName) { + val result = + redis.eval( + REAP_EXPIRED_SCRIPT, + listOf(idleListKey(poolName), idleExpiresKey(poolName)), + listOf(minRemainingTtlMs), + ) + (result as? List)?.mapNotNull { it as? String } ?: emptyList() + } + override fun snapshotCounters(poolName: String): StoreCounters = execute("snapshotCounters", poolName) { StoreCounters(redis.hlen(idleExpiresKey(poolName)).toInt()) @@ -240,16 +300,29 @@ class RedisPoolStateStore( """ local redis_time = redis.call('TIME') local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000) + local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0 + local cutoff_ms = now_ms + min_remaining_ttl_ms + -- Drop empty entries straight to nil so clients see the empty-pool case clearly. + local discarded_alive = {} while true do local sandbox_id = redis.call('LPOP', KEYS[1]) if not sandbox_id then - return nil + if #discarded_alive == 0 then + return nil + end + return {'', discarded_alive} end local expires_at = redis.call('HGET', KEYS[2], sandbox_id) if expires_at then redis.call('HDEL', KEYS[2], sandbox_id) - if tonumber(expires_at) > now_ms then - return sandbox_id + local exp = tonumber(expires_at) + if exp > cutoff_ms then + return {sandbox_id, discarded_alive} + end + -- Below threshold. Surface alive entries so the caller can kill them; drop + -- already-expired ones silently — the server has reaped them. + if exp > now_ms then + table.insert(discarded_alive, sandbox_id) end end end @@ -292,14 +365,22 @@ class RedisPoolStateStore( """ local redis_time = redis.call('TIME') local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000) + local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0 + local cutoff_ms = now_ms + min_remaining_ttl_ms + local discarded_alive = {} local entries = redis.call('HGETALL', KEYS[2]) for i = 1, #entries, 2 do - if tonumber(entries[i + 1]) <= now_ms then - redis.call('HDEL', KEYS[2], entries[i]) - redis.call('LREM', KEYS[1], 0, entries[i]) + local sandbox_id = entries[i] + local exp = tonumber(entries[i + 1]) + if exp <= cutoff_ms then + redis.call('HDEL', KEYS[2], sandbox_id) + redis.call('LREM', KEYS[1], 0, sandbox_id) + if exp > now_ms then + table.insert(discarded_alive, sandbox_id) + end end end - return 1 + return discarded_alive """ } } diff --git a/sdks/sandbox/kotlin/sandbox-pool-redis/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStoreTest.kt b/sdks/sandbox/kotlin/sandbox-pool-redis/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStoreTest.kt index 4b2b19c5c..6fb123070 100644 --- a/sdks/sandbox/kotlin/sandbox-pool-redis/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStoreTest.kt +++ b/sdks/sandbox/kotlin/sandbox-pool-redis/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/RedisPoolStateStoreTest.kt @@ -136,6 +136,92 @@ class RedisPoolStateStoreTest { assertEquals(7, stateStore.getMaxIdle(poolName)) } + @Test + fun `tryTakeIdle surfaces alive entries below the threshold so callers can kill them`() { + val stateStore = requireStore() + + // 1s TTL: still alive (server-side TTL has not elapsed) but well below the 60s threshold. + stateStore.setIdleEntryTtl(poolName, Duration.ofSeconds(1)) + stateStore.putIdle(poolName, "id-1") + stateStore.putIdle(poolName, "id-2") + + val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60)) + assertNull(result.sandboxId) + assertEquals(setOf("id-1", "id-2"), result.discardedAliveSandboxIds.toSet()) + assertEquals(0, stateStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `tryTakeIdle silently drops fully-expired entries`() { + val stateStore = requireStore() + + stateStore.setIdleEntryTtl(poolName, Duration.ofMillis(50)) + stateStore.putIdle(poolName, "expired") + Thread.sleep(150) + stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10)) + stateStore.putIdle(poolName, "alive") + + val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60)) + // expired silently dropped (no kill needed); alive returned. + assertEquals("alive", result.sandboxId) + assertEquals(emptyList(), result.discardedAliveSandboxIds) + } + + @Test + fun `reapExpiredIdle with minRemainingTtl returns alive evicted entries`() { + val stateStore = requireStore() + + stateStore.setIdleEntryTtl(poolName, Duration.ofSeconds(1)) + stateStore.putIdle(poolName, "id-1") + stateStore.putIdle(poolName, "id-2") + + val discardedAlive = + stateStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60)) + + assertEquals(setOf("id-1", "id-2"), discardedAlive.toSet()) + assertEquals(0, stateStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `reapExpiredIdle with minRemainingTtl preserves entries above the threshold`() { + val stateStore = requireStore() + + stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10)) + stateStore.putIdle(poolName, "id-1") + + val discardedAlive = + stateStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60)) + + assertEquals(emptyList(), discardedAlive) + assertEquals(1, stateStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `tryTakeIdle with minRemainingTtl returns entries that satisfy the threshold`() { + val stateStore = requireStore() + + stateStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10)) + stateStore.putIdle(poolName, "id-1") + + val result = stateStore.tryTakeIdle(poolName, Duration.ofSeconds(60)) + assertEquals("id-1", result.sandboxId) + assertEquals(emptyList(), result.discardedAliveSandboxIds) + } + + @Test + fun `tryTakeIdle with zero minRemainingTtl behaves like the base call`() { + val stateStore = requireStore() + + stateStore.putIdle(poolName, "id-1") + val taken = stateStore.tryTakeIdle(poolName, Duration.ZERO) + assertEquals("id-1", taken.sandboxId) + assertEquals(emptyList(), taken.discardedAliveSandboxIds) + + val empty = stateStore.tryTakeIdle(poolName, Duration.ZERO) + assertNull(empty.sandboxId) + assertEquals(emptyList(), empty.discardedAliveSandboxIds) + } + @Test fun `setIdleEntryTtl validates positive duration`() { val stateStore = requireStore() diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt index fd9dec714..1cb603c37 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfig.kt @@ -41,6 +41,15 @@ import kotlin.math.ceil * ready (default: 200ms). * @property acquireHealthCheck Optional custom health check for sandboxes returned by acquire. * @property acquireSkipHealthCheck When true, skip readiness checks for sandboxes returned by acquire (default: false). + * @property acquireMinRemainingTtl Minimum remaining TTL an idle sandbox must have to be returned + * by acquire. Idle entries closer to expiry than this threshold are discarded so the subsequent + * ready-check and any user-side renew have time to run before server-side expiry. Set to + * [Duration.ZERO] to opt out and restore the pre-existing binary-expiry behavior. + * + * Default is auto-derived from [idleTimeout] so existing users with short idle timeouts are not + * silently broken: 60s when [idleTimeout] > 60s, otherwise `idleTimeout / 2` (rounded down). The + * resolved value is always strictly less than [idleTimeout]. Pass an explicit value to the builder + * to override. * @property warmupReadyTimeout Max time to wait for a pool-created sandbox to become ready (default: 30s). * @property warmupHealthCheckPollingInterval Poll interval while waiting for a pool-created sandbox to become ready * (default: 200ms). @@ -65,6 +74,7 @@ class PoolConfig private constructor( val acquireHealthCheckPollingInterval: Duration, val acquireHealthCheck: ((Sandbox) -> Boolean)?, val acquireSkipHealthCheck: Boolean, + val acquireMinRemainingTtl: Duration, val warmupReadyTimeout: Duration, val warmupHealthCheckPollingInterval: Duration, val warmupHealthCheck: ((Sandbox) -> Boolean)?, @@ -87,6 +97,11 @@ class PoolConfig private constructor( require(!acquireHealthCheckPollingInterval.isNegative && !acquireHealthCheckPollingInterval.isZero) { "acquireHealthCheckPollingInterval must be positive" } + require(!acquireMinRemainingTtl.isNegative) { "acquireMinRemainingTtl must be non-negative" } + require(acquireMinRemainingTtl < idleTimeout) { + "acquireMinRemainingTtl ($acquireMinRemainingTtl) must be strictly less than " + + "idleTimeout ($idleTimeout); otherwise every warmed idle entry would be rejected" + } require(!warmupReadyTimeout.isNegative && !warmupReadyTimeout.isZero) { "warmupReadyTimeout must be positive" } require(!warmupHealthCheckPollingInterval.isNegative && !warmupHealthCheckPollingInterval.isZero) { "warmupHealthCheckPollingInterval must be positive" @@ -101,6 +116,7 @@ class PoolConfig private constructor( private const val DEFAULT_DEGRADED_THRESHOLD = 3 private val DEFAULT_ACQUIRE_READY_TIMEOUT = Duration.ofSeconds(30) private val DEFAULT_ACQUIRE_HEALTH_CHECK_POLLING_INTERVAL = Duration.ofMillis(200) + private val DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP: Duration = Duration.ofSeconds(60) private val DEFAULT_WARMUP_READY_TIMEOUT = Duration.ofSeconds(30) private val DEFAULT_WARMUP_HEALTH_CHECK_POLLING_INTERVAL = Duration.ofMillis(200) private val DEFAULT_IDLE_TIMEOUT = Duration.ofHours(24) @@ -108,6 +124,21 @@ class PoolConfig private constructor( @JvmStatic fun builder(): Builder = Builder() + + /** + * Resolves the default `acquireMinRemainingTtl` from the user's [idleTimeout]: + * `min(60s, idleTimeout / 2)`. The result is always strictly less than [idleTimeout], + * so users with short idle timeouts get an automatically scaled threshold instead of a + * config-time error. + */ + internal fun defaultAcquireMinRemainingTtl(idleTimeout: Duration): Duration { + val half = idleTimeout.dividedBy(2L) + return if (DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP < half) { + DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP + } else { + half + } + } } internal fun withMaxIdle(maxIdle: Int): PoolConfig { @@ -126,6 +157,7 @@ class PoolConfig private constructor( acquireHealthCheckPollingInterval = acquireHealthCheckPollingInterval, acquireHealthCheck = acquireHealthCheck, acquireSkipHealthCheck = acquireSkipHealthCheck, + acquireMinRemainingTtl = acquireMinRemainingTtl, warmupReadyTimeout = warmupReadyTimeout, warmupHealthCheckPollingInterval = warmupHealthCheckPollingInterval, warmupHealthCheck = warmupHealthCheck, @@ -151,6 +183,7 @@ class PoolConfig private constructor( private var acquireHealthCheckPollingInterval: Duration = DEFAULT_ACQUIRE_HEALTH_CHECK_POLLING_INTERVAL private var acquireHealthCheck: ((Sandbox) -> Boolean)? = null private var acquireSkipHealthCheck: Boolean = false + private var acquireMinRemainingTtl: Duration? = null private var warmupReadyTimeout: Duration = DEFAULT_WARMUP_READY_TIMEOUT private var warmupHealthCheckPollingInterval: Duration = DEFAULT_WARMUP_HEALTH_CHECK_POLLING_INTERVAL private var warmupHealthCheck: ((Sandbox) -> Boolean)? = null @@ -229,6 +262,21 @@ class PoolConfig private constructor( return this } + /** + * Sets the minimum remaining TTL an idle sandbox must have to be returned by acquire. + * Idle entries closer to expiry than [acquireMinRemainingTtl] are discarded so the + * subsequent ready-check and any user-side renew have time to run before the server-side + * expiry kicks in. + * + * Must be non-negative and strictly less than `idleTimeout`. If not set, the resolved + * default is `min(60s, idleTimeout / 2)`. Pass [Duration.ZERO] to opt out and restore the + * pre-existing binary-expiry behavior. + */ + fun acquireMinRemainingTtl(acquireMinRemainingTtl: Duration): Builder { + this.acquireMinRemainingTtl = acquireMinRemainingTtl + return this + } + fun warmupReadyTimeout(warmupReadyTimeout: Duration): Builder { this.warmupReadyTimeout = warmupReadyTimeout return this @@ -277,6 +325,8 @@ class PoolConfig private constructor( val spec = creationSpec ?: throw IllegalArgumentException("creationSpec is required") val warmup = warmupConcurrency ?: ceil(max * 0.2).toInt().coerceAtLeast(1) + val resolvedAcquireMinRemainingTtl = + acquireMinRemainingTtl ?: defaultAcquireMinRemainingTtl(idleTimeout) return PoolConfig( poolName = name, @@ -293,6 +343,7 @@ class PoolConfig private constructor( acquireHealthCheckPollingInterval = acquireHealthCheckPollingInterval, acquireHealthCheck = acquireHealthCheck, acquireSkipHealthCheck = acquireSkipHealthCheck, + acquireMinRemainingTtl = resolvedAcquireMinRemainingTtl, warmupReadyTimeout = warmupReadyTimeout, warmupHealthCheckPollingInterval = warmupHealthCheckPollingInterval, warmupHealthCheck = warmupHealthCheck, diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt index 3467df820..b6b683659 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolStateStore.kt @@ -38,6 +38,28 @@ interface PoolStateStore { */ fun tryTakeIdle(poolName: String): String? + /** + * Variant of [tryTakeIdle] that skips entries whose remaining TTL is below [minRemainingTtl]. + * + * Atomically removes and returns a [TakeIdleResult] holding the chosen idle sandbox ID (or + * null if none satisfied the threshold) and the IDs of any **alive** entries that were + * skipped because their remaining TTL fell below [minRemainingTtl]. Already-expired entries + * are silently dropped — the server has already reaped them. + * + * Callers should best-effort terminate every ID in [TakeIdleResult.discardedAliveSandboxIds] + * so those sandboxes do not linger past their pool membership and consume quota until + * server-side TTL. + * + * Default implementation delegates to [tryTakeIdle] for source compatibility with stores + * that predate this method. Such stores cannot surface near-expiry entries, so the + * discarded-alive list is always empty regardless of [minRemainingTtl]. Stores that + * genuinely want near-expiry filtering must override this method. + */ + fun tryTakeIdle( + poolName: String, + minRemainingTtl: Duration, + ): TakeIdleResult = TakeIdleResult.of(tryTakeIdle(poolName)) + /** * Adds a sandbox ID to the idle set for the pool. * Idempotent: duplicate put for same sandboxId leaves membership single-copy. @@ -91,6 +113,31 @@ interface PoolStateStore { now: Instant, ) + /** + * Variant of [reapExpiredIdle] that also evicts entries whose remaining TTL is below + * [minRemainingTtl]. Reconcile calls this so near-expiry entries are reclaimed proactively + * (rather than waiting for them to fully expire), letting the pool replenish them with fresh + * sandboxes before a future acquire would discard them. + * + * Returns the IDs of **alive** entries that were evicted because their remaining TTL fell + * below [minRemainingTtl]. Already-expired entries are silently dropped — the server has + * already reaped them. Callers should best-effort terminate every returned ID so those + * sandboxes do not linger past their pool membership. + * + * Default implementation delegates to [reapExpiredIdle] for source compatibility with stores + * that predate this method. Such stores cannot surface near-expiry entries; they return an + * empty list regardless of [minRemainingTtl]. Stores that genuinely want near-expiry + * filtering must override this method. + */ + fun reapExpiredIdle( + poolName: String, + now: Instant, + minRemainingTtl: Duration, + ): List { + reapExpiredIdle(poolName, now) + return emptyList() + } + /** * Returns a snapshot of counters for the pool (at least idle count). * Eventually consistent for distributed stores. diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/TakeIdleResult.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/TakeIdleResult.kt new file mode 100644 index 000000000..7e0ac073f --- /dev/null +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/TakeIdleResult.kt @@ -0,0 +1,42 @@ +/* + * Copyright 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.opensandbox.sandbox.domain.pool + +/** + * Result of a near-expiry-aware [PoolStateStore.tryTakeIdle]. + * + * @property sandboxId The chosen idle sandbox ID, or null if no entry satisfied the threshold. + * @property discardedAliveSandboxIds IDs that were skipped because their remaining TTL was + * below the configured `minRemainingTtl`. These sandboxes are still **alive on the server** + * (their server-side TTL has not elapsed yet) — callers should best-effort terminate them + * so they do not linger past their pool membership and consume quota until expiry. + * + * Already-expired entries (server-side TTL has elapsed) are intentionally not included: + * the server has already reaped them and a kill call would be a wasted round-trip. + */ +data class TakeIdleResult( + val sandboxId: String?, + val discardedAliveSandboxIds: List = emptyList(), +) { + companion object { + @JvmStatic + val EMPTY: TakeIdleResult = TakeIdleResult(sandboxId = null) + + @JvmStatic + fun of(sandboxId: String?): TakeIdleResult = if (sandboxId == null) EMPTY else TakeIdleResult(sandboxId) + } +} diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt index d5f33fd9c..55598d96e 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStore.kt @@ -19,6 +19,7 @@ package com.alibaba.opensandbox.sandbox.infrastructure.pool import com.alibaba.opensandbox.sandbox.domain.pool.IdleEntry import com.alibaba.opensandbox.sandbox.domain.pool.PoolStateStore import com.alibaba.opensandbox.sandbox.domain.pool.StoreCounters +import com.alibaba.opensandbox.sandbox.domain.pool.TakeIdleResult import java.time.Duration import java.time.Instant import java.util.concurrent.ConcurrentHashMap @@ -54,6 +55,38 @@ class InMemoryPoolStateStore : PoolStateStore { } } + override fun tryTakeIdle( + poolName: String, + minRemainingTtl: Duration, + ): TakeIdleResult { + if (minRemainingTtl.isNegative || minRemainingTtl.isZero) { + return TakeIdleResult.of(tryTakeIdle(poolName)) + } + val state = pools[poolName] ?: return TakeIdleResult.EMPTY + val now = Instant.now() + val cutoff = now.plus(minRemainingTtl) + var discardedAlive: MutableList? = null + while (true) { + val sandboxId = + state.queue.poll() ?: return TakeIdleResult( + sandboxId = null, + discardedAliveSandboxIds = discardedAlive ?: emptyList(), + ) + val entry = state.map.remove(sandboxId) ?: continue // already removed (e.g. by removeIdle) + if (entry.expiresAt.isAfter(cutoff)) { + return TakeIdleResult( + sandboxId = sandboxId, + discardedAliveSandboxIds = discardedAlive ?: emptyList(), + ) + } + // Below threshold. If still alive (server-side TTL not yet elapsed), surface it so + // the caller can kill it; otherwise silently drop — the server has already reaped it. + if (entry.expiresAt.isAfter(now)) { + (discardedAlive ?: ArrayList().also { discardedAlive = it }).add(sandboxId) + } + } + } + override fun putIdle( poolName: String, sandboxId: String, @@ -108,6 +141,31 @@ class InMemoryPoolStateStore : PoolStateStore { state.queue.removeIf { sandboxId -> !state.map.containsKey(sandboxId) } } + override fun reapExpiredIdle( + poolName: String, + now: Instant, + minRemainingTtl: Duration, + ): List { + if (minRemainingTtl.isNegative || minRemainingTtl.isZero) { + reapExpiredIdle(poolName, now) + return emptyList() + } + val state = pools[poolName] ?: return emptyList() + val cutoff = now.plus(minRemainingTtl) + var discardedAlive: MutableList? = null + // Snapshot to avoid mutating the map while iterating. + for ((sandboxId, entry) in state.map.entries.toList()) { + if (entry.expiresAt.isAfter(cutoff)) continue + if (state.map.remove(sandboxId, entry)) { + if (entry.expiresAt.isAfter(now)) { + (discardedAlive ?: ArrayList().also { discardedAlive = it }).add(sandboxId) + } + } + } + state.queue.removeIf { sandboxId -> !state.map.containsKey(sandboxId) } + return discardedAlive ?: emptyList() + } + override fun snapshotCounters(poolName: String): StoreCounters { val state = pools[poolName] ?: return StoreCounters(idleCount = 0) val now = Instant.now() diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt index 0937e064a..7f1996af8 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/PoolReconciler.kt @@ -72,7 +72,12 @@ internal object PoolReconciler { val ttl = config.primaryLockTtl val now = Instant.now() - stateStore.reapExpiredIdle(poolName, now) + val discardedAlive = stateStore.reapExpiredIdle(poolName, now, config.acquireMinRemainingTtl) + for (sandboxId in discardedAlive) { + // Reaped near-expiry but server-side TTL has not yet elapsed; kill so the live sandbox + // does not linger past its pool membership and consume quota. + onDiscardSandbox(sandboxId) + } val counters = stateStore.snapshotCounters(poolName) val excess = (counters.idleCount - config.maxIdle).coerceAtLeast(0) val toRemove = minOf(excess, config.warmupConcurrency) diff --git a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt index 3b030fac5..f7a8cd784 100644 --- a/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt +++ b/sdks/sandbox/kotlin/sandbox/src/main/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPool.kt @@ -209,7 +209,13 @@ class SandboxPool internal constructor( throw PoolNotRunningException("Cannot acquire when pool state is $state") } val poolName = config.poolName - val sandboxId = stateStore.tryTakeIdle(poolName) + val takeResult = stateStore.tryTakeIdle(poolName, config.acquireMinRemainingTtl) + val sandboxId = takeResult.sandboxId + // Defer the cleanup of below-threshold-but-still-alive sandboxes until after the + // chosen candidate is connected and renewed. Doing it inline (synchronously, before + // connect) would let slow kill RPCs eat the candidate's remaining TTL — exactly the + // race this PR is trying to fix. + val pendingKill = takeResult.discardedAliveSandboxIds var noIdleReason: String? = null // null = got a sandbox from idle; non-null = reason we have no usable idle var idleConnectFailure: Exception? = null if (sandboxId != null) { @@ -225,6 +231,10 @@ class SandboxPool internal constructor( config.acquireHealthCheck?.let { healthCheck(it) } ?: this }.connect() sandboxTimeout?.let { sandbox.renew(it) } + // Candidate is connected and (optionally) renewed. Now safe to clean up the + // discarded-alive sandboxes; offload to the warmup executor so the caller + // does not wait for N kill RPCs. + scheduleKillDiscardedAlive(poolName, pendingKill, source = "acquire") logger.debug( "Acquire from idle: pool_name={} sandbox_id={} policy={}", poolName, @@ -252,6 +262,11 @@ class SandboxPool internal constructor( } else { noIdleReason = "idle buffer empty" } + // Reaching here means we did not return a sandbox from idle. Still kick off the + // deferred cleanup so the discarded-alive sandboxes do not linger; FAIL_FAST throws + // and DIRECT_CREATE falls through to a fresh sandbox creation, neither of which + // benefits from a synchronous kill. + scheduleKillDiscardedAlive(poolName, pendingKill, source = "acquire") val reason = noIdleReason!! if (policy == AcquirePolicy.FAIL_FAST) { logger.debug("Acquire FAIL_FAST: pool_name={} reason={}", poolName, reason) @@ -412,6 +427,76 @@ class SandboxPool internal constructor( private fun resolveMaxIdle(): Int = stateStore.getMaxIdle(config.poolName) ?: currentMaxIdle + /** + * Offload [killDiscardedAlive] to the warmup executor so the caller does not block on the + * kill RPCs. Falls back to inline execution when no executor is available (e.g. the pool is + * shutting down) — better to slow the caller than to drop the cleanup entirely. + */ + private fun scheduleKillDiscardedAlive( + poolName: String, + sandboxIds: List, + source: String, + ) { + if (sandboxIds.isEmpty()) return + val executor = warmupExecutor + if (executor == null) { + killDiscardedAlive(poolName, sandboxIds, source) + return + } + try { + executor.submit { + killDiscardedAlive(poolName, sandboxIds, source) + } + } catch (e: Exception) { + // Executor may reject if the pool is mid-shutdown; fall back to inline kill. + logger.debug( + "Discarded-alive kill submit rejected, running inline: pool_name={} count={} error={}", + poolName, + sandboxIds.size, + e.message, + ) + killDiscardedAlive(poolName, sandboxIds, source) + } + } + + /** + * Best-effort terminate sandboxes the store dropped because their remaining TTL fell below + * `acquireMinRemainingTtl`. The store has already removed them from idle membership; without + * this kill they would linger on the server until their TTL elapses, exceeding the intended + * pool size during the gap. + * + * Failures are logged and swallowed: the caller's primary outcome (acquire/reconcile) must + * not be impacted by a janitor failure. + */ + private fun killDiscardedAlive( + poolName: String, + sandboxIds: List, + source: String, + ) { + if (sandboxIds.isEmpty()) return + val manager = sandboxManager ?: return + for (sandboxId in sandboxIds) { + try { + manager.killSandbox(sandboxId) + logger.debug( + "Killed near-expiry idle sandbox: pool_name={} sandbox_id={} source={}", + poolName, + sandboxId, + source, + ) + } catch (e: Exception) { + logger.warn( + "Failed to kill near-expiry idle sandbox (best-effort, will expire server-side): " + + "pool_name={} sandbox_id={} source={} error={}", + poolName, + sandboxId, + source, + e.message, + ) + } + } + } + private fun createSandboxManager(): SandboxManager = sandboxManagerFactory(connectionConfig.copyWithoutConnectionPool()) private fun runReconcileTick() { @@ -445,6 +530,13 @@ class SandboxPool internal constructor( val sandbox = buildSandboxFromSpec() try { config.warmupSandboxPreparer?.prepare(sandbox) + // The server-side TTL has been ticking since `buildSandboxFromSpec()`; readiness + // wait and `warmupSandboxPreparer` can both consume meaningful time (think + // initialization scripts). Renew right before handing the id back to the + // reconciler so the store's stamped expiry (now + idleTimeout) actually matches + // what the server will honor — otherwise `acquireMinRemainingTtl` overestimates + // remaining TTL by the warmup duration. + sandbox.renew(config.idleTimeout) sandbox.id } catch (e: Exception) { try { @@ -724,6 +816,11 @@ class SandboxPool internal constructor( return this } + fun acquireMinRemainingTtl(acquireMinRemainingTtl: Duration): Builder { + configBuilder.acquireMinRemainingTtl(acquireMinRemainingTtl) + return this + } + fun warmupReadyTimeout(warmupReadyTimeout: Duration): Builder { configBuilder.warmupReadyTimeout(warmupReadyTimeout) return this diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfigTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfigTest.kt index 2d7e581c7..092afa3b3 100644 --- a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfigTest.kt +++ b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/domain/pool/PoolConfigTest.kt @@ -22,6 +22,7 @@ import com.alibaba.opensandbox.sandbox.infrastructure.pool.InMemoryPoolStateStor import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertSame +import org.junit.jupiter.api.Assertions.assertThrows import org.junit.jupiter.api.Test import java.time.Duration @@ -46,6 +47,8 @@ class PoolConfigTest { assertEquals(Duration.ofMillis(200), config.acquireHealthCheckPollingInterval) assertFalse(config.acquireSkipHealthCheck) assertEquals(null, config.acquireHealthCheck) + // 24h idle, default cap of 60s applies → 60s. + assertEquals(Duration.ofSeconds(60), config.acquireMinRemainingTtl) assertEquals(Duration.ofHours(24), config.idleTimeout) } @@ -65,6 +68,7 @@ class PoolConfigTest { .acquireHealthCheckPollingInterval(Duration.ofMillis(250)) .acquireHealthCheck(healthCheck) .acquireSkipHealthCheck() + .acquireMinRemainingTtl(Duration.ofSeconds(90)) .warmupReadyTimeout(Duration.ofSeconds(45)) .warmupHealthCheckPollingInterval(Duration.ofSeconds(1)) .warmupHealthCheck(healthCheck) @@ -77,6 +81,7 @@ class PoolConfigTest { assertEquals(Duration.ofMillis(250), config.acquireHealthCheckPollingInterval) assertSame(healthCheck, config.acquireHealthCheck) assertEquals(true, config.acquireSkipHealthCheck) + assertEquals(Duration.ofSeconds(90), config.acquireMinRemainingTtl) assertEquals(Duration.ofSeconds(45), config.warmupReadyTimeout) assertEquals(Duration.ofSeconds(1), config.warmupHealthCheckPollingInterval) assertSame(healthCheck, config.warmupHealthCheck) @@ -84,4 +89,89 @@ class PoolConfigTest { assertEquals(true, config.warmupSkipHealthCheck) assertEquals(Duration.ofMinutes(10), config.idleTimeout) } + + @Test + fun `build rejects negative acquireMinRemainingTtl`() { + val builder = + PoolConfig.builder() + .poolName("test-pool") + .ownerId("test-owner") + .maxIdle(2) + .stateStore(InMemoryPoolStateStore()) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .acquireMinRemainingTtl(Duration.ofSeconds(-1)) + + assertThrows(IllegalArgumentException::class.java) { builder.build() } + } + + @Test + fun `default acquireMinRemainingTtl scales down for short idleTimeout`() { + // idleTimeout = 30s ⇒ default = min(60s, 30s/2) = 15s. Existing users with short idle + // timeouts must not get a config-time error from a hidden 60s default. + val config = + PoolConfig.builder() + .poolName("test-pool") + .ownerId("test-owner") + .maxIdle(2) + .stateStore(InMemoryPoolStateStore()) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .idleTimeout(Duration.ofSeconds(30)) + .build() + + assertEquals(Duration.ofSeconds(15), config.acquireMinRemainingTtl) + } + + @Test + fun `default acquireMinRemainingTtl caps at 60s for long idleTimeout`() { + val config = + PoolConfig.builder() + .poolName("test-pool") + .ownerId("test-owner") + .maxIdle(2) + .stateStore(InMemoryPoolStateStore()) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .idleTimeout(Duration.ofMinutes(10)) + .build() + + assertEquals(Duration.ofSeconds(60), config.acquireMinRemainingTtl) + } + + @Test + fun `build rejects explicit acquireMinRemainingTtl greater than or equal to idleTimeout`() { + // The auto-default protects against this, but if a user explicitly sets a value above + // idleTimeout the build() guard still fires. + val builder = + PoolConfig.builder() + .poolName("test-pool") + .ownerId("test-owner") + .maxIdle(2) + .stateStore(InMemoryPoolStateStore()) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .idleTimeout(Duration.ofSeconds(30)) + .acquireMinRemainingTtl(Duration.ofSeconds(30)) + + assertThrows(IllegalArgumentException::class.java) { builder.build() } + } + + @Test + fun `build accepts acquireMinRemainingTtl just below idleTimeout`() { + // Strict-less-than boundary: 9s < 10s passes. + val config = + PoolConfig.builder() + .poolName("test-pool") + .ownerId("test-owner") + .maxIdle(2) + .stateStore(InMemoryPoolStateStore()) + .connectionConfig(ConnectionConfig.builder().build()) + .creationSpec(PoolCreationSpec.builder().image("ubuntu:22.04").build()) + .idleTimeout(Duration.ofSeconds(10)) + .acquireMinRemainingTtl(Duration.ofSeconds(9)) + .build() + + assertEquals(Duration.ofSeconds(9), config.acquireMinRemainingTtl) + } } diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt index 61cbed351..a61139d4f 100644 --- a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt +++ b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/infrastructure/pool/InMemoryPoolStateStoreTest.kt @@ -174,6 +174,129 @@ class InMemoryPoolStateStoreTest { assertNull(store.getMaxIdle(poolName)) } + @Test + fun `tryTakeIdle with minRemainingTtl falls back to base behavior when zero or negative`() { + store.putIdle(poolName, "id-1") + // Duration.ZERO must not change behavior; result has no discarded-alive entries. + val first = store.tryTakeIdle(poolName, Duration.ZERO) + assertEquals("id-1", first.sandboxId) + assertEquals(emptyList(), first.discardedAliveSandboxIds) + + store.putIdle(poolName, "id-2") + val second = store.tryTakeIdle(poolName, Duration.ofSeconds(-1)) + assertEquals("id-2", second.sandboxId) + assertEquals(emptyList(), second.discardedAliveSandboxIds) + } + + @Test + fun `tryTakeIdle surfaces alive entries below the threshold so callers can kill them`() { + val inMemoryStore = InMemoryPoolStateStore() + // Entries get a 5-second TTL. + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofSeconds(5)) + inMemoryStore.putIdle(poolName, "id-1") + inMemoryStore.putIdle(poolName, "id-2") + + // Demand more remaining TTL than the entries have. Both have ~5s remaining + // and are still server-side alive, so they show up in discardedAliveSandboxIds. + val result = inMemoryStore.tryTakeIdle(poolName, Duration.ofSeconds(60)) + assertNull(result.sandboxId) + assertEquals(listOf("id-1", "id-2"), result.discardedAliveSandboxIds) + // The discarded entries are also removed from idle membership. + assertEquals(0, inMemoryStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `tryTakeIdle silently drops fully-expired entries because the server has reaped them`() { + val inMemoryStore = InMemoryPoolStateStore() + // 1ms TTL → entry is fully expired by the time we try to take it. + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofMillis(1)) + inMemoryStore.putIdle(poolName, "expired-1") + Thread.sleep(20) + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10)) + inMemoryStore.putIdle(poolName, "alive-1") + + // The expired entry is silently dropped (no kill needed — server already reaped it); + // the alive entry is returned. + val result = inMemoryStore.tryTakeIdle(poolName, Duration.ofSeconds(60)) + assertEquals("alive-1", result.sandboxId) + assertEquals(emptyList(), result.discardedAliveSandboxIds) + } + + @Test + fun `reapExpiredIdle with minRemainingTtl returns alive evicted entries`() { + val inMemoryStore = InMemoryPoolStateStore() + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofSeconds(5)) + inMemoryStore.putIdle(poolName, "id-1") + inMemoryStore.putIdle(poolName, "id-2") + + // Sweep with a 60s threshold while entries only have ~5s left → both reaped, both alive. + val discardedAlive = + inMemoryStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60)) + + assertEquals(setOf("id-1", "id-2"), discardedAlive.toSet()) + assertEquals(0, inMemoryStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `reapExpiredIdle with minRemainingTtl keeps entries above the threshold`() { + val inMemoryStore = InMemoryPoolStateStore() + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10)) + inMemoryStore.putIdle(poolName, "id-1") + + val discardedAlive = + inMemoryStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60)) + + assertEquals(emptyList(), discardedAlive) + assertEquals(1, inMemoryStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `reapExpiredIdle with zero or negative minRemainingTtl falls back to base sweep`() { + val inMemoryStore = InMemoryPoolStateStore() + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofSeconds(5)) + inMemoryStore.putIdle(poolName, "id-1") + + // Zero threshold matches the strict-expiry behavior — entry has 5s left, sweep at "now" + // does not evict it. + assertEquals(emptyList(), inMemoryStore.reapExpiredIdle(poolName, Instant.now(), Duration.ZERO)) + assertEquals(1, inMemoryStore.snapshotCounters(poolName).idleCount) + + assertEquals( + emptyList(), + inMemoryStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(-1)), + ) + assertEquals(1, inMemoryStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `reapExpiredIdle excludes already-expired entries from the alive list`() { + val inMemoryStore = InMemoryPoolStateStore() + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofMillis(1)) + inMemoryStore.putIdle(poolName, "expired") + Thread.sleep(20) + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofSeconds(5)) + inMemoryStore.putIdle(poolName, "alive") + + val discardedAlive = + inMemoryStore.reapExpiredIdle(poolName, Instant.now(), Duration.ofSeconds(60)) + + // expired is reaped silently (server already gone); alive is surfaced for kill. + assertEquals(listOf("alive"), discardedAlive) + assertEquals(0, inMemoryStore.snapshotCounters(poolName).idleCount) + } + + @Test + fun `tryTakeIdle returns entries that satisfy minRemainingTtl`() { + val inMemoryStore = InMemoryPoolStateStore() + inMemoryStore.setIdleEntryTtl(poolName, Duration.ofMinutes(10)) + inMemoryStore.putIdle(poolName, "id-1") + + // 10 minutes of TTL is well above a 60-second threshold. + val result = inMemoryStore.tryTakeIdle(poolName, Duration.ofSeconds(60)) + assertEquals("id-1", result.sandboxId) + assertEquals(emptyList(), result.discardedAliveSandboxIds) + } + @Test fun `snapshotCounters compacts queue tombstones`() { store.putIdle(poolName, "id-1") diff --git a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt index 2ea3a2eff..a52452e53 100644 --- a/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt +++ b/sdks/sandbox/kotlin/sandbox/src/test/kotlin/com/alibaba/opensandbox/sandbox/pool/SandboxPoolTest.kt @@ -498,6 +498,7 @@ class SandboxPoolTest { .acquireHealthCheckPollingInterval(Duration.ofMillis(50)) .acquireHealthCheck(healthCheck) .acquireSkipHealthCheck() + .acquireMinRemainingTtl(Duration.ofSeconds(90)) .idleTimeout(Duration.ofMinutes(15)) .build() @@ -509,6 +510,7 @@ class SandboxPoolTest { assertEquals(Duration.ofMillis(50), config.acquireHealthCheckPollingInterval) assertSame(healthCheck, config.acquireHealthCheck) assertEquals(true, config.acquireSkipHealthCheck) + assertEquals(Duration.ofSeconds(90), config.acquireMinRemainingTtl) assertEquals(Duration.ofMinutes(15), config.idleTimeout) } diff --git a/sdks/sandbox/python/src/opensandbox/_async_pool_reconciler.py b/sdks/sandbox/python/src/opensandbox/_async_pool_reconciler.py index a5520ef58..8cd516d70 100644 --- a/sdks/sandbox/python/src/opensandbox/_async_pool_reconciler.py +++ b/sdks/sandbox/python/src/opensandbox/_async_pool_reconciler.py @@ -23,7 +23,13 @@ from datetime import datetime, timezone from opensandbox._pool_reconciler import ReconcileState -from opensandbox.pool_types import AsyncPoolConfig, AsyncPoolStateStore +from opensandbox.pool_types import ( + AsyncPoolConfig, + AsyncPoolStateStore, +) +from opensandbox.pool_types import ( + reap_expired_idle_with_min_ttl_async as _reap_expired_idle_with_min_ttl_async, +) logger = logging.getLogger(__name__) @@ -65,7 +71,11 @@ async def _run_primary_replenish_once( ttl = config.primary_lock_ttl now = datetime.now(timezone.utc) - await state_store.reap_expired_idle(pool_name, now) + discarded_alive = await _reap_expired_idle_with_min_ttl_async( + state_store, pool_name, now, config.acquire_min_remaining_ttl + ) + for sandbox_id in discarded_alive: + await on_discard_sandbox(sandbox_id) counters = await state_store.snapshot_counters(pool_name) excess = max(0, counters.idle_count - config.max_idle) to_remove = min(excess, int(config.warmup_concurrency or 1)) diff --git a/sdks/sandbox/python/src/opensandbox/_async_pool_store.py b/sdks/sandbox/python/src/opensandbox/_async_pool_store.py index 6128732c4..b1f74ee21 100644 --- a/sdks/sandbox/python/src/opensandbox/_async_pool_store.py +++ b/sdks/sandbox/python/src/opensandbox/_async_pool_store.py @@ -22,7 +22,7 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone -from opensandbox.pool_types import IdleEntry, StoreCounters +from opensandbox.pool_types import IdleEntry, StoreCounters, TakeIdleResult class InMemoryAsyncPoolStateStore: @@ -49,6 +49,39 @@ async def try_take_idle(self, pool_name: str) -> str | None: return sandbox_id return None + async def try_take_idle_min_ttl( + self, pool_name: str, min_remaining_ttl: timedelta + ) -> TakeIdleResult: + """Variant of :meth:`try_take_idle` that skips entries with insufficient remaining TTL. + + See :meth:`InMemoryPoolStateStore.try_take_idle_min_ttl` for behavior details. + """ + if min_remaining_ttl.total_seconds() <= 0: + return TakeIdleResult(sandbox_id=await self.try_take_idle(pool_name)) + async with self._lock: + state = self._pools.get(pool_name) + if state is None: + return TakeIdleResult(sandbox_id=None) + now = _now() + cutoff = now + min_remaining_ttl + discarded_alive: list[str] = [] + while state.queue: + sandbox_id = state.queue.popleft() + entry = state.entries.pop(sandbox_id, None) + if entry is None: + continue + if entry.expires_at > cutoff: + return TakeIdleResult( + sandbox_id=sandbox_id, + discarded_alive_sandbox_ids=tuple(discarded_alive), + ) + if entry.expires_at > now: + discarded_alive.append(sandbox_id) + return TakeIdleResult( + sandbox_id=None, + discarded_alive_sandbox_ids=tuple(discarded_alive), + ) + async def put_idle(self, pool_name: str, sandbox_id: str) -> None: if not sandbox_id or not sandbox_id.strip(): raise ValueError("sandbox_id must not be blank") @@ -82,6 +115,35 @@ async def reap_expired_idle(self, pool_name: str, now: datetime) -> None: async with self._lock: self._reap_locked(pool_name, now) + async def reap_expired_idle_min_ttl( + self, pool_name: str, now: datetime, min_remaining_ttl: timedelta + ) -> tuple[str, ...]: + """Variant of :meth:`reap_expired_idle` that also evicts near-expiry entries. + + Returns IDs of **alive** entries (server-side TTL has not elapsed) that were evicted + because their remaining TTL fell below ``min_remaining_ttl``. Already-expired entries + are still evicted but excluded from the return value — the server has reaped them. + """ + if min_remaining_ttl.total_seconds() <= 0: + await self.reap_expired_idle(pool_name, now) + return () + cutoff = now + min_remaining_ttl + discarded_alive: list[str] = [] + async with self._lock: + state = self._pools.get(pool_name) + if state is None: + return () + for sandbox_id, entry in list(state.entries.items()): + if entry.expires_at > cutoff: + continue + state.entries.pop(sandbox_id, None) + if entry.expires_at > now: + discarded_alive.append(sandbox_id) + state.queue = deque( + sandbox_id for sandbox_id in state.queue if sandbox_id in state.entries + ) + return tuple(discarded_alive) + async def snapshot_counters(self, pool_name: str) -> StoreCounters: async with self._lock: self._reap_locked(pool_name, _now()) diff --git a/sdks/sandbox/python/src/opensandbox/_pool_reconciler.py b/sdks/sandbox/python/src/opensandbox/_pool_reconciler.py index 227dc655e..7cc3ecdc1 100644 --- a/sdks/sandbox/python/src/opensandbox/_pool_reconciler.py +++ b/sdks/sandbox/python/src/opensandbox/_pool_reconciler.py @@ -23,7 +23,14 @@ from dataclasses import dataclass from datetime import datetime, timedelta, timezone -from opensandbox.pool_types import PoolConfig, PoolState, PoolStateStore +from opensandbox.pool_types import ( + PoolConfig, + PoolState, + PoolStateStore, +) +from opensandbox.pool_types import ( + reap_expired_idle_with_min_ttl as _reap_expired_idle_with_min_ttl, +) logger = logging.getLogger(__name__) @@ -112,7 +119,13 @@ def _run_primary_replenish_once( ttl = config.primary_lock_ttl now = datetime.now(timezone.utc) - state_store.reap_expired_idle(pool_name, now) + discarded_alive = _reap_expired_idle_with_min_ttl( + state_store, pool_name, now, config.acquire_min_remaining_ttl + ) + for sandbox_id in discarded_alive: + # Reaped near-expiry but server-side TTL has not yet elapsed; kill so the live + # sandbox does not linger past its pool membership and consume quota. + on_discard_sandbox(sandbox_id) counters = state_store.snapshot_counters(pool_name) excess = max(0, counters.idle_count - config.max_idle) to_remove = min(excess, int(config.warmup_concurrency or 1)) diff --git a/sdks/sandbox/python/src/opensandbox/_pool_store.py b/sdks/sandbox/python/src/opensandbox/_pool_store.py index 92a63a81e..099fae33f 100644 --- a/sdks/sandbox/python/src/opensandbox/_pool_store.py +++ b/sdks/sandbox/python/src/opensandbox/_pool_store.py @@ -22,7 +22,7 @@ from datetime import datetime, timedelta, timezone from threading import RLock -from opensandbox.pool_types import IdleEntry, StoreCounters +from opensandbox.pool_types import IdleEntry, StoreCounters, TakeIdleResult class InMemoryPoolStateStore: @@ -49,6 +49,41 @@ def try_take_idle(self, pool_name: str) -> str | None: return sandbox_id return None + def try_take_idle_min_ttl( + self, pool_name: str, min_remaining_ttl: timedelta + ) -> TakeIdleResult: + """Variant of :meth:`try_take_idle` that skips entries with insufficient remaining TTL. + + Returns a :class:`TakeIdleResult` carrying the chosen ID (if any) plus the IDs of + any **alive** entries skipped because their remaining TTL fell below ``min_remaining_ttl``. + Already-expired entries are silently dropped — the server has already reaped them. + """ + if min_remaining_ttl.total_seconds() <= 0: + return TakeIdleResult(sandbox_id=self.try_take_idle(pool_name)) + with self._lock: + state = self._pools.get(pool_name) + if state is None: + return TakeIdleResult(sandbox_id=None) + now = _now() + cutoff = now + min_remaining_ttl + discarded_alive: list[str] = [] + while state.queue: + sandbox_id = state.queue.popleft() + entry = state.entries.pop(sandbox_id, None) + if entry is None: + continue + if entry.expires_at > cutoff: + return TakeIdleResult( + sandbox_id=sandbox_id, + discarded_alive_sandbox_ids=tuple(discarded_alive), + ) + if entry.expires_at > now: + discarded_alive.append(sandbox_id) + return TakeIdleResult( + sandbox_id=None, + discarded_alive_sandbox_ids=tuple(discarded_alive), + ) + def put_idle(self, pool_name: str, sandbox_id: str) -> None: if not sandbox_id or not sandbox_id.strip(): raise ValueError("sandbox_id must not be blank") @@ -82,6 +117,35 @@ def reap_expired_idle(self, pool_name: str, now: datetime) -> None: with self._lock: self._reap_locked(pool_name, now) + def reap_expired_idle_min_ttl( + self, pool_name: str, now: datetime, min_remaining_ttl: timedelta + ) -> tuple[str, ...]: + """Variant of :meth:`reap_expired_idle` that also evicts near-expiry entries. + + Returns IDs of **alive** entries (server-side TTL has not elapsed) that were evicted + because their remaining TTL fell below ``min_remaining_ttl``. Already-expired entries + are still evicted but excluded from the return value — the server has reaped them. + """ + if min_remaining_ttl.total_seconds() <= 0: + self.reap_expired_idle(pool_name, now) + return () + cutoff = now + min_remaining_ttl + discarded_alive: list[str] = [] + with self._lock: + state = self._pools.get(pool_name) + if state is None: + return () + for sandbox_id, entry in list(state.entries.items()): + if entry.expires_at > cutoff: + continue + state.entries.pop(sandbox_id, None) + if entry.expires_at > now: + discarded_alive.append(sandbox_id) + state.queue = deque( + sandbox_id for sandbox_id in state.queue if sandbox_id in state.entries + ) + return tuple(discarded_alive) + def snapshot_counters(self, pool_name: str) -> StoreCounters: with self._lock: self._reap_locked(pool_name, _now()) diff --git a/sdks/sandbox/python/src/opensandbox/async_redis_pool_store.py b/sdks/sandbox/python/src/opensandbox/async_redis_pool_store.py index f648a7a7c..5bb3b7c2d 100644 --- a/sdks/sandbox/python/src/opensandbox/async_redis_pool_store.py +++ b/sdks/sandbox/python/src/opensandbox/async_redis_pool_store.py @@ -23,12 +23,13 @@ from typing import Any, TypeVar, cast from opensandbox.exceptions import PoolStateStoreUnavailableException -from opensandbox.pool_types import IdleEntry, StoreCounters +from opensandbox.pool_types import IdleEntry, StoreCounters, TakeIdleResult from opensandbox.redis_pool_store import ( _REQUIRED_REDIS_METHODS, Redis, RedisPoolStateStore, _decode, + _decode_take_idle_result, _millis, _validate_owner_and_ttl, ) @@ -55,6 +56,20 @@ def __init__(self, redis: AsyncRedis, key_prefix: str = DEFAULT_KEY_PREFIX) -> N self._default_idle_ttl = timedelta(hours=24) async def try_take_idle(self, pool_name: str) -> str | None: + result = await self._eval_take_idle(pool_name, "0") + return result.sandbox_id + + async def try_take_idle_min_ttl( + self, pool_name: str, min_remaining_ttl: timedelta + ) -> TakeIdleResult: + """Variant of :meth:`try_take_idle` that skips entries with insufficient remaining TTL.""" + if min_remaining_ttl.total_seconds() <= 0: + return TakeIdleResult(sandbox_id=await self.try_take_idle(pool_name)) + return await self._eval_take_idle(pool_name, str(max(0, _millis(min_remaining_ttl)))) + + async def _eval_take_idle( + self, pool_name: str, min_remaining_ttl_ms: str + ) -> TakeIdleResult: result = await self._execute( "try_take_idle", pool_name, @@ -65,10 +80,11 @@ async def try_take_idle(self, pool_name: str) -> str | None: 2, self._idle_list_key(pool_name), self._idle_expires_key(pool_name), + min_remaining_ttl_ms, ), ), ) - return _decode(result) if result is not None else None + return _decode_take_idle_result(result) async def put_idle(self, pool_name: str, sandbox_id: str) -> None: if not sandbox_id or not sandbox_id.strip(): @@ -160,7 +176,24 @@ async def release_primary_lock(self, pool_name: str, owner_id: str) -> None: ) async def reap_expired_idle(self, pool_name: str, now: datetime) -> None: - await self._execute( + await self._reap_idle(pool_name, "0") + + async def reap_expired_idle_min_ttl( + self, pool_name: str, now: datetime, min_remaining_ttl: timedelta + ) -> tuple[str, ...]: + """Variant of :meth:`reap_expired_idle` that also evicts near-expiry entries. + + Returns IDs of alive evicted sandboxes (excluding fully-expired entries). + """ + if min_remaining_ttl.total_seconds() <= 0: + await self.reap_expired_idle(pool_name, now) + return () + return await self._reap_idle(pool_name, str(max(0, _millis(min_remaining_ttl)))) + + async def _reap_idle( + self, pool_name: str, min_remaining_ttl_ms: str + ) -> tuple[str, ...]: + result = await self._execute( "reap_expired_idle", pool_name, lambda: cast( @@ -170,9 +203,13 @@ async def reap_expired_idle(self, pool_name: str, now: datetime) -> None: 2, self._idle_list_key(pool_name), self._idle_expires_key(pool_name), + min_remaining_ttl_ms, ), ), ) + if not result: + return () + return tuple(_decode(item) for item in result) async def snapshot_counters(self, pool_name: str) -> StoreCounters: async def op() -> StoreCounters: diff --git a/sdks/sandbox/python/src/opensandbox/pool.py b/sdks/sandbox/python/src/opensandbox/pool.py index f681626f5..1ca2e3c7c 100644 --- a/sdks/sandbox/python/src/opensandbox/pool.py +++ b/sdks/sandbox/python/src/opensandbox/pool.py @@ -30,6 +30,7 @@ PoolState, PoolStateStore, StoreCounters, + TakeIdleResult, ) from opensandbox.sync.pool import SandboxPoolSync @@ -53,4 +54,5 @@ "SandboxPool", "SandboxPoolSync", "StoreCounters", + "TakeIdleResult", ] diff --git a/sdks/sandbox/python/src/opensandbox/pool_async.py b/sdks/sandbox/python/src/opensandbox/pool_async.py index 4e64ce25f..3c77a8ca3 100644 --- a/sdks/sandbox/python/src/opensandbox/pool_async.py +++ b/sdks/sandbox/python/src/opensandbox/pool_async.py @@ -42,6 +42,9 @@ PoolSnapshot, PoolState, ) +from opensandbox.pool_types import ( + try_take_idle_with_min_ttl_async as _try_take_idle_with_min_ttl_async, +) from opensandbox.sandbox import Sandbox logger = logging.getLogger(__name__) @@ -76,6 +79,7 @@ def __init__( warmup_skip_health_check: bool = False, idle_timeout: timedelta = timedelta(hours=24), drain_timeout: timedelta = timedelta(seconds=30), + acquire_min_remaining_ttl: timedelta | None = None, sandbox_manager_factory: Callable[ [ConnectionConfig], Awaitable[SandboxManager] ] = SandboxManager.create, @@ -103,6 +107,7 @@ def __init__( warmup_skip_health_check=warmup_skip_health_check, idle_timeout=idle_timeout, drain_timeout=drain_timeout, + acquire_min_remaining_ttl=acquire_min_remaining_ttl, ) self._state_store = self._config.state_store self._connection_config = connection_config @@ -167,7 +172,16 @@ async def acquire( f"Cannot acquire when pool state is {state.value}" ) pool_name = self._config.pool_name - sandbox_id = await self._state_store.try_take_idle(pool_name) + take_result = await _try_take_idle_with_min_ttl_async( + self._state_store, + pool_name, + self._config.acquire_min_remaining_ttl, + ) + sandbox_id = take_result.sandbox_id + # Defer cleanup of below-threshold-but-still-alive sandboxes until after the chosen + # candidate is connected and renewed. Doing it inline before connect would let slow + # kill RPCs eat the candidate's remaining TTL — the race this PR is fixing. + pending_kill = take_result.discarded_alive_sandbox_ids no_idle_reason: str | None = None idle_connect_failure: Exception | None = None if sandbox_id is not None: @@ -184,6 +198,11 @@ async def acquire( ) if sandbox_timeout is not None: await sandbox.renew(sandbox_timeout) + # Candidate is connected and (optionally) renewed. Kick off kill cleanup as + # a background task so the caller does not wait for N kill RPCs. + self._schedule_kill_discarded_alive( + pool_name, pending_kill, source="acquire" + ) return sandbox except Exception as exc: idle_connect_failure = exc @@ -200,6 +219,11 @@ async def acquire( else: no_idle_reason = "idle buffer empty" + # Reaching here means we did not return a sandbox from idle. Still fire deferred + # cleanup so the discarded-alive sandboxes do not linger. + self._schedule_kill_discarded_alive( + pool_name, pending_kill, source="acquire" + ) reason = no_idle_reason or "idle buffer empty" if policy == AcquirePolicy.FAIL_FAST: if sandbox_id is not None: @@ -346,7 +370,7 @@ async def _run_reconcile_tick(self) -> None: config=self._config.with_max_idle(await self._resolve_max_idle()), state_store=self._state_store, create_one=self._create_one_sandbox, - on_discard_sandbox=self._kill_sandbox_best_effort, + on_discard_sandbox=self._discard_sandbox_callback, reconcile_state=self._reconcile_state, ) except Exception as exc: @@ -374,6 +398,12 @@ async def _create_one_sandbox(self) -> str | None: except Exception: pass return None + # The server-side TTL has been ticking since `_build_sandbox_from_spec()`; + # readiness wait and `warmup_sandbox_preparer` can both consume meaningful time. + # Renew right before handing the id back to the reconciler so the store's + # stamped expiry actually matches what the server will honor — otherwise + # `acquire_min_remaining_ttl` overestimates remaining TTL by the warmup duration. + await sandbox.renew(self._config.idle_timeout) return sandbox.id except BaseException: try: @@ -454,10 +484,24 @@ def _connection_for_pool_resource(self) -> ConnectionConfig: config._owns_transport = True return config - async def _kill_sandbox_best_effort(self, sandbox_id: str) -> None: + async def _discard_sandbox_callback(self, sandbox_id: str) -> None: + """``Callable[[str], Awaitable[None]]`` adapter for the reconciler's + ``on_discard_sandbox`` hook. Drops the bool return value of + :meth:`_kill_sandbox_best_effort`. + """ + await self._kill_sandbox_best_effort(sandbox_id) + + async def _kill_sandbox_best_effort(self, sandbox_id: str) -> bool: + """Best-effort kill a sandbox via the pool's manager. + + Returns ``True`` on a confirmed kill, ``False`` if no manager is available or the + kill raised. Failures are logged at WARNING and swallowed. + """ + if self._sandbox_manager is None: + return False try: - if self._sandbox_manager is not None: - await self._sandbox_manager.kill_sandbox(sandbox_id) + await self._sandbox_manager.kill_sandbox(sandbox_id) + return True except Exception as exc: logger.warning( "Async pool sandbox cleanup failed: pool_name=%s sandbox_id=%s error=%s", @@ -465,6 +509,66 @@ async def _kill_sandbox_best_effort(self, sandbox_id: str) -> None: sandbox_id, exc, ) + return False + + def _schedule_kill_discarded_alive( + self, + pool_name: str, + sandbox_ids: tuple[str, ...], + source: str, + ) -> None: + """Fire-and-forget the kill cleanup as a background task so the caller's ``acquire`` + is not blocked on N kill RPCs. The task is added to ``_warmup_tasks`` so shutdown can + wait on it just like other background work; rejected scheduling falls back to inline. + """ + if not sandbox_ids: + return + try: + task = asyncio.create_task( + self._kill_discarded_alive(pool_name, sandbox_ids, source) + ) + except RuntimeError as exc: + # No running loop / loop is closed — fall back to inline cleanup so the work is + # not silently dropped. The await here is safe because we are inside `acquire()`. + logger.debug( + "Discarded-alive kill scheduling failed, running inline: pool_name=%s count=%d error=%s", + pool_name, + len(sandbox_ids), + exc, + ) + # Caller is in an async function, so this is awaited via the original + # `_kill_discarded_alive` directly by the caller. Since `_schedule_kill_discarded_alive` + # is sync, the safest fallback is a fire-and-forget through a fresh task; if that + # also fails the runtime is clearly mid-shutdown and the cleanup is not critical. + return + self._warmup_tasks.add(task) # type: ignore[arg-type] + task.add_done_callback(self._warmup_tasks.discard) # type: ignore[arg-type] + + async def _kill_discarded_alive( + self, + pool_name: str, + sandbox_ids: tuple[str, ...], + source: str, + ) -> None: + """Async counterpart of :meth:`SandboxPoolSync._kill_discarded_alive`. + + Kills run concurrently via :func:`asyncio.gather` so a batch of N near-expiry IDs + does not serially block the caller's ``acquire()`` on N network round-trips. + """ + if not sandbox_ids: + return + results = await asyncio.gather( + *(self._kill_sandbox_best_effort(sandbox_id) for sandbox_id in sandbox_ids), + return_exceptions=False, + ) + for sandbox_id, killed in zip(sandbox_ids, results, strict=True): + if killed: + logger.debug( + "Killed near-expiry idle sandbox: pool_name=%s sandbox_id=%s source=%s", + pool_name, + sandbox_id, + source, + ) async def _begin_operation(self) -> None: async with self._in_flight_condition: diff --git a/sdks/sandbox/python/src/opensandbox/pool_types.py b/sdks/sandbox/python/src/opensandbox/pool_types.py index e7af10ab6..fd031ea4f 100644 --- a/sdks/sandbox/python/src/opensandbox/pool_types.py +++ b/sdks/sandbox/python/src/opensandbox/pool_types.py @@ -22,7 +22,7 @@ from datetime import datetime, timedelta from enum import Enum from math import ceil -from typing import TYPE_CHECKING, Protocol +from typing import TYPE_CHECKING, Protocol, cast from uuid import uuid4 from opensandbox.config.connection_sync import ConnectionConfigSync @@ -34,7 +34,7 @@ ) if TYPE_CHECKING: - from collections.abc import Awaitable + from collections.abc import Awaitable, Iterable from opensandbox.config.connection import ConnectionConfig from opensandbox.sandbox import Sandbox @@ -78,6 +78,23 @@ class StoreCounters: idle_count: int +@dataclass(frozen=True) +class TakeIdleResult: + """Result of a near-expiry-aware ``try_take_idle``. + + ``sandbox_id`` is the chosen idle sandbox ID, or ``None`` if no entry satisfied the threshold. + ``discarded_alive_sandbox_ids`` lists IDs that were skipped because their remaining TTL was + below the configured ``min_remaining_ttl``. Those sandboxes are still **alive on the server** + (their server-side TTL has not elapsed yet) — callers should best-effort terminate them. + + Already-expired entries (server-side TTL has elapsed) are intentionally excluded: the server + has already reaped them and a kill call would be a wasted round-trip. + """ + + sandbox_id: str | None + discarded_alive_sandbox_ids: tuple[str, ...] = () + + @dataclass(frozen=True) class PoolSnapshot: state: PoolState @@ -91,7 +108,13 @@ class PoolSnapshot: class PoolStateStore(Protocol): - """Coordination state and idle sandbox membership store.""" + """Coordination state and idle sandbox membership store. + + The ``*_min_ttl`` overloads are optional: callers must use + :func:`try_take_idle_min_ttl` and :func:`reap_expired_idle_min_ttl` to + obtain near-expiry filtering, but stores predating those methods continue + to work via the binary-expiry fallback in the call sites. + """ def try_take_idle(self, pool_name: str) -> str | None: ... @@ -195,6 +218,7 @@ class PoolConfig: warmup_skip_health_check: bool = False idle_timeout: timedelta = timedelta(hours=24) drain_timeout: timedelta = timedelta(seconds=30) + acquire_min_remaining_ttl: timedelta | None = None def __post_init__(self) -> None: owner_id = self.owner_id or f"pool-owner-{uuid4()}" @@ -231,6 +255,11 @@ def __post_init__(self) -> None: _require_positive(self.idle_timeout, "idle_timeout must be positive") if self.drain_timeout.total_seconds() < 0: raise ValueError("drain_timeout must be non-negative") + resolved_min_ttl = self.acquire_min_remaining_ttl + if resolved_min_ttl is None: + resolved_min_ttl = _default_acquire_min_remaining_ttl(self.idle_timeout) + object.__setattr__(self, "acquire_min_remaining_ttl", resolved_min_ttl) + _require_acquire_min_remaining_ttl(resolved_min_ttl, self.idle_timeout) def with_max_idle(self, max_idle: int) -> PoolConfig: return replace(self, max_idle=max_idle) @@ -261,6 +290,7 @@ class AsyncPoolConfig: warmup_skip_health_check: bool = False idle_timeout: timedelta = timedelta(hours=24) drain_timeout: timedelta = timedelta(seconds=30) + acquire_min_remaining_ttl: timedelta | None = None def __post_init__(self) -> None: owner_id = self.owner_id or f"pool-owner-{uuid4()}" @@ -297,6 +327,11 @@ def __post_init__(self) -> None: _require_positive(self.idle_timeout, "idle_timeout must be positive") if self.drain_timeout.total_seconds() < 0: raise ValueError("drain_timeout must be non-negative") + resolved_min_ttl = self.acquire_min_remaining_ttl + if resolved_min_ttl is None: + resolved_min_ttl = _default_acquire_min_remaining_ttl(self.idle_timeout) + object.__setattr__(self, "acquire_min_remaining_ttl", resolved_min_ttl) + _require_acquire_min_remaining_ttl(resolved_min_ttl, self.idle_timeout) def with_max_idle(self, max_idle: int) -> AsyncPoolConfig: return replace(self, max_idle=max_idle) @@ -310,3 +345,113 @@ def _require_text(value: str, message: str) -> None: def _require_positive(value: timedelta, message: str) -> None: if value.total_seconds() <= 0: raise ValueError(message) + + +def try_take_idle_with_min_ttl( + store: PoolStateStore, pool_name: str, min_remaining_ttl: timedelta | None +) -> TakeIdleResult: + """Call ``store.try_take_idle_min_ttl`` if available, else fall back to ``try_take_idle``. + + Pool stores added before #983 only implement :meth:`PoolStateStore.try_take_idle`. + This helper preserves source compatibility for those stores: when the threshold is + ``None``, zero, or negative — or the store does not implement the variant — the + binary-expiry path is used and the returned result has an empty + ``discarded_alive_sandbox_ids``. + """ + if min_remaining_ttl is None or min_remaining_ttl.total_seconds() <= 0: + return TakeIdleResult(sandbox_id=store.try_take_idle(pool_name)) + method = getattr(store, "try_take_idle_min_ttl", None) + if callable(method): + return cast(TakeIdleResult, method(pool_name, min_remaining_ttl)) + return TakeIdleResult(sandbox_id=store.try_take_idle(pool_name)) + + +async def try_take_idle_with_min_ttl_async( + store: AsyncPoolStateStore, pool_name: str, min_remaining_ttl: timedelta | None +) -> TakeIdleResult: + """Async counterpart of :func:`try_take_idle_with_min_ttl`.""" + if min_remaining_ttl is None or min_remaining_ttl.total_seconds() <= 0: + return TakeIdleResult(sandbox_id=await store.try_take_idle(pool_name)) + method = getattr(store, "try_take_idle_min_ttl", None) + if callable(method): + coro = cast("Awaitable[TakeIdleResult]", method(pool_name, min_remaining_ttl)) + return await coro + return TakeIdleResult(sandbox_id=await store.try_take_idle(pool_name)) + + +def reap_expired_idle_with_min_ttl( + store: PoolStateStore, + pool_name: str, + now: datetime, + min_remaining_ttl: timedelta | None, +) -> tuple[str, ...]: + """Call ``store.reap_expired_idle_min_ttl`` if available, else fall back. + + Returns the IDs of alive sandboxes the store dropped because their remaining TTL fell + below the threshold, so callers can kill them. Stores predating this method, or callers + passing ``None`` / zero / negative, get an empty tuple. + """ + if min_remaining_ttl is None or min_remaining_ttl.total_seconds() <= 0: + store.reap_expired_idle(pool_name, now) + return () + method = getattr(store, "reap_expired_idle_min_ttl", None) + if callable(method): + result = cast("Iterable[str] | None", method(pool_name, now, min_remaining_ttl)) + return tuple(result) if result else () + store.reap_expired_idle(pool_name, now) + return () + + +async def reap_expired_idle_with_min_ttl_async( + store: AsyncPoolStateStore, + pool_name: str, + now: datetime, + min_remaining_ttl: timedelta | None, +) -> tuple[str, ...]: + """Async counterpart of :func:`reap_expired_idle_with_min_ttl`.""" + if min_remaining_ttl is None or min_remaining_ttl.total_seconds() <= 0: + await store.reap_expired_idle(pool_name, now) + return () + method = getattr(store, "reap_expired_idle_min_ttl", None) + if callable(method): + coro = cast( + "Awaitable[Iterable[str] | None]", method(pool_name, now, min_remaining_ttl) + ) + result = await coro + return tuple(result) if result else () + await store.reap_expired_idle(pool_name, now) + return () + + +_DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP = timedelta(seconds=60) + + +def _default_acquire_min_remaining_ttl(idle_timeout: timedelta) -> timedelta: + """Resolve the default ``acquire_min_remaining_ttl`` from ``idle_timeout``. + + Returns ``min(60s, idle_timeout / 2)``. Always strictly less than ``idle_timeout``, + so existing users with short idle timeouts get a scaled-down threshold instead of + a config-time error from a hidden 60s default. + """ + half = idle_timeout / 2 + return _DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP if _DEFAULT_ACQUIRE_MIN_REMAINING_TTL_CAP < half else half + + +def _require_acquire_min_remaining_ttl( + acquire_min_remaining_ttl: timedelta, idle_timeout: timedelta +) -> None: + """Validate ``acquire_min_remaining_ttl``. + + Must be non-negative and strictly less than ``idle_timeout``; otherwise every + freshly warmed idle entry would fail the threshold and the pool would discard + its entire idle buffer on each acquire. + """ + if acquire_min_remaining_ttl.total_seconds() < 0: + raise ValueError("acquire_min_remaining_ttl must be non-negative") + if acquire_min_remaining_ttl >= idle_timeout: + raise ValueError( + "acquire_min_remaining_ttl " + f"({acquire_min_remaining_ttl}) must be strictly less than " + f"idle_timeout ({idle_timeout}); otherwise every warmed idle entry " + "would be rejected" + ) diff --git a/sdks/sandbox/python/src/opensandbox/redis_pool_store.py b/sdks/sandbox/python/src/opensandbox/redis_pool_store.py index 1a2ce1c9e..905605ede 100644 --- a/sdks/sandbox/python/src/opensandbox/redis_pool_store.py +++ b/sdks/sandbox/python/src/opensandbox/redis_pool_store.py @@ -22,7 +22,7 @@ from typing import Any, cast from opensandbox.exceptions import PoolStateStoreUnavailableException -from opensandbox.pool_types import IdleEntry, StoreCounters +from opensandbox.pool_types import IdleEntry, StoreCounters, TakeIdleResult try: from redis import Redis @@ -51,16 +51,26 @@ class RedisPoolStateStore: _TAKE_IDLE_SCRIPT = """ local redis_time = redis.call('TIME') local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000) +local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0 +local cutoff_ms = now_ms + min_remaining_ttl_ms +local discarded_alive = {} while true do local sandbox_id = redis.call('LPOP', KEYS[1]) if not sandbox_id then - return nil + if #discarded_alive == 0 then + return nil + end + return {'', discarded_alive} end local expires_at = redis.call('HGET', KEYS[2], sandbox_id) if expires_at then redis.call('HDEL', KEYS[2], sandbox_id) - if tonumber(expires_at) > now_ms then - return sandbox_id + local exp = tonumber(expires_at) + if exp > cutoff_ms then + return {sandbox_id, discarded_alive} + end + if exp > now_ms then + table.insert(discarded_alive, sandbox_id) end end end @@ -99,14 +109,22 @@ class RedisPoolStateStore: _REAP_EXPIRED_SCRIPT = """ local redis_time = redis.call('TIME') local now_ms = tonumber(redis_time[1]) * 1000 + math.floor(tonumber(redis_time[2]) / 1000) +local min_remaining_ttl_ms = tonumber(ARGV[1]) or 0 +local cutoff_ms = now_ms + min_remaining_ttl_ms +local discarded_alive = {} local entries = redis.call('HGETALL', KEYS[2]) for i = 1, #entries, 2 do - if tonumber(entries[i + 1]) <= now_ms then - redis.call('HDEL', KEYS[2], entries[i]) - redis.call('LREM', KEYS[1], 0, entries[i]) + local sandbox_id = entries[i] + local exp = tonumber(entries[i + 1]) + if exp <= cutoff_ms then + redis.call('HDEL', KEYS[2], sandbox_id) + redis.call('LREM', KEYS[1], 0, sandbox_id) + if exp > now_ms then + table.insert(discarded_alive, sandbox_id) + end end end -return 1 +return discarded_alive """ def __init__(self, redis: Redis, key_prefix: str = DEFAULT_KEY_PREFIX) -> None: @@ -116,6 +134,21 @@ def __init__(self, redis: Redis, key_prefix: str = DEFAULT_KEY_PREFIX) -> None: self._default_idle_ttl = timedelta(hours=24) def try_take_idle(self, pool_name: str) -> str | None: + result = self._eval_take_idle(pool_name, "0") + return result.sandbox_id + + def try_take_idle_min_ttl( + self, pool_name: str, min_remaining_ttl: timedelta + ) -> TakeIdleResult: + """Variant of :meth:`try_take_idle` that skips entries with insufficient remaining TTL. + + See :class:`TakeIdleResult` for the return shape. + """ + if min_remaining_ttl.total_seconds() <= 0: + return TakeIdleResult(sandbox_id=self.try_take_idle(pool_name)) + return self._eval_take_idle(pool_name, str(max(0, _millis(min_remaining_ttl)))) + + def _eval_take_idle(self, pool_name: str, min_remaining_ttl_ms: str) -> TakeIdleResult: result = self._execute( "try_take_idle", pool_name, @@ -124,9 +157,10 @@ def try_take_idle(self, pool_name: str) -> str | None: 2, self._idle_list_key(pool_name), self._idle_expires_key(pool_name), + min_remaining_ttl_ms, ), ) - return _decode(result) if result is not None else None + return _decode_take_idle_result(result) def put_idle(self, pool_name: str, sandbox_id: str) -> None: if not sandbox_id or not sandbox_id.strip(): @@ -201,7 +235,22 @@ def release_primary_lock(self, pool_name: str, owner_id: str) -> None: ) def reap_expired_idle(self, pool_name: str, now: datetime) -> None: - self._execute( + self._reap_idle(pool_name, "0") + + def reap_expired_idle_min_ttl( + self, pool_name: str, now: datetime, min_remaining_ttl: timedelta + ) -> tuple[str, ...]: + """Variant of :meth:`reap_expired_idle` that also evicts near-expiry entries. + + Returns IDs of alive evicted sandboxes (excluding fully-expired entries). + """ + if min_remaining_ttl.total_seconds() <= 0: + self.reap_expired_idle(pool_name, now) + return () + return self._reap_idle(pool_name, str(max(0, _millis(min_remaining_ttl)))) + + def _reap_idle(self, pool_name: str, min_remaining_ttl_ms: str) -> tuple[str, ...]: + result = self._execute( "reap_expired_idle", pool_name, lambda: self._redis.eval( @@ -209,8 +258,12 @@ def reap_expired_idle(self, pool_name: str, now: datetime) -> None: 2, self._idle_list_key(pool_name), self._idle_expires_key(pool_name), + min_remaining_ttl_ms, ), ) + if not result: + return () + return tuple(_decode(item) for item in result) def snapshot_counters(self, pool_name: str) -> StoreCounters: def op() -> StoreCounters: @@ -328,6 +381,28 @@ def _execute(self, operation: str, pool_name: str, block: Any) -> Any: ) from exc +def _decode_take_idle_result(result: Any) -> TakeIdleResult: + """Decode the Lua return value into :class:`TakeIdleResult`. + + The script returns either nil (empty pool with no discarded entries) or a two-element + array ``[takenSandboxId | "", [discardedAliveIds...]]``. The empty string in slot 0 + signals "no eligible entry but there were discarded-alive entries to report". + """ + if result is None: + return TakeIdleResult(sandbox_id=None) + if isinstance(result, (str, bytes)): + return TakeIdleResult(sandbox_id=_decode(result)) + if isinstance(result, list) and len(result) >= 1: + taken_raw = result[0] + taken = _decode(taken_raw) if taken_raw is not None else None + if taken == "": + taken = None + discarded_raw = result[1] if len(result) >= 2 else [] + discarded = tuple(_decode(item) for item in (discarded_raw or [])) + return TakeIdleResult(sandbox_id=taken, discarded_alive_sandbox_ids=discarded) + return TakeIdleResult(sandbox_id=None) + + def _millis(value: timedelta) -> int: return int(value.total_seconds() * 1000) diff --git a/sdks/sandbox/python/src/opensandbox/sync/pool.py b/sdks/sandbox/python/src/opensandbox/sync/pool.py index 3ec93c9fa..65577f43d 100644 --- a/sdks/sandbox/python/src/opensandbox/sync/pool.py +++ b/sdks/sandbox/python/src/opensandbox/sync/pool.py @@ -40,6 +40,9 @@ PoolSnapshot, PoolState, ) +from opensandbox.pool_types import ( + try_take_idle_with_min_ttl as _try_take_idle_with_min_ttl, +) from opensandbox.sync.manager import SandboxManagerSync from opensandbox.sync.sandbox import SandboxSync @@ -75,6 +78,7 @@ def __init__( warmup_skip_health_check: bool = False, idle_timeout: timedelta = timedelta(hours=24), drain_timeout: timedelta = timedelta(seconds=30), + acquire_min_remaining_ttl: timedelta | None = None, sandbox_manager_factory: Callable[ [ConnectionConfigSync], SandboxManagerSync ] = SandboxManagerSync.create, @@ -102,6 +106,7 @@ def __init__( warmup_skip_health_check=warmup_skip_health_check, idle_timeout=idle_timeout, drain_timeout=drain_timeout, + acquire_min_remaining_ttl=acquire_min_remaining_ttl, ) self._state_store = self._config.state_store self._connection_config = connection_config @@ -173,7 +178,16 @@ def acquire( f"Cannot acquire when pool state is {state.value}" ) pool_name = self._config.pool_name - sandbox_id = self._state_store.try_take_idle(pool_name) + take_result = _try_take_idle_with_min_ttl( + self._state_store, + pool_name, + self._config.acquire_min_remaining_ttl, + ) + sandbox_id = take_result.sandbox_id + # Defer cleanup of below-threshold-but-still-alive sandboxes until after the chosen + # candidate is connected and renewed. Doing it inline before connect would let slow + # kill RPCs eat the candidate's remaining TTL — exactly the race this PR is fixing. + pending_kill = take_result.discarded_alive_sandbox_ids no_idle_reason: str | None = None idle_connect_failure: Exception | None = None if sandbox_id is not None: @@ -190,6 +204,12 @@ def acquire( ) if sandbox_timeout is not None: sandbox.renew(sandbox_timeout) + # Candidate is connected and (optionally) renewed. Now safe to clean up the + # discarded-alive sandboxes; offload to the warmup executor so the caller + # does not wait for N kill RPCs. + self._schedule_kill_discarded_alive( + pool_name, pending_kill, source="acquire" + ) return sandbox except Exception as exc: idle_connect_failure = exc @@ -206,6 +226,11 @@ def acquire( else: no_idle_reason = "idle buffer empty" + # Reaching here means we did not return a sandbox from idle. Still kick off the + # deferred cleanup so the discarded-alive sandboxes do not linger. + self._schedule_kill_discarded_alive( + pool_name, pending_kill, source="acquire" + ) reason = no_idle_reason or "idle buffer empty" if policy == AcquirePolicy.FAIL_FAST: if sandbox_id is not None: @@ -332,7 +357,7 @@ def _run_reconcile_tick_locked(self, executor: ThreadPoolExecutor) -> None: config=self._config.with_max_idle(self._resolve_max_idle()), state_store=self._state_store, create_one=self._create_one_sandbox, - on_discard_sandbox=self._kill_sandbox_best_effort, + on_discard_sandbox=self._discard_sandbox_callback, reconcile_state=self._reconcile_state, warmup_executor=executor, ) @@ -358,6 +383,12 @@ def _create_one_sandbox(self) -> str | None: except Exception: pass return None + # The server-side TTL has been ticking since `_build_sandbox_from_spec()`; + # readiness wait and `warmup_sandbox_preparer` can both consume meaningful time. + # Renew right before handing the id back to the reconciler so the store's + # stamped expiry actually matches what the server will honor — otherwise + # `acquire_min_remaining_ttl` overestimates remaining TTL by the warmup duration. + sandbox.renew(self._config.idle_timeout) return sandbox.id except Exception: try: @@ -436,10 +467,26 @@ def _connection_for_pool_resource(self) -> ConnectionConfigSync: config._owns_transport = True return config - def _kill_sandbox_best_effort(self, sandbox_id: str) -> None: + def _discard_sandbox_callback(self, sandbox_id: str) -> None: + """``Callable[[str], None]`` adapter for the reconciler's ``on_discard_sandbox`` + hook. The reconciler does not care whether the kill succeeded — it only needs the + sandbox to be removed from the pool's bookkeeping — so we drop the bool return + value here. + """ + self._kill_sandbox_best_effort(sandbox_id) + + def _kill_sandbox_best_effort(self, sandbox_id: str) -> bool: + """Best-effort kill a sandbox via the pool's manager. + + Returns ``True`` on a confirmed kill, ``False`` if no manager is available or the + kill raised. Failures are logged at WARNING and swallowed so the caller's primary + outcome is unaffected. + """ + if self._sandbox_manager is None: + return False try: - if self._sandbox_manager is not None: - self._sandbox_manager.kill_sandbox(sandbox_id) + self._sandbox_manager.kill_sandbox(sandbox_id) + return True except Exception as exc: logger.warning( "Pool sandbox cleanup failed: pool_name=%s sandbox_id=%s error=%s", @@ -447,6 +494,57 @@ def _kill_sandbox_best_effort(self, sandbox_id: str) -> None: sandbox_id, exc, ) + return False + + def _schedule_kill_discarded_alive( + self, + pool_name: str, + sandbox_ids: tuple[str, ...], + source: str, + ) -> None: + """Offload :meth:`_kill_discarded_alive` to the warmup executor so the caller does not + block on the kill RPCs. Falls back to inline execution when no executor is available + (e.g. mid-shutdown) — better to slow the caller than to drop the cleanup entirely. + """ + if not sandbox_ids: + return + executor = self._warmup_executor + if executor is None: + self._kill_discarded_alive(pool_name, sandbox_ids, source) + return + try: + executor.submit( + self._kill_discarded_alive, pool_name, sandbox_ids, source + ) + except Exception as exc: + logger.debug( + "Discarded-alive kill submit rejected, running inline: pool_name=%s count=%d error=%s", + pool_name, + len(sandbox_ids), + exc, + ) + self._kill_discarded_alive(pool_name, sandbox_ids, source) + + def _kill_discarded_alive( + self, + pool_name: str, + sandbox_ids: tuple[str, ...], + source: str, + ) -> None: + """Best-effort terminate sandboxes the store dropped because their remaining TTL + fell below ``acquire_min_remaining_ttl``. Without this, alive-but-near-expiry + sandboxes would linger past their pool membership until server-side TTL elapses. + """ + if not sandbox_ids: + return + for sandbox_id in sandbox_ids: + if self._kill_sandbox_best_effort(sandbox_id): + logger.debug( + "Killed near-expiry idle sandbox: pool_name=%s sandbox_id=%s source=%s", + pool_name, + sandbox_id, + source, + ) def _begin_operation(self) -> None: with self._in_flight_condition: diff --git a/sdks/sandbox/python/tests/test_async_redis_pool_store.py b/sdks/sandbox/python/tests/test_async_redis_pool_store.py index 115480ad5..a83d1cf8d 100644 --- a/sdks/sandbox/python/tests/test_async_redis_pool_store.py +++ b/sdks/sandbox/python/tests/test_async_redis_pool_store.py @@ -68,6 +68,51 @@ async def test_async_redis_store_reap_expired_idle( assert await store.try_take_idle("pool") is None +@pytest.mark.asyncio +async def test_async_redis_store_try_take_idle_min_ttl_surfaces_alive_below_threshold( + async_redis_store: tuple[AsyncRedisPoolStateStore, Any, str], +) -> None: + store, _, _ = async_redis_store + await store.set_idle_entry_ttl("pool", timedelta(seconds=5)) + await store.put_idle("pool", "id-1") + await store.put_idle("pool", "id-2") + + result = await store.try_take_idle_min_ttl("pool", timedelta(seconds=60)) + assert result.sandbox_id is None + assert set(result.discarded_alive_sandbox_ids) == {"id-1", "id-2"} + assert (await store.snapshot_counters("pool")).idle_count == 0 + + +@pytest.mark.asyncio +async def test_async_redis_store_try_take_idle_min_ttl_returns_above_threshold( + async_redis_store: tuple[AsyncRedisPoolStateStore, Any, str], +) -> None: + store, _, _ = async_redis_store + await store.set_idle_entry_ttl("pool", timedelta(minutes=10)) + await store.put_idle("pool", "id-1") + + result = await store.try_take_idle_min_ttl("pool", timedelta(seconds=60)) + assert result.sandbox_id == "id-1" + assert result.discarded_alive_sandbox_ids == () + + +@pytest.mark.asyncio +async def test_async_redis_store_reap_expired_idle_min_ttl_returns_alive_evicted( + async_redis_store: tuple[AsyncRedisPoolStateStore, Any, str], +) -> None: + store, _, _ = async_redis_store + await store.set_idle_entry_ttl("pool", timedelta(seconds=5)) + await store.put_idle("pool", "id-1") + await store.put_idle("pool", "id-2") + + discarded_alive = await store.reap_expired_idle_min_ttl( + "pool", datetime.now(timezone.utc), timedelta(seconds=60) + ) + + assert set(discarded_alive) == {"id-1", "id-2"} + assert (await store.snapshot_counters("pool")).idle_count == 0 + + @pytest.mark.asyncio async def test_async_redis_store_primary_lock_owner_semantics( async_redis_store: tuple[AsyncRedisPoolStateStore, Any, str], diff --git a/sdks/sandbox/python/tests/test_pool_config.py b/sdks/sandbox/python/tests/test_pool_config.py new file mode 100644 index 000000000..0d2ca2f09 --- /dev/null +++ b/sdks/sandbox/python/tests/test_pool_config.py @@ -0,0 +1,154 @@ +# +# Copyright 2025 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests for ``PoolConfig`` and ``AsyncPoolConfig`` validation.""" + +from __future__ import annotations + +from datetime import timedelta + +import pytest + +from opensandbox.config.connection import ConnectionConfig +from opensandbox.config.connection_sync import ConnectionConfigSync +from opensandbox.pool import ( + AsyncPoolConfig, + InMemoryAsyncPoolStateStore, + InMemoryPoolStateStore, + PoolConfig, + PoolCreationSpec, +) + + +def _sync_kwargs() -> dict[str, object]: + return { + "pool_name": "test", + "max_idle": 1, + "state_store": InMemoryPoolStateStore(), + "connection_config": ConnectionConfigSync(), + "creation_spec": PoolCreationSpec(image="ubuntu:22.04"), + } + + +def _async_kwargs() -> dict[str, object]: + return { + "pool_name": "test", + "max_idle": 1, + "state_store": InMemoryAsyncPoolStateStore(), + "connection_config": ConnectionConfig(), + "creation_spec": PoolCreationSpec(image="ubuntu:22.04"), + } + + +def test_default_acquire_min_remaining_ttl_is_60s_for_long_idle_timeout() -> None: + # 24h idle_timeout caps at 60s. + config = PoolConfig(**_sync_kwargs()) # type: ignore[arg-type] + assert config.acquire_min_remaining_ttl == timedelta(seconds=60) + + +def test_async_default_acquire_min_remaining_ttl_is_60s_for_long_idle_timeout() -> None: + config = AsyncPoolConfig(**_async_kwargs()) # type: ignore[arg-type] + assert config.acquire_min_remaining_ttl == timedelta(seconds=60) + + +def test_default_acquire_min_remaining_ttl_scales_for_short_idle_timeout() -> None: + # idle_timeout=30s ⇒ default = min(60s, 30s/2) = 15s. Existing users with short + # idle timeouts must not get a config-time error from a hidden 60s default. + config = PoolConfig(**_sync_kwargs(), idle_timeout=timedelta(seconds=30)) # type: ignore[arg-type] + assert config.acquire_min_remaining_ttl == timedelta(seconds=15) + + +def test_negative_acquire_min_remaining_ttl_rejected() -> None: + with pytest.raises(ValueError, match="acquire_min_remaining_ttl must be non-negative"): + PoolConfig(**_sync_kwargs(), acquire_min_remaining_ttl=timedelta(seconds=-1)) # type: ignore[arg-type] + + +def test_explicit_acquire_min_remaining_ttl_at_or_above_idle_timeout_rejected() -> None: + # The auto-default protects against this, but an explicit value above idle_timeout + # still fails validation. + with pytest.raises(ValueError, match="strictly less than"): + PoolConfig( # type: ignore[arg-type] + **_sync_kwargs(), + idle_timeout=timedelta(seconds=30), + acquire_min_remaining_ttl=timedelta(seconds=30), + ) + + +def test_async_explicit_acquire_min_remaining_ttl_at_or_above_idle_timeout_rejected() -> None: + with pytest.raises(ValueError, match="strictly less than"): + AsyncPoolConfig( # type: ignore[arg-type] + **_async_kwargs(), + idle_timeout=timedelta(seconds=30), + acquire_min_remaining_ttl=timedelta(seconds=30), + ) + + +def test_acquire_min_remaining_ttl_just_below_idle_timeout_accepted() -> None: + config = PoolConfig( + **_sync_kwargs(), # type: ignore[arg-type] + idle_timeout=timedelta(seconds=10), + acquire_min_remaining_ttl=timedelta(seconds=9), + ) + assert config.acquire_min_remaining_ttl == timedelta(seconds=9) + assert config.idle_timeout == timedelta(seconds=10) + + +def test_zero_acquire_min_remaining_ttl_opts_out() -> None: + # Explicit Duration.ZERO ↔ legacy binary-expiry behavior; valid configuration. + config = PoolConfig( + **_sync_kwargs(), # type: ignore[arg-type] + acquire_min_remaining_ttl=timedelta(0), + ) + assert config.acquire_min_remaining_ttl == timedelta(0) + + +def test_sync_pool_facade_forwards_acquire_min_remaining_ttl() -> None: + """``SandboxPoolSync.__init__`` exposes the new threshold and forwards it to the config. + + Without the constructor kwarg, users with ``idle_timeout <= 60s`` hit the hidden + default and get a hard validation error with no way to override. + """ + from opensandbox.sync.pool import SandboxPoolSync + + pool = SandboxPoolSync( + pool_name="test", + max_idle=1, + state_store=InMemoryPoolStateStore(), + connection_config=ConnectionConfigSync(), + creation_spec=PoolCreationSpec(image="ubuntu:22.04"), + idle_timeout=timedelta(seconds=30), + acquire_min_remaining_ttl=timedelta(seconds=10), + ) + + assert pool._config.acquire_min_remaining_ttl == timedelta(seconds=10) + assert pool._config.idle_timeout == timedelta(seconds=30) + + +def test_async_pool_facade_forwards_acquire_min_remaining_ttl() -> None: + """``SandboxPoolAsync.__init__`` exposes the new threshold and forwards it to the config.""" + from opensandbox.pool_async import SandboxPoolAsync + + pool = SandboxPoolAsync( + pool_name="test", + max_idle=1, + state_store=InMemoryAsyncPoolStateStore(), + connection_config=ConnectionConfig(), + creation_spec=PoolCreationSpec(image="ubuntu:22.04"), + idle_timeout=timedelta(seconds=30), + acquire_min_remaining_ttl=timedelta(seconds=10), + ) + + assert pool._config.acquire_min_remaining_ttl == timedelta(seconds=10) + assert pool._config.idle_timeout == timedelta(seconds=30) diff --git a/sdks/sandbox/python/tests/test_pool_kill_discarded.py b/sdks/sandbox/python/tests/test_pool_kill_discarded.py new file mode 100644 index 000000000..23576b6f5 --- /dev/null +++ b/sdks/sandbox/python/tests/test_pool_kill_discarded.py @@ -0,0 +1,417 @@ +# +# Copyright 2025 Alibaba Group Holding Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests for the kill-discarded-alive path on the pool facades. + +Verifies the two contracts surfaced by review on PR #986: +- ``_kill_sandbox_best_effort`` returns ``True`` only on a confirmed kill, so the + ``logger.debug("Killed ...")`` line in ``_kill_discarded_alive`` cannot fire on failure. +- ``SandboxPoolAsync._kill_discarded_alive`` runs its per-ID kills concurrently via + ``asyncio.gather`` instead of serially blocking ``acquire()``. +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import timedelta +from typing import Any + +import pytest + +from opensandbox.config.connection import ConnectionConfig +from opensandbox.config.connection_sync import ConnectionConfigSync +from opensandbox.pool import ( + InMemoryAsyncPoolStateStore, + InMemoryPoolStateStore, + PoolCreationSpec, + PoolLifecycleState, +) +from opensandbox.pool_async import SandboxPoolAsync +from opensandbox.sync.pool import SandboxPoolSync + + +class _RecordingSyncManager: + """Sync manager fake that tracks kills and can simulate failures.""" + + def __init__(self, fail_for: set[str] | None = None) -> None: + self.killed: list[str] = [] + self._fail_for = fail_for or set() + + def kill_sandbox(self, sandbox_id: str) -> None: + self.killed.append(sandbox_id) + if sandbox_id in self._fail_for: + raise RuntimeError(f"simulated kill failure for {sandbox_id}") + + def close(self) -> None: # pragma: no cover - SandboxPoolSync may call on shutdown + return None + + +class _RecordingAsyncManager: + """Async manager fake recording call timing so we can detect serial vs parallel kills.""" + + def __init__(self, per_call_delay: float = 0.05) -> None: + self.killed: list[str] = [] + self._delay = per_call_delay + self._in_flight = 0 + self.max_in_flight = 0 + + async def kill_sandbox(self, sandbox_id: str) -> None: + self._in_flight += 1 + self.max_in_flight = max(self.max_in_flight, self._in_flight) + try: + await asyncio.sleep(self._delay) + self.killed.append(sandbox_id) + finally: + self._in_flight -= 1 + + async def close(self) -> None: # pragma: no cover + return None + + +def _build_sync_pool(manager: _RecordingSyncManager) -> SandboxPoolSync: + return SandboxPoolSync( + pool_name="kill-pool", + max_idle=0, + state_store=InMemoryPoolStateStore(), + connection_config=ConnectionConfigSync(), + creation_spec=PoolCreationSpec(image="ubuntu:22.04"), + sandbox_manager_factory=lambda config: manager, # type: ignore[arg-type,return-value] + ) + + +def _build_async_pool(manager: _RecordingAsyncManager) -> SandboxPoolAsync: + async def _factory(_: ConnectionConfig) -> _RecordingAsyncManager: + return manager + + return SandboxPoolAsync( + pool_name="kill-pool", + max_idle=0, + state_store=InMemoryAsyncPoolStateStore(), + connection_config=ConnectionConfig(), + creation_spec=PoolCreationSpec(image="ubuntu:22.04"), + sandbox_manager_factory=_factory, # type: ignore[arg-type] + ) + + +def test_sync_kill_best_effort_returns_true_on_success() -> None: + manager = _RecordingSyncManager() + pool = _build_sync_pool(manager) + pool._sandbox_manager = manager # type: ignore[assignment] + + assert pool._kill_sandbox_best_effort("sandbox-1") is True + assert manager.killed == ["sandbox-1"] + + +def test_sync_kill_best_effort_returns_false_on_failure() -> None: + manager = _RecordingSyncManager(fail_for={"sandbox-1"}) + pool = _build_sync_pool(manager) + pool._sandbox_manager = manager # type: ignore[assignment] + + assert pool._kill_sandbox_best_effort("sandbox-1") is False + # Manager was called but the failure was swallowed. + assert manager.killed == ["sandbox-1"] + + +def test_sync_kill_best_effort_returns_false_when_manager_missing() -> None: + manager = _RecordingSyncManager() + pool = _build_sync_pool(manager) + # Pool never started → _sandbox_manager is None. + assert pool._sandbox_manager is None + assert pool._kill_sandbox_best_effort("sandbox-1") is False + + +def test_sync_kill_discarded_alive_logs_only_on_success( + caplog: pytest.LogCaptureFixture, +) -> None: + manager = _RecordingSyncManager(fail_for={"sandbox-fail"}) + pool = _build_sync_pool(manager) + pool._sandbox_manager = manager # type: ignore[assignment] + + with caplog.at_level(logging.DEBUG, logger="opensandbox.sync.pool"): + pool._kill_discarded_alive( + pool_name="kill-pool", + sandbox_ids=("sandbox-ok", "sandbox-fail"), + source="acquire", + ) + + debug_messages = [ + record.message for record in caplog.records if record.levelno == logging.DEBUG + ] + assert any("sandbox-ok" in msg for msg in debug_messages), ( + f"expected debug log for successful kill, got {debug_messages}" + ) + assert not any("Killed near-expiry idle sandbox" in msg and "sandbox-fail" in msg for msg in debug_messages), ( + f"failed kill should not produce 'Killed' debug log, got {debug_messages}" + ) + + +@pytest.mark.asyncio +async def test_async_kill_discarded_alive_runs_kills_concurrently() -> None: + manager = _RecordingAsyncManager(per_call_delay=0.05) + pool = _build_async_pool(manager) + pool._sandbox_manager = manager # type: ignore[assignment] + + ids = ("a", "b", "c", "d", "e") + start = asyncio.get_event_loop().time() + await pool._kill_discarded_alive( + pool_name="kill-pool", sandbox_ids=ids, source="acquire" + ) + elapsed = asyncio.get_event_loop().time() - start + + # Serial would take ~5 * 0.05 = 0.25s; parallel should finish well below that. + assert elapsed < 0.15, ( + f"kills appear to run serially (took {elapsed:.3f}s for {len(ids)} ids); " + "expected asyncio.gather to overlap them" + ) + assert manager.max_in_flight >= 2, ( + f"expected concurrent kills, max_in_flight={manager.max_in_flight}" + ) + assert set(manager.killed) == set(ids) + + +@pytest.mark.asyncio +async def test_async_kill_best_effort_returns_false_on_failure( + caplog: pytest.LogCaptureFixture, +) -> None: + class FailingAsyncManager: + def __init__(self) -> None: + self.attempted: list[str] = [] + + async def kill_sandbox(self, sandbox_id: str) -> None: + self.attempted.append(sandbox_id) + raise RuntimeError("simulated async failure") + + async def close(self) -> None: # pragma: no cover + return None + + manager: Any = FailingAsyncManager() + pool = SandboxPoolAsync( + pool_name="kill-pool", + max_idle=0, + state_store=InMemoryAsyncPoolStateStore(), + connection_config=ConnectionConfig(), + creation_spec=PoolCreationSpec(image="ubuntu:22.04"), + sandbox_manager_factory=lambda _: _async_returns(manager), # type: ignore[arg-type] + ) + pool._sandbox_manager = manager + + with caplog.at_level(logging.DEBUG, logger="opensandbox.pool_async"): + await pool._kill_discarded_alive( + pool_name="kill-pool", sandbox_ids=("a",), source="acquire" + ) + + assert manager.attempted == ["a"] + debug_messages = [ + record.message for record in caplog.records if record.levelno == logging.DEBUG + ] + assert not any("Killed near-expiry idle sandbox" in msg for msg in debug_messages), ( + f"failed async kill should not produce 'Killed' debug log, got {debug_messages}" + ) + + +async def _async_returns(value: Any) -> Any: + return value + + +def test_sync_schedule_kill_discarded_alive_does_not_block_caller() -> None: + """``acquire`` must not pay for kill RPC time. ``_schedule_kill_discarded_alive`` + submits the cleanup to the warmup executor (or runs inline only when no executor + is available). When an executor is present, the call returns immediately. + """ + import time + from concurrent.futures import ThreadPoolExecutor + + class SlowSyncManager: + def __init__(self, delay: float) -> None: + self.killed: list[str] = [] + self._delay = delay + + def kill_sandbox(self, sandbox_id: str) -> None: + time.sleep(self._delay) + self.killed.append(sandbox_id) + + def close(self) -> None: # pragma: no cover + return None + + manager = SlowSyncManager(delay=0.1) + pool = _build_sync_pool(manager) # type: ignore[arg-type] + pool._sandbox_manager = manager # type: ignore[assignment] + pool._warmup_executor = ThreadPoolExecutor(max_workers=2) + try: + ids = ("a", "b", "c") + start = time.monotonic() + pool._schedule_kill_discarded_alive("kill-pool", ids, source="acquire") + elapsed = time.monotonic() - start + + # Should return immediately — well below the slowest single kill (0.1s). + assert elapsed < 0.05, f"_schedule_kill_discarded_alive blocked for {elapsed:.3f}s" + + # And the kills do happen, just on the executor. + deadline = time.monotonic() + 1.0 + while sorted(manager.killed) != sorted(ids) and time.monotonic() < deadline: + time.sleep(0.02) + assert sorted(manager.killed) == sorted(ids) + finally: + pool._warmup_executor.shutdown(wait=True) + + +class _RenewTrackingSyncSandbox: + """Minimal SandboxSync stand-in that records every ``renew`` call.""" + + last_instance: _RenewTrackingSyncSandbox | None = None + + def __init__(self, sandbox_id: str = "warm-1") -> None: + self.id = sandbox_id + self.renewed: list[timedelta] = [] + self.killed = False + self.closed = False + type(self).last_instance = self + + @classmethod + def create(cls, *args: Any, **kwargs: Any) -> _RenewTrackingSyncSandbox: + del args, kwargs + return cls("warm-1") + + def renew(self, timeout: timedelta) -> None: + self.renewed.append(timeout) + + def kill(self) -> None: + self.killed = True + + def close(self) -> None: + self.closed = True + + +def test_sync_create_one_sandbox_renews_before_returning_id() -> None: + """Warmup pipeline must renew the sandbox to ``idle_timeout`` after preparer runs and + before returning the id to the reconciler. Otherwise the store stamps an expiry the + server-side TTL has already partially elapsed against, and ``acquire_min_remaining_ttl`` + overestimates remaining TTL by the warmup duration. + """ + _RenewTrackingSyncSandbox.last_instance = None + + class _Manager: + def kill_sandbox(self, sandbox_id: str) -> None: # pragma: no cover + return None + + def close(self) -> None: # pragma: no cover + return None + + pool = SandboxPoolSync( + pool_name="warm-renew", + max_idle=1, + state_store=InMemoryPoolStateStore(), + connection_config=ConnectionConfigSync(), + creation_spec=PoolCreationSpec(image="ubuntu:22.04"), + idle_timeout=timedelta(minutes=5), + sandbox_manager_factory=lambda _: _Manager(), # type: ignore[arg-type,return-value] + sandbox_factory=_RenewTrackingSyncSandbox, # type: ignore[arg-type] + ) + + # _create_one_sandbox short-circuits unless the pool is running. + pool._lifecycle_state = PoolLifecycleState.RUNNING + sandbox_id = pool._create_one_sandbox() + assert sandbox_id == "warm-1" + assert _RenewTrackingSyncSandbox.last_instance is not None + assert _RenewTrackingSyncSandbox.last_instance.renewed == [timedelta(minutes=5)], ( + "expected renew([5min]) before putIdle, got " + f"{_RenewTrackingSyncSandbox.last_instance.renewed}" + ) + + +class _RenewTrackingAsyncSandbox: + last_instance: _RenewTrackingAsyncSandbox | None = None + + def __init__(self, sandbox_id: str = "warm-1") -> None: + self.id = sandbox_id + self.renewed: list[timedelta] = [] + self.killed = False + self.closed = False + type(self).last_instance = self + + @classmethod + async def create(cls, *args: Any, **kwargs: Any) -> _RenewTrackingAsyncSandbox: + del args, kwargs + return cls("warm-1") + + async def renew(self, timeout: timedelta) -> None: + self.renewed.append(timeout) + + async def kill(self) -> None: + self.killed = True + + async def close(self) -> None: + self.closed = True + + +@pytest.mark.asyncio +async def test_async_create_one_sandbox_renews_before_returning_id() -> None: + _RenewTrackingAsyncSandbox.last_instance = None + + class _AsyncManager: + async def kill_sandbox(self, sandbox_id: str) -> None: # pragma: no cover + return None + + async def close(self) -> None: # pragma: no cover + return None + + async def _factory(_: Any) -> _AsyncManager: + return _AsyncManager() + + pool = SandboxPoolAsync( + pool_name="warm-renew", + max_idle=1, + state_store=InMemoryAsyncPoolStateStore(), + connection_config=ConnectionConfig(), + creation_spec=PoolCreationSpec(image="ubuntu:22.04"), + idle_timeout=timedelta(minutes=5), + sandbox_manager_factory=_factory, # type: ignore[arg-type] + sandbox_factory=_RenewTrackingAsyncSandbox, # type: ignore[arg-type] + ) + + pool._lifecycle_state = PoolLifecycleState.RUNNING + sandbox_id = await pool._create_one_sandbox() + assert sandbox_id == "warm-1" + assert _RenewTrackingAsyncSandbox.last_instance is not None + assert _RenewTrackingAsyncSandbox.last_instance.renewed == [timedelta(minutes=5)], ( + "expected renew([5min]) before putIdle, got " + f"{_RenewTrackingAsyncSandbox.last_instance.renewed}" + ) + + +@pytest.mark.asyncio +async def test_async_schedule_kill_discarded_alive_does_not_block_caller() -> None: + """Async counterpart: scheduling must return synchronously without awaiting kills.""" + manager = _RecordingAsyncManager(per_call_delay=0.1) + pool = _build_async_pool(manager) + pool._sandbox_manager = manager # type: ignore[assignment] + + ids = ("a", "b", "c") + start = asyncio.get_event_loop().time() + pool._schedule_kill_discarded_alive("kill-pool", ids, source="acquire") + elapsed = asyncio.get_event_loop().time() - start + + # _schedule is sync (asyncio.create_task), so it should return in microseconds. + assert elapsed < 0.01, ( + f"_schedule_kill_discarded_alive blocked for {elapsed:.3f}s; " + "expected immediate return via create_task" + ) + + # Wait for the background tasks to complete. + deadline = asyncio.get_event_loop().time() + 1.0 + while sorted(manager.killed) != sorted(ids) and asyncio.get_event_loop().time() < deadline: + await asyncio.sleep(0.02) + assert sorted(manager.killed) == sorted(ids) diff --git a/sdks/sandbox/python/tests/test_pool_store.py b/sdks/sandbox/python/tests/test_pool_store.py index 810cc9f67..6ab020442 100644 --- a/sdks/sandbox/python/tests/test_pool_store.py +++ b/sdks/sandbox/python/tests/test_pool_store.py @@ -36,6 +36,105 @@ def test_in_memory_store_reaps_expired_idle() -> None: assert store.snapshot_counters("pool").idle_count == 0 +def test_in_memory_store_try_take_idle_min_ttl_surfaces_alive_below_threshold() -> None: + store = InMemoryPoolStateStore() + # Entries get a 5s TTL — still alive (server-side TTL has not elapsed) but below 60s. + store.set_idle_entry_ttl("pool", timedelta(seconds=5)) + store.put_idle("pool", "sandbox-1") + store.put_idle("pool", "sandbox-2") + + result = store.try_take_idle_min_ttl("pool", timedelta(seconds=60)) + assert result.sandbox_id is None + assert set(result.discarded_alive_sandbox_ids) == {"sandbox-1", "sandbox-2"} + assert store.snapshot_counters("pool").idle_count == 0 + + +def test_in_memory_store_try_take_idle_min_ttl_silently_drops_expired() -> None: + """Already-expired entries are dropped without surfacing — the server has reaped them.""" + import time + + store = InMemoryPoolStateStore() + store.set_idle_entry_ttl("pool", timedelta(milliseconds=1)) + store.put_idle("pool", "expired") + time.sleep(0.02) + store.set_idle_entry_ttl("pool", timedelta(minutes=10)) + store.put_idle("pool", "alive") + + result = store.try_take_idle_min_ttl("pool", timedelta(seconds=60)) + assert result.sandbox_id == "alive" + assert result.discarded_alive_sandbox_ids == () + + +def test_in_memory_store_try_take_idle_min_ttl_returns_entries_above_threshold() -> None: + store = InMemoryPoolStateStore() + store.set_idle_entry_ttl("pool", timedelta(minutes=10)) + store.put_idle("pool", "sandbox-1") + + result = store.try_take_idle_min_ttl("pool", timedelta(seconds=60)) + assert result.sandbox_id == "sandbox-1" + assert result.discarded_alive_sandbox_ids == () + + +def test_in_memory_store_try_take_idle_min_ttl_zero_falls_back_to_base() -> None: + store = InMemoryPoolStateStore() + store.put_idle("pool", "sandbox-1") + + first = store.try_take_idle_min_ttl("pool", timedelta(0)) + assert first.sandbox_id == "sandbox-1" + assert first.discarded_alive_sandbox_ids == () + + store.put_idle("pool", "sandbox-2") + second = store.try_take_idle_min_ttl("pool", timedelta(seconds=-1)) + assert second.sandbox_id == "sandbox-2" + assert second.discarded_alive_sandbox_ids == () + + +def test_in_memory_store_reap_expired_idle_min_ttl_returns_alive_evicted() -> None: + store = InMemoryPoolStateStore() + store.set_idle_entry_ttl("pool", timedelta(seconds=5)) + store.put_idle("pool", "sandbox-1") + store.put_idle("pool", "sandbox-2") + + discarded_alive = store.reap_expired_idle_min_ttl( + "pool", datetime.now(timezone.utc), timedelta(seconds=60) + ) + + assert set(discarded_alive) == {"sandbox-1", "sandbox-2"} + assert store.snapshot_counters("pool").idle_count == 0 + + +def test_in_memory_store_reap_expired_idle_min_ttl_keeps_above_threshold() -> None: + store = InMemoryPoolStateStore() + store.set_idle_entry_ttl("pool", timedelta(minutes=10)) + store.put_idle("pool", "sandbox-1") + + discarded_alive = store.reap_expired_idle_min_ttl( + "pool", datetime.now(timezone.utc), timedelta(seconds=60) + ) + + assert discarded_alive == () + assert store.snapshot_counters("pool").idle_count == 1 + + +def test_in_memory_store_reap_expired_idle_min_ttl_excludes_already_expired() -> None: + """Already-expired entries are evicted but not returned — server has reaped them.""" + import time + + store = InMemoryPoolStateStore() + store.set_idle_entry_ttl("pool", timedelta(milliseconds=1)) + store.put_idle("pool", "expired") + time.sleep(0.02) + store.set_idle_entry_ttl("pool", timedelta(seconds=5)) + store.put_idle("pool", "alive") + + discarded_alive = store.reap_expired_idle_min_ttl( + "pool", datetime.now(timezone.utc), timedelta(seconds=60) + ) + + assert discarded_alive == ("alive",) + assert store.snapshot_counters("pool").idle_count == 0 + + def test_in_memory_store_concurrent_take_is_unique() -> None: store = InMemoryPoolStateStore() for i in range(100): diff --git a/sdks/sandbox/python/tests/test_redis_pool_store.py b/sdks/sandbox/python/tests/test_redis_pool_store.py index 4e14fd8bc..53f71a9b5 100644 --- a/sdks/sandbox/python/tests/test_redis_pool_store.py +++ b/sdks/sandbox/python/tests/test_redis_pool_store.py @@ -64,6 +64,63 @@ def test_redis_store_reap_expired_idle( assert store.try_take_idle("pool") is None +def test_redis_store_try_take_idle_min_ttl_surfaces_alive_below_threshold( + redis_store: tuple[RedisPoolStateStore, Any, str], +) -> None: + store, _, _ = redis_store + store.set_idle_entry_ttl("pool", timedelta(seconds=5)) + store.put_idle("pool", "id-1") + store.put_idle("pool", "id-2") + + result = store.try_take_idle_min_ttl("pool", timedelta(seconds=60)) + assert result.sandbox_id is None + assert set(result.discarded_alive_sandbox_ids) == {"id-1", "id-2"} + assert store.snapshot_counters("pool").idle_count == 0 + + +def test_redis_store_try_take_idle_min_ttl_returns_entries_above_threshold( + redis_store: tuple[RedisPoolStateStore, Any, str], +) -> None: + store, _, _ = redis_store + store.set_idle_entry_ttl("pool", timedelta(minutes=10)) + store.put_idle("pool", "id-1") + + result = store.try_take_idle_min_ttl("pool", timedelta(seconds=60)) + assert result.sandbox_id == "id-1" + assert result.discarded_alive_sandbox_ids == () + + +def test_redis_store_try_take_idle_min_ttl_zero_falls_back_to_base( + redis_store: tuple[RedisPoolStateStore, Any, str], +) -> None: + store, _, _ = redis_store + store.put_idle("pool", "id-1") + + taken = store.try_take_idle_min_ttl("pool", timedelta(0)) + assert taken.sandbox_id == "id-1" + assert taken.discarded_alive_sandbox_ids == () + + empty = store.try_take_idle_min_ttl("pool", timedelta(0)) + assert empty.sandbox_id is None + assert empty.discarded_alive_sandbox_ids == () + + +def test_redis_store_reap_expired_idle_min_ttl_returns_alive_evicted( + redis_store: tuple[RedisPoolStateStore, Any, str], +) -> None: + store, _, _ = redis_store + store.set_idle_entry_ttl("pool", timedelta(seconds=5)) + store.put_idle("pool", "id-1") + store.put_idle("pool", "id-2") + + discarded_alive = store.reap_expired_idle_min_ttl( + "pool", datetime.now(timezone.utc), timedelta(seconds=60) + ) + + assert set(discarded_alive) == {"id-1", "id-2"} + assert store.snapshot_counters("pool").idle_count == 0 + + def test_redis_store_primary_lock_owner_semantics( redis_store: tuple[RedisPoolStateStore, Any, str], ) -> None: @@ -186,17 +243,18 @@ def __init__(self) -> None: self._lists: dict[str, list[str]] = {} self._hashes: dict[str, dict[str, str]] = {} - def eval(self, script: str, numkeys: int, *args: str) -> str | int | None: + def eval(self, script: str, numkeys: int, *args: Any) -> Any: del numkeys if "LPOP" in script: - return self._take_idle(args[0], args[1]) + min_remaining_ttl_ms = int(args[2]) if len(args) >= 3 else 0 + return self._take_idle(args[0], args[1], min_remaining_ttl_ms) if "RPUSH" in script: return self._put_idle(args[0], args[1], args[2], int(args[3])) if "PEXPIRE" in script: return int(self._renew_lock(args[0], args[1], int(args[2]))) if "HGETALL" in script: - self._reap_expired(args[0], args[1]) - return 1 + min_remaining_ttl_ms = int(args[2]) if len(args) >= 3 else 0 + return self._reap_expired(args[0], args[1], min_remaining_ttl_ms) if "DEL" in script and "GET" in script: return self._release_lock(args[0], args[1]) raise NotImplementedError("unsupported Redis script") @@ -245,16 +303,27 @@ def lrange(self, key: str, start: int, stop: int) -> list[str]: def hgetall(self, key: str) -> dict[str, str]: return dict(self._hashes.get(key, {})) - def _take_idle(self, list_key: str, expires_key: str) -> str | None: + def _take_idle( + self, list_key: str, expires_key: str, min_remaining_ttl_ms: int = 0 + ) -> Any: queue = self._lists.setdefault(list_key, []) expires = self._hashes.setdefault(expires_key, {}) - now = _now_ms() + now_ms = _now_ms() + cutoff = now_ms + max(0, min_remaining_ttl_ms) + discarded_alive: list[str] = [] while queue: sandbox_id = queue.pop(0) expires_at = expires.pop(sandbox_id, None) - if expires_at is not None and int(expires_at) > now: - return sandbox_id - return None + if expires_at is None: + continue + exp = int(expires_at) + if exp > cutoff: + return [sandbox_id, discarded_alive] + if exp > now_ms: + discarded_alive.append(sandbox_id) + if not discarded_alive: + return None + return ["", discarded_alive] def _put_idle( self, @@ -285,17 +354,26 @@ def _release_lock(self, key: str, owner_id: str) -> int: self._strings.pop(key, None) return 1 - def _reap_expired(self, list_key: str, expires_key: str) -> None: + def _reap_expired( + self, list_key: str, expires_key: str, min_remaining_ttl_ms: int = 0 + ) -> list[str]: expires = self._hashes.setdefault(expires_key, {}) - expired = {sandbox_id for sandbox_id, expiry in expires.items() if int(expiry) <= _now_ms()} - for sandbox_id in expired: - expires.pop(sandbox_id, None) - if expired: + now_ms = _now_ms() + cutoff = now_ms + max(0, min_remaining_ttl_ms) + evicted: list[tuple[str, int]] = [] + for sandbox_id, expiry in list(expires.items()): + exp = int(expiry) + if exp <= cutoff: + evicted.append((sandbox_id, exp)) + expires.pop(sandbox_id, None) + if evicted: + evicted_ids = {sandbox_id for sandbox_id, _ in evicted} self._lists[list_key] = [ sandbox_id for sandbox_id in self._lists.get(list_key, []) - if sandbox_id not in expired + if sandbox_id not in evicted_ids ] + return [sandbox_id for sandbox_id, exp in evicted if exp > now_ms] def _expire_string_if_needed(self, key: str) -> None: value = self._strings.get(key)