Skip to content

Commit b89e37f

Browse files
committed
lb: reimplemented without using middleware.Proxy
1 parent f8ed360 commit b89e37f

5 files changed

Lines changed: 163 additions & 59 deletions

File tree

internal/config/keys.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,5 @@ const WORKFLOW_THRESHOLD_BASED_POLICY_THRESHOLD = "workflow.offloading.policy.th
110110

111111
// Max number of tasks offloaded at once in the threshold-based offloading policy
112112
const WORKFLOW_THRESHOLD_BASED_POLICY_MAX_OFFLOADED = "workflow.offloading.policy.threshold.offloaded.max"
113+
114+
const LOAD_BALANCER_POLICY = "load.balancer.policy"

internal/lb/lb.go

Lines changed: 131 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package lb
33
import (
44
"errors"
55
"fmt"
6+
"io"
67
"log"
78
"net/http"
89
"net/url"
910
"os"
11+
"sync"
1012
"time"
1113

1214
"github.com/labstack/echo/v4"
@@ -15,92 +17,164 @@ import (
1517
"github.com/serverledge-faas/serverledge/internal/registration"
1618
)
1719

18-
var currentTargets []*middleware.ProxyTarget
19-
20-
func newBalancer(targets []*middleware.ProxyTarget) middleware.ProxyBalancer {
21-
return middleware.NewRoundRobinBalancer(targets)
20+
var client *http.Client
21+
var lbPolicy policy
22+
var targetsMutex sync.RWMutex
23+
var currentTargets map[string]registration.NodeRegistration
24+
25+
func newPolicy() policy {
26+
policyName := config.GetString(config.LOAD_BALANCER_POLICY, "round-robin")
27+
if policyName == "random" {
28+
return &randomPolicy{}
29+
} else {
30+
panic("unknown policy: " + policyName)
31+
}
2232
}
2333

24-
func StartReverseProxy(e *echo.Echo, region string) {
25-
targets, err := getTargets(region)
34+
func handleInvoke(c echo.Context) error {
35+
36+
funcName := c.Param("fun")
37+
// Select backend
38+
targetURL, err := lbPolicy.Route(funcName)
39+
40+
// Create a new HTTP request to forward to the selected backend
41+
newURL, _ := url.JoinPath(targetURL, c.Request().RequestURI)
42+
req, err := http.NewRequest(c.Request().Method, newURL, c.Request().Body)
2643
if err != nil {
27-
log.Printf("Cannot connect to registry to retrieve targets: %v\n", err)
28-
os.Exit(2)
44+
return err
2945
}
46+
// Copy the request headers to the new request
47+
req.Header = c.Request().Header
3048

31-
log.Printf("Initializing with %d targets.\n", len(targets))
32-
balancer := newBalancer(targets)
33-
currentTargets = targets
34-
e.Use(middleware.Proxy(balancer))
49+
// Send the request to the backend using the global HTTP client
50+
resp, err := client.Do(req)
51+
if err != nil {
52+
return err
53+
}
54+
defer resp.Body.Close()
3555

36-
go updateTargets(balancer, region)
56+
if resp.StatusCode == http.StatusOK {
57+
// function has been actually executed
58+
// TODO: update info?
59+
}
3760

38-
portNumber := config.GetInt(config.API_PORT, 1323)
39-
if err := e.Start(fmt.Sprintf(":%d", portNumber)); err != nil && !errors.Is(err, http.ErrServerClosed) {
40-
e.Logger.Fatal("shutting down the server")
61+
res := c.Response()
62+
63+
// Copy response headers
64+
for key, values := range resp.Header {
65+
for _, value := range values {
66+
res.Header().Add(key, value)
67+
}
4168
}
69+
70+
// Set status code and copy body
71+
res.WriteHeader(resp.StatusCode)
72+
_, err = io.Copy(res.Writer, resp.Body)
73+
return err
74+
4275
}
4376

44-
func getTargets(region string) ([]*middleware.ProxyTarget, error) {
45-
cloudNodes, err := registration.GetNodesInArea(region, false, 0)
77+
func handleOtherRequest(c echo.Context) error {
78+
79+
res := c.Response()
80+
// TODO: status should be handled differently
81+
82+
var targetURL string
83+
for _, target := range currentTargets {
84+
targetURL = target.APIUrl()
85+
break
86+
}
87+
// Create a new HTTP request to forward to the selected backend
88+
newURL, _ := url.JoinPath(targetURL, c.Request().RequestURI)
89+
req, err := http.NewRequest(c.Request().Method, newURL, c.Request().Body)
90+
if err != nil {
91+
return err
92+
}
93+
// Copy the request headers to the new request
94+
req.Header = c.Request().Header
95+
96+
// Send the request to the backend using the global HTTP client
97+
resp, err := client.Do(req)
4698
if err != nil {
47-
return nil, err
99+
return err
48100
}
101+
defer resp.Body.Close()
49102

50-
targets := make([]*middleware.ProxyTarget, 0, len(cloudNodes))
51-
for _, target := range cloudNodes {
52-
log.Printf("Found target: %v\n", target.Key)
53-
// TODO: etcd should NOT contain URLs, but only host and port...
54-
parsedUrl, err := url.Parse(target.APIUrl())
55-
if err != nil {
56-
return nil, err
103+
// Copy response headers
104+
for key, values := range resp.Header {
105+
for _, value := range values {
106+
res.Header().Add(key, value)
57107
}
58-
targets = append(targets, &middleware.ProxyTarget{Name: target.Key, URL: parsedUrl})
59108
}
60109

61-
log.Printf("Found %d targets\n", len(targets))
110+
// Set status code and copy body
111+
res.WriteHeader(resp.StatusCode)
112+
_, err = io.Copy(res.Writer, resp.Body)
113+
return err
62114

63-
return targets, nil
64115
}
65116

66-
func updateTargets(balancer middleware.ProxyBalancer, region string) {
117+
func StartReverseProxy(e *echo.Echo, region string) {
118+
var err error
119+
120+
currentTargets, err = registration.GetNodesInArea(region, false, 0)
121+
if err != nil {
122+
log.Printf("Cannot connect to registry to retrieve targets: %v\n", err)
123+
os.Exit(2)
124+
}
125+
126+
log.Printf("Initializing with %d targets.\n", len(currentTargets))
127+
lbPolicy = newPolicy()
128+
129+
go updateTargets(region)
130+
131+
tr := &http.Transport{
132+
MaxIdleConns: 2500,
133+
MaxIdleConnsPerHost: 2500,
134+
MaxConnsPerHost: 0,
135+
IdleConnTimeout: 10 * time.Minute,
136+
}
137+
client = &http.Client{Transport: tr}
138+
139+
e.HideBanner = true
140+
e.Use(middleware.Recover())
141+
142+
// Routes
143+
e.POST("/invoke/:fun", handleInvoke)
144+
e.Any("/*", handleOtherRequest)
145+
146+
portNumber := config.GetInt(config.API_PORT, 1323)
147+
if err := e.Start(fmt.Sprintf(":%d", portNumber)); err != nil && !errors.Is(err, http.ErrServerClosed) {
148+
e.Logger.Fatal("shutting down the server")
149+
}
150+
151+
}
152+
153+
func updateTargets(region string) {
67154
for {
68-
time.Sleep(30 * time.Second) // TODO: configure
155+
time.Sleep(10 * time.Second) // TODO: configure
69156

70-
targets, err := getTargets(region)
71-
if err != nil {
157+
newTargets, err := registration.GetNodesInArea(region, false, 0)
158+
if err != nil || newTargets == nil {
72159
log.Printf("Cannot update targets: %v\n", err)
73160
}
74161

75-
toKeep := make([]bool, len(currentTargets))
76-
for i := range currentTargets {
77-
toKeep[i] = false
78-
}
79-
for _, t := range targets {
80-
toAdd := true
81-
for i, curr := range currentTargets {
82-
if curr.Name == t.Name {
83-
toKeep[i] = true
84-
toAdd = false
85-
}
86-
}
87-
if toAdd {
88-
log.Printf("Adding %s\n", t.Name)
89-
balancer.AddTarget(t)
162+
targetsMutex.Lock()
163+
for k, _ := range currentTargets {
164+
if _, ok := newTargets[k]; !ok {
165+
// this target is not present any more
166+
log.Printf("Removing target: %s\n", k)
90167
}
91168
}
92169

93-
toRemove := make([]string, 0)
94-
for i, curr := range currentTargets {
95-
if !toKeep[i] {
96-
log.Printf("Removing %s\n", curr.Name)
97-
toRemove = append(toRemove, curr.Name)
170+
for k, _ := range newTargets {
171+
if _, ok := currentTargets[k]; !ok {
172+
// this target was not present
173+
log.Printf("Adding new target: %s\n", k)
98174
}
99175
}
100-
for _, curr := range toRemove {
101-
balancer.RemoveTarget(curr)
102-
}
103176

104-
currentTargets = targets
177+
currentTargets = newTargets
178+
targetsMutex.Unlock()
105179
}
106180
}

internal/lb/policy.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package lb
2+
3+
import (
4+
"errors"
5+
"math/rand"
6+
)
7+
8+
type policy interface {
9+
Route(funcName string) (string, error)
10+
}
11+
12+
type randomPolicy struct{}
13+
14+
func (r *randomPolicy) Route(funcName string) (string, error) {
15+
targetsMutex.RLock()
16+
defer targetsMutex.RUnlock()
17+
18+
skip := rand.Intn(len(currentTargets))
19+
i := 0
20+
21+
for _, value := range currentTargets {
22+
if i == skip {
23+
return value.APIUrl(), nil
24+
}
25+
i++
26+
}
27+
28+
return "", errors.New("no targets")
29+
}

internal/registration/registry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func GetNodesInArea(area string, includeSelf bool, limit int64) (map[string]Node
174174
reg, err := parseEtcdRegisteredNode(area, key, s.Value)
175175
if err == nil {
176176
servers[key] = reg
177-
fmt.Printf("Server found: %v (%v-udp:%d)\n", servers[key], reg.IPAddress, reg.UDPPort)
177+
//fmt.Printf("Server found: %v (%v-udp:%d)\n", servers[key], reg.IPAddress, reg.UDPPort)
178178
}
179179
}
180180

internal/scheduling/offloading.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ func Offload(r *scheduledRequest, serverUrl string) error {
3939
sendingTime := time.Now() // used to compute latency later on
4040
resp, err := offloadingClient.Post(serverUrl+"/invoke/"+r.Fun.Name, "application/json",
4141
bytes.NewBuffer(invocationBody))
42-
4342
if err != nil {
4443
log.Print(err)
4544
return err

0 commit comments

Comments
 (0)