Skip to content

Commit 938c435

Browse files
committed
Make sure that FanIn returns output in consistent order
1 parent b6a7240 commit 938c435

3 files changed

Lines changed: 6 additions & 41 deletions

File tree

internal/api/workflow.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ import (
1010
"time"
1111

1212
"github.com/cornelk/hashmap"
13+
"github.com/labstack/echo/v4"
14+
"github.com/labstack/gommon/log"
1315
"github.com/serverledge-faas/serverledge/internal/client"
1416
"github.com/serverledge-faas/serverledge/internal/function"
1517
"github.com/serverledge-faas/serverledge/internal/node"
1618
"github.com/serverledge-faas/serverledge/internal/workflow"
17-
"github.com/labstack/echo/v4"
18-
"github.com/labstack/gommon/log"
1919
)
2020

2121
func CreateWorkflowFromASL(e echo.Context) error {
@@ -169,24 +169,13 @@ func InvokeWorkflow(e echo.Context) error {
169169
req.Async = clientReq.Async
170170

171171
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.NodeIdentifier[len(node.NodeIdentifier)-5:], req.Arrival.Nanosecond())
172-
173-
// TODO: do we really need to initialize all the reports at this point?
174172
req.ExecReport.Reports = hashmap.New[workflow.ExecutionReportId, *function.ExecutionReport]() // make(map[workflow.ExecutionReportId]*function.ExecutionReport)
175-
for nodeId := range wflow.Nodes {
176-
task := wflow.Nodes[nodeId]
177-
execReportId := workflow.CreateExecutionReportId(task)
178-
req.ExecReport.Reports.Set(execReportId, &function.ExecutionReport{
179-
OffloadLatency: 0,
180-
SchedAction: "",
181-
})
182-
}
183173

184174
if req.Async {
185175
go workflow.SubmitAsyncWorkflowInvocationRequest(req)
186176
return e.JSON(http.StatusOK, function.AsyncResponse{ReqId: req.Id})
187177
}
188178

189-
// sync execution
190179
err = workflow.SubmitWorkflowInvocationRequest(req)
191180

192181
if errors.Is(err, node.OutOfResourcesErr) {

internal/workflow/fanin_task.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,6 @@ func (f *FanInNode) Equals(cmp types.Comparable) bool {
3535
}
3636
}
3737

38-
// Exec already have all inputs when executing, so it simply merges them with the chosen policy
39-
func (f *FanInNode) Exec(_ *Request, params ...map[string]interface{}) (map[string]interface{}, error) {
40-
output := make(map[string]interface{})
41-
42-
for i, inputMap := range params {
43-
output[fmt.Sprintf("%d", i)] = inputMap
44-
}
45-
46-
return output, nil
47-
}
48-
4938
func (f *FanInNode) AddOutput(workflow *Workflow, taskId TaskId) error {
5039
f.OutputTo = taskId
5140
return nil

internal/workflow/workflow.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ import (
1010
"strings"
1111
"time"
1212

13+
"github.com/labstack/gommon/log"
1314
"github.com/serverledge-faas/serverledge/internal/cache"
1415
"github.com/serverledge-faas/serverledge/utils"
15-
"github.com/labstack/gommon/log"
1616

1717
"github.com/serverledge-faas/serverledge/internal/asl"
1818
"github.com/serverledge-faas/serverledge/internal/function"
@@ -406,8 +406,7 @@ func (workflow *Workflow) executeParallel(progress *Progress, input *PartialData
406406

407407
for i, outChan := range outputChannels {
408408
out := <-outChan
409-
task := parallelTasks[i]
410-
outputMap[fmt.Sprintf("%s", task.GetId())] = out
409+
outputMap[fmt.Sprintf("%d", i)] = out
411410
err := progress.CompleteNode(parallelTasks[i].GetId())
412411
if err != nil {
413412
return nil, progress, err
@@ -424,20 +423,8 @@ func (workflow *Workflow) executeParallel(progress *Progress, input *PartialData
424423
}
425424

426425
func (workflow *Workflow) executeFanIn(progress *Progress, input *PartialData, task *FanInNode, r *Request) (*PartialData, *Progress, bool, error) {
427-
inputs := make([]map[string]interface{}, task.FanInDegree)
428-
i := 0
429-
for _, inputMap := range input.Data {
430-
inputs[i] = inputMap.(map[string]interface{})
431-
i++
432-
}
433-
434-
output, err := task.Exec(r, inputs...)
435-
if err != nil {
436-
return nil, progress, false, err
437-
}
438-
439-
outputData := NewPartialData(ReqId(r.Id), task.GetNext()[0], task.GetId(), output)
440-
err = progress.CompleteNode(task.GetId())
426+
outputData := NewPartialData(ReqId(r.Id), task.GetNext()[0], task.GetId(), input.Data)
427+
err := progress.CompleteNode(task.GetId())
441428
if err != nil {
442429
return nil, progress, false, err
443430
}

0 commit comments

Comments
 (0)