Skip to content

Commit da13c82

Browse files
committed
parallelize secrets generation, JD linking and job proposal/approvals
1 parent 5c2db8b commit da13c82

18 files changed

Lines changed: 1202 additions & 242 deletions

File tree

system-tests/lib/cre/contracts/keystone.go

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ func toDons(input cre.ConfigureCapabilityRegistryInput) (*dons, error) {
436436
return dons, nil
437437
}
438438

439-
func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (CapabilitiesRegistry, error) {
439+
func ConfigureCapabilityRegistry(ctx context.Context, input cre.ConfigureCapabilityRegistryInput) (CapabilitiesRegistry, error) {
440440
if err := input.Validate(); err != nil {
441441
return nil, errors.Wrap(err, "input validation failed")
442442
}
@@ -445,6 +445,7 @@ func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (Ca
445445
if dErr != nil {
446446
return nil, errors.Wrap(dErr, "failed to map input to dons")
447447
}
448+
var capReg CapabilitiesRegistry
448449
if !input.WithV2Registries {
449450
for _, don := range dons.donsOrderedByID() {
450451
for i, cap := range don.Capabilities {
@@ -478,41 +479,48 @@ func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (Ca
478479
return nil, errors.Wrap(seqErr, "failed to configure capabilities registry")
479480
}
480481

481-
capReg, cErr := cre_contracts.GetOwnedContractV2[*kcr.CapabilitiesRegistry](
482+
capRegContract, cErr := cre_contracts.GetOwnedContractV2[*kcr.CapabilitiesRegistry](
482483
input.CldEnv.DataStore.Addresses(),
483484
input.CldEnv.BlockChains.EVMChains()[input.ChainSelector],
484485
input.CapabilitiesRegistryAddress.Hex(),
485486
)
486487
if cErr != nil {
487488
return nil, errors.Wrap(cErr, "failed to get capabilities registry contract")
488489
}
489-
return &registryWrapper{V1: capReg.Contract}, nil
490-
}
490+
capReg = &registryWrapper{V1: capRegContract.Contract}
491+
} else {
492+
// Transform dons data to V2 sequence input format
493+
v2Input := dons.mustToV2ConfigureInput(input.ChainSelector, input.CapabilitiesRegistryAddress.Hex(), input.CapabilityToOCR3Config, input.ExtraSignerFamilies)
494+
_, seqErr := operations.ExecuteSequence(
495+
input.CldEnv.OperationsBundle,
496+
cap_reg_v2_seq.ConfigureCapabilitiesRegistry,
497+
cap_reg_v2_seq.ConfigureCapabilitiesRegistryDeps{
498+
Env: input.CldEnv,
499+
},
500+
v2Input,
501+
)
502+
if seqErr != nil {
503+
return nil, errors.Wrap(seqErr, "failed to configure capabilities registry")
504+
}
491505

492-
// Transform dons data to V2 sequence input format
493-
v2Input := dons.mustToV2ConfigureInput(input.ChainSelector, input.CapabilitiesRegistryAddress.Hex(), input.CapabilityToOCR3Config, input.ExtraSignerFamilies)
494-
_, seqErr := operations.ExecuteSequence(
495-
input.CldEnv.OperationsBundle,
496-
cap_reg_v2_seq.ConfigureCapabilitiesRegistry,
497-
cap_reg_v2_seq.ConfigureCapabilitiesRegistryDeps{
498-
Env: input.CldEnv,
499-
},
500-
v2Input,
501-
)
502-
if seqErr != nil {
503-
return nil, errors.Wrap(seqErr, "failed to configure capabilities registry")
506+
capRegContract, cErr := cre_contracts.GetOwnedContractV2[*capabilities_registry_v2.CapabilitiesRegistry](
507+
input.CldEnv.DataStore.Addresses(),
508+
input.CldEnv.BlockChains.EVMChains()[input.ChainSelector],
509+
input.CapabilitiesRegistryAddress.Hex(),
510+
)
511+
if cErr != nil {
512+
return nil, errors.Wrap(cErr, "failed to get capabilities registry contract")
513+
}
514+
515+
capReg = &registryWrapper{V2: capRegContract.Contract}
504516
}
505517

506-
capReg, cErr := cre_contracts.GetOwnedContractV2[*capabilities_registry_v2.CapabilitiesRegistry](
507-
input.CldEnv.DataStore.Addresses(),
508-
input.CldEnv.BlockChains.EVMChains()[input.ChainSelector],
509-
input.CapabilitiesRegistryAddress.Hex(),
510-
)
511-
if cErr != nil {
512-
return nil, errors.Wrap(cErr, "failed to get capabilities registry contract")
518+
// TODO: remove this once the race condition is fixed (CRE-2684)
519+
if waitErr := waitForWorkflowWorkersCapabilityRegistrySync(ctx, input); waitErr != nil {
520+
return nil, errors.Wrap(waitErr, "failed waiting for workflow nodes to sync capability registry state")
513521
}
514522

515-
return &registryWrapper{V2: capReg.Contract}, nil
523+
return capReg, nil
516524
}
517525

518526
type DonInfo struct {
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
package contracts
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/json"
7+
"fmt"
8+
"sort"
9+
"strconv"
10+
"strings"
11+
"time"
12+
13+
"github.com/jmoiron/sqlx"
14+
"github.com/pkg/errors"
15+
16+
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/postgres"
17+
18+
"github.com/smartcontractkit/chainlink/system-tests/lib/cre"
19+
)
20+
21+
// Timing parameters used while waiting for workflow workers to sync the
// capability registry.
const (
	// capabilityRegistrySyncPollInterval is the pause between two polling
	// passes over the workers that are still pending.
	capabilityRegistrySyncPollInterval = 5 * time.Second
	// capabilityRegistrySyncTimeout bounds the whole wait across all workers.
	capabilityRegistrySyncTimeout = 2 * time.Minute
	// capabilityRegistrySyncQueryTimeout bounds a single database query
	// against one worker.
	capabilityRegistrySyncQueryTimeout = 3 * time.Second
)
26+
27+
// workflowWorkerTarget identifies one workflow DON worker node whose database
// is polled for capability registry syncer state.
type workflowWorkerTarget struct {
	// donName is the name of the workflow DON the worker belongs to.
	donName string
	// nodeIndex is the worker's index; it selects the per-node database
	// name ("db_<index>") when connecting.
	nodeIndex int
	// dbPort is the Postgres port used to reach the worker's database.
	dbPort int
}
32+
33+
// capabilityRegistrySyncState is the subset of the JSON payload stored in the
// data column of registry_syncer_states that the readiness check inspects.
// Entries are kept as raw JSON because only their presence is checked, never
// their contents.
type capabilityRegistrySyncState struct {
	// IDsToDONs holds one raw entry per DON in the snapshot.
	IDsToDONs map[string]json.RawMessage `json:"IDsToDONs"`
	// IDsToNodes holds one raw entry per node in the snapshot.
	IDsToNodes map[string]json.RawMessage `json:"IDsToNodes"`
	// IDsToCapabilities holds one raw entry per capability in the snapshot.
	IDsToCapabilities map[string]json.RawMessage `json:"IDsToCapabilities"`
}
38+
39+
// latestCapabilityRegistrySyncStateQuery selects the data column of the newest
// row (highest id) in registry_syncer_states.
const latestCapabilityRegistrySyncStateQuery = `
SELECT data
FROM registry_syncer_states
ORDER BY id DESC
LIMIT 1
`
45+
46+
func waitForWorkflowWorkersCapabilityRegistrySync(ctx context.Context, input cre.ConfigureCapabilityRegistryInput) error {
47+
// Waiting for capability registry sync is not supported in Kubernetes mode.
48+
if input.Provider.IsKubernetes() {
49+
return nil
50+
}
51+
targets, tErr := workflowWorkerTargets(input.Topology, input.NodeSets)
52+
if tErr != nil {
53+
return tErr
54+
}
55+
if len(targets) == 0 {
56+
return nil
57+
}
58+
59+
timeoutCtx, cancel := context.WithTimeout(ctx, capabilityRegistrySyncTimeout)
60+
defer cancel()
61+
62+
pending := make(map[string]workflowWorkerTarget, len(targets))
63+
lastState := make(map[string]string, len(targets))
64+
for _, target := range targets {
65+
key := registryTargetKey(target)
66+
pending[key] = target
67+
lastState[key] = "awaiting first successful registry snapshot check"
68+
}
69+
70+
ticker := time.NewTicker(capabilityRegistrySyncPollInterval)
71+
defer ticker.Stop()
72+
73+
for {
74+
for key, target := range pending {
75+
ready, state := hasCapabilityRegistrySyncOnWorker(timeoutCtx, target.dbPort, target.nodeIndex)
76+
if ready {
77+
delete(pending, key)
78+
delete(lastState, key)
79+
continue
80+
}
81+
lastState[key] = state
82+
}
83+
84+
if len(pending) == 0 {
85+
return nil
86+
}
87+
88+
select {
89+
case <-timeoutCtx.Done():
90+
if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) {
91+
return fmt.Errorf("timed out after %.0f seconds waiting for workflow workers to sync capability registry state: %s", capabilityRegistrySyncTimeout.Seconds(), formatCapabilityRegistrySyncPending(lastState))
92+
}
93+
return timeoutCtx.Err()
94+
case <-ticker.C:
95+
}
96+
}
97+
}
98+
99+
func workflowWorkerTargets(topology *cre.Topology, nodeSets []*cre.NodeSet) ([]workflowWorkerTarget, error) {
100+
if topology == nil || topology.DonsMetadata == nil {
101+
return nil, errors.New("topology metadata cannot be nil")
102+
}
103+
104+
allDons := topology.DonsMetadata.List()
105+
indexByName := make(map[string]int, len(allDons))
106+
for i, don := range allDons {
107+
indexByName[don.Name] = i
108+
}
109+
110+
workflowDons, err := topology.DonsMetadata.WorkflowDONs()
111+
if err != nil {
112+
return nil, errors.Wrap(err, "failed to resolve workflow DONs")
113+
}
114+
115+
targets := make([]workflowWorkerTarget, 0)
116+
for _, workflowDON := range workflowDons {
117+
donIdx, ok := indexByName[workflowDON.Name]
118+
if !ok {
119+
return nil, fmt.Errorf("workflow DON %s not found in topology list", workflowDON.Name)
120+
}
121+
if donIdx >= len(nodeSets) || nodeSets[donIdx] == nil {
122+
return nil, fmt.Errorf("nodeset for workflow DON %s is missing", workflowDON.Name)
123+
}
124+
125+
dbPort := nodeSets[donIdx].DbInput.Port
126+
if dbPort == 0 {
127+
defaultPort, dErr := strconv.Atoi(postgres.Port)
128+
if dErr != nil {
129+
return nil, errors.Wrap(dErr, "failed to convert postgres port to int")
130+
}
131+
dbPort = defaultPort
132+
}
133+
134+
workers, wErr := workflowDON.Workers()
135+
if wErr != nil {
136+
return nil, errors.Wrapf(wErr, "failed to resolve workers for workflow DON %s", workflowDON.Name)
137+
}
138+
139+
for _, worker := range workers {
140+
targets = append(targets, workflowWorkerTarget{
141+
donName: workflowDON.Name,
142+
nodeIndex: worker.Index,
143+
dbPort: dbPort,
144+
})
145+
}
146+
}
147+
148+
return targets, nil
149+
}
150+
151+
func hasCapabilityRegistrySyncOnWorker(ctx context.Context, dbPort, nodeIndex int) (bool, string) {
152+
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable connect_timeout=3", "127.0.0.1", dbPort, postgres.User, postgres.Password, fmt.Sprintf("db_%d", nodeIndex))
153+
db, err := sqlx.Open("postgres", dsn)
154+
if err != nil {
155+
return false, fmt.Sprintf("failed to open db connection: %v", err)
156+
}
157+
defer db.Close()
158+
159+
queryCtx, cancel := context.WithTimeout(ctx, capabilityRegistrySyncQueryTimeout)
160+
defer cancel()
161+
162+
var rawData []byte
163+
if err = db.GetContext(queryCtx, &rawData, latestCapabilityRegistrySyncStateQuery); err != nil {
164+
if errors.Is(err, sql.ErrNoRows) {
165+
return false, "registry_syncer_states is empty"
166+
}
167+
return false, fmt.Sprintf("failed to query latest registry syncer state: %v", err)
168+
}
169+
if len(rawData) == 0 {
170+
return false, "latest registry_syncer_states row has empty data payload"
171+
}
172+
173+
var state capabilityRegistrySyncState
174+
if err = json.Unmarshal(rawData, &state); err != nil {
175+
return false, fmt.Sprintf("failed to unmarshal latest registry syncer state payload: %v", err)
176+
}
177+
178+
hasDONs := len(state.IDsToDONs) > 0
179+
hasNodes := len(state.IDsToNodes) > 0
180+
hasCapabilities := len(state.IDsToCapabilities) > 0
181+
if !hasDONs || !hasCapabilities || !hasNodes {
182+
return false, fmt.Sprintf("incomplete registry snapshot (has_dons=%t has_nodes=%t has_capabilities=%t)", hasDONs, hasNodes, hasCapabilities)
183+
}
184+
185+
return true, ""
186+
}
187+
188+
func registryTargetKey(target workflowWorkerTarget) string {
189+
return fmt.Sprintf("%s/%d", target.donName, target.nodeIndex)
190+
}
191+
192+
// formatCapabilityRegistrySyncPending renders the pending workers and their
// last observed reasons as "target (reason)" entries joined by "; ". Entries
// are sorted by target key so the output is deterministic despite random map
// iteration order. An empty or nil map yields the empty string.
func formatCapabilityRegistrySyncPending(lastState map[string]string) string {
	keys := make([]string, 0, len(lastState))
	for target := range lastState {
		keys = append(keys, target)
	}
	sort.Strings(keys)

	var out strings.Builder
	for i, target := range keys {
		if i > 0 {
			out.WriteString("; ")
		}
		fmt.Fprintf(&out, "%s (%s)", target, lastState[target])
	}
	return out.String()
}

0 commit comments

Comments
 (0)