Skip to content

Commit 30dceb7

Browse files
committed
Avoid returning ExecutionReport in Invoke()
1 parent e92905b commit 30dceb7

4 files changed

Lines changed: 48 additions & 48 deletions

File tree

internal/api/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func handleWorkflowInvocation(e echo.Context, req *workflow.Request) error {
206206

207207
if req.Async {
208208
go func() {
209-
_, errInvoke := req.W.Invoke(req)
209+
errInvoke := req.W.Invoke(req)
210210

211211
defer workflowInvocationRequestPool.Put(req)
212212

@@ -229,7 +229,7 @@ func handleWorkflowInvocation(e echo.Context, req *workflow.Request) error {
229229
}
230230

231231
// Synchronous execution of the workflow
232-
_, err := req.W.Invoke(req)
232+
err := req.W.Invoke(req)
233233

234234
defer workflowInvocationRequestPool.Put(req)
235235

internal/test/aslparser_test.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func commonTest(t *testing.T, name string, expectedResult int) {
6161
params := make(map[string]interface{})
6262
params["input"] = "0"
6363
request := workflow.NewRequest(shortuuid.New(), comp, params)
64-
_, err2 := comp.Invoke(request)
64+
err2 := comp.Invoke(request)
6565
utils.AssertNil(t, err2)
6666
}
6767

@@ -124,9 +124,11 @@ func TestParsingChoiceWorkflowWithDefaultFail(t *testing.T) {
124124
params := make(map[string]interface{})
125125
params[incFn.Signature.GetInputs()[0].Name] = 10
126126
request := workflow.NewRequest(shortuuid.New(), comp, params)
127-
resultMap, err2 := comp.Invoke(request)
127+
err2 := comp.Invoke(request)
128128
utils.AssertNil(t, err2)
129129

130+
resultMap := request.ExecReport
131+
130132
expectedKey := "DefaultStateError"
131133
expectedValue := "No Matches!"
132134

@@ -167,28 +169,28 @@ func TestParsingChoiceWorkflowWithDataTestExpr(t *testing.T) {
167169
params1 := make(map[string]interface{})
168170
params1[incFn.Signature.GetInputs()[0].Name] = 1
169171
request1 := workflow.NewRequest(shortuuid.New(), comp, params1)
170-
resultMap1, err1 := comp.Invoke(request1)
172+
err1 := comp.Invoke(request1)
171173
utils.AssertNil(t, err1)
172174

173175
// checks that output is (1+1)*2=4
174-
output := resultMap1.Result[incFn.Signature.GetOutputs()[0].Name]
176+
output := request1.ExecReport.Result[incFn.Signature.GetOutputs()[0].Name]
175177
utils.AssertEquals(t, 4, output.(int))
176178
// runs the workflow (2nd choice branch) test: (input == 2)
177179
params2 := make(map[string]interface{})
178180
params2[incFn.Signature.GetInputs()[0].Name] = 2
179181
request2 := workflow.NewRequest(shortuuid.New(), comp, params2)
180-
resultMap, err2 := comp.Invoke(request2)
182+
err2 := comp.Invoke(request2)
181183
utils.AssertNil(t, err2)
182184

183185
// check that output is 2*2*2 = 8
184-
output2 := resultMap.Result[incFn.Signature.GetOutputs()[0].Name]
186+
output2 := request2.ExecReport.Result[incFn.Signature.GetOutputs()[0].Name]
185187
utils.AssertEquals(t, 8, output2.(int))
186188

187189
// runs the workflow (default choice branch)
188190
paramsDefault := make(map[string]interface{})
189191
paramsDefault[incFn.Signature.GetInputs()[0].Name] = "Giacomo"
190192
requestDefault := workflow.NewRequest(shortuuid.New(), comp, paramsDefault)
191-
resultMap, errDef := comp.Invoke(requestDefault)
193+
errDef := comp.Invoke(requestDefault)
192194
utils.AssertNil(t, errDef)
193195

194196
deleteApiTest(t, "inc", HOST, PORT)
@@ -217,11 +219,11 @@ func TestParsingChoiceWorkflowWithBoolExpr(t *testing.T) {
217219
params["value"] = 1
218220
//params["input"] = 1
219221
request := workflow.NewRequest(shortuuid.New(), comp, params)
220-
resultMap, err1 := comp.Invoke(request)
222+
err1 := comp.Invoke(request)
221223
utils.AssertNil(t, err1)
222224

223225
// checks the result (1+1+1 = 3)
224-
output, err := GetIntSingleResult(&resultMap)
226+
output, err := GetIntSingleResult(&request.ExecReport)
225227
utils.AssertNilMsg(t, err, "failed to get int single result")
226228
utils.AssertEquals(t, 3, output)
227229

@@ -230,11 +232,11 @@ func TestParsingChoiceWorkflowWithBoolExpr(t *testing.T) {
230232
params2["type"] = "Private"
231233
params2["value"] = 20
232234
request2 := workflow.NewRequest(shortuuid.New(), comp, params2)
233-
resultMap2, err2 := comp.Invoke(request2)
235+
err2 := comp.Invoke(request2)
234236
utils.AssertNil(t, err2)
235237

236238
// checks the result (20*2+1 = 41)
237-
output2, err := GetIntSingleResult(&resultMap2)
239+
output2, err := GetIntSingleResult(&request2.ExecReport)
238240
utils.AssertNilMsg(t, err, "failed to get int single result")
239241
utils.AssertEquals(t, 41, output2)
240242

internal/test/workflow_integration_test.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,11 @@ func TestInvokeFC(t *testing.T) {
111111
request := workflow.NewRequest(shortuuid.New(), wflow, params)
112112
request.CanDoOffloading = false
113113

114-
resultMap, err2 := wflow.Invoke(request)
114+
err2 := wflow.Invoke(request)
115115
u.AssertNil(t, err2)
116116

117117
// check result
118-
output := cast.ToInt(resultMap.Result[f.Signature.GetOutputs()[0].Name])
118+
output := cast.ToInt(request.ExecReport.Result[f.Signature.GetOutputs()[0].Name])
119119
if length != output {
120120
t.FailNow()
121121
}
@@ -167,10 +167,10 @@ func TestInvokeChoiceFC(t *testing.T) {
167167

168168
request := workflow.NewRequest(shortuuid.New(), wflow, params)
169169
request.CanDoOffloading = false
170-
resultMap, err2 := wflow.Invoke(request)
170+
err2 := wflow.Invoke(request)
171171
u.AssertNil(t, err2)
172172
// checking the result, should be input + 1
173-
output := cast.ToInt(resultMap.Result[f.Signature.GetOutputs()[0].Name])
173+
output := cast.ToInt(request.ExecReport.Result[f.Signature.GetOutputs()[0].Name])
174174
u.AssertEquals(t, input*2, output)
175175

176176
// cleaning up function composition and function
@@ -216,15 +216,15 @@ func TestInvokeFC_DifferentFunctions(t *testing.T) {
216216
params[fDouble.Signature.GetInputs()[0].Name] = 2
217217
request := workflow.NewRequest(shortuuid.New(), wflow, params)
218218
request.CanDoOffloading = false
219-
resultMap, err2 := wflow.Invoke(request)
219+
err2 := wflow.Invoke(request)
220220
if err2 != nil {
221221
log.Printf("%v\n", err2)
222222
t.FailNow()
223223
}
224224
u.AssertNil(t, err2)
225225

226226
// check result
227-
output := cast.ToInt(resultMap.Result[fInc.Signature.GetOutputs()[0].Name])
227+
output := cast.ToInt(request.ExecReport.Result[fInc.Signature.GetOutputs()[0].Name])
228228
if output != 11 {
229229
t.FailNow()
230230
}
@@ -261,11 +261,11 @@ func TestInvokeFC_BroadcastFanOut(t *testing.T) {
261261
params[fDouble.Signature.GetInputs()[0].Name] = 1
262262
request := workflow.NewRequest(shortuuid.New(), wflow, params)
263263
request.CanDoOffloading = false
264-
resultMap, err2 := wflow.Invoke(request)
264+
err2 := wflow.Invoke(request)
265265
u.AssertNil(t, err2)
266266

267267
// check multiple result
268-
output := resultMap.Result
268+
output := request.ExecReport.Result
269269

270270
u.AssertNonNil(t, output)
271271
for i := 0; i < width; i++ {
@@ -324,10 +324,10 @@ func TestInvokeFC_Concurrent(t *testing.T) {
324324
// wait until all goroutines are ready
325325
<-start
326326
// return error
327-
resultMap, err2 := wflow.Invoke(request)
327+
err2 := wflow.Invoke(request)
328328
errChan <- err2
329329
// return result
330-
output := resultMap.Result[f.Signature.GetOutputs()[0].Name]
330+
output := request.ExecReport.Result[f.Signature.GetOutputs()[0].Name]
331331
resultChan <- output
332332
}(i, resultChan, errChan, start)
333333
}
@@ -380,11 +380,11 @@ func TestInvokeFC_ScatterFanOut(t *testing.T) {
380380
params[fDouble.Signature.GetInputs()[0].Name] = []int{1, 2, 3}
381381
request := workflow.NewRequest(shortuuid.New(), wflow, params)
382382
request.CanDoOffloading = false
383-
resultMap, err2 := wflow.Invoke(request)
383+
err2 := wflow.Invoke(request)
384384
u.AssertNil(t, err2)
385385

386386
// check multiple result
387-
output := resultMap.Result
387+
output := request.ExecReport.Result
388388
u.AssertNonNil(t, output)
389389
for i := 0; i < width; i++ {
390390
currOutput := output[fmt.Sprintf("%d", i)].(map[string]interface{})
@@ -442,11 +442,11 @@ func TestInvokeSieveChoice(t *testing.T) {
442442

443443
request := workflow.NewRequest(shortuuid.New(), wflow, params)
444444
request.CanDoOffloading = false
445-
resultMap, err2 := wflow.Invoke(request)
445+
err2 := wflow.Invoke(request)
446446
u.AssertNil(t, err2)
447447

448448
// checking the result
449-
output := resultMap.Result[sieveJs.Signature.GetOutputs()[1].Name]
449+
output := request.ExecReport.Result[sieveJs.Signature.GetOutputs()[1].Name]
450450
slice, err := u.ConvertToSlice(output)
451451
u.AssertNil(t, err)
452452

@@ -489,7 +489,7 @@ func TestInvokeWorkflowError(t *testing.T) {
489489

490490
request := workflow.NewRequest(shortuuid.New(), wflow, params)
491491
request.CanDoOffloading = false
492-
_, err2 := wflow.Invoke(request)
492+
err2 := wflow.Invoke(request)
493493
u.AssertNonNil(t, err2)
494494
}
495495

@@ -519,10 +519,10 @@ func TestInvokeWorkflowFailAndSucceed(t *testing.T) {
519519

520520
request := workflow.NewRequest(shortuuid.New(), wflow, params)
521521
request.CanDoOffloading = false
522-
resultMap, errInvoke1 := wflow.Invoke(request)
522+
errInvoke1 := wflow.Invoke(request)
523523
u.AssertNilMsg(t, errInvoke1, "error while invoking the branch (succeed)")
524524

525-
result, err := GetIntSingleResult(&resultMap)
525+
result, err := GetIntSingleResult(&request.ExecReport)
526526
u.AssertNilMsg(t, err, "Result not found")
527527
u.AssertEquals(t, 1, result)
528528

@@ -531,11 +531,11 @@ func TestInvokeWorkflowFailAndSucceed(t *testing.T) {
531531
params2["value"] = 2
532532

533533
request2 := workflow.NewRequest(shortuuid.New(), wflow, params2)
534-
request.CanDoOffloading = false
535-
resultMap2, errInvoke2 := wflow.Invoke(request2)
534+
request2.CanDoOffloading = false
535+
errInvoke2 := wflow.Invoke(request2)
536536
u.AssertNilMsg(t, errInvoke2, "error while invoking the branch (fail)")
537537

538-
valueError, found := resultMap2.Result["FakeError"]
538+
valueError, found := request2.ExecReport.Result["FakeError"]
539539
u.AssertTrueMsg(t, found, "FakeError not found")
540540
causeStr, ok := valueError.(string)
541541

@@ -568,10 +568,10 @@ func TestInvokeWorkflowPassDoNothing(t *testing.T) {
568568

569569
request := workflow.NewRequest(shortuuid.New(), wflow, params)
570570
request.CanDoOffloading = false
571-
resultMap, errInvoke1 := wflow.Invoke(request)
571+
errInvoke1 := wflow.Invoke(request)
572572
u.AssertNilMsg(t, errInvoke1, "error while invoking the composition with pass node")
573573

574-
result, err := GetIntSingleResult(&resultMap)
574+
result, err := GetIntSingleResult(&request.ExecReport)
575575
u.AssertNilMsg(t, err, "Result not found")
576576
u.AssertEquals(t, 3, result)
577577
}
@@ -612,11 +612,11 @@ func TestResumeWorkflow(t *testing.T) {
612612
resumedRequest.CanDoOffloading = true
613613
resumedRequest.Resuming = true
614614

615-
resultMap, err2 := wflow.Invoke(resumedRequest)
615+
err2 := wflow.Invoke(resumedRequest)
616616
u.AssertNil(t, err2)
617617

618618
// check result
619-
output := cast.ToInt(resultMap.Result[f.Signature.GetOutputs()[0].Name])
619+
output := cast.ToInt(resumedRequest.ExecReport.Result[f.Signature.GetOutputs()[0].Name])
620620
if length != output {
621621
t.FailNow()
622622
}

internal/workflow/workflow.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -425,36 +425,35 @@ func (workflow *Workflow) Save() error {
425425
}
426426

427427
// Invoke schedules each function of the workflow and invokes them
428-
func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
428+
func (workflow *Workflow) Invoke(r *Request) error {
429429

430430
var err error
431431
requestId := ReqId(r.Id)
432432

433433
var progress *Progress
434434
var pd *PartialData
435435

436-
// TODO: move into a function?
437436
if !r.Resuming {
438437
progress = InitProgress(requestId, workflow)
439438
pd = NewPartialData(requestId, workflow.Start.Id, r.Params)
440439
} else {
441440
progress, err = RetrieveProgress(requestId)
442441
if err != nil {
443-
return ExecutionReport{}, fmt.Errorf("failed to retrieve workflow progress: %v", err)
442+
return fmt.Errorf("failed to retrieve workflow progress: %v", err)
444443
}
445444
if len(progress.ReadyToExecute) == 0 {
446-
return ExecutionReport{}, fmt.Errorf("workflow resumed but no task is ready for execution: %v", requestId)
445+
return fmt.Errorf("workflow resumed but no task is ready for execution: %v", requestId)
447446
} else if len(progress.ReadyToExecute) > 1 {
448447
// TODO: manage case when len is > 1 (e.g., parallel branches)
449-
return ExecutionReport{}, fmt.Errorf("workflow resumed with multiple tasks ready for execution not yet implemented!: %v", requestId)
448+
return fmt.Errorf("workflow resumed with multiple tasks ready for execution not yet implemented!: %v", requestId)
450449
}
451450

452451
pds, err := RetrievePartialData(requestId, progress.ReadyToExecute[0])
453452
if err != nil {
454-
return ExecutionReport{}, fmt.Errorf("workflow resumed but unable to retrieve partial data of next task: %v", progress.ReadyToExecute[0])
453+
return fmt.Errorf("workflow resumed but unable to retrieve partial data of next task: %v", progress.ReadyToExecute[0])
455454
}
456455
if len(pds) != 1 {
457-
return ExecutionReport{}, fmt.Errorf("expected 1 partial data for next task: %v", progress.ReadyToExecute[0])
456+
return fmt.Errorf("expected 1 partial data for next task: %v", progress.ReadyToExecute[0])
458457
}
459458
pd = pds[0] // TODO: to be updated when refactoring parallel orchestration
460459
}
@@ -465,13 +464,13 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
465464
if err == nil && decision.Offload {
466465
err = offload(r, decision.RemoteHost, progress, pd)
467466
if err != nil {
468-
return ExecutionReport{}, err
467+
return err
469468
}
470469
shouldContinue = false
471470
} else {
472471
pd, progress, shouldContinue, err = workflow.Execute(r, pd, progress)
473472
if err != nil {
474-
return ExecutionReport{}, fmt.Errorf("failed workflow execution: %v", err)
473+
return fmt.Errorf("failed workflow execution: %v", err)
475474
}
476475

477476
if !shouldContinue {
@@ -483,8 +482,7 @@ func (workflow *Workflow) Invoke(r *Request) (ExecutionReport, error) {
483482

484483
// TODO: delete progress if needed
485484

486-
// TODO: remove r.ExecReport
487-
return r.ExecReport, nil
485+
return nil
488486
}
489487

490488
func offload(r *Request, hostPort string, progress *Progress, pd *PartialData) error {

0 commit comments

Comments
 (0)