Skip to content

Commit 40aee98

Browse files
committed
Periodic re-computation of ILP-based placement
1 parent 5c5ae30 commit 40aee98

2 files changed

Lines changed: 39 additions & 11 deletions

File tree

internal/config/keys.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ const OFFLOADING_POLICY_ILP_OPTIMIZER_PORT = "offloading.policy.ilp.port"
8888
// IP address / hostname used by ILP Offloading Policy for solving the ILP formulation
8989
const OFFLOADING_POLICY_ILP_OPTIMIZER_HOST = "offloading.policy.ilp.host"
9090

91+
// Number of times a scheduling plan can be reused before being re-computed
92+
const OFFLOADING_POLICY_ILP_PLACEMENT_TTL = "offloading.policy.ilp.placement.ttl"
93+
9194
// Estimated bandwidth between the current node and the data store
9295
const OFFLOADING_POLICY_NODE_TO_DATA_STORE_BANDWIDTH = "offloading.policy.node2datastore.bandwidth"
9396

internal/workflow/ilp_offloading_policy.go

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"encoding/json"
66
"fmt"
7+
"github.com/serverledge-faas/serverledge/internal/node"
78
"log"
89
"net/http"
910
"strconv"
@@ -62,24 +63,45 @@ const LOCAL = "LOCAL"
6263

6364
var httpClient = &http.Client{Timeout: 10 * time.Second}
6465

65-
var cachedSolutions map[string]taskPlacement
66+
type cachedPlacement struct {
67+
placement taskPlacement
68+
ttl int
69+
}
70+
71+
var placementCache map[string]*cachedPlacement
6672

6773
func getCachedSolution(r *Request) (*taskPlacement, bool) {
68-
if cachedSolutions == nil {
69-
cachedSolutions = make(map[string]taskPlacement)
74+
if placementCache == nil {
75+
placementCache = make(map[string]*cachedPlacement)
76+
return nil, false
77+
}
78+
79+
sol, ok := placementCache[r.Id]
80+
if !ok {
7081
return nil, false
7182
}
7283

73-
sol, ok := cachedSolutions[r.Id]
74-
return &sol, ok
84+
// check TTL
85+
if sol.ttl > 0 {
86+
sol.ttl--
87+
return &sol.placement, ok
88+
}
89+
90+
delete(placementCache, r.Id)
91+
return nil, false
7592
}
7693

7794
func cacheSolution(r *Request, sol *taskPlacement) {
78-
if cachedSolutions == nil {
79-
cachedSolutions = make(map[string]taskPlacement)
95+
if placementCache == nil {
96+
placementCache = make(map[string]*cachedPlacement)
8097
}
8198

82-
cachedSolutions[r.Id] = *sol
99+
defaultTTL := config.GetInt(config.OFFLOADING_POLICY_ILP_PLACEMENT_TTL, 2) - 1
100+
101+
placementCache[r.Id] = &cachedPlacement{
102+
placement: *sol,
103+
ttl: defaultTTL,
104+
}
83105
}
84106

85107
func tupleKey(s1, s2 string) string {
@@ -160,8 +182,9 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
160182
}
161183

162184
if completed > 0 {
163-
placement, found := getCachedSolution(r) // TODO: trigger ILP resolution?
185+
placement, found := getCachedSolution(r)
164186
if found {
187+
log.Printf("Reusing cached placement\n")
165188
return computeDecisionFromPlacement(*placement, p, r), nil
166189
}
167190
}
@@ -172,7 +195,7 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
172195
params.EdgeNodes = []string{LOCAL}
173196
params.Deadline = r.QoS.MaxRespT - time.Now().Sub(r.Arrival).Seconds() // TODO: what if deadline is <0?
174197
params.HandlingNode = LOCAL
175-
params.NodeMemory[LOCAL] = 10 // (float64)(node.Resources.AvailableMemMB) // TODO: change this
198+
params.NodeMemory[LOCAL] = (float64)(node.Resources.AvailableMemMB)
176199

177200
// TODO: introduce task and node labels
178201

@@ -353,6 +376,8 @@ func (policy *IlpOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offloading
353376
return OffloadingDecision{Offload: false}, fmt.Errorf("decoding response: %w", err)
354377
}
355378

379+
cacheSolution(r, &placement)
380+
356381
for k, v := range placement {
357382
fmt.Printf("Task: %s -> %s \n", k, v)
358383
}
@@ -374,9 +399,9 @@ func computeDecisionFromPlacement(placement taskPlacement, p *Progress, r *Reque
374399
remoteNode = assignedNode
375400
}
376401
}
402+
377403
if localExecution {
378404
log.Println("Continuing with local execution")
379-
cacheSolution(r, &placement)
380405
return OffloadingDecision{Offload: false}
381406
}
382407

0 commit comments

Comments
 (0)