Skip to content

Commit 5dba34c

Browse files
ValerioValerio CreccoLudovico De Santis
authored andcommitted
Workflow extention of Serverledge
Co-authored-by: Valerio Crecco <valerio.crecco@alumni.uniroma2.eu> Co-authored-by: Ludovico De Santis <ludovico.desantis@alumni.uniroma2.eu>
1 parent 84df18f commit 5dba34c

77 files changed

Lines changed: 6536 additions & 712 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/lb/main.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,13 @@ func main() {
4747
// TODO: split Area in Region + Type (e.g., cloud/lb/edge)
4848
region := config.GetString(config.REGISTRY_AREA, "ROME")
4949
registry := &registration.Registry{Area: "lb/" + region}
50-
hostport := fmt.Sprintf("http://%s:%d", utils.GetIpAddress().String(), config.GetInt(config.API_PORT, 1323))
50+
ipAddress, err := utils.GetOutboundIp()
51+
if err != nil {
52+
log.Printf("Could not get ip address: %v\n", err)
53+
return
54+
}
55+
56+
hostport := fmt.Sprintf("http://%s:%d", ipAddress.String(), config.GetInt(config.API_PORT, 1323))
5157
if _, err := registry.RegisterToEtcd(hostport); err != nil {
5258
log.Printf("Could not register to Etcd: %v\n", err)
5359
}

cmd/serverledge/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,11 @@ func main() {
119119
log.Fatal(err)
120120
}
121121

122-
ip := config.GetString(config.API_IP, utils.GetIpAddress().String())
122+
address, err := utils.GetOutboundIp()
123+
if err != nil {
124+
log.Fatalf("failed to get ip address: %v", err)
125+
}
126+
ip := config.GetString(config.API_IP, address.String())
123127
url := fmt.Sprintf("http://%s:%d", ip, config.GetInt(config.API_PORT, 1323))
124128
myKey, err := registry.RegisterToEtcd(url)
125129
if err != nil {

internal/api/api.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,13 @@ func GetServerStatus(c echo.Context) error {
203203
node.Resources.RLock()
204204
defer node.Resources.RUnlock()
205205
portNumber := config.GetInt("api.port", 1323)
206-
url := fmt.Sprintf("http://%s:%d", utils.GetIpAddress().String(), portNumber)
206+
207+
address, err := utils.GetOutboundIp()
208+
if err != nil {
209+
return c.String(http.StatusServiceUnavailable, err.Error())
210+
}
211+
212+
url := fmt.Sprintf("http://%s:%d", address.String(), portNumber)
207213
response := registration.StatusInformation{
208214
Url: url,
209215
AvailableWarmContainers: node.WarmStatus(),

internal/api/composition.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"io"
9+
"net/http"
10+
"time"
11+
812
"github.com/cornelk/hashmap"
913
"github.com/grussorusso/serverledge/internal/client"
1014
"github.com/grussorusso/serverledge/internal/container"
@@ -14,9 +18,6 @@ import (
1418
"github.com/grussorusso/serverledge/internal/node"
1519
"github.com/labstack/echo/v4"
1620
"github.com/labstack/gommon/log"
17-
"io"
18-
"net/http"
19-
"time"
2021
)
2122

2223
// ===== Function Composition =====
@@ -31,7 +32,7 @@ func CreateFunctionCompositionFromASL(e echo.Context) error {
3132

3233
err := json.Unmarshal(body, &creationRequest)
3334
if err != nil && err != io.EOF {
34-
log.Printf("Could not parse request: %v", err)
35+
log.Printf("Could not parse compose request - error during unmarshal: %v", err)
3536
return err
3637
}
3738

@@ -49,7 +50,8 @@ func CreateFunctionCompositionFromASL(e echo.Context) error {
4950
log.Printf("Could not decode composition source ASL: %v", err)
5051
return e.JSON(http.StatusBadRequest, "composition already exists")
5152
}
52-
comp, err := fc.FromASL(creationRequest.Name, decodedSrc[:])
53+
54+
comp, err := fc.FromASL(creationRequest.Name, creationRequest.RmFnOnDeletionFlag, decodedSrc[:])
5355
if err != nil {
5456
log.Printf("Could not parse composition from ASL: %v", err)
5557
return e.JSON(http.StatusBadRequest, "composition already exists")
@@ -76,7 +78,7 @@ func CreateFunctionComposition(e echo.Context) error {
7678

7779
err := json.Unmarshal(body, &comp)
7880
if err != nil && err != io.EOF {
79-
log.Printf("Could not parse request: %v", err)
81+
log.Printf("Could not parse composition request - error during unmarshal: %v", err)
8082
return err
8183
}
8284
// checking if the function already exists. If exists we return an error
@@ -97,6 +99,10 @@ func CreateFunctionComposition(e echo.Context) error {
9799
log.Printf("Dropping request for composition with non-existing function '%s'", fName)
98100
return e.JSON(http.StatusBadRequest, "composition with non-existing function")
99101
}
102+
if f.Signature == nil {
103+
return e.JSON(http.StatusBadRequest, "function "+fName+"has nil signature")
104+
}
105+
100106
funcs[fName] = f
101107
}
102108
comp.Functions = funcs
@@ -137,7 +143,7 @@ func DeleteFunctionComposition(c echo.Context) error {
137143
// here we only need the name of the function composition (and if all function should be deleted with it)
138144
err := json.NewDecoder(c.Request().Body).Decode(&comp)
139145
if err != nil && err != io.EOF {
140-
log.Printf("Could not parse request: %v", err)
146+
log.Printf("Could not parse delete request - error during decoding: %v", err)
141147
return err
142148
}
143149

@@ -189,7 +195,7 @@ func InvokeFunctionComposition(e echo.Context) error {
189195
var fcInvocationRequest client.CompositionInvocationRequest
190196
err := json.NewDecoder(e.Request().Body).Decode(&fcInvocationRequest)
191197
if err != nil && err != io.EOF {
192-
log.Printf("Could not parse request: %v", err)
198+
log.Printf("Could not parse invoke request - error during decoding: %v", err)
193199
return e.JSON(http.StatusInternalServerError, "failed to parse composition invocation request. Check parameters and composition definition")
194200
}
195201
// gets a fc.CompositionRequest from the pool goroutine-safe cache.
@@ -214,11 +220,6 @@ func InvokeFunctionComposition(e echo.Context) error {
214220
OffloadLatency: 0,
215221
SchedAction: "",
216222
})
217-
218-
//fcReq.ExecReport.Reports[execReportId] = &function.ExecutionReport{
219-
// SchedAction: "",
220-
// OffloadLatency: 0.0,
221-
//}
222223
}
223224

224225
if fcReq.Async {
@@ -242,6 +243,17 @@ func InvokeFunctionComposition(e echo.Context) error {
242243
}
243244
return e.JSON(http.StatusInternalServerError, v)
244245
} else {
245-
return e.JSON(http.StatusOK, fc.CompositionResponse{Success: true, CompositionExecutionReport: fcReq.ExecReport})
246+
reports := make(map[string]*function.ExecutionReport)
247+
fcReq.ExecReport.Reports.Range(func(id fc.ExecutionReportId, report *function.ExecutionReport) bool {
248+
reports[string(id)] = report
249+
return true
250+
})
251+
252+
return e.JSON(http.StatusOK, fc.CompositionResponse{
253+
Success: true,
254+
Result: fcReq.ExecReport.Result,
255+
Reports: reports,
256+
ResponseTime: fcReq.ExecReport.ResponseTime,
257+
})
246258
}
247259
}

internal/api/server.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,19 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"log"
8+
"net/http"
9+
"os"
10+
"os/signal"
11+
"time"
12+
713
"github.com/grussorusso/serverledge/internal/cache"
814
"github.com/grussorusso/serverledge/internal/config"
915
"github.com/grussorusso/serverledge/internal/node"
1016
"github.com/grussorusso/serverledge/internal/registration"
1117
"github.com/grussorusso/serverledge/internal/scheduling"
1218
"github.com/labstack/echo/v4"
1319
"github.com/labstack/echo/v4/middleware"
14-
"log"
15-
"net/http"
16-
"os"
17-
"os/signal"
18-
"time"
1920
)
2021

2122
func StartAPIServer(e *echo.Echo) {

internal/asl/catch.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package asl
2+
3+
import (
4+
"github.com/grussorusso/serverledge/internal/types"
5+
"golang.org/x/exp/slices"
6+
)
7+
8+
// Catch is a field in Task, Parallel and Map states. When a state reports an error and either there is no Retrier, or retries have failed to resolve the error, the interpreter scans through the Catchers in array order, and when the Error Name appears in the value of a Catcher’s "ErrorEquals" field, transitions the machine to the state named in the value of the "Next" field. The reserved name "States.ALL" appearing in a Retrier’s "ErrorEquals" field is a wildcard and matches any Error Name.
9+
type Catch struct {
10+
ErrorEquals []string
11+
ResultPath string
12+
Next string
13+
}
14+
15+
func (c *Catch) Equals(cmp types.Comparable) bool {
16+
c2 := cmp.(*Catch)
17+
18+
return slices.Equal(c.ErrorEquals, c2.ErrorEquals) &&
19+
c.ResultPath == c2.ResultPath &&
20+
c.Next == c2.Next
21+
}
22+
23+
type CanCatch interface {
24+
GetCatchOpt() Catch
25+
}
26+
27+
func NoCatch() *Catch {
28+
return &Catch{}
29+
}

internal/asl/choice.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package asl
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/buger/jsonparser"
7+
"github.com/grussorusso/serverledge/internal/types"
8+
"github.com/labstack/gommon/log"
9+
)
10+
11+
type ChoiceState struct {
12+
Type StateType // Necessary
13+
Choices []ChoiceRule // Necessary. All DataTestExpression must be State Machine with an end, like fc.ChoiceNode(s).
14+
InputPath Path // Optional
15+
OutputPath Path // Optional
16+
// Default is the default state to execute when no other DataTestExpression matches
17+
Default string // Optional, but to avoid errors it is highly recommended.
18+
}
19+
20+
func (c *ChoiceState) ParseFrom(jsonData []byte) (State, error) {
21+
c.Type = StateType(JsonExtractStringOrDefault(jsonData, "Type", "Choice"))
22+
c.InputPath = JsonExtractRefPathOrDefault(jsonData, "InputPath", "")
23+
c.OutputPath = JsonExtractRefPathOrDefault(jsonData, "OutputPath", "")
24+
c.Default = JsonExtractStringOrDefault(jsonData, "Default", "")
25+
26+
choiceRules := make([]ChoiceRule, 0)
27+
28+
choices, errChoice := JsonExtract(jsonData, "Choices")
29+
if errChoice != nil {
30+
return nil, fmt.Errorf("failed to parse Choices %v", errChoice)
31+
}
32+
33+
_, _ = jsonparser.ArrayEach(choices, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
34+
cr, errR := ParseRule(value)
35+
if errR != nil {
36+
log.Errorf("failed to parse choice rule %d: %v", offset, err)
37+
return
38+
}
39+
choiceRules = append(choiceRules, cr)
40+
})
41+
//if errArr != nil {
42+
// return nil, fmt.Errorf("error %v when parsing choice rule %s", errArr, choices[:offset])
43+
//}
44+
c.Choices = choiceRules
45+
46+
return c, nil
47+
}
48+
49+
func (c *ChoiceState) Validate(stateNames []string) error {
50+
if c.Default == "" {
51+
log.Warn("Default choice not specified")
52+
}
53+
return nil
54+
}
55+
56+
func NewEmptyChoice() *ChoiceState {
57+
return &ChoiceState{
58+
Type: Choice,
59+
Choices: []ChoiceRule{},
60+
InputPath: "",
61+
OutputPath: "",
62+
Default: "",
63+
}
64+
}
65+
66+
func (c *ChoiceState) GetType() StateType {
67+
return Choice
68+
}
69+
70+
// GetNext for ChoiceState returns the Default branch instead of next
71+
func (c *ChoiceState) GetNext() (string, bool) {
72+
if c.Default != "" {
73+
return c.Default, true
74+
}
75+
return "", false
76+
}
77+
78+
// IsEndState always returns true for a ChoiceState, because it is always a terminal state.
79+
func (c *ChoiceState) IsEndState() bool {
80+
return true
81+
}
82+
83+
func (c *ChoiceState) Equals(cmp types.Comparable) bool {
84+
c2 := cmp.(*ChoiceState)
85+
86+
if len(c.Choices) != len(c2.Choices) {
87+
return false
88+
}
89+
90+
for i, c1 := range c.Choices {
91+
if !c1.Equals(c2.Choices[i]) {
92+
return false
93+
}
94+
}
95+
96+
return c.Type == c2.Type &&
97+
c.InputPath == c2.InputPath &&
98+
c.OutputPath == c2.OutputPath &&
99+
c.Default == c2.Default
100+
}
101+
102+
func (c *ChoiceState) String() string {
103+
str := fmt.Sprint("{",
104+
"\n\t\t\tType: ", c.Type,
105+
"\n\t\t\tDefault: ", c.Default,
106+
"\n\t\t\tChoices: [")
107+
for i, c1 := range c.Choices {
108+
str += c1.String()
109+
if i < len(c.Choices)-1 {
110+
str += ","
111+
}
112+
}
113+
str += "\n\t\t\t]\n"
114+
115+
if c.InputPath != "" {
116+
str += fmt.Sprintf("\t\t\tInputPath: %s\n", c.InputPath)
117+
}
118+
if c.OutputPath != "" {
119+
str += fmt.Sprintf("\t\t\tOutputPath: %s\n", c.OutputPath)
120+
}
121+
str += "\t\t}"
122+
return str
123+
}

0 commit comments

Comments
 (0)