Skip to content

Commit f620e47

Browse files
committed
Makes fields of Resources private
1 parent 6f7fd53 commit f620e47

8 files changed

Lines changed: 81 additions & 60 deletions

File tree

internal/api/api.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,10 @@ func GetServerStatus(c echo.Context) error {
229229
// TODO: use a different type
230230
response := registration.StatusInformation{
231231
AvailableWarmContainers: node.WarmStatus(),
232-
AvailableMemMB: node.LocalResources.AvailableMemory(),
233-
UsedMemMB: node.LocalResources.BusyPoolUsedMem,
234-
AvailableCPUs: node.LocalResources.AvailableCPUs(),
232+
TotalMemory: node.LocalResources.TotalMemory(),
233+
UsedMemory: node.LocalResources.UsedMemory(),
234+
TotalCPU: node.LocalResources.TotalCPUs(),
235+
UsedCPU: node.LocalResources.UsedCPUs(),
235236
Coordinates: *registration.VivaldiClient.GetCoordinate(),
236237
LoadAvg: loadAvgValues,
237238
}

internal/node/node.go

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"errors"
55
"fmt"
66
"github.com/lithammer/shortuuid"
7+
"github.com/serverledge-faas/serverledge/internal/config"
8+
"runtime"
79
"strconv"
810
"sync"
911
"time"
@@ -29,29 +31,52 @@ func NewIdentifier(area string) NodeID {
2931

3032
type Resources struct {
3133
sync.RWMutex
32-
TotalMemory int64
33-
TotalCPUs float64
34-
BusyPoolUsedMem int64
35-
WarmPoolUsedMem int64
36-
UsedCPUs float64
37-
ContainerPools map[string]*ContainerPool
34+
totalMemory int64
35+
totalCPUs float64
36+
busyPoolUsedMem int64
37+
warmPoolUsedMem int64
38+
usedCPUs float64
39+
containerPools map[string]*ContainerPool
40+
}
41+
42+
func (n *Resources) Init() {
43+
availableCores := runtime.NumCPU()
44+
n.totalCPUs = config.GetFloat(config.POOL_CPUS, float64(availableCores))
45+
n.totalMemory = int64(config.GetInt(config.POOL_MEMORY_MB, 1024))
46+
n.containerPools = make(map[string]*ContainerPool)
3847
}
3948

4049
func (n *Resources) String() string {
41-
return fmt.Sprintf("[CPUs: %f/%f - Mem: %d(%d warm)/%d]", n.UsedCPUs, n.TotalCPUs, n.BusyPoolUsedMem, n.WarmPoolUsedMem, n.TotalMemory)
50+
return fmt.Sprintf("[CPUs: %f/%f - Mem: %d(%d warm)/%d]", n.usedCPUs, n.totalCPUs, n.busyPoolUsedMem, n.warmPoolUsedMem, n.totalMemory)
4251
}
4352

4453
func (n *Resources) FreeMemory() int64 {
45-
return n.TotalMemory - n.BusyPoolUsedMem - n.WarmPoolUsedMem
54+
return n.totalMemory - n.busyPoolUsedMem - n.warmPoolUsedMem
4655
}
4756

4857
// AvailableMemory returns amount of memory that is free or reclaimable from warm containers
4958
func (n *Resources) AvailableMemory() int64 {
50-
return n.TotalMemory - n.BusyPoolUsedMem
59+
return n.totalMemory - n.busyPoolUsedMem
5160
}
5261

5362
func (n *Resources) AvailableCPUs() float64 {
54-
return n.TotalCPUs - n.UsedCPUs
63+
return n.totalCPUs - n.usedCPUs
64+
}
65+
66+
func (n *Resources) UsedMemory() int64 {
67+
return n.busyPoolUsedMem
68+
}
69+
70+
func (n *Resources) UsedCPUs() float64 {
71+
return n.usedCPUs
72+
}
73+
74+
func (n *Resources) TotalCPUs() float64 {
75+
return n.totalCPUs
76+
}
77+
78+
func (n *Resources) TotalMemory() int64 {
79+
return n.totalMemory
5580
}
5681

5782
var LocalResources Resources

internal/node/pool.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ var NoWarmFoundErr = errors.New("no warm container is available")
2020

2121
// GetContainerPool retrieves (or creates) the container pool for a function.
2222
func GetContainerPool(f *function.Function) *ContainerPool {
23-
if fp, ok := LocalResources.ContainerPools[f.Name]; ok {
23+
if fp, ok := LocalResources.containerPools[f.Name]; ok {
2424
return fp
2525
}
2626

2727
fp := newContainerPool()
28-
LocalResources.ContainerPools[f.Name] = fp
28+
LocalResources.containerPools[f.Name] = fp
2929
return fp
3030
}
3131

@@ -73,9 +73,9 @@ func acquireNewMemory(mem int64, forWarmPool bool) bool {
7373
}
7474

7575
if forWarmPool {
76-
LocalResources.WarmPoolUsedMem += mem
76+
LocalResources.warmPoolUsedMem += mem
7777
} else {
78-
LocalResources.BusyPoolUsedMem += mem
78+
LocalResources.busyPoolUsedMem += mem
7979
}
8080

8181
return true
@@ -113,8 +113,8 @@ func acquireWarmContainer(f *function.Function) (*container.Container, error) {
113113
//log.Printf("Not enough CPU to start a warm container for %s", f)
114114
return nil, OutOfResourcesErr
115115
}
116-
LocalResources.BusyPoolUsedMem += f.MemoryMB
117-
LocalResources.WarmPoolUsedMem -= f.MemoryMB
116+
LocalResources.busyPoolUsedMem += f.MemoryMB
117+
LocalResources.warmPoolUsedMem -= f.MemoryMB
118118
LocalResources.usedCPUs += f.CPUDemand
119119

120120
// add container to the busy pool
@@ -172,9 +172,9 @@ func HandleCompletion(cont *container.Container, f *function.Function) {
172172
cont.ExpirationTime = time.Now().Add(d).UnixNano()
173173
fp.idle.PushBack(cont)
174174

175-
LocalResources.UsedCPUs -= f.CPUDemand
176-
LocalResources.BusyPoolUsedMem -= f.MemoryMB
177-
LocalResources.WarmPoolUsedMem += f.MemoryMB
175+
LocalResources.usedCPUs -= f.CPUDemand
176+
LocalResources.busyPoolUsedMem -= f.MemoryMB
177+
LocalResources.warmPoolUsedMem += f.MemoryMB
178178
}
179179
}
180180

@@ -191,7 +191,7 @@ func AcquireResourcesForNewContainer(fun *function.Function, forWarmPool bool) b
191191
}
192192

193193
if !forWarmPool {
194-
LocalResources.UsedCPUs += fun.CPUDemand
194+
LocalResources.usedCPUs += fun.CPUDemand
195195
}
196196
return true
197197
}
@@ -209,8 +209,8 @@ func NewContainerWithAcquiredResources(fun *function.Function, startAsIdle bool,
209209
LocalResources.Lock()
210210
defer LocalResources.Unlock()
211211
if err != nil {
212-
LocalResources.BusyPoolUsedMem -= fun.MemoryMB
213-
LocalResources.UsedCPUs -= fun.CPUDemand
212+
LocalResources.busyPoolUsedMem -= fun.MemoryMB
213+
LocalResources.usedCPUs -= fun.CPUDemand
214214
return nil, err
215215
}
216216

@@ -236,8 +236,8 @@ func NewContainerWithAcquiredResourcesAsync(fun *function.Function, okCallback f
236236
LocalResources.Lock()
237237
defer LocalResources.Unlock()
238238
if err != nil {
239-
LocalResources.BusyPoolUsedMem -= fun.MemoryMB
240-
LocalResources.UsedCPUs -= fun.CPUDemand
239+
LocalResources.busyPoolUsedMem -= fun.MemoryMB
240+
LocalResources.usedCPUs -= fun.CPUDemand
241241
return
242242
}
243243

@@ -264,7 +264,7 @@ func dismissContainer(requiredMemoryMB int64) (bool, error) {
264264
var containerToDismiss []itemToDismiss
265265

266266
//first phase, research
267-
for _, funPool := range LocalResources.ContainerPools {
267+
for _, funPool := range LocalResources.containerPools {
268268
if funPool.idle.Len() > 0 {
269269
// every container into the funPool has the same memory (same function)
270270
//so it is not important which one you destroy
@@ -294,7 +294,7 @@ cleanup: // second phase, cleanup
294294
if err != nil {
295295
return false, err
296296
}
297-
LocalResources.WarmPoolUsedMem -= item.memory
297+
LocalResources.warmPoolUsedMem -= item.memory
298298
}
299299
} else {
300300
log.Printf("Not enough containers to free up at least %d MB (avail to dismiss: %d)", requiredMemoryMB, cleanedMB)
@@ -310,7 +310,7 @@ func DeleteExpiredContainer() {
310310
LocalResources.Lock()
311311
defer LocalResources.Unlock()
312312

313-
for _, pool := range LocalResources.ContainerPools {
313+
for _, pool := range LocalResources.containerPools {
314314
elem := pool.idle.Front()
315315
for ok := elem != nil; ok; ok = elem != nil {
316316
warm := elem.Value.(*container.Container)
@@ -321,7 +321,7 @@ func DeleteExpiredContainer() {
321321
pool.idle.Remove(temp) // remove the expired element
322322

323323
memory, _ := container.GetMemoryMB(warm.ID)
324-
LocalResources.WarmPoolUsedMem -= memory
324+
LocalResources.warmPoolUsedMem -= memory
325325
err := container.Destroy(warm.ID)
326326
if err != nil {
327327
log.Printf("Error while destroying container %s: %s\n", warm.ID, err)
@@ -341,7 +341,7 @@ func ShutdownWarmContainersFor(f *function.Function) {
341341
LocalResources.Lock()
342342
defer LocalResources.Unlock()
343343

344-
fp, ok := LocalResources.ContainerPools[f.Name]
344+
fp, ok := LocalResources.containerPools[f.Name]
345345
if !ok {
346346
return
347347
}
@@ -357,7 +357,7 @@ func ShutdownWarmContainersFor(f *function.Function) {
357357
fp.idle.Remove(temp)
358358

359359
memory, _ := container.GetMemoryMB(warmed.ID)
360-
LocalResources.WarmPoolUsedMem -= memory
360+
LocalResources.warmPoolUsedMem -= memory
361361
containersToDelete = append(containersToDelete, warmed.ID)
362362
}
363363

@@ -378,7 +378,7 @@ func ShutdownAllContainers() {
378378
LocalResources.Lock()
379379
defer LocalResources.Unlock()
380380

381-
for fun, pool := range LocalResources.ContainerPools {
381+
for fun, pool := range LocalResources.containerPools {
382382
functionDescriptor, _ := function.GetFunction(fun)
383383

384384
for elem := pool.idle.Front(); elem != nil; elem = elem.Next() {
@@ -391,7 +391,7 @@ func ShutdownAllContainers() {
391391
if err != nil {
392392
log.Printf("Error while destroying container %s: %s", warmed.ID, err)
393393
}
394-
LocalResources.WarmPoolUsedMem -= functionDescriptor.MemoryMB
394+
LocalResources.warmPoolUsedMem -= functionDescriptor.MemoryMB
395395
}
396396

397397
for elem := pool.busy.Front(); elem != nil; elem = elem.Next() {
@@ -406,8 +406,8 @@ func ShutdownAllContainers() {
406406
continue
407407
}
408408

409-
LocalResources.UsedCPUs -= functionDescriptor.CPUDemand
410-
LocalResources.BusyPoolUsedMem -= functionDescriptor.MemoryMB
409+
LocalResources.usedCPUs -= functionDescriptor.CPUDemand
410+
LocalResources.busyPoolUsedMem -= functionDescriptor.MemoryMB
411411
}
412412
}
413413
}
@@ -417,7 +417,7 @@ func WarmStatus() map[string]int {
417417
LocalResources.RLock()
418418
defer LocalResources.RUnlock()
419419
warmPool := make(map[string]int)
420-
for funcName, pool := range LocalResources.ContainerPools {
420+
for funcName, pool := range LocalResources.containerPools {
421421
warmPool[funcName] = pool.idle.Len()
422422
}
423423

internal/registration/types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ type NodeRegistration struct {
1919

2020
type StatusInformation struct {
2121
AvailableWarmContainers map[string]int // <k, v> = <function name, warm container number>
22-
AvailableMemMB int64
23-
UsedMemMB int64
24-
AvailableCPUs float64
22+
TotalMemory int64
23+
UsedMemory int64
24+
TotalCPU float64
25+
UsedCPU float64
2526
Coordinates vivaldi.Coordinate
2627
LoadAvg []float64
2728
}

internal/registration/udp.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,10 @@ func handleUDPConnection(conn *net.UDPConn) {
7070
func getCurrentStatusInformation() (status []byte, err error) {
7171
response := StatusInformation{
7272
AvailableWarmContainers: node.WarmStatus(),
73-
AvailableMemMB: node.LocalResources.AvailableMemory(),
74-
AvailableCPUs: node.LocalResources.AvailableCPUs(),
75-
UsedMemMB: node.LocalResources.BusyPoolUsedMem,
73+
TotalMemory: node.LocalResources.TotalMemory(),
74+
TotalCPU: node.LocalResources.TotalCPUs(),
75+
UsedMemory: node.LocalResources.UsedMemory(),
76+
UsedCPU: node.LocalResources.UsedCPUs(),
7677
Coordinates: *VivaldiClient.GetCoordinate(),
7778
}
7879

internal/scheduling/scheduler.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ import (
55
"github.com/serverledge-faas/serverledge/internal/registration"
66
"log"
77
"net/http"
8-
"runtime"
98
"time"
109

11-
"github.com/serverledge-faas/serverledge/internal/config"
1210
"github.com/serverledge-faas/serverledge/internal/container"
1311
"github.com/serverledge-faas/serverledge/internal/function"
1412
"github.com/serverledge-faas/serverledge/internal/metrics"
@@ -27,12 +25,7 @@ func Run(p Policy) {
2725
requests = make(chan *scheduledRequest, 500)
2826
completions = make(chan *completionNotification, 500)
2927

30-
// initialize resources
31-
availableCores := runtime.NumCPU()
32-
33-
node.LocalResources.TotalMemory = int64(config.GetInt(config.POOL_MEMORY_MB, 1024))
34-
node.LocalResources.TotalCPUs = config.GetFloat(config.POOL_CPUS, float64(availableCores))
35-
node.LocalResources.ContainerPools = make(map[string]*node.ContainerPool)
28+
node.LocalResources.Init()
3629
log.Printf("Current resources: %v\n", &node.LocalResources)
3730

3831
container.InitDockerContainerFactory()

internal/workflow/remote_offloading_policy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,9 @@ func prepareParameters(r *Request, p *Progress) *remotePolicyParams {
201201
nearbyServers := registration.GetFullNeighborInfo()
202202
if nearbyServers != nil {
203203
for k, v := range nearbyServers {
204-
if v.AvailableMemMB > 0 && v.AvailableCPUs > 0 {
204+
if (v.TotalMemory-v.UsedMemory) > 0 && (v.TotalCPU-v.UsedCPU) > 0 {
205205
params.EdgeNodes = append(params.EdgeNodes, k)
206-
params.NodeMemory[k] = float64(v.AvailableMemMB)
206+
params.NodeMemory[k] = float64(v.TotalMemory - v.UsedMemory)
207207

208208
// Cost (assuming that Edge nodes are all in the same area)
209209
params.Cost[k] = localCost

internal/workflow/threshold_policy.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func (policy *ThresholdBasedPolicy) Evaluate(r *Request, p *Progress) (Offloadin
2424
return OffloadingDecision{Offload: false}, nil
2525
}
2626

27-
usedMemory := node.LocalResources.BusyPoolUsedMem
27+
usedMemory := node.LocalResources.UsedMemory()
2828
nextTaskId := p.ReadyToExecute[0] // TODO: update in case of parallel branches
2929
nextTask := r.W.Tasks[nextTaskId]
3030

@@ -41,7 +41,7 @@ func (policy *ThresholdBasedPolicy) Evaluate(r *Request, p *Progress) (Offloadin
4141
return OffloadingDecision{Offload: false}, nil
4242
}
4343

44-
if float64(usedMemory+f.MemoryMB)/float64(node.LocalResources.TotalMemory) <= utilizationThreshold {
44+
if float64(usedMemory+f.MemoryMB)/float64(node.LocalResources.TotalMemory()) <= utilizationThreshold {
4545
log.Printf("Threshold OK...executing locally %v", nextTaskId)
4646
// execute locally next task
4747
return OffloadingDecision{Offload: false}, nil
@@ -70,7 +70,7 @@ func (policy *ThresholdBasedPolicy) Evaluate(r *Request, p *Progress) (Offloadin
7070
log.Printf("Could not find function for task %s", nextTaskId)
7171
break
7272
}
73-
if float64(usedMemory+f.MemoryMB)/float64(node.LocalResources.TotalMemory) > utilizationThreshold {
73+
if float64(usedMemory+f.MemoryMB)/float64(node.LocalResources.TotalMemory()) > utilizationThreshold {
7474
log.Printf("%v also violates threshold", nextTaskId)
7575
offloadedMemory += f.MemoryMB
7676
offloadedTasks = append(offloadedTasks, nextTaskId)
@@ -102,10 +102,10 @@ func (policy *ThresholdBasedPolicy) Evaluate(r *Request, p *Progress) (Offloadin
102102
if nearbyServers != nil {
103103
for k, v := range nearbyServers {
104104
// TODO: apply a threshold here ?
105-
if v.AvailableMemMB >= offloadedMemory { // TODO: should look at free memory (ignoring warm containers)
106-
if offloadingTarget == "" || v.AvailableMemMB > offloadingTargetMem {
105+
if (v.TotalMemory - v.UsedMemory) >= offloadedMemory { // TODO: should look at free memory (ignoring warm containers)
106+
if offloadingTarget == "" || (v.TotalMemory-v.UsedMemory) > offloadingTargetMem {
107107
offloadingTarget = k
108-
offloadingTargetMem = v.AvailableMemMB
108+
offloadingTargetMem = v.TotalMemory - v.UsedMemory
109109
}
110110
} else {
111111
log.Printf("Not enough memory to offload to %v", k)

0 commit comments

Comments (0)