Skip to content

Commit bdff6a5

Browse files
authored
[CRE] [4/5] ConfidentialModule, config, DB migration, syncer routing (#21641)
* resolve go.mod conflicts * Revert file fetcher HTTP URL handling The filepath.Base() code for HTTP URLs in newFileFetcher was dead code. No test or production path sends an HTTP URL to a file-based fetcher. The enclave fetches binaries via its own BinaryFetcher, independent of the node syncer's fetcher. * Restore comments in startAndRegisterEngine, extract newV2EngineConfig Restore comments that were dropped when extracting startAndRegisterEngine from tryEngineCreate. Extract common EngineConfig construction into newV2EngineConfig and initDone hook wiring into wireInitDoneHook, reducing duplication between the normal and confidential engine paths. * Unify engine creation flow for confidential and normal paths Replace the early-return pattern with a symmetric if/else that picks the factory, then converges on a single startAndRegisterEngine call. Rename tryConfidentialEngineCreate to confidentialEngineFactory and change its signature to return (services.Service, error). # Conflicts: # core/services/workflows/syncer/v2/handler.go * Restore initDone comment in tryEngineCreate * Inline startAndRegisterEngine back into tryEngineCreate No longer needed as a separate method now that both engine paths converge in tryEngineCreate. * Restore inline comments in wireInitDoneHook * Restore original BeholderEmitter closure pattern in newV2EngineConfig * Clean up factory signatures and newV2EngineConfig param ordering Group string params together in newV2EngineConfig, move SdkName and DebugMode into the constructor, drop unused wid param from confidentialEngineFactory. * Fix lint: errors.New, assert.Empty * Add BinaryURLResolver to ConfidentialModule for presigned URL support The enclave needs an authenticated URL to download WASM binaries from the CRE storage service. BinaryURLResolver resolves the raw on-chain URL into a presigned/ephemeral URL per execution. Nil-safe: falls back to the raw URL when no resolver is set. PR 5/5 (#21642) wires this to the storage service retriever. * add EmitUserMetric to stubExecutionHelper host.ExecutionHelper gained EmitUserMetric in chainlink-common #1924. * fix struct field alignment in ConfidentialModule * set org_id on WorkflowExecution from CRE context Bump chainlink-common to pick up OrgId field on WorkflowExecution proto. Read org from the CRE execution context and include it in the proto sent to the enclave, matching the pattern used by the framework executor for VaultDON requests. * block user workflows from calling system-only capabilities Adds a deny-list check in ExecutionHelper.CallCapability() that prevents user workflow steps from invoking internal capabilities like confidential-workflows. The ConfidentialModule bypasses this gate because it calls the registry directly.
1 parent 6b6d76b commit bdff6a5

10 files changed

Lines changed: 1077 additions & 69 deletions

File tree

core/services/job/models.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,7 @@ type WorkflowSpec struct {
930930
CreatedAt time.Time `toml:"-"`
931931
UpdatedAt time.Time `toml:"-"`
932932
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
933+
Attributes []byte `db:"attributes"`
933934
sdkWorkflow *sdk.WorkflowSpec
934935
rawSpec []byte
935936
config []byte

core/services/standardcapabilities/conversions/conversions.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ func GetCapabilityIDFromCommand(command string, config string) string {
5050
return "http-trigger@1.0.0-alpha"
5151
case "http_action":
5252
return "http-actions@1.0.0-alpha" // plural "actions"
53+
case "mock":
54+
return "mock@1.0.0"
5355
default:
5456
return ""
5557
}
@@ -71,6 +73,8 @@ func GetCommandFromCapabilityID(capabilityID string) string {
7173
return "http_trigger"
7274
case strings.HasPrefix(capabilityID, "http-actions"):
7375
return "http_action"
76+
case strings.HasPrefix(capabilityID, "mock"):
77+
return "mock"
7478
default:
7579
return ""
7680
}

core/services/workflows/artifacts/v2/orm.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
6363
config_url,
6464
created_at,
6565
updated_at,
66-
spec_type
66+
spec_type,
67+
attributes
6768
) VALUES (
6869
:workflow,
6970
:config,
@@ -76,7 +77,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
7677
:config_url,
7778
:created_at,
7879
:updated_at,
79-
:spec_type
80+
:spec_type,
81+
:attributes
8082
) ON CONFLICT (workflow_id) DO UPDATE
8183
SET
8284
workflow = EXCLUDED.workflow,
@@ -89,7 +91,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
8991
config_url = EXCLUDED.config_url,
9092
created_at = EXCLUDED.created_at,
9193
updated_at = EXCLUDED.updated_at,
92-
spec_type = EXCLUDED.spec_type
94+
spec_type = EXCLUDED.spec_type,
95+
attributes = EXCLUDED.attributes
9396
RETURNING id
9497
`
9598

core/services/workflows/syncer/v2/handler.go

Lines changed: 120 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR
558558
SpecType: job.WASMFile,
559559
BinaryURL: payload.BinaryURL,
560560
ConfigURL: payload.ConfigURL,
561+
Attributes: payload.Attributes,
561562
}
562563

563564
if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil {
@@ -658,60 +659,9 @@ func (h *eventHandler) engineFactoryFn(ctx context.Context, workflowID string, o
658659
}
659660

660661
// V2 aka "NoDAG"
661-
cfg := &v2.EngineConfig{
662-
Lggr: h.lggr,
663-
Module: module,
664-
WorkflowConfig: config,
665-
CapRegistry: h.capRegistry,
666-
DonSubscriber: h.workflowDonSubscriber,
667-
UseLocalTimeProvider: h.useLocalTimeProvider,
668-
DonTimeStore: h.donTimeStore,
669-
ExecutionsStore: h.workflowStore,
670-
WorkflowID: workflowID,
671-
WorkflowOwner: owner,
672-
WorkflowName: name,
673-
WorkflowTag: tag,
674-
WorkflowEncryptionKey: h.workflowEncryptionKey,
662+
cfg := h.newV2EngineConfig(module, workflowID, owner, tag, sdkName, name, config)
675663

676-
LocalLimits: v2.EngineLimits{}, // all defaults
677-
LocalLimiters: h.engineLimiters,
678-
FeatureFlags: h.featureFlags,
679-
GlobalExecutionConcurrencyLimiter: h.workflowLimits,
680-
681-
BeholderEmitter: func() custmsg.MessageEmitter {
682-
h.emitterMu.RLock()
683-
defer h.emitterMu.RUnlock()
684-
return h.emitter
685-
}(),
686-
BillingClient: h.billingClient,
687-
688-
WorkflowRegistryAddress: h.workflowRegistryAddress,
689-
WorkflowRegistryChainSelector: h.workflowRegistryChainSelector,
690-
OrgResolver: h.orgResolver,
691-
DebugMode: h.debugMode,
692-
SecretsFetcher: h.secretsFetcher,
693-
SdkName: sdkName,
694-
695-
ShardOrchestratorClient: h.shardOrchestratorClient,
696-
ShardingEnabled: h.shardingEnabled,
697-
MyShardID: h.myShardID,
698-
ShardRoutingSteady: h.shardRoutingSteady,
699-
}
700-
701-
// Wire the initDone channel to the OnInitialized lifecycle hook.
702-
// This will be called when the engine completes initialization (including trigger subscriptions).
703-
// We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks.
704-
if initDone != nil {
705-
existingHook := cfg.Hooks.OnInitialized
706-
cfg.Hooks.OnInitialized = func(err error) {
707-
// Signal completion to the handler first
708-
initDone <- err
709-
// Then call any existing hook (e.g., from tests)
710-
if existingHook != nil {
711-
existingHook(err)
712-
}
713-
}
714-
}
664+
h.wireInitDoneHook(cfg, initDone)
715665

716666
return v2.NewEngine(cfg)
717667
}
@@ -824,24 +774,23 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
824774
return fmt.Errorf("invalid workflow name: %w", err)
825775
}
826776

777+
confidential, err := v2.IsConfidential(spec.Attributes)
778+
if err != nil {
779+
return fmt.Errorf("failed to parse workflow attributes: %w", err)
780+
}
781+
827782
// Create a channel to receive the initialization result.
828783
// This allows us to wait for the engine to complete initialization (including trigger subscriptions)
829784
// before emitting the workflowActivated event, ensuring the event accurately reflects deployment status.
830785
initDone := make(chan error, 1)
786+
var engine services.Service
831787

832-
// Scope the engineFactory call so that decodedBinary goes out of scope immediately after the factory returns
833-
engine, err := func() (services.Service, error) {
834-
return h.engineFactory(
835-
ctx,
836-
spec.WorkflowID,
837-
spec.WorkflowOwner,
838-
workflowName,
839-
spec.WorkflowTag,
840-
configBytes,
841-
decodedBinary,
842-
initDone,
843-
)
844-
}()
788+
if confidential {
789+
h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID)
790+
engine, err = h.confidentialEngineFactory(spec, workflowName, decodedBinary, initDone)
791+
} else {
792+
engine, err = h.engineFactory(ctx, spec.WorkflowID, spec.WorkflowOwner, workflowName, spec.WorkflowTag, configBytes, decodedBinary, initDone)
793+
}
845794
if err != nil {
846795
return fmt.Errorf("failed to create workflow engine: %w", err)
847796
}
@@ -898,6 +847,111 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
898847
return nil
899848
}
900849

850+
// newV2EngineConfig builds the common EngineConfig shared by both the normal
851+
// WASM engine and the confidential engine paths. Caller supplies the module.
852+
func (h *eventHandler) newV2EngineConfig(
853+
module host.ModuleV2,
854+
workflowID, owner, tag, sdkName string,
855+
name types.WorkflowName,
856+
config []byte,
857+
) *v2.EngineConfig {
858+
return &v2.EngineConfig{
859+
Lggr: h.lggr,
860+
Module: module,
861+
WorkflowConfig: config,
862+
CapRegistry: h.capRegistry,
863+
DonSubscriber: h.workflowDonSubscriber,
864+
UseLocalTimeProvider: h.useLocalTimeProvider,
865+
DonTimeStore: h.donTimeStore,
866+
ExecutionsStore: h.workflowStore,
867+
WorkflowID: workflowID,
868+
WorkflowOwner: owner,
869+
WorkflowName: name,
870+
WorkflowTag: tag,
871+
WorkflowEncryptionKey: h.workflowEncryptionKey,
872+
873+
LocalLimits: v2.EngineLimits{}, // all defaults
874+
LocalLimiters: h.engineLimiters,
875+
FeatureFlags: h.featureFlags,
876+
GlobalExecutionConcurrencyLimiter: h.workflowLimits,
877+
878+
BeholderEmitter: func() custmsg.MessageEmitter {
879+
h.emitterMu.RLock()
880+
defer h.emitterMu.RUnlock()
881+
return h.emitter
882+
}(),
883+
BillingClient: h.billingClient,
884+
885+
WorkflowRegistryAddress: h.workflowRegistryAddress,
886+
WorkflowRegistryChainSelector: h.workflowRegistryChainSelector,
887+
OrgResolver: h.orgResolver,
888+
SecretsFetcher: h.secretsFetcher,
889+
DebugMode: h.debugMode,
890+
SdkName: sdkName,
891+
892+
ShardOrchestratorClient: h.shardOrchestratorClient,
893+
ShardingEnabled: h.shardingEnabled,
894+
MyShardID: h.myShardID,
895+
ShardRoutingSteady: h.shardRoutingSteady,
896+
}
897+
}
898+
899+
// wireInitDoneHook wires the initDone channel to the OnInitialized lifecycle hook.
900+
// This will be called when the engine completes initialization (including trigger subscriptions).
901+
// We compose with any existing hook to avoid overwriting test hooks or other user-provided hooks.
902+
func (h *eventHandler) wireInitDoneHook(cfg *v2.EngineConfig, initDone chan<- error) {
903+
if initDone == nil {
904+
return
905+
}
906+
existingHook := cfg.Hooks.OnInitialized
907+
cfg.Hooks.OnInitialized = func(err error) {
908+
// Signal completion to the handler first
909+
initDone <- err
910+
// Then call any existing hook (e.g., from tests)
911+
if existingHook != nil {
912+
existingHook(err)
913+
}
914+
}
915+
}
916+
917+
// confidentialEngineFactory creates a V2 engine backed by a ConfidentialModule
918+
// instead of a local WASM module. The ConfidentialModule delegates execution to
919+
// the confidential-workflows capability which runs the WASM inside a TEE.
920+
func (h *eventHandler) confidentialEngineFactory(
921+
spec *job.WorkflowSpec,
922+
workflowName types.WorkflowName,
923+
decodedBinary []byte,
924+
initDone chan<- error,
925+
) (services.Service, error) {
926+
attrs, err := v2.ParseWorkflowAttributes(spec.Attributes)
927+
if err != nil {
928+
return nil, fmt.Errorf("failed to parse workflow attributes: %w", err)
929+
}
930+
931+
binaryHash := v2.ComputeBinaryHash(decodedBinary)
932+
933+
lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule")
934+
lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner)
935+
936+
// nil resolver: raw binaryURL is passed to the enclave as-is.
937+
// PR 5/5 (#21642) wires this to the storage service retriever
938+
// so the enclave receives a presigned URL.
939+
module := v2.NewConfidentialModule(
940+
h.capRegistry,
941+
spec.BinaryURL,
942+
binaryHash,
943+
spec.WorkflowID, spec.WorkflowOwner, workflowName.String(), spec.WorkflowTag,
944+
attrs.VaultDonSecrets,
945+
nil,
946+
lggr,
947+
)
948+
949+
cfg := h.newV2EngineConfig(module, spec.WorkflowID, spec.WorkflowOwner, spec.WorkflowTag, "", workflowName, []byte(spec.Config))
950+
h.wireInitDoneHook(cfg, initDone)
951+
952+
return v2.NewEngine(cfg)
953+
}
954+
901955
// logCustMsg emits a custom message to the external sink and logs an error if that fails.
902956
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
903957
err := cma.Emit(ctx, msg)

0 commit comments

Comments
 (0)