Skip to content

Commit 4294618

Browse files
committed
Requirements selecting runner
1 parent 660e58a commit 4294618

2 files changed

Lines changed: 337 additions & 0 deletions

File tree

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package host
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
"sync/atomic"
9+
10+
"github.com/smartcontractkit/chainlink-protos/cre/go/sdk"
11+
)
12+
13+
type ModuleAndHandler struct {
14+
ModuleV2
15+
RequirementsHandler
16+
}
17+
18+
func NewRequirementSelectingModule(moduleAndHandlers []ModuleAndHandler) ModuleV2 {
19+
return &requirementSelectingModule{
20+
moduleAndHandler: moduleAndHandlers,
21+
runOn: -1,
22+
}
23+
}
24+
25+
type requirementSelectingModule struct {
26+
moduleAndHandler []ModuleAndHandler
27+
runOn int
28+
started atomic.Bool
29+
findMutex sync.Mutex
30+
}
31+
32+
func (r *requirementSelectingModule) Start() {
33+
r.started.Store(true)
34+
r.moduleAndHandler[0].Start()
35+
}
36+
37+
func (r *requirementSelectingModule) Close() {
38+
r.findMutex.Lock()
39+
defer r.findMutex.Unlock()
40+
if r.runOn == -1 {
41+
r.moduleAndHandler[0].Close()
42+
} else {
43+
r.moduleAndHandler[r.runOn].Close()
44+
}
45+
}
46+
47+
func (r *requirementSelectingModule) IsLegacyDAG() bool {
48+
return r.moduleAndHandler[0].IsLegacyDAG()
49+
}
50+
51+
func (r *requirementSelectingModule) Execute(ctx context.Context, request *sdk.ExecuteRequest, handler ExecutionHelper) (*sdk.ExecutionResult, error) {
52+
if r.runOn >= 0 {
53+
return r.moduleAndHandler[r.runOn].Execute(ctx, request, handler)
54+
}
55+
56+
r.findMutex.Lock()
57+
defer r.findMutex.Unlock()
58+
result, err := r.moduleAndHandler[0].Execute(ctx, request, handler)
59+
if err == nil {
60+
r.runOn = 0
61+
return result, nil
62+
}
63+
64+
rerun := &RequirementsRerun{}
65+
if !errors.As(err, &rerun) {
66+
return nil, err
67+
}
68+
69+
numHandlers := len(r.moduleAndHandler)
70+
for i := 1; i < numHandlers; i++ {
71+
item := r.moduleAndHandler[i]
72+
if CheckRequirements(item.RequirementsHandler, (*sdk.Requirements)(rerun)) {
73+
r.runOn = i
74+
if r.started.Load() {
75+
item.Start()
76+
}
77+
return item.Execute(ctx, request, handler)
78+
}
79+
}
80+
81+
return nil, fmt.Errorf("cannot find a runner that can satisfy the requirements %+v\n", rerun)
82+
}
83+
84+
var _ ModuleV2 = &requirementSelectingModule{}
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
package host
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
10+
"github.com/smartcontractkit/chainlink-protos/cre/go/sdk"
11+
)
12+
13+
type stubModuleV2 struct {
14+
startFn func()
15+
closeFn func()
16+
legacyFn func() bool
17+
executeFn func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error)
18+
}
19+
20+
func (s *stubModuleV2) Start() { s.startFn() }
21+
func (s *stubModuleV2) Close() { s.closeFn() }
22+
func (s *stubModuleV2) IsLegacyDAG() bool { return s.legacyFn() }
23+
func (s *stubModuleV2) Execute(ctx context.Context, req *sdk.ExecuteRequest, h ExecutionHelper) (*sdk.ExecutionResult, error) {
24+
return s.executeFn(ctx, req, h)
25+
}
26+
27+
func TestRequirementSelectingModule_Start(t *testing.T) {
28+
var started bool
29+
m0 := &stubModuleV2{startFn: func() { started = true }}
30+
m := NewRequirementSelectingModule([]ModuleAndHandler{{ModuleV2: m0}})
31+
m.Start()
32+
assert.True(t, started)
33+
}
34+
35+
func TestRequirementSelectingModule_Close(t *testing.T) {
36+
t.Run("before execute closes first module", func(t *testing.T) {
37+
var closedIdx int
38+
m0 := &stubModuleV2{closeFn: func() { closedIdx = 0 }}
39+
m1 := &stubModuleV2{closeFn: func() { closedIdx = 1 }}
40+
m := NewRequirementSelectingModule([]ModuleAndHandler{
41+
{ModuleV2: m0},
42+
{ModuleV2: m1},
43+
})
44+
closedIdx = -1
45+
m.Close()
46+
assert.Equal(t, 0, closedIdx)
47+
})
48+
49+
t.Run("after execute closes selected module", func(t *testing.T) {
50+
wantResult := &sdk.ExecutionResult{}
51+
rerunErr := &RequirementsRerun{Tee: &sdk.Tee{}}
52+
var closedIdx int
53+
54+
m0 := &stubModuleV2{
55+
startFn: func() {},
56+
closeFn: func() { closedIdx = 0 },
57+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
58+
return nil, rerunErr
59+
},
60+
}
61+
m1 := &stubModuleV2{
62+
closeFn: func() { closedIdx = 1 },
63+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
64+
return wantResult, nil
65+
},
66+
}
67+
68+
m := NewRequirementSelectingModule([]ModuleAndHandler{
69+
{ModuleV2: m0},
70+
{ModuleV2: m1, RequirementsHandler: RequirementsHandler{Tee: func(*sdk.Tee) bool { return true }}},
71+
})
72+
73+
_, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
74+
require.NoError(t, err)
75+
76+
closedIdx = -1
77+
m.Close()
78+
assert.Equal(t, 1, closedIdx)
79+
})
80+
}
81+
82+
func TestRequirementSelectingModule_IsLegacyDAG(t *testing.T) {
83+
t.Run("delegates", func(t *testing.T) {
84+
m0 := &stubModuleV2{legacyFn: func() bool { return true }}
85+
m := NewRequirementSelectingModule([]ModuleAndHandler{{ModuleV2: m0}})
86+
assert.True(t, m.IsLegacyDAG())
87+
})
88+
}
89+
90+
func TestRequirementSelectingModule_Execute(t *testing.T) {
91+
t.Run("delegates when runOn already set", func(t *testing.T) {
92+
calls := 0
93+
wantResult := &sdk.ExecutionResult{}
94+
m0 := &stubModuleV2{
95+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
96+
calls++
97+
return wantResult, nil
98+
},
99+
}
100+
101+
m := NewRequirementSelectingModule([]ModuleAndHandler{{ModuleV2: m0}})
102+
103+
_, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
104+
require.NoError(t, err)
105+
assert.Equal(t, 1, calls)
106+
107+
got, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
108+
require.NoError(t, err)
109+
assert.Equal(t, wantResult, got)
110+
assert.Equal(t, 2, calls)
111+
})
112+
113+
t.Run("first module succeeds sets runOn to zero", func(t *testing.T) {
114+
wantResult := &sdk.ExecutionResult{}
115+
numCalls := 0
116+
m0 := &stubModuleV2{
117+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
118+
numCalls++
119+
return wantResult, nil
120+
},
121+
}
122+
123+
m := NewRequirementSelectingModule([]ModuleAndHandler{{ModuleV2: m0}})
124+
125+
got, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
126+
require.NoError(t, err)
127+
assert.Equal(t, 1, numCalls)
128+
assert.Equal(t, wantResult, got)
129+
})
130+
131+
t.Run("non-RequirementsRerun error is propagated without additional executions", func(t *testing.T) {
132+
m0 := &stubModuleV2{
133+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
134+
return nil, assert.AnError
135+
},
136+
}
137+
138+
m1 := &stubModuleV2{executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
139+
assert.Fail(t, "second module should not be executed")
140+
return nil, nil
141+
}}
142+
143+
m := NewRequirementSelectingModule([]ModuleAndHandler{{ModuleV2: m0}, {ModuleV2: m1}})
144+
145+
_, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
146+
assert.ErrorIs(t, err, assert.AnError)
147+
})
148+
149+
t.Run("RequirementsRerun with matching handler not started", func(t *testing.T) {
150+
rerunErr := &RequirementsRerun{Tee: &sdk.Tee{}}
151+
wantResult := &sdk.ExecutionResult{}
152+
var m1Started bool
153+
154+
m0 := &stubModuleV2{
155+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
156+
return nil, rerunErr
157+
},
158+
}
159+
m1 := &stubModuleV2{
160+
startFn: func() { m1Started = true },
161+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
162+
return wantResult, nil
163+
},
164+
}
165+
166+
m := NewRequirementSelectingModule([]ModuleAndHandler{
167+
{ModuleV2: m0},
168+
{ModuleV2: m1, RequirementsHandler: RequirementsHandler{Tee: func(*sdk.Tee) bool { return true }}},
169+
})
170+
171+
got, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
172+
require.NoError(t, err)
173+
assert.Equal(t, wantResult, got)
174+
assert.False(t, m1Started)
175+
})
176+
177+
t.Run("RequirementsRerun with matching handler already started", func(t *testing.T) {
178+
rerunErr := &RequirementsRerun{Tee: &sdk.Tee{}}
179+
wantResult := &sdk.ExecutionResult{}
180+
var m1Started bool
181+
182+
m0 := &stubModuleV2{
183+
startFn: func() {},
184+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
185+
return nil, rerunErr
186+
},
187+
}
188+
m1 := &stubModuleV2{
189+
startFn: func() { m1Started = true },
190+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
191+
return wantResult, nil
192+
},
193+
}
194+
195+
m := NewRequirementSelectingModule([]ModuleAndHandler{
196+
{ModuleV2: m0},
197+
{ModuleV2: m1, RequirementsHandler: RequirementsHandler{Tee: func(*sdk.Tee) bool { return true }}},
198+
})
199+
200+
m.Start()
201+
202+
got, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
203+
require.NoError(t, err)
204+
assert.Equal(t, wantResult, got)
205+
assert.True(t, m1Started)
206+
})
207+
208+
t.Run("RequirementsRerun with no matching handler returns error", func(t *testing.T) {
209+
rerunErr := &RequirementsRerun{Tee: &sdk.Tee{}}
210+
m0 := &stubModuleV2{
211+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
212+
return nil, rerunErr
213+
},
214+
}
215+
m1 := &stubModuleV2{}
216+
217+
m := NewRequirementSelectingModule([]ModuleAndHandler{
218+
{ModuleV2: m0},
219+
{ModuleV2: m1, RequirementsHandler: RequirementsHandler{Tee: func(*sdk.Tee) bool { return false }}},
220+
})
221+
222+
_, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
223+
require.Error(t, err)
224+
assert.Contains(t, err.Error(), "cannot find a runner that can satisfy the requirements")
225+
})
226+
227+
t.Run("RequirementsRerun skips non-matching selects later match", func(t *testing.T) {
228+
rerunErr := &RequirementsRerun{Tee: &sdk.Tee{}}
229+
wantResult := &sdk.ExecutionResult{}
230+
231+
m0 := &stubModuleV2{
232+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
233+
return nil, rerunErr
234+
},
235+
}
236+
m1 := &stubModuleV2{}
237+
m2 := &stubModuleV2{
238+
executeFn: func(context.Context, *sdk.ExecuteRequest, ExecutionHelper) (*sdk.ExecutionResult, error) {
239+
return wantResult, nil
240+
},
241+
}
242+
243+
m := NewRequirementSelectingModule([]ModuleAndHandler{
244+
{ModuleV2: m0},
245+
{ModuleV2: m1, RequirementsHandler: RequirementsHandler{Tee: func(*sdk.Tee) bool { return false }}},
246+
{ModuleV2: m2, RequirementsHandler: RequirementsHandler{Tee: func(*sdk.Tee) bool { return true }}},
247+
})
248+
249+
got, err := m.Execute(t.Context(), &sdk.ExecuteRequest{}, nil)
250+
require.NoError(t, err)
251+
assert.Equal(t, wantResult, got)
252+
})
253+
}

0 commit comments

Comments
 (0)