Skip to content

Commit 104423e

Browse files
committed
Improved Progress/TaskData saving
1 parent a18ce44 commit 104423e

4 files changed

Lines changed: 29 additions & 11 deletions

File tree

internal/config/keys.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ const TRACING_ENABLED = "tracing.enabled"
8787
// Custom output file for traces
8888
const TRACING_OUTFILE = "tracing.outfile"
8989

90+
const WORKFLOW_ALWAYS_SAVE_PROGRESS = "workflow.always_save_progress"
91+
9092
// Workflow offloading policy to use
9193
// Possible values: "disable", "ilp"
9294
const WORKFLOW_OFFLOADING_POLICY = "workflow.offloading.policy"

internal/workflow/progress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (p *Progress) Save() error {
193193
}
194194
// saves the json object into etcd
195195
key := getProgressEtcdKey(p.ReqId)
196-
log.Printf("Saving progress with key: %s", key)
196+
log.Printf("[Rq-%v] Saving progress - bytes: %d", p.ReqId, len(payload))
197197

198198
_, err = cli.Put(ctx, key, string(payload))
199199
if err != nil {

internal/workflow/task_data.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
clientv3 "go.etcd.io/etcd/client/v3"
9+
"log"
910
"time"
1011

1112
"github.com/serverledge-faas/serverledge/utils"
@@ -60,17 +61,19 @@ func (td *TaskData) Save(reqId ReqId, task TaskId) error {
6061
return err
6162
}
6263
ctx := context.TODO()
63-
// marshal the progress object into json
64+
// marshal task data object into json
6465
payload, err := json.Marshal(td)
6566
if err != nil {
66-
return fmt.Errorf("could not marshal progress: %v", err)
67+
return fmt.Errorf("could not marshal task data: %v", err)
6768
}
6869
// saves the json object into etcd
6970
key := getTaskDataEtcdKey(reqId, task)
7071

72+
log.Printf("[Rq-%v] Saving task data to etcd - key: %s - bytes: %d", reqId, key, len(payload))
7173
_, err = cli.Put(ctx, key, string(payload))
7274
if err != nil {
73-
utils.TryEtcdReconnection()
75+
log.Printf("[Rq-%v] Could not save task data due to failed Put()...retrying Etcd connection: %v", reqId, err)
76+
utils.TriggerEtcdReconnection()
7477
return fmt.Errorf("failed etcd Put partial data: %v", err)
7578
}
7679
return nil
@@ -86,7 +89,7 @@ func RetrievePartialData(reqId ReqId, task TaskId) (*TaskData, error) {
8689
key := getTaskDataEtcdKey(reqId, task)
8790
getResponse, err := cli.Get(ctx, key)
8891
if err != nil {
89-
utils.TryEtcdReconnection()
92+
utils.TriggerEtcdReconnection()
9093
return nil, fmt.Errorf("failed to retrieve PD for requestId %s: %v", key, err)
9194
}
9295
if len(getResponse.Kvs) < 1 {

internal/workflow/workflow.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,8 @@ func (wflow *Workflow) savePartialDataForReadyTasks(requestId ReqId, progress *P
425425
// Invoke schedules each function of the workflow and invokes them
426426
func (wflow *Workflow) Invoke(r *Request) error {
427427

428+
alwaysSaveProgress := config.GetBool(config.WORKFLOW_ALWAYS_SAVE_PROGRESS, false)
429+
428430
var err error
429431
requestId := ReqId(r.Id)
430432

@@ -437,9 +439,11 @@ func (wflow *Workflow) Invoke(r *Request) error {
437439
dataMap := make(map[TaskId]*TaskData)
438440

439441
if len(progress.ReadyToExecute) == 0 {
440-
return fmt.Errorf("wflow resumed but no task is ready for execution: %v", requestId)
442+
return fmt.Errorf("[Rq-%v] wflow resumed but no task is ready for execution", requestId)
441443
}
442444

445+
log.Printf("[Rq-%v] Starting/resuming execution (%d to executed)", requestId, len(progress.ReadyToExecute))
446+
443447
for len(progress.ReadyToExecute) > 0 {
444448
t0 := time.Now()
445449
decision, err := offloadingPolicy.Evaluate(r, progress)
@@ -450,7 +454,7 @@ func (wflow *Workflow) Invoke(r *Request) error {
450454
return fmt.Errorf("an error occurred in policy evaluation: %v", err)
451455
}
452456

453-
if decision.Offload {
457+
if decision.Offload || alwaysSaveProgress {
454458
err := progress.Save()
455459
if err != nil {
456460
return fmt.Errorf("Could not save progress: %v", err)
@@ -462,24 +466,26 @@ func (wflow *Workflow) Invoke(r *Request) error {
462466
return fmt.Errorf("Could not save partial data: %v", err)
463467
}
464468

465-
log.Printf("Offloading request: %v", requestId)
469+
}
466470

471+
if decision.Offload {
467472
err = offload(r, &decision)
468473
if err != nil {
469474
return err
470475
}
471476

472477
if r.ExecReport.Result != nil {
473478
// Workflow execution has completed on remote node
474-
break
479+
log.Printf("[Rq-%v] Workflow has completed on remote node", requestId)
480+
return nil
475481
}
476482

477483
progress, err = RetrieveProgress(requestId)
478484
if err != nil {
479485
return fmt.Errorf("Could not retrieve progress after offloading: %v", err)
480486
}
481487

482-
log.Printf("Ready to execute after offloading: %v", progress.ReadyToExecute)
488+
log.Printf("[Rq-%v] Ready to execute after offloading: %v", requestId, progress.ReadyToExecute)
483489
} else {
484490
// pick next executable task
485491
var taskToExecute TaskId = ""
@@ -489,9 +495,12 @@ func (wflow *Workflow) Invoke(r *Request) error {
489495
}
490496
}
491497
if taskToExecute == "" {
498+
log.Printf("[Rq-%v] Workflow has not completed but there is nothing left to execute in the plan", requestId)
492499
break
493500
}
494501

502+
log.Printf("[Rq-%v] Now going to execute %s", requestId, taskToExecute)
503+
495504
// Prepare input for taskToExecute
496505
var input *TaskData
497506
if wflow.Tasks[taskToExecute].GetType() == Start {
@@ -530,11 +539,15 @@ func (wflow *Workflow) Invoke(r *Request) error {
530539
}
531540
}
532541

542+
log.Printf("[Rq-%v] Executed %s", requestId, taskToExecute)
543+
533544
dataMap[taskToExecute] = output
534545

535546
if len(progress.ReadyToExecute) == 0 && output != nil {
536547
r.ExecReport.Result = output.Data
537548

549+
log.Printf("[Rq-%v] Workflow completed", requestId)
550+
538551
if isProgressOnEtcd {
539552
err = DeleteProgress(requestId)
540553
if err != nil {
@@ -568,7 +581,7 @@ func (wflow *Workflow) Invoke(r *Request) error {
568581

569582
func offload(r *Request, policyDecision *OffloadingDecision) error {
570583

571-
log.Printf("Offloading decision: %v", policyDecision)
584+
log.Printf("[Rq-%v] Offloading decision: %v", r.Id, policyDecision)
572585

573586
request := WorkflowInvocationResumeRequest{
574587
ReqId: r.Id,

0 commit comments

Comments
 (0)