Skip to content

Commit 5bb8eb8

Browse files
committed
[CRE] [5/5] Wire confidential workflow execution into CRE
Wires PRs 1-4 together and adds system test support: - cre.go: start confidential relay service when config is enabled - models.go: Attributes field on WorkflowSpec - System tests: ConfidentialRelay feature plugin, workflow registration with attributes, CompileAndDeployConfidentialWorkflow helper This PR includes code from PRs 1-4 for compilation. As those merge, rebasing this branch will shrink the diff to just the wiring. Depends on all of: #21635 PRs 1-4 Part of #21635
1 parent 07003ac commit 5bb8eb8

26 files changed

Lines changed: 430 additions & 236 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink": minor
3+
---
4+
5+
Add confidential workflow execution: ConfidentialModule, relay handler, gateway wiring, single-DON capability fix #added #db_update

core/capabilities/launcher.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ var defaultStreamConfig = p2ptypes.StreamConfig{
4646

4747
type launcher struct {
4848
services.StateMachine
49-
lggr logger.SugaredLogger
49+
lggr logger.Logger
5050
myPeerID p2ptypes.PeerID
5151
peerWrapper p2ptypes.PeerWrapper
5252
dispatcher remotetypes.Dispatcher
@@ -106,7 +106,7 @@ func NewLauncher(
106106
return nil, fmt.Errorf("failed to create launcher metrics: %w", err)
107107
}
108108
return &launcher{
109-
lggr: logger.Sugared(lggr).Named("CapabilitiesLauncher"),
109+
lggr: logger.Named(lggr, "CapabilitiesLauncher"),
110110
peerWrapper: peerWrapper,
111111
dispatcher: dispatcher,
112112
cachedShims: cachedShims{
@@ -342,12 +342,9 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
342342
for family := range myDONFamiliesSet {
343343
myDONFamilies = append(myDONFamilies, family)
344344
}
345-
w.lggr.Debugw("Found my DON families", "count", len(myDONFamilies))
346-
w.lggr.Tracew("My DON families", "myDONFamilies", myDONFamilies)
347-
w.lggr.Debugw("Found my workflow DONs", "count", len(myWorkflowDONs))
348-
w.lggr.Tracew("My workflow DONs", "myWorkflowDONs", myWorkflowDONs)
349-
w.lggr.Debugw("Found all remote workflow DONs", "count", len(remoteWorkflowDONs))
350-
w.lggr.Tracew("All remote workflow DONs", "remoteWorkflowDONs", remoteWorkflowDONs)
345+
w.lggr.Debugw("Found my DON families", "count", len(myDONFamilies), "myDONFamilies", myDONFamilies)
346+
w.lggr.Debugw("Found my workflow DONs", "count", len(myWorkflowDONs), "myWorkflowDONs", myWorkflowDONs)
347+
w.lggr.Debugw("Found all remote workflow DONs", "count", len(remoteWorkflowDONs), "remoteWorkflowDONs", remoteWorkflowDONs)
351348

352349
// Capability DONs (with IsPublic = true) the current node is a part of.
353350
// These need server-side shims to expose my own capabilities externally.
@@ -362,18 +359,14 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
362359
}
363360
}
364361
}
365-
w.lggr.Debugw("Found my capability DONs", "count", len(myCapabilityDONs))
366-
w.lggr.Tracew("My capability DONs", "myCapabilityDONs", myCapabilityDONs)
367-
w.lggr.Debugw("Found all remote capability DONs", "count", len(remoteCapabilityDONs))
368-
w.lggr.Tracew("All remote capability DONs", "remoteCapabilityDONs", remoteCapabilityDONs)
362+
w.lggr.Debugw("Found my capability DONs", "count", len(myCapabilityDONs), "myCapabilityDONs", myCapabilityDONs)
363+
w.lggr.Debugw("Found all remote capability DONs", "count", len(remoteCapabilityDONs), "remoteCapabilityDONs", remoteCapabilityDONs)
369364

370365
if len(myDONFamilies) > 0 {
371366
remoteWorkflowDONs = filterDONsByFamilies(remoteWorkflowDONs, myDONFamilies)
372367
remoteCapabilityDONs = filterDONsByFamilies(remoteCapabilityDONs, myDONFamilies)
373-
w.lggr.Debugw("Filtered remote workflow DONs to my families", "count", len(remoteWorkflowDONs))
374-
w.lggr.Tracew("Filtered remote workflow DONs to my families", "remoteWorkflowDONs", remoteWorkflowDONs)
375-
w.lggr.Debugw("Filtered remote capability DONs to my families", "count", len(remoteCapabilityDONs))
376-
w.lggr.Tracew("Filtered remote capability DONs to my families", "remoteCapabilityDONs", remoteCapabilityDONs)
368+
w.lggr.Debugw("Filtered remote workflow DONs to my families", "count", len(remoteWorkflowDONs), "remoteWorkflowDONs", remoteWorkflowDONs)
369+
w.lggr.Debugw("Filtered remote capability DONs to my families", "count", len(remoteCapabilityDONs), "remoteCapabilityDONs", remoteCapabilityDONs)
377370
} else {
378371
// legacy / Keystone setting
379372
w.lggr.Debug("My node doesn't belong to any DON families. No filtering will be applied.")
@@ -408,8 +401,23 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
408401

409402
belongsToACapabilityDON := len(myCapabilityDONs) > 0
410403
if belongsToACapabilityDON {
404+
// Include both remote workflow DONs and the node's own workflow DONs.
405+
// In single-DON topologies (e.g. local CRE), the same DON is both a
406+
// workflow DON and a capability DON, so remoteWorkflowDONs is empty.
407+
// Without including myWorkflowDONs, capabilities fail to serve with
408+
// "empty workflowDONs provided".
409+
allWorkflowDONs := make([]registrysyncer.DON, 0, len(remoteWorkflowDONs)+len(myWorkflowDONs))
410+
allWorkflowDONs = append(allWorkflowDONs, remoteWorkflowDONs...)
411+
allWorkflowDONs = append(allWorkflowDONs, myWorkflowDONs...)
411412
for _, myDON := range myCapabilityDONs {
412-
w.serveCapabilities(ctx, w.myPeerID, myDON, localRegistry, remoteWorkflowDONs)
413+
w.serveCapabilities(ctx, w.myPeerID, myDON, localRegistry, allWorkflowDONs)
414+
415+
// Capability DONs also need remote capabilities (e.g. relay DON
416+
// needs vault for secret fetching). Without this, only workflow
417+
// DONs discover cross-DON capabilities.
418+
for _, rcd := range remoteCapabilityDONs {
419+
w.addRemoteCapabilities(ctx, myDON, rcd, localRegistry)
420+
}
413421
}
414422
}
415423

@@ -425,7 +433,7 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
425433
}
426434
if w.don2donSharedPeer != nil {
427435
donPairs := w.donPairsToUpdate(w.myPeerID, localRegistry)
428-
err := w.don2donSharedPeer.UpdateConnectionsByDONs(ctx, donPairs, w.p2pStreamConfig)
436+
err := w.don2donSharedPeer.UpdateConnectionsByDONs(ctx, donPairs, defaultStreamConfig)
429437
if err != nil {
430438
return fmt.Errorf("failed to update peer connections: %w", err)
431439
}

core/capabilities/launcher_test.go

Lines changed: 68 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"google.golang.org/protobuf/proto"
1313
"google.golang.org/protobuf/types/known/durationpb"
1414

15-
"github.com/smartcontractkit/libocr/ragep2p"
1615
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
1716

1817
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -234,6 +233,73 @@ func TestLauncher(t *testing.T) {
234233
assert.Equal(t, 1, observedLogs.FilterMessage("failed to serve capability").Len())
235234
})
236235

236+
t.Run("OK-single_DON_serves_capabilities", func(t *testing.T) {
237+
// Regression test: in single-DON topologies (e.g. local CRE), the same
238+
// DON is both a workflow DON and a capability DON. The DON appears in
239+
// myWorkflowDONs (not remoteWorkflowDONs). Previously, serveCapabilities
240+
// only received remoteWorkflowDONs, causing executable.Server.SetConfig
241+
// to fail with "empty workflowDONs provided".
242+
lggr, observedLogs := logger.TestObserved(t, zapcore.DebugLevel)
243+
registry := NewRegistry(lggr)
244+
dispatcher := remoteMocks.NewDispatcher(t)
245+
246+
nodes := newNodes(4)
247+
peer := mocks.NewPeer(t)
248+
peer.On("UpdateConnections", mock.Anything).Return(nil)
249+
peer.On("ID").Return(nodes[0])
250+
peer.On("IsBootstrap").Return(false)
251+
wrapper := mocks.NewPeerWrapper(t)
252+
wrapper.On("GetPeer").Return(peer)
253+
254+
fullTriggerCapID := "streams-trigger@1.0.0"
255+
mt := newMockTrigger(capabilities.MustNewCapabilityInfo(
256+
fullTriggerCapID,
257+
capabilities.CapabilityTypeTrigger,
258+
"streams trigger",
259+
))
260+
require.NoError(t, registry.Add(t.Context(), mt))
261+
262+
fullTargetID := "write-chain_evm_1@1.0.0"
263+
mtarg := &mockCapability{
264+
CapabilityInfo: capabilities.MustNewCapabilityInfo(
265+
fullTargetID,
266+
capabilities.CapabilityTypeTarget,
267+
"write chain",
268+
),
269+
}
270+
require.NoError(t, registry.Add(t.Context(), mtarg))
271+
272+
triggerCapID := RandomUTF8BytesWord()
273+
targetCapID := RandomUTF8BytesWord()
274+
275+
// Single DON: acceptsWorkflows=true AND has capability configurations.
276+
// This puts it in both myWorkflowDONs and myCapabilityDONs.
277+
dID := uint32(1)
278+
localRegistry := buildLocalRegistry()
279+
addDON(localRegistry, dID, uint32(0), uint8(1), true, true, nodes, []string{"zone-a"}, 1, [][32]byte{triggerCapID, targetCapID})
280+
addCapabilityToDON(localRegistry, dID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)
281+
addCapabilityToDON(localRegistry, dID, fullTargetID, capabilities.CapabilityTypeTarget, nil)
282+
283+
launcher, err := NewLauncher(
284+
lggr,
285+
wrapper,
286+
nil,
287+
nil,
288+
dispatcher,
289+
registry,
290+
&mockDonNotifier{},
291+
)
292+
require.NoError(t, err)
293+
require.NoError(t, launcher.Start(t.Context()))
294+
defer launcher.Close()
295+
296+
dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
297+
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*executable.server")).Return(nil)
298+
299+
require.NoError(t, launcher.OnNewRegistry(t.Context(), localRegistry))
300+
assert.Equal(t, 0, observedLogs.FilterMessage("failed to serve capability").Len())
301+
})
302+
237303
t.Run("start and close with nil peer wrapper", func(t *testing.T) {
238304
lggr := logger.Test(t)
239305
registry := NewRegistry(lggr)
@@ -893,24 +959,10 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
893959
addCapabilityToDON(localRegistry, zoneBDonID, fullExecutableCapID, capabilities.CapabilityTypeTarget, execCfg)
894960
addCapabilityToDON(localRegistry, capDonID, fullLocalCapID, capabilities.CapabilityTypeAction, localCfg) // should be skipped
895961

896-
customStreamConfig := p2ptypes.StreamConfig{
897-
IncomingMessageBufferSize: 999,
898-
OutgoingMessageBufferSize: 888,
899-
MaxMessageLenBytes: 777777,
900-
MessageRateLimiter: ragep2p.TokenBucketParams{
901-
Rate: 50.0,
902-
Capacity: 250,
903-
},
904-
BytesRateLimiter: ragep2p.TokenBucketParams{
905-
Rate: 2500000.0,
906-
Capacity: 5000000,
907-
},
908-
}
909-
910962
sharedPeer := mocks.NewSharedPeer(t)
911963
sharedPeer.On("ID").Return(workflowDonNodes[0])
912964
sharedPeer.On("IsBootstrap").Return(false)
913-
sharedPeer.On("UpdateConnectionsByDONs", mock.Anything, mock.Anything, customStreamConfig).Return(nil)
965+
sharedPeer.On("UpdateConnectionsByDONs", mock.Anything, mock.Anything, mock.Anything).Return(nil)
914966

915967
launcher, err := NewLauncher(
916968
lggr,
@@ -922,7 +974,6 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
922974
&mockDonNotifier{},
923975
)
924976
require.NoError(t, err)
925-
launcher.p2pStreamConfig = customStreamConfig
926977
servicetest.Run(t, launcher)
927978

928979
dispatcher.On("SetReceiverForMethod", fullTriggerCapID, capDonID, "StreamsTrigger", mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)

core/scripts/cre/environment/environment/workflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ func deployWorkflow(
475475

476476
fmt.Printf("\n⚙️ Registering workflow '%s' with the workflow registry\n\n", workflowNameFlag)
477477

478-
workflowID, registerErr := creworkflow.RegisterWithContract(ctx, sethClient, common.HexToAddress(workflowRegistryAddress), workflowRegistryVersion, uint64(donIDFlag), workflowNameFlag, "file://"+wasmWorkflowFilePathFlag, configPath, secretsPath, &containerTargetDirFlag)
478+
workflowID, registerErr := creworkflow.RegisterWithContract(ctx, sethClient, common.HexToAddress(workflowRegistryAddress), workflowRegistryVersion, uint64(donIDFlag), workflowNameFlag, "file://"+wasmWorkflowFilePathFlag, configPath, secretsPath, nil, &containerTargetDirFlag)
479479
if registerErr != nil {
480480
return errors.Wrapf(registerErr, "❌ failed to register workflow %s", workflowNameFlag)
481481
}

core/scripts/go.mod

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ require (
4646
github.com/shopspring/decimal v1.4.0
4747
github.com/smartcontractkit/chainlink-automation v0.8.1
4848
github.com/smartcontractkit/chainlink-ccip v0.1.1-solana.0.20260317185256-d5f7db87ae70
49-
github.com/smartcontractkit/chainlink-common v0.11.0
49+
github.com/smartcontractkit/chainlink-common v0.11.1-0.20260323163826-2c5b95089478
5050
github.com/smartcontractkit/chainlink-common/keystore v1.0.2
5151
github.com/smartcontractkit/chainlink-data-streams v0.1.13
5252
github.com/smartcontractkit/chainlink-deployments-framework v0.86.3
5353
github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260320152158-2191d797b5ce
5454
github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20260119171452-39c98c3b33cd
55-
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260226130359-963f935e0396
55+
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260320153346-314ec8dbe5a4
5656
github.com/smartcontractkit/chainlink-protos/job-distributor v0.18.0
5757
github.com/smartcontractkit/chainlink-testing-framework/framework v0.15.5
5858
github.com/smartcontractkit/chainlink-testing-framework/framework/components/dockercompose v0.1.20
@@ -345,6 +345,7 @@ require (
345345
github.com/hashicorp/yamux v0.1.2 // indirect
346346
github.com/hasura/go-graphql-client v0.15.1 // indirect
347347
github.com/hdevalence/ed25519consensus v0.2.0 // indirect
348+
github.com/hf/nitrite v0.0.0-20241225144000-c2d5d3c4f303 // indirect
348349
github.com/holiman/billy v0.0.0-20250707135307-f2f9b9aae7db // indirect
349350
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
350351
github.com/holiman/uint256 v1.3.2 // indirect
@@ -493,6 +494,7 @@ require (
493494
github.com/smartcontractkit/chainlink-ccip/deployment v0.0.0-20260317185256-d5f7db87ae70 // indirect
494495
github.com/smartcontractkit/chainlink-ccv v0.0.0-20260320145055-eb20b529ff95 // indirect
495496
github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20251211140724-319861e514c4 // indirect
497+
github.com/smartcontractkit/chainlink-common/pkg/teeattestation v0.0.0-20260316172927-2c727f64397c // indirect
496498
github.com/smartcontractkit/chainlink-evm/contracts/cre/gobindings v0.0.0-20260107191744-4b93f62cffe3 // indirect
497499
github.com/smartcontractkit/chainlink-feeds v0.1.2-0.20250227211209-7cd000095135 // indirect
498500
github.com/smartcontractkit/chainlink-framework/capabilities v0.0.0-20250818175541-3389ac08a563 // indirect

core/scripts/go.sum

Lines changed: 9 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/services/cre/cre.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
3434
"github.com/smartcontractkit/chainlink/v2/core/capabilities/compute"
35+
"github.com/smartcontractkit/chainlink/v2/core/capabilities/confidentialrelay"
3536
gatewayconnector "github.com/smartcontractkit/chainlink/v2/core/capabilities/gateway_connector"
3637
"github.com/smartcontractkit/chainlink/v2/core/capabilities/localcapmgr"
3738
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
@@ -169,6 +170,17 @@ func (s *Services) newSubservices(
169170
}
170171
s.GatewayConnectorWrapper = gatewayConnectorWrapper
171172
srvs = append(srvs, gatewayConnectorWrapper)
173+
174+
if relayConfig := cfg.CRE().ConfidentialRelay(); relayConfig.Enabled() {
175+
relayService := confidentialrelay.NewService(
176+
gatewayConnectorWrapper,
177+
opts.CapabilitiesRegistry,
178+
[]byte(relayConfig.TrustedPCRs()),
179+
relayConfig.CARootsPEM(),
180+
lggr,
181+
)
182+
srvs = append(srvs, relayService)
183+
}
172184
}
173185

174186
if cfg.CRE().Linking().URL() != "" {

0 commit comments

Comments
 (0)