Skip to content

Commit 7517b7c

Browse files
committed
feat(monitoring): add onBeforeCleanup handler to save session state
Add new workflow handler to persist session and monitoring IDs before cleanup when workflow is interrupted. This enables resuming workflows even if interrupted before first step completion.
1 parent 0a9114c commit 7517b7c

2 files changed

Lines changed: 44 additions & 7 deletions

File tree

src/agents/monitoring/cleanup.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@ export class MonitoringCleanup {
1818
private static workflowHandlers: {
1919
onStop?: () => void;
2020
onExit?: () => void;
21+
onBeforeCleanup?: () => Promise<void>;
2122
} = {};
2223

2324
/**
2425
* Register callbacks invoked during the two-stage Ctrl+C flow.
26+
* Merges with existing handlers (new handlers override existing ones for the same key).
2527
*/
26-
static registerWorkflowHandlers(handlers: { onStop?: () => void; onExit?: () => void }): void {
27-
this.workflowHandlers = handlers;
28+
static registerWorkflowHandlers(handlers: {
29+
onStop?: () => void;
30+
onExit?: () => void;
31+
onBeforeCleanup?: () => Promise<void>;
32+
}): void {
33+
this.workflowHandlers = { ...this.workflowHandlers, ...handlers };
2834
}
2935

3036
static clearWorkflowHandlers(): void {
@@ -139,6 +145,15 @@ export class MonitoringCleanup {
139145
// Emit workflow:skip to abort the currently running step
140146
(process as NodeJS.EventEmitter).emit('workflow:skip');
141147

148+
// Save session state for active agents before cleanup (for resume on restart)
149+
if (this.workflowHandlers.onBeforeCleanup) {
150+
try {
151+
await this.workflowHandlers.onBeforeCleanup();
152+
} catch (error) {
153+
logger.debug('onBeforeCleanup failed:', error);
154+
}
155+
}
156+
142157
// Stop active agents
143158
await this.stopActiveAgents();
144159

src/workflows/run.ts

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import {
1919
} from '../shared/workflows/index.js';
2020
import { StepIndexManager } from './indexing/index.js';
2121
import { registry } from '../infra/engines/index.js';
22-
import { MonitoringCleanup } from '../agents/monitoring/index.js';
22+
import { MonitoringCleanup, AgentMonitorService } from '../agents/monitoring/index.js';
2323
import { WorkflowEventBus, WorkflowEventEmitter } from './events/index.js';
2424
import { validateSpecification } from '../runtime/services/index.js';
2525
import { WorkflowRunner } from './runner/index.js';
@@ -54,8 +54,33 @@ export async function runWorkflow(options: RunWorkflowOptions = {}): Promise<voi
5454
// Set up cleanup handlers
5555
MonitoringCleanup.setup();
5656

57-
// Load template
57+
// Load template (needed before we can set up the before-cleanup handler)
5858
const cmRoot = path.join(cwd, '.codemachine');
59+
const indexManager = new StepIndexManager(cmRoot);
60+
61+
// Register callback to save session state before cleanup on Ctrl+C
62+
// This ensures session/monitoring IDs are persisted even if the first turn hasn't completed
63+
MonitoringCleanup.registerWorkflowHandlers({
64+
onBeforeCleanup: async () => {
65+
const monitor = AgentMonitorService.getInstance();
66+
const activeAgents = monitor.getActiveAgents();
67+
68+
// Find root agents (no parentId) - these are the main step agents
69+
const rootAgents = activeAgents.filter((agent) => !agent.parentId);
70+
71+
for (const agent of rootAgents) {
72+
// Only save if agent has a sessionId (needed for resume)
73+
if (agent.sessionId) {
74+
const stepIndex = indexManager.currentStepIndex;
75+
debug('[Workflow] Saving session state on Ctrl+C: step=%d, sessionId=%s, monitoringId=%d',
76+
stepIndex, agent.sessionId, agent.id);
77+
await indexManager.stepSessionInitialized(stepIndex, agent.sessionId, agent.id);
78+
}
79+
}
80+
},
81+
});
82+
83+
// Load template
5984
const templatePath = options.templatePath || (await getTemplatePathFromTracking(cmRoot));
6085
const { template } = await loadTemplateWithPath(cwd, templatePath);
6186

@@ -108,9 +133,6 @@ export async function runWorkflow(options: RunWorkflowOptions = {}): Promise<voi
108133
globalThis.__workflowEventBus = eventBus;
109134
}
110135

111-
// Create step index manager
112-
const indexManager = new StepIndexManager(cmRoot);
113-
114136
// Get resume info
115137
const resumeInfo = await indexManager.getResumeInfo();
116138
const startIndex = resumeInfo.startIndex;

0 commit comments

Comments
 (0)