Skip to content

Commit b2312cc

Browse files
committed
Moves execute*() functions into task files
1 parent 938c435 commit b2312cc

8 files changed

Lines changed: 156 additions & 166 deletions

File tree

internal/workflow/choice_task.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,60 @@ func (c *ChoiceNode) AddOutput(workflow *Workflow, taskId TaskId) error {
6262
return nil
6363
}
6464

65+
func (workflow *Workflow) executeChoice(progress *Progress, input *PartialData, task *ChoiceNode, r *Request) (*PartialData, *Progress, bool, error) {
66+
67+
outputData := NewPartialData(ReqId(r.Id), "", task.GetId(), nil) // partial initialization of outputData
68+
69+
// NOTE: we do not call task.CheckInput() as this task has no signature to match against
70+
71+
// simply evaluate the Conditions and set the matching one
72+
matchedCondition := -1
73+
for i, condition := range task.Conditions {
74+
ok, err := condition.Evaluate(input.Data)
75+
if err != nil {
76+
return nil, progress, false, fmt.Errorf("error while testing condition: %v", err)
77+
}
78+
if ok {
79+
matchedCondition = i
80+
break
81+
}
82+
}
83+
84+
if matchedCondition < 0 {
85+
return nil, progress, false, fmt.Errorf("no condition is met")
86+
}
87+
88+
nextTaskId := task.Alternatives[matchedCondition]
89+
outputData.ForTask = nextTaskId
90+
outputData.Data = input.Data
91+
92+
nextTask, ok := workflow.Find(nextTaskId)
93+
if !ok {
94+
return nil, progress, false, fmt.Errorf("node not found while preparing output")
95+
}
96+
switch typedTask := nextTask.(type) {
97+
case *SimpleNode:
98+
err := typedTask.MapOutput(outputData.Data)
99+
if err != nil {
100+
return outputData, progress, false, fmt.Errorf("choice task %s cannot prepare the output for simple task: %v", task.String(), err)
101+
}
102+
}
103+
104+
// for task node, we skip all branch that will not be executed
105+
nodesToSkip := task.GetNodesToSkip(workflow, matchedCondition)
106+
errSkip := progress.SkipAll(nodesToSkip)
107+
if errSkip != nil {
108+
return outputData, progress, false, errSkip
109+
}
110+
111+
err := progress.CompleteNode(task.GetId())
112+
if err != nil {
113+
return outputData, progress, false, err
114+
}
115+
116+
return outputData, progress, true, nil
117+
}
118+
65119
// VisitBranch returns all node ids of a branch under a choice node; branch number starts from 0
66120
func (c *ChoiceNode) VisitBranch(workflow *Workflow, branch int) []Task {
67121
branchNodes := make([]Task, 0)

internal/workflow/end_task.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ func (e *EndNode) Equals(cmp types.Comparable) bool {
4141
return e.Id == e2.Id && e.NodeType == e2.NodeType
4242
}
4343

44+
func (workflow *Workflow) executeEnd(progress *Progress, partialData *PartialData, node *EndNode, r *Request) (*PartialData, *Progress, bool, error) {
45+
err := progress.CompleteNode(node.Id)
46+
if err != nil {
47+
return partialData, progress, false, err
48+
}
49+
return partialData, progress, false, nil // false because we want to stop when reaching the end
50+
}
51+
4452
func (e *EndNode) AddOutput(workflow *Workflow, taskId TaskId) error {
4553
return nil // should not do anything. End node cannot be chained to anything
4654
}

internal/workflow/fail_task.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,22 @@ func (f *FailNode) Equals(cmp types.Comparable) bool {
4242
f.BranchId == f2.BranchId
4343
}
4444

45+
func (workflow *Workflow) executeFail(progress *Progress, task *FailNode, r *Request) (*PartialData, *Progress, bool, error) {
46+
47+
forNode := task.GetNext()[0]
48+
output := make(map[string]interface{})
49+
output[task.Error] = task.Cause
50+
outputData := NewPartialData(ReqId(r.Id), forNode, task.GetId(), output)
51+
52+
err := progress.CompleteNode(task.GetId())
53+
if err != nil {
54+
return outputData, progress, false, err
55+
}
56+
57+
shouldContinueExecution := task.GetNodeType() != Fail && task.GetNodeType() != Succeed
58+
return outputData, progress, shouldContinueExecution, nil
59+
}
60+
4561
func (f *FailNode) AddOutput(workflow *Workflow, taskId TaskId) error {
4662
_, ok := workflow.Nodes[taskId].(*EndNode)
4763
if !ok {

internal/workflow/fanin_task.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ func NewFanInNode(fanInDegree int) *FanInNode {
2626
return &fanIn
2727
}
2828

29+
func (workflow *Workflow) executeFanIn(progress *Progress, input *PartialData, task *FanInNode, r *Request) (*PartialData, *Progress, bool, error) {
30+
outputData := NewPartialData(ReqId(r.Id), task.GetNext()[0], task.GetId(), input.Data)
31+
err := progress.CompleteNode(task.GetId())
32+
if err != nil {
33+
return nil, progress, false, err
34+
}
35+
36+
return outputData, progress, true, nil
37+
}
38+
2939
func (f *FanInNode) Equals(cmp types.Comparable) bool {
3040
switch f1 := cmp.(type) {
3141
case *FanInNode:

internal/workflow/fanout_task.go

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -55,46 +55,57 @@ func (f *FanOutNode) Equals(cmp types.Comparable) bool {
5555
}
5656
}
5757

58-
// Exec splits the output for the next parallel dags
58+
// executeFanOut splits the output for the next parallel dags
5959
// Scatter mode can only be used if the value held in the map is of type slice. Subdivides each map entry to a different node
6060
// Broadcast mode can always be used. Copies the entire map to each of the subsequent nodes
61-
func (f *FanOutNode) Exec(_ *Request, params ...map[string]interface{}) (map[string]interface{}, error) {
61+
func (workflow *Workflow) executeFanOut(progress *Progress, input *PartialData, task *FanOutNode, r *Request) (*PartialData, *Progress, bool, error) {
6262
output := make(map[string]interface{})
6363

6464
// TODO: avoid forcing the interface implementation, so that the signature of Exec can be adapted
65-
if len(params) != 1 {
66-
return nil, fmt.Errorf("failed to get one input for choice node: received %d inputs", len(params))
65+
if len(input.Data) != 1 {
66+
return nil, progress, false, fmt.Errorf("failed to get one input for choice node: received %d inputs", len(input.Data))
6767
}
6868

6969
// input -> output: map["input":1] -> map["0":map["input":1], "1":map["input":1]]
70-
if f.Type == Broadcast {
71-
for _, nextNode := range f.GetNext() {
70+
if task.Type == Broadcast {
71+
for _, nextNode := range task.GetNext() {
7272
// TODO: check if a copy of params[0] is needed
73-
output[string(nextNode)] = params[0] // simply returns input, that will be copied to each subsequent node
73+
output[string(nextNode)] = input.Data // simply returns input, that will be copied to each subsequent node
7474
}
75-
} else if f.Type == Scatter { // scatter only accepts an array with exactly fanOutDegree elements. However, multiple input values are allowed
76-
inputName := maps.Keys(params[0])[0]
77-
inputToScatter := params[0][inputName]
75+
} else if task.Type == Scatter { // scatter only accepts an array with exactly fanOutDegree elements. However, multiple input values are allowed
76+
inputName := maps.Keys(input.Data)[0]
77+
inputToScatter := input.Data[inputName]
7878
inputArrayToScatter, errNotSlice := utils.ConvertToSlice(inputToScatter)
7979
if errNotSlice != nil {
80-
return nil, fmt.Errorf("cannot convert input %v to slice", inputToScatter)
80+
return nil, progress, false, fmt.Errorf("cannot convert input %v to slice", inputToScatter)
8181
}
8282

83-
if len(inputArrayToScatter) != f.FanOutDegree {
84-
return nil, fmt.Errorf("input array size (%d) must be equal to fanOutDegree (%d). Check the previous node output",
85-
len(inputArrayToScatter), f.FanOutDegree)
83+
if len(inputArrayToScatter) != task.FanOutDegree {
84+
return nil, progress, false, fmt.Errorf("input array size (%d) must be equal to fanOutDegree (%d). Check the previous node output",
85+
len(inputArrayToScatter), task.FanOutDegree)
8686
}
8787

88-
for i, nextNode := range f.GetNext() {
88+
for i, nextNode := range task.GetNext() {
8989
iOutput := make(map[string]interface{})
9090
iOutput[inputName] = inputArrayToScatter[i]
9191
output[string(nextNode)] = iOutput
9292
}
9393
} else {
94-
return nil, fmt.Errorf("invalid fanout mode: %d", f.Type)
94+
return nil, progress, false, fmt.Errorf("invalid fanout mode: %d", task.Type)
9595
}
9696

97-
return output, nil
97+
/* using forNode = "" in order to create a special input to handle fanout
98+
* case with Data field which contains a map[string]interface{} with the key set
99+
* to nodeId and the value which is also a map[string]interface{} containing the
100+
* effective input for the nth-parallel node */
101+
outputData := NewPartialData(ReqId(r.Id), "", task.GetId(), output)
102+
//newOutputDataMap := make(map[string]interface{}) // TODO: consider using a map of PartialData rather than a single PartialData object
103+
104+
err := progress.CompleteNode(task.GetId())
105+
if err != nil {
106+
return nil, progress, false, err
107+
}
108+
return outputData, progress, true, nil
98109
}
99110

100111
func (f *FanOutNode) AddOutput(workflow *Workflow, taskId TaskId) error {

internal/workflow/simple_task.go

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/labstack/gommon/log"
9+
"github.com/lithammer/shortuuid"
810
"github.com/serverledge-faas/serverledge/internal/function"
911
"github.com/serverledge-faas/serverledge/internal/node"
1012
"github.com/serverledge-faas/serverledge/internal/scheduling"
1113
"github.com/serverledge-faas/serverledge/internal/types"
12-
"github.com/labstack/gommon/log"
13-
"github.com/lithammer/shortuuid"
1414
)
1515

1616
// SimpleNode is a Task that receives one input and sends one result
@@ -30,7 +30,34 @@ func NewSimpleNode(f string) *SimpleNode {
3030
}
3131
}
3232

33-
func (s *SimpleNode) Exec(compRequest *Request, params ...map[string]interface{}) (map[string]interface{}, error) {
33+
func (workflow *Workflow) executeSimple(progress *Progress, input *PartialData, task *SimpleNode, r *Request) (*PartialData, *Progress, bool, error) {
34+
35+
err := task.CheckInput(input.Data)
36+
if err != nil {
37+
return nil, progress, false, err
38+
}
39+
output, err := task.exec(r, input.Data)
40+
if err != nil {
41+
return nil, progress, false, err
42+
}
43+
44+
errSend := task.PrepareOutput(workflow, output)
45+
if errSend != nil {
46+
return nil, progress, false, fmt.Errorf("the node %s cannot send the output2: %v", task.String(), errSend)
47+
}
48+
49+
nextTask := task.GetNext()[0]
50+
outputData := NewPartialData(ReqId(r.Id), nextTask, task.Id, output)
51+
52+
err = progress.CompleteNode(task.Id)
53+
if err != nil {
54+
return outputData, progress, false, err
55+
}
56+
57+
return outputData, progress, true, nil
58+
}
59+
60+
func (s *SimpleNode) exec(compRequest *Request, params ...map[string]interface{}) (map[string]interface{}, error) {
3461
funct, ok := function.GetFunction(s.Func)
3562
if !ok {
3663
return nil, fmt.Errorf("SimpleNode.function is null: you must initialize SimpleNode's function to execute it")

internal/workflow/start_task.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ func (s *StartNode) AddOutput(workflow *Workflow, nodeId TaskId) error {
4646
return nil
4747
}
4848

49+
func (workflow *Workflow) executeStart(progress *Progress, partialData *PartialData, node *StartNode) (*PartialData, *Progress, bool, error) {
50+
51+
err := progress.CompleteNode(node.GetId())
52+
if err != nil {
53+
return partialData, progress, false, err
54+
}
55+
return partialData, progress, true, nil
56+
}
57+
4958
func (s *StartNode) GetNext() []TaskId {
5059
// we only have one output
5160
return []TaskId{s.Next}

0 commit comments

Comments
 (0)