Skip to content

Commit 605dc97

Browse files
committed
feat(workflows): add crash recovery module for workflow steps
Implement crash recovery detection and restoration logic for workflow steps. The new module centralizes recovery operations including: - Detecting crash recovery scenarios - Restoring monitoring IDs and agent status - Rebuilding prompt queues from persisted data - Transitioning state machine to awaiting state Removed obsolete resume.ts as its functionality is now handled by the recovery module.
1 parent 7517b7c commit 605dc97

9 files changed

Lines changed: 367 additions & 231 deletions

File tree

src/config/index.ts

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/config/schema/environment.schema.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.

src/workflows/recovery/detect.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Crash Recovery Detection
3+
*
4+
* Determines if a step should resume from a previous crash.
5+
* Uses existing lifecycle helpers for consistency.
6+
*/
7+
8+
import { isStepResumable } from '../indexing/lifecycle.js';
9+
import type { StepData } from '../indexing/types.js';
10+
import type { CrashDetectionResult } from './types.js';
11+
12+
/**
13+
* Detect if a step needs crash recovery
14+
*
15+
* A step needs recovery if:
16+
* 1. It has a sessionId (agent conversation was started)
17+
* 2. It does NOT have completedAt (step didn't finish)
18+
*
19+
* @param stepData - Step data from persistence (may be null)
20+
* @returns Detection result with recovery info
21+
*/
22+
export function detectCrashRecovery(stepData: StepData | null): CrashDetectionResult {
23+
// Reuse existing helper from lifecycle.ts
24+
if (!stepData || !isStepResumable(stepData)) {
25+
return { isRecovering: false };
26+
}
27+
28+
return {
29+
isRecovering: true,
30+
sessionId: stepData.sessionId,
31+
monitoringId: stepData.monitoringId,
32+
completedChains: stepData.completedChains,
33+
};
34+
}
35+
36+
/**
37+
* Check if step data indicates a crash recovery scenario
38+
* (Simplified version for guards/conditions)
39+
*/
40+
export function isCrashRecovery(stepData: StepData | null): boolean {
41+
return isStepResumable(stepData);
42+
}

src/workflows/recovery/index.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Crash Recovery Module
3+
*
4+
* Handles detection and restoration of workflow steps after crashes.
5+
* Centralizes all crash recovery logic for maintainability.
6+
*/
7+
8+
import { debug } from '../../shared/logging/logger.js';
9+
import type { StepData } from '../indexing/types.js';
10+
import type { WorkflowEventEmitter } from '../events/index.js';
11+
import type { StepIndexManager } from '../indexing/index.js';
12+
import type { StateMachine } from '../state/types.js';
13+
import type { ModuleStep } from '../templates/types.js';
14+
import type { StepSession } from '../session/index.js';
15+
16+
import { detectCrashRecovery } from './detect.js';
17+
import { restoreFromCrash } from './restore.js';
18+
import type {
19+
CrashDetectionResult,
20+
CrashRestoreContext,
21+
CrashRestoreResult,
22+
} from './types.js';
23+
24+
// Re-exports
25+
export { detectCrashRecovery, isCrashRecovery } from './detect.js';
26+
export { restoreFromCrash } from './restore.js';
27+
export type {
28+
CrashDetectionResult,
29+
CrashRestoreContext,
30+
CrashRestoreResult,
31+
} from './types.js';
32+
33+
/**
34+
* Options for handleCrashRecovery
35+
*/
36+
export interface HandleCrashRecoveryOptions {
37+
/** Step data from persistence */
38+
stepData: StepData | null;
39+
/** Current step definition */
40+
step: ModuleStep;
41+
/** Step index */
42+
stepIndex: number;
43+
/** Unique agent ID */
44+
uniqueAgentId: string;
45+
/** Working directory */
46+
cwd: string;
47+
/** .codemachine root */
48+
cmRoot: string;
49+
/** Event emitter */
50+
emitter: WorkflowEventEmitter;
51+
/** State machine */
52+
machine: StateMachine;
53+
/** Index manager */
54+
indexManager: StepIndexManager;
55+
/** Current session (optional) */
56+
session?: StepSession | null;
57+
}
58+
59+
/**
60+
* Result from handleCrashRecovery
61+
*/
62+
export interface HandleCrashRecoveryResult {
63+
/** Whether crash recovery was handled */
64+
handled: boolean;
65+
/** Detection result */
66+
detection: CrashDetectionResult;
67+
/** Restoration result (if handled) */
68+
restoration?: CrashRestoreResult;
69+
}
70+
71+
/**
72+
* Main entry point for crash recovery
73+
*
74+
* Detects if a step needs crash recovery and performs restoration if needed.
75+
* Returns whether recovery was handled - caller should skip normal execution if true.
76+
*
77+
* @param options - All context needed for recovery
78+
* @returns Result indicating if recovery was handled
79+
*/
80+
export async function handleCrashRecovery(
81+
options: HandleCrashRecoveryOptions
82+
): Promise<HandleCrashRecoveryResult> {
83+
const {
84+
stepData,
85+
step,
86+
stepIndex,
87+
uniqueAgentId,
88+
cwd,
89+
cmRoot,
90+
emitter,
91+
machine,
92+
indexManager,
93+
session,
94+
} = options;
95+
96+
// 1. Detect if crash recovery is needed
97+
const detection = detectCrashRecovery(stepData);
98+
99+
if (!detection.isRecovering) {
100+
debug('[recovery] No crash recovery needed for step %d', stepIndex);
101+
return { handled: false, detection };
102+
}
103+
104+
debug('[recovery] Crash recovery detected for step %d', stepIndex);
105+
106+
// 2. Perform restoration
107+
const restoreCtx: CrashRestoreContext = {
108+
stepData: stepData!, // Safe - detection.isRecovering implies stepData exists
109+
step,
110+
stepIndex,
111+
uniqueAgentId,
112+
cwd,
113+
cmRoot,
114+
emitter,
115+
machineContext: machine.context,
116+
indexManager,
117+
session,
118+
};
119+
120+
const restoration = await restoreFromCrash(restoreCtx);
121+
122+
// 3. Transition state machine to awaiting
123+
machine.send({
124+
type: 'STEP_COMPLETE',
125+
output: { output: '', monitoringId: stepData?.monitoringId },
126+
});
127+
128+
debug('[recovery] Crash recovery complete, transitioning to awaiting state');
129+
130+
return { handled: true, detection, restoration };
131+
}

src/workflows/recovery/restore.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/**
2+
* Crash Recovery Restoration
3+
*
4+
* Restores step state after a crash, including:
5+
* - Registering monitoring ID with emitter
6+
* - Updating agent status
7+
* - Restoring machine context
8+
* - Restoring prompt queue from completedChains
9+
*/
10+
11+
import { debug } from '../../shared/logging/logger.js';
12+
import { getNextChainIndex } from '../indexing/lifecycle.js';
13+
import { loadAgentConfig } from '../../agents/runner/index.js';
14+
import { loadChainedPrompts } from '../../agents/runner/chained.js';
15+
import { getSelectedConditions } from '../../shared/workflows/template.js';
16+
import type { CrashRestoreContext, CrashRestoreResult } from './types.js';
17+
18+
/**
19+
* Restore step state after crash recovery
20+
*
21+
* Performs all necessary restoration:
22+
* 1. Register monitoring ID with emitter (for log panel)
23+
* 2. Update agent status to 'awaiting'
24+
* 3. Set machine context (currentMonitoringId, currentOutput)
25+
* 4. Restore prompt queue from completedChains
26+
*
27+
* @param ctx - Restoration context with all dependencies
28+
* @returns Restoration result
29+
*/
30+
export async function restoreFromCrash(ctx: CrashRestoreContext): Promise<CrashRestoreResult> {
31+
const {
32+
stepData,
33+
step,
34+
stepIndex,
35+
uniqueAgentId,
36+
cwd,
37+
cmRoot,
38+
emitter,
39+
machineContext,
40+
indexManager,
41+
session,
42+
} = ctx;
43+
44+
debug('[recovery/restore] Restoring step %d from crash', stepIndex);
45+
46+
// 1. Register monitoring ID with emitter (for log panel connection)
47+
if (stepData.monitoringId !== undefined) {
48+
emitter.registerMonitoringId(uniqueAgentId, stepData.monitoringId);
49+
}
50+
51+
// 2. Update agent status to awaiting
52+
emitter.updateAgentStatus(uniqueAgentId, 'awaiting');
53+
54+
// 3. Set machine context
55+
machineContext.currentMonitoringId = stepData.monitoringId;
56+
machineContext.currentOutput = {
57+
output: '',
58+
monitoringId: stepData.monitoringId,
59+
};
60+
61+
// 4. Restore queue from completedChains
62+
let queueRestored = false;
63+
let promptCount = 0;
64+
let resumeIndex = 0;
65+
66+
if (stepData.completedChains && stepData.completedChains.length > 0) {
67+
const agentConfig = await loadAgentConfig(step.agentId, cwd);
68+
69+
if (agentConfig?.chainedPromptsPath) {
70+
const selectedConditions = await getSelectedConditions(cmRoot);
71+
const chainedPrompts = await loadChainedPrompts(
72+
agentConfig.chainedPromptsPath,
73+
cwd,
74+
selectedConditions
75+
);
76+
77+
if (chainedPrompts.length > 0) {
78+
// Use existing helper to calculate resume index
79+
resumeIndex = getNextChainIndex(stepData);
80+
promptCount = chainedPrompts.length;
81+
82+
debug(
83+
'[recovery/restore] Restoring queue: %d prompts, resuming at index %d',
84+
promptCount,
85+
resumeIndex
86+
);
87+
88+
// Use session if available, otherwise fall back to indexManager directly
89+
if (session) {
90+
session.initializeFromPersisted(chainedPrompts, resumeIndex);
91+
} else {
92+
indexManager.initQueue(chainedPrompts, resumeIndex);
93+
}
94+
95+
queueRestored = true;
96+
}
97+
}
98+
}
99+
100+
debug(
101+
'[recovery/restore] Restoration complete: queueRestored=%s, promptCount=%d, resumeIndex=%d',
102+
queueRestored,
103+
promptCount,
104+
resumeIndex
105+
);
106+
107+
return {
108+
success: true,
109+
queueRestored,
110+
promptCount,
111+
resumeIndex,
112+
};
113+
}

src/workflows/recovery/types.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Crash Recovery Types
3+
*
4+
* Types specific to crash recovery operations.
5+
*/
6+
7+
import type { StepData } from '../indexing/types.js';
8+
import type { WorkflowEventEmitter } from '../events/index.js';
9+
import type { StepIndexManager } from '../indexing/index.js';
10+
import type { WorkflowContext } from '../state/types.js';
11+
import type { ModuleStep } from '../templates/types.js';
12+
import type { StepSession } from '../session/index.js';
13+
14+
/**
15+
* Result of crash recovery detection
16+
*/
17+
export interface CrashDetectionResult {
18+
/** Whether this step is resuming from a crash */
19+
isRecovering: boolean;
20+
/** Session ID to resume (if recovering) */
21+
sessionId?: string;
22+
/** Monitoring ID to resume (if recovering) */
23+
monitoringId?: number;
24+
/** Completed chain indices (if any) */
25+
completedChains?: number[];
26+
}
27+
28+
/**
29+
* Context needed for crash recovery restoration
30+
*/
31+
export interface CrashRestoreContext {
32+
/** Step data containing session info */
33+
stepData: StepData;
34+
/** Current step definition */
35+
step: ModuleStep;
36+
/** Step index */
37+
stepIndex: number;
38+
/** Unique agent ID for this step */
39+
uniqueAgentId: string;
40+
/** Working directory */
41+
cwd: string;
42+
/** .codemachine root directory */
43+
cmRoot: string;
44+
/** Event emitter for UI updates */
45+
emitter: WorkflowEventEmitter;
46+
/** State machine context */
47+
machineContext: WorkflowContext;
48+
/** Index manager for queue operations */
49+
indexManager: StepIndexManager;
50+
/** Current step session (optional) */
51+
session?: StepSession | null;
52+
}
53+
54+
/**
55+
* Result of crash recovery restoration
56+
*/
57+
export interface CrashRestoreResult {
58+
/** Whether restoration was successful */
59+
success: boolean;
60+
/** Queue was restored with chained prompts */
61+
queueRestored: boolean;
62+
/** Number of prompts in restored queue */
63+
promptCount: number;
64+
/** Resume index in the queue */
65+
resumeIndex: number;
66+
}

src/workflows/runner/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import { handleDelegated } from './delegated.js';
3838
export type { WorkflowRunnerOptions, RunnerContext } from './types.js';
3939
export { handleWaiting } from './wait.js';
4040
export { handleDelegated } from './delegated.js';
41-
export * from './resume.js';
4241

4342
/**
4443
* Workflow runner class

0 commit comments

Comments
 (0)