Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162)
- Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167)
- Retry fetching the timestamp on error in da-client [#3166](https://github.com/evstack/ev-node/pull/3166)

### Changes

Expand Down
60 changes: 50 additions & 10 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ type client struct {
// Ensure client implements the FullClient interface (Client + BlobGetter + Verifier).
var _ FullClient = (*client)(nil)

const (
blockTimestampFetchMaxAttempts = 3
blockTimestampFetchBackoff = 100 * time.Millisecond
)

// NewClient creates a new blob client wrapper with pre-calculated namespace bytes.
func NewClient(cfg Config) FullClient {
if cfg.DA == nil {
Expand Down Expand Up @@ -184,15 +189,40 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace

// getBlockTimestamp fetches the block timestamp from the DA layer header.
func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Time, error) {
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
defer cancel()
var lastErr error
backoff := blockTimestampFetchBackoff

for attempt := 1; attempt <= blockTimestampFetchMaxAttempts; attempt++ {
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
header, err := c.headerAPI.GetByHeight(headerCtx, height)
cancel()
if err == nil {
return header.Time(), nil
}
lastErr = err

header, err := c.headerAPI.GetByHeight(headerCtx, height)
if err != nil {
return time.Time{}, fmt.Errorf("failed to get header timestamp for block %d: %w", height, err)
if attempt == blockTimestampFetchMaxAttempts {
break
}

c.logger.Info().
Uint64("height", height).
Int("attempt", attempt).
Int("max_attempts", blockTimestampFetchMaxAttempts).
Dur("retry_in", backoff).
Err(err).
Msg("failed to get block timestamp, retrying")

select {
case <-ctx.Done():
return time.Time{}, fmt.Errorf("fetching header timestamp for block %d: %w", height, ctx.Err())
case <-time.After(backoff):
}

backoff *= 2
}

return header.Time(), nil
return time.Time{}, fmt.Errorf("get header timestamp for block %d after %d attempts: %w", height, blockTimestampFetchMaxAttempts, lastErr)
}

// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
Expand Down Expand Up @@ -224,8 +254,13 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
blockTime, err := c.getBlockTimestamp(ctx, height)
if err != nil {
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
blockTime = time.Now()
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
return datypes.ResultRetrieve{
BaseResult: datypes.BaseResult{
Code: datypes.StatusError,
Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()),
Height: height,
},
}
}

return datypes.ResultRetrieve{
Expand Down Expand Up @@ -262,8 +297,13 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
blockTime, err := c.getBlockTimestamp(ctx, height)
if err != nil {
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
blockTime = time.Now()
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
return datypes.ResultRetrieve{
BaseResult: datypes.BaseResult{
Code: datypes.StatusError,
Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()),
Height: height,
},
}
}

if len(blobs) == 0 {
Expand Down
72 changes: 72 additions & 0 deletions block/internal/da/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,78 @@ func TestClient_Retrieve_Success(t *testing.T) {
require.Equal(t, expectedTime, res.Timestamp)
}

func TestClient_Retrieve_TimestampFetchRetry(t *testing.T) {
ns := share.MustNewV0Namespace([]byte("ns"))
nsBz := ns.Bytes()
fixedTime := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)

specs := map[string]struct {
getAllErr error
headerFailures int
wantStatus datypes.StatusCode
wantTimestamp time.Time
wantMessageSubstr string
wantHeaderCalls int
}{
"success_retries_timestamp_fetch": {
getAllErr: nil,
headerFailures: 2,
wantStatus: datypes.StatusSuccess,
wantTimestamp: fixedTime,
wantHeaderCalls: 3,
},
"not_found_fails_hard_when_timestamp_unavailable": {
getAllErr: datypes.ErrBlobNotFound,
headerFailures: blockTimestampFetchMaxAttempts,
wantStatus: datypes.StatusError,
wantMessageSubstr: "failed to get block timestamp",
wantHeaderCalls: blockTimestampFetchMaxAttempts,
},
}

for name, spec := range specs {
t.Run(name, func(t *testing.T) {
blobModule := mocks.NewMockBlobModule(t)
headerModule := mocks.NewMockHeaderModule(t)

if spec.getAllErr != nil {
blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob(nil), spec.getAllErr).Once()
} else {
b, err := blobrpc.NewBlobV0(ns, []byte("payload"))
require.NoError(t, err)
blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob{b}, nil).Once()
}

headerCalls := 0
headerModule.EXPECT().GetByHeight(mock.Anything, uint64(7)).RunAndReturn(func(context.Context, uint64) (*blobrpc.Header, error) {
headerCalls++
if headerCalls <= spec.headerFailures {
return nil, errors.New("header unavailable")
}
return &blobrpc.Header{Header: blobrpc.RawHeader{Time: fixedTime}}, nil
})

cl := NewClient(Config{
DA: makeBlobRPCClient(blobModule, headerModule),
Logger: zerolog.Nop(),
Namespace: "ns",
DataNamespace: "ns",
})

res := cl.Retrieve(context.Background(), 7, nsBz)
require.Equal(t, spec.wantStatus, res.Code)
require.Equal(t, spec.wantHeaderCalls, headerCalls)

if !spec.wantTimestamp.IsZero() {
require.Equal(t, spec.wantTimestamp, res.Timestamp)
}
if spec.wantMessageSubstr != "" {
require.Contains(t, res.Message, spec.wantMessageSubstr)
}
})
}
}

func TestClient_SubmitOptionsMerge(t *testing.T) {
ns := share.MustNewV0Namespace([]byte("ns")).Bytes()
blobModule := mocks.NewMockBlobModule(t)
Expand Down
Loading