Skip to content

Commit 17830e9

Browse files
committed
WIP: ILP-based offloading policy
1 parent 4ef046c commit 17830e9

5 files changed

Lines changed: 79 additions & 44 deletions

File tree

internal/api/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func ResumeWorkflow(e echo.Context) error {
170170
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
171171

172172
if clientReq.Plan.ToExecute != nil {
173-
req.Plan = &workflow.ExecutionPlan{ToExecute: clientReq.Plan.ToExecute}
173+
req.Plan = &workflow.OffloadingPlan{ToExecute: clientReq.Plan.ToExecute}
174174
} else {
175175
req.Plan = nil
176176
}

internal/workflow/ilp_offloading_policy.go

Lines changed: 71 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import (
1717
type IlpOffloadingPolicy struct{}
1818

1919
type ilpParams struct {
20-
N []string `json:"N"` // Set of nodes
20+
CloudNodes []string `json:"cloud_nodes"` // Set of Cloud nodes
21+
EdgeNodes []string `json:"edge_nodes"` // Set of Edge nodes
2122
NodeMemory map[string]float64 `json:"node_memory"` // Memory per node
2223
DSLatency map[string]float64 `json:"ds_latency"` // Latency per node
2324
DSBandwidth map[string]float64 `json:"ds_bandwidth"` // Bandwidth per node
@@ -37,6 +38,8 @@ type ilpParams struct {
3738
ExecTime map[string]float64 `json:"exectime"` // map[json.dumps((task, node))] = time
3839
}
3940

41+
type taskPlacement map[TaskId]string // TODO: might be useful to have a Node type with utility functions to retrieve node info...
42+
4043
func initParams() ilpParams {
4144
return ilpParams{
4245
Adj: make(map[string][]string),
@@ -137,19 +140,22 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
137140
return OffloadingDecision{Offload: false}, nil
138141
}
139142

140-
// TODO: prepare parameters for ILP
143+
// Prepare parameters for ILP
141144
params := initParams()
142-
params.N = []string{LOCAL, CLOUD}
145+
params.CloudNodes = []string{CLOUD}
146+
params.EdgeNodes = []string{LOCAL}
143147
params.Deadline = r.QoS.MaxRespT
144148
params.HandlingNode = LOCAL
145149
params.NodeMemory[LOCAL] = (float64)(node.Resources.AvailableMemMB)
146150

151+
// TODO: introduce task and node labels
152+
147153
// Add available Edge peers
148154
nearbyServers := registration.Reg.NearbyServersMap
149155
if nearbyServers != nil {
150156
for k, v := range nearbyServers {
151157
if v.AvailableMemMB > 0 && v.AvailableCPUs > 0 {
152-
params.N = append(params.N, k)
158+
params.EdgeNodes = append(params.EdgeNodes, k)
153159
params.NodeMemory[k] = float64(v.AvailableMemMB)
154160
}
155161
}
@@ -158,15 +164,15 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
158164
// Compute distances
159165
for key1, v1 := range nearbyServers {
160166
var distance float64
161-
if !slices.Contains(params.N, key1) {
167+
if !slices.Contains(params.EdgeNodes, key1) {
162168
continue
163169
}
164170
distance = registration.Reg.Client.DistanceTo(&v1.Coordinates).Seconds() / 2
165171
params.NodeLatency[tupleKey(LOCAL, key1)] = distance
166172
params.NodeLatency[tupleKey(key1, LOCAL)] = distance
167173

168174
for key2, v2 := range nearbyServers {
169-
if !slices.Contains(params.N, key2) {
175+
if !slices.Contains(params.EdgeNodes, key2) {
170176
continue
171177
}
172178
if key1 == key2 {
@@ -179,46 +185,70 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
179185
}
180186
}
181187

182-
// Distances to Cloud and Data Store
183-
// TODO: we assume distance to Cloud == distance to DS (for all Edge nodes)
184-
distanceToCloud := 0.100 // TODO
185-
for _, n := range params.N {
186-
if n == CLOUD {
187-
params.NodeLatency[tupleKey(n, n)] = 0.0
188-
params.DSLatency[n] = 0.0
188+
// Execution Times
189+
retrievedTimes := metrics.GetMetrics().AvgExecutionTimeAllNodes
190+
for tid, task := range r.W.Tasks {
191+
ft, ok := task.(*FunctionTask)
192+
if ok {
193+
f, _ := function.GetFunction(ft.Func)
194+
for _, n := range params.EdgeNodes {
195+
nodeTimes, found := retrievedTimes[n]
196+
if !found {
197+
params.ExecTime[tupleKey(string(tid), n)] = 1 // TODO: just guessing
198+
continue
199+
}
200+
t, found := nodeTimes[f.Name]
201+
if found {
202+
params.ExecTime[tupleKey(string(tid), n)] = t
203+
} else {
204+
params.ExecTime[tupleKey(string(tid), n)] = 1 // TODO: just guessing
205+
}
206+
}
207+
params.ExecTime[tupleKey(string(tid), CLOUD)] = 0.1 // TODO: how to retrieve cloud metrics? cloud nodes should use unique identifier??
189208
} else {
190-
params.NodeLatency[tupleKey(n, CLOUD)] = distanceToCloud
191-
params.NodeLatency[tupleKey(CLOUD, n)] = distanceToCloud
192-
params.DSLatency[n] = distanceToCloud
209+
for _, n := range params.EdgeNodes {
210+
params.ExecTime[tupleKey(string(tid), n)] = 0.0001
211+
}
212+
params.ExecTime[tupleKey(string(tid), CLOUD)] = 0.0001
193213
}
194214
}
195215

216+
// Distances to Cloud and Data Store
217+
// TODO: we assume distance to Cloud == distance to DS (for all Edge nodes)
218+
distanceToCloud := 0.100 // TODO: measure Cloud latency (ping? or, retrieve from offloadingLatency )
219+
for _, n := range params.EdgeNodes {
220+
params.NodeLatency[tupleKey(n, CLOUD)] = distanceToCloud
221+
params.NodeLatency[tupleKey(CLOUD, n)] = distanceToCloud
222+
params.DSLatency[n] = distanceToCloud
223+
}
224+
params.NodeLatency[tupleKey(CLOUD, CLOUD)] = 0.0
225+
params.DSLatency[CLOUD] = 0.001
226+
196227
// Bandwidth (we assume identical)
197-
dsBandwidth := 100.0 // TODO
198-
for _, n := range params.N {
199-
if n == CLOUD {
200-
params.DSBandwidth[n] = dsBandwidth * 10
201-
} else {
202-
params.DSBandwidth[n] = dsBandwidth
203-
}
228+
dsBandwidth := 100.0 // TODO: read from configuration?
229+
for _, n := range params.EdgeNodes {
230+
params.DSBandwidth[n] = dsBandwidth
204231
}
232+
params.DSBandwidth[CLOUD] = dsBandwidth * 10
205233

206234
// Workflow
207-
// TODO: we cannot marshal every time just to know the size!!
235+
// TODO: we cannot marshal every time just to know the size!! Measure it upon first deserialization
208236
data, _ := json.Marshal(r.Params)
209237
params.InputSize = float64(len(data))
210238
params.OutputSize = computeOutputSize(r.W, params.InputSize)
211239

212-
params.T = make([]string, len(r.W.Tasks))
240+
params.T = make([]string, 0)
213241
for tid, task := range r.W.Tasks {
214242
params.T = append(params.T, string(tid))
215-
params.Adj[string(tid)] = make([]string, 1)
243+
params.Adj[string(tid)] = make([]string, 0)
216244

217245
switch typedTask := task.(type) {
218246
case ConditionalTask:
219247
nextTasks := typedTask.GetAlternatives()
220248
for _, nextTask := range nextTasks {
221-
probability := 1.0 / float64(len(nextTasks)) // TODO
249+
// TODO: estimate branch probability:
250+
// TODO: either directly for every choice task
251+
probability := 1.0 / float64(len(nextTasks))
222252
entry := tupleKey(string(nextTask), strconv.FormatFloat(probability, 'f', 2, 32))
223253
params.Adj[string(tid)] = append(params.Adj[string(tid)], entry)
224254
}
@@ -232,6 +262,8 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
232262
if ok {
233263
f, _ := function.GetFunction(ft.Func)
234264
params.TaskMemory[string(tid)] = float64(f.MemoryMB)
265+
} else {
266+
params.TaskMemory[string(tid)] = float64(10)
235267
}
236268

237269
default:
@@ -240,42 +272,45 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
240272

241273
}
242274

243-
// TODO: resolve ILP
244275
// Serialize to JSON
245276
jsonData, err := json.Marshal(params)
246277
if err != nil {
247278
panic(err)
248279
}
249280

250281
// Create POST request
251-
url := "http://localhost:8080/"
282+
url := "http://localhost:8080/" // TODO: configurable
252283
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
253284
if err != nil {
254285
panic(err)
255286
}
256287
req.Header.Set("Content-Type", "application/json")
257288

258289
// Send the request
259-
client := &http.Client{Timeout: 10 * time.Second}
290+
client := &http.Client{Timeout: 10 * time.Second} // TODO: check if http client is cached
260291
resp, err := client.Do(req)
261292
if err != nil {
262-
panic(err)
293+
fmt.Println(err)
294+
return OffloadingDecision{Offload: false}, err
263295
}
264296
defer resp.Body.Close()
265297

266298
// Read and print response
267-
var result map[string]interface{}
268-
err = json.NewDecoder(resp.Body).Decode(&result)
299+
var placement taskPlacement
300+
err = json.NewDecoder(resp.Body).Decode(&placement)
269301
if err != nil {
270-
panic(err)
302+
fmt.Println(err)
303+
return OffloadingDecision{Offload: false}, err
271304
}
272305

273-
fmt.Printf("Server response: %+v\n", result)
306+
for k, v := range placement {
307+
fmt.Printf("Task: %s -> %s \n", k, v)
308+
}
274309

275310
// TODO: parse results and make a decision
276311

277312
//if completed >= 2 && completed < 4 {
278-
// plan := ExecutionPlan{ToExecute: p.ReadyToExecute} // TODO
313+
// plan := OffloadingPlan{ToExecute: p.ReadyToExecute} // TODO
279314
// return OffloadingDecision{true, "127.0.0.1:1323", plan}, nil
280315
//}
281316

internal/workflow/offloading_policy.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@ package workflow
33
type OffloadingDecision struct {
44
Offload bool `json:"offload"`
55
RemoteHost string `json:"remote_host"`
6-
ExecutionPlan
6+
OffloadingPlan
77
}
88

99
type OffloadingPolicy interface {
1010
Evaluate(r *Request, p *Progress) (OffloadingDecision, error)
1111
}
1212

13-
type ExecutionPlan struct {
13+
type OffloadingPlan struct {
1414
ToExecute []TaskId
1515
}
1616

@@ -38,7 +38,7 @@ func (policy *SimpleOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offload
3838
}
3939

4040
if completed >= 2 && completed < 4 {
41-
plan := ExecutionPlan{ToExecute: p.ReadyToExecute} // TODO
41+
plan := OffloadingPlan{ToExecute: p.ReadyToExecute} // TODO
4242
return OffloadingDecision{true, "127.0.0.1:1323", plan}, nil
4343
}
4444

internal/workflow/request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ type Request struct {
1919
QoS function.RequestQoS // every function should have its QoS
2020
CanDoOffloading bool // every function inherits this flag
2121
Async bool
22-
Resuming bool // indicating whether the function is resuming from a previous (partial) execution
23-
Plan *ExecutionPlan // optional; execution plan
22+
Resuming bool // indicating whether the function is resuming from a previous (partial) execution
23+
Plan *OffloadingPlan // optional; execution plan
2424
}
2525

2626
func NewRequest(reqId string, workflow *Workflow, params map[string]interface{}) *Request {
@@ -53,5 +53,5 @@ type AsyncInvocationResponse struct {
5353
type WorkflowInvocationResumeRequest struct {
5454
ReqId string
5555
client.WorkflowInvocationRequest
56-
Plan ExecutionPlan
56+
Plan OffloadingPlan
5757
}

internal/workflow/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ func offload(r *Request, policyDecision *OffloadingDecision) error {
529529
CanDoOffloading: false,
530530
Async: false, // we force a synchronous request
531531
},
532-
Plan: policyDecision.ExecutionPlan,
532+
Plan: policyDecision.OffloadingPlan,
533533
}
534534
invocationBody, err := json.Marshal(request)
535535
if err != nil {

0 commit comments

Comments
 (0)