Skip to content

Commit 738b42f

Browse files
committed
fix(executor): stop HITL error edges from firing on successful resume
1 parent 3ebb9a5 commit 738b42f

4 files changed

Lines changed: 305 additions & 28 deletions

File tree

apps/sim/executor/execution/edge-manager.test.ts

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,117 @@ describe('EdgeManager', () => {
875875
})
876876
})
877877

878+
describe('deactivateResumedEdge', () => {
879+
it('prunes a resumed pause block error edge without firing the error target', () => {
880+
// Models the HITL-resume bug: a pause block and a regular block both feed
881+
// an error-notifier via `error` handles. On a fully successful run the
882+
// notifier must never run.
883+
const pauseId = 'pause-block'
884+
const regularId = 'regular-block'
885+
const notifyId = 'error-notify'
886+
887+
const pauseNode = createMockNode(
888+
pauseId,
889+
[
890+
{ target: 'next', sourceHandle: EDGE.SOURCE },
891+
{ target: notifyId, sourceHandle: EDGE.ERROR },
892+
],
893+
[]
894+
)
895+
const regularNode = createMockNode(regularId, [
896+
{ target: notifyId, sourceHandle: EDGE.ERROR },
897+
])
898+
const notifyNode = createMockNode(notifyId, [], [pauseId, regularId])
899+
900+
const dag = createMockDAG(
901+
new Map<string, DAGNode>([
902+
[pauseId, pauseNode],
903+
[regularId, regularNode],
904+
[notifyId, notifyNode],
905+
])
906+
)
907+
const edgeManager = new EdgeManager(dag)
908+
909+
// Resume releases the pause block's error edge as deactivated.
910+
edgeManager.deactivateResumedEdge(pauseId, notifyId, EDGE.ERROR)
911+
912+
expect(edgeManager.getDeactivatedEdges()).toContain(
913+
JSON.stringify([pauseId, notifyId, EDGE.ERROR])
914+
)
915+
expect(edgeManager.getNodesWithActivatedEdge()).not.toContain(notifyId)
916+
917+
// The regular block then completes successfully → its error edge deactivates too.
918+
const readyNodes = edgeManager.processOutgoingEdges(regularNode, { result: 'ok' })
919+
920+
// With no error edge ever activated, the notifier is never scheduled.
921+
expect(readyNodes).not.toContain(notifyId)
922+
expect(edgeManager.getNodesWithActivatedEdge()).not.toContain(notifyId)
923+
})
924+
925+
it('still fires the error target when a real upstream block errors', () => {
926+
// Same topology, but here the regular block genuinely errors — the notifier
927+
// must fire even though the pause block's error edge was pruned on resume.
928+
const pauseId = 'pause-block'
929+
const regularId = 'regular-block'
930+
const notifyId = 'error-notify'
931+
932+
const regularNode = createMockNode(regularId, [
933+
{ target: notifyId, sourceHandle: EDGE.ERROR },
934+
])
935+
const notifyNode = createMockNode(notifyId, [], [pauseId, regularId])
936+
937+
const dag = createMockDAG(
938+
new Map<string, DAGNode>([
939+
[regularId, regularNode],
940+
[notifyId, notifyNode],
941+
])
942+
)
943+
const edgeManager = new EdgeManager(dag)
944+
945+
edgeManager.deactivateResumedEdge(pauseId, notifyId, EDGE.ERROR)
946+
947+
const readyNodes = edgeManager.processOutgoingEdges(regularNode, { error: 'boom' })
948+
949+
expect(readyNodes).toContain(notifyId)
950+
expect(edgeManager.getNodesWithActivatedEdge()).toContain(notifyId)
951+
})
952+
953+
it('leaves a convergence join ready+activated after pruning a pause error edge', () => {
954+
// Join `C` is fed by `B.source` (succeeds) and `P.error` (pause block).
955+
// After B activates and P's error edge is pruned on resume, C must be both
956+
// activated and ready so the engine re-queues it.
957+
const sourceId = 'block-b'
958+
const pauseId = 'pause-block'
959+
const joinId = 'join-c'
960+
961+
const sourceNode = createMockNode(sourceId, [{ target: joinId, sourceHandle: EDGE.SOURCE }])
962+
const pauseNode = createMockNode(pauseId, [{ target: joinId, sourceHandle: EDGE.ERROR }])
963+
const joinNode = createMockNode(joinId, [], [sourceId, pauseId])
964+
965+
const edgeManager = new EdgeManager(
966+
createMockDAG(
967+
new Map<string, DAGNode>([
968+
[sourceId, sourceNode],
969+
[pauseId, pauseNode],
970+
[joinId, joinNode],
971+
])
972+
)
973+
)
974+
975+
// Phase 1: B succeeds → activates B→C, but C still waits on the pause edge.
976+
const readyAfterB = edgeManager.processOutgoingEdges(sourceNode, { result: 'ok' })
977+
expect(readyAfterB).not.toContain(joinId)
978+
expect(edgeManager.hasActivatedEdge(joinId)).toBe(true)
979+
expect(edgeManager.isNodeReady(joinNode)).toBe(false)
980+
981+
// Resume: pause block's error edge is pruned → C is now ready (and stays
982+
// activated), which is exactly the state the engine uses to re-queue it.
983+
edgeManager.deactivateResumedEdge(pauseId, joinId, EDGE.ERROR)
984+
expect(edgeManager.hasActivatedEdge(joinId)).toBe(true)
985+
expect(edgeManager.isNodeReady(joinNode)).toBe(true)
986+
})
987+
})
988+
878989
describe('Diamond pattern (convergent paths)', () => {
879990
it('should handle diamond: condition splits then converges at merge point', () => {
880991
const conditionId = 'condition-1'

apps/sim/executor/execution/edge-manager.ts

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ export class EdgeManager {
134134
return Array.from(this.nodesWithActivatedEdge)
135135
}
136136

137+
hasActivatedEdge(nodeId: string): boolean {
138+
return this.nodesWithActivatedEdge.has(nodeId)
139+
}
140+
137141
restoreDeactivatedEdges(edgeKeys?: string[], activatedNodeIds?: string[]): void {
138142
this.deactivatedEdges = new Set(
139143
(edgeKeys ?? []).map((edgeKey) => this.normalizeSerializedEdgeKey(edgeKey))
@@ -145,17 +149,10 @@ export class EdgeManager {
145149
this.nodesWithActivatedEdge.add(nodeId)
146150
}
147151

148-
/**
149-
* Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
150-
*
151-
* Only clears edges whose SOURCE is in the provided set. Edges pointing INTO a node in the set
152-
* whose source lives outside (e.g. an external branch whose path was cascade-deactivated) must
153-
* remain deactivated — otherwise `countActiveIncomingEdges` would count a source that will never
154-
* fire again, stalling the loop on its next iteration.
155-
*
156-
* Deactivated edge keys encode the source separately so node IDs with shared prefixes
157-
* cannot clear each other's deactivated edges.
158-
*/
152+
deactivateResumedEdge(sourceId: string, targetId: string, sourceHandle?: string): void {
153+
this.deactivateEdgeAndDescendants(sourceId, targetId, sourceHandle)
154+
}
155+
159156
clearDeactivatedEdgesForNodes(nodeIds: Set<string>): void {
160157
const edgesToRemove: string[] = []
161158
for (const edgeKey of this.deactivatedEdges) {
@@ -182,11 +179,6 @@ export class EdgeManager {
182179
return targetNode ? this.isNodeReady(targetNode) : false
183180
}
184181

185-
/**
186-
* Checks if the cascade target sentinel belongs to the same subflow as the source node.
187-
* A condition inside a loop that hits a dead-end should still allow the enclosing
188-
* loop's sentinel to fire so the loop can continue or exit.
189-
*/
190182
private isEnclosingSentinel(sourceNode: DAGNode, sentinelId: string): boolean {
191183
const sentinel = this.dag.nodes.get(sentinelId)
192184
if (!sentinel?.metadata.isSentinel) return false
@@ -321,9 +313,6 @@ export class EdgeManager {
321313
}
322314
}
323315

324-
/**
325-
* Checks if a node has any active incoming edges besides the one being excluded.
326-
*/
327316
private hasActiveIncomingEdges(node: DAGNode, excludeEdgeKey: string): boolean {
328317
for (const incomingSourceId of node.incomingEdges) {
329318
const incomingNode = this.dag.nodes.get(incomingSourceId)

apps/sim/executor/execution/engine.test.ts

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ function createMockEdgeManager(
112112
getDeactivatedEdges: vi.fn(() => []),
113113
getNodesWithActivatedEdge: vi.fn(() => []),
114114
markNodeWithActivatedEdge: vi.fn(),
115+
deactivateResumedEdge: vi.fn(),
116+
hasActivatedEdge: vi.fn(() => false),
115117
} as unknown as MockEdgeManager
116118
}
117119

@@ -231,6 +233,136 @@ describe('ExecutionEngine', () => {
231233
expect(nodeOrchestrator.executeNode).not.toHaveBeenCalled()
232234
})
233235

236+
it('deactivates a resumed pause block error edge instead of firing it', async () => {
237+
// Pause block has two outgoing edges: a `source` continuation edge and an
238+
// `error` edge to an error-notifier. On a successful resume the error edge
239+
// must be deactivated (never marked/queued); only the continuation fires.
240+
const pauseNode = createMockNode('pause-block', 'human_in_the_loop')
241+
pauseNode.outgoingEdges.set('pause-block→next-source', {
242+
target: 'next',
243+
sourceHandle: EDGE.SOURCE,
244+
})
245+
pauseNode.outgoingEdges.set('pause-block→notify-error', {
246+
target: 'error-notify',
247+
sourceHandle: EDGE.ERROR,
248+
})
249+
250+
const nextNode = createMockNode('next', 'function')
251+
nextNode.incomingEdges.add('pause-block')
252+
253+
const errorNotifyNode = createMockNode('error-notify', 'gmail')
254+
errorNotifyNode.incomingEdges.add('pause-block')
255+
256+
const dag = createMockDAG([pauseNode, nextNode, errorNotifyNode])
257+
const context = createMockContext({
258+
metadata: {
259+
executionId: 'test-execution',
260+
startTime: new Date().toISOString(),
261+
pendingBlocks: [],
262+
// remainingEdges omit sourceHandle (as persisted snapshots do), forcing
263+
// the engine to resolve the handle from the live DAG.
264+
remainingEdges: [
265+
{ source: 'pause-block', target: 'next' },
266+
{ source: 'pause-block', target: 'error-notify' },
267+
],
268+
} as any,
269+
})
270+
const edgeManager = createMockEdgeManager(() => [])
271+
const nodeOrchestrator = createMockNodeOrchestrator()
272+
273+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
274+
await engine.run()
275+
276+
// Continuation edge fires; error edge is deactivated, not activated/queued.
277+
expect(edgeManager.markNodeWithActivatedEdge).toHaveBeenCalledWith('next')
278+
expect(edgeManager.markNodeWithActivatedEdge).not.toHaveBeenCalledWith('error-notify')
279+
expect(edgeManager.deactivateResumedEdge).toHaveBeenCalledWith(
280+
'pause-block',
281+
'error-notify',
282+
EDGE.ERROR
283+
)
284+
// A pure error-handler target (never activated) must not be executed.
285+
expect(nodeOrchestrator.executeNode).not.toHaveBeenCalledWith(context, 'error-notify')
286+
})
287+
288+
it('re-queues a convergence target when a resumed pause error edge is pruned', async () => {
289+
// The join is fed by a succeeding block's `source` edge (activated in
290+
// phase 1) AND the pause block's `error` edge. On resume the error edge is
291+
// pruned, but the join must still run because it already had a genuine
292+
// activation — otherwise it would be silently stranded.
293+
const pauseNode = createMockNode('pause-block', 'human_in_the_loop')
294+
pauseNode.outgoingEdges.set('pause-block→join-error', {
295+
target: 'join',
296+
sourceHandle: EDGE.ERROR,
297+
})
298+
const joinNode = createMockNode('join', 'function')
299+
joinNode.incomingEdges.add('pause-block')
300+
301+
const dag = createMockDAG([pauseNode, joinNode])
302+
const context = createMockContext({
303+
metadata: {
304+
executionId: 'test-execution',
305+
startTime: new Date().toISOString(),
306+
pendingBlocks: [],
307+
remainingEdges: [{ source: 'pause-block', target: 'join' }],
308+
} as any,
309+
})
310+
const edgeManager = createMockEdgeManager(() => [])
311+
// Join already received a genuine activation in phase 1 and is now ready.
312+
vi.mocked(edgeManager.hasActivatedEdge).mockReturnValue(true)
313+
vi.mocked(edgeManager.isNodeReady).mockReturnValue(true)
314+
const nodeOrchestrator = createMockNodeOrchestrator()
315+
316+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
317+
await engine.run()
318+
319+
expect(edgeManager.deactivateResumedEdge).toHaveBeenCalledWith(
320+
'pause-block',
321+
'join',
322+
EDGE.ERROR
323+
)
324+
// Pruning is via deactivation, never force-activation...
325+
expect(edgeManager.markNodeWithActivatedEdge).not.toHaveBeenCalledWith('join')
326+
// ...but the already-activated convergence node still runs.
327+
expect(nodeOrchestrator.executeNode).toHaveBeenCalledWith(context, 'join')
328+
})
329+
330+
it('prefers the continuation handle when a pause block also errors into the same target', async () => {
331+
// Pause block wires BOTH source and error into the same target. The error
332+
// edge is registered first, but on a successful resume the continuation
333+
// (source) handle must win, so the target is activated, not pruned.
334+
const pauseNode = createMockNode('pause-block', 'human_in_the_loop')
335+
pauseNode.outgoingEdges.set('pause-block→both-error', {
336+
target: 'both',
337+
sourceHandle: EDGE.ERROR,
338+
})
339+
pauseNode.outgoingEdges.set('pause-block→both-source', {
340+
target: 'both',
341+
sourceHandle: EDGE.SOURCE,
342+
})
343+
const bothNode = createMockNode('both', 'function')
344+
bothNode.incomingEdges.add('pause-block')
345+
346+
const dag = createMockDAG([pauseNode, bothNode])
347+
const context = createMockContext({
348+
metadata: {
349+
executionId: 'test-execution',
350+
startTime: new Date().toISOString(),
351+
pendingBlocks: [],
352+
remainingEdges: [{ source: 'pause-block', target: 'both' }],
353+
} as any,
354+
})
355+
const edgeManager = createMockEdgeManager(() => [])
356+
const nodeOrchestrator = createMockNodeOrchestrator()
357+
358+
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
359+
await engine.run()
360+
361+
expect(edgeManager.markNodeWithActivatedEdge).toHaveBeenCalledWith('both')
362+
expect(edgeManager.deactivateResumedEdge).not.toHaveBeenCalled()
363+
expect(nodeOrchestrator.executeNode).toHaveBeenCalledWith(context, 'both')
364+
})
365+
234366
it('should execute all nodes in a multi-node workflow', async () => {
235367
const nodes = [
236368
createMockNode('start', 'starter'),

apps/sim/executor/execution/engine.ts

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,17 +287,33 @@ export class ExecutionEngine {
287287

288288
for (const edge of remainingEdges) {
289289
const targetNode = this.dag.nodes.get(edge.target)
290-
if (targetNode) {
291-
const hadEdge = targetNode.incomingEdges.has(edge.source)
292-
targetNode.incomingEdges.delete(edge.source)
293-
if (hadEdge) {
294-
this.edgeManager.markNodeWithActivatedEdge(targetNode.id)
295-
}
296-
297-
if (this.edgeManager.isNodeReady(targetNode)) {
298-
this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id })
290+
if (!targetNode) continue
291+
292+
const sourceHandle = this.resolveRemainingEdgeHandle(edge)
293+
if (sourceHandle === EDGE.ERROR) {
294+
this.edgeManager.deactivateResumedEdge(edge.source, targetNode.id, sourceHandle)
295+
296+
if (
297+
this.edgeManager.hasActivatedEdge(targetNode.id) &&
298+
this.edgeManager.isNodeReady(targetNode)
299+
) {
300+
this.execLogger.info('Convergence node ready after pruning resumed error edge', {
301+
nodeId: targetNode.id,
302+
})
299303
this.addToQueue(targetNode.id)
300304
}
305+
continue
306+
}
307+
308+
const hadEdge = targetNode.incomingEdges.has(edge.source)
309+
targetNode.incomingEdges.delete(edge.source)
310+
if (hadEdge) {
311+
this.edgeManager.markNodeWithActivatedEdge(targetNode.id)
312+
}
313+
314+
if (this.edgeManager.isNodeReady(targetNode)) {
315+
this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id })
316+
this.addToQueue(targetNode.id)
301317
}
302318
}
303319

@@ -351,6 +367,35 @@ export class ExecutionEngine {
351367
}
352368
}
353369

370+
/**
371+
* Resolves the source handle for an edge released during pause/resume.
372+
* Persisted `remainingEdges` may omit the handle, so fall back to the live DAG
373+
* edge. When a source has both a continuation and an `error` edge to the same
374+
* target, the continuation handle wins — a successful resume must not prune it.
375+
*/
376+
private resolveRemainingEdgeHandle(edge: {
377+
source: string
378+
target: string
379+
sourceHandle?: string
380+
}): string | undefined {
381+
if (edge.sourceHandle !== undefined) return edge.sourceHandle
382+
383+
const sourceNode = this.dag.nodes.get(edge.source)
384+
if (!sourceNode) return undefined
385+
386+
let hasErrorEdge = false
387+
for (const [, outgoing] of sourceNode.outgoingEdges) {
388+
if (outgoing.target !== edge.target) continue
389+
if (outgoing.sourceHandle === EDGE.ERROR) {
390+
hasErrorEdge = true
391+
continue
392+
}
393+
return outgoing.sourceHandle
394+
}
395+
396+
return hasErrorEdge ? EDGE.ERROR : undefined
397+
}
398+
354399
private async processQueue(): Promise<void> {
355400
while (this.readyQueue.length > 0) {
356401
if (this.checkCancellation() || this.errorFlag) {

0 commit comments

Comments
 (0)