Skip to content

Commit 4e19087

Browse files
authored
feat: implement workflow validation to reject duplicate edges and remove redundant edge filtering logic (#808)
1 parent 01c5d96 commit 4e19087

4 files changed

Lines changed: 85 additions & 19 deletions

File tree

workflow/validation.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ var ErrNoStartNode = errors.New("no start node found")
2929
// ErrNodePointsToStart is returned when a node points to the start node.
3030
var ErrNodePointsToStart = errors.New("node points to start node")
3131

32+
// ErrDuplicateEdge is returned when an edge set contains two identical edges.
33+
// Two edges with the same (From, To) are rejected regardless of Route; use
34+
// MultiRoute to express alternatives to the same target.
35+
var ErrDuplicateEdge = errors.New("duplicate edge")
36+
3237
// ErrMultipleDefaultRoutes is returned when a node has more than one default route.
3338
var ErrMultipleDefaultRoutes = errors.New("node has more than one default route")
3439

@@ -97,7 +102,10 @@ func validateStartNodeNoIncoming(edges []Edge) error {
97102
}
98103

99104
// validateWorkflow executes a set of workflow validation checks.
100-
func validateWorkflow(workflow *Workflow) error {
105+
func validateWorkflow(workflow *graph) error {
106+
if err := validateUniqueEdges(workflow); err != nil {
107+
return err
108+
}
101109
if err := validateDefaultRoute(workflow); err != nil {
102110
return err
103111
}
@@ -107,9 +115,25 @@ func validateWorkflow(workflow *Workflow) error {
107115
return nil
108116
}
109117

118+
// validateUniqueEdges checks that there are no duplicate edges in the workflow.
119+
// Two edges with the same (From, To) are rejected regardless of Route; use
120+
// MultiRoute to express alternatives to the same target.
121+
func validateUniqueEdges(workflow *graph) error {
122+
for node, edges := range workflow.successors {
123+
uniqueEdges := make(map[Node]struct{})
124+
for _, edge := range edges {
125+
if _, ok := uniqueEdges[edge.To]; ok {
126+
return fmt.Errorf("%w: from %q to %q", ErrDuplicateEdge, node.Name(), edge.To.Name())
127+
}
128+
uniqueEdges[edge.To] = struct{}{}
129+
}
130+
}
131+
return nil
132+
}
133+
110134
// validateDefaultRoute checks that there are no multiple default routes for one node.
111-
func validateDefaultRoute(workflow *Workflow) error {
112-
for node, edges := range workflow.graph.successors {
135+
func validateDefaultRoute(workflow *graph) error {
136+
for node, edges := range workflow.successors {
113137
hasDefault := false
114138
for _, edge := range edges {
115139
if edge.Route == Default && !hasDefault {
@@ -127,7 +151,7 @@ func validateDefaultRoute(workflow *Workflow) error {
127151
// for cycles where all edges in the cycle have nil routes.
128152
// Default routes (where Route == Default) are treated as conditional edges
129153
// and are ignored during unconditional cycle detection.
130-
func validateCycles(workflow *Workflow) error {
154+
func validateCycles(workflow *graph) error {
131155
visited := make(map[Node]struct{})
132156

133157
var traverse func(n Node, inStack map[Node]struct{}) error
@@ -143,7 +167,7 @@ func validateCycles(workflow *Workflow) error {
143167
inStack[n] = struct{}{}
144168
visited[n] = struct{}{}
145169

146-
for _, edge := range workflow.graph.successors[n] {
170+
for _, edge := range workflow.successors[n] {
147171
if edge.Route == nil {
148172
if err := traverse(edge.To, inStack); err != nil {
149173
return err
@@ -155,7 +179,7 @@ func validateCycles(workflow *Workflow) error {
155179
return nil
156180
}
157181

158-
for node := range workflow.graph.successors {
182+
for node := range workflow.successors {
159183
if _, ok := visited[node]; !ok {
160184
inStack := make(map[Node]struct{})
161185
if err := traverse(node, inStack); err != nil {

workflow/validation_test.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,50 @@ func TestStartNodeNoIncomingEdges(t *testing.T) {
155155
}
156156
}
157157

158+
func TestDuplicateEdges(t *testing.T) {
159+
nodeA := &dummyNode{name: "A"}
160+
nodeB := &dummyNode{name: "B"}
161+
tests := []struct {
162+
name string
163+
edges []Edge
164+
expectErr bool
165+
}{
166+
{
167+
name: "no duplicate edges",
168+
edges: []Edge{{From: nodeA, To: nodeB}},
169+
},
170+
{
171+
name: "duplicate edges",
172+
edges: []Edge{{From: nodeA, To: nodeB}, {From: nodeA, To: nodeB}},
173+
expectErr: true,
174+
},
175+
{
176+
name: "duplicate edges with different routes",
177+
edges: []Edge{{From: nodeA, To: nodeB, Route: StringRoute("test1")}, {From: nodeA, To: nodeB, Route: StringRoute("test2")}},
178+
expectErr: true,
179+
},
180+
{
181+
name: "duplicate edges one without route",
182+
edges: []Edge{{From: nodeA, To: nodeB, Route: StringRoute("test1")}, {From: nodeA, To: nodeB}},
183+
expectErr: true,
184+
},
185+
{
186+
name: "empty edges",
187+
edges: []Edge{},
188+
},
189+
}
190+
191+
for _, tc := range tests {
192+
t.Run(tc.name, func(t *testing.T) {
193+
if err := validateUniqueEdges(newGraph(tc.edges)); err != nil && !tc.expectErr {
194+
t.Errorf("got an error %v, expected none", err)
195+
} else if err == nil && tc.expectErr {
196+
t.Errorf("expected an error, got none")
197+
}
198+
})
199+
}
200+
}
201+
158202
func TestDefaultRoute(t *testing.T) {
159203
nodeA := &dummyNode{name: "A"}
160204
nodeB := &dummyNode{name: "B"}
@@ -177,7 +221,7 @@ func TestDefaultRoute(t *testing.T) {
177221

178222
for _, tc := range tests {
179223
t.Run(tc.name, func(t *testing.T) {
180-
if err := validateDefaultRoute(&Workflow{graph: newGraph(tc.edges)}); !errors.Is(err, tc.expectErr) {
224+
if err := validateDefaultRoute(newGraph(tc.edges)); !errors.Is(err, tc.expectErr) {
181225
t.Errorf("got %v, expected %v", err, tc.expectErr)
182226
}
183227
})
@@ -272,9 +316,7 @@ func TestValidateCycles(t *testing.T) {
272316

273317
for _, tc := range tests {
274318
t.Run(tc.name, func(t *testing.T) {
275-
w := &Workflow{graph: newGraph(tc.edges())}
276-
277-
err := validateCycles(w)
319+
err := validateCycles(newGraph(tc.edges()))
278320
if tc.expectErr && err == nil {
279321
t.Errorf("expected error, got none")
280322
} else if !tc.expectErr && err != nil {

workflow/workflow.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ func New(edges []Edge) (*Workflow, error) {
120120
if err := validateNodes(edges); err != nil {
121121
return nil, err
122122
}
123-
return &Workflow{graph: newGraph(edges)}, nil
123+
graph := newGraph(edges)
124+
if err := validateWorkflow(graph); err != nil {
125+
return nil, err
126+
}
127+
return &Workflow{graph: graph}, nil
124128
}
125129

126130
// Run drives the workflow to completion (or to a graceful pause in

workflow/workflow_test.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -426,19 +426,15 @@ func TestWorkflowRouting(t *testing.T) {
426426
expectedExec: nil,
427427
},
428428
{
429-
name: "duplicate edges to same node",
430-
startRoutes: []string{"branchA"},
429+
name: "MultiRoute with multiple matching routes",
430+
startRoutes: []string{"branchA", "branchB"},
431431
edges: func(x *CustomRouteNode, a, b *FunctionNode, c *CustomRouteNode, d *FunctionNode) []Edge {
432432
return []Edge{
433433
{From: Start, To: x},
434-
{From: x, To: a},
435-
{From: x, To: a, Route: StringRoute("branchA")},
436-
{From: x, To: b},
437-
{From: x, To: c},
438-
{From: c, To: d},
434+
{From: x, To: a, Route: MultiRoute[string]{"branchA", "branchB"}},
439435
}
440436
},
441-
expectedExec: []string{"A", "B", "C", "D"},
437+
expectedExec: []string{"A"},
442438
},
443439
}
444440

0 commit comments

Comments
 (0)