Skip to content

Commit ea9c23b

Browse files
committed
Removes unnecessary PrepareOutput()
1 parent 22185cc commit ea9c23b

4 files changed

Lines changed: 1 addition & 105 deletions

File tree

internal/function/signature.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type OutputDef struct {
6666
Type string // the type of the output parameter
6767
}
6868

69-
// CheckInput evaluates all given outputs and if there is no output that type-checks, with the given name and the current type, returns an error
69+
// CheckOutput evaluates all given outputs and if there is no output that type-checks, with the given name and the current type, returns an error
7070
func (o OutputDef) CheckOutput(inputMap map[string]interface{}) error {
7171
val, exists := inputMap[o.Name]
7272
if !exists {

internal/workflow/choice_task.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,6 @@ func (workflow *Workflow) executeChoice(progress *Progress, input *PartialData,
8888
outputData.ForTask = nextTaskId
8989
outputData.Data = input.Data
9090

91-
nextTask, ok := workflow.Find(nextTaskId)
92-
if !ok {
93-
return nil, progress, false, fmt.Errorf("node not found while preparing output")
94-
}
95-
switch typedTask := nextTask.(type) {
96-
case *SimpleNode:
97-
err := typedTask.MapOutput(outputData.Data)
98-
if err != nil {
99-
return outputData, progress, false, fmt.Errorf("choice task %s cannot prepare the output for simple task: %v", task.String(), err)
100-
}
101-
}
102-
10391
// for task node, we skip all branch that will not be executed
10492
nodesToSkip := task.GetNodesToSkip(workflow, matchedCondition)
10593
errSkip := progress.SkipAll(nodesToSkip)

internal/workflow/simple_task.go

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"time"
77

8-
"github.com/labstack/gommon/log"
98
"github.com/lithammer/shortuuid"
109
"github.com/serverledge-faas/serverledge/internal/function"
1110
"github.com/serverledge-faas/serverledge/internal/node"
@@ -40,11 +39,6 @@ func (workflow *Workflow) executeSimple(progress *Progress, input *PartialData,
4039
return nil, progress, false, err
4140
}
4241

43-
errSend := task.PrepareOutput(workflow, output)
44-
if errSend != nil {
45-
return nil, progress, false, fmt.Errorf("the node %s cannot send the output2: %v", task.String(), errSend)
46-
}
47-
4842
nextTask := task.GetNext()[0]
4943
outputData := NewPartialData(ReqId(r.Id), nextTask, task.Id, output)
5044

@@ -151,65 +145,6 @@ func (s *SimpleNode) CheckInput(input map[string]interface{}) error {
151145
return funct.Signature.CheckOrMatchInputs(input)
152146
}
153147

154-
// PrepareOutput is used to send the output to the following function and if needed can be used to modify the SimpleNode output representation, like OutputPath
155-
// TODO: this logic might be integrated in input checking
156-
func (s *SimpleNode) PrepareOutput(workflow *Workflow, output map[string]interface{}) error {
157-
funct, exists := function.GetFunction(s.Func) // we are getting the function from cache if not already downloaded
158-
if !exists {
159-
return fmt.Errorf("funtion %s doesn't exists", s.Func)
160-
}
161-
if funct.Signature == nil {
162-
funct.Signature = function.SignatureInference(output)
163-
log.Warnf("signature of function %s is nil. Output map: %v\n", funct.Name, output)
164-
}
165-
err := funct.Signature.CheckAllOutputs(output)
166-
if err != nil {
167-
return fmt.Errorf("error while checking outputs: %v", err)
168-
}
169-
// get signature of next nodes, if present and maps the output there
170-
171-
// we have only one output node
172-
task, _ := workflow.Find(s.GetNext()[0])
173-
174-
switch nodeType := task.(type) {
175-
case *SimpleNode:
176-
return nodeType.MapOutput(output) // needed to convert type of data from one node to the next so that its signature type-checks
177-
}
178-
179-
return nil
180-
}
181-
182-
// MapOutput transforms the output for the next simpleNode, to make it compatible with its Signature
183-
func (s *SimpleNode) MapOutput(output map[string]interface{}) error {
184-
funct, exists := function.GetFunction(s.Func)
185-
if !exists {
186-
return fmt.Errorf("function %s doesn't exist", s.Func)
187-
}
188-
sign := funct.Signature
189-
190-
for _, def := range sign.GetInputs() {
191-
_, present := output[def.Name]
192-
if present {
193-
continue
194-
} else if len(sign.GetInputs()) > 1 {
195-
return fmt.Errorf("key %s does not exist in output data: %v", def.Name, output)
196-
}
197-
198-
// find an entry in the output map that successfully checks the type of the InputDefinition
199-
key, ok := def.FindEntryThatTypeChecks(output)
200-
if ok {
201-
val := output[key]
202-
delete(output, key)
203-
output[def.Name] = val
204-
} else {
205-
// otherwise if no one of the entry typechecks we are doomed
206-
return fmt.Errorf("no output entry input-checks with the next function")
207-
}
208-
}
209-
210-
return nil
211-
}
212-
213148
func (s *SimpleNode) GetNext() []TaskId {
214149
// we only have one output
215150
return []TaskId{s.OutputTo}

internal/workflow/workflow.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,6 @@ func (workflow *Workflow) executeParallel(progress *Progress, input *PartialData
213213
return
214214
}
215215

216-
errSend := simpleTask.PrepareOutput(workflow, output)
217-
if errSend != nil {
218-
errorChannels[i] <- fmt.Errorf("the node %s cannot send the output2: %v", currTask.String(), errSend)
219-
outputChannels[i] <- nil
220-
return
221-
}
222-
223216
errorChannels[i] <- nil
224217
outputChannels[i] <- output
225218
} else {
@@ -462,26 +455,6 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
462455
}
463456
}
464457

465-
// TODO: we may need to handle offloading decisions here
466-
//// saving partialData and progress on etcd - implementing workflow offloading policies
467-
//err = savePartialDataToEtcd(pd)
468-
//if err != nil {
469-
// return ExecutionReport{}, err
470-
//}
471-
//err = saveProgressToEtcd(progress)
472-
//if err != nil {
473-
// return ExecutionReport{}, err
474-
//}
475-
476-
//// deleting progresses and partial datas from cache and etcd
477-
//err = DeleteProgress(requestId, cache.Persist)
478-
//if err != nil {
479-
// return ExecutionReport{}, err
480-
//}
481-
//_, errDel := DeleteAllPartialData(requestId, cache.Persist)
482-
//if errDel != nil {
483-
// return ExecutionReport{}, errDel
484-
//}
485458
r.ExecReport.Result = pd.Data
486459

487460
return r.ExecReport, nil

0 commit comments

Comments
 (0)