Skip to content

Commit 975d3ae

Browse files
committed
Deletes branch IDs and nodeset.go
1 parent b2312cc commit 975d3ae

14 files changed

Lines changed: 10 additions & 267 deletions

internal/test/workflow_test.go

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ func TestSimpleWorkflow(t *testing.T) {
9191
nextNodeId := prevNode.GetNext()[0]
9292
currentNode, _ = wflow.Find(nextNodeId)
9393
u.AssertEquals(t, prevNode.(*workflow.SimpleNode).OutputTo, currentNode.GetId())
94-
u.AssertEquals(t, prevNode.(*workflow.SimpleNode).BranchId, 0)
9594
u.AssertTrue(t, prevNode.(*workflow.SimpleNode).Func == f.Name)
9695
}
9796
prevNode = currentNode
@@ -135,15 +134,8 @@ func TestChoiceWorkflow(t *testing.T) {
135134
u.AssertTrue(t, foundS)
136135
u.AssertEquals(t, simple.(*workflow.SimpleNode).OutputTo, wflow.End.GetId())
137136
}
138-
u.AssertEqualsMsg(t, 0, n.GetBranchId(), "wrong branchId for choice node")
139137
case *workflow.SimpleNode:
140138
u.AssertTrue(t, n.(*workflow.SimpleNode).Func == f.Name)
141-
for i, alternative := range choice.Alternatives {
142-
// the branch of the simple nodes should be 1,2 or 3 because branch of choice is 0
143-
if alternative == n.GetId() {
144-
u.AssertEqualsMsg(t, i+1, n.GetBranchId(), "wrong branchId for simple node")
145-
}
146-
}
147139
}
148140
}
149141
}
@@ -185,7 +177,6 @@ func TestChoiceWorkflow_BuiltWithNextBranch(t *testing.T) {
185177
for _, n := range wflow.Nodes {
186178
switch node := n.(type) {
187179
case *workflow.ChoiceNode:
188-
u.AssertEquals(t, node.GetBranchId(), 0)
189180
u.AssertEquals(t, len(choice.Conditions), len(choice.Alternatives))
190181
for _, s := range choice.Alternatives {
191182
simple, foundS := wflow.Find(s)
@@ -196,13 +187,6 @@ func TestChoiceWorkflow_BuiltWithNextBranch(t *testing.T) {
196187
}
197188
case *workflow.SimpleNode:
198189
u.AssertTrue(t, node.Func == f.Name)
199-
for i, alternative := range choice.Alternatives {
200-
// the branch of the simple nodes should be 1,2 or 3 because branch of choice is 0
201-
if alternative == n.GetId() {
202-
u.AssertEqualsMsg(t, i+1, n.GetBranchId(), "wrong branchId for simple node")
203-
}
204-
}
205-
u.AssertTrue(t, node.GetBranchId() > 0)
206190
}
207191
}
208192
}
@@ -238,20 +222,16 @@ func TestBroadcastWorkflow(t *testing.T) {
238222
fanOut := n.(*workflow.FanOutNode)
239223
u.AssertEquals(t, len(fanOut.OutputTo), fanOut.FanOutDegree)
240224
u.AssertEquals(t, width, fanOut.FanOutDegree)
241-
for i, s := range fanOut.OutputTo {
242-
simple, found := wflow.Find(s)
225+
for _, s := range fanOut.OutputTo {
226+
_, found := wflow.Find(s)
243227
u.AssertTrue(t, found)
244-
u.AssertEquals(t, simple.GetBranchId(), i+1)
245228
}
246-
u.AssertEquals(t, n.GetBranchId(), 0)
247229
case *workflow.FanInNode:
248230
fanIn := n.(*workflow.FanInNode)
249231
u.AssertEquals(t, width, fanIn.FanInDegree)
250232
u.AssertEquals(t, wflow.End.GetId(), fanIn.OutputTo)
251-
u.AssertEquals(t, n.GetBranchId(), 4)
252233
case *workflow.SimpleNode:
253234
u.AssertTrue(t, n.(*workflow.SimpleNode).Func == f.Name)
254-
u.AssertTrue(t, n.GetBranchId() > 0 && n.GetBranchId() < 4)
255235
default:
256236
continue
257237
}
@@ -285,23 +265,19 @@ func TestScatterWorkflow(t *testing.T) {
285265
fanOut := node
286266
u.AssertEquals(t, len(fanOut.OutputTo), fanOut.FanOutDegree)
287267
u.AssertEquals(t, width, fanOut.FanOutDegree)
288-
for j, s := range fanOut.OutputTo {
289-
simple, foundSimple := wflow.Find(s)
268+
for _, s := range fanOut.OutputTo {
269+
_, foundSimple := wflow.Find(s)
290270
u.AssertTrue(t, foundSimple)
291-
u.AssertEquals(t, simple.GetBranchId(), j+1)
292271
}
293-
u.AssertEquals(t, node.GetBranchId(), 0)
294272
case *workflow.FanInNode:
295273
fanIn := node
296274
u.AssertEquals(t, width, fanIn.FanInDegree)
297275
u.AssertEquals(t, wflow.End.GetId(), fanIn.OutputTo)
298-
u.AssertEquals(t, fanIn.GetBranchId(), fanIn.FanInDegree+1)
299276
case *workflow.SimpleNode:
300277
u.AssertTrue(t, n.(*workflow.SimpleNode).Func == f.Name)
301278
outputTo, _ := wflow.Find(node.OutputTo)
302279
_, chainedToFanIn := outputTo.(*workflow.FanInNode)
303280
u.AssertTrue(t, chainedToFanIn)
304-
u.AssertTrue(t, n.GetBranchId() > 0 && n.GetBranchId() < 4)
305281
simpleNodeChainedToFanIn++
306282
default:
307283
continue
@@ -325,7 +301,6 @@ func TestCreateBroadcastMultiFunctionWorkflow(t *testing.T) {
325301
)
326302
u.AssertNil(t, errWorkflow)
327303
startNext, startNextFound := wflow.Find(wflow.Start.Next)
328-
fanOutDegree := startNext.(*workflow.FanOutNode).FanOutDegree
329304

330305
u.AssertNonNil(t, wflow.Start)
331306
u.AssertNonNil(t, wflow.End)
@@ -346,26 +321,17 @@ func TestCreateBroadcastMultiFunctionWorkflow(t *testing.T) {
346321
u.AssertEquals(t, len(fanOut.OutputTo), fanOut.FanOutDegree)
347322
// test that there are simple nodes chained to fan out
348323
for _, s := range fanOut.OutputTo {
349-
simple, foundSimple := wflow.Find(s)
324+
_, foundSimple := wflow.Find(s)
350325
u.AssertTrue(t, foundSimple)
351-
for i, branch := range fanOut.OutputTo {
352-
// the branch of the simple nodes should be 1,2 or 3 because branch of choice is 0
353-
if branch == simple.GetId() {
354-
u.AssertEqualsMsg(t, i+1, simple.GetBranchId(), "wrong branchId for simple node")
355-
}
356-
}
357326
}
358-
u.AssertEqualsMsg(t, 0, fanOut.GetBranchId(), "wrong branchId for fanOut")
359327
case *workflow.FanInNode:
360328
fanIn := node
361329
u.AssertEquals(t, wflow.Width, fanIn.FanInDegree)
362330
u.AssertEquals(t, wflow.End.GetId(), fanIn.OutputTo)
363-
u.AssertEquals(t, fanIn.GetBranchId(), fanIn.FanInDegree+1)
364331
default:
365332
continue
366333
case *workflow.SimpleNode:
367334
u.AssertTrue(t, node.Func == f.Name)
368-
u.AssertTrue(t, node.GetBranchId() > 0 && node.GetBranchId() < fanOutDegree+1)
369335
outputTo, _ := wflow.Find(node.OutputTo)
370336
if _, ok := outputTo.(*workflow.FanInNode); ok {
371337
simpleNodeChainedToFanIn++
@@ -417,7 +383,6 @@ func TestWorkflowBuilder(t *testing.T) {
417383
fanOut := node
418384
u.AssertEquals(t, len(fanOut.OutputTo), fanOut.FanOutDegree)
419385
u.AssertEquals(t, width, fanOut.FanOutDegree)
420-
u.AssertEqualsMsg(t, 2, fanOut.GetBranchId(), "fan out has wrong branchId")
421386
for _, s := range fanOut.OutputTo {
422387
_, found := wflow.Find(s)
423388
u.AssertTrue(t, found)
@@ -426,19 +391,11 @@ func TestWorkflowBuilder(t *testing.T) {
426391
fanIn := node
427392
u.AssertEquals(t, width, fanIn.FanInDegree)
428393
u.AssertEquals(t, wflow.End.GetId(), fanIn.OutputTo)
429-
u.AssertEqualsMsg(t, 6, fanIn.GetBranchId(), "wrong branchId for fan in")
430394
case *workflow.SimpleNode:
431395
u.AssertTrue(t, node.Func == f.Name)
432396
nextNode, _ := wflow.Find(node.GetNext()[0])
433397
if _, ok := nextNode.(*workflow.FanInNode); ok {
434398
simpleNodeChainedToFanIn++
435-
u.AssertTrueMsg(t, node.GetBranchId() > 2 && node.GetBranchId() < 6, "wrong branch for simple node connected to fanIn input") // the parallel branches of fan out node
436-
} else if _, ok2 := nextNode.(*workflow.ChoiceNode); ok2 {
437-
u.AssertEqualsMsg(t, 0, node.GetBranchId(), "wrong branch for simpleNode connected to choice node input") // the first simple node
438-
} else if _, ok3 := nextNode.(*workflow.EndNode); ok3 {
439-
u.AssertEqualsMsg(t, 1, node.GetBranchId(), "wrong branch for simpleNode connected to choice alternative 1") // the first branch of choice node
440-
} else {
441-
u.AssertTrueMsg(t, node.GetBranchId() > 2 && node.GetBranchId() < 6, "wrong branch for simple node inside parallel section") // the parallel branches of fan out node
442399
}
443400
case *workflow.ChoiceNode:
444401
choice := node
@@ -453,7 +410,6 @@ func TestWorkflowBuilder(t *testing.T) {
453410
u.AssertTrue(t, foundAlt0)
454411
u.AssertTrue(t, foundAlt1)
455412
u.AssertEquals(t, firstAlternative.OutputTo, wflow.End.GetId())
456-
u.AssertEquals(t, choice.GetBranchId(), 0)
457413
// checking fan out - simples - fan in
458414
for i := range secondAlternative.OutputTo {
459415
secondAltOutput, _ := wflow.Find(secondAlternative.OutputTo[i])

internal/workflow/builder.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ func (b *Builder) AddSimpleNode(f *function.Function) *Builder {
6363
}
6464

6565
simpleNode := NewSimpleNode(f.Name)
66-
simpleNode.setBranchId(b.BranchNumber)
6766

6867
b.workflow.addNode(simpleNode)
6968
err := b.workflow.chain(b.prevNode, simpleNode)
@@ -87,7 +86,6 @@ func (b *Builder) AddSimpleNodeWithId(f *function.Function, id string) *Builder
8786

8887
simpleNode := NewSimpleNode(f.Name)
8988
simpleNode.Id = TaskId(id)
90-
simpleNode.setBranchId(b.BranchNumber)
9189

9290
b.workflow.addNode(simpleNode)
9391
err := b.workflow.chain(b.prevNode, simpleNode)
@@ -111,7 +109,6 @@ func (b *Builder) AddChoiceNode(conditions ...Condition) *ChoiceBranchBuilder {
111109

112110
// fmt.Println("Added choice node to Workflow")
113111
choiceNode := NewChoiceNode(conditions)
114-
choiceNode.setBranchId(b.BranchNumber)
115112
b.branches = len(conditions)
116113
b.workflow.addNode(choiceNode)
117114
err := b.workflow.chain(b.prevNode, choiceNode)
@@ -141,7 +138,6 @@ func (b *Builder) AddScatterFanOutNode(fanOutDegree int) *ParallelScatterBranchB
141138
return &ParallelScatterBranchBuilder{builder: b, terminalNodes: make([]Task, 0)}
142139
}
143140
fanOut := NewFanOutNode(fanOutDegree, Scatter)
144-
fanOut.setBranchId(b.BranchNumber)
145141
b.workflow.addNode(fanOut)
146142
err := b.workflow.chain(b.prevNode, fanOut)
147143
if err != nil {
@@ -163,7 +159,6 @@ func (b *Builder) AddBroadcastFanOutNode(fanOutDegree int) *ParallelBroadcastBra
163159
return &ParallelBroadcastBranchBuilder{builder: b, completed: 0, terminalNodes: make([]Task, 0)}
164160
}
165161
fanOut := NewFanOutNode(fanOutDegree, Broadcast)
166-
fanOut.setBranchId(b.BranchNumber)
167162
b.workflow.addNode(fanOut)
168163
err := b.workflow.chain(b.prevNode, fanOut)
169164
if err != nil {
@@ -194,7 +189,6 @@ func (c *ChoiceBranchBuilder) NextBranch(toMerge *Workflow, err1 error) *ChoiceB
194189
//fmt.Println("Added simple node to a branch in choice node of Workflow")
195190
if c.HasNextBranch() {
196191
c.builder.BranchNumber++
197-
baseBranchNumber := c.builder.BranchNumber
198192
// getting start.Next from the toMerge
199193
startNext, _ := toMerge.Find(toMerge.Start.Next)
200194
// chains the alternative to the input workflow, which is already connected to a whole series of nodes
@@ -213,15 +207,12 @@ func (c *ChoiceBranchBuilder) NextBranch(toMerge *Workflow, err1 error) *ChoiceB
213207
continue
214208
case *FanOutNode:
215209
c.builder.workflow.addNode(n)
216-
n.setBranchId(n.GetBranchId() + baseBranchNumber)
217210
continue
218211
case *ChoiceNode:
219212
c.builder.workflow.addNode(n)
220-
n.setBranchId(n.GetBranchId() + baseBranchNumber)
221213
continue
222214
default:
223215
c.builder.workflow.addNode(n)
224-
n.setBranchId(n.GetBranchId() + baseBranchNumber)
225216
nextNode, _ := toMerge.Find(n.GetNext()[0])
226217
// chain the last node(s) of the input workflow to the end node of the building workflow
227218

@@ -333,7 +324,6 @@ func (c *ChoiceBranchBuilder) ForEachBranch(dagger func() (*Workflow, error)) *C
333324
c.builder.appendError(errFanout)
334325
continue
335326
default:
336-
n.setBranchId(c.builder.BranchNumber)
337327
c.builder.workflow.addNode(n)
338328
// chain the last node(s) of the input workflow to the end node of the building workflow
339329
if n.GetNext() != nil && len(n.GetNext()) > 0 && n.GetNext()[0] == workflowCopy.End.GetId() {
@@ -384,7 +374,6 @@ func (p *ParallelBroadcastBranchBuilder) ForEachParallelBranch(dagger func() (*W
384374
continue
385375
default:
386376
p.builder.workflow.addNode(n)
387-
n.setBranchId(p.builder.BranchNumber)
388377
if n.GetNext() != nil && len(n.GetNext()) > 0 && n.GetNext()[0] == workflowCopy.End.GetId() {
389378
p.terminalNodes = append(p.terminalNodes, n) // we do not chain to end node, only add to terminal nodes, so that we can chain to a fan in later
390379
}
@@ -429,7 +418,6 @@ func (p *ParallelScatterBranchBuilder) ForEachParallelBranch(dagger func() (*Wor
429418
continue
430419
default:
431420
p.builder.workflow.addNode(n)
432-
n.setBranchId(p.builder.BranchNumber)
433421
if n.GetNext() != nil && len(n.GetNext()) > 0 && n.GetNext()[0] == workflowCopy.End.GetId() {
434422
p.terminalNodes = append(p.terminalNodes, n) // we do not chain to end node, only add to terminal nodes, so that we can chain to a fan in later
435423
}
@@ -447,7 +435,6 @@ func (p *ParallelScatterBranchBuilder) AddFanInNode() *Builder {
447435
workflow := &p.builder.workflow
448436
fanInNode := NewFanInNode(p.builder.prevNode.Width())
449437
p.builder.BranchNumber++
450-
fanInNode.setBranchId(p.builder.BranchNumber)
451438
// TODO: set fanin inside fanout, so that we know which fanin are dealing with
452439
for _, n := range p.terminalNodes {
453440
// terminal nodes
@@ -474,7 +461,6 @@ func (p *ParallelBroadcastBranchBuilder) AddFanInNode() *Builder {
474461
workflow := &p.builder.workflow
475462
fanInNode := NewFanInNode(p.builder.prevNode.Width())
476463
p.builder.BranchNumber++
477-
fanInNode.setBranchId(p.builder.BranchNumber)
478464
for _, n := range p.terminalNodes {
479465
// terminal nodes
480466
errAdd := n.AddOutput(workflow, fanInNode.GetId())
@@ -528,7 +514,6 @@ func (p *ParallelBroadcastBranchBuilder) NextFanOutBranch(toMerge *Workflow, err
528514
continue
529515
default:
530516
p.builder.workflow.addNode(n)
531-
n.setBranchId(p.builder.BranchNumber)
532517
if n.GetNext() != nil && len(n.GetNext()) > 0 && n.GetNext()[0] == toMerge.End.GetId() {
533518
p.terminalNodes = append(p.terminalNodes, n)
534519
}
@@ -556,7 +541,6 @@ func (b *Builder) AddFailNodeAndBuild(errorName, errorMessage string) (*Workflow
556541
}
557542

558543
failNode := NewFailNode(errorName, errorMessage)
559-
failNode.setBranchId(b.BranchNumber)
560544

561545
b.workflow.addNode(failNode)
562546
err := b.workflow.chain(b.prevNode, failNode)
@@ -574,7 +558,6 @@ func (b *Builder) AddSucceedNodeAndBuild(message string) (*Workflow, error) {
574558
}
575559

576560
succeedNode := NewSucceedNode(message)
577-
succeedNode.setBranchId(b.BranchNumber)
578561

579562
b.workflow.addNode(succeedNode)
580563
err := b.workflow.chain(b.prevNode, succeedNode)
@@ -593,7 +576,6 @@ func (b *Builder) AddPassNode(result string) *Builder {
593576
}
594577

595578
passNode := NewPassNode(result)
596-
passNode.setBranchId(b.BranchNumber)
597579

598580
b.workflow.addNode(passNode)
599581
err := b.workflow.chain(b.prevNode, passNode)

internal/workflow/choice_task.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
type ChoiceNode struct {
1616
Id TaskId
1717
NodeType TaskType
18-
BranchId int
1918
Alternatives []TaskId
2019
Conditions []Condition
2120
}
@@ -180,14 +179,6 @@ func (c *ChoiceNode) Name() string {
180179
}
181180
}
182181

183-
func (c *ChoiceNode) setBranchId(number int) {
184-
c.BranchId = number
185-
}
186-
187-
func (c *ChoiceNode) GetBranchId() int {
188-
return c.BranchId
189-
}
190-
191182
func (c *ChoiceNode) String() string {
192183
conditions := "<"
193184
for i, condFn := range c.Conditions {

internal/workflow/fail_task.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ type FailNode struct {
1313
Cause string
1414

1515
OutputTo TaskId
16-
BranchId int
1716
}
1817

1918
func NewFailNode(error, cause string) *FailNode {
@@ -38,8 +37,7 @@ func (f *FailNode) Equals(cmp types.Comparable) bool {
3837
f.NodeType == f2.NodeType &&
3938
f.Error == f2.Error &&
4039
f.Cause == f2.Cause &&
41-
f.OutputTo == f2.OutputTo &&
42-
f.BranchId == f2.BranchId
40+
f.OutputTo == f2.OutputTo
4341
}
4442

4543
func (workflow *Workflow) executeFail(progress *Progress, task *FailNode, r *Request) (*PartialData, *Progress, bool, error) {
@@ -87,14 +85,6 @@ func (f *FailNode) GetId() TaskId {
8785
return f.Id
8886
}
8987

90-
func (f *FailNode) setBranchId(number int) {
91-
f.BranchId = number
92-
}
93-
94-
func (f *FailNode) GetBranchId() int {
95-
return f.BranchId
96-
}
97-
9888
func (f *FailNode) GetNodeType() TaskType {
9989
return f.NodeType
10090
}

internal/workflow/fanin_task.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
type FanInNode struct {
1111
Id TaskId
1212
NodeType TaskType
13-
BranchId int
1413
OutputTo TaskId
1514
FanInDegree int
1615
}
@@ -67,13 +66,6 @@ func (f *FanInNode) String() string {
6766
return fmt.Sprintf("[FanInNode(%d)]", f.FanInDegree)
6867
}
6968

70-
func (f *FanInNode) setBranchId(number int) {
71-
f.BranchId = number
72-
}
73-
func (f *FanInNode) GetBranchId() int {
74-
return f.BranchId
75-
}
76-
7769
func (f *FanInNode) GetId() TaskId {
7870
return f.Id
7971
}

0 commit comments

Comments
 (0)