Skip to content

Commit 1acaca3

Browse files
committed
Adds InitTime metric
1 parent bbe56b6 commit 1acaca3

4 files changed

Lines changed: 106 additions & 25 deletions

File tree

internal/metrics/metrics.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ var ScrapingHandler http.Handler = nil
2020
var durationBuckets = []float64{0.002, 0.005, 0.010, 0.02, 0.03, 0.05, 0.1, 0.15, 0.3, 0.6, 1.0}
2121

2222
const (
23-
COMPLETIONS = "completed_total"
24-
EXECUTION_TIME = "execution_time"
25-
OUTPUT_SIZE = "output_size"
26-
BRANCH_COUNT = "branch_count"
23+
COMPLETIONS = "completed_total"
24+
EXECUTION_TIME = "execution_time"
25+
INITIALIZATION_TIME = "init_time"
26+
OUTPUT_SIZE = "output_size"
27+
BRANCH_COUNT = "branch_count"
2728
)
2829

2930
var (
@@ -36,6 +37,10 @@ var (
3637
Help: "Function duration",
3738
Buckets: durationBuckets,
3839
}, []string{"node", "function"})
40+
metricInitializationTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
41+
Name: INITIALIZATION_TIME,
42+
Help: "Function initialization time (cold start duration)",
43+
}, []string{"node", "function"})
3944
metricOutputSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
4045
Name: OUTPUT_SIZE,
4146
Help: "Function output size",
@@ -47,10 +52,11 @@ var (
4752
)
4853

4954
type RetrievedMetrics struct {
50-
Completions map[string]float64
51-
//AvgExecutionTime map[string]float64
55+
Completions map[string]float64
5256
AvgRemoteExecutionTime map[string]float64
5357
AvgEdgeExecutionTime map[string]map[string]float64
58+
AvgRemoteInitTime map[string]float64
59+
AvgEdgeInitTime map[string]map[string]float64
5460
AvgOutputSize map[string]float64
5561
BranchFrequency map[string]map[string]float64
5662
}
@@ -63,6 +69,10 @@ func (r RetrievedMetrics) String() string {
6369
s += fmt.Sprintf(" %v\n\n", r.AvgRemoteExecutionTime)
6470
s += "EDGE EXEC TIMES:\n"
6571
s += fmt.Sprintf(" %v\n\n", r.AvgEdgeExecutionTime)
72+
s += "REMOTE INIT TIMES:\n"
73+
s += fmt.Sprintf(" %v\n\n", r.AvgRemoteInitTime)
74+
s += "EDGE INIT TIMES:\n"
75+
s += fmt.Sprintf(" %v\n\n", r.AvgEdgeInitTime)
6676
s += "OUTPUT SIZE:\n"
6777
s += fmt.Sprintf(" %v\n\n", r.AvgOutputSize)
6878
s += "BRANCH FREQ:\n"
@@ -82,6 +92,7 @@ func Init() {
8292

8393
registry.MustRegister(metricCompletions)
8494
registry.MustRegister(metricExecutionTime)
95+
registry.MustRegister(metricInitializationTime)
8596
registry.MustRegister(metricOutputSize)
8697
registry.MustRegister(metricBranchCount)
8798

@@ -97,6 +108,9 @@ func AddCompletedInvocation(funcName string) {
97108
func AddFunctionDurationValue(funcName string, duration float64) {
98109
metricExecutionTime.With(prometheus.Labels{"function": funcName, "node": node.LocalNode.String()}).Observe(duration)
99110
}
111+
func AddFunctionInitTimeValue(funcName string, initTime float64) {
112+
metricInitializationTime.With(prometheus.Labels{"function": funcName, "node": node.LocalNode.String()}).Observe(initTime)
113+
}
100114
func AddFunctionOutputSizeValue(funcName string, size float64) {
101115
metricOutputSize.With(prometheus.Labels{"function": funcName}).Observe(size)
102116
}

internal/metrics/retriever.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,6 @@ func MetricsRetriever() {
177177
}
178178
retrievedMetrics.Completions = completionsPerFunction
179179

180-
//query = fmt.Sprintf("%s_sum{node=\"%s\"}/%s_count{node=\"%s\"}",
181-
// EXECUTION_TIME, node.LocalNode, EXECUTION_TIME, node.LocalNode)
182-
//avgFunDuration, err := retrieveByFunction(query, api, ctx)
183-
//if err != nil {
184-
// log.Printf("Error in retrieveByFunction: %v", err)
185-
//}
186-
//retrievedMetrics.AvgExecutionTime = avgFunDuration
187-
188180
query = fmt.Sprintf("%s_sum{}/%s_count{}", OUTPUT_SIZE, OUTPUT_SIZE)
189181
avgOutputSize, err := retrieveByFunction(query, api, ctx)
190182
if err != nil {
@@ -209,6 +201,14 @@ func MetricsRetriever() {
209201
}
210202
retrievedMetrics.AvgEdgeExecutionTime = avgFunDurationAllNodes
211203

204+
query = fmt.Sprintf("%s_sum{node=~\"\\\\(%s\\\\).*\"}/%s_count{node=~\"\\\\(%s\\\\).*\"}",
205+
INITIALIZATION_TIME, localArea, INITIALIZATION_TIME, localArea)
206+
avgInitTimeAllNodes, err := retrieveByFunctionAndNode(query, api, ctx)
207+
if err != nil {
208+
log.Printf("Error in retrieveByFunction: %v", err)
209+
}
210+
retrievedMetrics.AvgEdgeInitTime = avgInitTimeAllNodes
211+
212212
// CLOUD
213213
cloudArea := config.GetString(config.REGISTRY_REMOTE_AREA, "")
214214
if cloudArea != "" {
@@ -219,8 +219,17 @@ func MetricsRetriever() {
219219
log.Printf("Error in retrieveByFunction: %v", err)
220220
}
221221
retrievedMetrics.AvgRemoteExecutionTime = avgFunDuration
222+
223+
query = fmt.Sprintf("%s_sum{node=~\"\\\\(%s\\\\).*\"}/%s_count{node=~\"\\\\(%s\\\\).*\"}",
224+
INITIALIZATION_TIME, cloudArea, INITIALIZATION_TIME, cloudArea)
225+
avgInitTime, err := retrieveByFunction(query, api, ctx)
226+
if err != nil {
227+
log.Printf("Error in retrieveByFunction: %v", err)
228+
}
229+
retrievedMetrics.AvgRemoteInitTime = avgInitTime
222230
} else {
223231
retrievedMetrics.AvgRemoteExecutionTime = make(map[string]float64)
232+
retrievedMetrics.AvgRemoteInitTime = make(map[string]float64)
224233
}
225234

226235
fmt.Println("All queries completed")

internal/scheduling/scheduler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func Run(p Policy) {
6666
metrics.AddCompletedInvocation(c.fun.Name)
6767
if c.executionReport.SchedAction != SCHED_ACTION_OFFLOAD {
6868
metrics.AddFunctionDurationValue(c.fun.Name, c.executionReport.Duration)
69+
metrics.AddFunctionInitTimeValue(c.fun.Name, c.executionReport.InitTime)
6970
}
7071
outputSize := len(c.executionReport.Result)
7172
metrics.AddFunctionOutputSizeValue(r.Fun.Name, float64(outputSize))

internal/workflow/ilp_offloading_policy.go

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ type ilpParams struct {
4040
NodeLabels map[string][]string `json:"node_labels"` // Labels per node
4141
TaskLabels map[string][]string `json:"task_labels"` // Labels per task
4242

43-
ExecTime map[string]float64 `json:"exectime"` // map[json.dumps((task, node))] = time
43+
ExecTime map[string]float64 `json:"exectime"` // map[json.dumps((task, node))] = time
44+
InitTime map[string]float64 `json:"init_time"` // map[json.dumps((task, node))] = time
4445
}
4546

4647
type taskPlacement map[TaskId]string
@@ -49,6 +50,7 @@ func initParams() ilpParams {
4950
return ilpParams{
5051
Adj: make(map[string][]string),
5152
ExecTime: make(map[string]float64),
53+
InitTime: make(map[string]float64),
5254
OutputSize: make(map[string]float64),
5355
NodeMemory: make(map[string]float64),
5456
Cost: make(map[string]float64),
@@ -63,7 +65,6 @@ func initParams() ilpParams {
6365
}
6466

6567
const CLOUD = "CLOUD"
66-
const LOCAL = "LOCAL"
6768

6869
var httpClient = &http.Client{Timeout: 10 * time.Second}
6970

@@ -176,6 +177,8 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
176177

177178
completed := 0
178179

180+
var LOCAL = registration.SelfRegistration.Key
181+
179182
if p == nil || !r.CanDoOffloading || len(p.ReadyToExecute) == 0 {
180183
return OffloadingDecision{Offload: false}, nil
181184
}
@@ -295,6 +298,8 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
295298
params.DSBandwidth[CLOUD] = config.GetFloat(config.OFFLOADING_POLICY_CLOUD_TO_DATA_STORE_BANDWIDTH, dsBandwidth*10)
296299
}
297300

301+
localWarmStatus := node.WarmStatus()
302+
298303
// Execution Times
299304
retrievedMetrics := metrics.GetMetrics()
300305
for tid, task := range r.W.Tasks {
@@ -305,34 +310,86 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
305310
if ok {
306311
f, _ := function.GetFunction(ft.Func)
307312
for _, n := range params.EdgeNodes {
308-
nodeTimes, found := retrievedMetrics.AvgEdgeExecutionTime[n]
313+
nId := node.NodeID{Area: registration.SelfRegistration.Area, Key: n}
314+
// Execution Times
315+
nodeTimes, found := retrievedMetrics.AvgEdgeExecutionTime[nId.String()]
316+
if !found {
317+
log.Printf("No data about exec times on %s", n)
318+
params.ExecTime[tupleKey(string(tid), n)] = 0.01 // no data: just guessing
319+
} else {
320+
t, found := nodeTimes[f.Name]
321+
if found {
322+
params.ExecTime[tupleKey(string(tid), n)] = t
323+
} else {
324+
log.Printf("No data about exec times of %s on %s", f.Name, n)
325+
params.ExecTime[tupleKey(string(tid), n)] = 0.01 // no data: just guessing
326+
}
327+
}
328+
329+
// Init Times
330+
initTimes, found := retrievedMetrics.AvgEdgeInitTime[nId.String()]
309331
if !found {
310-
params.ExecTime[tupleKey(string(tid), n)] = 1 // no data: just guessing
332+
// Unknown node
333+
params.InitTime[tupleKey(string(tid), n)] = 0.01 // no data: just guessing
311334
continue
312335
}
313-
t, found := nodeTimes[f.Name]
314-
if found {
315-
params.ExecTime[tupleKey(string(tid), n)] = t
336+
337+
coldStart := false
338+
if n == LOCAL {
339+
warmCount, ok := localWarmStatus[f.Name]
340+
if !ok || warmCount < 1 {
341+
coldStart = true
342+
}
316343
} else {
317-
params.ExecTime[tupleKey(string(tid), n)] = 1 // no data: just guessing
344+
warmCount, ok := nearbyServers[n].AvailableWarmContainers[f.Name]
345+
if !ok || warmCount < 1 {
346+
coldStart = true
347+
}
348+
}
349+
350+
if !coldStart {
351+
params.InitTime[tupleKey(string(tid), n)] = 0
352+
} else {
353+
t, found := initTimes[f.Name]
354+
if found {
355+
params.InitTime[tupleKey(string(tid), n)] = t
356+
} else {
357+
params.InitTime[tupleKey(string(tid), n)] = 0.01 // no data: just guessing
358+
}
318359
}
319360
}
320361

321362
if len(params.CloudNodes) > 0 {
322-
// Cloud
363+
// Cloud Execution Time
323364
t, found := retrievedMetrics.AvgRemoteExecutionTime[f.Name]
324365
if found {
325366
params.ExecTime[tupleKey(string(tid), CLOUD)] = t
326367
} else {
327-
params.ExecTime[tupleKey(string(tid), CLOUD)] = 1 // no data: just guessing
368+
params.ExecTime[tupleKey(string(tid), CLOUD)] = 0.01 // no data: just guessing
369+
}
370+
371+
// Init Time
372+
coldStart := true // TODO: assuming cold start
373+
if !coldStart {
374+
params.InitTime[tupleKey(string(tid), CLOUD)] = 0
375+
} else {
376+
t, found = retrievedMetrics.AvgRemoteInitTime[f.Name]
377+
if found {
378+
params.InitTime[tupleKey(string(tid), CLOUD)] = t
379+
} else {
380+
params.InitTime[tupleKey(string(tid), CLOUD)] = 0.01 // no data: just guessing
381+
}
328382
}
329383
}
330384
} else {
385+
// The task is not a FunctionTask
331386
for _, n := range params.EdgeNodes {
332387
params.ExecTime[tupleKey(string(tid), n)] = 0.0001
388+
params.InitTime[tupleKey(string(tid), n)] = 0.0
333389
}
334390
if len(params.CloudNodes) > 0 {
335391
params.ExecTime[tupleKey(string(tid), CLOUD)] = 0.0001
392+
params.InitTime[tupleKey(string(tid), CLOUD)] = 0.0
336393
}
337394
}
338395
}
@@ -438,7 +495,7 @@ func computeDecisionFromPlacement(placement taskPlacement, p *Progress, r *Reque
438495
var remoteNode string
439496
for _, t := range p.ReadyToExecute {
440497
assignedNode := placement[t]
441-
if assignedNode == LOCAL {
498+
if assignedNode == registration.SelfRegistration.Key {
442499
localExecution = true
443500
break
444501
} else {

0 commit comments

Comments
 (0)