-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Expand file tree
/
Copy pathutils.ts
More file actions
244 lines (212 loc) · 7.3 KB
/
Copy pathutils.ts
File metadata and controls
244 lines (212 loc) · 7.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import {
isWorkflowBlockAncestorLocked,
isWorkflowBlockProtected,
} from '@sim/workflow-types/workflow'
import type { Edge } from 'reactflow'
import type { BlockState, Loop, Parallel } from '@/stores/workflows/workflow/types'
const DEFAULT_LOOP_ITERATIONS = 5
const DEFAULT_PARALLEL_BATCH_SIZE = 20
const MAX_PARALLEL_BATCH_SIZE = 100
export function clampParallelBatchSize(batchSize: unknown): number {
const parsed = typeof batchSize === 'number' ? batchSize : Number.parseInt(String(batchSize), 10)
if (Number.isNaN(parsed)) {
return DEFAULT_PARALLEL_BATCH_SIZE
}
return Math.max(1, Math.min(MAX_PARALLEL_BATCH_SIZE, parsed))
}
/**
* Check if adding an edge would create a cycle in the graph.
* Uses depth-first search to detect if the source node is reachable from the target node.
*
* @param edges - Current edges in the graph
* @param sourceId - Source node ID of the proposed edge
* @param targetId - Target node ID of the proposed edge
* @returns true if adding this edge would create a cycle
*/
export function wouldCreateCycle(edges: Edge[], sourceId: string, targetId: string): boolean {
if (sourceId === targetId) {
return true
}
const adjacencyList = new Map<string, string[]>()
for (const edge of edges) {
if (!adjacencyList.has(edge.source)) {
adjacencyList.set(edge.source, [])
}
adjacencyList.get(edge.source)!.push(edge.target)
}
const visited = new Set<string>()
function canReachSource(currentNode: string): boolean {
if (currentNode === sourceId) {
return true
}
if (visited.has(currentNode)) {
return false
}
visited.add(currentNode)
const neighbors = adjacencyList.get(currentNode) || []
for (const neighbor of neighbors) {
if (canReachSource(neighbor)) {
return true
}
}
return false
}
return canReachSource(targetId)
}
/**
* Convert UI loop block to executor Loop format
*
* @param loopBlockId - ID of the loop block to convert
* @param blocks - Record of all blocks in the workflow
* @returns Loop object for execution engine or undefined if not a valid loop
*/
export function convertLoopBlockToLoop(
loopBlockId: string,
blocks: Record<string, BlockState>
): Loop | undefined {
const loopBlock = blocks[loopBlockId]
if (!loopBlock || loopBlock.type !== 'loop') return undefined
const loopType = loopBlock.data?.loopType || 'for'
const loop: Loop = {
id: loopBlockId,
nodes: findChildNodes(loopBlockId, blocks),
iterations: loopBlock.data?.count || DEFAULT_LOOP_ITERATIONS,
loopType,
enabled: loopBlock.enabled,
}
loop.forEachItems = loopBlock.data?.collection || ''
loop.whileCondition = loopBlock.data?.whileCondition || ''
loop.doWhileCondition = loopBlock.data?.doWhileCondition || ''
return loop
}
/**
* Convert UI parallel block to executor Parallel format
*
* @param parallelBlockId - ID of the parallel block to convert
* @param blocks - Record of all blocks in the workflow
* @returns Parallel object for execution engine or undefined if not a valid parallel block
*/
export function convertParallelBlockToParallel(
parallelBlockId: string,
blocks: Record<string, BlockState>
): Parallel | undefined {
const parallelBlock = blocks[parallelBlockId]
if (!parallelBlock || parallelBlock.type !== 'parallel') return undefined
const parallelType = parallelBlock.data?.parallelType || 'count'
const validParallelTypes = ['collection', 'count'] as const
const validatedParallelType = validParallelTypes.includes(parallelType as any)
? parallelType
: 'collection'
const distribution =
validatedParallelType === 'collection' ? parallelBlock.data?.collection || '' : undefined
const count = parallelBlock.data?.count || 5
const batchSize = clampParallelBatchSize(parallelBlock.data?.batchSize)
return {
id: parallelBlockId,
nodes: findChildNodes(parallelBlockId, blocks),
distribution,
count,
parallelType: validatedParallelType,
batchSize,
enabled: parallelBlock.enabled,
}
}
/**
* Find all nodes that are children of this container (loop or parallel)
*
* @param containerId - ID of the container to find children for
* @param blocks - Record of all blocks in the workflow
* @returns Array of node IDs that are direct children of this container
*/
export function findChildNodes(containerId: string, blocks: Record<string, BlockState>): string[] {
return Object.values(blocks)
.filter((block) => block.data?.parentId === containerId)
.map((block) => block.id)
}
/**
* Find all descendant nodes, including children, grandchildren, etc.
*
* @param containerId - ID of the container to find descendants for
* @param blocks - Record of all blocks in the workflow
* @returns Array of node IDs that are descendants of this container
*/
export function findAllDescendantNodes(
containerId: string,
blocks: Record<string, BlockState>
): string[] {
const descendants: string[] = []
const visited = new Set<string>()
const stack = [containerId]
while (stack.length > 0) {
const current = stack.pop()!
if (visited.has(current)) continue
visited.add(current)
for (const block of Object.values(blocks)) {
if (block.data?.parentId === current) {
descendants.push(block.id)
stack.push(block.id)
}
}
}
return descendants
}
/**
* Checks if any ancestor container of a block is locked.
* Unlike {@link isBlockProtected}, this ignores the block's own locked state.
*
* @param blockId - The ID of the block to check
* @param blocks - Record of all blocks in the workflow
* @returns True if any ancestor is locked
*/
export function isAncestorProtected(blockId: string, blocks: Record<string, BlockState>): boolean {
return isWorkflowBlockAncestorLocked(blockId, blocks)
}
/**
* Checks if a block is protected from editing/deletion.
* A block is protected if it is locked or if any ancestor container is locked.
*
* @param blockId - The ID of the block to check
* @param blocks - Record of all blocks in the workflow
* @returns True if the block is protected
*/
export function isBlockProtected(blockId: string, blocks: Record<string, BlockState>): boolean {
return isWorkflowBlockProtected(blockId, blocks)
}
/**
* Builds a complete collection of loops from the UI blocks
*
* @param blocks - Record of all blocks in the workflow
* @returns Record of Loop objects for execution engine
*/
export function generateLoopBlocks(blocks: Record<string, BlockState>): Record<string, Loop> {
const loops: Record<string, Loop> = {}
Object.entries(blocks)
.filter(([_, block]) => block.type === 'loop')
.forEach(([id, block]) => {
const loop = convertLoopBlockToLoop(id, blocks)
if (loop) {
loops[id] = loop
}
})
return loops
}
/**
* Builds a complete collection of parallel blocks from the UI blocks
*
* @param blocks - Record of all blocks in the workflow
* @returns Record of Parallel objects for execution engine
*/
export function generateParallelBlocks(
blocks: Record<string, BlockState>
): Record<string, Parallel> {
const parallels: Record<string, Parallel> = {}
Object.entries(blocks)
.filter(([_, block]) => block.type === 'parallel')
.forEach(([id, block]) => {
const parallel = convertParallelBlockToParallel(id, blocks)
if (parallel) {
parallels[id] = parallel
}
})
return parallels
}