Skip to content

Commit 8576169

Browse files
committed
Introduces ReadyToExecute in Progress to replace group
1 parent ea9c23b commit 8576169

10 files changed

Lines changed: 130 additions & 562 deletions

File tree

internal/test/progress_test.go

Lines changed: 6 additions & 241 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func simpleProgress(t *testing.T) (*workflow.Progress, *workflow.Workflow) {
1515
u.AssertNil(t, err)
1616
wflow, err := CreateSequenceWorkflow(py, py)
1717
u.AssertNil(t, err)
18-
return workflow.InitProgressRecursive("simple", wflow), wflow
18+
return workflow.InitProgress("simple", wflow), wflow
1919
}
2020

2121
func choiceProgress(t *testing.T, condition workflow.Condition) (*workflow.Progress, *workflow.Workflow) {
@@ -34,7 +34,7 @@ func choiceProgress(t *testing.T, condition workflow.Condition) (*workflow.Progr
3434
EndChoiceAndBuild()
3535
u.AssertNil(t, err)
3636

37-
return workflow.InitProgressRecursive("abc", wflow), wflow
37+
return workflow.InitProgress("abc", wflow), wflow
3838
}
3939

4040
func parallelProgress(t *testing.T) (*workflow.Progress, *workflow.Workflow) {
@@ -50,7 +50,7 @@ func parallelProgress(t *testing.T) (*workflow.Progress, *workflow.Workflow) {
5050
Build()
5151
u.AssertNil(t, err)
5252

53-
return workflow.InitProgressRecursive("abc", wflow), wflow
53+
return workflow.InitProgress("abc", wflow), wflow
5454
}
5555

5656
func complexProgress(t *testing.T, condition workflow.Condition) (*workflow.Progress, *workflow.Workflow) {
@@ -74,7 +74,7 @@ func complexProgress(t *testing.T, condition workflow.Condition) (*workflow.Prog
7474
EndChoiceAndBuild()
7575
u.AssertNil(t, err)
7676

77-
return workflow.InitProgressRecursive("abc", wflow), wflow
77+
return workflow.InitProgress("abc", wflow), wflow
7878
}
7979

8080
func TestProgressMarshaling(t *testing.T) {
@@ -127,10 +127,8 @@ func TestProgressCache(t *testing.T) {
127127
u.AssertTrueMsg(t, found, "progress not found")
128128
u.AssertTrueMsg(t, progress.Equals(retrievedProgress), "progresses don't match")
129129

130-
err = progress.CompleteNode(wflow.Start.Id)
131-
u.AssertNilMsg(t, err, "failed to update progress")
132-
err = progress.CompleteNode(wflow.Start.Next)
133-
u.AssertNilMsg(t, err, "failed to update progress")
130+
progress.CompleteNode(wflow.Start.Id)
131+
progress.CompleteNode(wflow.Start.Next)
134132

135133
err = workflow.SaveProgress(progress, cache.Persist)
136134
u.AssertNilMsg(t, err, "failed to save after update")
@@ -148,236 +146,3 @@ func TestProgressCache(t *testing.T) {
148146
u.AssertFalseMsg(t, found, "progress should have been deleted")
149147
}
150148
}
151-
152-
// TestProgressSequence tests a sequence workflow with 2 simple node
153-
func TestProgressSequence(t *testing.T) {
154-
progress, workflow := simpleProgress(t)
155-
156-
// Start node
157-
checkAndCompleteNext(t, progress, workflow)
158-
159-
// Simple Node 1
160-
checkAndCompleteNext(t, progress, workflow)
161-
162-
// Simple Node 2
163-
checkAndCompleteNext(t, progress, workflow)
164-
165-
// End node
166-
checkAndCompleteNext(t, progress, workflow)
167-
168-
// End of workflow
169-
finishProgress(t, progress)
170-
}
171-
172-
// TestProgressChoice1 tests the left branch
173-
func TestProgressChoice1(t *testing.T) {
174-
condition := workflow.NewPredicate().And(
175-
workflow.NewEqCondition(1, 3),
176-
workflow.NewGreaterCondition(1, 3),
177-
).Build()
178-
progress, wflow := choiceProgress(t, condition)
179-
180-
// Start node
181-
checkAndCompleteNext(t, progress, wflow)
182-
183-
// Choice node
184-
choice := checkAndCompleteNext(t, progress, wflow).(*workflow.ChoiceNode)
185-
186-
// Simple node (left) // suppose the left condition is true
187-
checkAndCompleteChoice(t, progress, choice, wflow, 0)
188-
189-
// End
190-
checkAndCompleteNext(t, progress, wflow)
191-
192-
// End of workflow
193-
finishProgress(t, progress)
194-
}
195-
196-
// TestProgressChoice2 tests the right branch
197-
func TestProgressChoice2(t *testing.T) {
198-
condition := workflow.NewPredicate().And(
199-
workflow.NewEqCondition(1, 1),
200-
workflow.NewGreaterCondition(5, 3),
201-
).Build()
202-
progress, wflow := choiceProgress(t, condition)
203-
204-
// Start node
205-
checkAndCompleteNext(t, progress, wflow)
206-
207-
// Choice node
208-
choice := checkAndCompleteNext(t, progress, wflow).(*workflow.ChoiceNode)
209-
210-
// Simple Node left is skipped, right is executed
211-
expectedMatch := 1
212-
checkAndCompleteChoice(t, progress, choice, wflow, expectedMatch)
213-
214-
// Simple Node right 2
215-
checkAndCompleteNext(t, progress, wflow)
216-
217-
// End node
218-
checkAndCompleteNext(t, progress, wflow)
219-
220-
// End of workflow
221-
finishProgress(t, progress)
222-
}
223-
224-
func TestParallelProgress(t *testing.T) {
225-
progress, wflow := parallelProgress(t)
226-
227-
// Start node
228-
checkAndCompleteNext(t, progress, wflow)
229-
230-
// FanOut node
231-
checkAndCompleteNext(t, progress, wflow)
232-
233-
// 3 Simple Nodes in parallel
234-
checkAndCompleteMultiple(t, progress, wflow)
235-
// simpleNode1 := fanOut.GetNext()[0]
236-
// simpleNode2 := fanOut.GetNext()[1]
237-
// simpleNode3 := fanOut.GetNext()[2]
238-
239-
// 2 Simple Nodes in parallel // here should get two nodes
240-
checkAndCompleteMultiple(t, progress, wflow)
241-
// nextNode = progress.NextNodes()
242-
// simpleNodeCentral2 := simpleNode2.GetNext()[0]
243-
// u.AssertEquals(t, nextNode[0], simpleNodeCentral2.GetId())
244-
// u.AssertEquals(t, 3, progress.GetGroup(nextNode[0]))
245-
// err = progress.CompleteNode(nextNode[0])
246-
// u.AssertNil(t, err)
247-
// simpleNodeCentral3 := simpleNode3.GetNext()[0]
248-
// u.AssertEquals(t, nextNode[1], simpleNodeCentral3.GetId())
249-
// u.AssertEquals(t, 3, progress.GetGroup(nextNode[1]))
250-
// err = progress.CompleteNode(nextNode[1])
251-
// u.AssertNil(t, err)
252-
253-
// 1 Simple node (parallel) right, bottom
254-
checkAndCompleteMultiple(t, progress, wflow)
255-
// nextNode = progress.NextNodes()
256-
// simpleNodeBottom3 := simpleNodeCentral3.GetNext()[0]
257-
// u.AssertEquals(t, nextNode[0], simpleNodeBottom3.GetId())
258-
// u.AssertEquals(t, 4, progress.GetGroup(nextNode[0]))
259-
// err = progress.CompleteNode(nextNode[0])
260-
// u.AssertNil(t, err)
261-
262-
// Fan in
263-
checkAndCompleteNext(t, progress, wflow)
264-
265-
// End node
266-
checkAndCompleteNext(t, progress, wflow)
267-
268-
// End of workflow
269-
finishProgress(t, progress)
270-
}
271-
272-
func TestComplexProgress(t *testing.T) {
273-
condition := workflow.NewPredicate().And(
274-
workflow.NewEqCondition(1, 3),
275-
workflow.NewGreaterCondition(1, 3),
276-
).Build()
277-
progress, wflow := complexProgress(t, condition)
278-
279-
// Start node
280-
checkAndCompleteNext(t, progress, wflow)
281-
282-
// SimpleNode
283-
checkAndCompleteNext(t, progress, wflow)
284-
285-
// Choice
286-
choice := checkAndCompleteNext(t, progress, wflow).(*workflow.ChoiceNode)
287-
288-
// Simple Node, FanOut
289-
checkAndCompleteChoice(t, progress, choice, wflow, 0)
290-
291-
// End node
292-
checkAndCompleteNext(t, progress, wflow)
293-
294-
// End of workflow
295-
finishProgress(t, progress)
296-
}
297-
298-
func TestComplexProgress2(t *testing.T) {
299-
condition := workflow.NewPredicate().And(
300-
workflow.NewEqCondition(1, 1),
301-
workflow.NewGreaterCondition(4, 3),
302-
).Build()
303-
progress, wflow := complexProgress(t, condition)
304-
305-
// Start node
306-
checkAndCompleteNext(t, progress, wflow)
307-
308-
// Simple Node
309-
checkAndCompleteNext(t, progress, wflow)
310-
311-
// Choice
312-
choice := checkAndCompleteNext(t, progress, wflow).(*workflow.ChoiceNode)
313-
314-
// Simple Node, FanOut // suppose the fanout node at the right and all its children are skipped
315-
checkAndCompleteChoice(t, progress, choice, wflow, 1)
316-
317-
// 3 Simple Nodes in parallel
318-
checkAndCompleteMultiple(t, progress, wflow)
319-
320-
// 3 other Simple Nodes
321-
checkAndCompleteMultiple(t, progress, wflow)
322-
323-
// Fan in
324-
checkAndCompleteNext(t, progress, wflow)
325-
326-
// End node
327-
checkAndCompleteNext(t, progress, wflow)
328-
329-
// End of workflow
330-
finishProgress(t, progress)
331-
}
332-
333-
func checkAndCompleteNext(t *testing.T, progress *workflow.Progress, workflow *workflow.Workflow) workflow.Task {
334-
nextNode, err := progress.NextNodes()
335-
u.AssertNil(t, err)
336-
nodeId := nextNode[0]
337-
node, ok := workflow.Find(nodeId)
338-
u.AssertTrue(t, ok)
339-
u.AssertEquals(t, nodeId, node.GetId())
340-
u.AssertEquals(t, progress.NextGroup, progress.GetGroup(nodeId))
341-
err = progress.CompleteNode(nodeId)
342-
u.AssertNil(t, err)
343-
return node
344-
}
345-
346-
func checkAndCompleteChoice(t *testing.T, progress *workflow.Progress, choice *workflow.ChoiceNode, workflow *workflow.Workflow, conditionToMatch int) {
347-
nextNode, err := progress.NextNodes() // Simple1, Simple2
348-
u.AssertNil(t, err)
349-
simpleNodeLeft := choice.Alternatives[0]
350-
fanOut := choice.Alternatives[1]
351-
u.AssertEquals(t, nextNode[0], simpleNodeLeft)
352-
u.AssertEquals(t, nextNode[1], fanOut)
353-
u.AssertEquals(t, progress.NextGroup, progress.GetGroup(nextNode[0]))
354-
u.AssertEquals(t, progress.NextGroup, progress.GetGroup(nextNode[1]))
355-
356-
err = progress.CompleteNode(nextNode[conditionToMatch])
357-
u.AssertNil(t, err)
358-
nodeToSkip := choice.GetNodesToSkip(workflow, conditionToMatch)
359-
err = progress.SkipAll(nodeToSkip)
360-
u.AssertNil(t, err)
361-
}
362-
363-
func checkAndCompleteMultiple(t *testing.T, progress *workflow.Progress, wflow *workflow.Workflow) []workflow.Task {
364-
nextNode, err := progress.NextNodes()
365-
completedNodes := make([]workflow.Task, 0)
366-
u.AssertNil(t, err)
367-
for _, nodeId := range nextNode {
368-
node, ok := wflow.Find(nodeId)
369-
u.AssertTrue(t, ok)
370-
u.AssertEquals(t, nodeId, node.GetId())
371-
u.AssertEquals(t, progress.NextGroup, progress.GetGroup(nodeId))
372-
err = progress.CompleteNode(nodeId)
373-
u.AssertNil(t, err)
374-
completedNodes = append(completedNodes, node)
375-
}
376-
return completedNodes
377-
}
378-
379-
func finishProgress(t *testing.T, progress *workflow.Progress) {
380-
nothing, err := progress.NextNodes()
381-
u.AssertNil(t, err)
382-
u.AssertEmptySlice(t, nothing)
383-
}

internal/workflow/choice_task.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,15 @@ func (workflow *Workflow) executeChoice(progress *Progress, input *PartialData,
9090

9191
// for task node, we skip all branch that will not be executed
9292
nodesToSkip := task.GetNodesToSkip(workflow, matchedCondition)
93-
errSkip := progress.SkipAll(nodesToSkip)
94-
if errSkip != nil {
95-
return outputData, progress, false, errSkip
93+
for _, node := range nodesToSkip {
94+
progress.SkipNode(node.GetId())
9695
}
9796

98-
err := progress.CompleteNode(task.GetId())
97+
progress.CompleteNode(task.GetId())
98+
err := progress.AddReadyTask(nextTaskId)
9999
if err != nil {
100-
return outputData, progress, false, err
100+
return nil, progress, false, err
101101
}
102-
103102
return outputData, progress, true, nil
104103
}
105104

internal/workflow/end_task.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,7 @@ func (e *EndNode) Equals(cmp types.Comparable) bool {
4242
}
4343

4444
func (workflow *Workflow) executeEnd(progress *Progress, partialData *PartialData, node *EndNode, r *Request) (*PartialData, *Progress, bool, error) {
45-
err := progress.CompleteNode(node.Id)
46-
if err != nil {
47-
return partialData, progress, false, err
48-
}
45+
progress.CompleteNode(node.Id)
4946
return partialData, progress, false, nil // false because we want to stop when reaching the end
5047
}
5148

internal/workflow/fail_task.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,7 @@ func (workflow *Workflow) executeFail(progress *Progress, task *FailNode, r *Req
4747
output[task.Error] = task.Cause
4848
outputData := NewPartialData(ReqId(r.Id), forNode, task.GetId(), output)
4949

50-
err := progress.CompleteNode(task.GetId())
51-
if err != nil {
52-
return outputData, progress, false, err
53-
}
50+
progress.CompleteNode(task.GetId())
5451

5552
shouldContinueExecution := task.GetNodeType() != Fail && task.GetNodeType() != Succeed
5653
return outputData, progress, shouldContinueExecution, nil

internal/workflow/fanin_task.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ func NewFanInNode(fanInDegree int) *FanInNode {
2727

2828
func (workflow *Workflow) executeFanIn(progress *Progress, input *PartialData, task *FanInNode, r *Request) (*PartialData, *Progress, bool, error) {
2929
outputData := NewPartialData(ReqId(r.Id), task.GetNext()[0], task.GetId(), input.Data)
30-
err := progress.CompleteNode(task.GetId())
30+
progress.CompleteNode(task.GetId())
31+
err := progress.AddReadyTask(task.GetNext()[0])
3132
if err != nil {
3233
return nil, progress, false, err
3334
}

internal/workflow/fanout_task.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ func (workflow *Workflow) executeFanOut(progress *Progress, input *PartialData,
6262
// input -> output: map["input":1] -> map["0":map["input":1], "1":map["input":1]]
6363
if task.Type == Broadcast {
6464
for _, nextNode := range task.GetNext() {
65-
// TODO: check if a copy of params[0] is needed
66-
output[string(nextNode)] = input.Data // simply returns input, that will be copied to each subsequent node
65+
output[string(nextNode)] = maps.Clone(input.Data) // simply returns input, that will be copied to each subsequent node
6766
}
6867
} else if task.Type == Scatter { // scatter only accepts an array with exactly fanOutDegree elements. However, multiple input values are allowed
6968
if len(input.Data) != 1 {
@@ -97,10 +96,14 @@ func (workflow *Workflow) executeFanOut(progress *Progress, input *PartialData,
9796
outputData := NewPartialData(ReqId(r.Id), "", task.GetId(), output)
9897
//newOutputDataMap := make(map[string]interface{}) // TODO: consider using a map of PartialData rather than a single PartialData object
9998

100-
err := progress.CompleteNode(task.GetId())
101-
if err != nil {
102-
return nil, progress, false, err
99+
progress.CompleteNode(task.GetId())
100+
for _, nextNode := range task.GetNext() {
101+
err := progress.AddReadyTask(nextNode)
102+
if err != nil {
103+
return nil, progress, false, err
104+
}
103105
}
106+
104107
return outputData, progress, true, nil
105108
}
106109

0 commit comments

Comments
 (0)