Skip to content

Commit b1684de

Browse files
committed
Basic ExecutionPlan in resuming requests
1 parent 34d4993 commit b1684de

5 files changed

Lines changed: 103 additions & 59 deletions

File tree

internal/api/workflow.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func ResumeWorkflow(e echo.Context) error {
151151
return e.JSON(http.StatusNotFound, "function workflow '"+workflowName+"' does not exist")
152152
}
153153

154-
var clientReq client.WorkflowInvocationResumeRequest
154+
var clientReq workflow.WorkflowInvocationResumeRequest
155155
err := json.NewDecoder(e.Request().Body).Decode(&clientReq)
156156
if err != nil && err != io.EOF {
157157
log.Printf("Could not parse invoke request - error during decoding: %v", err)
@@ -169,6 +169,12 @@ func ResumeWorkflow(e echo.Context) error {
169169
req.Id = clientReq.ReqId
170170
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
171171

172+
if clientReq.Plan.ToExecute != nil {
173+
req.Plan = &workflow.ExecutionPlan{ToExecute: clientReq.Plan.ToExecute}
174+
}
175+
176+
log.Printf("Resuming workflow '%s'", workflowName)
177+
172178
return handleWorkflowInvocation(e, req)
173179
}
174180

internal/client/types.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,6 @@ type WorkflowInvocationRequest struct {
2828
Async bool
2929
}
3030

31-
// WorkflowInvocationResumeRequest is a request to resume the execution of a workflow (typically on a remote node)
32-
type WorkflowInvocationResumeRequest struct {
33-
ReqId string
34-
WorkflowInvocationRequest
35-
}
36-
3731
type WorkflowCreationRequest struct {
3832
Name string // Name of the new workflow
3933
ASLSrc string // Specification source in Amazon States Language (encoded in Base64)

internal/workflow/offloading_policy.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,17 @@ package workflow
33
type OffloadingDecision struct {
44
Offload bool `json:"offload"`
55
RemoteHost string `json:"remote_host"`
6+
ExecutionPlan
67
}
78

89
type OffloadingPolicy interface {
910
Evaluate(r *Request, p *Progress) (OffloadingDecision, error)
1011
}
1112

13+
type ExecutionPlan struct {
14+
ToExecute []TaskId
15+
}
16+
1217
type NoOffloadingPolicy struct{}
1318

1419
func (policy *NoOffloadingPolicy) Evaluate(r *Request, p *Progress) (OffloadingDecision, error) {
@@ -32,8 +37,9 @@ func (policy *SimpleOffloadingPolicy) Evaluate(r *Request, p *Progress) (Offload
3237
}
3338
}
3439

35-
if completed >= 2 {
36-
return OffloadingDecision{true, "127.0.0.1:1323"}, nil
40+
if completed >= 2 && completed < 4 {
41+
plan := ExecutionPlan{ToExecute: p.ReadyToExecute} // TODO
42+
return OffloadingDecision{true, "127.0.0.1:1323", plan}, nil
3743
}
3844

3945
return OffloadingDecision{Offload: false}, nil

internal/workflow/request.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ type Request struct {
1818
QoS function.RequestQoS // every function should have its QoS
1919
CanDoOffloading bool // every function inherits this flag
2020
Async bool
21-
Resuming bool // indicating whether the function is resuming from a previous (partial) execution
21+
Resuming bool // indicating whether the function is resuming from a previous (partial) execution
22+
Plan *ExecutionPlan // optional; if set, only the ready tasks listed in the plan are executed
2223
}
2324

2425
func NewRequest(reqId string, workflow *Workflow, params map[string]interface{}) *Request {

internal/workflow/workflow.go

Lines changed: 86 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,26 @@ import (
1212
"sort"
1313
"time"
1414

15-
"github.com/labstack/gommon/log"
1615
"github.com/serverledge-faas/serverledge/internal/cache"
1716
"github.com/serverledge-faas/serverledge/utils"
17+
"log"
1818

1919
"github.com/serverledge-faas/serverledge/internal/asl"
2020
"github.com/serverledge-faas/serverledge/internal/function"
2121
"github.com/serverledge-faas/serverledge/internal/types"
2222
)
2323

24-
var offloadingPolicy OffloadingPolicy = &NoOffloadingPolicy{} // TODO: handle initialization elsewhere
24+
// WorkflowInvocationResumeRequest is a request to resume the execution of a workflow (typically on a remote node)
25+
// TODO: move to another file?
26+
type WorkflowInvocationResumeRequest struct {
27+
ReqId string
28+
client.WorkflowInvocationRequest
29+
Plan ExecutionPlan
30+
}
31+
32+
var offloadingPolicy OffloadingPolicy = &SimpleOffloadingPolicy{}
33+
34+
//&NoOffloadingPolicy{} // TODO: handle initialization elsewhere
2535

2636
// Workflow is a Workflow to drive the execution of the workflow
2737
type Workflow struct {
@@ -58,22 +68,6 @@ func (workflow *Workflow) add(task Task) {
5868
workflow.Tasks[task.GetId()] = task // if already exists, overwrites!
5969
}
6070

61-
func isTaskPresent(task Task, infos []Task) bool {
62-
isPresent := false
63-
for _, taskInfo := range infos {
64-
if taskInfo == task {
65-
isPresent = true
66-
break
67-
}
68-
}
69-
return isPresent
70-
}
71-
72-
func isEndTask(task Task) bool {
73-
_, ok := task.(*EndTask)
74-
return ok
75-
}
76-
7771
func (w *Workflow) GetPreviousTasks(task Task) []TaskId {
7872
if w.prevTasks == nil {
7973
w.computePreviousTasks()
@@ -242,9 +236,22 @@ func (workflow *Workflow) doNothingExec(progress *Progress, input *PartialData,
242236
func (workflow *Workflow) Execute(r *Request, input *PartialData, progress *Progress) (*PartialData, *Progress, bool, error) {
243237
var output *PartialData
244238
var err error
245-
nextTasks := progress.ReadyToExecute
246239
shouldContinue := true
247240

241+
var nextTasks []TaskId
242+
243+
if r.Plan == nil {
244+
nextTasks = progress.ReadyToExecute
245+
} else {
246+
for _, t := range progress.ReadyToExecute {
247+
for _, other := range r.Plan.ToExecute {
248+
if t == other {
249+
nextTasks = append(nextTasks, t)
250+
}
251+
}
252+
}
253+
}
254+
248255
if len(nextTasks) > 1 {
249256
// TODO: revise this whole function to pop next tasks one at a time
250257
output, progress, err = workflow.executeParallel(progress, input, nextTasks, r)
@@ -282,7 +289,14 @@ func (workflow *Workflow) Execute(r *Request, input *PartialData, progress *Prog
282289
return output, progress, false, err
283290
}
284291
} else {
285-
// should never happen
292+
err = SaveProgress(progress)
293+
if err != nil {
294+
return nil, progress, false, err
295+
}
296+
err = SavePartialData(input)
297+
if err != nil {
298+
return nil, progress, false, err
299+
}
286300
return nil, progress, false, nil
287301
}
288302

@@ -442,18 +456,45 @@ func (workflow *Workflow) Invoke(r *Request) error {
442456
for shouldContinue {
443457
decision, err := offloadingPolicy.Evaluate(r, progress)
444458
if err == nil && decision.Offload {
445-
err = offload(r, decision.RemoteHost, progress, pd)
459+
460+
err := SaveProgress(progress)
461+
if err != nil {
462+
return fmt.Errorf("Could not save progress: %v", err)
463+
}
464+
err = SavePartialData(pd)
465+
if err != nil {
466+
return fmt.Errorf("Could not save partial data: %v", err)
467+
}
468+
469+
shouldContinue, err = offload(r, &decision)
446470
if err != nil {
447471
return err
448472
}
449-
shouldContinue = false
473+
474+
log.Printf("Offloading done. Should continue: %v", shouldContinue)
475+
476+
if shouldContinue {
477+
progress, err = RetrieveProgress(requestId)
478+
if err != nil {
479+
return fmt.Errorf("Could not retrieve progress after offloading: %v", err)
480+
}
481+
pds, err := RetrievePartialData(requestId, progress.ReadyToExecute[0])
482+
if err != nil {
483+
return fmt.Errorf("Could not retrieve partial data: %v", err)
484+
}
485+
if len(pds) != 1 {
486+
return fmt.Errorf("expected 1 partial data for next task: %v", progress.ReadyToExecute[0])
487+
}
488+
pd = pds[0] // TODO: to be updated when refactoring parallel orchestration
489+
log.Printf("Ready to execute after offloading: %v", progress.ReadyToExecute)
490+
}
450491
} else {
451492
pd, progress, shouldContinue, err = workflow.Execute(r, pd, progress)
452493
if err != nil {
453494
return fmt.Errorf("failed workflow execution: %v", err)
454495
}
455496

456-
if !shouldContinue {
497+
if !shouldContinue && pd != nil {
457498
r.ExecReport.Result = pd.Data
458499
}
459500
}
@@ -465,64 +506,60 @@ func (workflow *Workflow) Invoke(r *Request) error {
465506
return nil
466507
}
467508

468-
func offload(r *Request, hostPort string, progress *Progress, pd *PartialData) error {
509+
func offload(r *Request, policyDecision *OffloadingDecision) (bool, error) {
469510

470-
err := SaveProgress(progress)
471-
if err != nil {
472-
return fmt.Errorf("Could not save progress: %v", err)
473-
}
474-
err = SavePartialData(pd)
475-
if err != nil {
476-
return fmt.Errorf("Could not save partial data: %v", err)
477-
}
511+
shouldContinue := false
512+
513+
log.Printf("Offloading decision: %v", policyDecision)
478514

479-
request := client.WorkflowInvocationResumeRequest{
515+
request := WorkflowInvocationResumeRequest{
480516
ReqId: r.Id,
481517
WorkflowInvocationRequest: client.WorkflowInvocationRequest{
482518
Params: r.Params,
483519
CanDoOffloading: false,
484-
Async: r.Async,
520+
Async: false, // we force a synchronous request
485521
},
522+
Plan: policyDecision.ExecutionPlan,
486523
}
487524
invocationBody, err := json.Marshal(request)
488525
if err != nil {
489-
return fmt.Errorf("JSON marshaling failed: %v", err)
526+
return shouldContinue, fmt.Errorf("JSON marshaling failed: %v", err)
490527
}
491528

492529
// Send invocation request
493-
url := fmt.Sprintf("http://%s/workflow/resume/%s", hostPort, r.W.Name)
530+
url := fmt.Sprintf("http://%s/workflow/resume/%s", policyDecision.RemoteHost, r.W.Name)
494531
resp, err := utils.PostJson(url, invocationBody)
495532
if err != nil {
496-
return fmt.Errorf("HTTP request for offloading failed: %v", err)
533+
return shouldContinue, fmt.Errorf("HTTP request for offloading failed: %v", err)
497534
}
498535

499536
if resp.StatusCode != http.StatusOK {
500-
return fmt.Errorf("failed offloaded workflow: %v", err)
501-
}
502-
503-
if r.Async {
504-
// no need to parse and return response
505-
return nil
537+
return shouldContinue, fmt.Errorf("failed offloaded workflow: %v", err)
506538
}
507539

508540
var response InvocationResponse
509541
body, _ := io.ReadAll(resp.Body)
510542
err = json.Unmarshal(body, &response)
511543
if err != nil {
512-
return fmt.Errorf("Failed InvocationResponse unmarshaling: %v", err)
544+
return shouldContinue, fmt.Errorf("Failed InvocationResponse unmarshaling: %v", err)
513545
}
514546

515547
if !response.Success {
516-
return fmt.Errorf("failed offloaded workflow: %v", err)
548+
return shouldContinue, fmt.Errorf("failed offloaded workflow: %v", err)
517549
}
518550

519-
r.ExecReport.Result = response.Result
520-
521551
for k, v := range response.Reports {
522552
r.ExecReport.Reports[k] = v
523553
}
524554

525-
return nil
555+
if response.Result == nil {
556+
// workflow execution is not complete after offloading
557+
shouldContinue = true
558+
} else {
559+
r.ExecReport.Result = response.Result
560+
}
561+
562+
return shouldContinue, nil
526563
}
527564

528565
// Delete removes the Workflow from cache and from etcd, so it cannot be invoked anymore
@@ -554,7 +591,7 @@ func (workflow *Workflow) Exists() bool {
554591
if err.Error() == fmt.Sprintf("failed to retrieve value for key %s", getEtcdKey(workflow.Name)) {
555592
return false
556593
} else {
557-
log.Error(err.Error())
594+
log.Printf("ERROR: %v", err.Error())
558595
return false
559596
}
560597
}

0 commit comments

Comments
 (0)