Skip to content

Commit e30eb67

Browse files
committed
Completed workflow offloading mechanism
1 parent c41138c commit e30eb67

7 files changed

Lines changed: 162 additions & 24 deletions

File tree

internal/api/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func StartAPIServer(e *echo.Echo) {
3131
e.GET("/status", GetServerStatus)
3232
// Workflow routes
3333
e.POST("/workflow/invoke/:workflow", InvokeWorkflow)
34+
e.POST("/workflow/resume/:workflow", ResumeWorkflow)
3435
e.POST("/workflow/create", CreateWorkflowFromASL)
3536
e.POST("/workflow/import", CreateWorkflow)
3637
e.POST("/workflow/delete", DeleteWorkflow)

internal/api/workflow.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,36 @@ func DeleteWorkflow(c echo.Context) error {
142142
return c.JSON(http.StatusOK, response)
143143
}
144144

145+
// ResumeWorkflow handles a workflow invocation resume request (workflow offloading).
146+
func ResumeWorkflow(e echo.Context) error {
147+
workflowName := e.Param("workflow")
148+
wflow, ok := workflow.Get(workflowName)
149+
if !ok {
150+
log.Printf("Dropping request for unknown workflow '%s'", workflowName)
151+
return e.JSON(http.StatusNotFound, "function workflow '"+workflowName+"' does not exist")
152+
}
153+
154+
var clientReq client.WorkflowInvocationResumeRequest
155+
err := json.NewDecoder(e.Request().Body).Decode(&clientReq)
156+
if err != nil && err != io.EOF {
157+
log.Printf("Could not parse invoke request - error during decoding: %v", err)
158+
return e.JSON(http.StatusInternalServerError, "failed to parse workflow invocation request. Check parameters and workflow definition")
159+
}
160+
161+
req := workflowInvocationRequestPool.Get().(*workflow.Request)
162+
req.W = wflow
163+
req.Params = clientReq.Params
164+
req.Arrival = time.Now()
165+
req.QoS = clientReq.QoS
166+
req.CanDoOffloading = clientReq.CanDoOffloading
167+
req.Async = clientReq.Async
168+
req.Resuming = true
169+
req.Id = clientReq.ReqId
170+
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
171+
172+
return handleWorkflowInvocation(e, req)
173+
}
174+
145175
// InvokeWorkflow handles a function workflow invocation request.
146176
func InvokeWorkflow(e echo.Context) error {
147177
workflowName := e.Param("workflow")
@@ -165,10 +195,15 @@ func InvokeWorkflow(e echo.Context) error {
165195
req.QoS = clientReq.QoS
166196
req.CanDoOffloading = clientReq.CanDoOffloading
167197
req.Async = clientReq.Async
168-
198+
req.Resuming = false
169199
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.NodeIdentifier[len(node.NodeIdentifier)-5:], req.Arrival.Nanosecond())
170200
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
171201

202+
return handleWorkflowInvocation(e, req)
203+
}
204+
205+
func handleWorkflowInvocation(e echo.Context, req *workflow.Request) error {
206+
172207
if req.Async {
173208
go func() {
174209
_, errInvoke := req.W.Invoke(req)
@@ -194,7 +229,7 @@ func InvokeWorkflow(e echo.Context) error {
194229
}
195230

196231
// Synchronous execution of the workflow
197-
_, err = req.W.Invoke(req)
232+
_, err := req.W.Invoke(req)
198233

199234
defer workflowInvocationRequestPool.Put(req)
200235

internal/cli/cli.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -470,11 +470,8 @@ func invokeWorkflow(cmd *cobra.Command, args []string) {
470470
}
471471

472472
// Prepare request // TODO: it's ok to reuse the same type that function invocation uses?
473-
request := client.InvocationRequest{
474-
Params: paramsMap,
475-
QoSClass: api.DecodeServiceClass(qosClass),
476-
// QoSClass: qosClass,
477-
QoSMaxRespT: qosMaxRespT,
473+
request := client.WorkflowInvocationRequest{
474+
Params: paramsMap,
478475
CanDoOffloading: true,
479476
Async: asyncInvocation}
480477
invocationBody, err := json.Marshal(request)

internal/client/types.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@ type WorkflowInvocationRequest struct {
2626
QoS function.RequestQoS
2727
CanDoOffloading bool
2828
Async bool
29-
// NextNodes []string // DagNodeId
30-
// we do not add Progress here, only the next group of node that should execute
31-
// in case of choice node, we retrieve the progress for each taskId and execute only the one that is not in Skipped State
32-
// in case of fan out node, we retrieve all the progress and execute concurrently all the tasks in the group.
33-
// in case of fan in node, we retrieve periodically all the progress of the previous nodes and start the merging only when all previous node are completed.
34-
// or simply, we can get the N partialData for the Fan Out, coming from the previous nodes.
35-
// furthermore, we should be careful not to run multiple fanIn at the same time!
29+
}
30+
31+
// WorkflowInvocationResumeRequest is a request to resume the execution of a workflow (typically on a remote node)
32+
type WorkflowInvocationResumeRequest struct {
33+
ReqId string
34+
WorkflowInvocationRequest
3635
}
3736

3837
type WorkflowCreationRequest struct {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package workflow
2+
3+
type OffloadingDecision struct {
4+
Offload bool `json:"offload"`
5+
RemoteHost string `json:"remote_host"`
6+
}
7+
8+
type OffloadingPolicy interface {
9+
Evaluate(r *Request, p *Progress) (OffloadingDecision, error)
10+
}
11+
12+
type SimpleOffloadingPolicy struct{}
13+
14+
func (policy *SimpleOffloadingPolicy) Evaluate(r *Request, p *Progress) (OffloadingDecision, error) {
15+
16+
completed := 0
17+
18+
if p == nil || !r.CanDoOffloading || len(p.ReadyToExecute) == 0 {
19+
return OffloadingDecision{Offload: false}, nil
20+
}
21+
22+
for _, s := range p.Status {
23+
if s == Executed {
24+
completed++
25+
}
26+
}
27+
28+
if completed >= 2 {
29+
return OffloadingDecision{true, "127.0.0.1:1323"}, nil
30+
}
31+
32+
return OffloadingDecision{Offload: false}, nil
33+
}

internal/workflow/progress.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"log"
89
"sync"
910
"time"
1011

@@ -283,6 +284,7 @@ func RetrieveProgress(reqId ReqId, tryFromEtcd bool) (*Progress, bool) {
283284
// cache miss - retrieve progress from ETCD
284285
progress, err = getProgressFromEtcd(reqId)
285286
if err != nil {
287+
log.Printf("failed to retrieve progress from Etcd %v: %v", reqId, err)
286288
return nil, false
287289
}
288290
// insert a new element to the cache
@@ -345,6 +347,7 @@ func saveProgressToEtcd(p *Progress) error {
345347
}
346348
// saves the json object into etcd
347349
key := getProgressEtcdKey(p.ReqId)
350+
log.Printf("Saving progress with key: %s", key)
348351
progressMutexEtcd.Lock()
349352
defer progressMutexEtcd.Unlock()
350353
_, err = cli.Put(ctx, key, string(payload))

internal/workflow/workflow.go

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"github.com/serverledge-faas/serverledge/internal/client"
9+
"io"
10+
"net/http"
811
"sort"
912
"time"
1013

@@ -17,8 +20,7 @@ import (
1720
"github.com/serverledge-faas/serverledge/internal/types"
1821
)
1922

20-
// used to send output from parallel tasks to fan in task or to the next task
21-
// var outputChannel = make(chan map[string]interface{})
23+
var offloadingPolicy OffloadingPolicy = &SimpleOffloadingPolicy{} // TODO: handle initialization elsewhere
2224

2325
// Workflow is a Workflow to drive the execution of the workflow
2426
type Workflow struct {
@@ -456,22 +458,90 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
456458

457459
shouldContinue := true
458460
for shouldContinue {
459-
// TODO: introduce a workflow offloading policy
461+
decision, err := offloadingPolicy.Evaluate(r, progress)
462+
if err == nil && decision.Offload {
463+
err = offload(r, decision.RemoteHost, progress, pd)
464+
if err != nil {
465+
return ExecutionReport{}, err
466+
}
467+
shouldContinue = false
468+
} else {
469+
pd, progress, shouldContinue, err = workflow.Execute(r, pd, progress)
470+
if err != nil {
471+
return ExecutionReport{}, fmt.Errorf("failed workflow execution: %v", err)
472+
}
460473

461-
// TODO: if local execution:
462-
// executing workflow
463-
pd, progress, shouldContinue, err = workflow.Execute(r, pd, progress)
464-
if err != nil {
465-
return ExecutionReport{}, fmt.Errorf("failed workflow execution: %v", err)
474+
if !shouldContinue {
475+
r.ExecReport.Result = pd.Data
476+
}
466477
}
467-
}
468478

469-
r.ExecReport.Result = pd.Data
479+
}
470480

471481
// TODO: remove r.ExecReport
472482
return r.ExecReport, nil
473483
}
474484

485+
func offload(r *Request, hostPort string, progress *Progress, pd *PartialData) error {
486+
487+
err := saveProgressToEtcd(progress)
488+
if err != nil {
489+
return fmt.Errorf("Could not save progress: %v", err)
490+
}
491+
err = savePartialDataToEtcd(pd)
492+
if err != nil {
493+
return fmt.Errorf("Could not save partial data: %v", err)
494+
}
495+
496+
request := client.WorkflowInvocationResumeRequest{
497+
ReqId: r.Id,
498+
WorkflowInvocationRequest: client.WorkflowInvocationRequest{
499+
Params: r.Params,
500+
CanDoOffloading: false,
501+
Async: r.Async,
502+
},
503+
}
504+
invocationBody, err := json.Marshal(request)
505+
if err != nil {
506+
return fmt.Errorf("JSON marshaling failed: %v", err)
507+
}
508+
509+
// Send invocation request
510+
url := fmt.Sprintf("http://%s/workflow/resume/%s", hostPort, r.W.Name)
511+
resp, err := utils.PostJson(url, invocationBody)
512+
if err != nil {
513+
return fmt.Errorf("HTTP request for offloading failed: %v", err)
514+
}
515+
516+
if resp.StatusCode != http.StatusOK {
517+
return fmt.Errorf("failed offloaded workflow: %v", err)
518+
}
519+
520+
if r.Async {
521+
// no need to parse and return response
522+
return nil
523+
}
524+
525+
var response InvocationResponse
526+
body, _ := io.ReadAll(resp.Body)
527+
err = json.Unmarshal(body, &response)
528+
if err != nil {
529+
return fmt.Errorf("Failed InvocationResponse unmarshaling: %v", err)
530+
}
531+
532+
if !response.Success {
533+
return fmt.Errorf("failed offloaded workflow: %v", err)
534+
}
535+
536+
r.ExecReport.Result = response.Result
537+
538+
for k, v := range response.Reports {
539+
r.ExecReport.Reports[k] = v
540+
}
541+
542+
return nil
543+
}
544+
475545
// Delete removes the Workflow from cache and from etcd, so it cannot be invoked anymore
476546
func (workflow *Workflow) Delete() error {
477547
cli, err := utils.GetEtcdClient()

0 commit comments

Comments
 (0)