From a480a08f88ff8aab5c2eec63c0972218f0608524 Mon Sep 17 00:00:00 2001 From: mchain0 Date: Fri, 3 Apr 2026 12:32:25 +0200 Subject: [PATCH 1/2] fix/bind-workflow-registry-once --- core/services/cre/cre.go | 51 ++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index baf87584cc8..6aed9277563 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -841,6 +841,7 @@ func newWorkflowRegistrySyncerV2( gatewayConnectorWrapper *gatewayconnector.ServiceWrapper, ) (syncerV2.WorkflowRegistrySyncer, []commonsrv.Service, error) { capCfg := cfg.Capabilities() + wfReg := capCfg.WorkflowRegistry() key := opts.WorkflowKey fetcherFunc, retrieverFunc, srvcs, err := newFetcherServiceV2(opts, capCfg, lggr, gatewayConnectorWrapper) @@ -859,13 +860,13 @@ func newWorkflowRegistrySyncerV2( lf, artifactsV2.WithMaxArtifactSize( artifactsV2.ArtifactConfig{ - MaxBinarySize: uint64(capCfg.WorkflowRegistry().MaxBinarySize()), - MaxSecretsSize: uint64(capCfg.WorkflowRegistry().MaxEncryptedSecretsSize()), - MaxConfigSize: uint64(capCfg.WorkflowRegistry().MaxConfigSize()), + MaxBinarySize: uint64(wfReg.MaxBinarySize()), + MaxSecretsSize: uint64(wfReg.MaxEncryptedSecretsSize()), + MaxConfigSize: uint64(wfReg.MaxConfigSize()), }, ), artifactsV2.WithConfig(artifactsV2.StoreConfig{ - ArtifactStorageHost: capCfg.WorkflowRegistry().WorkflowStorage().ArtifactStorageHost(), + ArtifactStorageHost: wfReg.WorkflowStorage().ArtifactStorageHost(), }), ) if err != nil { @@ -884,7 +885,7 @@ func newWorkflowRegistrySyncerV2( return nil, nil, fmt.Errorf("could not instantiate engine feature flags: %w", err) } - selector, err := chainSelector(capCfg.WorkflowRegistry().ChainID(), capCfg.WorkflowRegistry().NetworkID()) + selector, err := chainSelector(wfReg.ChainID(), wfReg.NetworkID()) if err != nil { return nil, nil, fmt.Errorf("failed to get workflow registry chain details by chain ID and network ID: %w", err) } @@ -905,7 +906,7 @@ func newWorkflowRegistrySyncerV2( key, workflowDonNotifier, syncerV2.WithBillingClient(billingClient), - syncerV2.WithWorkflowRegistry(capCfg.WorkflowRegistry().Address(), selector), + syncerV2.WithWorkflowRegistry(wfReg.Address(), selector), syncerV2.WithOrgResolver(orgResolver), syncerV2.WithDebugMode(cfg.CRE().DebugMode()), syncerV2.WithLocalSecrets(lggr, cfg.CRE().LocalSecrets()), @@ -919,35 +920,29 @@ func newWorkflowRegistrySyncerV2( return nil, nil, errors.New("failed to instantiate contract reader factory") } - var shardOrchestratorClient shardorchestrator.ClientInterface - if opts.ShardOrchestratorClient != nil { - shardOrchestratorClient = opts.ShardOrchestratorClient - } else { - var c shardorchestrator.ClientInterface - c, err = newShardOrchestratorClient(cfg, lggr) + shardOrchestratorClient := opts.ShardOrchestratorClient + if shardOrchestratorClient == nil { + shardOrchestratorClient, err = newShardOrchestratorClient(cfg, lggr) if err != nil { return nil, nil, err } - shardOrchestratorClient = c - } - - addSources := capCfg.WorkflowRegistry().AdditionalSources() - addSourceConfigs := make([]syncerV2.AdditionalSourceConfig, 0, len(addSources)) - if len(addSources) > 0 { - for _, src := range addSources { - addSourceConfigs = append(addSourceConfigs, syncerV2.AdditionalSourceConfig{ - URL: src.GetURL(), - Name: src.GetName(), - TLSEnabled: src.GetTLSEnabled(), - JWTGenerator: opts.JWTGenerator, - }) + } + + addSources := wfReg.AdditionalSources() + addSourceConfigs := make([]syncerV2.AdditionalSourceConfig, len(addSources)) + for i, src := range addSources { + addSourceConfigs[i] = syncerV2.AdditionalSourceConfig{ + URL: src.GetURL(), + Name: src.GetName(), + TLSEnabled: src.GetTLSEnabled(), + JWTGenerator: opts.JWTGenerator, } } registryOpts := []syncerV2.Option{ syncerV2.WithAdditionalSources(addSourceConfigs), syncerV2.WithShardOrchestratorClient(shardOrchestratorClient), - syncerV2.WithMaxConcurrency(capCfg.WorkflowRegistry().MaxConcurrency()), + syncerV2.WithMaxConcurrency(wfReg.MaxConcurrency()), } if cfg.Sharding().ShardingEnabled() { registryOpts = append(registryOpts, @@ -959,11 +954,11 @@ func newWorkflowRegistrySyncerV2( workflowRegistrySyncerV2, err := syncerV2.NewWorkflowRegistry( lggr, crFactory, - capCfg.WorkflowRegistry().Address(), + wfReg.Address(), selector, syncerV2.Config{ QueryCount: 100, - SyncStrategy: syncerV2.SyncStrategy(capCfg.WorkflowRegistry().SyncStrategy()), + SyncStrategy: syncerV2.SyncStrategy(wfReg.SyncStrategy()), }, eventHandler, workflowDonNotifier, From 7439a692ba0c6905df89b7366589ab735275b57e Mon Sep 17 00:00:00 2001 From: mchain0 Date: Tue, 7 Apr 2026 14:08:23 +0200 Subject: [PATCH 2/2] develop merge fix --- core/services/cre/cre.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/core/services/cre/cre.go b/core/services/cre/cre.go index fa6e03ffef7..1e9dd386c01 100644 --- a/core/services/cre/cre.go +++ b/core/services/cre/cre.go @@ -949,19 +949,6 @@ func newWorkflowRegistrySyncerV2( return nil, nil, fmt.Errorf("unable to create workflow registry event handler: %w", err) } - crFactory, err := newContractReaderFactory(capCfg, relayerChainInterops) - if err != nil { - return nil, nil, errors.New("failed to instantiate contract reader factory") - } - - shardOrchestratorClient := opts.ShardOrchestratorClient - if shardOrchestratorClient == nil { - shardOrchestratorClient, err = newShardOrchestratorClient(cfg, lggr) - if err != nil { - return nil, nil, err - } - } - addSources := wfReg.AdditionalSources() addSourceConfigs := make([]syncerV2.AdditionalSourceConfig, len(addSources)) for i, src := range addSources {