Skip to content

Commit 5c5ae30

Browse files
committed
Fix ILP resolution for partial workflows
1 parent 6e048a1 commit 5c5ae30

1 file changed

Lines changed: 29 additions & 29 deletions

File tree

internal/workflow/ilp_offloading_policy.go

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func computeOutputSize(workflow *Workflow, inputParamsSize float64) map[string]f
9898
outputSize := make(map[string]float64)
9999
avgOutputSize := metrics.GetMetrics().AvgOutputSize
100100

101-
var currentInputSize float64 // TODO: propagating input size to output only works for sequential connections
101+
var currentInputSize = inputParamsSize // TODO: propagating input size to output only works for sequential connections
102102

103103
for len(toVisit) > 0 {
104104
task := toVisit[0]
@@ -159,15 +159,12 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
159159
}
160160
}
161161

162-
//if completed > 0 {
163-
// placement, found := getCachedSolution(r)
164-
// if !found {
165-
// log.Println("Unable to find solution for ", r.Id)
166-
// // TODO: solve the ILP
167-
// return OffloadingDecision{Offload: false}, nil
168-
// }
169-
// return computeDecisionFromPlacement(*placement, p, r), nil
170-
//}
162+
if completed > 0 {
163+
placement, found := getCachedSolution(r) // TODO: trigger ILP resolution?
164+
if found {
165+
return computeDecisionFromPlacement(*placement, p, r), nil
166+
}
167+
}
171168

172169
// Prepare parameters for ILP
173170
params := initParams()
@@ -214,9 +211,31 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
214211
}
215212
}
216213

214+
// Distances to Cloud and Data Store
215+
distanceToCloud := registration.GetRemoteOffloadingTargetLatencyMs() / 1000.0
216+
for _, n := range params.EdgeNodes {
217+
params.NodeLatency[tupleKey(n, CLOUD)] = distanceToCloud
218+
params.NodeLatency[tupleKey(CLOUD, n)] = distanceToCloud
219+
220+
// TODO: we assume distance to Cloud == distance to DS (for all Edge nodes)
221+
params.DSLatency[n] = distanceToCloud
222+
}
223+
params.NodeLatency[tupleKey(CLOUD, CLOUD)] = 0.0
224+
params.DSLatency[CLOUD] = 0.001
225+
226+
// Bandwidth (we assume identical)
227+
dsBandwidth := config.GetFloat(config.OFFLOADING_POLICY_NODE_TO_DATA_STORE_BANDWIDTH, 100.0)
228+
for _, n := range params.EdgeNodes {
229+
params.DSBandwidth[n] = dsBandwidth
230+
}
231+
params.DSBandwidth[CLOUD] = config.GetFloat(config.OFFLOADING_POLICY_CLOUD_TO_DATA_STORE_BANDWIDTH, dsBandwidth*10)
232+
217233
// Execution Times
218234
retrievedMetrics := metrics.GetMetrics()
219235
for tid, task := range r.W.Tasks {
236+
if p.Status[tid] == Executed || p.Status[tid] == Skipped {
237+
continue
238+
}
220239
ft, ok := task.(*FunctionTask)
221240
if ok {
222241
f, _ := function.GetFunction(ft.Func)
@@ -249,25 +268,6 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
249268
}
250269
}
251270

252-
// Distances to Cloud and Data Store
253-
distanceToCloud := registration.GetRemoteOffloadingTargetLatencyMs() / 1000.0
254-
for _, n := range params.EdgeNodes {
255-
params.NodeLatency[tupleKey(n, CLOUD)] = distanceToCloud
256-
params.NodeLatency[tupleKey(CLOUD, n)] = distanceToCloud
257-
258-
// TODO: we assume distance to Cloud == distance to DS (for all Edge nodes)
259-
params.DSLatency[n] = distanceToCloud
260-
}
261-
params.NodeLatency[tupleKey(CLOUD, CLOUD)] = 0.0
262-
params.DSLatency[CLOUD] = 0.001
263-
264-
// Bandwidth (we assume identical)
265-
dsBandwidth := config.GetFloat(config.OFFLOADING_POLICY_NODE_TO_DATA_STORE_BANDWIDTH, 100.0)
266-
for _, n := range params.EdgeNodes {
267-
params.DSBandwidth[n] = dsBandwidth
268-
}
269-
params.DSBandwidth[CLOUD] = config.GetFloat(config.OFFLOADING_POLICY_CLOUD_TO_DATA_STORE_BANDWIDTH, dsBandwidth*10)
270-
271271
// Workflow
272272
params.InputSize = float64(r.ParamsSize)
273273
params.OutputSize = computeOutputSize(r.W, params.InputSize)

0 commit comments

Comments
 (0)