Skip to content

Commit 760d1e3

Browse files
feat: add spawn circuit breaker, lifecycle logging, and heartbeat auto-recovery
Addresses remaining items from modem-dev#185 (control-agent resilience): ## Circuit breaker (agent-spawn.ts) After 3 consecutive spawn failures, the circuit opens and rejects new spawn attempts for 5 minutes (cooldown). This prevents resource waste when spawns are failing due to systemic issues (missing API keys, model unavailability, etc.). The circuit transitions: closed → open (after 3 failures) → half-open (after cooldown) → closed (on success) Failure tracking counts tmux spawn failures, readiness timeouts, and aborted readiness checks. Validation errors (bad name, missing model) don't affect the circuit. ## Worker lifecycle logging (agent-spawn.ts) All spawn events are logged to ~/.pi/agent/logs/worker-lifecycle.jsonl: - spawn_started: when a spawn attempt begins - spawn_success: readiness verified (includes ready_after_ms) - spawn_failed: tmux error, readiness timeout, or abort - circuit_rejected: spawn refused by open circuit New `spawn_status` tool exposes circuit breaker state and recent lifecycle events for observability. ## Heartbeat auto-recovery (heartbeat.ts) Before prompting the control-agent about failures, the heartbeat now attempts automatic recovery for two failure types: - Bridge down: kills existing bridge tmux session, clears port holders, runs startup-pi.sh, verifies bridge comes back - Orphaned dev-agents: kills tmux session, removes stale alias If recovery succeeds, no LLM tokens are consumed (same as healthy check). Only unrecoverable failures prompt the agent. Recovery actions are logged to ~/.pi/agent/logs/auto-recovery.jsonl for audit and debugging. ## Tests - Fixed test harness to handle multiple tool registrations - Added circuit breaker test (3 failures → open → rejected) - Added spawn_status tool registration test - All 128 tests pass (73 heartbeat + 6 agent-spawn + 49 memory) Refs modem-dev#185 Co-authored-by: Darcy Clarke <darcy@darcyclarke.me>
1 parent b0e7756 commit 760d1e3

3 files changed

Lines changed: 471 additions & 12 deletions

File tree

pi/extensions/agent-spawn.test.mjs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ function randomId() {
1313
}
1414

1515
function createExtensionHarness(execImpl) {
16-
let registeredTool = null;
16+
const registeredTools = {};
1717
const pi = {
1818
registerTool(tool) {
19-
registeredTool = tool;
19+
registeredTools[tool.name] = tool;
2020
},
2121
exec: execImpl,
2222
};
2323
agentSpawnExtension(pi);
24-
if (!registeredTool) throw new Error("agent_spawn tool was not registered");
25-
return registeredTool;
24+
if (!registeredTools.agent_spawn) throw new Error("agent_spawn tool was not registered");
25+
return registeredTools.agent_spawn;
2626
}
2727

2828
function startUnixSocketServer(socketPath) {
@@ -242,4 +242,56 @@ describe("agent_spawn extension tool", () => {
242242
expect(result.details.aborted).toBe(true);
243243
expect(Date.now() - startedAt).toBeLessThan(1000);
244244
});
245+
246+
it("opens circuit breaker after 3 consecutive failures", async () => {
247+
const root = mkdtempSync(path.join(tmpdir(), "agent-spawn-test-"));
248+
tempDirs.push(root);
249+
const worktree = path.join(root, "worktree");
250+
const skillPath = path.join(root, "dev-skill");
251+
const controlDir = path.join(root, "session-control");
252+
process.env[CONTROL_DIR_ENV] = controlDir;
253+
mkdirSync(worktree, { recursive: true });
254+
mkdirSync(skillPath, { recursive: true });
255+
mkdirSync(controlDir, { recursive: true });
256+
257+
// Spawns succeed at tmux level but readiness always times out (1s timeout)
258+
const execSpy = vi.fn(async () => ({ stdout: "", stderr: "", code: 0, killed: false }));
259+
const tool = createExtensionHarness(execSpy);
260+
261+
const params = {
262+
session_name: `dev-agent-circuit-${randomId()}`,
263+
cwd: worktree,
264+
skill_path: skillPath,
265+
model: "anthropic/claude-opus-4-6",
266+
ready_timeout_sec: 1,
267+
};
268+
269+
// Fail 3 times (readiness timeout)
270+
for (let i = 0; i < 3; i++) {
271+
params.session_name = `dev-agent-circuit-${randomId()}`;
272+
const result = await tool.execute("id", params, undefined, undefined, {});
273+
expect(result.isError).toBe(true);
274+
expect(result.details.error).toBe("readiness_timeout");
275+
}
276+
277+
// 4th attempt should be rejected by circuit breaker
278+
params.session_name = `dev-agent-circuit-${randomId()}`;
279+
const rejected = await tool.execute("id", params, undefined, undefined, {});
280+
expect(rejected.isError).toBe(true);
281+
expect(rejected.details.error).toBe("circuit_open");
282+
expect(String(rejected.content[0].text)).toContain("Circuit breaker OPEN");
283+
});
284+
285+
it("exposes spawn_status tool", () => {
286+
const registeredTools = {};
287+
const pi = {
288+
registerTool(tool) {
289+
registeredTools[tool.name] = tool;
290+
},
291+
exec: async () => ({ stdout: "", stderr: "", code: 0 }),
292+
};
293+
agentSpawnExtension(pi);
294+
expect(registeredTools.spawn_status).toBeDefined();
295+
expect(registeredTools.spawn_status.name).toBe("spawn_status");
296+
});
245297
});

pi/extensions/agent-spawn.ts

Lines changed: 227 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
22
import { Type } from "@sinclair/typebox";
3-
import { existsSync, mkdirSync, readlinkSync, statSync } from "node:fs";
3+
import { appendFileSync, existsSync, mkdirSync, readlinkSync, statSync } from "node:fs";
44
import net from "node:net";
55
import { homedir } from "node:os";
66
import { dirname, join, resolve } from "node:path";
@@ -15,8 +15,101 @@ const READINESS_POLL_MS = 200;
1515
const SOCKET_PROBE_TIMEOUT_MS = 300;
1616
const TMUX_SPAWN_TIMEOUT_MS = 15_000;
1717

18+
// Circuit breaker defaults
19+
const CIRCUIT_FAILURE_THRESHOLD = 3;
20+
const CIRCUIT_COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes
21+
1822
type SpawnStage = "spawn" | "wait_alias" | "wait_socket" | "probe" | "aborted";
1923

24+
// ── Circuit Breaker ─────────────────────────────────────────────────────────
25+
26+
type CircuitState = "closed" | "open" | "half-open";
27+
28+
type CircuitBreaker = {
29+
state: CircuitState;
30+
consecutiveFailures: number;
31+
lastFailureAt: number | null;
32+
lastSuccessAt: number | null;
33+
totalFailures: number;
34+
totalSuccesses: number;
35+
};
36+
37+
function createCircuitBreaker(): CircuitBreaker {
38+
return {
39+
state: "closed",
40+
consecutiveFailures: 0,
41+
lastFailureAt: null,
42+
lastSuccessAt: null,
43+
totalFailures: 0,
44+
totalSuccesses: 0,
45+
};
46+
}
47+
48+
function recordSuccess(cb: CircuitBreaker): void {
49+
cb.consecutiveFailures = 0;
50+
cb.lastSuccessAt = Date.now();
51+
cb.totalSuccesses++;
52+
cb.state = "closed";
53+
}
54+
55+
function recordFailure(cb: CircuitBreaker): void {
56+
cb.consecutiveFailures++;
57+
cb.lastFailureAt = Date.now();
58+
cb.totalFailures++;
59+
if (cb.consecutiveFailures >= CIRCUIT_FAILURE_THRESHOLD) {
60+
cb.state = "open";
61+
}
62+
}
63+
64+
function isCircuitOpen(cb: CircuitBreaker): boolean {
65+
if (cb.state !== "open") return false;
66+
// Check if cooldown has elapsed → transition to half-open
67+
if (cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) {
68+
cb.state = "half-open";
69+
return false;
70+
}
71+
return true;
72+
}
73+
74+
function circuitStatus(cb: CircuitBreaker): string {
75+
const cooldownRemaining =
76+
cb.state === "open" && cb.lastFailureAt
77+
? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - cb.lastFailureAt))
78+
: 0;
79+
return [
80+
`State: ${cb.state}`,
81+
`Consecutive failures: ${cb.consecutiveFailures}/${CIRCUIT_FAILURE_THRESHOLD}`,
82+
`Total: ${cb.totalSuccesses} ok, ${cb.totalFailures} failed`,
83+
`Last success: ${cb.lastSuccessAt ? new Date(cb.lastSuccessAt).toISOString() : "never"}`,
84+
`Last failure: ${cb.lastFailureAt ? new Date(cb.lastFailureAt).toISOString() : "never"}`,
85+
cb.state === "open" ? `Cooldown remaining: ${Math.round(cooldownRemaining / 1000)}s` : "",
86+
]
87+
.filter(Boolean)
88+
.join("\n ");
89+
}
90+
91+
// ── Lifecycle Log ───────────────────────────────────────────────────────────
92+
93+
const LIFECYCLE_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "worker-lifecycle.jsonl");
94+
95+
type LifecycleEvent = {
96+
timestamp: string;
97+
session_name: string;
98+
event: "spawn_started" | "spawn_success" | "spawn_failed" | "circuit_rejected";
99+
stage?: string;
100+
ready_after_ms?: number;
101+
error?: string;
102+
};
103+
104+
function logLifecycleEvent(event: LifecycleEvent): void {
105+
try {
106+
mkdirSync(dirname(LIFECYCLE_LOG_PATH), { recursive: true });
107+
appendFileSync(LIFECYCLE_LOG_PATH, JSON.stringify(event) + "\n");
108+
} catch {
109+
// Best-effort — don't break spawn on logging failure
110+
}
111+
}
112+
20113
type ReadinessResult = {
21114
ready: boolean;
22115
aborted: boolean;
@@ -192,11 +285,14 @@ type AgentSpawnInput = {
192285
};
193286

194287
export default function agentSpawnExtension(pi: ExtensionAPI): void {
288+
const circuit = createCircuitBreaker();
289+
195290
pi.registerTool({
196291
name: "agent_spawn",
197292
label: "Agent Spawn",
198293
description:
199-
"Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout.",
294+
"Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout. " +
295+
"Includes a circuit breaker: after 3 consecutive failures, spawns are rejected for 5 minutes to prevent resource waste.",
200296
parameters: Type.Object({
201297
session_name: Type.String({ description: "Target session name (also PI_SESSION_NAME)" }),
202298
cwd: Type.String({ description: "Working directory for the new session" }),
@@ -215,6 +311,38 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
215311
const model = input.model?.trim();
216312
const readyTimeoutSec = clampReadyTimeout(input.ready_timeout_sec);
217313

314+
// Circuit breaker check
315+
if (isCircuitOpen(circuit)) {
316+
const cooldownLeft = circuit.lastFailureAt
317+
? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - circuit.lastFailureAt))
318+
: 0;
319+
logLifecycleEvent({
320+
timestamp: new Date().toISOString(),
321+
session_name: sessionName || "unknown",
322+
event: "circuit_rejected",
323+
error: `Circuit open after ${circuit.consecutiveFailures} failures. Cooldown: ${Math.round(cooldownLeft / 1000)}s`,
324+
});
325+
return {
326+
content: [{
327+
type: "text",
328+
text:
329+
`⚡ Circuit breaker OPEN — ${circuit.consecutiveFailures} consecutive spawn failures. ` +
330+
`Refusing new spawns for ${Math.round(cooldownLeft / 1000)}s to prevent resource waste. ` +
331+
`Investigate the root cause (check logs, API keys, model availability).`,
332+
}],
333+
isError: true,
334+
details: {
335+
error: "circuit_open",
336+
circuit: {
337+
state: circuit.state,
338+
consecutive_failures: circuit.consecutiveFailures,
339+
cooldown_remaining_sec: Math.round(cooldownLeft / 1000),
340+
last_failure: circuit.lastFailureAt ? new Date(circuit.lastFailureAt).toISOString() : null,
341+
},
342+
},
343+
};
344+
}
345+
218346
if (!sessionName || !isSafeName(sessionName)) {
219347
return {
220348
content: [{ type: "text", text: "Invalid session_name. Use only letters, numbers, '.', '_', and '-'." }],
@@ -273,6 +401,12 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
273401
};
274402
}
275403

404+
logLifecycleEvent({
405+
timestamp: new Date().toISOString(),
406+
session_name: sessionName,
407+
event: "spawn_started",
408+
});
409+
276410
const tmuxCommand = [
277411
`cd ${shellQuote(cwdPath)}`,
278412
'export PATH="$HOME/.varlock/bin:$HOME/opt/node/bin:$PATH"',
@@ -290,6 +424,14 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
290424
);
291425

292426
if (spawnResult.code !== 0) {
427+
recordFailure(circuit);
428+
logLifecycleEvent({
429+
timestamp: new Date().toISOString(),
430+
session_name: sessionName,
431+
event: "spawn_failed",
432+
stage: "spawn",
433+
error: `tmux exit code ${spawnResult.code}`,
434+
});
293435
return {
294436
content: [{ type: "text", text: `Failed to spawn tmux session ${sessionName}.` }],
295437
isError: true,
@@ -304,6 +446,8 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
304446
stdout: spawnResult.stdout,
305447
stderr: spawnResult.stderr,
306448
exit_code: spawnResult.code,
449+
circuit_state: circuit.state,
450+
circuit_failures: circuit.consecutiveFailures,
307451
},
308452
};
309453
}
@@ -321,9 +465,19 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
321465
ready_after_ms: readiness.readyAfterMs,
322466
stage: readiness.stage,
323467
error: readiness.ready ? null : readiness.aborted ? "readiness_aborted" : "readiness_timeout",
468+
circuit_state: circuit.state,
469+
circuit_failures: circuit.consecutiveFailures,
324470
};
325471

326472
if (readiness.aborted) {
473+
recordFailure(circuit);
474+
logLifecycleEvent({
475+
timestamp: new Date().toISOString(),
476+
session_name: sessionName,
477+
event: "spawn_failed",
478+
stage: "aborted",
479+
error: "readiness_aborted",
480+
});
327481
return {
328482
content: [{
329483
type: "text",
@@ -335,6 +489,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
335489
}
336490

337491
if (!readiness.ready) {
492+
recordFailure(circuit);
493+
logLifecycleEvent({
494+
timestamp: new Date().toISOString(),
495+
session_name: sessionName,
496+
event: "spawn_failed",
497+
stage: readiness.stage,
498+
ready_after_ms: readiness.readyAfterMs,
499+
error: "readiness_timeout",
500+
});
338501
return {
339502
content: [{
340503
type: "text",
@@ -347,6 +510,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
347510
};
348511
}
349512

513+
recordSuccess(circuit);
514+
logLifecycleEvent({
515+
timestamp: new Date().toISOString(),
516+
session_name: sessionName,
517+
event: "spawn_success",
518+
stage: readiness.stage,
519+
ready_after_ms: readiness.readyAfterMs,
520+
});
521+
350522
return {
351523
content: [{
352524
type: "text",
@@ -358,4 +530,57 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
358530
};
359531
},
360532
});
533+
534+
// ── spawn_status tool ─────────────────────────────────────────────────────
535+
536+
pi.registerTool({
537+
name: "spawn_status",
538+
label: "Spawn Status",
539+
description:
540+
"Check the agent_spawn circuit breaker state and recent worker lifecycle events.",
541+
parameters: Type.Object({}),
542+
async execute() {
543+
let recentEvents = "";
544+
try {
545+
const { execSync } = require("node:child_process");
546+
const tail = execSync(`tail -20 "${LIFECYCLE_LOG_PATH}" 2>/dev/null`, { encoding: "utf-8" });
547+
if (tail.trim()) {
548+
const lines = tail.trim().split("\n");
549+
recentEvents = lines
550+
.map((line: string) => {
551+
try {
552+
const e = JSON.parse(line) as LifecycleEvent;
553+
return ` ${e.timestamp} ${e.event} ${e.session_name}${e.error ? ` (${e.error})` : ""}${e.ready_after_ms ? ` [${e.ready_after_ms}ms]` : ""}`;
554+
} catch {
555+
return ` (unparseable)`;
556+
}
557+
})
558+
.join("\n");
559+
}
560+
} catch {
561+
recentEvents = " (no lifecycle log)";
562+
}
563+
564+
return {
565+
content: [{
566+
type: "text" as const,
567+
text: [
568+
"Spawn Circuit Breaker:",
569+
` ${circuitStatus(circuit)}`,
570+
"",
571+
"Recent lifecycle events:",
572+
recentEvents || " (none)",
573+
].join("\n"),
574+
}],
575+
details: {
576+
circuit: {
577+
state: circuit.state,
578+
consecutive_failures: circuit.consecutiveFailures,
579+
total_successes: circuit.totalSuccesses,
580+
total_failures: circuit.totalFailures,
581+
},
582+
},
583+
};
584+
},
585+
});
361586
}

0 commit comments

Comments
 (0)