Skip to content

Commit 2c8e5ea

Browse files
authored
[DX03578] speed up local CRE startup (#21719)
* parallelize secrets generation, JD linking and job proposal/approvals * clean up * lints lints * allow CRE system/regression tests to run max 10 minutes in the CI * CR changes
1 parent c5b6d3c commit 2c8e5ea

19 files changed

Lines changed: 1223 additions & 243 deletions

File tree

.github/workflows/cre-regression-system-tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ jobs:
9999
# http://docs.github.com/en/actions/how-tos/deploy/configure-and-manage-deployments/control-deployments#using-environments-without-deployments
100100
name: integration
101101
deployment: false
102-
timeout-minutes: 60
102+
timeout-minutes: 10
103103
env:
104104
ENABLE_AUTO_QUARANTINE: "true"
105105
# override Chip Ingress and Chip Config images with remote images. We have added this env var here, instead of the "Start local CRE" step, because

.github/workflows/cre-system-tests.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ jobs:
146146
# http://docs.github.com/en/actions/how-tos/deploy/configure-and-manage-deployments/control-deployments#using-environments-without-deployments
147147
name: integration
148148
deployment: false
149-
timeout-minutes: 60
149+
timeout-minutes: 10
150150
env:
151151
ENABLE_AUTO_QUARANTINE: "true"
152152
# override Chip Ingress and Chip Config images with remote images. We have added this env var here, instead of the "Start local CRE" step, because

system-tests/lib/cre/contracts/keystone.go

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ func toDons(input cre.ConfigureCapabilityRegistryInput) (*dons, error) {
436436
return dons, nil
437437
}
438438

439-
func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (CapabilitiesRegistry, error) {
439+
func ConfigureCapabilityRegistry(ctx context.Context, input cre.ConfigureCapabilityRegistryInput) (CapabilitiesRegistry, error) {
440440
if err := input.Validate(); err != nil {
441441
return nil, errors.Wrap(err, "input validation failed")
442442
}
@@ -445,6 +445,7 @@ func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (Ca
445445
if dErr != nil {
446446
return nil, errors.Wrap(dErr, "failed to map input to dons")
447447
}
448+
var capReg CapabilitiesRegistry
448449
if !input.WithV2Registries {
449450
for _, don := range dons.donsOrderedByID() {
450451
for i, cap := range don.Capabilities {
@@ -478,41 +479,48 @@ func ConfigureCapabilityRegistry(input cre.ConfigureCapabilityRegistryInput) (Ca
478479
return nil, errors.Wrap(seqErr, "failed to configure capabilities registry")
479480
}
480481

481-
capReg, cErr := cre_contracts.GetOwnedContractV2[*kcr.CapabilitiesRegistry](
482+
capRegContract, cErr := cre_contracts.GetOwnedContractV2[*kcr.CapabilitiesRegistry](
482483
input.CldEnv.DataStore.Addresses(),
483484
input.CldEnv.BlockChains.EVMChains()[input.ChainSelector],
484485
input.CapabilitiesRegistryAddress.Hex(),
485486
)
486487
if cErr != nil {
487488
return nil, errors.Wrap(cErr, "failed to get capabilities registry contract")
488489
}
489-
return &registryWrapper{V1: capReg.Contract}, nil
490-
}
490+
capReg = &registryWrapper{V1: capRegContract.Contract}
491+
} else {
492+
// Transform dons data to V2 sequence input format
493+
v2Input := dons.mustToV2ConfigureInput(input.ChainSelector, input.CapabilitiesRegistryAddress.Hex(), input.CapabilityToOCR3Config, input.ExtraSignerFamilies)
494+
_, seqErr := operations.ExecuteSequence(
495+
input.CldEnv.OperationsBundle,
496+
cap_reg_v2_seq.ConfigureCapabilitiesRegistry,
497+
cap_reg_v2_seq.ConfigureCapabilitiesRegistryDeps{
498+
Env: input.CldEnv,
499+
},
500+
v2Input,
501+
)
502+
if seqErr != nil {
503+
return nil, errors.Wrap(seqErr, "failed to configure capabilities registry")
504+
}
491505

492-
// Transform dons data to V2 sequence input format
493-
v2Input := dons.mustToV2ConfigureInput(input.ChainSelector, input.CapabilitiesRegistryAddress.Hex(), input.CapabilityToOCR3Config, input.ExtraSignerFamilies)
494-
_, seqErr := operations.ExecuteSequence(
495-
input.CldEnv.OperationsBundle,
496-
cap_reg_v2_seq.ConfigureCapabilitiesRegistry,
497-
cap_reg_v2_seq.ConfigureCapabilitiesRegistryDeps{
498-
Env: input.CldEnv,
499-
},
500-
v2Input,
501-
)
502-
if seqErr != nil {
503-
return nil, errors.Wrap(seqErr, "failed to configure capabilities registry")
506+
capRegContract, cErr := cre_contracts.GetOwnedContractV2[*capabilities_registry_v2.CapabilitiesRegistry](
507+
input.CldEnv.DataStore.Addresses(),
508+
input.CldEnv.BlockChains.EVMChains()[input.ChainSelector],
509+
input.CapabilitiesRegistryAddress.Hex(),
510+
)
511+
if cErr != nil {
512+
return nil, errors.Wrap(cErr, "failed to get capabilities registry contract")
513+
}
514+
515+
capReg = &registryWrapper{V2: capRegContract.Contract}
504516
}
505517

506-
capReg, cErr := cre_contracts.GetOwnedContractV2[*capabilities_registry_v2.CapabilitiesRegistry](
507-
input.CldEnv.DataStore.Addresses(),
508-
input.CldEnv.BlockChains.EVMChains()[input.ChainSelector],
509-
input.CapabilitiesRegistryAddress.Hex(),
510-
)
511-
if cErr != nil {
512-
return nil, errors.Wrap(cErr, "failed to get capabilities registry contract")
518+
// TODO: remove this once the race condition is fixed (CRE-2684)
519+
if waitErr := waitForWorkflowWorkersCapabilityRegistrySync(ctx, input); waitErr != nil {
520+
return nil, errors.Wrap(waitErr, "failed waiting for workflow nodes to sync capability registry state")
513521
}
514522

515-
return &registryWrapper{V2: capReg.Contract}, nil
523+
return capReg, nil
516524
}
517525

518526
type DonInfo struct {
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package contracts
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/json"
7+
"fmt"
8+
"sort"
9+
"strconv"
10+
"strings"
11+
"sync"
12+
"time"
13+
14+
"github.com/jmoiron/sqlx"
15+
"github.com/pkg/errors"
16+
"golang.org/x/sync/errgroup"
17+
18+
"github.com/smartcontractkit/chainlink-testing-framework/framework/components/postgres"
19+
20+
"github.com/smartcontractkit/chainlink/system-tests/lib/cre"
21+
)
22+
23+
const (
24+
capabilityRegistrySyncPollInterval = 5 * time.Second
25+
capabilityRegistrySyncTimeout = 2 * time.Minute
26+
capabilityRegistrySyncQueryTimeout = 3 * time.Second
27+
capabilityRegistrySyncConcurrency = 4
28+
)
29+
30+
type workflowWorkerTarget struct {
31+
donName string
32+
nodeIndex int
33+
dbPort int
34+
}
35+
36+
type capabilityRegistrySyncState struct {
37+
IDsToDONs map[string]json.RawMessage `json:"IDsToDONs"`
38+
IDsToNodes map[string]json.RawMessage `json:"IDsToNodes"`
39+
IDsToCapabilities map[string]json.RawMessage `json:"IDsToCapabilities"`
40+
}
41+
42+
const latestCapabilityRegistrySyncStateQuery = `
43+
SELECT data
44+
FROM registry_syncer_states
45+
ORDER BY id DESC
46+
LIMIT 1
47+
`
48+
49+
func waitForWorkflowWorkersCapabilityRegistrySync(ctx context.Context, input cre.ConfigureCapabilityRegistryInput) error {
50+
// Waiting for capability registry sync is not supported in Kubernetes mode.
51+
if input.Provider.IsKubernetes() {
52+
return nil
53+
}
54+
targets, tErr := workflowWorkerTargets(input.Topology, input.NodeSets)
55+
if tErr != nil {
56+
return tErr
57+
}
58+
if len(targets) == 0 {
59+
return nil
60+
}
61+
62+
timeoutCtx, cancel := context.WithTimeout(ctx, capabilityRegistrySyncTimeout)
63+
defer cancel()
64+
65+
pending := make(map[string]workflowWorkerTarget, len(targets))
66+
lastState := make(map[string]string, len(targets))
67+
for _, target := range targets {
68+
key := registryTargetKey(target)
69+
pending[key] = target
70+
lastState[key] = "awaiting first successful registry snapshot check"
71+
}
72+
73+
ticker := time.NewTicker(capabilityRegistrySyncPollInterval)
74+
defer ticker.Stop()
75+
76+
for {
77+
type checkResult struct {
78+
key string
79+
ready bool
80+
state string
81+
}
82+
results := make([]checkResult, 0, len(pending))
83+
resultsMu := sync.Mutex{}
84+
eg, egCtx := errgroup.WithContext(timeoutCtx)
85+
eg.SetLimit(capabilityRegistrySyncConcurrency)
86+
for key, target := range pending {
87+
eg.Go(func() error {
88+
ready, state := hasCapabilityRegistrySyncOnWorker(egCtx, target.dbPort, target.nodeIndex)
89+
resultsMu.Lock()
90+
results = append(results, checkResult{
91+
key: key,
92+
ready: ready,
93+
state: state,
94+
})
95+
resultsMu.Unlock()
96+
return nil
97+
})
98+
}
99+
if err := eg.Wait(); err != nil {
100+
return err
101+
}
102+
for _, result := range results {
103+
if result.ready {
104+
delete(pending, result.key)
105+
delete(lastState, result.key)
106+
continue
107+
}
108+
lastState[result.key] = result.state
109+
}
110+
111+
if len(pending) == 0 {
112+
return nil
113+
}
114+
115+
select {
116+
case <-timeoutCtx.Done():
117+
if errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) {
118+
return fmt.Errorf("timed out after %.0f seconds waiting for workflow workers to sync capability registry state: %s", capabilityRegistrySyncTimeout.Seconds(), formatCapabilityRegistrySyncPending(lastState))
119+
}
120+
return timeoutCtx.Err()
121+
case <-ticker.C:
122+
}
123+
}
124+
}
125+
126+
func workflowWorkerTargets(topology *cre.Topology, nodeSets []*cre.NodeSet) ([]workflowWorkerTarget, error) {
127+
if topology == nil || topology.DonsMetadata == nil {
128+
return nil, errors.New("topology metadata cannot be nil")
129+
}
130+
131+
allDons := topology.DonsMetadata.List()
132+
indexByName := make(map[string]int, len(allDons))
133+
for i, don := range allDons {
134+
indexByName[don.Name] = i
135+
}
136+
137+
workflowDons, err := topology.DonsMetadata.WorkflowDONs()
138+
if err != nil {
139+
return nil, errors.Wrap(err, "failed to resolve workflow DONs")
140+
}
141+
142+
targets := make([]workflowWorkerTarget, 0)
143+
for _, workflowDON := range workflowDons {
144+
donIdx, ok := indexByName[workflowDON.Name]
145+
if !ok {
146+
return nil, fmt.Errorf("workflow DON %s not found in topology list", workflowDON.Name)
147+
}
148+
if donIdx >= len(nodeSets) || nodeSets[donIdx] == nil {
149+
return nil, fmt.Errorf("nodeset for workflow DON %s is missing", workflowDON.Name)
150+
}
151+
152+
dbPort := nodeSets[donIdx].DbInput.Port
153+
if dbPort == 0 {
154+
defaultPort, dErr := strconv.Atoi(postgres.Port)
155+
if dErr != nil {
156+
return nil, errors.Wrap(dErr, "failed to convert postgres port to int")
157+
}
158+
dbPort = defaultPort
159+
}
160+
161+
workers, wErr := workflowDON.Workers()
162+
if wErr != nil {
163+
return nil, errors.Wrapf(wErr, "failed to resolve workers for workflow DON %s", workflowDON.Name)
164+
}
165+
166+
for _, worker := range workers {
167+
targets = append(targets, workflowWorkerTarget{
168+
donName: workflowDON.Name,
169+
nodeIndex: worker.Index,
170+
dbPort: dbPort,
171+
})
172+
}
173+
}
174+
175+
return targets, nil
176+
}
177+
178+
func hasCapabilityRegistrySyncOnWorker(ctx context.Context, dbPort, nodeIndex int) (bool, string) {
179+
dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable connect_timeout=3", "127.0.0.1", dbPort, postgres.User, postgres.Password, fmt.Sprintf("db_%d", nodeIndex))
180+
db, err := sqlx.Open("postgres", dsn)
181+
if err != nil {
182+
return false, fmt.Sprintf("failed to open db connection: %v", err)
183+
}
184+
defer db.Close()
185+
186+
queryCtx, cancel := context.WithTimeout(ctx, capabilityRegistrySyncQueryTimeout)
187+
defer cancel()
188+
189+
var rawData []byte
190+
if err = db.GetContext(queryCtx, &rawData, latestCapabilityRegistrySyncStateQuery); err != nil {
191+
if errors.Is(err, sql.ErrNoRows) {
192+
return false, "registry_syncer_states is empty"
193+
}
194+
return false, fmt.Sprintf("failed to query latest registry syncer state: %v", err)
195+
}
196+
if len(rawData) == 0 {
197+
return false, "latest registry_syncer_states row has empty data payload"
198+
}
199+
200+
var state capabilityRegistrySyncState
201+
if err = json.Unmarshal(rawData, &state); err != nil {
202+
return false, fmt.Sprintf("failed to unmarshal latest registry syncer state payload: %v", err)
203+
}
204+
205+
hasDONs := len(state.IDsToDONs) > 0
206+
hasNodes := len(state.IDsToNodes) > 0
207+
hasCapabilities := len(state.IDsToCapabilities) > 0
208+
if !hasDONs || !hasCapabilities || !hasNodes {
209+
return false, fmt.Sprintf("incomplete registry snapshot (has_dons=%t has_nodes=%t has_capabilities=%t)", hasDONs, hasNodes, hasCapabilities)
210+
}
211+
212+
return true, ""
213+
}
214+
215+
func registryTargetKey(target workflowWorkerTarget) string {
216+
return fmt.Sprintf("%s/%d", target.donName, target.nodeIndex)
217+
}
218+
219+
func formatCapabilityRegistrySyncPending(lastState map[string]string) string {
220+
parts := make([]string, 0, len(lastState))
221+
keys := make([]string, 0, len(lastState))
222+
for target := range lastState {
223+
keys = append(keys, target)
224+
}
225+
sort.Strings(keys)
226+
227+
for _, target := range keys {
228+
reason := lastState[target]
229+
parts = append(parts, fmt.Sprintf("%s (%s)", target, reason))
230+
}
231+
return strings.Join(parts, "; ")
232+
}

0 commit comments

Comments
 (0)