Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 288 additions & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,21 @@ async function postBridgeResult(resultsUrl: string, descriptor: Record<string, u
}

const DEFAULT_POLL_WAIT_MS = 25_000
// ── Trash sweeper configuration ──────────────────────────────────────────────
const SWEEP_INTERVAL_MS = Number(process.env.TAAS_AFFINITY_SWEEP_INTERVAL_MS) || 3_600_000
const SWEEP_STALE_DAYS = Number(process.env.TAAS_AFFINITY_SWEEP_STALE_DAYS) || 7
const SWEEP_TRAJECTORY_MAX_MB = Number(process.env.TAAS_AFFINITY_SWEEP_TRAJECTORY_MAX_MB) || 50
const SWEEP_TRAJECTORY_KEEP_MB = Number(process.env.TAAS_AFFINITY_SWEEP_TRAJECTORY_KEEP_MB) || 10
const SWEEP_LOCK_ORPHAN_MIN = Number(process.env.TAAS_AFFINITY_SWEEP_LOCK_ORPHAN_MIN) || 60
const SWEEP_CHECKPOINT_KEEP = Number(process.env.TAAS_AFFINITY_SWEEP_CHECKPOINT_KEEP) || 3

// ── Stuck-run status writer configuration ────────────────────────────────────
const STATUS_INTERVAL_MS = 30_000
const STATUS_WARN_MS = 5 * 60_000
const STATUS_STUCK_MS = 15 * 60_000
const STATUS_ZOMBIE_MS = 60 * 60_000
const STATUS_PATH = process.env.TAAS_AFFINITY_RUNS_STATUS_PATH
|| path.join(os.homedir(), ".openclaw", "alien-studio", "runs-status.json")

function stopRequesterBridgePoller(leaseId?: string): void {
if (!leaseId) return
Expand Down Expand Up @@ -636,6 +651,269 @@ type AutorouterCapture = {
routedContextWindow: number | null
}


// ── Trash sweeper ────────────────────────────────────────────────────────────
let sweepInProgress = false

function runTrashSweep(agentsDir?: string): void {
if (sweepInProgress) return
sweepInProgress = true
const t0 = Date.now()
let deleted = 0
let truncated = 0
let orphanedLocks = 0
try {
const base = agentsDir || path.join(os.homedir(), ".openclaw", "agents")
let agents: string[]
try { agents = fs.readdirSync(base) } catch { sweepInProgress = false; return }
const staleMs = SWEEP_STALE_DAYS * 24 * 60 * 60 * 1000
const trajectoryMaxBytes = SWEEP_TRAJECTORY_MAX_MB * 1024 * 1024
const trajectoryKeepBytes = SWEEP_TRAJECTORY_KEEP_MB * 1024 * 1024
const lockOrphanMs = SWEEP_LOCK_ORPHAN_MIN * 60 * 1000

for (const agentId of agents) {
const sessionsDir = path.join(base, agentId, "sessions")
let entries: string[]
try { entries = fs.readdirSync(sessionsDir) } catch { continue }

// Group checkpoint files by base sessionId
const checkpointMap = new Map<string, string[]>()
for (const name of entries) {
const m = name.match(/^(.+?)\.checkpoint\.\d+\.jsonl$/)
if (m) {
const arr = checkpointMap.get(m[1]) || []
arr.push(name)
checkpointMap.set(m[1], arr)
}
}

for (const name of entries) {
const fp = path.join(sessionsDir, name)
let st: fs.Stats
try { st = fs.statSync(fp) } catch { continue }
const age = Date.now() - st.mtimeMs

// .deleted.* and .reset.* files
if (name.match(/\.deleted\.\d+\.jsonl(\.lock)?$/) || name.match(/\.reset\.\d+\.jsonl(\.lock)?$/)) {
if (age > staleMs) {
try { fs.unlinkSync(fp); deleted++ } catch (e) { console.warn("[taas-affinity] trash sweep: failed to delete", fp, e) }
}
continue
}

// .checkpoint files — handled in batch below
if (name.match(/\.checkpoint\.\d+\.jsonl$/)) continue

// .trajectory.jsonl oversized
if (name.endsWith(".trajectory.jsonl") && st.size > trajectoryMaxBytes) {
try {
const fh = fs.openSync(fp, "r")
const keepFrom = Math.max(0, st.size - trajectoryKeepBytes)
const buf = Buffer.alloc(st.size - keepFrom)
fs.readSync(fh, buf, 0, buf.length, keepFrom)
fs.closeSync(fh)
// Find first newline to avoid partial line
let nlIdx = buf.indexOf(10) // \n
const dataBuf = nlIdx >= 0 ? buf.slice(nlIdx + 1) : buf
// Backup original
const bakPath = fp + ".pre-truncate-" + Date.now() + ".bak"
fs.renameSync(fp, bakPath)
fs.writeFileSync(fp, dataBuf)
truncated++
} catch (e) { console.warn("[taas-affinity] trash sweep: failed to truncate", fp, e) }
continue
}

// .lock files — orphan detection
if (name.endsWith(".jsonl.lock")) {
if (age > lockOrphanMs) {
try {
const content = fs.readFileSync(fp, "utf8").trim()
const pidMatch = content.match(/^\d+/)
if (pidMatch) {
const pid = parseInt(pidMatch[0], 10)
if (!fs.existsSync("/proc/" + pid)) {
fs.unlinkSync(fp); orphanedLocks++
}
} else {
fs.unlinkSync(fp); orphanedLocks++
}
} catch (e) { console.warn("[taas-affinity] trash sweep: failed to process lock", fp, e) }
}
continue
}
}

// Prune excess checkpoints per session
for (const [baseSessionId, files] of checkpointMap) {
const sorted = files
.map(f => ({ f, mtime: fs.statSync(path.join(sessionsDir, f)).mtimeMs }))
.sort((a, b) => b.mtime - a.mtime)
// Remove beyond KEEP limit
for (let i = SWEEP_CHECKPOINT_KEEP; i < sorted.length; i++) {
const fp = path.join(sessionsDir, sorted[i].f)
try { fs.unlinkSync(fp); deleted++ } catch (e) { console.warn("[taas-affinity] trash sweep: failed to delete checkpoint", fp, e) }
}
// Remove any stale checkpoints
for (const entry of sorted) {
const age2 = Date.now() - entry.mtime
if (age2 > staleMs) {
const fp = path.join(sessionsDir, entry.f)
try { fs.unlinkSync(fp); deleted++ } catch (e) { console.warn("[taas-affinity] trash sweep: failed to delete stale checkpoint", fp, e) }
}
}
}
}
} catch (e) {
console.warn("[taas-affinity] trash sweep error", e)
} finally {
sweepInProgress = false
console.info("[taas-affinity] trash sweep: deleted=" + deleted + " truncated=" + truncated + " orphaned_locks=" + orphanedLocks + " elapsed=" + (Date.now() - t0) + "ms")
}
}

// ── Stuck-run status writer ──────────────────────────────────────────────────
interface RunState {
agentId: string
sessionUuid: string
sessionKey: string
lockMtime: number
idleMs: number
state: "active" | "warn" | "stuck" | "zombie"
pid: number | null
pidAlive: boolean | null
}

interface RunsStatus {
generatedAt: number
thresholds: { warnMs: number; stuckMs: number; zombieMs: number }
counts: { active: number; warn: number; stuck: number; zombie: number }
runs: RunState[]
}

let statusInProgress = false

function writeRunStatus(agentsDir?: string, statusPath?: string): void {
if (statusInProgress) return
statusInProgress = true
try {
const base = agentsDir || path.join(os.homedir(), ".openclaw", "agents")
const outPath = statusPath || STATUS_PATH
let agents: string[]
try { agents = fs.readdirSync(base) } catch { agents = [] }

const runs: RunState[] = []

for (const agentId of agents) {
const sessionsDir = path.join(base, agentId, "sessions")
let entries: string[]
try { entries = fs.readdirSync(sessionsDir) } catch { continue }

// Try to read identity.json for mainKey
let mainKey: string | null = null
try {
const idJson = fs.readFileSync(path.join(base, agentId, "agent", "identity.json"), "utf8")
const parsed = JSON.parse(idJson)
if (typeof parsed.mainKey === "string") mainKey = parsed.mainKey
} catch { /* no identity file */ }

// Build main session UUID from mainKey
let mainUuid: string | null = null
if (mainKey) {
// mainKey format like "agent:<id>:main" => no UUID, or "agent:<id>:session:<uuid>"
// But for the main session, it's typically "agent:<id>:main"
// We'll compare by checking if sessionUuid appears in mainKey
const parts = mainKey.split(":")
const lastPart = parts[parts.length - 1]
if (lastPart !== "main" && lastPart.length >= 8) {
mainUuid = lastPart
}
}

for (const name of entries) {
if (!name.endsWith(".jsonl.lock")) continue
const sessionUuid = name.replace(/\.jsonl\.lock$/, "")
const fp = path.join(sessionsDir, name)
let st: fs.Stats
try { st = fs.statSync(fp) } catch { continue }

const lockMtime = st.mtimeMs
const idleMs = Date.now() - lockMtime

let state: RunState["state"]
if (idleMs < STATUS_WARN_MS) state = "active"
else if (idleMs < STATUS_STUCK_MS) state = "warn"
else if (idleMs < STATUS_ZOMBIE_MS) state = "stuck"
else state = "zombie"

let pid: number | null = null
try {
const content = fs.readFileSync(fp, "utf8").trim()
const pidMatch = content.match(/^\d+/)
if (pidMatch) pid = parseInt(pidMatch[0], 10)
} catch { /* empty */ }

let pidAlive: boolean | null = null
if (pid !== null) {
try { pidAlive = fs.existsSync("/proc/" + pid) } catch { pidAlive = null }
}

let sessionKey: string
if (mainKey && sessionUuid === mainUuid) {
sessionKey = "agent:" + agentId + ":main"
} else {
sessionKey = "agent:" + agentId + ":session:" + sessionUuid
}

runs.push({ agentId, sessionUuid, sessionKey, lockMtime, idleMs, state, pid, pidAlive })
}
}

runs.sort((a, b) => b.idleMs - a.idleMs)

const counts = { active: 0, warn: 0, stuck: 0, zombie: 0 }
for (const r of runs) counts[r.state]++

const output: RunsStatus = {
generatedAt: Date.now(),
thresholds: { warnMs: STATUS_WARN_MS, stuckMs: STATUS_STUCK_MS, zombieMs: STATUS_ZOMBIE_MS },
counts,
runs,
}

// Atomic write
const dir = path.dirname(outPath)
try { fs.mkdirSync(dir, { recursive: true }) } catch { /* may already exist */ }
const tmpPath = path.join(dir, ".runs-status.tmp." + process.pid)
fs.writeFileSync(tmpPath, JSON.stringify(output, null, 2))
fs.renameSync(tmpPath, outPath)
} catch (e) {
console.warn("[taas-affinity] runs-status write error", e)
} finally {
statusInProgress = false
}
}

// ── Background task scheduler ────────────────────────────────────────────────
const backgroundTimers: (NodeJS.Timeout)[] = []

function startBackgroundTasks(): void {
// Trash sweeper — randomised initial delay (0-30s) to stagger
const sweepDelay = Math.floor(Math.random() * 30_000)
const sweepInit = setTimeout(() => {
runTrashSweep()
backgroundTimers.push(setInterval(() => runTrashSweep(), SWEEP_INTERVAL_MS))
}, sweepDelay)
backgroundTimers.push(sweepInit)

// Stuck-run status writer — 5s initial delay, then every 30s
const statusInit = setTimeout(() => {
writeRunStatus()
backgroundTimers.push(setInterval(() => writeRunStatus(), STATUS_INTERVAL_MS))
}, 5_000)
backgroundTimers.push(statusInit)
}

const LAST_ROUTE_LIMIT = 256
const lastRouteBySessionId = new Map<string, AutorouterCapture>()
const lastRouteByAgentId = new Map<string, AutorouterCapture>()
Expand Down Expand Up @@ -875,5 +1153,15 @@ export default {
},
{ scope: "operator.read" }
)

// Start background tasks (trash sweeper + stuck-run status writer).
// Timers run for the lifetime of the gateway process.
try { startBackgroundTasks() } catch (e) { console.warn("[taas-affinity] failed to start background tasks", e) }
},
_testExports: {
runTrashSweep,
writeRunStatus,
resetSweepInProgress: () => { sweepInProgress = false },
resetStatusInProgress: () => { statusInProgress = false },
},
}
Loading
Loading