@@ -6,9 +6,10 @@ import (
66 "github.com/serverledge-faas/serverledge/internal/node"
77 "golang.org/x/exp/maps"
88 "log"
9- "net/url"
109 "path"
1110 "sort"
11+ "strconv"
12+ "strings"
1213 "sync"
1314 "time"
1415
@@ -19,14 +20,15 @@ import (
1920)
2021
2122const registryBaseDirectory = "registry"
22- const registryLoadBalancerDirectory = "lb "
23+ const registryLoadBalancerDirectory = "__lb "
2324const etcdLeaseTTL = 120
2425
2526var mutex sync.RWMutex
2627
2728var nearestNeighbors []NodeRegistration
2829var neighborInfo map [string ]* StatusInformation
2930var neighbors map [string ]NodeRegistration
31+ var remoteOffloadingTarget NodeRegistration
3032
3133var VivaldiClient * vivaldi.Client
3234var SelfRegistration * NodeRegistration
@@ -42,6 +44,10 @@ func (r *NodeRegistration) toEtcdKey() (key string) {
4244 }
4345}
4446
47+ func (r * NodeRegistration ) APIUrl () (url string ) {
48+ return fmt .Sprintf ("http://%s:%d" , r .IPAddress , r .APIPort )
49+ }
50+
4551func areaEtcdKey (area string ) string {
4652 return fmt .Sprintf ("%s/%s/" , registryBaseDirectory , area )
4753}
@@ -71,14 +77,17 @@ func registerToEtcd(asLoadBalancer bool) error {
7177 etcdLease = resp .ID
7278
7379 registeredLocalIP := config .GetString (config .API_IP , defaultAddressStr )
74- hostport := fmt .Sprintf ("http://%s:%d" , registeredLocalIP , config .GetInt (config .API_PORT , 1323 ))
80+ apiPort := config .GetInt (config .API_PORT , 1323 )
81+ udpPort := config .GetInt (config .LISTEN_UDP_PORT , 9876 )
7582
76- SelfRegistration = & NodeRegistration {NodeID : node .LocalNode , IPAddress : registeredLocalIP , RemoteURL : hostport , IsLoadBalancer : asLoadBalancer }
83+ payload := fmt .Sprintf ("%d;%d" , apiPort , udpPort )
84+
85+ SelfRegistration = & NodeRegistration {NodeID : node .LocalNode , IPAddress : registeredLocalIP , APIPort : apiPort , UDPPort : udpPort , IsLoadBalancer : asLoadBalancer }
7786
7887 // save couple (id, hostport) to the correct Area-dir on etcd
7988 etcdKey := SelfRegistration .toEtcdKey ()
8089 log .Printf ("Registering to etcd: %s\n " , etcdKey )
81- _ , err = etcdClient .Put (ctx , etcdKey , hostport , clientv3 .WithLease (etcdLease ))
90+ _ , err = etcdClient .Put (ctx , etcdKey , payload , clientv3 .WithLease (etcdLease ))
8291 if err != nil {
8392 log .Fatal (IdRegistrationErr )
8493 return IdRegistrationErr
@@ -109,36 +118,73 @@ func keepAliveLease() {
109118 }
110119}
111120
112- // GetAllInArea is used to obtain the list of other server's addresses under a specific local Area
113- func GetAllInArea (area string , includeSelf bool ) (map [string ]NodeRegistration , error ) {
121+ func parseEtcdRegisteredNode (area string , key string , payload []byte ) (NodeRegistration , error ) {
122+ payloadStr := string (payload )
123+ split := strings .Split (payloadStr , ";" )
124+ if len (split ) < 2 {
125+ return NodeRegistration {}, fmt .Errorf ("invalid payload: %s" , payloadStr )
126+ }
127+
128+ apiPort , err := strconv .Atoi (split [0 ])
129+ if err != nil {
130+ return NodeRegistration {}, err
131+ }
132+
133+ udpPort , err := strconv .Atoi (split [1 ])
134+ if err != nil {
135+ return NodeRegistration {}, err
136+ }
137+
138+ return NodeRegistration {NodeID : node.NodeID {Area : area , Key : key }, APIPort : apiPort , UDPPort : udpPort }, nil
139+ }
140+
141+ // GetNodesInArea is used to obtain the list of other server's addresses under a specific local Area
142+ func GetNodesInArea (area string , includeSelf bool , limit int64 ) (map [string ]NodeRegistration , error ) {
114143 baseDir := areaEtcdKey (area )
144+ lbPrefix := path .Join (baseDir , registryLoadBalancerDirectory )
115145
116146 ctx , _ := context .WithTimeout (context .Background (), 3 * time .Second )
117147
118- resp , err := etcdClient .Get (ctx , baseDir , clientv3 .WithPrefix ())
148+ if limit < 0 {
149+ limit = 0 // no limit
150+ }
151+ resp , err := etcdClient .Get (ctx , baseDir , clientv3 .WithPrefix (), clientv3 .WithLimit (limit ))
119152 if err != nil {
120153 return nil , fmt .Errorf ("Could not read from etcd: %v" , err )
121154 }
122155
123156 servers := make (map [string ]NodeRegistration )
124157 for _ , s := range resp .Kvs {
125- key := path .Base (string (s .Key ))
126- if key == registryLoadBalancerDirectory {
127- // TODO: is this check needed?
158+ if strings .HasPrefix (string (s .Key ), lbPrefix ) {
159+ // skip LB
128160 continue
129161 }
162+ key := path .Base (string (s .Key ))
130163 if ! includeSelf && area == SelfRegistration .Area && key == SelfRegistration .Key {
131164 continue
132165 }
133166
134- remoteURL := string (s .Value )
135- servers [key ] = NodeRegistration {NodeID : node.NodeID {Area : area , Key : key }, RemoteURL : remoteURL }
136- fmt .Printf ("Server found: %v\n " , servers [key ])
167+ reg , err := parseEtcdRegisteredNode (area , key , s .Value )
168+ if err == nil {
169+ servers [key ] = reg
170+ fmt .Printf ("Server found: %v\n " , servers [key ])
171+ }
137172 }
138173
139174 return servers , nil
140175}
141176
177+ func GetOneNodeInArea (area string , includeSelf bool ) (NodeRegistration , error ) {
178+ nodes , err := GetNodesInArea (area , includeSelf , 1 )
179+ if err == nil {
180+ for _ , n := range nodes {
181+ return n , nil
182+ }
183+ }
184+
185+ return NodeRegistration {}, err
186+ }
187+
142188func GetLBInArea (area string ) (map [string ]NodeRegistration , error ) {
143189 baseDir := areaEtcdKey (area ) + "/" + registryLoadBalancerDirectory
144190
@@ -152,26 +198,23 @@ func GetLBInArea(area string) (map[string]NodeRegistration, error) {
152198 servers := make (map [string ]NodeRegistration )
153199 for _ , s := range resp .Kvs {
154200 key := path .Base (string (s .Key ))
155- remoteURL := string (s .Value )
156- servers [key ] = NodeRegistration {NodeID : node.NodeID {Area : area , Key : key }, RemoteURL : remoteURL , IsLoadBalancer : true }
157- fmt .Printf ("Server found: %v\n " , servers [key ])
201+ reg , err := parseEtcdRegisteredNode (area , key , s .Value )
202+ if err == nil {
203+ reg .IsLoadBalancer = true
204+ servers [key ] = reg
205+ fmt .Printf ("Server found: %v\n " , servers [key ])
206+ }
158207 }
159208
160209 return servers , nil
161210}
162211
163212// Deregister deletes from etcd the key, value pair previously inserted
164213func Deregister () (e error ) {
165- etcdClient , err := utils .GetEtcdClient ()
166- if err != nil {
167- log .Fatal (UnavailableClientErr )
168- return UnavailableClientErr
169- }
170-
171214 ctx , _ := context .WithTimeout (context .Background (), 1 * time .Second )
172- _ , err = etcdClient .Revoke (ctx , etcdLease )
215+ _ , err : = etcdClient .Revoke (ctx , etcdLease )
173216 if err != nil {
174- return err
217+ log . Printf ( "Error revoking lease: %v" , err )
175218 }
176219
177220 return nil
@@ -217,7 +260,7 @@ func runMonitor() {
217260func globalMonitoring () {
218261
219262 // gets info from Etcd about other nodes in the area
220- newNeighbors , err := GetAllInArea (SelfRegistration .Area , false )
263+ newNeighbors , err := GetNodesInArea (SelfRegistration .Area , false , 0 )
221264 if err != nil {
222265 log .Println (err )
223266 return
@@ -235,6 +278,38 @@ func globalMonitoring() {
235278 delete (neighborInfo , key )
236279 }
237280 }
281+
282+ updateRemoteOffloadingTarget ()
283+ }
284+
285+ func updateRemoteOffloadingTarget () {
286+ // If there is a LB in the remote area, it is used.
287+ // Otherwise, a random node in the area is chosen.
288+
289+ remoteArea := config .GetString (config .REGISTRY_REMOTE_AREA , "" )
290+ if remoteArea == "" {
291+ log .Printf ("No remote area is configured; vertical offloading disabled" )
292+ remoteOffloadingTarget = NodeRegistration {}
293+ return
294+ }
295+
296+ lbs , err := GetLBInArea (remoteArea )
297+ if err != nil {
298+ log .Println (err )
299+ }
300+ if err == nil && len (lbs ) > 0 {
301+ for _ , lb := range lbs {
302+ log .Printf ("Using LB as offloading target: %v" , lb .NodeID )
303+ remoteOffloadingTarget = lb
304+ return
305+ }
306+ }
307+
308+ remoteNode , err := GetOneNodeInArea (remoteArea , false )
309+ if err == nil {
310+ log .Printf ("Using as offloading target: %v" , remoteNode .NodeID )
311+ remoteOffloadingTarget = remoteNode
312+ }
238313}
239314
240315// computeNearestNeighbors finds servers nearby to the current one
@@ -278,22 +353,17 @@ func nearbyMonitoring(vivaldiClient *vivaldi.Client) {
278353 mutex .RUnlock ()
279354
280355 for _ , registeredNode := range peersToUpdate {
281- u , err := url .Parse (registeredNode .RemoteURL )
282- if err != nil {
283- panic (err )
284- }
285- hostname := u .Hostname ()
286- newInfo , rtt := statusInfoRequest (hostname )
356+ newInfo , rtt := statusInfoRequest (& registeredNode )
287357
288358 if newInfo == nil {
289- log .Printf ("Unreachable neighbor: %s\n " , registeredNode .RemoteURL )
359+ log .Printf ("Unreachable neighbor: %s\n " , registeredNode .NodeID )
290360 continue
291361 }
292362
293363 mutex .Lock ()
294364 neighborInfo [registeredNode .Key ] = newInfo
295365
296- _ , err = vivaldiClient .Update ("node" , & newInfo .Coordinates , rtt )
366+ _ , err : = vivaldiClient .Update ("node" , & newInfo .Coordinates , rtt )
297367 if err != nil {
298368 log .Printf ("Error while updating node coordinates: %s\n " , err )
299369 }
@@ -308,6 +378,13 @@ func GetNearestNeighbors() []NodeRegistration {
308378 return nearestNeighbors
309379}
310380
381+ func GetRemoteOffloadingTarget () * NodeRegistration {
382+ if remoteOffloadingTarget .Key != "" {
383+ return & remoteOffloadingTarget
384+ }
385+ return nil
386+ }
387+
311388func GetFullNeighborInfo () map [string ]* StatusInformation {
312389 mutex .RLock ()
313390 defer mutex .RUnlock ()
0 commit comments