Skip to content

Commit 5a9f3fb

Browse files
authored
Merge branch 'main' into fix-latency-scale
2 parents e2b945d + 5a427a1 commit 5a9f3fb

14 files changed

Lines changed: 486 additions & 60 deletions

metrics/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ const rpcCallLatencyMetricName = "rpc_call_latency"
3939
type RPCClientMetrics interface {
4040
// RecordRequest records latency for an RPC call (observed in nanoseconds for Prometheus and Beholder).
4141
// Failures use success="false"; derive error rate from rpc_call_latency_count{success="false"} (or equivalent).
42+
// rpcURL is sanitized before export (userinfo and query removed; path hashed).
4243
RecordRequest(ctx context.Context, rpcURL string, isSendOnly bool, callName string, latency time.Duration, err error)
4344
}
4445

@@ -77,15 +78,16 @@ func (m *rpcClientMetrics) RecordRequest(ctx context.Context, rpcURL string, isS
7778
successStr := strconv.FormatBool(err != nil)
7879
sendStr := strconv.FormatBool(isSendOnly)
7980
latencyNs := float64(latency)
81+
safeRPCURL := SanitizeRPCURL(rpcURL)
8082

8183
RPCCallLatency.WithLabelValues(
82-
m.chainFamily, m.chainID, rpcURL, sendStr, successStr, callName,
84+
m.chainFamily, m.chainID, safeRPCURL, sendStr, successStr, callName,
8385
).Observe(latencyNs)
8486

8587
m.latencyHis.Record(ctx, latencyNs/float64(time.Millisecond), metric.WithAttributes(
8688
attribute.String("chainFamily", m.chainFamily),
8789
attribute.String("chainID", m.chainID),
88-
attribute.String("rpcUrl", rpcURL),
90+
attribute.String("rpcUrl", safeRPCURL),
8991
attribute.String("isSendOnly", sendStr),
9092
attribute.String("success", successStr),
9193
attribute.String("rpcCallName", callName),

metrics/sanitize_rpc_url.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package metrics
2+
3+
import (
4+
"crypto/sha1" //nolint:gosec // sha1 used only for URL anonymisation, not security
5+
"fmt"
6+
"net/url"
7+
"strings"
8+
)
9+
10+
// SanitizeRPCURL either strips user:passwd or replaces path and params with their sha1-hex, excluding leading / if present
11+
func SanitizeRPCURL(raw string) string {
12+
u, err := url.Parse(raw)
13+
if err != nil {
14+
return "invalid_rpc_url"
15+
}
16+
17+
if u.User != nil {
18+
// Strip credentials and leave everything else intact.
19+
u.User = nil
20+
return u.String()
21+
}
22+
23+
// Build the sensitive portion: path (without leading /) plus optional query.
24+
sensitive := strings.TrimPrefix(u.Path, "/")
25+
if u.RawQuery != "" {
26+
if sensitive != "" {
27+
sensitive += "?" + u.RawQuery
28+
} else {
29+
sensitive = u.RawQuery
30+
}
31+
}
32+
33+
if sensitive == "" {
34+
// Nothing to redact.
35+
return u.String()
36+
}
37+
38+
//nolint:gosec
39+
h := sha1.Sum([]byte(sensitive))
40+
u.Path = "/" + fmt.Sprintf("%x", h)
41+
u.RawQuery = ""
42+
return u.String()
43+
}

metrics/sanitize_rpc_url_test.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package metrics
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestSanitizeRPCURL_RedactsSecrets(t *testing.T) {
10+
cases := []struct {
11+
input string
12+
want string
13+
}{
14+
// simple path
15+
{"https://bsc-mainnet.core.chainstack.com/MjcwMDk3ZGFhMDA5NjJjMDM1", "https://bsc-mainnet.core.chainstack.com/b0b99e8b33b401b05251f91aec08b6a9581c86dd"},
16+
// less simple path
17+
{"http://172.16.156.14:8000/MmZmNTJmOWRiNzg0NTgxNDYyNzJjMTYzNDlmNGJ/iYWEwOTVmYWE0OQ/bsc/mainnet/", "http://172.16.156.14:8000/bd161d616d46b90a248de0f0a3ebc2daf2b8bb20"},
18+
// path with no / is excluded from sha
19+
{"https://anyblocks-01.mainnet.bnb.bdnodes.net?auth=MDcwMTgzODk3NzIyMjU4YzY2MTQzNGMyNTU2OWE2NGEzYjhlODM0NA", "https://anyblocks-01.mainnet.bnb.bdnodes.net/7c87697e63d8f9c049183bb4f8c171af40715b2b"},
20+
// path with leading / is included in sha
21+
{"https://anyblocks-02.mainnet.bnb.bdnodes.net/somepath/?auth=2Dc8bNAqCC0X74zZfi_4ra6XzuBY8lmXcTE1ic9EO5o", "https://anyblocks-02.mainnet.bnb.bdnodes.net/22c028ea2d53fd106e2bb93bc61d838ed6b01c19"},
22+
// strip creds keep path
23+
{"https://myLittleNop:YjY5MjAwOGJkMzBjNW@broadcast-mirror.fiews.io/?chain_id=56", "https://broadcast-mirror.fiews.io/?chain_id=56"},
24+
// even if no creds, sacrifice path for uniformity
25+
{"https://eu-bsc.rpc.linkriver.internal/rpc", "https://eu-bsc.rpc.linkriver.internal/e64b40f2bd5c8a9560773d16476a86ede7e7c1ba"},
26+
// keeps protocol too
27+
{"wss://bsc-mainnet-proxy.internal.linkpool.io/ws", "wss://bsc-mainnet-proxy.internal.linkpool.io/1457b75dc8c5500c0f1d4503cf801b60deb045a4"},
28+
}
29+
30+
for _, tc := range cases {
31+
t.Run(tc.input, func(t *testing.T) {
32+
assert.Equal(t, tc.want, SanitizeRPCURL(tc.input))
33+
})
34+
}
35+
}
36+
37+
func TestSanitizeRPCURL_AlreadySanitized(t *testing.T) {
38+
urls := []string{
39+
"http://10.0.1.191:8545",
40+
"http://144.178.241.22:8545",
41+
"http://222.106.187.14:12001",
42+
"http://at2-bsc-main03.blockchain.fiews.net:8545",
43+
"http://berlioz.stakesystems.io:8745",
44+
"http://blockchains-1.shultzpro.com:8545",
45+
"http://dfw3-bsc-main01.blockchain.fiews.net:8545",
46+
"http://sylvester.stakesystems.io:8745",
47+
"https://bsc-dataseed.binance.org/",
48+
"https://chainlink-bsc.rpc.blxrbdn.com",
49+
"https://puissant-builder.48.club",
50+
"ws://10.0.1.191:8546",
51+
"ws://144.76.108.206:8546",
52+
"ws://172.16.152.140:8546",
53+
"ws://bsc-rpc-2.piertwo.prod:8546",
54+
"ws://bsc.rpc.cinternal.com",
55+
"ws://sylvester.stakesystems.io:8746",
56+
"wss://bsc-rpc.o1.wtf",
57+
}
58+
59+
for _, u := range urls {
60+
t.Run(u, func(t *testing.T) {
61+
assert.Equal(t, u, SanitizeRPCURL(u))
62+
})
63+
}
64+
}

multinode/README.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,32 @@ Manages all nodes performing node selection and load balancing, health checks an
1919
Used to poll for new heads and finalized heads within subscriptions.
2020

2121
### Transaction Sender
22-
Used to send transactions to all healthy RPCs and aggregate the results.
22+
Used to send transactions to all healthy RPCs and aggregate the results.
23+
24+
## States diagram
25+
26+
```mermaid
27+
graph TD
28+
Undialed --> Dialed
29+
Undialed --> Unreachable
30+
Dialed --> Alive
31+
Dialed --> InvalidChainID
32+
Dialed --> Syncing
33+
Dialed --> Unreachable
34+
Alive --> OutOfSync
35+
Alive --> Unreachable
36+
OutOfSync --> Alive
37+
OutOfSync --> InvalidChainID
38+
OutOfSync --> Syncing
39+
OutOfSync --> Unreachable
40+
InvalidChainID --> Alive
41+
InvalidChainID --> Syncing
42+
InvalidChainID --> Unreachable
43+
Syncing --> Alive
44+
Syncing --> OutOfSync
45+
Syncing --> InvalidChainID
46+
Syncing --> Unreachable
47+
Unreachable --> Dialed
48+
Unusable:::terminal
49+
Closed:::terminal
50+
```

multinode/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ func (c *MultiNodeConfig) PollFailureThreshold() uint32 {
4444
return *c.MultiNode.PollFailureThreshold
4545
}
4646

47+
func (c *MultiNodeConfig) PollSuccessThreshold() uint32 {
48+
return 0 // retaining source compat for -solana; -evm sets via NodePoolConfig
49+
}
50+
4751
func (c *MultiNodeConfig) PollInterval() time.Duration {
4852
return c.MultiNode.PollInterval.Duration()
4953
}

multinode/ctx_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ import (
44
"testing"
55

66
"github.com/stretchr/testify/assert"
7-
8-
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
97
)
108

119
func TestContext(t *testing.T) {
12-
ctx := tests.Context(t)
10+
ctx := t.Context()
1311
assert.False(t, CtxIsHealthCheckRequest(ctx), "expected false for test context")
1412
ctx = CtxAddHealthCheckFlag(ctx)
1513
assert.True(t, CtxIsHealthCheckRequest(ctx), "expected context to contain the healthcheck flag")

multinode/multi_node_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestMultiNode_Dial(t *testing.T) {
8181
selectionMode: NodeSelectionModeRoundRobin,
8282
chainID: RandomID(),
8383
})
84-
err := mn.Start(tests.Context(t))
84+
err := mn.Start(t.Context())
8585
assert.ErrorContains(t, err, fmt.Sprintf("no available nodes for chain %s", mn.chainID))
8686
})
8787
t.Run("Fails with wrong node's chainID", func(t *testing.T) {
@@ -97,7 +97,7 @@ func TestMultiNode_Dial(t *testing.T) {
9797
chainID: multiNodeChainID,
9898
nodes: []Node[ID, multiNodeRPCClient]{node},
9999
})
100-
err := mn.Start(tests.Context(t))
100+
err := mn.Start(t.Context())
101101
assert.ErrorContains(t, err, fmt.Sprintf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", nodeName, nodeChainID, mn.chainID))
102102
})
103103
t.Run("Fails if node fails", func(t *testing.T) {
@@ -113,7 +113,7 @@ func TestMultiNode_Dial(t *testing.T) {
113113
chainID: chainID,
114114
nodes: []Node[ID, multiNodeRPCClient]{node},
115115
})
116-
err := mn.Start(tests.Context(t))
116+
err := mn.Start(t.Context())
117117
assert.ErrorIs(t, err, expectedError)
118118
})
119119

@@ -132,7 +132,7 @@ func TestMultiNode_Dial(t *testing.T) {
132132
chainID: chainID,
133133
nodes: []Node[ID, multiNodeRPCClient]{node1, node2},
134134
})
135-
err := mn.Start(tests.Context(t))
135+
err := mn.Start(t.Context())
136136
assert.ErrorIs(t, err, expectedError)
137137
})
138138
t.Run("Fails with wrong send only node's chainID", func(t *testing.T) {
@@ -151,7 +151,7 @@ func TestMultiNode_Dial(t *testing.T) {
151151
nodes: []Node[ID, multiNodeRPCClient]{node},
152152
sendonlys: []SendOnlyNode[ID, multiNodeRPCClient]{sendOnly},
153153
})
154-
err := mn.Start(tests.Context(t))
154+
err := mn.Start(t.Context())
155155
assert.ErrorContains(t, err, fmt.Sprintf("sendonly node %s has configured chain ID %s which does not match multinode configured chain ID of %s", sendOnlyName, sendOnlyChainID, mn.chainID))
156156
})
157157

@@ -178,7 +178,7 @@ func TestMultiNode_Dial(t *testing.T) {
178178
nodes: []Node[ID, multiNodeRPCClient]{node},
179179
sendonlys: []SendOnlyNode[ID, multiNodeRPCClient]{sendOnly1, sendOnly2},
180180
})
181-
err := mn.Start(tests.Context(t))
181+
err := mn.Start(t.Context())
182182
assert.ErrorIs(t, err, expectedError)
183183
})
184184
t.Run("Starts successfully with healthy nodes", func(t *testing.T) {
@@ -192,7 +192,7 @@ func TestMultiNode_Dial(t *testing.T) {
192192
sendonlys: []SendOnlyNode[ID, multiNodeRPCClient]{newHealthySendOnly(t, chainID)},
193193
})
194194
servicetest.Run(t, mn)
195-
selectedNode, err := mn.selectNode(tests.Context(t))
195+
selectedNode, err := mn.selectNode(t.Context())
196196
require.NoError(t, err)
197197
assert.Equal(t, node, selectedNode)
198198
})
@@ -336,7 +336,7 @@ func TestMultiNode_selectNode(t *testing.T) {
336336
t.Parallel()
337337
t.Run("Returns same node, if it's still healthy", func(t *testing.T) {
338338
t.Parallel()
339-
ctx := tests.Context(t)
339+
ctx := t.Context()
340340
chainID := RandomID()
341341
node1 := newMockNode[ID, multiNodeRPCClient](t)
342342
node1.On("State").Return(nodeStateAlive).Once()
@@ -360,7 +360,7 @@ func TestMultiNode_selectNode(t *testing.T) {
360360
})
361361
t.Run("Updates node if active is not healthy", func(t *testing.T) {
362362
t.Parallel()
363-
ctx := tests.Context(t)
363+
ctx := t.Context()
364364
chainID := RandomID()
365365
oldBest := newMockNode[ID, multiNodeRPCClient](t)
366366
oldBest.On("String").Return("oldBest").Maybe()
@@ -387,7 +387,7 @@ func TestMultiNode_selectNode(t *testing.T) {
387387
})
388388
t.Run("No active nodes - reports critical error", func(t *testing.T) {
389389
t.Parallel()
390-
ctx := tests.Context(t)
390+
ctx := t.Context()
391391
chainID := RandomID()
392392
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
393393
mn := newTestMultiNode(t, multiNodeOpts{

multinode/node.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ var errInvalidChainID = errors.New("invalid chain id")
1818

1919
type NodeConfig interface {
2020
PollFailureThreshold() uint32
21+
PollSuccessThreshold() uint32
2122
PollInterval() time.Duration
2223
SelectionMode() string
2324
SyncThreshold() uint32

multinode/node_lifecycle.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
123123
lggr.Debugw("Ping successful", "nodeState", n.State())
124124
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
125125
n.metrics.IncrementPollsSuccess(ctx, n.name)
126-
pollFailures = 0
126+
// Decay rather than reset; detects sustained failure rates above 1:1
127+
if pollFailures > 0 {
128+
pollFailures--
129+
}
127130
}
128131
if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
129132
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
@@ -356,7 +359,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN
356359
}
357360

358361
if outOfSync && n.getCachedState() == nodeStateAlive {
359-
n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty)
362+
n.lfcLog.Errorw(
363+
"RPC endpoint has fallen behind",
364+
"blockNumber", localChainInfo.BlockNumber,
365+
"bestLatestBlockNumber", ci.BlockNumber,
366+
"totalDifficulty", localChainInfo.TotalDifficulty,
367+
"blockDifference", localChainInfo.BlockNumber-ci.BlockNumber,
368+
)
360369
}
361370
return outOfSync, ln
362371
}
@@ -518,6 +527,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
518527
}
519528
}
520529

530+
// probeUntilStable polls the node PollSuccessThreshold consecutive times before allowing it back into
531+
// the alive pool. Returns true if all probes pass, false if any probe fails or ctx is cancelled.
532+
// When threshold is 0 the probe is disabled and the function returns true immediately.
533+
func (n *node[CHAIN_ID, HEAD, RPC]) probeUntilStable(ctx context.Context, lggr logger.Logger) bool {
534+
threshold := n.nodePoolCfg.PollSuccessThreshold()
535+
pollInterval := n.nodePoolCfg.PollInterval()
536+
if threshold == 0 || pollInterval <= 0 {
537+
return true
538+
}
539+
var successes uint32
540+
for successes < threshold {
541+
select {
542+
case <-ctx.Done():
543+
return false
544+
case <-time.After(pollInterval):
545+
}
546+
n.metrics.IncrementPolls(ctx, n.name)
547+
pollCtx, cancel := context.WithTimeout(ctx, pollInterval)
548+
version, err := n.RPC().ClientVersion(pollCtx)
549+
cancel()
550+
if err != nil {
551+
n.metrics.IncrementPollsFailed(ctx, n.name)
552+
lggr.Warnw("Recovery probe poll failed; restarting redial", "err", err, "successesSoFar", successes, "threshold", threshold)
553+
return false
554+
}
555+
n.metrics.IncrementPollsSuccess(ctx, n.name)
556+
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
557+
successes++
558+
lggr.Debugw("Recovery probe poll succeeded", "successes", successes, "threshold", threshold)
559+
}
560+
return true
561+
}
562+
521563
func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
522564
defer n.wg.Done()
523565
ctx, cancel := n.newCtx()
@@ -563,6 +605,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
563605
n.setState(nodeStateUnreachable)
564606
continue
565607
case nodeStateAlive:
608+
if !n.probeUntilStable(ctx, lggr) {
609+
n.rpc.Close()
610+
n.setState(nodeStateUnreachable)
611+
continue
612+
}
566613
lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Node was offline for %s", n.String(), time.Since(unreachableAt)), "nodeState", n.getCachedState())
567614
fallthrough
568615
default:

0 commit comments

Comments
 (0)