Skip to content

Commit a624b03

Browse files
matnargrussorusso
andcommitted
Code review: report and workflow invocation
Co-authored-by: Gabriele Russo Russo <gabri.russo17@gmail.com>
1 parent cc12621 commit a624b03

5 files changed

Lines changed: 50 additions & 94 deletions

File tree

internal/api/workflow.go

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"net/http"
1010
"time"
1111

12-
"github.com/cornelk/hashmap"
1312
"github.com/labstack/echo/v4"
1413
"github.com/labstack/gommon/log"
1514
"github.com/serverledge-faas/serverledge/internal/client"
@@ -160,7 +159,6 @@ func InvokeWorkflow(e echo.Context) error {
160159
}
161160

162161
req := workflowInvocationRequestPool.Get().(*workflow.Request)
163-
defer workflowInvocationRequestPool.Put(req) // at the end of the function, the function.NewRequest is added to the pool.
164162
req.W = wflow
165163
req.Params = clientReq.Params
166164
req.Arrival = time.Now()
@@ -169,38 +167,49 @@ func InvokeWorkflow(e echo.Context) error {
169167
req.Async = clientReq.Async
170168

171169
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.NodeIdentifier[len(node.NodeIdentifier)-5:], req.Arrival.Nanosecond())
172-
req.ExecReport.Reports = hashmap.New[workflow.ExecutionReportId, *function.ExecutionReport]() // make(map[workflow.ExecutionReportId]*function.ExecutionReport)
170+
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
173171

174172
if req.Async {
175-
go workflow.SubmitAsyncWorkflowInvocationRequest(req)
173+
go func() {
174+
_, errInvoke := req.W.Invoke(req)
175+
176+
defer workflowInvocationRequestPool.Put(req)
177+
178+
if errInvoke != nil {
179+
log.Printf("Invocation failed: %v", errInvoke)
180+
workflow.PublishAsyncInvocationResponse(req.Id, workflow.InvocationResponse{Success: false})
181+
return
182+
}
183+
184+
req.ExecReport.ResponseTime = time.Now().Sub(req.Arrival).Seconds()
185+
workflow.PublishAsyncInvocationResponse(req.Id, workflow.InvocationResponse{
186+
Success: true,
187+
Result: req.ExecReport.Result,
188+
Reports: req.ExecReport.Reports,
189+
ResponseTime: req.ExecReport.ResponseTime,
190+
})
191+
}()
192+
176193
return e.JSON(http.StatusOK, function.AsyncResponse{ReqId: req.Id})
177194
}
178195

179-
err = workflow.SubmitWorkflowInvocationRequest(req)
196+
// Synchronous execution of the workflow
197+
_, err = req.W.Invoke(req)
198+
199+
defer workflowInvocationRequestPool.Put(req)
180200

181201
if errors.Is(err, node.OutOfResourcesErr) {
182202
return e.String(http.StatusTooManyRequests, "")
183203
} else if err != nil {
184204
log.Printf("Invocation failed: %v", err)
185-
v := struct {
186-
Error string
187-
Progress string
188-
}{
189-
Error: err.Error(),
190-
Progress: req.ExecReport.Progress.PrettyString(),
191-
}
192-
return e.JSON(http.StatusInternalServerError, v)
205+
return e.JSON(http.StatusInternalServerError, err.Error())
193206
} else {
194-
reports := make(map[string]*function.ExecutionReport)
195-
req.ExecReport.Reports.Range(func(id workflow.ExecutionReportId, report *function.ExecutionReport) bool {
196-
reports[string(id)] = report
197-
return true
198-
})
207+
req.ExecReport.ResponseTime = time.Now().Sub(req.Arrival).Seconds()
199208

200209
return e.JSON(http.StatusOK, workflow.InvocationResponse{
201210
Success: true,
202211
Result: req.ExecReport.Result,
203-
Reports: reports,
212+
Reports: req.ExecReport.Reports,
204213
ResponseTime: req.ExecReport.ResponseTime,
205214
})
206215
}

internal/workflow/report.go

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,49 +2,38 @@ package workflow
22

33
import (
44
"fmt"
5-
"github.com/cornelk/hashmap"
65
"github.com/serverledge-faas/serverledge/internal/function"
76
)
87

9-
type ExecutionReportId string
10-
11-
func CreateExecutionReportId(task Task) ExecutionReportId {
12-
return ExecutionReportId(printType(task.GetType()) + "_" + string(task.GetId()))
8+
func CreateExecutionReportId(task Task) string {
9+
return printType(task.GetType()) + "_" + string(task.GetId())
1310
}
1411

1512
type ExecutionReport struct {
1613
Result map[string]interface{}
17-
Reports *hashmap.Map[ExecutionReportId, *function.ExecutionReport]
18-
ResponseTime float64 // time waited by the user to get the output of the entire workflow
19-
Progress *Progress `json:"-"` // skipped in Json marshaling
14+
Reports map[string]*function.ExecutionReport
15+
ResponseTime float64 // time waited by the user to get the output of the entire workflow
2016
}
2117

2218
func (cer *ExecutionReport) String() string {
2319
str := "["
2420
str += fmt.Sprintf("\n\tResponseTime: %f,", cer.ResponseTime)
25-
str += "\n\tReports: ["
26-
if cer.Reports.Len() > 0 {
27-
j := 0
28-
cer.Reports.Range(func(id ExecutionReportId, report *function.ExecutionReport) bool {
29-
schedAction := "''"
30-
if report.SchedAction != "" {
31-
schedAction = report.SchedAction
32-
}
33-
output := "''"
34-
if report.Output != "" {
35-
output = report.Output
36-
}
21+
str += "\n\tReports: "
22+
23+
for id, report := range cer.Reports {
24+
schedAction := "''"
25+
if report.SchedAction != "" {
26+
schedAction = report.SchedAction
27+
}
28+
output := "''"
29+
if report.Output != "" {
30+
output = report.Output
31+
}
3732

38-
str += fmt.Sprintf("\n\t\t%s: {ResponseTime: %f, IsWarmStart: %v, InitTime: %f, OffloadLatency: %f, Duration: %f, SchedAction: %v, Output: %s, Result: %s}", id, report.ResponseTime, report.IsWarmStart, report.InitTime, report.OffloadLatency, report.Duration, schedAction, output, report.Result)
39-
if j < cer.Reports.Len()-1 {
40-
str += ","
41-
}
42-
if j == cer.Reports.Len()-1 {
43-
str += "\n\t]"
44-
}
45-
j++
46-
return true
47-
})
33+
str += fmt.Sprintf("\n\t\t%s: {ResponseTime: %f, IsWarmStart: %v, InitTime: %f, OffloadLatency: %f,"+
34+
" Duration: %f, SchedAction: %v, Output: %s, Result: %s}", id, report.ResponseTime, report.IsWarmStart,
35+
report.InitTime, report.OffloadLatency, report.Duration, schedAction, output, report.Result)
36+
str += ",\n"
4837
}
4938

5039
str += "\n\tResult: {"

internal/workflow/scheduler.go

Lines changed: 0 additions & 41 deletions
This file was deleted.

internal/workflow/simple_task.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,10 @@ func (s *SimpleTask) exec(compRequest *Request, params ...map[string]interface{}
103103
return nil, fmt.Errorf("output type checking failed: %v", err)
104104
}
105105
}
106-
107106
}
108107

109108
// saving execution report for this function
110-
compRequest.ExecReport.Reports.Set(CreateExecutionReportId(s), &report)
109+
compRequest.ExecReport.Reports[CreateExecutionReportId(s)] = &report
111110

112111
return outputData, nil
113112
}

internal/workflow/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,6 @@ func (workflow *Workflow) Execute(r *Request, input *PartialData, progress *Prog
297297
}
298298
if err != nil {
299299
progress.Fail(n.GetId())
300-
r.ExecReport.Progress = progress
301300
return output, progress, false, err
302301
}
303302
} else {
@@ -437,12 +436,13 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
437436
// executing workflow
438437
pd, progress, shouldContinue, err = workflow.Execute(r, pd, progress)
439438
if err != nil {
440-
return ExecutionReport{Result: nil, Progress: progress}, fmt.Errorf("failed workflow execution: %v", err)
439+
return ExecutionReport{}, fmt.Errorf("failed workflow execution: %v", err)
441440
}
442441
}
443442

444443
r.ExecReport.Result = pd.Data
445444

445+
// TODO: remove r.ExecReport
446446
return r.ExecReport, nil
447447
}
448448

0 commit comments

Comments
 (0)