Skip to content

Commit c52a20d

Browse files
authored
[CRE-1773] LocalCapabilityManager to launch std caps without job specs (#21132)
1 parent 5ae7305 commit c52a20d

13 files changed

Lines changed: 1724 additions & 178 deletions

File tree

core/capabilities/launcher.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1919
"github.com/smartcontractkit/chainlink-common/pkg/services"
2020

21+
"github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr"
2122
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
2223
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/aggregation"
2324
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable"
@@ -56,6 +57,7 @@ type launcher struct {
5657
don2donSharedPeer p2ptypes.SharedPeer
5758
p2pStreamConfig p2ptypes.StreamConfig
5859
metrics *launcherMetrics
60+
localCapMgr localcapmgr.LocalCapabilityManager
5961
}
6062

6163
// For V2 capabilities, shims are created once and their config is updated dynamically.
@@ -244,6 +246,11 @@ func (w *launcher) Close() error {
244246
return nil
245247
}
246248

249+
// LocalCapabilityManager is initialized after the Launcher is created
250+
func (w *launcher) SetLocalCapabilityManager(lcm localcapmgr.LocalCapabilityManager) {
251+
w.localCapMgr = lcm
252+
}
253+
247254
func (w *launcher) Ready() error {
248255
return nil
249256
}
@@ -365,6 +372,16 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
365372
w.lggr.Debug("My node doesn't belong to any DON families. No filtering will be applied.")
366373
}
367374

375+
// Reconcile local capabilities: start/stop/restart capabilities based on registry state.
376+
if w.localCapMgr != nil {
377+
myDONs := make([]registrysyncer.DON, 0, len(myCapabilityDONs)+len(myWorkflowDONs))
378+
myDONs = append(myDONs, myCapabilityDONs...)
379+
myDONs = append(myDONs, myWorkflowDONs...)
380+
if err := w.localCapMgr.Reconcile(ctx, myDONs); err != nil {
381+
w.lggr.Errorw("Failed to reconcile local capabilities", "error", err)
382+
}
383+
}
384+
368385
belongsToAWorkflowDON := len(myWorkflowDONs) > 0
369386
if belongsToAWorkflowDON {
370387
myDON := myWorkflowDONs[0]

core/capabilities/launcher_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,3 +1121,120 @@ func addCapabilityToDON(registry *registrysyncer.LocalRegistry, donID uint32, ca
11211121
CapabilityType: capabilityType,
11221122
}
11231123
}
1124+
1125+
func TestLauncher_OnNewRegistry_CallsLocalCapabilityManagerReconcile(t *testing.T) {
1126+
lggr := logger.Test(t)
1127+
registry := NewRegistry(lggr)
1128+
dispatcher := remoteMocks.NewDispatcher(t)
1129+
1130+
capabilityDonNodes := newNodes(4)
1131+
peer := mocks.NewPeer(t)
1132+
peer.On("UpdateConnections", mock.Anything).Return(nil)
1133+
peer.On("ID").Return(capabilityDonNodes[0])
1134+
peer.On("IsBootstrap").Return(false)
1135+
wrapper := mocks.NewPeerWrapper(t)
1136+
wrapper.On("GetPeer").Return(peer)
1137+
1138+
fullTriggerCapID := "streams-trigger@1.0.0"
1139+
mt := newMockTrigger(capabilities.MustNewCapabilityInfo(
1140+
fullTriggerCapID,
1141+
capabilities.CapabilityTypeTrigger,
1142+
"streams trigger",
1143+
))
1144+
require.NoError(t, registry.Add(t.Context(), mt))
1145+
1146+
triggerCapIDHash := RandomUTF8BytesWord()
1147+
capDonID := uint32(1)
1148+
1149+
localRegistry := buildLocalRegistry()
1150+
addDON(localRegistry, capDonID, uint32(0), uint8(1), true, false, capabilityDonNodes, []string{"zone-a"}, 1, [][32]byte{triggerCapIDHash})
1151+
addCapabilityToDON(localRegistry, capDonID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)
1152+
1153+
reconcileCalled := make(chan struct{}, 1)
1154+
mockLCM := &mockLocalCapabilityManager{
1155+
reconcileFn: func(ctx context.Context, dons []registrysyncer.DON) error {
1156+
assert.Len(t, dons, 1, "should pass all DONs")
1157+
assert.Equal(t, capDonID, dons[0].ID)
1158+
reconcileCalled <- struct{}{}
1159+
return nil
1160+
},
1161+
}
1162+
1163+
dispatcher.On("SetReceiver", fullTriggerCapID, capDonID, mock.Anything).Return(nil)
1164+
1165+
launcher, err := NewLauncher(
1166+
lggr,
1167+
wrapper,
1168+
nil,
1169+
nil,
1170+
dispatcher,
1171+
registry,
1172+
&mockDonNotifier{},
1173+
)
1174+
require.NoError(t, err)
1175+
launcher.SetLocalCapabilityManager(mockLCM)
1176+
require.NoError(t, launcher.Start(t.Context()))
1177+
defer launcher.Close()
1178+
1179+
err = launcher.OnNewRegistry(t.Context(), localRegistry)
1180+
require.NoError(t, err)
1181+
1182+
select {
1183+
case <-reconcileCalled:
1184+
// success
1185+
default:
1186+
t.Fatal("Reconcile was not called on LocalCapabilityManager")
1187+
}
1188+
}
1189+
1190+
func TestLauncher_OnNewRegistry_NilLocalCapabilityManager(t *testing.T) {
1191+
lggr := logger.Test(t)
1192+
registry := NewRegistry(lggr)
1193+
dispatcher := remoteMocks.NewDispatcher(t)
1194+
1195+
nodes := newNodes(4)
1196+
peer := mocks.NewPeer(t)
1197+
peer.On("UpdateConnections", mock.Anything).Return(nil)
1198+
peer.On("ID").Return(nodes[0])
1199+
peer.On("IsBootstrap").Return(false)
1200+
wrapper := mocks.NewPeerWrapper(t)
1201+
wrapper.On("GetPeer").Return(peer)
1202+
1203+
localRegistry := buildLocalRegistry()
1204+
dID := uint32(1)
1205+
addDON(localRegistry, dID, uint32(0), uint8(1), true, true, nodes, []string{"zone-a"}, 1, nil)
1206+
1207+
// Don't set localCapMgr - should not panic.
1208+
launcher, err := NewLauncher(
1209+
lggr,
1210+
wrapper,
1211+
nil,
1212+
nil,
1213+
dispatcher,
1214+
registry,
1215+
&mockDonNotifier{},
1216+
)
1217+
require.NoError(t, err)
1218+
require.NoError(t, launcher.Start(t.Context()))
1219+
defer launcher.Close()
1220+
1221+
err = launcher.OnNewRegistry(t.Context(), localRegistry)
1222+
require.NoError(t, err)
1223+
}
1224+
1225+
// mockLocalCapabilityManager is a test mock that records calls to Reconcile.
1226+
type mockLocalCapabilityManager struct {
1227+
reconcileFn func(ctx context.Context, dons []registrysyncer.DON) error
1228+
}
1229+
1230+
func (m *mockLocalCapabilityManager) Start(context.Context) error { return nil }
1231+
func (m *mockLocalCapabilityManager) Close() error { return nil }
1232+
func (m *mockLocalCapabilityManager) Ready() error { return nil }
1233+
func (m *mockLocalCapabilityManager) HealthReport() map[string]error { return nil }
1234+
func (m *mockLocalCapabilityManager) Name() string { return "mockLocalCapMgr" }
1235+
func (m *mockLocalCapabilityManager) Reconcile(ctx context.Context, dons []registrysyncer.DON) error {
1236+
if m.reconcileFn != nil {
1237+
return m.reconcileFn(ctx, dons)
1238+
}
1239+
return nil
1240+
}

0 commit comments

Comments
 (0)