-
Notifications
You must be signed in to change notification settings - Fork 268
Expand file tree
/
Copy pathclient.go
More file actions
273 lines (242 loc) · 11.2 KB
/
Copy pathclient.go
File metadata and controls
273 lines (242 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package jsonrpc
import (
"context"
"encoding/hex"
"fmt"
"net/http"
"strings"
"github.com/filecoin-project/go-jsonrpc"
logging "github.com/ipfs/go-log/v2"
"github.com/evstack/ev-node/core/da"
internal "github.com/evstack/ev-node/da/jsonrpc/internal"
)
//go:generate mockgen -destination=mocks/api.go -package=mocks . Module
type Module interface {
da.DA
}
// API defines the jsonrpc service module API
type API struct {
Logger logging.EventLogger
Namespace []byte
MaxBlobSize uint64
Internal struct {
Get func(ctx context.Context, ids []da.ID, ns []byte) ([]da.Blob, error) `perm:"read"`
GetIDs func(ctx context.Context, height uint64, ns []byte) (*da.GetIDsResult, error) `perm:"read"`
GetProofs func(ctx context.Context, ids []da.ID, ns []byte) ([]da.Proof, error) `perm:"read"`
Commit func(ctx context.Context, blobs []da.Blob, ns []byte) ([]da.Commitment, error) `perm:"read"`
Validate func(context.Context, []da.ID, []da.Proof, []byte) ([]bool, error) `perm:"read"`
Submit func(context.Context, []da.Blob, float64, []byte) ([]da.ID, error) `perm:"write"`
SubmitWithOptions func(context.Context, []da.Blob, float64, []byte, []byte) ([]da.ID, error) `perm:"write"`
GasMultiplier func(context.Context) (float64, error) `perm:"read"`
GasPrice func(context.Context) (float64, error) `perm:"read"`
}
}
// Get returns Blob for each given ID, or an error.
func (api *API) Get(ctx context.Context, ids []da.ID, _ []byte) ([]da.Blob, error) {
api.Logger.Debug("Making RPC call", "method", "Get", "num_ids", len(ids), "namespace", string(api.Namespace))
res, err := api.Internal.Get(ctx, ids, api.Namespace)
if err != nil {
if strings.Contains(err.Error(), context.Canceled.Error()) {
api.Logger.Debug("RPC call canceled due to context cancellation", "method", "Get")
return res, context.Canceled
}
api.Logger.Error("RPC call failed", "method", "Get", "error", err)
// Wrap error for context, potentially using the translated error from the RPC library
return nil, fmt.Errorf("failed to get blobs: %w", err)
}
api.Logger.Debug("RPC call successful", "method", "Get", "num_blobs_returned", len(res))
return res, nil
}
// GetIDs returns IDs of all Blobs located in DA at given height.
func (api *API) GetIDs(ctx context.Context, height uint64, _ []byte) (*da.GetIDsResult, error) {
api.Logger.Debug("Making RPC call", "method", "GetIDs", "height", height, "namespace", string(api.Namespace))
res, err := api.Internal.GetIDs(ctx, height, api.Namespace)
if err != nil {
// Using strings.contains since JSON RPC serialization doesn't preserve error wrapping
// Check if the error is specifically BlobNotFound, otherwise log and return
if strings.Contains(err.Error(), da.ErrBlobNotFound.Error()) { // Use the error variable directly
api.Logger.Debug("RPC call indicates blobs not found", "method", "GetIDs", "height", height)
return nil, err // Return the specific ErrBlobNotFound
}
if strings.Contains(err.Error(), da.ErrHeightFromFuture.Error()) {
api.Logger.Debug("RPC call indicates height from future", "method", "GetIDs", "height", height)
return nil, err // Return the specific ErrHeightFromFuture
}
if strings.Contains(err.Error(), context.Canceled.Error()) {
api.Logger.Debug("RPC call canceled due to context cancellation", "method", "GetIDs")
return res, context.Canceled
}
api.Logger.Error("RPC call failed", "method", "GetIDs", "error", err)
return nil, err
}
// Handle cases where the RPC call succeeds but returns no IDs
if res == nil || len(res.IDs) == 0 {
api.Logger.Debug("RPC call successful but no IDs found", "method", "GetIDs", "height", height)
return nil, da.ErrBlobNotFound // Return specific error for not found (use variable directly)
}
api.Logger.Debug("RPC call successful", "method", "GetIDs")
return res, nil
}
// GetProofs returns inclusion Proofs for Blobs specified by their IDs.
func (api *API) GetProofs(ctx context.Context, ids []da.ID, _ []byte) ([]da.Proof, error) {
api.Logger.Debug("Making RPC call", "method", "GetProofs", "num_ids", len(ids), "namespace", string(api.Namespace))
res, err := api.Internal.GetProofs(ctx, ids, api.Namespace)
if err != nil {
api.Logger.Error("RPC call failed", "method", "GetProofs", "error", err)
} else {
api.Logger.Debug("RPC call successful", "method", "GetProofs", "num_proofs_returned", len(res))
}
return res, err
}
// Commit creates a Commitment for each given Blob.
func (api *API) Commit(ctx context.Context, blobs []da.Blob, _ []byte) ([]da.Commitment, error) {
api.Logger.Debug("Making RPC call", "method", "Commit", "num_blobs", len(blobs), "namespace", string(api.Namespace))
res, err := api.Internal.Commit(ctx, blobs, api.Namespace)
if err != nil {
api.Logger.Error("RPC call failed", "method", "Commit", "error", err)
} else {
api.Logger.Debug("RPC call successful", "method", "Commit", "num_commitments_returned", len(res))
}
return res, err
}
// Validate validates Commitments against the corresponding Proofs. This should be possible without retrieving the Blobs.
func (api *API) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, _ []byte) ([]bool, error) {
api.Logger.Debug("Making RPC call", "method", "Validate", "num_ids", len(ids), "num_proofs", len(proofs), "namespace", string(api.Namespace))
res, err := api.Internal.Validate(ctx, ids, proofs, api.Namespace)
if err != nil {
api.Logger.Error("RPC call failed", "method", "Validate", "error", err)
} else {
api.Logger.Debug("RPC call successful", "method", "Validate", "num_results_returned", len(res))
}
return res, err
}
// Submit submits the Blobs to Data Availability layer.
func (api *API) Submit(ctx context.Context, blobs []da.Blob, gasPrice float64, _ []byte) ([]da.ID, error) {
api.Logger.Debug("Making RPC call", "method", "Submit", "num_blobs", len(blobs), "gas_price", gasPrice, "namespace", string(api.Namespace))
res, err := api.Internal.Submit(ctx, blobs, gasPrice, api.Namespace)
if err != nil {
if strings.Contains(err.Error(), context.Canceled.Error()) {
api.Logger.Debug("RPC call canceled due to context cancellation", "method", "Submit")
return res, context.Canceled
}
api.Logger.Error("RPC call failed", "method", "Submit", "error", err, "namespace", api.Namespace)
} else {
api.Logger.Debug("RPC call successful", "method", "Submit", "num_ids_returned", len(res))
}
return res, err
}
// SubmitWithOptions submits the Blobs to Data Availability layer with additional options.
// It validates the entire batch against MaxBlobSize before submission.
// If any blob or the total batch size exceeds limits, it returns ErrBlobSizeOverLimit.
func (api *API) SubmitWithOptions(ctx context.Context, inputBlobs []da.Blob, gasPrice float64, _ []byte, options []byte) ([]da.ID, error) {
maxBlobSize := api.MaxBlobSize
if len(inputBlobs) == 0 {
return []da.ID{}, nil
}
// Validate each blob individually and calculate total size
var totalSize uint64
for i, blob := range inputBlobs {
blobLen := uint64(len(blob))
if blobLen > maxBlobSize {
api.Logger.Warn("Individual blob exceeds MaxBlobSize", "index", i, "blobSize", blobLen, "maxBlobSize", maxBlobSize)
return nil, da.ErrBlobSizeOverLimit
}
totalSize += blobLen
}
// Validate total batch size
if totalSize > maxBlobSize {
api.Logger.Warn("Total batch size exceeds MaxBlobSize", "totalSize", totalSize, "maxBlobSize", maxBlobSize, "numBlobs", len(inputBlobs))
return nil, da.ErrBlobSizeOverLimit
}
api.Logger.Debug("Making RPC call", "method", "SubmitWithOptions", "num_blobs", len(inputBlobs), "total_size", totalSize, "gas_price", gasPrice, "namespace", string(api.Namespace))
res, err := api.Internal.SubmitWithOptions(ctx, inputBlobs, gasPrice, api.Namespace, options)
if err != nil {
if strings.Contains(err.Error(), context.Canceled.Error()) {
api.Logger.Debug("RPC call canceled due to context cancellation", "method", "SubmitWithOptions")
return res, context.Canceled
}
api.Logger.Error("RPC call failed", "method", "SubmitWithOptions", "error", err)
} else {
api.Logger.Debug("RPC call successful", "method", "SubmitWithOptions", "num_ids_returned", len(res))
}
return res, err
}
func (api *API) GasMultiplier(ctx context.Context) (float64, error) {
api.Logger.Debug("Making RPC call", "method", "GasMultiplier")
res, err := api.Internal.GasMultiplier(ctx)
if err != nil {
api.Logger.Error("RPC call failed", "method", "GasMultiplier", "error", err)
} else {
api.Logger.Debug("RPC call successful", "method", "GasMultiplier", "result", res)
}
return res, err
}
func (api *API) GasPrice(ctx context.Context) (float64, error) {
api.Logger.Debug("Making RPC call", "method", "GasPrice")
res, err := api.Internal.GasPrice(ctx)
if err != nil {
api.Logger.Error("RPC call failed", "method", "GasPrice", "error", err)
} else {
api.Logger.Debug("RPC call successful", "method", "GasPrice", "result", res)
}
return res, err
}
// Client is the jsonrpc client
type Client struct {
DA API
closer multiClientCloser
}
// multiClientCloser is a wrapper struct to close clients across multiple namespaces.
type multiClientCloser struct {
closers []jsonrpc.ClientCloser
}
// register adds a new closer to the multiClientCloser
func (m *multiClientCloser) register(closer jsonrpc.ClientCloser) {
m.closers = append(m.closers, closer)
}
// closeAll closes all saved clients.
func (m *multiClientCloser) closeAll() {
for _, closer := range m.closers {
closer()
}
}
// Close closes the connections to all namespaces registered on the staticClient.
func (c *Client) Close() {
c.closer.closeAll()
}
// NewClient creates a new Client with one connection per namespace with the
// given token as the authorization token.
func NewClient(ctx context.Context, logger logging.EventLogger, addr string, token, ns string) (*Client, error) {
authHeader := http.Header{"Authorization": []string{fmt.Sprintf("Bearer %s", token)}}
return newClient(ctx, logger, addr, authHeader, ns)
}
func newClient(ctx context.Context, logger logging.EventLogger, addr string, authHeader http.Header, namespace string) (*Client, error) {
var multiCloser multiClientCloser
var client Client
client.DA.Logger = logger
client.DA.MaxBlobSize = internal.DefaultMaxBytes
namespaceBytes, err := hex.DecodeString(namespace)
if err != nil {
return nil, fmt.Errorf("failed to decode namespace: %w", err)
}
client.DA.Namespace = namespaceBytes
logger.Info("creating new client", "namespace", namespace)
errs := getKnownErrorsMapping()
for name, module := range moduleMap(&client) {
closer, err := jsonrpc.NewMergeClient(ctx, addr, name, []interface{}{module}, authHeader, jsonrpc.WithErrors(errs))
if err != nil {
// If an error occurs, close any previously opened connections
multiCloser.closeAll()
return nil, err
}
multiCloser.register(closer)
}
client.closer = multiCloser // Assign the multiCloser to the client
return &client, nil
}
func moduleMap(client *Client) map[string]interface{} {
// TODO: this duplication of strings many times across the codebase can be avoided with issue #1176
return map[string]interface{}{
"da": &client.DA.Internal,
}
}