Skip to content

Commit 5a0d419

Browse files
committed
[WIP] Refactored registration package
1 parent 4a93b53 commit 5a0d419

16 files changed

Lines changed: 252 additions & 344 deletions

File tree

cmd/serverledge/main.go

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,14 @@ import (
99
"os/signal"
1010
"time"
1111

12+
"github.com/labstack/echo/v4"
1213
"github.com/serverledge-faas/serverledge/internal/api"
1314
"github.com/serverledge-faas/serverledge/internal/config"
1415
"github.com/serverledge-faas/serverledge/internal/metrics"
1516
"github.com/serverledge-faas/serverledge/internal/node"
1617
"github.com/serverledge-faas/serverledge/internal/registration"
1718
"github.com/serverledge-faas/serverledge/internal/scheduling"
1819
"github.com/serverledge-faas/serverledge/internal/telemetry"
19-
"github.com/serverledge-faas/serverledge/utils"
20-
21-
"github.com/labstack/echo/v4"
2220
)
2321

2422
func main() {
@@ -32,32 +30,13 @@ func main() {
3230
api.CacheSetup()
3331

3432
// register to etcd, this way server is visible to the others under a given local area
35-
registry := new(registration.Registry)
36-
isInCloud := config.GetBool(config.IS_IN_CLOUD, false)
37-
if isInCloud {
38-
registry.Area = "cloud/" + config.GetString(config.REGISTRY_AREA, "ROME")
39-
} else {
40-
registry.Area = config.GetString(config.REGISTRY_AREA, "ROME")
41-
}
42-
// before register checkout other servers into the local area
43-
//todo use this info later on; future work with active remote server selection
44-
_, err := registry.GetAll(true)
45-
if err != nil {
46-
log.Fatal(err)
47-
}
33+
myArea := config.GetString(config.REGISTRY_AREA, "ROME")
34+
node.LocalNode = node.NewIdentifier(myArea)
4835

49-
defaultAddressStr := "127.0.0.1"
50-
address, err := utils.GetOutboundIp()
51-
if err == nil {
52-
defaultAddressStr = address.String()
53-
}
54-
registration.RegisteredLocalIP = config.GetString(config.API_IP, defaultAddressStr)
55-
56-
myKey, err := registry.RegisterToEtcd()
36+
err := registration.RegisterToEtcd()
5737
if err != nil {
5838
log.Fatal(err)
5939
}
60-
node.NodeIdentifier = myKey
6140

6241
metrics.Init()
6342

@@ -83,13 +62,14 @@ func main() {
8362
e := echo.New()
8463

8564
// Register a signal handler to cleanup things on termination
86-
api.RegisterTerminationHandler(registry, e)
65+
api.RegisterTerminationHandler(e)
8766

8867
schedulingPolicy := api.CreateSchedulingPolicy()
8968
go scheduling.Run(schedulingPolicy)
9069

70+
isInCloud := config.GetBool(config.IS_IN_CLOUD, false)
9171
if !isInCloud {
92-
err = registration.InitEdgeMonitoring(registry)
72+
err = registration.StartMonitoring()
9373
if err != nil {
9474
log.Fatal(err)
9575
}

internal/api/api.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"time"
1313

1414
"github.com/serverledge-faas/serverledge/internal/client"
15-
"github.com/serverledge-faas/serverledge/internal/config"
1615
"github.com/serverledge-faas/serverledge/internal/container"
1716
"github.com/serverledge-faas/serverledge/internal/function"
1817
"github.com/serverledge-faas/serverledge/internal/node"
@@ -74,7 +73,7 @@ func InvokeFunction(c echo.Context) error {
7473
r.Async = invocationRequest.Async
7574
r.ReturnOutput = invocationRequest.ReturnOutput
7675

77-
reqId := fmt.Sprintf("%s-%s%d", fun, node.NodeIdentifier[len(node.NodeIdentifier)-5:], r.Arrival.Nanosecond())
76+
reqId := fmt.Sprintf("%s-%s%d", funcName, node.LocalNode.String()[len(node.LocalNode.String())-5:], r.Arrival.Nanosecond())
7877
r.Ctx = context.WithValue(context.Background(), "ReqId", reqId)
7978

8079
// Tracing
@@ -233,22 +232,18 @@ func GetServerStatus(c echo.Context) error {
233232
node.Resources.RLock()
234233
defer node.Resources.RUnlock()
235234

236-
portNumber := config.GetInt("api.port", 1323)
237-
238-
url := fmt.Sprintf("http://%s:%d", registration.RegisteredLocalIP, portNumber)
239-
240235
loadAvg, err := loadavg.Parse()
241236
loadAvgValues := []float64{-1.0, -1.0, -1.0}
242237
if err == nil {
243238
loadAvgValues = []float64{loadAvg.LoadAverage1, loadAvg.LoadAverage5, loadAvg.LoadAverage10}
244239
}
245240

246241
response := registration.StatusInformation{
247-
Url: url,
242+
Url: registration.SelfRegistration.RemoteURL,
248243
AvailableWarmContainers: node.WarmStatus(),
249244
AvailableMemMB: node.Resources.AvailableMemMB,
250245
AvailableCPUs: node.Resources.AvailableCPUs,
251-
Coordinates: *registration.Reg.Client.GetCoordinate(),
246+
Coordinates: *registration.VivaldiClient.GetCoordinate(),
252247
LoadAvg: loadAvgValues,
253248
}
254249

internal/api/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func CacheSetup() {
7676
cache.GetCacheInstance()
7777
}
7878

79-
func RegisterTerminationHandler(r *registration.Registry, e *echo.Echo) {
79+
func RegisterTerminationHandler(e *echo.Echo) {
8080
c := make(chan os.Signal)
8181
signal.Notify(c, os.Interrupt)
8282

@@ -87,7 +87,7 @@ func RegisterTerminationHandler(r *registration.Registry, e *echo.Echo) {
8787
node.ShutdownAllContainers()
8888

8989
// deregister from etcd; server should be unreachable
90-
err := r.Deregister()
90+
err := registration.Deregister()
9191
if err != nil {
9292
log.Fatal(err)
9393
}

internal/api/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func InvokeWorkflow(e echo.Context) error {
213213
req.Async = clientReq.Async
214214
req.Plan = nil
215215
req.Resuming = false
216-
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.NodeIdentifier[len(node.NodeIdentifier)-5:], req.Arrival.Nanosecond())
216+
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.LocalNode.String()[len(node.LocalNode.String())-5:], req.Arrival.Nanosecond())
217217
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
218218

219219
return handleWorkflowInvocation(e, req)

internal/config/keys.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,15 @@ const IS_IN_CLOUD = "cloud"
4444
// the area wich the server belongs to
4545
const REGISTRY_AREA = "registry.area"
4646

47+
// the area that acts as "remote cloud" for this node
48+
const REGISTRY_REMOTE_AREA = "registry.remote.area"
49+
4750
// short period: retrieve information about nearby edge-servers
4851
const REG_NEARBY_INTERVAL = "registry.nearby.interval"
4952

5053
// long period for general monitoring inside the area
5154
const REG_MONITORING_INTERVAL = "registry.monitoring.interval"
5255

53-
// registration TTL in seconds
54-
const REGISTRATION_TTL = "registry.ttl"
55-
5656
// port for udp status listener
5757
const LISTEN_UDP_PORT = "registry.udp.port"
5858

internal/metrics/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,10 @@ func Init() {
6767
}
6868

6969
func AddCompletedInvocation(funcName string) {
70-
metricCompletions.With(prometheus.Labels{"function": funcName, "node": node.NodeIdentifier}).Inc()
70+
metricCompletions.With(prometheus.Labels{"function": funcName, "node": node.LocalNode.String()}).Inc()
7171
}
7272
func AddFunctionDurationValue(funcName string, duration float64) {
73-
metricExecutionTime.With(prometheus.Labels{"function": funcName, "node": node.NodeIdentifier}).Observe(duration)
73+
metricExecutionTime.With(prometheus.Labels{"function": funcName, "node": node.LocalNode.String()}).Observe(duration)
7474
}
7575
func AddFunctionOutputSizeValue(funcName string, size float64) {
7676
metricOutputSize.With(prometheus.Labels{"function": funcName}).Observe(size)

internal/metrics/retriever.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,15 +128,15 @@ func MetricsRetriever() {
128128
select {
129129
case <-ticker.C:
130130

131-
query := fmt.Sprintf("%s{node=\"%s\"}", COMPLETIONS, node.NodeIdentifier)
131+
query := fmt.Sprintf("%s{node=\"%s\"}", COMPLETIONS, node.LocalNode)
132132
completionsPerFunction, err := retrieveByFunction(query, api, ctx)
133133
if err != nil {
134134
log.Printf("Error in retrieveByFunction: %v\n", err)
135135
}
136136
retrievedMetrics.Completions = completionsPerFunction
137137

138138
query = fmt.Sprintf("%s_sum{node=\"%s\"}/%s_count{node=\"%s\"}",
139-
EXECUTION_TIME, node.NodeIdentifier, EXECUTION_TIME, node.NodeIdentifier)
139+
EXECUTION_TIME, node.LocalNode, EXECUTION_TIME, node.LocalNode)
140140
avgFunDuration, err := retrieveByFunction(query, api, ctx)
141141
if err != nil {
142142
log.Printf("Error in retrieveByFunction: %v\n", err)

internal/node/node.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,29 @@ package node
33
import (
44
"errors"
55
"fmt"
6+
"github.com/lithammer/shortuuid"
7+
"strconv"
68
"sync"
9+
"time"
710
)
811

912
var OutOfResourcesErr = errors.New("not enough resources for function execution")
1013

11-
var NodeIdentifier string
14+
type NodeID struct {
15+
Area string
16+
Key string
17+
}
18+
19+
var LocalNode NodeID
20+
21+
func (n NodeID) String() string {
22+
return fmt.Sprintf("%s/%s", n.Area, n.Key)
23+
}
24+
25+
func NewIdentifier(area string) NodeID {
26+
id := shortuuid.New() + strconv.FormatInt(time.Now().UnixNano(), 10)
27+
return NodeID{Area: area, Key: id}
28+
}
1229

1330
type NodeResources struct {
1431
sync.RWMutex

0 commit comments

Comments
 (0)