Skip to content

Commit 55bbd04

Browse files
committed
Removes FanOut/FanIn; GetNext returns single Id
1 parent 09daa6d commit 55bbd04

19 files changed

Lines changed: 154 additions & 1005 deletions

internal/test/api_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,6 @@ func TestAsyncInvokeWorkflow(t *testing.T) {
210210
t.Skip("Skipping integration test")
211211
}
212212
fcName := "sequence"
213-
//deleteWorkflowApiTest(t, fcName, HOST, PORT)
214213

215214
fn, err := InitializePyFunction("inc", "handler", function.NewSignature().
216215
AddInput("input", function.Int{}).
@@ -241,7 +240,6 @@ func TestAsyncInvokeWorkflow(t *testing.T) {
241240
for {
242241
pollResult := pollWorkflowTest(t, reqIdStruct.ReqId, HOST, PORT)
243242

244-
fmt.Println(pollResult)
245243
var response workflow.InvocationResponse
246244
errUnmarshalExecResult := json.Unmarshal([]byte(pollResult), &response)
247245

internal/test/progress_test.go

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -36,46 +36,6 @@ func choiceProgress(t *testing.T, condition workflow.Condition) (*workflow.Progr
3636
return workflow.InitProgress("abc", wflow), wflow
3737
}
3838

39-
func parallelProgress(t *testing.T) (*workflow.Progress, *workflow.Workflow) {
40-
py, err := initializeExamplePyFunction()
41-
u.AssertNil(t, err)
42-
43-
wflow, err := workflow.NewBuilder().
44-
AddBroadcastFanOutNode(3).
45-
NextFanOutBranch(CreateSequenceWorkflow(py)).
46-
NextFanOutBranch(CreateSequenceWorkflow(py, py)).
47-
NextFanOutBranch(CreateSequenceWorkflow(py, py, py)).
48-
AddFanInNode().
49-
Build()
50-
u.AssertNil(t, err)
51-
52-
return workflow.InitProgress("abc", wflow), wflow
53-
}
54-
55-
func complexProgress(t *testing.T, condition workflow.Condition) (*workflow.Progress, *workflow.Workflow) {
56-
py, err := initializeExamplePyFunction()
57-
u.AssertNil(t, err)
58-
59-
notCondition := workflow.NewPredicate().Not(condition).Build()
60-
61-
wflow, err := workflow.NewBuilder().
62-
AddSimpleNode(py).
63-
AddChoiceNode(
64-
notCondition,
65-
condition,
66-
).
67-
NextBranch(CreateSequenceWorkflow(py)).
68-
NextBranch(workflow.NewBuilder().
69-
AddBroadcastFanOutNode(3).
70-
ForEachParallelBranch(func() (*workflow.Workflow, error) { return CreateSequenceWorkflow(py, py) }).
71-
AddFanInNode().
72-
Build()).
73-
EndChoiceAndBuild()
74-
u.AssertNil(t, err)
75-
76-
return workflow.InitProgress("abc", wflow), wflow
77-
}
78-
7939
func TestProgressMarshaling(t *testing.T) {
8040
condition := workflow.NewPredicate().And(
8141
workflow.NewEqCondition(1, 3),
@@ -84,9 +44,7 @@ func TestProgressMarshaling(t *testing.T) {
8444

8545
progress1, _ := simpleProgress(t)
8646
progress2, _ := choiceProgress(t, condition)
87-
progress3, _ := parallelProgress(t)
88-
progress4, _ := complexProgress(t, condition)
89-
progresses := []*workflow.Progress{progress1, progress2, progress3, progress4}
47+
progresses := []*workflow.Progress{progress1, progress2}
9048

9149
for i, progress := range progresses {
9250
marshal, errMarshal := json.Marshal(progress)
@@ -111,10 +69,8 @@ func TestProgressCache(t *testing.T) {
11169

11270
progress1, workflow1 := simpleProgress(t)
11371
progress2, workflow2 := choiceProgress(t, condition)
114-
progress3, workflow3 := parallelProgress(t)
115-
progress4, workflow4 := complexProgress(t, condition)
116-
progresses := []*workflow.Progress{progress1, progress2, progress3, progress4}
117-
workflows := []*workflow.Workflow{workflow1, workflow2, workflow3, workflow4}
72+
progresses := []*workflow.Progress{progress1, progress2}
73+
workflows := []*workflow.Workflow{workflow1, workflow2}
11874

11975
for i := 0; i < len(workflows); i++ {
12076
progress := progresses[i]
@@ -127,7 +83,7 @@ func TestProgressCache(t *testing.T) {
12783
u.AssertTrueMsg(t, progress.Equals(retrievedProgress), "progresses don't match")
12884

12985
progress.Complete(wflow.Start.Id)
130-
progress.Complete(wflow.Start.GetNext()[0])
86+
progress.Complete(wflow.Start.GetNext())
13187

13288
err = workflow.SaveProgress(progress)
13389
u.AssertNilMsg(t, err, "failed to save after update")

internal/test/util.go

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -324,48 +324,6 @@ func CreateChoiceWorkflow(dagger func() (*workflow.Workflow, error), condArr ...
324324
EndChoiceAndBuild()
325325
}
326326

327-
// CreateScatterSingleFunctionWorkflow if successful, returns a workflow with one fan out, N simple node with the same function
328-
// and then a fan in node that merges all the result in an array.
329-
func CreateScatterSingleFunctionWorkflow(fun *function.Function, fanOutDegree int) (*workflow.Workflow, error) {
330-
return workflow.NewBuilder().
331-
AddScatterFanOutNode(fanOutDegree).
332-
ForEachParallelBranch(func() (*workflow.Workflow, error) { return CreateSequenceWorkflow(fun) }).
333-
AddFanInNode().
334-
Build()
335-
}
336-
337-
// CreateBroadcastWorkflow if successful, returns a workflow with one fan out node, N simple nodes with different functions and a fan in node
338-
// The number of branches is defined by the number of given functions
339-
func CreateBroadcastWorkflow(dagger func() (*workflow.Workflow, error), fanOutDegree int) (*workflow.Workflow, error) {
340-
return workflow.NewBuilder().
341-
AddBroadcastFanOutNode(fanOutDegree).
342-
ForEachParallelBranch(dagger).
343-
AddFanInNode().
344-
Build()
345-
}
346-
347-
// CreateBroadcastMultiFunctionWorkflow if successful, returns a workflow with one fan out node, each branch chained with a different workflow that run in parallel, and a fan in node.
348-
// The number of branch is defined as the number of dagger functions.
349-
func CreateBroadcastMultiFunctionWorkflow(dagger ...func() (*workflow.Workflow, error)) (*workflow.Workflow, error) {
350-
builder := workflow.NewBuilder().
351-
AddBroadcastFanOutNode(len(dagger))
352-
for _, dagFn := range dagger {
353-
builder = builder.NextFanOutBranch(dagFn())
354-
}
355-
return builder.
356-
AddFanInNode().
357-
Build()
358-
}
359-
360-
func GetSingleResult(cer *workflow.ExecutionReport) (string, error) {
361-
if len(cer.Result) == 1 {
362-
for _, value := range cer.Result {
363-
return fmt.Sprintf("%v", value), nil
364-
}
365-
}
366-
return "", fmt.Errorf("there is not exactly one result: there are %d result(s)", len(cer.Result))
367-
}
368-
369327
func GetIntSingleResult(cer *workflow.ExecutionReport) (int, error) {
370328
if len(cer.Result) == 1 {
371329
for _, value := range cer.Result {

internal/test/workflow_integration_test.go

Lines changed: 0 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -234,50 +234,6 @@ func TestInvokeFC_DifferentFunctions(t *testing.T) {
234234
u.AssertNil(t, err3)
235235
}
236236

237-
// TestInvokeFC_BroadcastFanOut executes a Parallel Workflow with N parallel branches
238-
func TestInvokeFC_BroadcastFanOut(t *testing.T) {
239-
if testing.Short() {
240-
t.Skip("Skipping integration test")
241-
}
242-
243-
workflowName := "testBrFO"
244-
// CREATE - we create a test function composition
245-
fDouble, errF1 := InitializePyFunction("double", "handler", function.NewSignature().
246-
AddInput("input", function.Int{}).
247-
AddOutput("result", function.Int{}).
248-
Build())
249-
u.AssertNil(t, errF1)
250-
251-
width := 3
252-
wflow, err := CreateBroadcastWorkflow(func() (*workflow.Workflow, error) { return CreateSequenceWorkflow(fDouble) }, width)
253-
wflow.Name = workflowName
254-
u.AssertNil(t, err)
255-
256-
err1 := wflow.Save()
257-
u.AssertNil(t, err1)
258-
259-
// INVOKE - we call the function composition
260-
params := make(map[string]interface{})
261-
params[fDouble.Signature.GetInputs()[0].Name] = 1
262-
request := workflow.NewRequest(shortuuid.New(), wflow, params)
263-
request.CanDoOffloading = false
264-
err2 := wflow.Invoke(request)
265-
u.AssertNil(t, err2)
266-
267-
// check multiple result
268-
output := request.ExecReport.Result
269-
270-
u.AssertNonNil(t, output)
271-
for i := 0; i < width; i++ {
272-
currOutput := output[fmt.Sprintf("%d", i)].(map[string]interface{})
273-
u.AssertEquals(t, 2, currOutput["result"])
274-
}
275-
276-
// cleaning up function composition and functions
277-
//err3 := workflow.Delete()
278-
//u.AssertNil(t, err3)
279-
}
280-
281237
// TestInvokeFC_Concurrent executes concurrently m times a Sequential Workflow of length N, where each node executes a simple increment function.
282238
func TestInvokeFC_Concurrent(t *testing.T) {
283239

@@ -352,50 +308,6 @@ func TestInvokeFC_Concurrent(t *testing.T) {
352308
u.AssertNil(t, err3)
353309
}
354310

355-
// TestInvokeFC_ScatterFanOut executes a Parallel Workflow with N parallel branches
356-
func TestInvokeFC_ScatterFanOut(t *testing.T) {
357-
if testing.Short() {
358-
t.Skip("Skipping integration test")
359-
}
360-
//for i := 0; i < 1; i++ {
361-
362-
workflowName := "test"
363-
// CREATE - we create a test function composition
364-
fDouble, errF1 := InitializePyFunction("double", "handler", function.NewSignature().
365-
AddInput("input", function.Int{}).
366-
AddOutput("result", function.Int{}).
367-
Build())
368-
u.AssertNil(t, errF1)
369-
370-
width := 3
371-
wflow, err := CreateScatterSingleFunctionWorkflow(fDouble, width)
372-
wflow.Name = workflowName
373-
u.AssertNil(t, err)
374-
375-
err1 := wflow.Save()
376-
u.AssertNil(t, err1)
377-
378-
// INVOKE - we call the function composition
379-
params := make(map[string]interface{})
380-
params[fDouble.Signature.GetInputs()[0].Name] = []int{1, 2, 3}
381-
request := workflow.NewRequest(shortuuid.New(), wflow, params)
382-
request.CanDoOffloading = false
383-
err2 := wflow.Invoke(request)
384-
u.AssertNil(t, err2)
385-
386-
// check multiple result
387-
output := request.ExecReport.Result
388-
u.AssertNonNil(t, output)
389-
for i := 0; i < width; i++ {
390-
currOutput := output[fmt.Sprintf("%d", i)].(map[string]interface{})
391-
u.AssertEquals(t, (i+1)*2, cast.ToInt(currOutput["result"]))
392-
}
393-
394-
// cleaning up function composition and functions
395-
err3 := wflow.Delete()
396-
u.AssertNil(t, err3)
397-
}
398-
399311
func TestInvokeSieveChoice(t *testing.T) {
400312
if testing.Short() {
401313
t.Skip("Skipping integration test")

0 commit comments

Comments
 (0)