Skip to content

Commit 09daa6d

Browse files
committed
Minor fix
1 parent b1684de commit 09daa6d

3 files changed

Lines changed: 14 additions & 10 deletions

File tree

internal/api/workflow.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ func ResumeWorkflow(e echo.Context) error {
171171

172172
if clientReq.Plan.ToExecute != nil {
173173
req.Plan = &workflow.ExecutionPlan{ToExecute: clientReq.Plan.ToExecute}
174+
} else {
175+
req.Plan = nil
174176
}
175177

176178
log.Printf("Resuming workflow '%s'", workflowName)
@@ -201,6 +203,7 @@ func InvokeWorkflow(e echo.Context) error {
201203
req.QoS = clientReq.QoS
202204
req.CanDoOffloading = clientReq.CanDoOffloading
203205
req.Async = clientReq.Async
206+
req.Plan = nil
204207
req.Resuming = false
205208
req.Id = fmt.Sprintf("%v-%s%d", wflow.Name, node.NodeIdentifier[len(node.NodeIdentifier)-5:], req.Arrival.Nanosecond())
206209
req.ExecReport.Reports = map[string]*function.ExecutionReport{}
@@ -222,6 +225,7 @@ func handleWorkflowInvocation(e echo.Context, req *workflow.Request) error {
222225
return
223226
}
224227

228+
log.Printf("Invocation succeeded. Publishing: %v", req.ExecReport)
225229
req.ExecReport.ResponseTime = time.Now().Sub(req.Arrival).Seconds()
226230
workflow.PublishAsyncInvocationResponse(req.Id, workflow.InvocationResponse{
227231
Success: true,

internal/test/api_test.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7+
"github.com/spf13/cast"
78
"strings"
89
"testing"
910
"time"
@@ -209,6 +210,8 @@ func TestAsyncInvokeWorkflow(t *testing.T) {
209210
t.Skip("Skipping integration test")
210211
}
211212
fcName := "sequence"
213+
//deleteWorkflowApiTest(t, fcName, HOST, PORT)
214+
212215
fn, err := InitializePyFunction("inc", "handler", function.NewSignature().
213216
AddInput("input", function.Int{}).
214217
AddOutput("result", function.Int{}).
@@ -238,8 +241,9 @@ func TestAsyncInvokeWorkflow(t *testing.T) {
238241
for {
239242
pollResult := pollWorkflowTest(t, reqIdStruct.ReqId, HOST, PORT)
240243

241-
var compExecReport workflow.ExecutionReport
242-
errUnmarshalExecResult := json.Unmarshal([]byte(pollResult), &compExecReport)
244+
fmt.Println(pollResult)
245+
var response workflow.InvocationResponse
246+
errUnmarshalExecResult := json.Unmarshal([]byte(pollResult), &response)
243247

244248
if errUnmarshalExecResult != nil {
245249
var unmarshalError *json.UnmarshalTypeError
@@ -249,17 +253,11 @@ func TestAsyncInvokeWorkflow(t *testing.T) {
249253
i++
250254
time.Sleep(200 * time.Millisecond)
251255
} else {
252-
result, err := GetSingleResult(&compExecReport)
253-
utils.AssertNilMsg(t, err, "failed to get single result")
254-
utils.AssertEquals(t, "4", result)
256+
utils.AssertEquals(t, 4, cast.ToInt(response.Result["result"]))
255257
break
256258
}
257259
}
258260

259-
// here we do not use REST API
260-
getFC, b := workflow.Get(fcName)
261-
utils.AssertTrue(t, b)
262-
utils.AssertTrueMsg(t, wflow.Equals(getFC), "composition comparison failed")
263261
err = wflow.Delete()
264262
utils.AssertNilMsg(t, err, "failed to delete composition")
265263
}

internal/workflow/workflow.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type WorkflowInvocationResumeRequest struct {
2929
Plan ExecutionPlan
3030
}
3131

32-
var offloadingPolicy OffloadingPolicy = &SimpleOffloadingPolicy{}
32+
var offloadingPolicy OffloadingPolicy = &NoOffloadingPolicy{}
3333

3434
//&NoOffloadingPolicy{} // TODO: handle initialization elsewhere
3535

@@ -466,6 +466,8 @@ func (workflow *Workflow) Invoke(r *Request) error {
466466
return fmt.Errorf("Could not save partial data: %v", err)
467467
}
468468

469+
log.Printf("Offloading request: %v", requestId)
470+
469471
shouldContinue, err = offload(r, &decision)
470472
if err != nil {
471473
return err

0 commit comments

Comments
 (0)