Skip to content

Commit a84beb5

Browse files
grussorusso and matnar committed
Preparing params for ILP resolution
Co-authored-by: Matteo Nardelli <matnar@gmail.com>
1 parent 4c728fe commit a84beb5

4 files changed

Lines changed: 303 additions & 3 deletions

File tree

internal/metrics/metrics.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ var durationBuckets = []float64{0.002, 0.005, 0.010, 0.02, 0.03, 0.05, 0.1, 0.15
2121
const (
2222
COMPLETIONS = "completed_total"
2323
EXECUTION_TIME = "execution_time"
24+
OUTPUT_SIZE = "output_size"
2425
)
2526

2627
var (
@@ -33,11 +34,17 @@ var (
3334
Help: "Function duration",
3435
Buckets: durationBuckets,
3536
}, []string{"node", "function"})
37+
metricOutputSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
38+
Name: OUTPUT_SIZE,
39+
Help: "Function output size",
40+
}, []string{"function"})
3641
)
3742

3843
// RetrievedMetrics groups per-function values obtained by querying Prometheus.
// NOTE(review): presumably filled periodically by MetricsRetriever and read via
// GetMetrics — confirm against the rest of the package.
type RetrievedMetrics struct {
39-
Completions map[string]float64
40-
AvgExecutionTime map[string]float64
44+
Completions map[string]float64 // presumably keyed by function name — TODO confirm
45+
AvgExecutionTime map[string]float64 // average execution time per function
46+
AvgExecutionTimeAllNodes map[string]map[string]float64 // node name -> function name -> EXECUTION_TIME sum/count
47+
AvgOutputSize map[string]float64 // per-function OUTPUT_SIZE sum/count
4148
}
4249

4350
func Init() {
@@ -51,6 +58,7 @@ func Init() {
5158

5259
registry.MustRegister(metricCompletions)
5360
registry.MustRegister(metricExecutionTime)
61+
registry.MustRegister(metricOutputSize)
5462

5563
ScrapingHandler = promhttp.HandlerFor(registry, promhttp.HandlerOpts{
5664
EnableOpenMetrics: true})
@@ -64,3 +72,6 @@ func AddCompletedInvocation(funcName string) {
6472
func AddFunctionDurationValue(funcName string, duration float64) {
6573
metricExecutionTime.With(prometheus.Labels{"function": funcName, "node": node.NodeIdentifier}).Observe(duration)
6674
}
75+
func AddFunctionOutputSizeValue(funcName string, size float64) {
76+
metricOutputSize.With(prometheus.Labels{"function": funcName}).Observe(size)
77+
}

internal/metrics/retriever.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,44 @@ func retrieveByFunction(query string, api v1.API, ctx context.Context) (map[stri
6868
return functionValues, nil
6969
}
7070

71+
func retrieveByFunctionAndNode(query string, api v1.API, ctx context.Context) (map[string]map[string]float64, error) {
72+
73+
result, warnings, err := api.Query(ctx, query, time.Now())
74+
if err != nil {
75+
return nil, fmt.Errorf("Failed query : %v\n", err)
76+
}
77+
78+
if len(warnings) > 0 {
79+
log.Printf("Received warnings in the execution of the : %v\n", warnings)
80+
}
81+
82+
values := make(map[string]map[string]float64)
83+
if vector, ok := result.(model.Vector); ok {
84+
for _, sample := range vector {
85+
value := float64(sample.Value)
86+
functionName, found := sample.Metric[model.LabelName("function")]
87+
if !found {
88+
log.Printf("Could not find the function name in the result : %v\n", sample)
89+
continue
90+
}
91+
nodeName, found := sample.Metric[model.LabelName("node")]
92+
if !found {
93+
log.Printf("Could not find the node name in the result : %v\n", sample)
94+
continue
95+
}
96+
_, foundInner := values[string(nodeName)]
97+
if !foundInner {
98+
values[string(nodeName)] = make(map[string]float64)
99+
}
100+
values[string(nodeName)][string(functionName)] = value
101+
}
102+
} else {
103+
return nil, fmt.Errorf("Unexpected Result %v\n", result)
104+
}
105+
106+
return values, nil
107+
}
108+
71109
func MetricsRetriever() {
72110
prometheusHost := config.GetString(config.METRICS_PROMETHEUS_HOST, "127.0.0.1")
73111
prometheusPort := config.GetInt(config.METRICS_PROMETHEUS_PORT, 9090)
@@ -83,7 +121,7 @@ func MetricsRetriever() {
83121
api := v1.NewAPI(client)
84122
ctx := context.Background()
85123

86-
ticker := time.NewTicker(time.Duration(config.GetInt(config.METRICS_RETRIEVER_INTERVAL, 10)) * time.Second)
124+
ticker := time.NewTicker(time.Duration(config.GetInt(config.METRICS_RETRIEVER_INTERVAL, 5)) * time.Second)
87125
defer ticker.Stop()
88126

89127
for {
@@ -105,7 +143,22 @@ func MetricsRetriever() {
105143
}
106144
retrievedMetrics.AvgExecutionTime = avgFunDuration
107145

146+
query = fmt.Sprintf("%s_sum{}/%s_count{}", EXECUTION_TIME, EXECUTION_TIME)
147+
avgFunDurationAllNodes, err := retrieveByFunctionAndNode(query, api, ctx)
148+
if err != nil {
149+
log.Printf("Error in retrieveByFunction: %v\n", err)
150+
}
151+
retrievedMetrics.AvgExecutionTimeAllNodes = avgFunDurationAllNodes
152+
153+
query = fmt.Sprintf("%s_sum{}/%s_count{}", OUTPUT_SIZE, OUTPUT_SIZE)
154+
avgOutputSize, err := retrieveByFunction(query, api, ctx)
155+
if err != nil {
156+
log.Printf("Error in retrieveByFunction: %v\n", err)
157+
}
158+
retrievedMetrics.AvgOutputSize = avgOutputSize
159+
108160
fmt.Println("All queries completed")
161+
fmt.Println(retrievedMetrics)
109162

110163
}
111164
}

internal/scheduling/scheduler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ func Run(p Policy) {
6666
if c.executionReport.SchedAction != SCHED_ACTION_OFFLOAD {
6767
metrics.AddFunctionDurationValue(c.fun.Name, c.executionReport.Duration)
6868
}
69+
outputSize := len(c.executionReport.Result)
70+
metrics.AddFunctionOutputSizeValue(r.Fun.Name, float64(outputSize))
6971
}
7072
}
7173
}
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package workflow
2+
3+
import (
4+
"encoding/json"
5+
"github.com/serverledge-faas/serverledge/internal/function"
6+
"github.com/serverledge-faas/serverledge/internal/metrics"
7+
"github.com/serverledge-faas/serverledge/internal/node"
8+
"github.com/serverledge-faas/serverledge/internal/registration"
9+
"golang.org/x/exp/slices"
10+
"strconv"
11+
)
12+
13+
// IlpOffloadingPolicy is a stateless offloading policy whose Evaluate method
// prepares the input parameters of an ILP formulation. Solving the ILP is still
// a TODO, so the policy currently never decides to offload.
type IlpOffloadingPolicy struct{}
14+
15+
// ilpParams is the JSON-serializable input of the ILP formulation built by
// IlpOffloadingPolicy.Evaluate. Maps that are conceptually indexed by a pair
// use the string produced by tupleKey as their key.
type ilpParams struct {
	// Infrastructure.
	N            []string           `json:"N"`             // node names
	NodeMemory   map[string]float64 `json:"node_memory"`   // memory, by node
	DSLatency    map[string]float64 `json:"ds_latency"`    // latency, by node
	DSBandwidth  map[string]float64 `json:"ds_bandwidth"`  // bandwidth, by node
	NodeLatency  map[string]float64 `json:"node_latency"`  // latency, by (src node, dst node) pair key
	HandlingNode string             `json:"handling_node"` // name of the handling node

	// Workflow.
	T          []string            `json:"T"`           // task identifiers
	Adj        map[string][]string `json:"adj"`         // adjacency list, by task
	TaskMemory map[string]float64  `json:"task_memory"` // memory, by task
	Deadline   float64             `json:"deadline"`    // global deadline
	OutputSize map[string]float64  `json:"output_size"` // output size, by task
	InputSize  float64             `json:"input_size"`  // size of the input data

	// Objective and placement constraints.
	Alpha      float64             `json:"alpha"`       // weighting factor
	NodeLabels map[string][]string `json:"node_labels"` // labels, by node
	TaskLabels map[string][]string `json:"task_labels"` // labels, by task

	// Execution-time estimates.
	ExecTime map[string]float64 `json:"exectime"` // time, by (task, node) pair key
}
35+
36+
// Reserved node names used in the ILP parameters.
const (
	// CLOUD names the Cloud node.
	CLOUD = "CLOUD"
	// LOCAL names the node evaluating the policy (the handling node).
	LOCAL = "LOCAL"
)
38+
39+
// tupleKey encodes the ordered pair (s1, s2) as a compact JSON array of two
// strings, e.g. ["LOCAL","CLOUD"], so that the pair can be used as a map key.
//
// NOTE(review): the encoding has no space after the comma, whereas Python's
// json.dumps inserts one by default; presumably the consumer parses the key
// rather than comparing raw strings — confirm.
func tupleKey(s1, s2 string) string {
	pair := [2]string{s1, s2}
	// The error is deliberately ignored: the value is a fixed-size array of strings.
	encoded, _ := json.Marshal(pair)
	return string(encoded)
}
43+
44+
func computeOutputSize(workflow *Workflow, inputParamsSize float64) map[string]float64 {
45+
46+
task := workflow.Start
47+
48+
tasks := make([]Task, 0)
49+
visited := make(map[TaskId]bool)
50+
toVisit := []Task{task}
51+
52+
outputSize := make(map[string]float64)
53+
avgOutputSize := metrics.GetMetrics().AvgOutputSize
54+
55+
var currentInputSize float64 // TODO: propagating input size to output only works for sequential connections
56+
57+
for len(toVisit) > 0 {
58+
task := toVisit[0]
59+
tasks = append(tasks, task)
60+
toVisit = toVisit[1:]
61+
visited[task.GetId()] = true
62+
63+
var currentOutputSize float64
64+
65+
var nextTasks []TaskId
66+
switch typedTask := task.(type) {
67+
case ConditionalTask:
68+
nextTasks = typedTask.GetAlternatives()
69+
currentOutputSize = currentInputSize
70+
case UnaryTask:
71+
nextTasks = append(nextTasks, typedTask.GetNext())
72+
73+
ft, ok := typedTask.(*FunctionTask)
74+
if ok {
75+
currentOutputSize = avgOutputSize[ft.Func]
76+
} else {
77+
currentOutputSize = currentInputSize
78+
}
79+
default:
80+
currentOutputSize = currentInputSize
81+
}
82+
83+
outputSize[string(task.GetId())] = currentOutputSize
84+
85+
for _, nt := range nextTasks {
86+
if _, ok := visited[nt]; !ok {
87+
nextTask, ok := workflow.Tasks[nt]
88+
if ok {
89+
if !slices.Contains(toVisit, nextTask) {
90+
toVisit = append(toVisit, nextTask)
91+
}
92+
}
93+
}
94+
}
95+
96+
currentInputSize = currentOutputSize
97+
}
98+
99+
return outputSize
100+
}
101+
102+
func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (OffloadingDecision, error) {
103+
104+
completed := 0
105+
106+
if p == nil || !r.CanDoOffloading || len(p.ReadyToExecute) == 0 {
107+
return OffloadingDecision{Offload: false}, nil
108+
}
109+
110+
for _, s := range p.Status {
111+
if s == Executed {
112+
completed++
113+
}
114+
}
115+
116+
if completed > 0 {
117+
// TODO: we do not handle scheduling during workflow execution at the moment
118+
return OffloadingDecision{Offload: false}, nil
119+
}
120+
121+
// TODO: prepare parameters for ILP
122+
params := ilpParams{}
123+
params.N = []string{LOCAL, CLOUD}
124+
params.Deadline = r.QoS.MaxRespT
125+
params.HandlingNode = LOCAL
126+
params.NodeMemory[LOCAL] = (float64)(node.Resources.AvailableMemMB)
127+
128+
// Add available Edge peers
129+
nearbyServers := registration.Reg.NearbyServersMap
130+
if nearbyServers != nil {
131+
for k, v := range nearbyServers {
132+
if v.AvailableMemMB > 0 && v.AvailableCPUs > 0 {
133+
params.N = append(params.N, k)
134+
params.NodeMemory[k] = float64(v.AvailableMemMB)
135+
}
136+
}
137+
}
138+
139+
// Compute distances
140+
for key1, v1 := range nearbyServers {
141+
var distance float64
142+
if !slices.Contains(params.N, key1) {
143+
continue
144+
}
145+
distance = registration.Reg.Client.DistanceTo(&v1.Coordinates).Seconds() / 2
146+
params.NodeLatency[tupleKey(LOCAL, key1)] = distance
147+
params.NodeLatency[tupleKey(key1, LOCAL)] = distance
148+
149+
for key2, v2 := range nearbyServers {
150+
if !slices.Contains(params.N, key2) {
151+
continue
152+
}
153+
if key1 == key2 {
154+
distance = 0.0
155+
} else {
156+
distance = v1.Coordinates.DistanceTo(&v2.Coordinates).Seconds() / 2
157+
}
158+
params.NodeLatency[tupleKey(key1, key2)] = distance
159+
params.NodeLatency[tupleKey(key2, key1)] = distance
160+
}
161+
}
162+
163+
// Distances to Cloud and Data Store
164+
// TODO: we assume distance to Cloud == distance to DS (for all Edge nodes)
165+
distanceToCloud := 0.100 // TODO
166+
for _, n := range params.N {
167+
if n == CLOUD {
168+
params.NodeLatency[tupleKey(n, n)] = 0.0
169+
params.DSLatency[n] = 0.0
170+
} else {
171+
params.NodeLatency[tupleKey(n, CLOUD)] = distanceToCloud
172+
params.NodeLatency[tupleKey(CLOUD, n)] = distanceToCloud
173+
params.DSLatency[n] = distanceToCloud
174+
}
175+
}
176+
177+
// Bandwidth (we assume identical)
178+
dsBandwidth := 100.0 // TODO
179+
for _, n := range params.N {
180+
if n == CLOUD {
181+
params.DSBandwidth[n] = dsBandwidth * 10
182+
} else {
183+
params.DSBandwidth[n] = dsBandwidth
184+
}
185+
}
186+
187+
// Workflow
188+
// TODO: we cannot marshal every time just to know the size!!
189+
data, _ := json.Marshal(r.Params)
190+
params.InputSize = float64(len(data))
191+
params.OutputSize = computeOutputSize(r.W, params.InputSize)
192+
193+
params.T = make([]string, len(r.W.Tasks))
194+
for tid, task := range r.W.Tasks {
195+
params.T = append(params.T, string(tid))
196+
params.Adj[string(tid)] = make([]string, 1)
197+
198+
switch typedTask := task.(type) {
199+
case ConditionalTask:
200+
nextTasks := typedTask.GetAlternatives()
201+
for _, nextTask := range nextTasks {
202+
probability := 1.0 / float64(len(nextTasks)) // TODO
203+
entry := tupleKey(string(nextTask), strconv.FormatFloat(probability, 'f', 2, 32))
204+
params.Adj[string(tid)] = append(params.Adj[string(tid)], entry)
205+
}
206+
params.TaskMemory[string(tid)] = float64(10)
207+
case UnaryTask:
208+
nextTid := string(typedTask.GetNext())
209+
entry := tupleKey(nextTid, "1.0")
210+
params.Adj[string(tid)] = append(params.Adj[string(tid)], entry)
211+
212+
ft, ok := typedTask.(*FunctionTask)
213+
if ok {
214+
f, _ := function.GetFunction(ft.Func)
215+
params.TaskMemory[string(tid)] = float64(f.MemoryMB)
216+
}
217+
218+
default:
219+
params.TaskMemory[string(tid)] = float64(10)
220+
}
221+
222+
}
223+
224+
// TODO: resolve ILP
225+
226+
// TODO: parse results and make a decision
227+
228+
//if completed >= 2 && completed < 4 {
229+
// plan := ExecutionPlan{ToExecute: p.ReadyToExecute} // TODO
230+
// return OffloadingDecision{true, "127.0.0.1:1323", plan}, nil
231+
//}
232+
233+
return OffloadingDecision{Offload: false}, nil
234+
}

0 commit comments

Comments
 (0)