Skip to content

Commit 07003ac

Browse files
committed
[CRE] [4/5] ConfidentialModule, config, DB migration, syncer routing
Adds the core abstractions for confidential workflow execution:

- ConfidentialModule: implements host.ModuleV2, dispatches workflow execution to TEE enclave via confidential-workflows capability
- Syncer routing: detects confidential workflows via on-chain attributes, routes to ConfidentialModule instead of local WASM engine
- Config: CRE.ConfidentialRelay TOML config (enabled, trustedPCRs, caRootsPEM)
- DB: adds attributes column to workflow_specs_v2
- WorkflowSpec.Attributes field for persisting on-chain attributes

Nothing is wired into CRE yet. The routing is inert until PR 5/5.

Part of #21635
1 parent 9fbe859 commit 07003ac

17 files changed

Lines changed: 1039 additions & 130 deletions

File tree

core/capabilities/launcher.go

Lines changed: 18 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -46,7 +46,7 @@ var defaultStreamConfig = p2ptypes.StreamConfig{
4646

4747
type launcher struct {
4848
services.StateMachine
49-
lggr logger.Logger
49+
lggr logger.SugaredLogger
5050
myPeerID p2ptypes.PeerID
5151
peerWrapper p2ptypes.PeerWrapper
5252
dispatcher remotetypes.Dispatcher
@@ -106,7 +106,7 @@ func NewLauncher(
106106
return nil, fmt.Errorf("failed to create launcher metrics: %w", err)
107107
}
108108
return &launcher{
109-
lggr: logger.Named(lggr, "CapabilitiesLauncher"),
109+
lggr: logger.Sugared(lggr).Named("CapabilitiesLauncher"),
110110
peerWrapper: peerWrapper,
111111
dispatcher: dispatcher,
112112
cachedShims: cachedShims{
@@ -342,9 +342,12 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
342342
for family := range myDONFamiliesSet {
343343
myDONFamilies = append(myDONFamilies, family)
344344
}
345-
w.lggr.Debugw("Found my DON families", "count", len(myDONFamilies), "myDONFamilies", myDONFamilies)
346-
w.lggr.Debugw("Found my workflow DONs", "count", len(myWorkflowDONs), "myWorkflowDONs", myWorkflowDONs)
347-
w.lggr.Debugw("Found all remote workflow DONs", "count", len(remoteWorkflowDONs), "remoteWorkflowDONs", remoteWorkflowDONs)
345+
w.lggr.Debugw("Found my DON families", "count", len(myDONFamilies))
346+
w.lggr.Tracew("My DON families", "myDONFamilies", myDONFamilies)
347+
w.lggr.Debugw("Found my workflow DONs", "count", len(myWorkflowDONs))
348+
w.lggr.Tracew("My workflow DONs", "myWorkflowDONs", myWorkflowDONs)
349+
w.lggr.Debugw("Found all remote workflow DONs", "count", len(remoteWorkflowDONs))
350+
w.lggr.Tracew("All remote workflow DONs", "remoteWorkflowDONs", remoteWorkflowDONs)
348351

349352
// Capability DONs (with IsPublic = true) the current node is a part of.
350353
// These need server-side shims to expose my own capabilities externally.
@@ -359,14 +362,18 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
359362
}
360363
}
361364
}
362-
w.lggr.Debugw("Found my capability DONs", "count", len(myCapabilityDONs), "myCapabilityDONs", myCapabilityDONs)
363-
w.lggr.Debugw("Found all remote capability DONs", "count", len(remoteCapabilityDONs), "remoteCapabilityDONs", remoteCapabilityDONs)
365+
w.lggr.Debugw("Found my capability DONs", "count", len(myCapabilityDONs))
366+
w.lggr.Tracew("My capability DONs", "myCapabilityDONs", myCapabilityDONs)
367+
w.lggr.Debugw("Found all remote capability DONs", "count", len(remoteCapabilityDONs))
368+
w.lggr.Tracew("All remote capability DONs", "remoteCapabilityDONs", remoteCapabilityDONs)
364369

365370
if len(myDONFamilies) > 0 {
366371
remoteWorkflowDONs = filterDONsByFamilies(remoteWorkflowDONs, myDONFamilies)
367372
remoteCapabilityDONs = filterDONsByFamilies(remoteCapabilityDONs, myDONFamilies)
368-
w.lggr.Debugw("Filtered remote workflow DONs to my families", "count", len(remoteWorkflowDONs), "remoteWorkflowDONs", remoteWorkflowDONs)
369-
w.lggr.Debugw("Filtered remote capability DONs to my families", "count", len(remoteCapabilityDONs), "remoteCapabilityDONs", remoteCapabilityDONs)
373+
w.lggr.Debugw("Filtered remote workflow DONs to my families", "count", len(remoteWorkflowDONs))
374+
w.lggr.Tracew("Filtered remote workflow DONs to my families", "remoteWorkflowDONs", remoteWorkflowDONs)
375+
w.lggr.Debugw("Filtered remote capability DONs to my families", "count", len(remoteCapabilityDONs))
376+
w.lggr.Tracew("Filtered remote capability DONs to my families", "remoteCapabilityDONs", remoteCapabilityDONs)
370377
} else {
371378
// legacy / Keystone setting
372379
w.lggr.Debug("My node doesn't belong to any DON families. No filtering will be applied.")
@@ -401,23 +408,8 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
401408

402409
belongsToACapabilityDON := len(myCapabilityDONs) > 0
403410
if belongsToACapabilityDON {
404-
// Include both remote workflow DONs and the node's own workflow DONs.
405-
// In single-DON topologies (e.g. local CRE), the same DON is both a
406-
// workflow DON and a capability DON, so remoteWorkflowDONs is empty.
407-
// Without including myWorkflowDONs, capabilities fail to serve with
408-
// "empty workflowDONs provided".
409-
allWorkflowDONs := make([]registrysyncer.DON, 0, len(remoteWorkflowDONs)+len(myWorkflowDONs))
410-
allWorkflowDONs = append(allWorkflowDONs, remoteWorkflowDONs...)
411-
allWorkflowDONs = append(allWorkflowDONs, myWorkflowDONs...)
412411
for _, myDON := range myCapabilityDONs {
413-
w.serveCapabilities(ctx, w.myPeerID, myDON, localRegistry, allWorkflowDONs)
414-
415-
// Capability DONs also need remote capabilities (e.g. relay DON
416-
// needs vault for secret fetching). Without this, only workflow
417-
// DONs discover cross-DON capabilities.
418-
for _, rcd := range remoteCapabilityDONs {
419-
w.addRemoteCapabilities(ctx, myDON, rcd, localRegistry)
420-
}
412+
w.serveCapabilities(ctx, w.myPeerID, myDON, localRegistry, remoteWorkflowDONs)
421413
}
422414
}
423415

@@ -433,7 +425,7 @@ func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyn
433425
}
434426
if w.don2donSharedPeer != nil {
435427
donPairs := w.donPairsToUpdate(w.myPeerID, localRegistry)
436-
err := w.don2donSharedPeer.UpdateConnectionsByDONs(ctx, donPairs, defaultStreamConfig)
428+
err := w.don2donSharedPeer.UpdateConnectionsByDONs(ctx, donPairs, w.p2pStreamConfig)
437429
if err != nil {
438430
return fmt.Errorf("failed to update peer connections: %w", err)
439431
}

core/capabilities/launcher_test.go

Lines changed: 17 additions & 68 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import (
1212
"google.golang.org/protobuf/proto"
1313
"google.golang.org/protobuf/types/known/durationpb"
1414

15+
"github.com/smartcontractkit/libocr/ragep2p"
1516
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
1617

1718
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -233,73 +234,6 @@ func TestLauncher(t *testing.T) {
233234
assert.Equal(t, 1, observedLogs.FilterMessage("failed to serve capability").Len())
234235
})
235236

236-
t.Run("OK-single_DON_serves_capabilities", func(t *testing.T) {
237-
// Regression test: in single-DON topologies (e.g. local CRE), the same
238-
// DON is both a workflow DON and a capability DON. The DON appears in
239-
// myWorkflowDONs (not remoteWorkflowDONs). Previously, serveCapabilities
240-
// only received remoteWorkflowDONs, causing executable.Server.SetConfig
241-
// to fail with "empty workflowDONs provided".
242-
lggr, observedLogs := logger.TestObserved(t, zapcore.DebugLevel)
243-
registry := NewRegistry(lggr)
244-
dispatcher := remoteMocks.NewDispatcher(t)
245-
246-
nodes := newNodes(4)
247-
peer := mocks.NewPeer(t)
248-
peer.On("UpdateConnections", mock.Anything).Return(nil)
249-
peer.On("ID").Return(nodes[0])
250-
peer.On("IsBootstrap").Return(false)
251-
wrapper := mocks.NewPeerWrapper(t)
252-
wrapper.On("GetPeer").Return(peer)
253-
254-
fullTriggerCapID := "streams-trigger@1.0.0"
255-
mt := newMockTrigger(capabilities.MustNewCapabilityInfo(
256-
fullTriggerCapID,
257-
capabilities.CapabilityTypeTrigger,
258-
"streams trigger",
259-
))
260-
require.NoError(t, registry.Add(t.Context(), mt))
261-
262-
fullTargetID := "write-chain_evm_1@1.0.0"
263-
mtarg := &mockCapability{
264-
CapabilityInfo: capabilities.MustNewCapabilityInfo(
265-
fullTargetID,
266-
capabilities.CapabilityTypeTarget,
267-
"write chain",
268-
),
269-
}
270-
require.NoError(t, registry.Add(t.Context(), mtarg))
271-
272-
triggerCapID := RandomUTF8BytesWord()
273-
targetCapID := RandomUTF8BytesWord()
274-
275-
// Single DON: acceptsWorkflows=true AND has capability configurations.
276-
// This puts it in both myWorkflowDONs and myCapabilityDONs.
277-
dID := uint32(1)
278-
localRegistry := buildLocalRegistry()
279-
addDON(localRegistry, dID, uint32(0), uint8(1), true, true, nodes, []string{"zone-a"}, 1, [][32]byte{triggerCapID, targetCapID})
280-
addCapabilityToDON(localRegistry, dID, fullTriggerCapID, capabilities.CapabilityTypeTrigger, nil)
281-
addCapabilityToDON(localRegistry, dID, fullTargetID, capabilities.CapabilityTypeTarget, nil)
282-
283-
launcher, err := NewLauncher(
284-
lggr,
285-
wrapper,
286-
nil,
287-
nil,
288-
dispatcher,
289-
registry,
290-
&mockDonNotifier{},
291-
)
292-
require.NoError(t, err)
293-
require.NoError(t, launcher.Start(t.Context()))
294-
defer launcher.Close()
295-
296-
dispatcher.On("SetReceiver", fullTriggerCapID, dID, mock.AnythingOfType("*remote.triggerPublisher")).Return(nil)
297-
dispatcher.On("SetReceiver", fullTargetID, dID, mock.AnythingOfType("*executable.server")).Return(nil)
298-
299-
require.NoError(t, launcher.OnNewRegistry(t.Context(), localRegistry))
300-
assert.Equal(t, 0, observedLogs.FilterMessage("failed to serve capability").Len())
301-
})
302-
303237
t.Run("start and close with nil peer wrapper", func(t *testing.T) {
304238
lggr := logger.Test(t)
305239
registry := NewRegistry(lggr)
@@ -959,10 +893,24 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
959893
addCapabilityToDON(localRegistry, zoneBDonID, fullExecutableCapID, capabilities.CapabilityTypeTarget, execCfg)
960894
addCapabilityToDON(localRegistry, capDonID, fullLocalCapID, capabilities.CapabilityTypeAction, localCfg) // should be skipped
961895

896+
customStreamConfig := p2ptypes.StreamConfig{
897+
IncomingMessageBufferSize: 999,
898+
OutgoingMessageBufferSize: 888,
899+
MaxMessageLenBytes: 777777,
900+
MessageRateLimiter: ragep2p.TokenBucketParams{
901+
Rate: 50.0,
902+
Capacity: 250,
903+
},
904+
BytesRateLimiter: ragep2p.TokenBucketParams{
905+
Rate: 2500000.0,
906+
Capacity: 5000000,
907+
},
908+
}
909+
962910
sharedPeer := mocks.NewSharedPeer(t)
963911
sharedPeer.On("ID").Return(workflowDonNodes[0])
964912
sharedPeer.On("IsBootstrap").Return(false)
965-
sharedPeer.On("UpdateConnectionsByDONs", mock.Anything, mock.Anything, mock.Anything).Return(nil)
913+
sharedPeer.On("UpdateConnectionsByDONs", mock.Anything, mock.Anything, customStreamConfig).Return(nil)
966914

967915
launcher, err := NewLauncher(
968916
lggr,
@@ -974,6 +922,7 @@ func TestLauncher_V2CapabilitiesAddViaCombinedClient(t *testing.T) {
974922
&mockDonNotifier{},
975923
)
976924
require.NoError(t, err)
925+
launcher.p2pStreamConfig = customStreamConfig
977926
servicetest.Run(t, launcher)
978927

979928
dispatcher.On("SetReceiverForMethod", fullTriggerCapID, capDonID, "StreamsTrigger", mock.AnythingOfType("*remote.triggerSubscriber")).Return(nil)

core/config/cre_config.go

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ type CRE interface {
1313
// When enabled, additional OTel tracing and logging is performed.
1414
DebugMode() bool
1515
LocalSecrets() map[string]string
16+
ConfidentialRelay() CREConfidentialRelay
1617
}
1718

1819
// WorkflowFetcher defines configuration for fetching workflow files
@@ -21,6 +22,13 @@ type WorkflowFetcher interface {
2122
URL() string
2223
}
2324

25+
// CREConfidentialRelay defines configuration for the confidential relay handler.
26+
type CREConfidentialRelay interface {
27+
Enabled() bool
28+
TrustedPCRs() string
29+
CARootsPEM() string
30+
}
31+
2432
// CRELinking defines configuration for connecting to the CRE linking service
2533
type CRELinking interface {
2634
URL() string

core/config/toml/types.go

Lines changed: 26 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1895,14 +1895,24 @@ type CreConfig struct {
18951895
// Requires [Tracing].Enabled = true for traces to be exported (trace export is gated by
18961896
// Tracing.Enabled in initGlobals; Telemetry.Enabled is optional—traces work with or without it).
18971897
// WARNING: This is not suitable for production use due to performance overhead.
1898-
DebugMode *bool `toml:",omitempty"`
1898+
DebugMode *bool `toml:",omitempty"`
1899+
ConfidentialRelay *ConfidentialRelayConfig `toml:",omitempty"`
18991900
}
19001901

19011902
// WorkflowFetcherConfig holds the configuration for fetching workflow files
19021903
type WorkflowFetcherConfig struct {
19031904
URL *string `toml:",omitempty"`
19041905
}
19051906

1907+
// ConfidentialRelayConfig holds the configuration for the confidential relay handler.
1908+
// When Enabled is true, the node participates in the confidential relay DON,
1909+
// validating enclave attestations and proxying capability requests.
1910+
type ConfidentialRelayConfig struct {
1911+
Enabled *bool `toml:",omitempty"`
1912+
TrustedPCRs *string `toml:",omitempty"`
1913+
CARootsPEM *string `toml:",omitempty"`
1914+
}
1915+
19061916
// LinkingConfig holds the configuration for connecting to the CRE linking service
19071917
type LinkingConfig struct {
19081918
URL *string `toml:",omitempty"`
@@ -1956,6 +1966,21 @@ func (c *CreConfig) setFrom(f *CreConfig) {
19561966
if f.DebugMode != nil {
19571967
c.DebugMode = f.DebugMode
19581968
}
1969+
1970+
if f.ConfidentialRelay != nil {
1971+
if c.ConfidentialRelay == nil {
1972+
c.ConfidentialRelay = &ConfidentialRelayConfig{}
1973+
}
1974+
if v := f.ConfidentialRelay.Enabled; v != nil {
1975+
c.ConfidentialRelay.Enabled = v
1976+
}
1977+
if v := f.ConfidentialRelay.TrustedPCRs; v != nil {
1978+
c.ConfidentialRelay.TrustedPCRs = v
1979+
}
1980+
if v := f.ConfidentialRelay.CARootsPEM; v != nil {
1981+
c.ConfidentialRelay.CARootsPEM = v
1982+
}
1983+
}
19591984
}
19601985

19611986
func (w *WorkflowFetcherConfig) ValidateConfig() error {

core/services/chainlink/config_cre.go

Lines changed: 29 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -105,6 +105,35 @@ func (c *creConfig) Linking() config.CRELinking {
105105
return &linkingConfig{url: url, tlsEnabled: tlsEnabled}
106106
}
107107

108+
type confidentialRelayConfig struct {
109+
enabled bool
110+
trustedPCRs string
111+
caRootsPEM string
112+
}
113+
114+
func (cr *confidentialRelayConfig) Enabled() bool { return cr.enabled }
115+
func (cr *confidentialRelayConfig) TrustedPCRs() string { return cr.trustedPCRs }
116+
func (cr *confidentialRelayConfig) CARootsPEM() string { return cr.caRootsPEM }
117+
118+
func (c *creConfig) ConfidentialRelay() config.CREConfidentialRelay {
119+
if c.c.ConfidentialRelay == nil {
120+
return &confidentialRelayConfig{}
121+
}
122+
enabled := false
123+
if c.c.ConfidentialRelay.Enabled != nil {
124+
enabled = *c.c.ConfidentialRelay.Enabled
125+
}
126+
trustedPCRs := ""
127+
if c.c.ConfidentialRelay.TrustedPCRs != nil {
128+
trustedPCRs = *c.c.ConfidentialRelay.TrustedPCRs
129+
}
130+
caRootsPEM := ""
131+
if c.c.ConfidentialRelay.CARootsPEM != nil {
132+
caRootsPEM = *c.c.ConfidentialRelay.CARootsPEM
133+
}
134+
return &confidentialRelayConfig{enabled: enabled, trustedPCRs: trustedPCRs, caRootsPEM: caRootsPEM}
135+
}
136+
108137
func (c *creConfig) LocalSecrets() map[string]string {
109138
return c.s.LocalSecrets
110139
}

core/services/job/models.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -930,6 +930,7 @@ type WorkflowSpec struct {
930930
CreatedAt time.Time `toml:"-"`
931931
UpdatedAt time.Time `toml:"-"`
932932
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
933+
Attributes []byte `db:"attributes"`
933934
sdkWorkflow *sdk.WorkflowSpec
934935
rawSpec []byte
935936
config []byte

core/services/standardcapabilities/conversions/conversions.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,8 @@ func GetCapabilityIDFromCommand(command string, config string) string {
3333
return "http-trigger@1.0.0-alpha"
3434
case "http_action":
3535
return "http-actions@1.0.0-alpha" // plural "actions"
36+
case "mock":
37+
return "mock@1.0.0"
3638
default:
3739
return ""
3840
}
@@ -52,6 +54,8 @@ func GetCommandFromCapabilityID(capabilityID string) string {
5254
return "http_trigger"
5355
case strings.HasPrefix(capabilityID, "http-actions"):
5456
return "http_action"
57+
case strings.HasPrefix(capabilityID, "mock"):
58+
return "mock"
5559
default:
5660
return ""
5761
}

core/services/workflows/artifacts/v2/orm.go

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -63,7 +63,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
6363
config_url,
6464
created_at,
6565
updated_at,
66-
spec_type
66+
spec_type,
67+
attributes
6768
) VALUES (
6869
:workflow,
6970
:config,
@@ -76,7 +77,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
7677
:config_url,
7778
:created_at,
7879
:updated_at,
79-
:spec_type
80+
:spec_type,
81+
:attributes
8082
) ON CONFLICT (workflow_id) DO UPDATE
8183
SET
8284
workflow = EXCLUDED.workflow,
@@ -89,7 +91,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
8991
config_url = EXCLUDED.config_url,
9092
created_at = EXCLUDED.created_at,
9193
updated_at = EXCLUDED.updated_at,
92-
spec_type = EXCLUDED.spec_type
94+
spec_type = EXCLUDED.spec_type,
95+
attributes = EXCLUDED.attributes
9396
RETURNING id
9497
`
9598

core/services/workflows/syncer/fetcher.go

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -177,12 +177,20 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc {
177177
if err != nil {
178178
return nil, fmt.Errorf("invalid URL: %w", err)
179179
}
180-
fullPath := filepath.Clean(u.Path)
181180

182-
// ensure that the incoming request URL is either relative or absolute but within the basePath
183-
if !filepath.IsAbs(fullPath) {
184-
// If it's not absolute, we assume it's relative to the basePath
185-
fullPath = filepath.Join(basePath, fullPath)
181+
var fullPath string
182+
if u.Scheme == "http" || u.Scheme == "https" {
183+
// For HTTP(S) URLs, extract just the filename and resolve against basePath.
184+
// This supports confidential workflows where the on-chain URL must be HTTP
185+
// (so the enclave can fetch the binary), but the syncer reads from the local filesystem.
186+
fullPath = filepath.Join(basePath, filepath.Base(u.Path))
187+
} else {
188+
fullPath = filepath.Clean(u.Path)
189+
// ensure that the incoming request URL is either relative or absolute but within the basePath
190+
if !filepath.IsAbs(fullPath) {
191+
// If it's not absolute, we assume it's relative to the basePath
192+
fullPath = filepath.Join(basePath, fullPath)
193+
}
186194
}
187195
if !strings.HasPrefix(fullPath, basePath) {
188196
return nil, fmt.Errorf("request URL %s is not within the basePath %s", fullPath, basePath)

0 commit comments

Comments
 (0)