Skip to content

Commit 4f51cf1

Browse files
committed
Fixes ExecReport initialization
1 parent 772a4e2 commit 4f51cf1

4 files changed

Lines changed: 24 additions & 5 deletions

File tree

internal/api/workflow.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"log"
10+
"maps"
1011
"net/http"
1112
"time"
1213

@@ -172,7 +173,7 @@ func ResumeWorkflow(e echo.Context) error {
172173
req.Resuming = true
173174
req.Id = clientReq.ReqId
174175
req.ExecReport.ResponseTime = 0.0
175-
req.ExecReport.Result = map[string]interface{}{}
176+
req.ExecReport.Result = nil // NOTE: this should be nil until workflow completion
176177
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
177178

178179
if clientReq.Plan.ToExecute != nil {
@@ -217,7 +218,7 @@ func InvokeWorkflow(e echo.Context) error {
217218
req.Resuming = false
218219
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.LocalNode.String()[len(node.LocalNode.String())-5:], req.Arrival.Nanosecond())
219220
req.ExecReport.ResponseTime = 0.0
220-
req.ExecReport.Result = map[string]interface{}{}
221+
req.ExecReport.Result = nil // NOTE: this should be nil until workflow completion
221222
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
222223

223224
return handleWorkflowInvocation(e, req)
@@ -263,6 +264,9 @@ func handleWorkflowInvocation(e echo.Context, req *workflow.Request) error {
263264
} else {
264265
req.ExecReport.ResponseTime = time.Now().Sub(req.Arrival).Seconds()
265266

267+
// TODO
268+
log.Printf("Invocation succeeded. Returning %d reports for keys: %v", len(req.ExecReport.Reports), maps.Keys(req.ExecReport.Reports))
269+
266270
return e.JSON(http.StatusOK, workflow.InvocationResponse{
267271
Success: true,
268272
Result: req.ExecReport.Result,

internal/workflow/ilp_offloading_policy.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,14 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
8585
cacheSolution(r, &placement, defaultTTL)
8686

8787
for k, v := range placement {
88-
fmt.Printf("Task: %s -> %s \n", k, v)
88+
taskType := ""
89+
task, found := r.W.Tasks[k]
90+
if !found {
91+
taskType = "?"
92+
} else {
93+
taskType = fmt.Sprintf("%v", task.GetType())
94+
}
95+
fmt.Printf("[Req %s] Task: %s (%v) -> %s \n", r.Id, k, taskType, v)
8996
}
9097

9198
// parse results and make a decision

internal/workflow/remote_offloading_policy.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"strconv"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
"github.com/serverledge-faas/serverledge/internal/config"
@@ -72,9 +73,13 @@ type cachedPlacement struct {
7273
ttl int
7374
}
7475

76+
var placementCacheMutex sync.Mutex = sync.Mutex{}
7577
var placementCache map[string]*cachedPlacement
7678

7779
func getCachedSolution(r *Request) (*taskPlacement, bool) {
80+
placementCacheMutex.Lock()
81+
defer placementCacheMutex.Unlock()
82+
7883
if placementCache == nil {
7984
placementCache = make(map[string]*cachedPlacement)
8085
return nil, false
@@ -96,6 +101,9 @@ func getCachedSolution(r *Request) (*taskPlacement, bool) {
96101
}
97102

98103
func cacheSolution(r *Request, sol *taskPlacement, ttl int) {
104+
placementCacheMutex.Lock()
105+
defer placementCacheMutex.Unlock()
106+
99107
if placementCache == nil {
100108
placementCache = make(map[string]*cachedPlacement)
101109
}

internal/workflow/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
"github.com/serverledge-faas/serverledge/internal/types"
2727
)
2828

29-
var offloadingPolicy OffloadingPolicy = &IlpOffloadingPolicy{}
29+
var offloadingPolicy OffloadingPolicy = nil
3030

3131
func CreateOffloadingPolicy() {
3232
policyConf := config.GetString(config.WORKFLOW_OFFLOADING_POLICY, "disable")
@@ -475,6 +475,7 @@ func (wflow *Workflow) Invoke(r *Request) error {
475475
if err != nil {
476476
return fmt.Errorf("Could not retrieve progress after offloading: %v", err)
477477
}
478+
478479
log.Printf("Ready to execute after offloading: %v", progress.ReadyToExecute)
479480
} else {
480481
// pick next executable task
@@ -506,7 +507,6 @@ func (wflow *Workflow) Invoke(r *Request) error {
506507

507508
input, found = dataMap[previousTask]
508509
if !found {
509-
log.Printf("Input not found in dataMap for previousTask %s", previousTask)
510510
input, err = RetrievePartialData(requestId, previousTask)
511511
if err != nil {
512512
return fmt.Errorf("could not retrieve partial data: %v", err)

0 commit comments

Comments
 (0)