Skip to content

Commit d8bc9fb

Browse files
committed
ILP policy: handling Cloud and Edge nodes
1 parent 8e3ce02 commit d8bc9fb

1 file changed

Lines changed: 41 additions & 22 deletions

File tree

internal/workflow/ilp_offloading_policy.go

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type ilpParams struct {
4141
ExecTime map[string]float64 `json:"exectime"` // map[json.dumps((task, node))] = time
4242
}
4343

44-
type taskPlacement map[TaskId]string // TODO: might be useful to have a Node type with utility functions to retrieve node info...
44+
type taskPlacement map[TaskId]string
4545

4646
func initParams() ilpParams {
4747
return ilpParams{
@@ -109,6 +109,7 @@ func tupleKey(s1, s2 string) string {
109109
return string(keyBytes)
110110
}
111111

112+
// TODO: Update this function as soon as tasks that split/merge their inputs are implemented (e.g., Map)
112113
func computeOutputSize(workflow *Workflow, inputParamsSize float64) map[string]float64 {
113114

114115
task := workflow.Start
@@ -120,7 +121,7 @@ func computeOutputSize(workflow *Workflow, inputParamsSize float64) map[string]f
120121
outputSize := make(map[string]float64)
121122
avgOutputSize := metrics.GetMetrics().AvgOutputSize
122123

123-
var currentInputSize = inputParamsSize // TODO: propagating input size to output only works for sequential connections
124+
var currentInputSize = inputParamsSize
124125

125126
for len(toVisit) > 0 {
126127
task := toVisit[0]
@@ -191,9 +192,12 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
191192

192193
// Prepare parameters for ILP
193194
params := initParams()
194-
params.CloudNodes = []string{CLOUD}
195+
params.CloudNodes = make([]string, 0)
196+
if registration.GetRemoteOffloadingTarget() != nil {
197+
params.CloudNodes = append(params.CloudNodes, CLOUD)
198+
}
195199
params.EdgeNodes = []string{LOCAL}
196-
params.Deadline = r.QoS.MaxRespT - time.Now().Sub(r.Arrival).Seconds() // TODO: what if deadline is <0?
200+
params.Deadline = r.QoS.MaxRespT - time.Now().Sub(r.Arrival).Seconds()
197201
params.HandlingNode = LOCAL
198202
params.NodeMemory[LOCAL] = (float64)(node.Resources.AvailableMemMB)
199203

@@ -234,24 +238,34 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
234238
}
235239
}
236240

237-
// Distances to Cloud and Data Store
238-
distanceToCloud := registration.GetRemoteOffloadingTargetLatencyMs() / 1000.0
239-
for _, n := range params.EdgeNodes {
240-
params.NodeLatency[tupleKey(n, CLOUD)] = distanceToCloud
241-
params.NodeLatency[tupleKey(CLOUD, n)] = distanceToCloud
241+
if len(params.CloudNodes) > 0 {
242+
// Distances to Cloud and Data Store
243+
distanceToCloud := registration.GetRemoteOffloadingTargetLatencyMs() / 1000.0
244+
for _, n := range params.EdgeNodes {
245+
params.NodeLatency[tupleKey(n, CLOUD)] = distanceToCloud
246+
params.NodeLatency[tupleKey(CLOUD, n)] = distanceToCloud
242247

243-
// TODO: we assume distance to Cloud == distance to DS (for all Edge nodes)
244-
params.DSLatency[n] = distanceToCloud
248+
// TODO: we assume distance to Cloud == distance to DS (for all Edge nodes)
249+
params.DSLatency[n] = distanceToCloud
250+
}
251+
params.NodeLatency[tupleKey(CLOUD, CLOUD)] = 0.0
252+
params.DSLatency[CLOUD] = 0.001
253+
} else {
254+
distanceToDS := 0.100 // TODO: how to compute?
255+
for _, n := range params.EdgeNodes {
256+
params.DSLatency[n] = distanceToDS
257+
}
245258
}
246-
params.NodeLatency[tupleKey(CLOUD, CLOUD)] = 0.0
247-
params.DSLatency[CLOUD] = 0.001
248259

249260
// Bandwidth (we assume identical)
250261
dsBandwidth := config.GetFloat(config.OFFLOADING_POLICY_NODE_TO_DATA_STORE_BANDWIDTH, 100.0)
251262
for _, n := range params.EdgeNodes {
252263
params.DSBandwidth[n] = dsBandwidth
253264
}
254-
params.DSBandwidth[CLOUD] = config.GetFloat(config.OFFLOADING_POLICY_CLOUD_TO_DATA_STORE_BANDWIDTH, dsBandwidth*10)
265+
266+
if len(params.CloudNodes) > 0 {
267+
params.DSBandwidth[CLOUD] = config.GetFloat(config.OFFLOADING_POLICY_CLOUD_TO_DATA_STORE_BANDWIDTH, dsBandwidth*10)
268+
}
255269

256270
// Execution Times
257271
retrievedMetrics := metrics.GetMetrics()
@@ -276,18 +290,22 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
276290
}
277291
}
278292

279-
// Cloud
280-
t, found := retrievedMetrics.AvgRemoteExecutionTime[f.Name]
281-
if found {
282-
params.ExecTime[tupleKey(string(tid), CLOUD)] = t
283-
} else {
284-
params.ExecTime[tupleKey(string(tid), CLOUD)] = 1 // no data: just guessing
293+
if len(params.CloudNodes) > 0 {
294+
// Cloud
295+
t, found := retrievedMetrics.AvgRemoteExecutionTime[f.Name]
296+
if found {
297+
params.ExecTime[tupleKey(string(tid), CLOUD)] = t
298+
} else {
299+
params.ExecTime[tupleKey(string(tid), CLOUD)] = 1 // no data: just guessing
300+
}
285301
}
286302
} else {
287303
for _, n := range params.EdgeNodes {
288304
params.ExecTime[tupleKey(string(tid), n)] = 0.0001
289305
}
290-
params.ExecTime[tupleKey(string(tid), CLOUD)] = 0.0001
306+
if len(params.CloudNodes) > 0 {
307+
params.ExecTime[tupleKey(string(tid), CLOUD)] = 0.0001
308+
}
291309
}
292310
}
293311

@@ -415,10 +433,11 @@ func computeDecisionFromPlacement(placement taskPlacement, p *Progress, r *Reque
415433

416434
plan := OffloadingPlan{ToExecute: toExecute}
417435

418-
// TODO: retrieve node URL from solution
419436
var remoteNodeReg *registration.NodeRegistration
420437
if remoteNode == CLOUD {
421438
remoteNodeReg = registration.GetRemoteOffloadingTarget()
439+
} else {
440+
remoteNodeReg = registration.GetPeerFromKey(remoteNode)
422441
}
423442

424443
decision := OffloadingDecision{true, remoteNodeReg.APIUrl(), plan}

0 commit comments

Comments
 (0)