Skip to content

Commit a18ce44

Browse files
committed
Adds workflow scheduling time in report
1 parent 5c5ab1a commit a18ce44

4 files changed

Lines changed: 25 additions & 20 deletions

File tree

internal/api/workflow.go

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"io"
99
"log"
10-
"maps"
1110
"net/http"
1211
"time"
1312

@@ -218,6 +217,7 @@ func InvokeWorkflow(e echo.Context) error {
218217
req.Resuming = false
219218
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.LocalNode.String()[len(node.LocalNode.String())-5:], req.Arrival.Nanosecond())
220219
req.ExecReport.ResponseTime = 0.0
220+
req.ExecReport.SchedulingTime = 0.0
221221
req.ExecReport.Result = nil // NOTE: this should be nil until workflow completion
222222
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
223223

@@ -238,13 +238,13 @@ func handleWorkflowInvocation(e echo.Context, req *workflow.Request) error {
238238
return
239239
}
240240

241-
log.Printf("Invocation succeeded. Publishing: %v", req.ExecReport)
242241
req.ExecReport.ResponseTime = time.Now().Sub(req.Arrival).Seconds()
243242
workflow.PublishAsyncInvocationResponse(req.Id, workflow.InvocationResponse{
244-
Success: true,
245-
Result: req.ExecReport.Result,
246-
Reports: req.ExecReport.Reports,
247-
ResponseTime: req.ExecReport.ResponseTime,
243+
Success: true,
244+
Result: req.ExecReport.Result,
245+
Reports: req.ExecReport.Reports,
246+
ResponseTime: req.ExecReport.ResponseTime,
247+
SchedulingTime: req.ExecReport.SchedulingTime,
248248
})
249249
}()
250250

@@ -264,14 +264,12 @@ func handleWorkflowInvocation(e echo.Context, req *workflow.Request) error {
264264
} else {
265265
req.ExecReport.ResponseTime = time.Now().Sub(req.Arrival).Seconds()
266266

267-
// TODO
268-
log.Printf("Invocation succeeded. Returning %d reports for keys: %v", len(req.ExecReport.Reports), maps.Keys(req.ExecReport.Reports))
269-
270267
return e.JSON(http.StatusOK, workflow.InvocationResponse{
271-
Success: true,
272-
Result: req.ExecReport.Result,
273-
Reports: req.ExecReport.Reports,
274-
ResponseTime: req.ExecReport.ResponseTime,
268+
Success: true,
269+
Result: req.ExecReport.Result,
270+
Reports: req.ExecReport.Reports,
271+
ResponseTime: req.ExecReport.ResponseTime,
272+
SchedulingTime: req.ExecReport.SchedulingTime,
275273
})
276274
}
277275
}

internal/workflow/report.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@ func CreateExecutionReportId(task Task) string {
1010
}
1111

1212
type ExecutionReport struct {
13-
Result map[string]interface{}
14-
Reports map[string]*function.ExecutionReport
15-
ResponseTime float64 // time waited by the user to get the output of the entire workflow
13+
Result map[string]interface{}
14+
Reports map[string]*function.ExecutionReport
15+
ResponseTime float64 // time waited by the user to get the output of the entire workflow
16+
SchedulingTime float64
1617
}
1718

1819
func (cer *ExecutionReport) String() string {

internal/workflow/request.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,11 @@ func NewRequest(reqId string, workflow *Workflow, params map[string]interface{},
4242
}
4343

4444
type InvocationResponse struct {
45-
Success bool
46-
Result map[string]interface{}
47-
Reports map[string]*function.ExecutionReport
48-
ResponseTime float64 // time waited by the user to get the output of the entire workflow (in seconds)
45+
Success bool
46+
Result map[string]interface{}
47+
Reports map[string]*function.ExecutionReport
48+
ResponseTime float64 // time waited by the user to get the output of the entire workflow (in seconds)
49+
SchedulingTime float64
4950
}
5051

5152
type AsyncInvocationResponse struct {

internal/workflow/workflow.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,10 @@ func (wflow *Workflow) Invoke(r *Request) error {
441441
}
442442

443443
for len(progress.ReadyToExecute) > 0 {
444+
t0 := time.Now()
444445
decision, err := offloadingPolicy.Evaluate(r, progress)
446+
policyTime := time.Since(t0).Seconds()
447+
r.ExecReport.SchedulingTime += policyTime
445448

446449
if err != nil {
447450
return fmt.Errorf("an error occurred in policy evaluation: %v", err)
@@ -616,6 +619,8 @@ func offload(r *Request, policyDecision *OffloadingDecision) error {
616619
r.ExecReport.Reports[k] = v
617620
}
618621

622+
r.ExecReport.SchedulingTime += response.SchedulingTime
623+
619624
if response.Result == nil {
620625
// workflow execution is not complete after offloading
621626
r.ExecReport.Result = nil

0 commit comments

Comments
 (0)