Skip to content

Commit c780b6a

Browse files
joeykrug authored and steipete committed
fix: address all review comments on PR openclaw#47719 + implement resume context and config idempotency guard
1 parent 44304ba commit c780b6a

3 files changed

Lines changed: 201 additions & 15 deletions

File tree

src/agents/subagent-orphan-recovery.test.ts

Lines changed: 109 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,14 @@ vi.mock("../gateway/call.js", () => ({
1919
callGateway: vi.fn(async () => ({ runId: "test-run-id" })),
2020
}));
2121

22+
// Replace the transcript reader with a mock that returns an empty message list
// by default; individual tests override it with mockReturnValue.
vi.mock("../gateway/session-utils.fs.js", () => ({
23+
readSessionMessages: vi.fn(() => []),
24+
}));
25+
26+
// Replace the registry hand-off with a mock that always returns true, so tests
// can assert on the previousRunId/nextRunId it was called with.
vi.mock("./subagent-registry.js", () => ({
27+
replaceSubagentRunAfterSteer: vi.fn(() => true),
28+
}));
29+
2230
function createTestRunRecord(overrides: Partial<SubagentRunRecord> = {}): SubagentRunRecord {
2331
return {
2432
runId: "run-1",
@@ -45,6 +53,7 @@ describe("subagent-orphan-recovery", () => {
4553
it("recovers orphaned sessions with abortedLastRun=true", async () => {
4654
const sessions = await import("../config/sessions.js");
4755
const gateway = await import("../gateway/call.js");
56+
const subagentRegistry = await import("./subagent-registry.js");
4857

4958
const sessionEntry = {
5059
sessionId: "session-abc",
@@ -78,6 +87,10 @@ describe("subagent-orphan-recovery", () => {
7887
expect(params.sessionKey).toBe("agent:main:subagent:test-session-1");
7988
expect(params.message).toContain("gateway reload");
8089
expect(params.message).toContain("Test task: implement feature X");
90+
expect(subagentRegistry.replaceSubagentRunAfterSteer).toHaveBeenCalledWith({
91+
previousRunId: "run-1",
92+
nextRunId: "test-run-id",
93+
});
8194
});
8295

8396
it("skips sessions that are not aborted", async () => {
@@ -321,4 +334,100 @@ describe("subagent-orphan-recovery", () => {
321334
expect(message.length).toBeLessThan(5000);
322335
expect(message).toContain("...");
323336
});
337+
338+
// Verifies that the resume message sent to the gateway quotes the most recent
// user turn from the session transcript.
it("includes last human message in resume when available", async () => {
339+
const sessions = await import("../config/sessions.js");
340+
const gateway = await import("../gateway/call.js");
341+
const sessionUtils = await import("../gateway/session-utils.fs.js");
342+
343+
// Session store holds one entry flagged abortedLastRun, with a sessionFile set.
vi.mocked(sessions.loadSessionStore).mockReturnValue({
344+
"agent:main:subagent:test-session-1": {
345+
sessionId: "session-abc",
346+
updatedAt: Date.now(),
347+
abortedLastRun: true,
348+
sessionFile: "session-abc.jsonl",
349+
},
350+
});
351+
352+
// Transcript with two user turns in array-of-parts form; the later one
// ("Also add tests for it") is the one asserted on below.
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
353+
{ role: "user", content: [{ type: "text", text: "Please build feature Y" }] },
354+
{ role: "assistant", content: [{ type: "text", text: "Working on it..." }] },
355+
{ role: "user", content: [{ type: "text", text: "Also add tests for it" }] },
356+
{ role: "assistant", content: [{ type: "text", text: "Sure, adding tests now." }] },
357+
]);
358+
359+
const activeRuns = new Map<string, SubagentRunRecord>();
360+
activeRuns.set("run-1", createTestRunRecord());
361+
362+
const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
363+
await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
364+
365+
// NOTE(review): reads the first recorded callGateway call — assumes call
// history is reset between tests; confirm against the suite's setup hooks.
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
366+
const params = callArgs[0].params as Record<string, unknown>;
367+
const message = params.message as string;
368+
expect(message).toContain("Also add tests for it");
369+
expect(message).toContain("last message from the user");
370+
});
371+
372+
// Verifies that a config-change hint is appended to the resume message when an
// assistant turn in the transcript mentions openclaw.json.
it("adds config change hint when assistant messages reference config modifications", async () => {
373+
const sessions = await import("../config/sessions.js");
374+
const gateway = await import("../gateway/call.js");
375+
const sessionUtils = await import("../gateway/session-utils.fs.js");
376+
377+
// Session store holds one entry flagged abortedLastRun (no sessionFile here).
vi.mocked(sessions.loadSessionStore).mockReturnValue({
378+
"agent:main:subagent:test-session-1": {
379+
sessionId: "session-abc",
380+
updatedAt: Date.now(),
381+
abortedLastRun: true,
382+
},
383+
});
384+
385+
// Transcript uses plain-string content; the assistant turn names openclaw.json.
vi.mocked(sessionUtils.readSessionMessages).mockReturnValue([
386+
{ role: "user", content: "Update the config" },
387+
{ role: "assistant", content: "I've modified openclaw.json to add the new setting." },
388+
]);
389+
390+
const activeRuns = new Map<string, SubagentRunRecord>();
391+
activeRuns.set("run-1", createTestRunRecord());
392+
393+
const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
394+
await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
395+
396+
// NOTE(review): reads the first recorded callGateway call — assumes call
// history is reset between tests; confirm against the suite's setup hooks.
const callArgs = vi.mocked(gateway.callGateway).mock.calls[0];
397+
const params = callArgs[0].params as Record<string, unknown>;
398+
const message = params.message as string;
399+
expect(message).toContain("config changes from your previous run were already applied");
400+
});
401+
402+
// Verifies that a failed session-store write after a successful resume does not
// cause a second run record to trigger another resume in the same pass.
it("prevents duplicate resume when updateSessionStore fails", async () => {
403+
const sessions = await import("../config/sessions.js");
404+
const gateway = await import("../gateway/call.js");
405+
406+
// Gateway call succeeds, but persisting the cleared flag rejects.
// NOTE(review): mockResolvedValue/mockRejectedValue stay in effect until the
// mocks are reset — confirm the suite resets them so later tests are unaffected.
vi.mocked(gateway.callGateway).mockResolvedValue({ runId: "new-run" } as never);
407+
vi.mocked(sessions.updateSessionStore).mockRejectedValue(new Error("write failed"));
408+
409+
vi.mocked(sessions.loadSessionStore).mockReturnValue({
410+
"agent:main:subagent:test-session-1": {
411+
sessionId: "session-abc",
412+
updatedAt: Date.now(),
413+
abortedLastRun: true,
414+
},
415+
});
416+
417+
// Two run records; presumably both carry the same default child session key
// from createTestRunRecord — verify against that helper.
const activeRuns = new Map<string, SubagentRunRecord>();
418+
activeRuns.set("run-1", createTestRunRecord());
419+
activeRuns.set(
420+
"run-2",
421+
createTestRunRecord({
422+
runId: "run-2",
423+
}),
424+
);
425+
426+
const { recoverOrphanedSubagentSessions } = await import("./subagent-orphan-recovery.js");
427+
const result = await recoverOrphanedSubagentSessions({ getActiveRuns: () => activeRuns });
428+
429+
// One record is counted as recovered, the other as skipped, and the gateway
// is called exactly once.
expect(result.recovered).toBe(1);
430+
expect(result.skipped).toBe(1);
431+
expect(gateway.callGateway).toHaveBeenCalledOnce();
432+
});
324433
});

src/agents/subagent-orphan-recovery.ts

Lines changed: 87 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,9 @@ import {
1919
type SessionEntry,
2020
} from "../config/sessions.js";
2121
import { callGateway } from "../gateway/call.js";
22+
import { readSessionMessages } from "../gateway/session-utils.fs.js";
2223
import { createSubsystemLogger } from "../logging/subsystem.js";
24+
import { replaceSubagentRunAfterSteer } from "./subagent-registry.js";
2325
import type { SubagentRunRecord } from "./subagent-registry.types.js";
2426

2527
const log = createSubsystemLogger("subagent-orphan-recovery");
@@ -30,14 +32,45 @@ const DEFAULT_RECOVERY_DELAY_MS = 5_000;
3032
/**
3133
* Build the resume message for an orphaned subagent.
3234
*/
33-
function buildResumeMessage(task: string): string {
35+
function buildResumeMessage(task: string, lastHumanMessage?: string): string {
3436
// Cap the task text at 2000 characters; longer tasks are cut and suffixed with "...".
const maxTaskLen = 2000;
3537
const truncatedTask = task.length > maxTaskLen ? `${task.slice(0, maxTaskLen)}...` : task;
3638

37-
return (
39+
// The message is now assembled incrementally so optional sections can be appended.
let message =
3840
`[System] Your previous turn was interrupted by a gateway reload. ` +
39-
`Your task was:\n\n${truncatedTask}\n\nPlease continue where you left off.`
40-
);
41+
`Your original task was:\n\n${truncatedTask}\n\n`;
42+
43+
// Only quote the last user message when one was supplied (skipped for undefined or "").
// NOTE(review): lastHumanMessage is interpolated as-is — unlike task, no length cap is applied here.
if (lastHumanMessage) {
44+
message += `The last message from the user before the interruption was:\n\n${lastHumanMessage}\n\n`;
45+
}
46+
47+
message += `Please continue where you left off.`;
48+
return message;
49+
}
50+
51+
/**
 * Pull the plain text out of a transcript message of unknown shape.
 *
 * Handles two `content` forms: a string (returned as-is, including "") and an
 * array of parts (the `text` of every `{ type: "text", text: string }` part,
 * joined with newlines).
 *
 * @param msg - Candidate message; anything that is not a non-null object yields undefined.
 * @returns The extracted text, or undefined when `content` is neither form or
 *   the array yields no non-empty text.
 */
function extractMessageText(msg: unknown): string | undefined {
52+
if (!msg || typeof msg !== "object") {
53+
return undefined;
54+
}
55+
const m = msg as Record<string, unknown>;
56+
if (typeof m.content === "string") {
57+
return m.content;
58+
}
59+
if (Array.isArray(m.content)) {
60+
const text = m.content
61+
// Keep only parts shaped like { type: "text", text: <string> }.
.filter(
62+
(c: unknown) =>
63+
typeof c === "object" &&
64+
c !== null &&
65+
(c as Record<string, unknown>).type === "text" &&
66+
typeof (c as Record<string, unknown>).text === "string",
67+
)
68+
.map((c: unknown) => (c as Record<string, string>).text)
69+
// Drop empty strings so they do not produce blank lines in the joined result.
.filter(Boolean)
70+
.join("\n");
71+
// An empty join result is reported as undefined rather than "".
return text || undefined;
72+
}
73+
return undefined;
4174
}
4275

4376
/**
@@ -46,11 +79,17 @@ function buildResumeMessage(task: string): string {
4679
async function resumeOrphanedSession(params: {
4780
sessionKey: string;
4881
task: string;
82+
lastHumanMessage?: string;
83+
configChangeHint?: string;
84+
originalRunId: string;
4985
}): Promise<boolean> {
50-
const resumeMessage = buildResumeMessage(params.task);
86+
let resumeMessage = buildResumeMessage(params.task, params.lastHumanMessage);
87+
if (params.configChangeHint) {
88+
resumeMessage += params.configChangeHint;
89+
}
5190

5291
try {
53-
await callGateway<{ runId: string }>({
92+
const result = await callGateway<{ runId: string }>({
5493
method: "agent",
5594
params: {
5695
message: resumeMessage,
@@ -61,6 +100,10 @@ async function resumeOrphanedSession(params: {
61100
},
62101
timeoutMs: 10_000,
63102
});
103+
replaceSubagentRunAfterSteer({
104+
previousRunId: params.originalRunId,
105+
nextRunId: result.runId,
106+
});
64107
log.info(`resumed orphaned session: ${params.sessionKey}`);
65108
return true;
66109
} catch (err) {
@@ -84,6 +127,8 @@ export async function recoverOrphanedSubagentSessions(params: {
84127
getActiveRuns: () => Map<string, SubagentRunRecord>;
85128
}): Promise<{ recovered: number; failed: number; skipped: number }> {
86129
const result = { recovered: 0, failed: 0, skipped: 0 };
130+
const resumedSessionKeys = new Set<string>();
131+
const configChangePattern = /openclaw\.json|openclaw gateway restart|config\.patch/i;
87132

88133
try {
89134
const activeRuns = params.getActiveRuns();
@@ -104,6 +149,10 @@ export async function recoverOrphanedSubagentSessions(params: {
104149
if (!childSessionKey) {
105150
continue;
106151
}
152+
if (resumedSessionKeys.has(childSessionKey)) {
153+
result.skipped++;
154+
continue;
155+
}
107156

108157
try {
109158
const agentId = resolveAgentIdFromSessionKey(childSessionKey);
@@ -129,25 +178,49 @@ export async function recoverOrphanedSubagentSessions(params: {
129178

130179
log.info(`found orphaned subagent session: ${childSessionKey} (run=${runId})`);
131180

181+
const messages = readSessionMessages(entry.sessionId, storePath, entry.sessionFile);
182+
const lastHumanMessage = [...messages]
183+
.toReversed()
184+
.find((msg) => (msg as { role?: unknown } | null)?.role === "user");
185+
const configChangeDetected = messages.some((msg) => {
186+
if ((msg as { role?: unknown } | null)?.role !== "assistant") {
187+
return false;
188+
}
189+
const text = extractMessageText(msg);
190+
return typeof text === "string" && configChangePattern.test(text);
191+
});
192+
132193
// Resume the session with the original task context.
133194
// We intentionally do NOT clear abortedLastRun before attempting
134195
// the resume — if callGateway fails (e.g. gateway still booting),
135196
// the flag stays true so the next restart can retry.
136197
const resumed = await resumeOrphanedSession({
137198
sessionKey: childSessionKey,
138199
task: runRecord.task,
200+
lastHumanMessage: extractMessageText(lastHumanMessage),
201+
configChangeHint: configChangeDetected
202+
? "\n\n[config changes from your previous run were already applied — do not re-modify openclaw.json or restart the gateway]"
203+
: undefined,
204+
originalRunId: runId,
139205
});
140206

141207
if (resumed) {
208+
resumedSessionKeys.add(childSessionKey);
142209
// Only clear the aborted flag after confirmed successful resume.
143-
await updateSessionStore(storePath, (currentStore) => {
144-
const current = currentStore[childSessionKey];
145-
if (current) {
146-
current.abortedLastRun = false;
147-
current.updatedAt = Date.now();
148-
currentStore[childSessionKey] = current;
149-
}
150-
});
210+
try {
211+
await updateSessionStore(storePath, (currentStore) => {
212+
const current = currentStore[childSessionKey];
213+
if (current) {
214+
current.abortedLastRun = false;
215+
current.updatedAt = Date.now();
216+
currentStore[childSessionKey] = current;
217+
}
218+
});
219+
} catch (err) {
220+
log.warn(
221+
`resume succeeded but failed to update session store for ${childSessionKey}: ${String(err)}`,
222+
);
223+
}
151224
result.recovered++;
152225
} else {
153226
// Flag stays as abortedLastRun=true so next restart can retry

src/cli/daemon-cli/lifecycle.ts

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -50,8 +50,12 @@ function resolveGatewayPortFallback(): Promise<number> {
5050
}
5151

5252
async function assertUnmanagedGatewayRestartEnabled(port: number): Promise<void> {
53+
const cfg = await readBestEffortConfig().catch(() => undefined);
54+
const tlsEnabled = !!(cfg as { gateway?: { tls?: { enabled?: unknown } } } | undefined)?.gateway
55+
?.tls?.enabled;
56+
const scheme = tlsEnabled ? "wss" : "ws";
5357
const probe = await probeGateway({
54-
url: `ws://127.0.0.1:${port}`,
58+
url: `${scheme}://127.0.0.1:${port}`,
5559
auth: {
5660
token: process.env.OPENCLAW_GATEWAY_TOKEN?.trim() || undefined,
5761
password: process.env.OPENCLAW_GATEWAY_PASSWORD?.trim() || undefined,

0 commit comments

Comments
 (0)