Skip to content

Commit f38280e

Browse files
matnargrussorusso
andcommitted
WIP: introduce workflow offloading
Co-authored-by: Gabriele Russo Russo <gabri.russo17@gmail.com>
1 parent a624b03 commit f38280e

2 files changed

Lines changed: 31 additions & 4 deletions

File tree

internal/workflow/request.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package workflow
33
import (
44
"time"
55

6-
"github.com/cornelk/hashmap"
76
"github.com/serverledge-faas/serverledge/internal/function"
87
)
98

@@ -19,6 +18,7 @@ type Request struct {
1918
QoS function.RequestQoS // every function should have its QoS
2019
CanDoOffloading bool // every function inherits this flag
2120
Async bool
21+
Resuming bool // indicating whether the function is resuming from a previous (partial) execution
2222
}
2323

2424
func NewRequest(reqId string, workflow *Workflow, params map[string]interface{}) *Request {
@@ -28,10 +28,11 @@ func NewRequest(reqId string, workflow *Workflow, params map[string]interface{})
2828
Params: params,
2929
Arrival: time.Now(),
3030
ExecReport: ExecutionReport{
31-
Reports: hashmap.New[ExecutionReportId, *function.ExecutionReport](), // make(map[ExecutionReportId]*function.ExecutionReport),
31+
Reports: map[string]*function.ExecutionReport{},
3232
},
3333
CanDoOffloading: true,
3434
Async: false,
35+
Resuming: false,
3536
}
3637
}
3738

internal/workflow/workflow.go

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,11 +428,37 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
428428
var err error
429429
requestId := ReqId(r.Id)
430430

431-
progress := InitProgress(requestId, workflow)
432-
pd := NewPartialData(requestId, workflow.Start.Next, "", r.Params)
431+
var progress *Progress
432+
var pd *PartialData
433+
434+
// TODO: move into a function?
435+
if !r.Resuming {
436+
progress = InitProgress(requestId, workflow)
437+
pd = NewPartialData(requestId, workflow.Start.Next, "", r.Params)
438+
} else {
439+
var found bool
440+
progress, found = RetrieveProgress(requestId, true)
441+
if !found {
442+
return ExecutionReport{}, fmt.Errorf("failed to retrieve workflow progress: %v", requestId)
443+
}
444+
if len(progress.ReadyToExecute) == 0 {
445+
return ExecutionReport{}, fmt.Errorf("workflow resumed but no task is ready for execution: %v", requestId)
446+
} else if len(progress.ReadyToExecute) > 1 {
447+
// TODO: manage case when len is > 1 (e.g., parallel branches)
448+
return ExecutionReport{}, fmt.Errorf("workflow resumed with multiple tasks ready for execution not yet implemented!: %v", requestId)
449+
}
450+
451+
pd, err = RetrieveSinglePartialData(requestId, progress.ReadyToExecute[0], true)
452+
if err != nil {
453+
return ExecutionReport{}, fmt.Errorf("workflow resumed but unable to retrieve partial data of next task: %v", requestId)
454+
}
455+
}
433456

434457
shouldContinue := true
435458
for shouldContinue {
459+
// TODO: introduce a workflow offloading policy
460+
461+
// TODO: if local execution:
436462
// executing workflow
437463
pd, progress, shouldContinue, err = workflow.Execute(r, pd, progress)
438464
if err != nil {

0 commit comments

Comments
 (0)