Skip to content

Commit b935ea7

Browse files
grussorussomatnar
andcommitted
Parsing ILP solution into an OffloadingDecision
Co-authored-by: Matteo Nardelli <matnar@gmail.com>
1 parent 1837711 commit b935ea7

3 files changed

Lines changed: 182 additions & 109 deletions

File tree

internal/workflow/ilp_offloading_policy.go

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import (
44
"bytes"
55
"encoding/json"
66
"fmt"
7+
"log"
78
"net/http"
89
"strconv"
910
"time"
1011

1112
"github.com/serverledge-faas/serverledge/internal/config"
1213
"github.com/serverledge-faas/serverledge/internal/function"
1314
"github.com/serverledge-faas/serverledge/internal/metrics"
14-
"github.com/serverledge-faas/serverledge/internal/node"
1515
"github.com/serverledge-faas/serverledge/internal/registration"
1616
"golang.org/x/exp/slices"
1717
)
@@ -62,6 +62,26 @@ const LOCAL = "LOCAL"
6262

6363
var httpClient = &http.Client{Timeout: 10 * time.Second}
6464

65+
var cachedSolutions map[string]taskPlacement
66+
67+
func getCachedSolution(r *Request) (*taskPlacement, bool) {
68+
if cachedSolutions == nil {
69+
cachedSolutions = make(map[string]taskPlacement)
70+
return nil, false
71+
}
72+
73+
sol, ok := cachedSolutions[r.Id]
74+
return &sol, ok
75+
}
76+
77+
func cacheSolution(r *Request, sol *taskPlacement) {
78+
if cachedSolutions == nil {
79+
cachedSolutions = make(map[string]taskPlacement)
80+
}
81+
82+
cachedSolutions[r.Id] = *sol
83+
}
84+
6585
func tupleKey(s1, s2 string) string {
6686
keyBytes, _ := json.Marshal([]string{s1, s2})
6787
return string(keyBytes)
@@ -140,8 +160,13 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
140160
}
141161

142162
if completed > 0 {
143-
// TODO: we do not handle scheduling during workflow execution at the moment
144-
return OffloadingDecision{Offload: false}, nil
163+
placement, found := getCachedSolution(r)
164+
if !found {
165+
log.Println("Unable to find solution for ", r.Id)
166+
// TODO: solve the ILP
167+
return OffloadingDecision{Offload: false}, nil
168+
}
169+
return computeDecisionFromPlacement(*placement, p, r), nil
145170
}
146171

147172
// Prepare parameters for ILP
@@ -150,7 +175,7 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
150175
params.EdgeNodes = []string{LOCAL}
151176
params.Deadline = r.QoS.MaxRespT
152177
params.HandlingNode = LOCAL
153-
params.NodeMemory[LOCAL] = (float64)(node.Resources.AvailableMemMB)
178+
params.NodeMemory[LOCAL] = 10 // (float64)(node.Resources.AvailableMemMB)
154179

155180
// TODO: introduce task and node labels
156181

@@ -328,12 +353,46 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
328353
fmt.Printf("Task: %s -> %s \n", k, v)
329354
}
330355

331-
// TODO: parse results and make a decision
356+
// parse results and make a decision
357+
return computeDecisionFromPlacement(placement, p, r), nil
358+
}
332359

333-
//if completed >= 2 && completed < 4 {
334-
// plan := OffloadingPlan{ToExecute: p.ReadyToExecute} // TODO
335-
// return OffloadingDecision{true, "127.0.0.1:1323", plan}, nil
336-
//}
360+
func computeDecisionFromPlacement(placement taskPlacement, p *Progress, r *Request) OffloadingDecision {
361+
362+
var localExecution = false
363+
var remoteNode string
364+
for _, t := range p.ReadyToExecute {
365+
assignedNode := placement[t]
366+
if assignedNode == LOCAL {
367+
localExecution = true
368+
break
369+
} else {
370+
remoteNode = assignedNode
371+
}
372+
}
373+
if localExecution {
374+
log.Println("Continuing with local execution")
375+
cacheSolution(r, &placement)
376+
return OffloadingDecision{Offload: false}
377+
}
378+
379+
// Retrieve all tasks assigned to n
380+
toExecute := make([]TaskId, 0)
381+
for t, assignedNode := range placement {
382+
if assignedNode == remoteNode {
383+
toExecute = append(toExecute, t)
384+
}
385+
}
386+
387+
plan := OffloadingPlan{ToExecute: toExecute}
388+
389+
// TODO: retrieve node URL from solution
390+
var remoteNodeReg *registration.NodeRegistration
391+
if remoteNode == CLOUD {
392+
remoteNodeReg = registration.GetRemoteOffloadingTarget()
393+
}
337394

338-
return OffloadingDecision{Offload: false}, nil
395+
decision := OffloadingDecision{true, remoteNodeReg.APIUrl(), plan}
396+
log.Printf("Decision: %v\n", decision)
397+
return decision
339398
}

internal/workflow/task_data.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ func (td *TaskData) Save(reqId ReqId, task TaskId) error {
6868
}
6969
// saves the json object into etcd
7070
key := getTaskDataEtcdKey(reqId, task)
71-
log.Printf("Saving PD on etcd with key: %s\n", key)
71+
//log.Printf("Saving PD on etcd : %v\n", td.Data)
72+
log.Printf("Saving PD on etcd with key: %s and payload: %v\n", key, string(payload))
7273

7374
_, err = cli.Put(ctx, key, string(payload))
7475
if err != nil {
@@ -85,6 +86,7 @@ func RetrievePartialData(reqId ReqId, task TaskId) (*TaskData, error) {
8586
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
8687
defer cancel()
8788
key := getTaskDataEtcdKey(reqId, task)
89+
log.Printf("Retrieving partial data with key: %s\n", key)
8890
getResponse, err := cli.Get(ctx, key)
8991
if err != nil {
9092
return nil, fmt.Errorf("failed to retrieve partialDatas for requestId: %s", key)

0 commit comments

Comments
 (0)