Skip to content

Commit 640c517

Browse files
committed
fix: LB handling non /invoke requests
1 parent e7f6dbb commit 640c517

4 files changed

Lines changed: 56 additions & 10 deletions

File tree

internal/lb/architectureAwareLb.go

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@ type ArchitectureAwareBalancer struct {
2424
armRing *HashRing
2525
x86Ring *HashRing
2626

27-
mode string
28-
rrIndices map[string]int
27+
mode string
28+
archRRIndex int
29+
armRRIndex int // for non-invoke requests using RR
30+
x86RRIndex int // for non-invoke requests using RR
2931
}
3032

3133
// NewArchitectureAwareBalancer Constructor
@@ -37,9 +39,11 @@ func NewArchitectureAwareBalancer(targets []*middleware.ProxyTarget) *Architectu
3739
log.Printf("Running ArchitectureAwareLB with %d replicas per node in the hash rings\n", REPLICAS)
3840

3941
b := &ArchitectureAwareBalancer{
40-
armRing: NewHashRing(REPLICAS),
41-
x86Ring: NewHashRing(REPLICAS),
42-
rrIndices: make(map[string]int),
42+
armRing: NewHashRing(REPLICAS),
43+
x86Ring: NewHashRing(REPLICAS),
44+
archRRIndex: 0,
45+
armRRIndex: 0,
46+
x86RRIndex: 0,
4347
}
4448

4549
b.mode = config.GetString(config.LB_MODE, RR)
@@ -64,6 +68,29 @@ func (b *ArchitectureAwareBalancer) Next(c echo.Context) *middleware.ProxyTarget
6468
b.mu.Lock()
6569
defer b.mu.Unlock()
6670

71+
if !isInvoke(c) {
72+
log.Printf("c NOT INVOKE: %s\n", c.Path())
73+
// fallback to round-robin
74+
var candidate *middleware.ProxyTarget
75+
var arch string
76+
if len(b.armRing.targetList) == 0 {
77+
arch = container.X86
78+
} else if len(b.x86Ring.targetList) == 0 {
79+
arch = container.ARM
80+
} else {
81+
arch = b.selectArchitectureRR()
82+
}
83+
84+
if arch == container.ARM {
85+
b.armRRIndex = (b.armRRIndex + 1) % len(b.armRing.targetList)
86+
candidate = b.armRing.targetList[b.armRRIndex]
87+
} else {
88+
b.x86RRIndex = (b.x86RRIndex + 1) % len(b.x86Ring.targetList)
89+
candidate = b.x86Ring.targetList[b.x86RRIndex]
90+
}
91+
return candidate
92+
}
93+
6794
funcName := extractFunctionName(c) // get function's name from request's URL
6895
fun, ok := function.GetFunction(funcName) // we use this to leverage cache before asking etcd
6996
if !ok {
@@ -88,7 +115,7 @@ func (b *ArchitectureAwareBalancer) Next(c echo.Context) *middleware.ProxyTarget
88115
bandit := mab.GlobalBanditManager.GetBandit(funcName)
89116
targetArch = bandit.SelectArm(ctx)
90117
} else if b.mode == RR { // RoundRobin
91-
targetArch = b.selectArchitectureRR(funcName) // here the load balancer decides what architecture to use for this function
118+
targetArch = b.selectArchitectureRR() // here the load balancer decides what architecture to use for this function
92119
} else { // Random
93120
targetArch = b.selectArchitectureRandom() // random load balancer for testing purposes
94121
}
@@ -196,14 +223,14 @@ func (b *ArchitectureAwareBalancer) selectArchitecture(fun *function.Function) (
196223
}
197224

198225
// selectArchitectureRR selects the architecture using a Round Robin policy.
199-
func (b *ArchitectureAwareBalancer) selectArchitectureRR(funcName string) string {
226+
func (b *ArchitectureAwareBalancer) selectArchitectureRR() string {
200227

201228
// This is just a function to use as a baseline for the LB. It should actually implement checks over the rings dimension.
202229
// i.e.: it cannot select ARM/X86 "blindly", it should check if we have at least one node for that architecture.
203230
archs := []string{container.ARM, container.X86}
204-
index := b.rrIndices[funcName]
231+
index := b.archRRIndex
205232
selected := archs[index]
206-
b.rrIndices[funcName] = (index + 1) % len(archs)
233+
b.archRRIndex = (index + 1) % len(archs)
207234
return selected
208235
}
209236

internal/lb/architectureAwareLb_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ func TestGetNodeFromRing(t *testing.T) {
157157
req := httptest.NewRequest(http.MethodPost, "/invoke/testGetNodeFromRingFunc", nil)
158158
rec := httptest.NewRecorder()
159159
c := e.NewContext(req, rec)
160+
c.SetPath("/invoke/testGetNodeFromRingFunc")
160161

161162
// First call
162163
firstTarget := b.Next(c)
@@ -207,6 +208,7 @@ func TestGetArchFallback(t *testing.T) {
207208
req := httptest.NewRequest(http.MethodPost, "/invoke/testGetArchFallbackFunc", nil)
208209
rec := httptest.NewRecorder()
209210
c := e.NewContext(req, rec)
211+
c.SetPath("/invoke/testGetArchFallbackFunc")
210212

211213
// First call
212214
firstTarget := b.Next(c)
@@ -256,6 +258,7 @@ func TestGetArchFallbackNotPossible(t *testing.T) {
256258
req := httptest.NewRequest(http.MethodPost, "/invoke/testGetArchFallbackNotPossibleFunc", nil)
257259
rec := httptest.NewRecorder()
258260
c := e.NewContext(req, rec)
261+
c.SetPath("/invoke/testGetArchFallbackNotPossibleFunc")
259262

260263
// First call
261264
firstTarget := b.Next(c)

internal/lb/architectureUnawareLb.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package lb
22

33
import (
44
"log"
5+
"strings"
56
"sync"
67
"time"
78

@@ -18,6 +19,9 @@ type ArchitectureUnawareBalancer struct {
1819

1920
// instead of classic lists we will use hashRings (see hashRing.go) to implement a consistent hashing technique
2021
hashRing *HashRing
22+
23+
// round-robin: used for non-invocation requests
24+
rrIndex int
2125
}
2226

2327
// NewArchitectureUnawareBalancer Constructor
@@ -53,6 +57,14 @@ func (b *ArchitectureUnawareBalancer) Next(c echo.Context) *middleware.ProxyTarg
5357
b.mu.Lock()
5458
defer b.mu.Unlock()
5559

60+
if !isInvoke(c) {
61+
// fallback to round-robin
62+
b.rrIndex = (b.rrIndex + 1) % len(b.hashRing.targetList)
63+
candidate := b.hashRing.targetList[b.rrIndex]
64+
log.Printf("Forwarding %s request to target %s\n", c.Path(), candidate.Name)
65+
return candidate
66+
}
67+
5668
funcName := extractFunctionName(c) // get function's name from request's URL
5769
fun, ok := function.GetFunction(funcName) // we use this to leverage cache before asking etcd
5870
if !ok {
@@ -75,6 +87,10 @@ func (b *ArchitectureUnawareBalancer) Next(c echo.Context) *middleware.ProxyTarg
7587
return candidate
7688
}
7789

90+
func isInvoke(c echo.Context) bool {
91+
return strings.HasPrefix(c.Path(), "/invoke/")
92+
}
93+
7894
// AddTarget Echo requires this method for dynamic load-balancing. It simply inserts a new node in the respective ring.
7995
func (b *ArchitectureUnawareBalancer) AddTarget(t *middleware.ProxyTarget) bool {
8096
b.mu.Lock()

internal/lb/lb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var currentTargets []*middleware.ProxyTarget
2424

2525
func newBalancer(targets []*middleware.ProxyTarget) (middleware.ProxyBalancer, bool) {
2626
// old Load Balancer: return middleware.NewRoundRobinBalancer(targets)
27-
isArchAware := config.GetBool(config.Arch_AWARENESS, true)
27+
isArchAware := config.GetBool(config.Arch_AWARENESS, false)
2828

2929
if isArchAware {
3030
return NewArchitectureAwareBalancer(targets), true

0 commit comments

Comments
 (0)