Skip to content

Commit 55efca7

Browse files
committed
feat(recovery): improve crash recovery handling with auto mode support
- Replace 'workflow:skip' with 'workflow:stop' for proper workflow termination
- Add sendRecoveryPrompt callback to centralize recovery logic
- Sync paused state between machine context and mode for consistency
- Enhance queue restoration to always attempt loading from agent config
- Implement separate recovery flows for auto and manual modes
1 parent 605dc97 commit 55efca7

6 files changed

Lines changed: 114 additions & 39 deletions

File tree

src/agents/monitoring/cleanup.ts

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -142,8 +142,8 @@ export class MonitoringCleanup {
142142
// Suppress all error/warn logs during graceful shutdown
143143
setShuttingDown(true);
144144

145-
// Emit workflow:skip to abort the currently running step
146-
(process as NodeJS.EventEmitter).emit('workflow:skip');
145+
// Emit workflow:stop to stop the workflow (not skip to next step)
146+
(process as NodeJS.EventEmitter).emit('workflow:stop');
147147

148148
// Save session state for active agents before cleanup (for resume on restart)
149149
if (this.workflowHandlers.onBeforeCleanup) {

src/workflows/recovery/index.ts

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,23 @@ export type {
3030
CrashRestoreResult,
3131
} from './types.js';
3232

33+
/**
34+
* Options for sending recovery prompt
35+
*/
36+
export interface SendRecoveryPromptOptions {
37+
/** Prompt to send */
38+
resumePrompt: string;
39+
/** Monitoring ID for the session */
40+
resumeMonitoringId?: number;
41+
/** Source of the prompt */
42+
source: 'controller';
43+
}
44+
45+
/**
46+
* Callback to send recovery prompt to agent
47+
*/
48+
export type SendRecoveryPromptFn = (options: SendRecoveryPromptOptions) => Promise<void>;
49+
3350
/**
3451
* Options for handleCrashRecovery
3552
*/
@@ -54,6 +71,8 @@ export interface HandleCrashRecoveryOptions {
5471
indexManager: StepIndexManager;
5572
/** Current session (optional) */
5673
session?: StepSession | null;
74+
/** Callback to send recovery prompt (required for auto mode recovery) */
75+
sendRecoveryPrompt?: SendRecoveryPromptFn;
5776
}
5877

5978
/**
@@ -119,13 +138,46 @@ export async function handleCrashRecovery(
119138

120139
const restoration = await restoreFromCrash(restoreCtx);
121140

122-
// 3. Transition state machine to awaiting
123-
machine.send({
124-
type: 'STEP_COMPLETE',
125-
output: { output: '', monitoringId: stepData?.monitoringId },
126-
});
141+
// 3. Handle recovery based on mode
142+
const isAutoMode = machine.context.autoMode;
143+
const recoveryPrompt = 'Continue where you left off. Review what was accomplished and proceed with the next logical step.';
144+
145+
if (isAutoMode) {
146+
// Auto mode: Send recovery prompt directly before transitioning
147+
// This centralizes all recovery logic here instead of scattering to wait.ts/delegated.ts
148+
if (!options.sendRecoveryPrompt) {
149+
throw new Error('[recovery] Auto mode crash recovery requires sendRecoveryPrompt callback');
150+
}
127151

128-
debug('[recovery] Crash recovery complete, transitioning to awaiting state');
152+
debug('[recovery] Auto mode: sending recovery prompt to agent');
153+
emitter.updateAgentStatus(uniqueAgentId, 'running');
154+
155+
// Transition to running state before sending prompt
156+
machine.send({ type: 'RESUME' });
157+
158+
// Send recovery prompt and wait for agent response
159+
await options.sendRecoveryPrompt({
160+
resumePrompt: recoveryPrompt,
161+
resumeMonitoringId: stepData?.monitoringId,
162+
source: 'controller',
163+
});
164+
165+
debug('[recovery] Recovery prompt sent, agent responded');
166+
167+
// After agent responds, the state machine will have transitioned to awaiting/delegated
168+
// The normal flow will continue from there (chained prompts, etc.)
169+
debug('[recovery] Crash recovery complete (auto mode)');
170+
} else {
171+
// Manual mode: Pause and wait for user input
172+
machine.context.paused = true;
173+
174+
machine.send({
175+
type: 'STEP_COMPLETE',
176+
output: { output: '', monitoringId: stepData?.monitoringId },
177+
});
178+
179+
debug('[recovery] Crash recovery complete, transitioning to awaiting state (manual mode, paused)');
180+
}
129181

130182
return { handled: true, detection, restoration };
131183
}

src/workflows/recovery/restore.ts

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -58,42 +58,44 @@ export async function restoreFromCrash(ctx: CrashRestoreContext): Promise<CrashR
5858
monitoringId: stepData.monitoringId,
5959
};
6060

61-
// 4. Restore queue from completedChains
61+
// 4. Restore queue from agent config (always try, not just when completedChains exists)
6262
let queueRestored = false;
6363
let promptCount = 0;
6464
let resumeIndex = 0;
6565

66-
if (stepData.completedChains && stepData.completedChains.length > 0) {
67-
const agentConfig = await loadAgentConfig(step.agentId, cwd);
66+
// Always try to load chained prompts from agent config
67+
const agentConfig = await loadAgentConfig(step.agentId, cwd);
6868

69-
if (agentConfig?.chainedPromptsPath) {
70-
const selectedConditions = await getSelectedConditions(cmRoot);
71-
const chainedPrompts = await loadChainedPrompts(
72-
agentConfig.chainedPromptsPath,
73-
cwd,
74-
selectedConditions
75-
);
76-
77-
if (chainedPrompts.length > 0) {
78-
// Use existing helper to calculate resume index
79-
resumeIndex = getNextChainIndex(stepData);
80-
promptCount = chainedPrompts.length;
69+
if (agentConfig?.chainedPromptsPath) {
70+
const selectedConditions = await getSelectedConditions(cmRoot);
71+
const chainedPrompts = await loadChainedPrompts(
72+
agentConfig.chainedPromptsPath,
73+
cwd,
74+
selectedConditions
75+
);
8176

82-
debug(
83-
'[recovery/restore] Restoring queue: %d prompts, resuming at index %d',
84-
promptCount,
85-
resumeIndex
86-
);
77+
if (chainedPrompts.length > 0) {
78+
// Use completedChains to determine resume index (or 0 if none completed yet)
79+
resumeIndex = stepData.completedChains && stepData.completedChains.length > 0
80+
? getNextChainIndex(stepData)
81+
: 0;
82+
promptCount = chainedPrompts.length;
8783

88-
// Use session if available, otherwise fall back to indexManager directly
89-
if (session) {
90-
session.initializeFromPersisted(chainedPrompts, resumeIndex);
91-
} else {
92-
indexManager.initQueue(chainedPrompts, resumeIndex);
93-
}
84+
debug(
85+
'[recovery/restore] Restoring queue: %d prompts, resuming at index %d (completedChains=%d)',
86+
promptCount,
87+
resumeIndex,
88+
stepData.completedChains?.length ?? 0
89+
);
9490

95-
queueRestored = true;
91+
// Use session if available, otherwise fall back to indexManager directly
92+
if (session) {
93+
session.initializeFromPersisted(chainedPrompts, resumeIndex);
94+
} else {
95+
indexManager.initQueue(chainedPrompts, resumeIndex);
9696
}
97+
98+
queueRestored = true;
9799
}
98100
}
99101

src/workflows/runner/delegated.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ export interface DelegatedCallbacks {
2727
export async function handleDelegated(ctx: RunnerContext, callbacks: DelegatedCallbacks): Promise<void> {
2828
const machineCtx = ctx.machine.context;
2929

30+
// Sync paused state from machineCtx to mode (for crash recovery)
31+
if (machineCtx.paused && !ctx.mode.paused) {
32+
debug('[Runner:delegated] Syncing paused state from machineCtx to mode (recovery)');
33+
ctx.mode.pause();
34+
}
35+
3036
debug('[Runner:delegated] Handling delegated state, promptQueue=%d items, queueIndex=%d, autoMode=%s',
3137
ctx.indexManager.promptQueue.length, ctx.indexManager.promptQueueIndex, ctx.mode.autoMode);
3238

src/workflows/runner/wait.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,26 @@ export interface WaitCallbacks {
3131
export async function handleWaiting(ctx: RunnerContext, callbacks: WaitCallbacks): Promise<void> {
3232
const machineCtx = ctx.machine.context;
3333

34+
// Sync paused state from machineCtx to mode (for crash recovery)
35+
// Recovery sets machineCtx.paused directly, mode needs to be synced
36+
if (machineCtx.paused && !ctx.mode.paused) {
37+
debug('[Runner] Syncing paused state from machineCtx to mode (recovery)');
38+
ctx.mode.pause();
39+
}
40+
3441
debug('[Runner] Handling waiting state, autoMode=%s, paused=%s, promptQueue=%d items, queueIndex=%d',
3542
ctx.mode.autoMode, ctx.mode.paused, ctx.indexManager.promptQueue.length, ctx.indexManager.promptQueueIndex);
3643

44+
// Get current step info
45+
const step = ctx.moduleSteps[machineCtx.currentStepIndex];
46+
const stepUniqueAgentId = getUniqueAgentId(step, machineCtx.currentStepIndex);
47+
3748
// Get queue state from session (uses indexManager as single source of truth)
3849
const session = ctx.getCurrentSession();
3950
const hasChainedPrompts = session
4051
? !session.isQueueExhausted
4152
: !ctx.indexManager.isQueueExhausted();
4253

43-
// Get current step and resolve interactive behavior
44-
const step = ctx.moduleSteps[machineCtx.currentStepIndex];
45-
const stepUniqueAgentId = getUniqueAgentId(step, machineCtx.currentStepIndex);
46-
4754
// Resolve interactive behavior using single source of truth
4855
const behavior = resolveInteractiveBehavior({
4956
step,

src/workflows/step/run.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,14 @@ export async function runStepFresh(ctx: RunnerContext): Promise<RunStepResult |
7474
machine: ctx.machine,
7575
indexManager: ctx.indexManager,
7676
session: ctx.getCurrentSession(),
77+
// Callback to send recovery prompt - centralizes all recovery logic in recovery module
78+
sendRecoveryPrompt: async (options) => {
79+
await runStepResume(ctx, {
80+
resumePrompt: options.resumePrompt,
81+
resumeMonitoringId: options.resumeMonitoringId,
82+
source: options.source,
83+
});
84+
},
7785
});
7886

7987
if (recoveryResult.handled) {

0 commit comments

Comments
 (0)