-
Notifications
You must be signed in to change notification settings - Fork 2k
Expand file tree
/
Copy pathclient.go
More file actions
320 lines (275 loc) · 10 KB
/
client.go
File metadata and controls
320 lines (275 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
package executable
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/executable/request"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
)
// client is a shim for remote executable capabilities.
// It translates between capability API calls and network messages.
// Its responsibilities are:
// 1. Transmit capability requests to remote nodes according to a transmission schedule
// 2. Aggregate responses from remote nodes and return the aggregated response
//
// client communicates with corresponding server on remote nodes.
//
// Start launches two background goroutines (request expiry and dispatcher
// readiness polling); Close stops them and waits for them to exit.
type client struct {
// Provides the StartOnce/StopOnce guards used by Start and Close.
services.StateMachine
// ID this client serves; SetConfig rejects capability info carrying a different ID.
capabilityID string
// Forwarded to request.NewClientExecuteRequest for every Execute call.
capMethodName string
// Passed to each new request and polled via Ready() by the background goroutines.
dispatcher types.Dispatcher
// Current configuration; nil until SetConfig succeeds and always replaced as a whole.
cfg atomic.Pointer[dynamicConfig]
lggr logger.Logger
// Tracked requests keyed by request ID; every access in this file happens under mutex.
requestIDToCallerRequest map[string]*request.ClientRequest
// Guards requestIDToCallerRequest.
mutex sync.Mutex
// Closed by Close to tell the background goroutines to return.
stopCh services.StopChan
// Counts the background goroutines started by Start so Close can wait for them.
wg sync.WaitGroup
}
// dynamicConfig bundles the settings installed by SetConfig. A new value is
// stored on every SetConfig call instead of mutating the previous one, so a
// reader that loads the pointer sees one consistent set of fields.
type dynamicConfig struct {
// Capability description returned by Info and passed to every new request.
remoteCapabilityInfo commoncap.CapabilityInfo
// Local DON description; SetConfig requires at least one member.
localDONInfo commoncap.DON
// Per-request timeout; also used as the expiry sweep interval. Must be positive.
requestTimeout time.Duration
// Has to be set only for V2 capabilities. V1 capabilities read transmission schedule from every request.
transmissionConfig *transmission.TransmissionConfig
// Has to be set only for V2 capabilities using OCR.
signers [][]byte
}
// Client is the exported surface of the remote executable capability client:
// an executable capability that can also receive network messages and be
// reconfigured at runtime.
type Client interface {
commoncap.ExecutableCapability
// Receive hands an incoming network message to the request it belongs to.
Receive(ctx context.Context, msg *types.MessageBody)
// SetConfig validates and installs a new configuration; it must succeed at least once before Start.
SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, signers [][]byte) error
}
// Compile-time checks that *client satisfies the interfaces it is used as.
var _ Client = &client{}
var _ types.Receiver = &client{}
var _ services.Service = &client{}
// defaultExpiryCheckInterval is the expiry sweep interval used when no positive
// requestTimeout is configured (see getClientTickerInterval).
const defaultExpiryCheckInterval = 30 * time.Second
var (
// ErrRequestExpired is the error a request is cancelled with when the expiry sweep finds it expired.
ErrRequestExpired = errors.New("request expired by executable client")
// ErrContextDoneBeforeResponseQuorum is returned by Execute, joined with ctx.Err(),
// when the caller's context ends before a response is delivered.
ErrContextDoneBeforeResponseQuorum = errors.New("context done before remote client received a quorum of responses")
)
// NewClient builds a client for the given capability ID and method name that
// communicates through dispatcher. The logger is named
// "ExecutableCapabilityClient" and tagged with both identifiers.
//
// The returned client carries no dynamic configuration: SetConfig must be
// called successfully before Start.
func NewClient(capabilityID string, capMethodName string, dispatcher types.Dispatcher, lggr logger.Logger) *client {
	named := logger.Named(lggr, "ExecutableCapabilityClient")
	scoped := logger.With(named, "capabilityID", capabilityID, "capMethodName", capMethodName)

	c := new(client)
	c.capabilityID = capabilityID
	c.capMethodName = capMethodName
	c.dispatcher = dispatcher
	c.lggr = scoped
	c.requestIDToCallerRequest = make(map[string]*request.ClientRequest)
	c.stopCh = make(services.StopChan)
	return c
}
// SetConfig validates the supplied settings and, if they are acceptable,
// atomically installs them as the client's current configuration.
//
// It returns an error, leaving the previous configuration in place, when the
// capability ID is empty or differs from the client's, when the remote
// capability info has no DON, when the local DON has no members, or when
// requestTimeout is not positive.
//
// transmissionConfig has to be set only for V2 capabilities; V1 capabilities
// read the transmission schedule from every request.
func (c *client) SetConfig(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, requestTimeout time.Duration, transmissionConfig *transmission.TransmissionConfig, signers [][]byte) error {
	switch {
	case remoteCapabilityInfo.ID == "" || remoteCapabilityInfo.ID != c.capabilityID:
		return fmt.Errorf("capability info provided does not match the client's capabilityID: %s != %s", remoteCapabilityInfo.ID, c.capabilityID)
	case remoteCapabilityInfo.DON == nil:
		return errors.New("remote capability info missing DON")
	case len(localDonInfo.Members) == 0:
		return errors.New("empty localDonInfo provided")
	case requestTimeout <= 0:
		return errors.New("requestTimeout must be positive")
	}

	// Swap in a brand-new snapshot so readers never observe a half-updated config.
	next := &dynamicConfig{
		remoteCapabilityInfo: remoteCapabilityInfo,
		localDONInfo:         localDonInfo,
		requestTimeout:       requestTimeout,
		transmissionConfig:   transmissionConfig,
		signers:              signers,
	}
	c.cfg.Store(next)

	remoteDON := remoteCapabilityInfo.DON
	c.lggr.Infow("SetConfig", "remoteDONName", remoteDON.Name, "remoteDONID", remoteDON.ID, "requestTimeout", requestTimeout, "transmissionConfig", transmissionConfig)
	return nil
}
// Start verifies that a usable configuration and a dispatcher are present and
// then launches the two background goroutines: the expiry sweep
// (checkForExpiredRequests) and the dispatcher readiness poll
// (checkDispatcherReady). Both are tracked by c.wg so Close can wait for them.
//
// The work runs inside StartOnce; an error is returned when SetConfig has not
// provided valid settings or when the dispatcher is nil.
func (c *client) Start(ctx context.Context) error {
	return c.StartOnce(c.Name(), func() error {
		cfg := c.cfg.Load()

		// Refuse to start until every required setting is in place.
		switch {
		case cfg == nil:
			return errors.New("config not set - call SetConfig() before Start()")
		case cfg.remoteCapabilityInfo.ID == "":
			return errors.New("remote capability info not set - call SetConfig() before Start()")
		case len(cfg.localDONInfo.Members) == 0:
			return errors.New("local DON info not set - call SetConfig() before Start()")
		case cfg.requestTimeout <= 0:
			return errors.New("request timeout not set - call SetConfig() before Start()")
		case c.dispatcher == nil:
			return errors.New("dispatcher set to nil, cannot start client")
		}

		workers := []func(){c.checkForExpiredRequests, c.checkDispatcherReady}
		for _, worker := range workers {
			c.wg.Add(1)
			// The worker is passed as an argument so each goroutine gets its own copy.
			go func(run func()) {
				defer c.wg.Done()
				run()
			}(worker)
		}

		c.lggr.Info("ExecutableCapabilityClient started")
		return nil
	})
}
// Close shuts the client down inside StopOnce: it signals the background
// goroutines through stopCh, cancels every tracked request with a
// "client closed" error, and blocks until the goroutines have returned.
func (c *client) Close() error {
	shutdown := func() error {
		close(c.stopCh)
		c.cancelAllRequests(errors.New("client closed"))
		c.wg.Wait()
		c.lggr.Info("ExecutableCapabilityClient closed")
		return nil
	}
	return c.StopOnce(c.Name(), shutdown)
}
// checkDispatcherReady polls the dispatcher once per second until stopCh is
// closed. Whenever the dispatcher reports an error from Ready, every tracked
// request is cancelled with that error wrapped as the cause.
func (c *client) checkDispatcherReady() {
	const pollInterval = 1 * time.Second

	poll := time.NewTicker(pollInterval)
	defer poll.Stop()

	for {
		select {
		case <-poll.C:
			readyErr := c.dispatcher.Ready()
			if readyErr == nil {
				continue
			}
			c.cancelAllRequests(fmt.Errorf("dispatcher not ready: %w", readyErr))
		case <-c.stopCh:
			return
		}
	}
}
// checkForExpiredRequests runs the expiry sweep on a ticker until stopCh is
// closed. The interval is recomputed from the current configuration on every
// tick, so a requestTimeout changed through SetConfig takes effect on the
// following cycle.
func (c *client) checkForExpiredRequests() {
	currentInterval := func() time.Duration {
		return getClientTickerInterval(c.cfg.Load())
	}

	sweep := time.NewTicker(currentInterval())
	defer sweep.Stop()

	for {
		select {
		case <-sweep.C:
			sweep.Reset(currentInterval())
			c.expireRequests()
		case <-c.stopCh:
			return
		}
	}
}
// getClientTickerInterval returns the expiry sweep interval for cfg: the
// configured requestTimeout when it is positive, and
// defaultExpiryCheckInterval when cfg is nil or the timeout is not positive.
func getClientTickerInterval(cfg *dynamicConfig) time.Duration {
	if cfg == nil || cfg.requestTimeout <= 0 {
		return defaultExpiryCheckInterval
	}
	return cfg.requestTimeout
}
// expireRequests walks the tracked requests, cancelling with ErrRequestExpired
// and forgetting each one that reports itself expired. If the dispatcher
// reports not-ready while a request is being examined, every request still
// tracked is cancelled with a "dispatcher not ready" error and the sweep ends.
//
// The not-ready cancellation is performed inline instead of through
// cancelAllRequests: c.mutex is already held here and sync.Mutex is not
// reentrant, so calling cancelAllRequests (which locks c.mutex itself) would
// block this goroutine forever, and with it every other user of the mutex
// and Close, which waits for this goroutine to exit.
func (c *client) expireRequests() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	for messageID, req := range c.requestIDToCallerRequest {
		if req.Expired() {
			req.Cancel(ErrRequestExpired)
			delete(c.requestIDToCallerRequest, messageID)
		}

		if c.dispatcher.Ready() != nil {
			// Same effect cancelAllRequests would have, but under the lock we already hold.
			notReady := errors.New("dispatcher not ready")
			for _, pending := range c.requestIDToCallerRequest {
				pending.Cancel(notReady)
			}
			return
		}
	}
}
// cancelAllRequests cancels every tracked request with err. The requests are
// left in the map; only the expiry sweep removes entries.
//
// It acquires c.mutex itself, so it must not be called while the caller
// already holds that mutex.
func (c *client) cancelAllRequests(err error) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	tracked := c.requestIDToCallerRequest
	for id := range tracked {
		tracked[id].Cancel(err)
	}
}
// Info returns the remote capability info from the current configuration, or
// an error when SetConfig has not been called yet.
func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
	if cfg := c.cfg.Load(); cfg != nil {
		return cfg.remoteCapabilityInfo, nil
	}
	return commoncap.CapabilityInfo{}, errors.New("config not set - call SetConfig() before Info()")
}
// RegisterToWorkflow is a no-op for this client: it ignores the request and
// always returns nil.
func (c *client) RegisterToWorkflow(ctx context.Context, registerRequest commoncap.RegisterToWorkflowRequest) error {
return nil
}
// UnregisterFromWorkflow is a no-op for this client: it ignores the request
// and always returns nil.
func (c *client) UnregisterFromWorkflow(ctx context.Context, unregisterRequest commoncap.UnregisterFromWorkflowRequest) error {
return nil
}
// Execute sends capReq to the remote capability and waits for the result.
//
// It builds a client request from the current configuration, registers it so
// Receive can route incoming messages to it, and then blocks until either a
// response is delivered on the request's response channel or ctx is done.
// On success the unmarshalled response is returned with Metadata.CapDON_N set
// to the number of members in the currently configured local DON.
//
// Errors are returned when no configuration is set, when the request cannot
// be created or stored, when ctx ends first (ErrContextDoneBeforeResponseQuorum
// joined with ctx.Err()), when the response carries an error, or when the
// response payload cannot be unmarshalled.
func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest) (commoncap.CapabilityResponse, error) {
	var none commoncap.CapabilityResponse

	cfg := c.cfg.Load()
	if cfg == nil {
		return none, errors.New("config not set - call SetConfig() before Execute()")
	}

	req, err := request.NewClientExecuteRequest(
		ctx, c.lggr, capReq, cfg.remoteCapabilityInfo, cfg.localDONInfo, c.dispatcher,
		cfg.requestTimeout, cfg.transmissionConfig, c.capMethodName, cfg.signers,
	)
	if err != nil {
		return none, fmt.Errorf("failed to create client request: %w", err)
	}
	c.lggr.Debugw("created new client request", "requestID", req.ID())

	if err = c.storeRequest(req); err != nil {
		return none, fmt.Errorf("failed to store request: %w", err)
	}

	var payload []byte
	select {
	case <-ctx.Done():
		// NOTE: ClientRequest will not block on sending to ResponseChan() because that channel is buffered (with size 1)
		return none, errors.Join(ErrContextDoneBeforeResponseQuorum, ctx.Err())
	case resp := <-req.ResponseChan():
		if resp.Err != nil {
			return none, fmt.Errorf("error executing request: %w", resp.Err)
		}
		payload = resp.Result
	}

	capabilityResponse, err := pb.UnmarshalCapabilityResponse(payload)
	if err != nil {
		return none, fmt.Errorf("failed to unmarshal capability response: %w", err)
	}

	// The configuration is re-read here, so the count reflects the config at response time.
	if latest := c.cfg.Load(); latest != nil {
		capabilityResponse.Metadata.CapDON_N = uint32(len(latest.localDONInfo.Members)) //nolint:gosec // G115
	}
	return capabilityResponse, nil
}
// storeRequest registers req under its ID so Receive can find it. It returns
// an error, leaving the map untouched, when a request with the same ID is
// already tracked.
func (c *client) storeRequest(req *request.ClientRequest) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if _, duplicate := c.requestIDToCallerRequest[req.ID()]; !duplicate {
		c.requestIDToCallerRequest[req.ID()] = req
		return nil
	}
	return fmt.Errorf("request for ID %s already exists", req.ID())
}
// Receive routes an incoming network message to the tracked request with the
// matching message ID. Messages with an invalid ID, or with an ID that matches
// no tracked request, are logged and dropped; a failure reported by the
// request's OnMessage is logged as well. The whole call runs under c.mutex.
func (c *client) Receive(ctx context.Context, msg *types.MessageBody) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	messageID, idErr := GetMessageID(msg)
	if idErr != nil {
		c.lggr.Errorw("invalid message ID", "err", idErr, "id", remote.SanitizeLogString(string(msg.MessageId)))
		return
	}
	c.lggr.Debugw("Remote client executable receiving message", "messageID", messageID)

	pending, tracked := c.requestIDToCallerRequest[messageID]
	if !tracked || pending == nil {
		c.lggr.Warnw("received response for unknown message ID ", "messageID", messageID)
		return
	}

	msgErr := pending.OnMessage(ctx, msg)
	if msgErr != nil {
		c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", msgErr)
	}
}
// Ready always returns nil, regardless of whether the client has been started
// or closed.
// NOTE(review): this presumably shadows a readiness method promoted from the
// embedded services.StateMachine — confirm that is intended.
func (c *client) Ready() error {
return nil
}
// HealthReport always returns a nil map; this client reports no health entries.
func (c *client) HealthReport() map[string]error {
return nil
}
// Name returns the name of the client's logger; Start and Close pass it to
// StartOnce and StopOnce.
func (c *client) Name() string {
return c.lggr.Name()
}