Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const {
}))

vi.mock('@/lib/auth/hybrid', () => ({
AuthType: { SESSION: 'session', API_KEY: 'api_key', INTERNAL_JWT: 'internal_jwt' },
checkHybridAuth: mockCheckHybridAuth,
}))

Expand Down
86 changes: 51 additions & 35 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class BlockExecutor {
if (!isSentinel) {
blockLog = this.createBlockLog(ctx, node.id, block, node)
ctx.blockLogs.push(blockLog)
this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
await this.callOnBlockStart(ctx, node, block, blockLog.executionOrder)
}

const startTime = performance.now()
Expand Down Expand Up @@ -105,7 +105,7 @@ export class BlockExecutor {
}
} catch (error) {
cleanupSelfReference?.()
return this.handleBlockError(
return await this.handleBlockError(
error,
ctx,
node,
Expand Down Expand Up @@ -179,7 +179,7 @@ export class BlockExecutor {
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
block,
})
this.callOnBlockComplete(
await this.callOnBlockComplete(
ctx,
node,
block,
Expand All @@ -195,7 +195,7 @@ export class BlockExecutor {

return normalizedOutput
} catch (error) {
return this.handleBlockError(
return await this.handleBlockError(
error,
ctx,
node,
Expand Down Expand Up @@ -226,7 +226,7 @@ export class BlockExecutor {
return this.blockHandlers.find((h) => h.canHandle(block))
}

private handleBlockError(
private async handleBlockError(
error: unknown,
ctx: ExecutionContext,
node: DAGNode,
Expand All @@ -236,7 +236,7 @@ export class BlockExecutor {
resolvedInputs: Record<string, any>,
isSentinel: boolean,
phase: 'input_resolution' | 'execution'
): NormalizedBlockOutput {
): Promise<NormalizedBlockOutput> {
const duration = performance.now() - startTime
const errorMessage = normalizeError(error)
const hasResolvedInputs =
Expand Down Expand Up @@ -287,7 +287,7 @@ export class BlockExecutor {
? error.childWorkflowInstanceId
: undefined
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
this.callOnBlockComplete(
await this.callOnBlockComplete(
ctx,
node,
block,
Expand Down Expand Up @@ -439,31 +439,39 @@ export class BlockExecutor {
return redactApiKeys(result)
}

private callOnBlockStart(
private async callOnBlockStart(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
executionOrder: number
): void {
): Promise<void> {
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

const iterationContext = getIterationContext(ctx, node?.metadata)

if (this.contextExtensions.onBlockStart) {
this.contextExtensions.onBlockStart(
blockId,
blockName,
blockType,
executionOrder,
iterationContext,
ctx.childWorkflowContext
)
try {
await this.contextExtensions.onBlockStart(
blockId,
blockName,
blockType,
executionOrder,
iterationContext,
ctx.childWorkflowContext
)
} catch (error) {
logger.warn('Block start callback failed', {
blockId,
blockType,
error: error instanceof Error ? error.message : String(error),
})
}
}
}

private callOnBlockComplete(
private async callOnBlockComplete(
ctx: ExecutionContext,
node: DAGNode,
block: SerializedBlock,
Expand All @@ -474,30 +482,38 @@ export class BlockExecutor {
executionOrder: number,
endedAt: string,
childWorkflowInstanceId?: string
): void {
): Promise<void> {
const blockId = node.metadata?.originalBlockId ?? node.id
const blockName = block.metadata?.name ?? blockId
const blockType = block.metadata?.id ?? DEFAULTS.BLOCK_TYPE

const iterationContext = getIterationContext(ctx, node?.metadata)

if (this.contextExtensions.onBlockComplete) {
this.contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
input,
output,
executionTime: duration,
startedAt,
executionOrder,
endedAt,
childWorkflowInstanceId,
},
iterationContext,
ctx.childWorkflowContext
)
try {
await this.contextExtensions.onBlockComplete(
blockId,
blockName,
blockType,
{
input,
output,
executionTime: duration,
startedAt,
executionOrder,
endedAt,
childWorkflowInstanceId,
},
iterationContext,
ctx.childWorkflowContext
)
} catch (error) {
logger.warn('Block completion callback failed', {
blockId,
blockType,
error: error instanceof Error ? error.message : String(error),
})
}
}
}

Expand Down
41 changes: 24 additions & 17 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ export class LoopOrchestrator {
}
if (isCancelled) {
logger.info('Loop execution cancelled', { loopId, iteration: scope.iteration })
return this.createExitResult(ctx, loopId, scope)
return await this.createExitResult(ctx, loopId, scope)
}

const iterationResults: NormalizedBlockOutput[] = []
Expand All @@ -253,7 +253,7 @@ export class LoopOrchestrator {
scope.currentIterationOutputs.clear()

if (!(await this.evaluateCondition(ctx, scope, scope.iteration + 1))) {
return this.createExitResult(ctx, loopId, scope)
return await this.createExitResult(ctx, loopId, scope)
}

scope.iteration++
Expand All @@ -269,11 +269,11 @@ export class LoopOrchestrator {
}
}

private createExitResult(
private async createExitResult(
ctx: ExecutionContext,
loopId: string,
scope: LoopScope
): LoopContinuationResult {
): Promise<LoopContinuationResult> {
const results = scope.allIterationOutputs
const output = { results }
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
Expand All @@ -282,19 +282,26 @@ export class LoopOrchestrator {
const now = new Date().toISOString()
const iterationContext = buildContainerIterationContext(ctx, loopId)

this.contextExtensions.onBlockComplete(
loopId,
'Loop',
'loop',
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
try {
await this.contextExtensions.onBlockComplete(
loopId,
'Loop',
'loop',
{
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
} catch (error) {
logger.warn('Loop completion callback failed', {
loopId,
error: error instanceof Error ? error.message : String(error),
})
}
}

return {
Expand Down
16 changes: 8 additions & 8 deletions apps/sim/executor/orchestrators/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class NodeExecutionOrchestrator {
const isParallelSentinel = node.metadata.isParallelSentinel

if (isParallelSentinel) {
return this.handleParallelSentinel(ctx, node, sentinelType, parallelId)
return await this.handleParallelSentinel(ctx, node, sentinelType, parallelId)
}

switch (sentinelType) {
Expand Down Expand Up @@ -142,12 +142,12 @@ export class NodeExecutionOrchestrator {
}
}

private handleParallelSentinel(
private async handleParallelSentinel(
ctx: ExecutionContext,
node: DAGNode,
sentinelType: string | undefined,
parallelId: string | undefined
): NormalizedBlockOutput {
): Promise<NormalizedBlockOutput> {
if (!parallelId) {
logger.warn('Parallel sentinel called without parallelId')
return {}
Expand Down Expand Up @@ -176,7 +176,7 @@ export class NodeExecutionOrchestrator {
}

if (sentinelType === 'end') {
const result = this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
const result = await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
return {
results: result.results || [],
sentinelEnd: true,
Expand Down Expand Up @@ -210,7 +210,7 @@ export class NodeExecutionOrchestrator {
} else if (isParallelBranch) {
const parallelId = this.findParallelIdForNode(node.id)
if (parallelId) {
this.handleParallelNodeCompletion(ctx, node, output, parallelId)
await this.handleParallelNodeCompletion(ctx, node, output, parallelId)
} else {
this.handleRegularNodeCompletion(ctx, node, output)
}
Expand All @@ -229,12 +229,12 @@ export class NodeExecutionOrchestrator {
this.state.setBlockOutput(node.id, output)
}

private handleParallelNodeCompletion(
private async handleParallelNodeCompletion(
ctx: ExecutionContext,
node: DAGNode,
output: NormalizedBlockOutput,
parallelId: string
): void {
): Promise<void> {
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (!scope) {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
Expand All @@ -248,7 +248,7 @@ export class NodeExecutionOrchestrator {
output
)
if (allComplete) {
this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
}

this.state.setBlockOutput(node.id, output)
Expand Down
38 changes: 24 additions & 14 deletions apps/sim/executor/orchestrators/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,10 @@ export class ParallelOrchestrator {
return allComplete
}

aggregateParallelResults(ctx: ExecutionContext, parallelId: string): ParallelAggregationResult {
async aggregateParallelResults(
ctx: ExecutionContext,
parallelId: string
): Promise<ParallelAggregationResult> {
const scope = ctx.parallelExecutions?.get(parallelId)
if (!scope) {
logger.error('Parallel scope not found for aggregation', { parallelId })
Expand All @@ -316,19 +319,26 @@ export class ParallelOrchestrator {
const now = new Date().toISOString()
const iterationContext = buildContainerIterationContext(ctx, parallelId)

this.contextExtensions.onBlockComplete(
parallelId,
'Parallel',
'parallel',
{
output,
executionTime: 0,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
try {
await this.contextExtensions.onBlockComplete(
parallelId,
'Parallel',
'parallel',
{
output,
executionTime: 0,
startedAt: now,
executionOrder: getNextExecutionOrder(ctx),
endedAt: now,
},
iterationContext
)
} catch (error) {
logger.warn('Parallel completion callback failed', {
parallelId,
error: error instanceof Error ? error.message : String(error),
})
}
}

return {
Expand Down
Loading
Loading