Skip to content

Commit 5b7d6a6

Browse files
committed
CCIP - re-sync chain accessor on gRPC client refresh
1 parent 61182c2 commit 5b7d6a6

5 files changed

Lines changed: 124 additions & 21 deletions

File tree

pkg/loop/internal/net/broker.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ func (b *BrokerExt) NewClientConn(name string, newClient newClientFn) *clientCon
8383
}
8484
}
8585

86+
// NewClientConnWithCallback return a new *clientConn backed by this *BrokerExt with a refresh callback.
87+
func (b *BrokerExt) NewClientConnWithCallback(name string, newClient newClientFn, refreshCallback RefreshCallback) *clientConn {
88+
return &clientConn{
89+
BrokerExt: b.WithName(name),
90+
newClient: newClient,
91+
name: name,
92+
refreshCallback: refreshCallback,
93+
}
94+
}
95+
8696
func (b *BrokerExt) StopCtx() (context.Context, context.CancelFunc) {
8797
return utils.ContextFromChan(b.StopCh)
8898
}

pkg/loop/internal/net/client.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ var _ grpc.ClientConnInterface = (*clientConn)(nil)
3535
// newClientFn returns a new client connection id to dial, and a set of Resource dependencies to close.
3636
type newClientFn func(context.Context) (id uint32, deps Resources, err error)
3737

38+
// RefreshCallback is called after a successful refresh, but before the connection is made available.
39+
// This allows clients to perform state management operations before the refreshed client is used again.
40+
type RefreshCallback func(ctx context.Context) error
41+
3842
// clientConn is a [grpc.ClientConnInterface] backed by a [*grpc.ClientConn] which can be recreated and swapped out
3943
// via the provided [newClientFn].
4044
// New instances should be created via BrokerExt.NewClientConn.
@@ -43,9 +47,10 @@ type clientConn struct {
4347
newClient newClientFn
4448
name string
4549

46-
mu sync.RWMutex
47-
deps Resources
48-
cc *grpc.ClientConn
50+
mu sync.RWMutex
51+
deps Resources
52+
cc *grpc.ClientConn
53+
refreshCallback RefreshCallback
4954
}
5055

5156
func (c *clientConn) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
@@ -93,6 +98,14 @@ func (c *clientConn) NewStream(ctx context.Context, desc *grpc.StreamDesc, metho
9398
return nil, context.Cause(ctx)
9499
}
95100

101+
// SetRefreshCallback registers a callback to be invoked after a successful refresh,
102+
// but before the connection is made available for use.
103+
func (c *clientConn) SetRefreshCallback(callback RefreshCallback) {
104+
c.mu.Lock()
105+
defer c.mu.Unlock()
106+
c.refreshCallback = callback
107+
}
108+
96109
// refresh replaces c.cc with a new (different from orig) *grpc.ClientConn, and returns it as well.
97110
// It will block until a new connection is successfully dialed, or return nil if the context expires.
98111
func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) *grpc.ClientConn {
@@ -150,5 +163,12 @@ func (c *clientConn) refresh(ctx context.Context, orig *grpc.ClientConn) *grpc.C
150163
}
151164
}
152165

166+
if c.refreshCallback != nil {
167+
if err := c.refreshCallback(ctx); err != nil {
168+
// Log error but make the refreshed connection available anyway.
169+
c.Logger.Errorw("Refresh callback failed", "err", err)
170+
}
171+
}
172+
153173
return c.cc
154174
}

pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/ccip_provider.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package ccipocr3
22

33
import (
4+
"context"
5+
46
"google.golang.org/grpc"
57

68
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"
@@ -70,6 +72,13 @@ func (p *CCIPProviderClient) Codec() ccipocr3.Codec {
7072
}
7173
}
7274

75+
func (p *CCIPProviderClient) RefreshChainAccessor(ctx context.Context) error {
76+
if refresher, ok := p.chainAccessor.(*chainAccessorClient); ok {
77+
return refresher.Refresh(ctx)
78+
}
79+
return nil
80+
}
81+
7382
// Server implementation
7483
type CCIPProviderServer struct{}
7584

pkg/loop/internal/relayer/pluginprovider/ext/ccipocr3/chainaccessor.go

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ccipocr3
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"google.golang.org/grpc"
@@ -19,12 +20,17 @@ var _ ccipocr3.ChainAccessor = (*chainAccessorClient)(nil)
1920
type chainAccessorClient struct {
2021
*net.BrokerExt
2122
grpc ccipocr3pb.ChainAccessorClient
23+
24+
// Local persistence for refresh functionality
25+
mu sync.RWMutex
26+
syncedContracts map[string]ccipocr3.UnknownAddress // contractName -> contractAddress
2227
}
2328

2429
func NewChainAccessorClient(broker *net.BrokerExt, cc grpc.ClientConnInterface) ccipocr3.ChainAccessor {
2530
return &chainAccessorClient{
26-
BrokerExt: broker,
27-
grpc: ccipocr3pb.NewChainAccessorClient(cc),
31+
BrokerExt: broker,
32+
grpc: ccipocr3pb.NewChainAccessorClient(cc),
33+
syncedContracts: make(map[string]ccipocr3.UnknownAddress),
2834
}
2935
}
3036

@@ -82,9 +88,52 @@ func (c *chainAccessorClient) Sync(ctx context.Context, contractName string, con
8288
ContractName: contractName,
8389
ContractAddress: contractAddress,
8490
})
91+
92+
if err == nil {
93+
// Persist the synced contract locally for client refresh
94+
c.mu.Lock()
95+
c.syncedContracts[contractName] = contractAddress
96+
c.mu.Unlock()
97+
c.Logger.Debugw("Persisted synced contract", "contractName", contractName, "contractAddress", contractAddress)
98+
}
99+
85100
return err
86101
}
87102

103+
// Refresh re-syncs all previously synced contracts after a connection refresh.
104+
func (c *chainAccessorClient) Refresh(ctx context.Context) error {
105+
c.mu.RLock()
106+
contracts := make(map[string]ccipocr3.UnknownAddress)
107+
for name, addr := range c.syncedContracts {
108+
contracts[name] = addr
109+
}
110+
c.mu.RUnlock()
111+
112+
if len(contracts) == 0 {
113+
c.Logger.Debug("No previously synced contracts to refresh")
114+
return nil
115+
}
116+
117+
c.Logger.Infow("Refreshing synced contracts", "count", len(contracts))
118+
119+
// Re-sync all previously synced contracts
120+
for contractName, contractAddress := range contracts {
121+
if err := c.Sync(ctx, contractName, contractAddress); err != nil {
122+
c.Logger.Errorw("Failed to refresh contract sync",
123+
"contractName", contractName,
124+
"contractAddress", contractAddress,
125+
"err", err)
126+
// Continue refreshing other contracts even if one fails
127+
} else {
128+
c.Logger.Debugw("Successfully refreshed contract sync",
129+
"contractName", contractName,
130+
"contractAddress", contractAddress)
131+
}
132+
}
133+
134+
return nil
135+
}
136+
88137
// DestinationAccessor methods
89138
func (c *chainAccessorClient) CommitReportsGTETimestamp(
90139
ctx context.Context,

pkg/loop/internal/relayer/relayer.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -289,23 +289,38 @@ func (r *relayerClient) NewLLOProvider(ctx context.Context, rargs types.RelayArg
289289
}
290290

291291
func (r *relayerClient) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) {
292-
cc := r.NewClientConn("CCIPProvider", func(ctx context.Context) (uint32, net.Resources, error) {
293-
reply, err := r.relayer.NewCCIPProvider(ctx, &pb.NewCCIPProviderRequest{
294-
CcipProviderArgs: &pb.CCIPProviderArgs{
295-
ExternalJobID: cargs.ExternalJobID[:],
296-
ContractReaderConfig: cargs.ContractReaderConfig,
297-
ChainWriterConfig: cargs.ChainWriterConfig,
298-
OffRampAddress: cargs.OffRampAddress,
299-
PluginType: cargs.PluginType,
300-
},
301-
})
302-
if err != nil {
303-
return 0, nil, err
304-
}
305-
return reply.CcipProviderID, nil, nil
306-
})
292+
var provider types.CCIPProvider
293+
294+
// Create client connection with refresh callback
295+
clientConn := r.NewClientConnWithCallback("CCIPProvider",
296+
func(ctx context.Context) (uint32, net.Resources, error) {
297+
reply, err := r.relayer.NewCCIPProvider(ctx, &pb.NewCCIPProviderRequest{
298+
CcipProviderArgs: &pb.CCIPProviderArgs{
299+
ExternalJobID: cargs.ExternalJobID[:],
300+
ContractReaderConfig: cargs.ContractReaderConfig,
301+
ChainWriterConfig: cargs.ChainWriterConfig,
302+
OffRampAddress: cargs.OffRampAddress,
303+
PluginType: cargs.PluginType,
304+
},
305+
})
306+
if err != nil {
307+
return 0, nil, err
308+
}
309+
return reply.CcipProviderID, nil, nil
310+
},
311+
func(ctx context.Context) error {
312+
r.Logger.Info("CCIP Provider connection refreshed - refreshing chain accessor contracts")
313+
if ccipProvider, ok := provider.(*ccipocr3.CCIPProviderClient); ok {
314+
return ccipProvider.RefreshChainAccessor(ctx)
315+
}
316+
return nil
317+
},
318+
)
319+
320+
// Create the CCIP provider client
321+
provider = ccipocr3.NewCCIPProviderClient(r.WithName(cargs.ExternalJobID.String()).WithName("CCIPProviderClient"), clientConn)
307322

308-
return ccipocr3.NewCCIPProviderClient(r.WithName(cargs.ExternalJobID.String()).WithName("CCIPProviderClient"), cc), nil
323+
return provider, nil
309324
}
310325

311326
func (r *relayerClient) LatestHead(ctx context.Context) (types.Head, error) {

0 commit comments

Comments
 (0)