Skip to content

Commit 43954b6

Browse files
feat: add spawn circuit breaker, lifecycle logging, and heartbeat auto-recovery
Addresses remaining items from #185 (control-agent resilience): After 3 consecutive spawn failures, the circuit opens and rejects new spawn attempts for 5 minutes (cooldown). This prevents resource waste when spawns are failing due to systemic issues (missing API keys, model unavailability, etc.). The circuit transitions: closed → open (after 3 failures) → half-open (after cooldown) → closed (on success) Failure tracking counts tmux spawn failures, readiness timeouts, and aborted readiness checks. Validation errors (bad name, missing model) don't affect the circuit. All spawn events are logged to ~/.pi/agent/logs/worker-lifecycle.jsonl: - spawn_started: when a spawn attempt begins - spawn_success: readiness verified (includes ready_after_ms) - spawn_failed: tmux error, readiness timeout, or abort - circuit_rejected: spawn refused by open circuit New `spawn_status` tool exposes circuit breaker state and recent lifecycle events for observability. Before prompting the control-agent about failures, the heartbeat now attempts automatic recovery for two failure types: - Bridge down: kills existing bridge tmux session, clears port holders, runs startup-pi.sh, verifies bridge comes back - Orphaned dev-agents: kills tmux session, removes stale alias If recovery succeeds, no LLM tokens are consumed (same as healthy check). Only unrecoverable failures prompt the agent. Recovery actions are logged to ~/.pi/agent/logs/auto-recovery.jsonl for audit and debugging. - Fixed test harness to handle multiple tool registrations - Added circuit breaker test (3 failures → open → rejected) - Added spawn_status tool registration test - All 128 tests pass (73 heartbeat + 6 agent-spawn + 49 memory) Refs #185 Co-authored-by: Darcy Clarke <darcy@darcyclarke.me>
1 parent 070a21f commit 43954b6

3 files changed

Lines changed: 471 additions & 12 deletions

File tree

pi/extensions/agent-spawn.test.mjs

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@ function randomId() {
1313
}
1414

1515
function createExtensionHarness(execImpl) {
16-
let registeredTool = null;
16+
const registeredTools = {};
1717
const pi = {
1818
registerTool(tool) {
19-
registeredTool = tool;
19+
registeredTools[tool.name] = tool;
2020
},
2121
exec: execImpl,
2222
};
2323
agentSpawnExtension(pi);
24-
if (!registeredTool) throw new Error("agent_spawn tool was not registered");
25-
return registeredTool;
24+
if (!registeredTools.agent_spawn) throw new Error("agent_spawn tool was not registered");
25+
return registeredTools.agent_spawn;
2626
}
2727

2828
function startUnixSocketServer(socketPath) {
@@ -242,4 +242,56 @@ describe("agent_spawn extension tool", () => {
242242
expect(result.details.aborted).toBe(true);
243243
expect(Date.now() - startedAt).toBeLessThan(1000);
244244
});
245+
246+
it("opens circuit breaker after 3 consecutive failures", async () => {
247+
const root = mkdtempSync(path.join(tmpdir(), "agent-spawn-test-"));
248+
tempDirs.push(root);
249+
const worktree = path.join(root, "worktree");
250+
const skillPath = path.join(root, "dev-skill");
251+
const controlDir = path.join(root, "session-control");
252+
process.env[CONTROL_DIR_ENV] = controlDir;
253+
mkdirSync(worktree, { recursive: true });
254+
mkdirSync(skillPath, { recursive: true });
255+
mkdirSync(controlDir, { recursive: true });
256+
257+
// Spawns succeed at tmux level but readiness always times out (1s timeout)
258+
const execSpy = vi.fn(async () => ({ stdout: "", stderr: "", code: 0, killed: false }));
259+
const tool = createExtensionHarness(execSpy);
260+
261+
const params = {
262+
session_name: `dev-agent-circuit-${randomId()}`,
263+
cwd: worktree,
264+
skill_path: skillPath,
265+
model: "anthropic/claude-opus-4-6",
266+
ready_timeout_sec: 1,
267+
};
268+
269+
// Fail 3 times (readiness timeout)
270+
for (let i = 0; i < 3; i++) {
271+
params.session_name = `dev-agent-circuit-${randomId()}`;
272+
const result = await tool.execute("id", params, undefined, undefined, {});
273+
expect(result.isError).toBe(true);
274+
expect(result.details.error).toBe("readiness_timeout");
275+
}
276+
277+
// 4th attempt should be rejected by circuit breaker
278+
params.session_name = `dev-agent-circuit-${randomId()}`;
279+
const rejected = await tool.execute("id", params, undefined, undefined, {});
280+
expect(rejected.isError).toBe(true);
281+
expect(rejected.details.error).toBe("circuit_open");
282+
expect(String(rejected.content[0].text)).toContain("Circuit breaker OPEN");
283+
});
284+
285+
it("exposes spawn_status tool", () => {
286+
const registeredTools = {};
287+
const pi = {
288+
registerTool(tool) {
289+
registeredTools[tool.name] = tool;
290+
},
291+
exec: async () => ({ stdout: "", stderr: "", code: 0 }),
292+
};
293+
agentSpawnExtension(pi);
294+
expect(registeredTools.spawn_status).toBeDefined();
295+
expect(registeredTools.spawn_status.name).toBe("spawn_status");
296+
});
245297
});

pi/extensions/agent-spawn.ts

Lines changed: 227 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
22
import { Type } from "@sinclair/typebox";
3-
import { existsSync, mkdirSync, readlinkSync, statSync } from "node:fs";
3+
import { appendFileSync, existsSync, mkdirSync, readlinkSync, statSync } from "node:fs";
44
import net from "node:net";
55
import { homedir } from "node:os";
66
import { dirname, join, resolve } from "node:path";
@@ -15,8 +15,101 @@ const READINESS_POLL_MS = 200;
1515
const SOCKET_PROBE_TIMEOUT_MS = 300;
1616
const TMUX_SPAWN_TIMEOUT_MS = 15_000;
1717

18+
// Circuit breaker defaults
19+
const CIRCUIT_FAILURE_THRESHOLD = 3;
20+
const CIRCUIT_COOLDOWN_MS = 5 * 60 * 1000; // 5 minutes
21+
1822
type SpawnStage = "spawn" | "wait_alias" | "wait_socket" | "probe" | "aborted";
1923

24+
// ── Circuit Breaker ─────────────────────────────────────────────────────────
25+
26+
type CircuitState = "closed" | "open" | "half-open";
27+
28+
type CircuitBreaker = {
29+
state: CircuitState;
30+
consecutiveFailures: number;
31+
lastFailureAt: number | null;
32+
lastSuccessAt: number | null;
33+
totalFailures: number;
34+
totalSuccesses: number;
35+
};
36+
37+
function createCircuitBreaker(): CircuitBreaker {
38+
return {
39+
state: "closed",
40+
consecutiveFailures: 0,
41+
lastFailureAt: null,
42+
lastSuccessAt: null,
43+
totalFailures: 0,
44+
totalSuccesses: 0,
45+
};
46+
}
47+
48+
function recordSuccess(cb: CircuitBreaker): void {
49+
cb.consecutiveFailures = 0;
50+
cb.lastSuccessAt = Date.now();
51+
cb.totalSuccesses++;
52+
cb.state = "closed";
53+
}
54+
55+
function recordFailure(cb: CircuitBreaker): void {
56+
cb.consecutiveFailures++;
57+
cb.lastFailureAt = Date.now();
58+
cb.totalFailures++;
59+
if (cb.consecutiveFailures >= CIRCUIT_FAILURE_THRESHOLD) {
60+
cb.state = "open";
61+
}
62+
}
63+
64+
function isCircuitOpen(cb: CircuitBreaker): boolean {
65+
if (cb.state !== "open") return false;
66+
// Check if cooldown has elapsed → transition to half-open
67+
if (cb.lastFailureAt && Date.now() - cb.lastFailureAt >= CIRCUIT_COOLDOWN_MS) {
68+
cb.state = "half-open";
69+
return false;
70+
}
71+
return true;
72+
}
73+
74+
function circuitStatus(cb: CircuitBreaker): string {
75+
const cooldownRemaining =
76+
cb.state === "open" && cb.lastFailureAt
77+
? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - cb.lastFailureAt))
78+
: 0;
79+
return [
80+
`State: ${cb.state}`,
81+
`Consecutive failures: ${cb.consecutiveFailures}/${CIRCUIT_FAILURE_THRESHOLD}`,
82+
`Total: ${cb.totalSuccesses} ok, ${cb.totalFailures} failed`,
83+
`Last success: ${cb.lastSuccessAt ? new Date(cb.lastSuccessAt).toISOString() : "never"}`,
84+
`Last failure: ${cb.lastFailureAt ? new Date(cb.lastFailureAt).toISOString() : "never"}`,
85+
cb.state === "open" ? `Cooldown remaining: ${Math.round(cooldownRemaining / 1000)}s` : "",
86+
]
87+
.filter(Boolean)
88+
.join("\n ");
89+
}
90+
91+
// ── Lifecycle Log ───────────────────────────────────────────────────────────
92+
93+
const LIFECYCLE_LOG_PATH = join(homedir(), ".pi", "agent", "logs", "worker-lifecycle.jsonl");
94+
95+
type LifecycleEvent = {
96+
timestamp: string;
97+
session_name: string;
98+
event: "spawn_started" | "spawn_success" | "spawn_failed" | "circuit_rejected";
99+
stage?: string;
100+
ready_after_ms?: number;
101+
error?: string;
102+
};
103+
104+
function logLifecycleEvent(event: LifecycleEvent): void {
105+
try {
106+
mkdirSync(dirname(LIFECYCLE_LOG_PATH), { recursive: true });
107+
appendFileSync(LIFECYCLE_LOG_PATH, JSON.stringify(event) + "\n");
108+
} catch {
109+
// Best-effort — don't break spawn on logging failure
110+
}
111+
}
112+
20113
type ReadinessResult = {
21114
ready: boolean;
22115
aborted: boolean;
@@ -192,11 +285,14 @@ type AgentSpawnInput = {
192285
};
193286

194287
export default function agentSpawnExtension(pi: ExtensionAPI): void {
288+
const circuit = createCircuitBreaker();
289+
195290
pi.registerTool({
196291
name: "agent_spawn",
197292
label: "Agent Spawn",
198293
description:
199-
"Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout.",
294+
"Spawn a pi session in tmux and verify readiness through session-control alias/socket with a bounded timeout. " +
295+
"Includes a circuit breaker: after 3 consecutive failures, spawns are rejected for 5 minutes to prevent resource waste.",
200296
parameters: Type.Object({
201297
session_name: Type.String({ description: "Target session name (also PI_SESSION_NAME)" }),
202298
cwd: Type.String({ description: "Working directory for the new session" }),
@@ -215,6 +311,38 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
215311
const model = input.model?.trim();
216312
const readyTimeoutSec = clampReadyTimeout(input.ready_timeout_sec);
217313

314+
// Circuit breaker check
315+
if (isCircuitOpen(circuit)) {
316+
const cooldownLeft = circuit.lastFailureAt
317+
? Math.max(0, CIRCUIT_COOLDOWN_MS - (Date.now() - circuit.lastFailureAt))
318+
: 0;
319+
logLifecycleEvent({
320+
timestamp: new Date().toISOString(),
321+
session_name: sessionName || "unknown",
322+
event: "circuit_rejected",
323+
error: `Circuit open after ${circuit.consecutiveFailures} failures. Cooldown: ${Math.round(cooldownLeft / 1000)}s`,
324+
});
325+
return {
326+
content: [{
327+
type: "text",
328+
text:
329+
`⚡ Circuit breaker OPEN — ${circuit.consecutiveFailures} consecutive spawn failures. ` +
330+
`Refusing new spawns for ${Math.round(cooldownLeft / 1000)}s to prevent resource waste. ` +
331+
`Investigate the root cause (check logs, API keys, model availability).`,
332+
}],
333+
isError: true,
334+
details: {
335+
error: "circuit_open",
336+
circuit: {
337+
state: circuit.state,
338+
consecutive_failures: circuit.consecutiveFailures,
339+
cooldown_remaining_sec: Math.round(cooldownLeft / 1000),
340+
last_failure: circuit.lastFailureAt ? new Date(circuit.lastFailureAt).toISOString() : null,
341+
},
342+
},
343+
};
344+
}
345+
218346
if (!sessionName || !isSafeName(sessionName)) {
219347
return {
220348
content: [{ type: "text", text: "Invalid session_name. Use only letters, numbers, '.', '_', and '-'." }],
@@ -273,6 +401,12 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
273401
};
274402
}
275403

404+
logLifecycleEvent({
405+
timestamp: new Date().toISOString(),
406+
session_name: sessionName,
407+
event: "spawn_started",
408+
});
409+
276410
const tmuxCommand = [
277411
`cd ${shellQuote(cwdPath)}`,
278412
'export PATH="$HOME/.varlock/bin:$HOME/opt/node/bin:$PATH"',
@@ -290,6 +424,14 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
290424
);
291425

292426
if (spawnResult.code !== 0) {
427+
recordFailure(circuit);
428+
logLifecycleEvent({
429+
timestamp: new Date().toISOString(),
430+
session_name: sessionName,
431+
event: "spawn_failed",
432+
stage: "spawn",
433+
error: `tmux exit code ${spawnResult.code}`,
434+
});
293435
return {
294436
content: [{ type: "text", text: `Failed to spawn tmux session ${sessionName}.` }],
295437
isError: true,
@@ -304,6 +446,8 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
304446
stdout: spawnResult.stdout,
305447
stderr: spawnResult.stderr,
306448
exit_code: spawnResult.code,
449+
circuit_state: circuit.state,
450+
circuit_failures: circuit.consecutiveFailures,
307451
},
308452
};
309453
}
@@ -321,9 +465,19 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
321465
ready_after_ms: readiness.readyAfterMs,
322466
stage: readiness.stage,
323467
error: readiness.ready ? null : readiness.aborted ? "readiness_aborted" : "readiness_timeout",
468+
circuit_state: circuit.state,
469+
circuit_failures: circuit.consecutiveFailures,
324470
};
325471

326472
if (readiness.aborted) {
473+
recordFailure(circuit);
474+
logLifecycleEvent({
475+
timestamp: new Date().toISOString(),
476+
session_name: sessionName,
477+
event: "spawn_failed",
478+
stage: "aborted",
479+
error: "readiness_aborted",
480+
});
327481
return {
328482
content: [{
329483
type: "text",
@@ -335,6 +489,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
335489
}
336490

337491
if (!readiness.ready) {
492+
recordFailure(circuit);
493+
logLifecycleEvent({
494+
timestamp: new Date().toISOString(),
495+
session_name: sessionName,
496+
event: "spawn_failed",
497+
stage: readiness.stage,
498+
ready_after_ms: readiness.readyAfterMs,
499+
error: "readiness_timeout",
500+
});
338501
return {
339502
content: [{
340503
type: "text",
@@ -347,6 +510,15 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
347510
};
348511
}
349512

513+
recordSuccess(circuit);
514+
logLifecycleEvent({
515+
timestamp: new Date().toISOString(),
516+
session_name: sessionName,
517+
event: "spawn_success",
518+
stage: readiness.stage,
519+
ready_after_ms: readiness.readyAfterMs,
520+
});
521+
350522
return {
351523
content: [{
352524
type: "text",
@@ -358,4 +530,57 @@ export default function agentSpawnExtension(pi: ExtensionAPI): void {
358530
};
359531
},
360532
});
533+
534+
// ── spawn_status tool ─────────────────────────────────────────────────────
535+
536+
pi.registerTool({
537+
name: "spawn_status",
538+
label: "Spawn Status",
539+
description:
540+
"Check the agent_spawn circuit breaker state and recent worker lifecycle events.",
541+
parameters: Type.Object({}),
542+
async execute() {
543+
let recentEvents = "";
544+
try {
545+
const { execSync } = require("node:child_process");
546+
const tail = execSync(`tail -20 "${LIFECYCLE_LOG_PATH}" 2>/dev/null`, { encoding: "utf-8" });
547+
if (tail.trim()) {
548+
const lines = tail.trim().split("\n");
549+
recentEvents = lines
550+
.map((line: string) => {
551+
try {
552+
const e = JSON.parse(line) as LifecycleEvent;
553+
return ` ${e.timestamp} ${e.event} ${e.session_name}${e.error ? ` (${e.error})` : ""}${e.ready_after_ms ? ` [${e.ready_after_ms}ms]` : ""}`;
554+
} catch {
555+
return ` (unparseable)`;
556+
}
557+
})
558+
.join("\n");
559+
}
560+
} catch {
561+
recentEvents = " (no lifecycle log)";
562+
}
563+
564+
return {
565+
content: [{
566+
type: "text" as const,
567+
text: [
568+
"Spawn Circuit Breaker:",
569+
` ${circuitStatus(circuit)}`,
570+
"",
571+
"Recent lifecycle events:",
572+
recentEvents || " (none)",
573+
].join("\n"),
574+
}],
575+
details: {
576+
circuit: {
577+
state: circuit.state,
578+
consecutive_failures: circuit.consecutiveFailures,
579+
total_successes: circuit.totalSuccesses,
580+
total_failures: circuit.totalFailures,
581+
},
582+
},
583+
};
584+
},
585+
});
361586
}

0 commit comments

Comments
 (0)