Skip to content

Commit cd53638

Browse files
committed
Test resuming workflow
1 parent e30eb67 commit cd53638

4 files changed

Lines changed: 59 additions & 4 deletions

File tree

internal/test/workflow_integration_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ func TestInvokeFC(t *testing.T) {
114114

115115
// check result
116116
output := resultMap.Result[f.Signature.GetOutputs()[0].Name]
117-
u.AssertEquals(t, length, output.(int))
117+
if length != int(output.(float64)) {
118+
t.FailNow()
119+
}
118120

119121
// cleaning up function composition and function
120122
err3 := wflow.Delete()
@@ -564,3 +566,52 @@ func TestInvokeWorkflowPassDoNothing(t *testing.T) {
564566
u.AssertNilMsg(t, err, "Result not found")
565567
u.AssertEquals(t, 3, result)
566568
}
569+
570+
// TestResumeWorkflow offloads the execution of an invocation request to the same node
571+
func TestResumeWorkflow(t *testing.T) {
572+
573+
if testing.Short() {
574+
t.Skip("Skipping integration test")
575+
}
576+
577+
workflowName := "test"
578+
// CREATE - we create a test function composition
579+
length := 5
580+
f, fArr, err := initializeSameFunctionSlice(length, "js")
581+
u.AssertNil(t, err)
582+
wflow, err := CreateSequenceWorkflow(fArr...)
583+
wflow.Name = workflowName
584+
u.AssertNil(t, err)
585+
err1 := wflow.Save()
586+
u.AssertNil(t, err1)
587+
588+
// INVOKE - we call the function composition
589+
params := make(map[string]interface{})
590+
params[f.Signature.GetInputs()[0].Name] = 0
591+
592+
request := workflow.NewRequest(shortuuid.New(), wflow, params)
593+
594+
progress := workflow.InitProgress(workflow.ReqId(request.Id), wflow)
595+
pd := workflow.NewPartialData(workflow.ReqId(request.Id), wflow.Start.Next, "", request.Params)
596+
597+
err = workflow.SaveProgress(progress, true)
598+
u.AssertNil(t, err)
599+
err = workflow.SavePartialData(pd, true)
600+
u.AssertNil(t, err)
601+
602+
resumedRequest := workflow.NewRequest(request.Id, wflow, params)
603+
resumedRequest.Resuming = true
604+
605+
resultMap, err2 := wflow.Invoke(resumedRequest)
606+
u.AssertNil(t, err2)
607+
608+
// check result
609+
output := resultMap.Result[f.Signature.GetOutputs()[0].Name]
610+
if length != int(output.(float64)) {
611+
t.FailNow()
612+
}
613+
614+
// cleaning up function composition and function
615+
err3 := wflow.Delete()
616+
u.AssertNil(t, err3)
617+
}

internal/workflow/partial_data.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"log"
89
"sync"
910
"time"
1011

@@ -230,7 +231,10 @@ func savePartialDataToEtcd(pd *PartialData) error {
230231
pdEtcdMutex.Lock()
231232
defer pdEtcdMutex.Unlock()
232233
// saves the json object into etcd
233-
_, err = cli.Put(ctx, getPartialDataEtcdKey(pd.ReqId, pd.ForTask), string(payload))
234+
key := getPartialDataEtcdKey(pd.ReqId, pd.ForTask)
235+
log.Printf("Saving PD on etcd with key: %s\n", key)
236+
237+
_, err = cli.Put(ctx, key, string(payload))
234238
if err != nil {
235239
return fmt.Errorf("failed etcd Put partial data: %v", err)
236240
}

internal/workflow/progress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func InitProgress(reqId ReqId, workflow *Workflow) *Progress {
207207
ReadyToExecute: make([]TaskId, 0),
208208
}
209209

210-
p.ReadyToExecute = append(p.ReadyToExecute, workflow.Start.GetId())
210+
p.ReadyToExecute = append(p.ReadyToExecute, workflow.Start.Next)
211211

212212
return p
213213
}

internal/workflow/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
452452

453453
pd, err = RetrieveSinglePartialData(requestId, progress.ReadyToExecute[0], true)
454454
if err != nil {
455-
return ExecutionReport{}, fmt.Errorf("workflow resumed but unable to retrieve partial data of next task: %v", requestId)
455+
return ExecutionReport{}, fmt.Errorf("workflow resumed but unable to retrieve partial data of next task: %v", progress.ReadyToExecute[0])
456456
}
457457
}
458458

0 commit comments

Comments
 (0)