Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/confidential-module-plumbing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add ConfidentialModule and attributes plumbing for confidential CRE workflows #added #db_update
2 changes: 1 addition & 1 deletion core/scripts/cre/environment/environment/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func deployWorkflow(

fmt.Printf("\n⚙️ Registering workflow '%s' with the workflow registry\n\n", workflowNameFlag)

workflowID, registerErr := creworkflow.RegisterWithContract(ctx, sethClient, common.HexToAddress(workflowRegistryAddress), workflowRegistryVersion, uint64(donIDFlag), workflowNameFlag, "file://"+wasmWorkflowFilePathFlag, configPath, secretsPath, &containerTargetDirFlag)
workflowID, registerErr := creworkflow.RegisterWithContract(ctx, sethClient, common.HexToAddress(workflowRegistryAddress), workflowRegistryVersion, uint64(donIDFlag), workflowNameFlag, "file://"+wasmWorkflowFilePathFlag, configPath, secretsPath, nil, &containerTargetDirFlag)
if registerErr != nil {
return errors.Wrapf(registerErr, "❌ failed to register workflow %s", workflowNameFlag)
}
Expand Down
1 change: 1 addition & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ type WorkflowSpec struct {
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
Attributes []byte `db:"attributes"`
sdkWorkflow *sdk.WorkflowSpec
rawSpec []byte
config []byte
Expand Down
9 changes: 6 additions & 3 deletions core/services/workflows/artifacts/v2/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url,
created_at,
updated_at,
spec_type
spec_type,
attributes
) VALUES (
:workflow,
:config,
Expand All @@ -76,7 +77,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
:config_url,
:created_at,
:updated_at,
:spec_type
:spec_type,
:attributes
) ON CONFLICT (workflow_id) DO UPDATE
SET
workflow = EXCLUDED.workflow,
Expand All @@ -89,7 +91,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url = EXCLUDED.config_url,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
spec_type = EXCLUDED.spec_type,
attributes = EXCLUDED.attributes
RETURNING id
`

Expand Down
18 changes: 13 additions & 5 deletions core/services/workflows/syncer/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,20 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc {
if err != nil {
return nil, fmt.Errorf("invalid URL: %w", err)
}
fullPath := filepath.Clean(u.Path)

// ensure that the incoming request URL is either relative or absolute but within the basePath
if !filepath.IsAbs(fullPath) {
// If it's not absolute, we assume it's relative to the basePath
fullPath = filepath.Join(basePath, fullPath)
var fullPath string
if u.Scheme == "http" || u.Scheme == "https" {
// For HTTP(S) URLs, extract just the filename and resolve against basePath.
// This supports confidential workflows where the on-chain URL must be HTTP
// (so the enclave can fetch the binary), but the syncer reads from the local filesystem.
fullPath = filepath.Join(basePath, filepath.Base(u.Path))
} else {
fullPath = filepath.Clean(u.Path)
// ensure that the incoming request URL is either relative or absolute but within the basePath
if !filepath.IsAbs(fullPath) {
// If it's not absolute, we assume it's relative to the basePath
fullPath = filepath.Join(basePath, fullPath)
}
}
if !strings.HasPrefix(fullPath, basePath) {
return nil, fmt.Errorf("request URL %s is not within the basePath %s", fullPath, basePath)
Expand Down
18 changes: 13 additions & 5 deletions core/services/workflows/syncer/v2/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,20 @@ func newFileFetcher(basePath string, lggr logger.Logger) types.FetcherFunc {
if err != nil {
return nil, fmt.Errorf("invalid URL: %w", err)
}
fullPath := filepath.Clean(u.Path)

// ensure that the incoming request URL is either relative or absolute but within the basePath
if !filepath.IsAbs(fullPath) {
// If it's not absolute, we assume it's relative to the basePath
fullPath = filepath.Join(basePath, fullPath)
var fullPath string
if u.Scheme == "http" || u.Scheme == "https" {
// For HTTP(S) URLs, extract just the filename and resolve against basePath.
// This supports confidential workflows where the on-chain URL must be HTTP
// (so the enclave can fetch the binary), but the syncer reads from the local filesystem.
fullPath = filepath.Join(basePath, filepath.Base(u.Path))
} else {
fullPath = filepath.Clean(u.Path)
// ensure that the incoming request URL is either relative or absolute but within the basePath
if !filepath.IsAbs(fullPath) {
// If it's not absolute, we assume it's relative to the basePath
fullPath = filepath.Join(basePath, fullPath)
}
}
if !strings.HasPrefix(fullPath, basePath) {
return nil, fmt.Errorf("request URL %s is not within the basePath %s", fullPath, basePath)
Expand Down
128 changes: 128 additions & 0 deletions core/services/workflows/syncer/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR
SpecType: job.WASMFile,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
Attributes: payload.Attributes,
}

if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil {
Expand Down Expand Up @@ -726,6 +727,15 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return fmt.Errorf("invalid workflow name: %w", err)
}

confidential, err := v2.IsConfidential(spec.Attributes)
if err != nil {
return fmt.Errorf("failed to parse workflow attributes: %w", err)
}
if confidential {
h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID)
return h.tryConfidentialEngineCreate(ctx, spec, wid, workflowName, decodedBinary, source)
}

// Create a channel to receive the initialization result.
// This allows us to wait for the engine to complete initialization (including trigger subscriptions)
// before emitting the workflowActivated event, ensuring the event accurately reflects deployment status.
Expand Down Expand Up @@ -797,6 +807,124 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return nil
}

// tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule
// instead of a local WASM module. The ConfidentialModule delegates execution to
// the confidential-workflows capability which runs the WASM inside a TEE.
func (h *eventHandler) tryConfidentialEngineCreate(
ctx context.Context,
spec *job.WorkflowSpec,
wid types.WorkflowID,
workflowName types.WorkflowName,
decodedBinary []byte,
source string,
) error {
attrs, err := v2.ParseWorkflowAttributes(spec.Attributes)
if err != nil {
return fmt.Errorf("failed to parse workflow attributes: %w", err)
}

binaryHash := v2.ComputeBinaryHash(decodedBinary)

lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule")
lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner)

module := v2.NewConfidentialModule(
h.capRegistry,
spec.BinaryURL,
binaryHash,
spec.WorkflowID,
spec.WorkflowOwner,
workflowName.String(),
spec.WorkflowTag,
attrs.VaultDonSecrets,
lggr,
)

initDone := make(chan error, 1)

cfg := &v2.EngineConfig{
Lggr: h.lggr,
Module: module,
WorkflowConfig: []byte(spec.Config),
CapRegistry: h.capRegistry,
DonSubscriber: h.workflowDonSubscriber,
UseLocalTimeProvider: h.useLocalTimeProvider,
DonTimeStore: h.donTimeStore,
ExecutionsStore: h.workflowStore,
WorkflowID: spec.WorkflowID,
WorkflowOwner: spec.WorkflowOwner,
WorkflowName: workflowName,
WorkflowTag: spec.WorkflowTag,
WorkflowEncryptionKey: h.workflowEncryptionKey,

LocalLimits: v2.EngineLimits{},
LocalLimiters: h.engineLimiters,
FeatureFlags: h.featureFlags,
GlobalExecutionConcurrencyLimiter: h.workflowLimits,

BeholderEmitter: h.emitter,
BillingClient: h.billingClient,

WorkflowRegistryAddress: h.workflowRegistryAddress,
WorkflowRegistryChainSelector: h.workflowRegistryChainSelector,
OrgResolver: h.orgResolver,
SecretsFetcher: h.secretsFetcher,
}

existingHook := cfg.Hooks.OnInitialized
cfg.Hooks.OnInitialized = func(err error) {
initDone <- err
if existingHook != nil {
existingHook(err)
}
}

engine, err := v2.NewEngine(cfg)
if err != nil {
return fmt.Errorf("failed to create confidential workflow engine: %w", err)
}

if err = engine.Start(ctx); err != nil {
return fmt.Errorf("failed to start confidential workflow engine: %w", err)
}

select {
case <-ctx.Done():
if closeErr := engine.Close(); closeErr != nil {
h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", spec.WorkflowID)
}
return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err())
case initErr := <-initDone:
if initErr != nil {
if closeErr := engine.Close(); closeErr != nil {
h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID)
}
return fmt.Errorf("engine initialization failed: %w", initErr)
}
}

if err := h.engineRegistry.Add(wid, source, engine); err != nil {
if closeErr := engine.Close(); closeErr != nil {
return fmt.Errorf("failed to close workflow engine: %w during invariant violation: %w", closeErr, err)
}

if errors.Is(err, ErrAlreadyExists) {
existingEntry, found := h.engineRegistry.Get(wid)
if found {
h.lggr.Warnw("WorkflowID collision detected: workflow already exists from different source",
"workflowID", wid.Hex(),
"attemptedSource", source,
"existingSource", existingEntry.Source,
"hint", "Each workflow ID should only be registered from a single source. Check your workflow configurations for duplicates.")
}
}

return fmt.Errorf("invariant violation: %w", err)
}

return nil
}

// logCustMsg emits a custom message to the external sink and logs an error if that fails.
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
Expand Down
Loading
Loading