66 "fmt"
77 "slices"
88 "strconv"
9+ "sync"
910 "text/template"
1011 "time"
1112
@@ -15,6 +16,7 @@ import (
1516 "github.com/pkg/errors"
1617 "github.com/rs/zerolog"
1718 chainselectors "github.com/smartcontractkit/chain-selectors"
19+ "golang.org/x/sync/errgroup"
1820 "google.golang.org/protobuf/types/known/durationpb"
1921
2022 capabilitiespb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
@@ -174,106 +176,122 @@ func createJobs(
174176 return errors .Wrapf (chErr , "failed to get Solana chain ID from selector %d" , solChain .ChainSelector ())
175177 }
176178
179+ solChainID , err := solChain .SolClient .GetGenesisHash (ctx )
180+ if err != nil {
181+ return errors .Wrapf (err , "failed to get sol genesis hash" )
182+ }
183+ version := creEnv .ContractVersions [cre_sol .ForwarderContract .String ()]
184+ creForwarderKey := datastore .NewAddressRefKey (
185+ solChain .ChainSelector (),
186+ cre_sol .ForwarderContract ,
187+ version ,
188+ cre_sol .DefaultForwarderQualifier ,
189+ )
190+ creForwarderStateKey := datastore .NewAddressRefKey (
191+ solChain .ChainSelector (),
192+ cre_sol .ForwarderState ,
193+ version ,
194+ cre_sol .DefaultForwarderQualifier ,
195+ )
196+ creForwarderAddress , err := creEnv .CldfEnvironment .DataStore .Addresses ().Get (creForwarderKey )
197+ if err != nil {
198+ return errors .Wrap (err , "failed to get CRE Forwarder address" )
199+ }
200+ creForwarderStateAddress , err := creEnv .CldfEnvironment .DataStore .Addresses ().Get (creForwarderStateKey )
201+ if err != nil {
202+ return errors .Wrap (err , "failed to get CRE Forwarder State address" )
203+ }
204+ tmpl , err := template .New ("solConfig" ).Parse (configTemplate )
205+ if err != nil {
206+ return errors .Wrapf (err , "failed to parse %s config template" , flag )
207+ }
208+
209+ var specsMu sync.Mutex
210+ group , groupCtx := errgroup .WithContext (ctx )
177211 for _ , workerNode := range workerNodes {
178- key , ok := workerNode .Keys .Solana [chainID ]
179- if ! ok {
180- return fmt .Errorf ("failed to get solana key (chainID %s, node index %d)" , chainID , workerNode .Index )
181- }
212+ group .Go (func () error {
213+ key , ok := workerNode .Keys .Solana [chainID ]
214+ if ! ok {
215+ return fmt .Errorf ("failed to get solana key (chainID %s, node index %d)" , chainID , workerNode .Index )
216+ }
182217
183- version := creEnv .ContractVersions [cre_sol .ForwarderContract .String ()]
218+ nodeAddress := key .PublicAddress .String ()
219+ runtimeFallbacks := map [string ]any {
220+ "CREForwarderAddress" : creForwarderAddress .Address ,
221+ "CREForwarderState" : creForwarderStateAddress .Address ,
222+ "NodeAddress" : nodeAddress ,
223+ "IsLocal" : true ,
224+ "Network" : "solana" ,
225+ "ChainID" : solChainID .String (),
226+ }
184227
185- creForwarderKey := datastore .NewAddressRefKey (
186- solChain .ChainSelector (),
187- cre_sol .ForwarderContract ,
188- version ,
189- cre_sol .DefaultForwarderQualifier ,
190- )
191- creForwarderStateKey := datastore .NewAddressRefKey (
192- solChain .ChainSelector (),
193- cre_sol .ForwarderState ,
194- version ,
195- cre_sol .DefaultForwarderQualifier ,
196- )
197- creForwarderAddress , err := creEnv .CldfEnvironment .DataStore .Addresses ().Get (creForwarderKey )
198- if err != nil {
199- return errors .Wrap (err , "failed to get CRE Forwarder address" )
200- }
201- creForwarderStateAddress , err := creEnv .CldfEnvironment .DataStore .Addresses ().Get (creForwarderStateKey )
202- if err != nil {
203- return errors .Wrap (err , "failed to get CRE Forwarder State address" )
204- }
228+ templateData , aErr := credon .ApplyRuntimeValues (config .Values , runtimeFallbacks )
229+ if aErr != nil {
230+ return errors .Wrap (aErr , "failed to apply runtime values" )
231+ }
205232
206- nodeAddress := key .PublicAddress .String ()
207- tmpl , err := template .New ("solConfig" ).Parse (configTemplate )
208- if err != nil {
209- return errors .Wrapf (err , "failed to parse %s config template" , flag )
210- }
233+ var configBuffer bytes.Buffer
234+ if err := tmpl .Execute (& configBuffer , templateData ); err != nil {
235+ return errors .Wrapf (err , "failed to execute %s config template" , flag )
236+ }
211237
212- solChainID , err := solChain .SolClient .GetGenesisHash (ctx )
213- if err != nil {
214- return errors .Wrapf (err , "failed to get sol genesis hash" )
215- }
216- runtimeFallbacks := map [string ]any {
217- "CREForwarderAddress" : creForwarderAddress .Address ,
218- "CREForwarderState" : creForwarderStateAddress .Address ,
219- "NodeAddress" : nodeAddress ,
220- "IsLocal" : true ,
221- "Network" : "solana" ,
222- "ChainID" : solChainID .String (),
223- }
238+ configStr := configBuffer .String ()
239+ if err := credon .ValidateTemplateSubstitution (configStr , flag ); err != nil {
240+ return errors .Wrapf (err , "%s template validation failed" , flag )
241+ }
224242
225- templateData , aErr := credon .ApplyRuntimeValues (config .Values , runtimeFallbacks )
226- if aErr != nil {
227- return errors .Wrap (aErr , "failed to apply runtime values" )
228- }
243+ workerInput := cre_jobs.ProposeJobSpecInput {
244+ Domain : offchain .ProductLabel ,
245+ Environment : cre .EnvironmentName ,
246+ DONName : don .Name ,
247+ JobName : "sol-v2-worker-" + chainID ,
248+ ExtraLabels : map [string ]string {cre .CapabilityLabelKey : flag },
249+ DONFilters : []offchain.TargetDONFilter {
250+ {Key : offchain .FilterKeyDONName , Value : don .Name },
251+ {Key : "p2p_id" , Value : workerNode .Keys .PeerID ()}, // required since each node requires a different config (it contains its own from address)
252+ },
253+ Template : job_types .Solana ,
254+ Inputs : job_types.JobSpecInput {
255+ "command" : command ,
256+ "config" : configStr ,
257+ },
258+ }
229259
230- var configBuffer bytes. Buffer
231- if err := tmpl . Execute ( & configBuffer , templateData ); err != nil {
232- return errors . Wrapf ( err , " failed to execute %s config template " , flag )
233- }
260+ workerVerErr := cre_jobs. ProposeJobSpec {}. VerifyPreconditions ( * creEnv . CldfEnvironment , workerInput )
261+ if workerVerErr != nil {
262+ return fmt . Errorf ( "precondition verification failed for Solana v2 worker job: %w " , workerVerErr )
263+ }
234264
235- configStr := configBuffer . String ( )
236- if err := credon . ValidateTemplateSubstitution ( configStr , flag ); err != nil {
237- return errors . Wrapf ( err , "%s template validation failed " , flag )
238- }
265+ workerReport , workerErr := cre_jobs. ProposeJobSpec {}. Apply ( * creEnv . CldfEnvironment , workerInput )
266+ if workerErr != nil {
267+ return fmt . Errorf ( "failed to propose Solana v2 worker job spec: %w " , workerErr )
268+ }
239269
240- workerInput := cre_jobs.ProposeJobSpecInput {
241- Domain : offchain .ProductLabel ,
242- Environment : cre .EnvironmentName ,
243- DONName : don .Name ,
244- JobName : "sol-v2-worker-" + chainID ,
245- ExtraLabels : map [string ]string {cre .CapabilityLabelKey : flag },
246- DONFilters : []offchain.TargetDONFilter {
247- {Key : offchain .FilterKeyDONName , Value : don .Name },
248- {Key : "p2p_id" , Value : workerNode .Keys .PeerID ()}, // required since each node requires a different config (it contains its own from address)
249- },
250- Template : job_types .Solana ,
251- Inputs : job_types.JobSpecInput {
252- "command" : command ,
253- "config" : configStr ,
254- },
255- }
270+ specsMu .Lock ()
271+ defer specsMu .Unlock ()
272+ for _ , r := range workerReport .Reports {
273+ out , ok := r .Output .(cre_jobs_ops.ProposeStandardCapabilityJobOutput )
274+ if ! ok {
275+ return fmt .Errorf ("unable to cast to ProposeStandardCapabilityJobOutput, actual type: %T" , r .Output )
276+ }
277+ mErr := mergo .Merge (& specs , out .Specs , mergo .WithAppendSlice )
278+ if mErr != nil {
279+ return fmt .Errorf ("failed to merge worker job specs: %w" , mErr )
280+ }
281+ }
256282
257- workerVerErr := cre_jobs.ProposeJobSpec {}.VerifyPreconditions (* creEnv .CldfEnvironment , workerInput )
258- if workerVerErr != nil {
259- return fmt .Errorf ("precondition verification failed for Solana v2 worker job: %w" , workerVerErr )
260- }
283+ select {
284+ case <- groupCtx .Done ():
285+ return groupCtx .Err ()
286+ default :
287+ }
261288
262- workerReport , workerErr := cre_jobs.ProposeJobSpec {}.Apply (* creEnv .CldfEnvironment , workerInput )
263- if workerErr != nil {
264- return fmt .Errorf ("failed to propose Solana v2 worker job spec: %w" , workerErr )
265- }
289+ return nil
290+ })
291+ }
266292
267- for _ , r := range workerReport .Reports {
268- out , ok := r .Output .(cre_jobs_ops.ProposeStandardCapabilityJobOutput )
269- if ! ok {
270- return fmt .Errorf ("unable to cast to ProposeStandardCapabilityJobOutput, actual type: %T" , r .Output )
271- }
272- mErr := mergo .Merge (& specs , out .Specs , mergo .WithAppendSlice )
273- if mErr != nil {
274- return fmt .Errorf ("failed to merge worker job specs: %w" , mErr )
275- }
276- }
293+ if err := group .Wait (); err != nil {
294+ return err
277295 }
278296
279297 approveErr := jobs .Approve (ctx , creEnv .CldfEnvironment .Offchain , dons , specs )
0 commit comments