Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,35 @@ const STATUS_ZOMBIE_MS = 60 * 60_000
const STATUS_PATH = process.env.TAAS_AFFINITY_RUNS_STATUS_PATH
|| path.join(os.homedir(), ".openclaw", "alien-studio", "runs-status.json")

// ── Zombie auto-abort configuration ───────────────────────────────────────────
// ⚠️ PLUGIN BLOCKED: No plugin-side dispatch for chat.abort exists in the OpenClaw
// plugin SDK. The default abortRun function logs a warning and is a no-op.
// When the SDK adds api.runtime.chat.abort() or dispatchGatewayMethod(),
// replace the default abortRun in register() below.
// Set TAAS_AFFINITY_AUTO_ABORT_ZOMBIES=true to enable (currently opt-in).
const AUTO_ABORT_ENABLED = process.env.TAAS_AFFINITY_AUTO_ABORT_ZOMBIES === "true"
const AUTO_ABORT_DRY_RUN = process.env.TAAS_AFFINITY_AUTO_ABORT_DRY_RUN === "true"
const AUTO_ABORT_THRESHOLD_MS = Number(process.env.TAAS_AFFINITY_AUTO_ABORT_THRESHOLD_MS) || STATUS_ZOMBIE_MS
const AUTO_ABORT_CHECK_INTERVAL_MS = Number(process.env.TAAS_AFFINITY_AUTO_ABORT_CHECK_INTERVAL_MS) || 60_000
const AUTO_ABORT_ABORTED_SET_MAX = 1000

/** In-memory set of sessionKeys already aborted in this process (LRU-bounded). */
const abortedInThisProcess: string[] = []

/**
* Injected abort function. Default logs a warning because the plugin SDK does not
* expose a way to dispatch chat.abort from inside a plugin. Replace via
* setAbortRunFn() in tests or when the SDK adds the capability.
*/
let abortRun: (sessionKey: string) => Promise<void> = async (sessionKey: string) => {
console.warn("[taas-affinity] auto-abort: chat.abort not available via plugin SDK; sessionKey=" + sessionKey)
}

/** Replace the abort function (for testing or future SDK integration). */
function setAbortRunFn(fn: (sessionKey: string) => Promise<void>): void {
abortRun = fn
}

function stopRequesterBridgePoller(leaseId?: string): void {
if (!leaseId) return
const state = activePollers.get(leaseId)
Expand Down Expand Up @@ -894,6 +923,94 @@ function writeRunStatus(agentsDir?: string, statusPath?: string): void {
}
}

// ── Zombie auto-abort ──────────────────────────────────────────────────────────
let abortCheckInProgress = false

function runAbortCheck(agentsDir?: string): void {
if (abortCheckInProgress) return
abortCheckInProgress = true
try {
const base = agentsDir || path.join(os.homedir(), ".openclaw", "agents")
let agents: string[]
try { agents = fs.readdirSync(base) } catch { agents = [] }

const zombies: Array<{ sessionKey: string; idleMs: number }> = []

for (const agentId of agents) {
const sessionsDir = path.join(base, agentId, "sessions")
let entries: string[]
try { entries = fs.readdirSync(sessionsDir) } catch { continue }

let mainKey: string | null = null
try {
const idJson = fs.readFileSync(path.join(base, agentId, "agent", "identity.json"), "utf8")
const parsed = JSON.parse(idJson)
if (typeof parsed.mainKey === "string") mainKey = parsed.mainKey
} catch { /* no identity file */ }

let mainUuid: string | null = null
if (mainKey) {
const parts = mainKey.split(":")
const lastPart = parts[parts.length - 1]
if (lastPart !== "main" && lastPart.length >= 8) {
mainUuid = lastPart
}
}

for (const name of entries) {
if (!name.endsWith(".jsonl.lock")) continue
const sessionUuid = name.replace(/\.jsonl\.lock$/, "")
const fp = path.join(sessionsDir, name)
let st: fs.Stats
try { st = fs.statSync(fp) } catch { continue }

const idleMs = Date.now() - st.mtimeMs
if (idleMs < AUTO_ABORT_THRESHOLD_MS) continue

let sessionKey: string
if (mainKey && sessionUuid === mainUuid) {
sessionKey = "agent:" + agentId + ":main"
} else {
sessionKey = "agent:" + agentId + ":session:" + sessionUuid
}

zombies.push({ sessionKey, idleMs })
}
}

// When auto-abort is disabled (default), just log candidates
if (!AUTO_ABORT_ENABLED) {
if (zombies.length > 0) {
console.info("[taas-affinity] zombie candidates (auto-abort disabled): " + zombies.length + " runs: " + zombies.map(z => z.sessionKey).join(", "))
}
return
}

// Auto-abort is enabled — abort each zombie not already aborted in this process
for (const z of zombies) {
if (abortedInThisProcess.includes(z.sessionKey)) continue
// Cap the set at AUTO_ABORT_ABORTED_SET_MAX (FIFO eviction)
if (abortedInThisProcess.length >= AUTO_ABORT_ABORTED_SET_MAX) {
abortedInThisProcess.shift()
}
abortedInThisProcess.push(z.sessionKey)

if (AUTO_ABORT_DRY_RUN) {
console.info("[taas-affinity] auto-abort DRY RUN: would abort sessionKey=" + z.sessionKey + " idleMs=" + z.idleMs)
} else {
console.warn("[taas-affinity] auto-aborting zombie run sessionKey=" + z.sessionKey + " idleMs=" + z.idleMs)
abortRun(z.sessionKey).catch((e: unknown) => {
console.warn("[taas-affinity] auto-abort failed sessionKey=" + z.sessionKey, e)
})
}
}
} catch (e) {
console.warn("[taas-affinity] abort-check error", e)
} finally {
abortCheckInProgress = false
}
}

// ── Background task scheduler ────────────────────────────────────────────────
const backgroundTimers: (NodeJS.Timeout)[] = []

Expand All @@ -912,6 +1029,13 @@ function startBackgroundTasks(): void {
backgroundTimers.push(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS))
}, 5_000)
backgroundTimers.push(statusInit)

// Zombie auto-abort — 10s initial delay, then every AUTO_ABORT_CHECK_INTERVAL_MS
const abortInit = setTimeout(() => {
runAbortCheck()
backgroundTimers.push(setInterval(() => runAbortCheck(), AUTO_ABORT_CHECK_INTERVAL_MS))
}, 10_000)
backgroundTimers.push(abortInit)
}

const LAST_ROUTE_LIMIT = 256
Expand Down Expand Up @@ -1161,7 +1285,12 @@ export default {
_testExports: {
runTrashSweep,
writeRunStatus,
runAbortCheck,
setAbortRunFn,
resetSweepInProgress: () => { sweepInProgress = false },
resetStatusInProgress: () => { statusInProgress = false },
resetAbortCheckInProgress: () => { abortCheckInProgress = false },
clearAbortedInThisProcess: () => { abortedInThisProcess.length = 0 },
getAbortedInThisProcess: () => [...abortedInThisProcess],
},
}
229 changes: 229 additions & 0 deletions test/auto-abort.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import assert from "node:assert/strict"
import fs from "node:fs"
import os from "node:os"
import path from "node:path"
import { test } from "node:test"

// Helper: dynamic import with cache bust
async function loadPlugin(env: Record<string, string | undefined> = {}) {
const oldEnv: Record<string, string | undefined> = {}
for (const [key, value] of Object.entries(env)) {
oldEnv[key] = process.env[key]
if (value === undefined) delete process.env[key]
else process.env[key] = value
}
const mod = await import(`../index.ts?bust=${Date.now()}-${Math.random()}`)
for (const [key, value] of Object.entries(oldEnv)) {
if (value === undefined) delete process.env[key]
else process.env[key] = value
}
return mod
}

// Helper: create a temp agents dir with zombie lock files
function createTempAgentsDir(structure: Array<{
agentId: string
sessions: Array<{ uuid: string; mtimeAgeMs: number; pid?: number }>
identityJson?: Record<string, unknown>
}>) {
const base = fs.mkdtempSync(path.join(os.tmpdir(), "taas-abort-test-"))
for (const { agentId, sessions = [], identityJson } of structure) {
const sessDir = path.join(base, agentId, "sessions")
fs.mkdirSync(sessDir, { recursive: true })
for (const s of sessions) {
const lockPath = path.join(sessDir, s.uuid + ".jsonl.lock")
fs.writeFileSync(lockPath, s.pid != null ? String(s.pid) : "12345")
const newMtime = new Date(Date.now() - s.mtimeAgeMs)
fs.utimesSync(lockPath, newMtime, newMtime)
}
if (identityJson) {
const agentDir = path.join(base, agentId, "agent")
fs.mkdirSync(agentDir, { recursive: true })
fs.writeFileSync(path.join(agentDir, "identity.json"), JSON.stringify(identityJson))
}
}
return base
}

// Collect console output
function collectConsole() {
const infoLogs: string[] = []
const warnLogs: string[] = []
const origInfo = console.info
const origWarn = console.warn
console.info = (...args: unknown[]) => { infoLogs.push(args.map(String).join(" ")) }
console.warn = (...args: unknown[]) => { warnLogs.push(args.map(String).join(" ")) }
return {
infoLogs,
warnLogs,
restore() {
console.info = origInfo
console.warn = origWarn
},
}
}

// AC-AUTO-ABORT.1: With auto-abort enabled, classify one zombie session, mock the abort callback, assert it was called with the right sessionKey.
test("AC-AUTO-ABORT.1: auto-abort enabled calls abortRun for zombie session", async () => {
const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true" })
const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports

clearAbortedInThisProcess()
resetAbortCheckInProgress()

const aborted: Array<{ sessionKey: string }> = []
setAbortRunFn(async (sessionKey: string) => {
aborted.push({ sessionKey })
})

// Create a zombie: lock file older than 60 min
const agentsDir = createTempAgentsDir([{
agentId: "test-agent",
sessions: [{ uuid: "abc12345-6789-def0-1234-567890abcdef", mtimeAgeMs: 65 * 60 * 1000 }],
}])

const logs = collectConsole()
try {
runAbortCheck(agentsDir)
} finally {
logs.restore()
}

assert.ok(aborted.length === 1, `Expected 1 abort, got ${aborted.length}`)
assert.match(aborted[0].sessionKey, /^agent:test-agent:session:abc12345/)
})

// AC-AUTO-ABORT.2: With auto-abort disabled (default), classify zombies, assert the abort callback was NOT called and one log line listing candidates.
test("AC-AUTO-ABORT.2: auto-abort disabled logs candidates without aborting", async () => {
const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: undefined })
const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports

clearAbortedInThisProcess()
resetAbortCheckInProgress()

const aborted: string[] = []
setAbortRunFn(async (sessionKey: string) => {
aborted.push(sessionKey)
})

// Create two zombie lock files
const agentsDir = createTempAgentsDir([{
agentId: "main",
sessions: [
{ uuid: "deadbeef-0000-0000-0000-000000000001", mtimeAgeMs: 70 * 60 * 1000 },
{ uuid: "deadbeef-0000-0000-0000-000000000002", mtimeAgeMs: 65 * 60 * 1000 },
],
}])

const logs = collectConsole()
try {
runAbortCheck(agentsDir)
} finally {
logs.restore()
}

assert.equal(aborted.length, 0, "abortRun should NOT be called when auto-abort is disabled")
// Check that we logged the candidates
const candidateLog = logs.infoLogs.find(l => l.includes("zombie candidates (auto-abort disabled)"))
assert.ok(candidateLog, `Expected candidate log, got: ${JSON.stringify(logs.infoLogs)}`)
assert.ok(candidateLog!.includes("2 runs"), `Expected '2 runs' in log, got: ${candidateLog}`)
})

// AC-AUTO-ABORT.3: Same zombie classified twice in successive ticks → abort callback called ONLY ONCE (idempotence via the in-memory set).
test("AC-AUTO-ABORT.3: idempotent — same zombie only aborted once across two ticks", async () => {
const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true" })
const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports

clearAbortedInThisProcess()
resetAbortCheckInProgress()

const aborted: string[] = []
setAbortRunFn(async (sessionKey: string) => {
aborted.push(sessionKey)
})

const agentsDir = createTempAgentsDir([{
agentId: "test-agent",
sessions: [{ uuid: "idempotent-0000-0000-000000000001", mtimeAgeMs: 66 * 60 * 1000 }],
}])

// First tick
resetAbortCheckInProgress()
runAbortCheck(agentsDir)
assert.equal(aborted.length, 1, "First tick should abort once")

// Second tick — same zombie is still there
resetAbortCheckInProgress()
runAbortCheck(agentsDir)
assert.equal(aborted.length, 1, "Second tick should NOT abort again — idempotent")
})

// AC-AUTO-ABORT.4: 1001 distinct zombies all aborted → the in-memory set caps at 1000 entries.
test("AC-AUTO-ABORT.4: aborted set caps at 1000 entries (LRU eviction)", async () => {
const mod = await loadPlugin({ TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true" })
const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess, getAbortedInThisProcess } = mod.default._testExports

clearAbortedInThisProcess()
resetAbortCheckInProgress()

const aborted: string[] = []
setAbortRunFn(async (sessionKey: string) => {
aborted.push(sessionKey)
})

// Create 1100 zombie lock files (more than the 1000 cap)
const sessions = []
for (let i = 0; i < 1100; i++) {
sessions.push({
uuid: `zombie-${String(i).padStart(8, "0")}-0000-000000000000`,
mtimeAgeMs: (61 + (i % 10)) * 60 * 1000, // 61-70 min idle
})
}
const agentsDir = createTempAgentsDir([{
agentId: "load-agent",
sessions,
}])

resetAbortCheckInProgress()
runAbortCheck(agentsDir)

// All 1100 should have been passed to abortRun (they're new each time)
assert.equal(aborted.length, 1100, `Expected 1100 aborted calls, got ${aborted.length}`)

// The in-memory set should be capped at 1000
const set = getAbortedInThisProcess()
assert.ok(set.length <= 1000 && set.length > 0, `Expected set <= 1000, got ${set.length}`)
})

// Cleanup temp dirs (node:test handles most, but ensure)
test("AC-AUTO-ABORT.5: dry-run mode logs but does not call abortRun", async () => {
const mod = await loadPlugin({
TAAS_AFFINITY_AUTO_ABORT_ZOMBIES: "true",
TAAS_AFFINITY_AUTO_ABORT_DRY_RUN: "true",
})
const { runAbortCheck, setAbortRunFn, resetAbortCheckInProgress, clearAbortedInThisProcess } = mod.default._testExports

clearAbortedInThisProcess()
resetAbortCheckInProgress()

const aborted: string[] = []
setAbortRunFn(async (sessionKey: string) => {
aborted.push(sessionKey)
})

const agentsDir = createTempAgentsDir([{
agentId: "dry-agent",
sessions: [{ uuid: "dryrun-test-0000-0000-000000000001", mtimeAgeMs: 65 * 60 * 1000 }],
}])

const logs = collectConsole()
try {
runAbortCheck(agentsDir)
} finally {
logs.restore()
}

assert.equal(aborted.length, 0, "dry-run should NOT call abortRun")
const dryLog = logs.infoLogs.find(l => l.includes("auto-abort DRY RUN"))
assert.ok(dryLog, `Expected DRY RUN log, got: ${JSON.stringify(logs.infoLogs)}`)
})
Loading