Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions apps/sim/executor/execution/edge-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,117 @@ describe('EdgeManager', () => {
})
})

describe('deactivateResumedEdge', () => {
  it('prunes a resumed pause block error edge without firing the error target', () => {
    // HITL-resume regression: a pause block and an ordinary block both point
    // their `error` handles at the same notifier. When nothing fails, the
    // notifier must stay unscheduled.
    const hitlId = 'pause-block'
    const workerId = 'regular-block'
    const notifierId = 'error-notify'

    const hitlBlock = createMockNode(
      hitlId,
      [
        { target: 'next', sourceHandle: EDGE.SOURCE },
        { target: notifierId, sourceHandle: EDGE.ERROR },
      ],
      []
    )
    const workerBlock = createMockNode(workerId, [{ target: notifierId, sourceHandle: EDGE.ERROR }])
    const notifierBlock = createMockNode(notifierId, [], [hitlId, workerId])

    const graph = new Map<string, DAGNode>()
    graph.set(hitlId, hitlBlock)
    graph.set(workerId, workerBlock)
    graph.set(notifierId, notifierBlock)
    const manager = new EdgeManager(createMockDAG(graph))

    // On resume the pause block's error edge is handed over as deactivated.
    manager.deactivateResumedEdge(hitlId, notifierId, EDGE.ERROR)

    const prunedEdgeKey = JSON.stringify([hitlId, notifierId, EDGE.ERROR])
    expect(manager.getDeactivatedEdges()).toContain(prunedEdgeKey)
    expect(manager.getNodesWithActivatedEdge()).not.toContain(notifierId)

    // The ordinary block finishes without an error, so its error edge is
    // not taken either.
    const scheduled = manager.processOutgoingEdges(workerBlock, { result: 'ok' })

    // No error edge was ever activated, hence the notifier is never queued.
    expect(scheduled).not.toContain(notifierId)
    expect(manager.getNodesWithActivatedEdge()).not.toContain(notifierId)
  })

  it('still fires the error target when a real upstream block errors', () => {
    // The notifier again lists both the pause block and the ordinary block as
    // incoming sources, but only the ordinary block and the notifier are
    // registered in the DAG here. The ordinary block genuinely errors, so the
    // notifier must fire even though the pause block's error edge was pruned.
    const hitlId = 'pause-block'
    const workerId = 'regular-block'
    const notifierId = 'error-notify'

    const workerBlock = createMockNode(workerId, [{ target: notifierId, sourceHandle: EDGE.ERROR }])
    const notifierBlock = createMockNode(notifierId, [], [hitlId, workerId])

    const graph = new Map<string, DAGNode>()
    graph.set(workerId, workerBlock)
    graph.set(notifierId, notifierBlock)
    const manager = new EdgeManager(createMockDAG(graph))

    manager.deactivateResumedEdge(hitlId, notifierId, EDGE.ERROR)

    const scheduled = manager.processOutgoingEdges(workerBlock, { error: 'boom' })

    expect(scheduled).toContain(notifierId)
    expect(manager.getNodesWithActivatedEdge()).toContain(notifierId)
  })

  it('leaves a convergence join ready+activated after pruning a pause error edge', () => {
    // The join has two inputs: the `source` edge of a block that succeeds and
    // the `error` edge of a pause block. Once the first has activated and the
    // second has been pruned on resume, the join has to be activated AND ready
    // so that the engine can re-queue it.
    const upstreamId = 'block-b'
    const hitlId = 'pause-block'
    const mergeId = 'join-c'

    const upstreamBlock = createMockNode(upstreamId, [
      { target: mergeId, sourceHandle: EDGE.SOURCE },
    ])
    const hitlBlock = createMockNode(hitlId, [{ target: mergeId, sourceHandle: EDGE.ERROR }])
    const mergeBlock = createMockNode(mergeId, [], [upstreamId, hitlId])

    const graph = new Map<string, DAGNode>()
    graph.set(upstreamId, upstreamBlock)
    graph.set(hitlId, hitlBlock)
    graph.set(mergeId, mergeBlock)
    const manager = new EdgeManager(createMockDAG(graph))

    // Phase 1: the upstream block succeeds. Its edge into the join activates,
    // yet the join keeps waiting for the pause block's edge.
    const scheduledAfterUpstream = manager.processOutgoingEdges(upstreamBlock, { result: 'ok' })
    expect(scheduledAfterUpstream).not.toContain(mergeId)
    expect(manager.hasActivatedEdge(mergeId)).toBe(true)
    expect(manager.isNodeReady(mergeBlock)).toBe(false)

    // Resume: pruning the pause block's error edge makes the join ready while
    // it remains activated, the exact state the engine checks before re-queuing.
    manager.deactivateResumedEdge(hitlId, mergeId, EDGE.ERROR)
    expect(manager.hasActivatedEdge(mergeId)).toBe(true)
    expect(manager.isNodeReady(mergeBlock)).toBe(true)
  })
})

describe('Diamond pattern (convergent paths)', () => {
it('should handle diamond: condition splits then converges at merge point', () => {
const conditionId = 'condition-1'
Expand Down
27 changes: 8 additions & 19 deletions apps/sim/executor/execution/edge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ export class EdgeManager {
return Array.from(this.nodesWithActivatedEdge)
}

/**
 * Reports whether `nodeId` is currently recorded in `nodesWithActivatedEdge`.
 *
 * @param nodeId - Identifier of the node to look up.
 * @returns `true` when the node is present in the activated collection, `false` otherwise.
 */
hasActivatedEdge(nodeId: string): boolean {
return this.nodesWithActivatedEdge.has(nodeId)
}

restoreDeactivatedEdges(edgeKeys?: string[], activatedNodeIds?: string[]): void {
this.deactivatedEdges = new Set(
(edgeKeys ?? []).map((edgeKey) => this.normalizeSerializedEdgeKey(edgeKey))
Expand All @@ -145,17 +149,10 @@ export class EdgeManager {
this.nodesWithActivatedEdge.add(nodeId)
}

/**
 * Deactivates a single edge that was released when a paused execution resumed.
 *
 * Thin public wrapper: the arguments are forwarded unchanged to
 * `deactivateEdgeAndDescendants`.
 * NOTE(review): judging by that helper's name, deactivation presumably also
 * cascades to downstream edges — confirm against its implementation.
 *
 * @param sourceId - ID of the node the edge leaves from.
 * @param targetId - ID of the node the edge points to.
 * @param sourceHandle - Optional handle on the source node that the edge is attached to.
 */
deactivateResumedEdge(sourceId: string, targetId: string, sourceHandle?: string): void {
this.deactivateEdgeAndDescendants(sourceId, targetId, sourceHandle)
}

/**
 * Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration).
 *
 * Only clears edges whose SOURCE is in the provided set. Edges pointing INTO a node in the set
 * whose source lives outside (e.g. an external branch whose path was cascade-deactivated) must
 * remain deactivated — otherwise `countActiveIncomingEdges` would count a source that will never
 * fire again, stalling the loop on its next iteration.
 *
 * Deactivated edge keys encode the source separately so node IDs with shared prefixes
 * cannot clear each other's deactivated edges.
 */
Comment thread
icecrasher321 marked this conversation as resolved.

clearDeactivatedEdgesForNodes(nodeIds: Set<string>): void {
const edgesToRemove: string[] = []
for (const edgeKey of this.deactivatedEdges) {
Expand All @@ -182,11 +179,6 @@ export class EdgeManager {
return targetNode ? this.isNodeReady(targetNode) : false
}

/**
* Checks if the cascade target sentinel belongs to the same subflow as the source node.
* A condition inside a loop that hits a dead-end should still allow the enclosing
* loop's sentinel to fire so the loop can continue or exit.
*/
private isEnclosingSentinel(sourceNode: DAGNode, sentinelId: string): boolean {
const sentinel = this.dag.nodes.get(sentinelId)
if (!sentinel?.metadata.isSentinel) return false
Expand Down Expand Up @@ -321,9 +313,6 @@ export class EdgeManager {
}
}

/**
* Checks if a node has any active incoming edges besides the one being excluded.
*/
private hasActiveIncomingEdges(node: DAGNode, excludeEdgeKey: string): boolean {
for (const incomingSourceId of node.incomingEdges) {
const incomingNode = this.dag.nodes.get(incomingSourceId)
Expand Down
132 changes: 132 additions & 0 deletions apps/sim/executor/execution/engine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ function createMockEdgeManager(
getDeactivatedEdges: vi.fn(() => []),
getNodesWithActivatedEdge: vi.fn(() => []),
markNodeWithActivatedEdge: vi.fn(),
deactivateResumedEdge: vi.fn(),
hasActivatedEdge: vi.fn(() => false),
} as unknown as MockEdgeManager
}

Expand Down Expand Up @@ -231,6 +233,136 @@ describe('ExecutionEngine', () => {
expect(nodeOrchestrator.executeNode).not.toHaveBeenCalled()
})

it('deactivates a resumed pause block error edge instead of firing it', async () => {
  // The pause block fans out twice: a `source` edge that continues the flow
  // and an `error` edge into an error-notifier. A successful resume has to
  // deactivate the error edge (no marking, no queuing) and fire only the
  // continuation.
  const hitlBlock = createMockNode('pause-block', 'human_in_the_loop')
  hitlBlock.outgoingEdges.set('pause-block→next-source', {
    target: 'next',
    sourceHandle: EDGE.SOURCE,
  })
  hitlBlock.outgoingEdges.set('pause-block→notify-error', {
    target: 'error-notify',
    sourceHandle: EDGE.ERROR,
  })

  const continuationBlock = createMockNode('next', 'function')
  continuationBlock.incomingEdges.add('pause-block')

  const notifierBlock = createMockNode('error-notify', 'gmail')
  notifierBlock.incomingEdges.add('pause-block')

  const dag = createMockDAG([hitlBlock, continuationBlock, notifierBlock])

  // The released edges carry no sourceHandle (matching persisted snapshots),
  // so the engine has to look the handle up in the live DAG.
  const resumeMetadata = {
    executionId: 'test-execution',
    startTime: new Date().toISOString(),
    pendingBlocks: [],
    remainingEdges: [
      { source: 'pause-block', target: 'next' },
      { source: 'pause-block', target: 'error-notify' },
    ],
  } as any
  const context = createMockContext({ metadata: resumeMetadata })
  const edgeManager = createMockEdgeManager(() => [])
  const nodeOrchestrator = createMockNodeOrchestrator()

  await new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator).run()

  // The continuation is activated, whereas the error edge is only deactivated.
  const { markNodeWithActivatedEdge, deactivateResumedEdge } = edgeManager
  expect(markNodeWithActivatedEdge).toHaveBeenCalledWith('next')
  expect(markNodeWithActivatedEdge).not.toHaveBeenCalledWith('error-notify')
  expect(deactivateResumedEdge).toHaveBeenCalledWith('pause-block', 'error-notify', EDGE.ERROR)
  // An error handler that never received an activation must not execute.
  expect(nodeOrchestrator.executeNode).not.toHaveBeenCalledWith(context, 'error-notify')
})

it('re-queues a convergence target when a resumed pause error edge is pruned', async () => {
  // Two inputs feed the join: a `source` edge from a block that succeeded
  // (activated during phase 1) and the pause block's `error` edge. Resume
  // prunes the error edge, yet the join still has to run because it holds a
  // genuine activation already; otherwise it would be stranded silently.
  const hitlBlock = createMockNode('pause-block', 'human_in_the_loop')
  hitlBlock.outgoingEdges.set('pause-block→join-error', {
    target: 'join',
    sourceHandle: EDGE.ERROR,
  })
  const mergeBlock = createMockNode('join', 'function')
  mergeBlock.incomingEdges.add('pause-block')

  const dag = createMockDAG([hitlBlock, mergeBlock])
  const resumeMetadata = {
    executionId: 'test-execution',
    startTime: new Date().toISOString(),
    pendingBlocks: [],
    remainingEdges: [{ source: 'pause-block', target: 'join' }],
  } as any
  const context = createMockContext({ metadata: resumeMetadata })
  const edgeManager = createMockEdgeManager(() => [])
  // Simulate phase 1: the join already holds a real activation and is ready.
  vi.mocked(edgeManager.hasActivatedEdge).mockReturnValue(true)
  vi.mocked(edgeManager.isNodeReady).mockReturnValue(true)
  const nodeOrchestrator = createMockNodeOrchestrator()

  await new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator).run()

  expect(edgeManager.deactivateResumedEdge).toHaveBeenCalledWith('pause-block', 'join', EDGE.ERROR)
  // The edge is pruned through deactivation and never force-activated...
  expect(edgeManager.markNodeWithActivatedEdge).not.toHaveBeenCalledWith('join')
  // ...while the convergence node, activated earlier, is still executed.
  expect(nodeOrchestrator.executeNode).toHaveBeenCalledWith(context, 'join')
})

it('prefers the continuation handle when a pause block also errors into the same target', async () => {
  // Both the `source` and the `error` handle of the pause block lead to one
  // target. Although the error edge is registered first, a successful resume
  // must resolve to the continuation (source) handle, so the target ends up
  // activated rather than pruned.
  const hitlBlock = createMockNode('pause-block', 'human_in_the_loop')
  hitlBlock.outgoingEdges.set('pause-block→both-error', {
    target: 'both',
    sourceHandle: EDGE.ERROR,
  })
  hitlBlock.outgoingEdges.set('pause-block→both-source', {
    target: 'both',
    sourceHandle: EDGE.SOURCE,
  })
  const sharedTarget = createMockNode('both', 'function')
  sharedTarget.incomingEdges.add('pause-block')

  const dag = createMockDAG([hitlBlock, sharedTarget])
  const resumeMetadata = {
    executionId: 'test-execution',
    startTime: new Date().toISOString(),
    pendingBlocks: [],
    remainingEdges: [{ source: 'pause-block', target: 'both' }],
  } as any
  const context = createMockContext({ metadata: resumeMetadata })
  const edgeManager = createMockEdgeManager(() => [])
  const nodeOrchestrator = createMockNodeOrchestrator()

  await new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator).run()

  expect(edgeManager.markNodeWithActivatedEdge).toHaveBeenCalledWith('both')
  expect(edgeManager.deactivateResumedEdge).not.toHaveBeenCalled()
  expect(nodeOrchestrator.executeNode).toHaveBeenCalledWith(context, 'both')
})

it('should execute all nodes in a multi-node workflow', async () => {
const nodes = [
createMockNode('start', 'starter'),
Expand Down
63 changes: 54 additions & 9 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,33 @@ export class ExecutionEngine {

for (const edge of remainingEdges) {
const targetNode = this.dag.nodes.get(edge.target)
if (targetNode) {
const hadEdge = targetNode.incomingEdges.has(edge.source)
targetNode.incomingEdges.delete(edge.source)
if (hadEdge) {
this.edgeManager.markNodeWithActivatedEdge(targetNode.id)
}

if (this.edgeManager.isNodeReady(targetNode)) {
this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id })
if (!targetNode) continue

const sourceHandle = this.resolveRemainingEdgeHandle(edge)
if (sourceHandle === EDGE.ERROR) {
this.edgeManager.deactivateResumedEdge(edge.source, targetNode.id, sourceHandle)

if (
this.edgeManager.hasActivatedEdge(targetNode.id) &&
this.edgeManager.isNodeReady(targetNode)
) {
this.execLogger.info('Convergence node ready after pruning resumed error edge', {
nodeId: targetNode.id,
})
this.addToQueue(targetNode.id)
}
continue
}

const hadEdge = targetNode.incomingEdges.has(edge.source)
targetNode.incomingEdges.delete(edge.source)
if (hadEdge) {
this.edgeManager.markNodeWithActivatedEdge(targetNode.id)
}

if (this.edgeManager.isNodeReady(targetNode)) {
this.execLogger.info('Node became ready after edge removal', { nodeId: targetNode.id })
this.addToQueue(targetNode.id)
}
}

Expand Down Expand Up @@ -351,6 +367,35 @@ export class ExecutionEngine {
}
}

/**
 * Determines which source handle a pause/resume-released edge belongs to.
 *
 * A handle stored on the edge itself is returned as-is. Persisted
 * `remainingEdges` may lack one, in which case the source node's outgoing
 * edges in the live DAG are scanned for edges that reach the same target.
 * The first such edge whose handle is not `error` decides the result, so a
 * continuation edge takes precedence over an `error` edge to the same target
 * and a successful resume never prunes the continuation.
 *
 * @param edge - Released edge; `sourceHandle` is optional.
 * @returns The resolved handle; `EDGE.ERROR` when only error edges reach the
 *   target; `undefined` when the source node is unknown or no outgoing edge
 *   reaches the target.
 */
private resolveRemainingEdgeHandle(edge: {
  source: string
  target: string
  sourceHandle?: string
}): string | undefined {
  const persistedHandle = edge.sourceHandle
  if (persistedHandle !== undefined) {
    return persistedHandle
  }

  const origin = this.dag.nodes.get(edge.source)
  if (!origin) {
    return undefined
  }

  let sawErrorEdge = false
  for (const candidate of origin.outgoingEdges.values()) {
    if (candidate.target === edge.target) {
      if (candidate.sourceHandle !== EDGE.ERROR) {
        // First non-error edge to the target wins immediately.
        return candidate.sourceHandle
      }
      sawErrorEdge = true
    }
  }

  if (sawErrorEdge) {
    return EDGE.ERROR
  }
  return undefined
}

private async processQueue(): Promise<void> {
while (this.readyQueue.length > 0) {
if (this.checkCancellation() || this.errorFlag) {
Expand Down
Loading