Skip to content

Commit 34d4993

Browse files
committed
Avoids recursive DAG visit
1 parent 547c9fb commit 34d4993

3 files changed

Lines changed: 25 additions & 79 deletions

File tree

internal/test/workflow_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -448,17 +448,16 @@ func TestVisit(t *testing.T) {
448448

449449
choice := startNext.GetNext()[0]
450450

451-
nodeList := make([]workflow.Task, 0)
452-
visitedNodes := workflow.Visit(complexWorkflow, complexWorkflow.Start.Id, nodeList, false)
451+
visitedNodes := workflow.Visit(complexWorkflow, complexWorkflow.Start.Id, false)
453452
u.AssertEquals(t, len(complexWorkflow.Tasks), len(visitedNodes))
454453

455-
visitedNodes = workflow.Visit(complexWorkflow, complexWorkflow.Start.Id, nodeList, true)
454+
visitedNodes = workflow.Visit(complexWorkflow, complexWorkflow.Start.Id, true)
456455
u.AssertEquals(t, len(complexWorkflow.Tasks)-1, len(visitedNodes))
457456

458-
visitedNodes = workflow.Visit(complexWorkflow, choice, nodeList, false)
457+
visitedNodes = workflow.Visit(complexWorkflow, choice, false)
459458
u.AssertEquals(t, 8, len(visitedNodes))
460459

461-
visitedNodes = workflow.Visit(complexWorkflow, choice, nodeList, true)
460+
visitedNodes = workflow.Visit(complexWorkflow, choice, true)
462461
u.AssertEquals(t, 7, len(visitedNodes))
463462

464463
}

internal/workflow/choice_task.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (c *ChoiceTask) VisitBranch(workflow *Workflow, branch int) []Task {
7575
return branchTasks
7676
}
7777
taskId := c.NextTasks[branch]
78-
return Visit(workflow, taskId, branchTasks, true)
78+
return Visit(workflow, taskId, true)
7979
}
8080

8181
// GetTasksToSkip skips all tasks that are in a branch that will not be executed.

internal/workflow/workflow.go

Lines changed: 20 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"github.com/serverledge-faas/serverledge/internal/client"
9+
"golang.org/x/exp/slices"
910
"io"
1011
"net/http"
1112
"sort"
@@ -114,89 +115,35 @@ func (w *Workflow) computePreviousTasks() {
114115
}
115116
}
116117

117-
// Visit visits the workflow starting from the given task and return a list of visited tasks.
118-
// If excludeEnd = true, the EndTask will not be in the output list
119-
func Visit(workflow *Workflow, taskId TaskId, tasks []Task, excludeEnd bool) []Task {
118+
func Visit(workflow *Workflow, taskId TaskId, excludeEnd bool) []Task {
119+
120120
task, ok := workflow.Find(taskId)
121121
if !ok {
122122
return []Task{}
123123
}
124-
if !isTaskPresent(task, tasks) {
124+
125+
tasks := make([]Task, 0)
126+
visited := make(map[TaskId]bool)
127+
toVisit := []Task{task}
128+
129+
for len(toVisit) > 0 {
130+
task := toVisit[0]
125131
tasks = append(tasks, task)
126-
}
127-
switch n := task.(type) {
128-
case *StartTask:
129-
toAdd := Visit(workflow, n.GetNext()[0], tasks, excludeEnd)
130-
for _, add := range toAdd {
131-
if !isTaskPresent(add, tasks) {
132-
// only when isEndTask = true, excludeEnd = true -> we don't add the task
133-
if !isEndTask(add) || !excludeEnd {
134-
tasks = append(tasks, add)
135-
}
136-
}
137-
}
138-
return tasks
139-
case *SimpleTask, *PassTask, *SuccessTask, *FailureTask:
140-
toAdd := Visit(workflow, n.GetNext()[0], tasks, excludeEnd)
141-
for _, add := range toAdd {
142-
if !isTaskPresent(add, tasks) {
143-
if !isEndTask(add) || !excludeEnd {
144-
tasks = append(tasks, add)
145-
}
146-
}
147-
}
148-
return tasks
149-
case *EndTask:
150-
if !excludeEnd { // move end task to the end of the visit list
151-
endTask := n
152-
// get index of end task to remove\
153-
indexToRemove := -1
154-
for i, task := range tasks {
155-
if isEndTask(task) {
156-
indexToRemove = i
157-
break
158-
}
159-
}
160-
// remove end task
161-
tasks = append(tasks[:indexToRemove], tasks[indexToRemove+1:]...)
162-
// append at the end of the visited task list
163-
tasks = append(tasks, endTask)
164-
}
165-
return tasks
166-
case *ChoiceTask:
167-
for _, alternative := range n.GetNext() {
168-
toAdd := Visit(workflow, alternative, tasks, excludeEnd)
169-
for _, add := range toAdd {
170-
if !isTaskPresent(add, tasks) {
171-
if !isEndTask(add) || !excludeEnd {
172-
tasks = append(tasks, add)
173-
}
174-
}
175-
}
176-
}
177-
return tasks
178-
case *FanOutTask:
179-
for _, parallelBranch := range n.GetNext() {
180-
toAdd := Visit(workflow, parallelBranch, tasks, excludeEnd)
181-
for _, add := range toAdd {
182-
if !isTaskPresent(add, tasks) {
183-
if !isEndTask(add) || !excludeEnd {
184-
tasks = append(tasks, add)
132+
toVisit = toVisit[1:]
133+
visited[task.GetId()] = true
134+
135+
for _, nextTask := range task.GetNext() {
136+
if _, ok := visited[nextTask]; !ok {
137+
nt := workflow.Tasks[nextTask]
138+
if !excludeEnd || nt.GetType() != End {
139+
if !slices.Contains(toVisit, nt) {
140+
toVisit = append(toVisit, nt)
185141
}
186142
}
187143
}
188144
}
189-
return tasks
190-
case *FanInTask:
191-
toAdd := Visit(workflow, n.GetNext()[0], tasks, excludeEnd)
192-
for _, add := range toAdd {
193-
if !isTaskPresent(add, tasks) {
194-
if !isEndTask(add) || !excludeEnd {
195-
tasks = append(tasks, add)
196-
}
197-
}
198-
}
199145
}
146+
200147
return tasks
201148
}
202149

0 commit comments

Comments
 (0)