Skip to content

Commit cf8e8eb

Browse files
committed
Add explicit no block timestamp path
1 parent 03152fd commit cf8e8eb

10 files changed

Lines changed: 374 additions & 26 deletions

File tree

apps/evm/server/force_inclusion_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ func (m *mockDA) Retrieve(ctx context.Context, height uint64, namespace []byte)
3434
return da.ResultRetrieve{}
3535
}
3636

37+
func (m *mockDA) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve {
38+
return da.ResultRetrieve{}
39+
}
40+
3741
func (m *mockDA) RetrieveHeaders(ctx context.Context, height uint64) da.ResultRetrieve {
3842
return da.ResultRetrieve{}
3943
}

block/internal/da/client.go

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
"strings"
9+
"sync"
910
"time"
1011

1112
"github.com/celestiaorg/go-square/v3/share"
@@ -37,6 +38,7 @@ type client struct {
3738
dataNamespaceBz []byte
3839
forcedNamespaceBz []byte
3940
hasForcedNamespace bool
41+
timestampCache *blockTimestampCache
4042
}
4143

4244
// Ensure client implements the FullClient interface (Client + BlobGetter + Verifier).
@@ -45,8 +47,72 @@ var _ FullClient = (*client)(nil)
4547
const (
4648
blockTimestampFetchMaxAttempts = 3
4749
blockTimestampFetchBackoff = 100 * time.Millisecond
50+
blockTimestampCacheWindow = 2048
4851
)
4952

53+
type blockTimestampCache struct {
54+
mu sync.RWMutex
55+
byHeight map[uint64]time.Time
56+
highest uint64
57+
window uint64
58+
}
59+
60+
func newBlockTimestampCache(window uint64) *blockTimestampCache {
61+
if window == 0 {
62+
window = blockTimestampCacheWindow
63+
}
64+
return &blockTimestampCache{
65+
byHeight: make(map[uint64]time.Time),
66+
window: window,
67+
}
68+
}
69+
70+
func (c *blockTimestampCache) get(height uint64) (time.Time, bool) {
71+
c.mu.RLock()
72+
defer c.mu.RUnlock()
73+
74+
blockTime, ok := c.byHeight[height]
75+
return blockTime, ok
76+
}
77+
78+
func (c *blockTimestampCache) put(height uint64, blockTime time.Time) {
79+
if c == nil || blockTime.IsZero() {
80+
return
81+
}
82+
83+
blockTime = blockTime.UTC()
84+
85+
c.mu.Lock()
86+
defer c.mu.Unlock()
87+
88+
minRetained := c.minRetainedHeightLocked()
89+
if minRetained > 0 && height < minRetained {
90+
return
91+
}
92+
93+
if height > c.highest {
94+
c.highest = height
95+
}
96+
c.byHeight[height] = blockTime
97+
98+
minRetained = c.minRetainedHeightLocked()
99+
if minRetained == 0 {
100+
return
101+
}
102+
for cachedHeight := range c.byHeight {
103+
if cachedHeight < minRetained {
104+
delete(c.byHeight, cachedHeight)
105+
}
106+
}
107+
}
108+
109+
func (c *blockTimestampCache) minRetainedHeightLocked() uint64 {
110+
if c.window == 0 || c.highest < c.window-1 {
111+
return 0
112+
}
113+
return c.highest - c.window + 1
114+
}
115+
50116
// NewClient creates a new blob client wrapper with pre-calculated namespace bytes.
51117
func NewClient(cfg Config) FullClient {
52118
if cfg.DA == nil {
@@ -71,6 +137,7 @@ func NewClient(cfg Config) FullClient {
71137
dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(),
72138
forcedNamespaceBz: forcedNamespaceBz,
73139
hasForcedNamespace: hasForcedNamespace,
140+
timestampCache: newBlockTimestampCache(blockTimestampCacheWindow),
74141
}
75142
}
76143

@@ -197,7 +264,9 @@ func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Tim
197264
header, err := c.headerAPI.GetByHeight(headerCtx, height)
198265
cancel()
199266
if err == nil {
200-
return header.Time(), nil
267+
blockTime := header.Time().UTC()
268+
c.storeBlockTimestamp(height, blockTime)
269+
return blockTime, nil
201270
}
202271
lastErr = err
203272

@@ -225,10 +294,38 @@ func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Tim
225294
return time.Time{}, fmt.Errorf("get header timestamp for block %d after %d attempts: %w", height, blockTimestampFetchMaxAttempts, lastErr)
226295
}
227296

297+
func (c *client) cachedBlockTimestamp(height uint64) (time.Time, bool) {
298+
return c.timestampCache.get(height)
299+
}
300+
301+
func (c *client) storeBlockTimestamp(height uint64, blockTime time.Time) {
302+
c.timestampCache.put(height, blockTime)
303+
}
304+
305+
func (c *client) resolveBlockTimestamp(ctx context.Context, height uint64, strict bool) (time.Time, error) {
306+
if !strict {
307+
if blockTime, ok := c.cachedBlockTimestamp(height); ok {
308+
return blockTime, nil
309+
}
310+
return time.Time{}, nil
311+
}
312+
313+
return c.getBlockTimestamp(ctx, height)
314+
}
315+
316+
// RetrieveBlobs retrieves blobs without blocking on DA header timestamps.
317+
func (c *client) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
318+
return c.retrieve(ctx, height, namespace, false)
319+
}
320+
228321
// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
229322
// It uses GetAll to fetch all blobs at once.
230323
// The timestamp is derived from the DA block header to ensure determinism.
231324
func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
325+
return c.retrieve(ctx, height, namespace, true)
326+
}
327+
328+
func (c *client) retrieve(ctx context.Context, height uint64, namespace []byte, strictTimestamp bool) datypes.ResultRetrieve {
232329
ns, err := share.NewNamespaceFromBytes(namespace)
233330
if err != nil {
234331
return datypes.ResultRetrieve{
@@ -250,8 +347,7 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
250347
switch {
251348
case strings.Contains(err.Error(), datypes.ErrBlobNotFound.Error()):
252349
c.logger.Debug().Uint64("height", height).Msg("No blobs found at height")
253-
// Fetch block timestamp for deterministic responses using parent context
254-
blockTime, err := c.getBlockTimestamp(ctx, height)
350+
blockTime, err := c.resolveBlockTimestamp(ctx, height, strictTimestamp)
255351
if err != nil {
256352
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
257353
return datypes.ResultRetrieve{
@@ -293,8 +389,7 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
293389
}
294390
}
295391

296-
// Fetch block timestamp for deterministic responses using parent context
297-
blockTime, err := c.getBlockTimestamp(ctx, height)
392+
blockTime, err := c.resolveBlockTimestamp(ctx, height, strictTimestamp)
298393
if err != nil {
299394
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
300395
return datypes.ResultRetrieve{
@@ -426,14 +521,14 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte, includeTimesta
426521
var blockTime time.Time
427522
// Use header time if available (celestia-node v0.21.0+)
428523
if resp.Header != nil && !resp.Header.Time.IsZero() {
429-
blockTime = resp.Header.Time
524+
blockTime = resp.Header.Time.UTC()
525+
c.storeBlockTimestamp(resp.Height, blockTime)
430526
} else if includeTimestamp {
431527
// Fallback to fetching timestamp for older nodes
432528
blockTime, err = c.getBlockTimestamp(ctx, resp.Height)
433529
if err != nil {
434530
c.logger.Error().Uint64("height", resp.Height).Err(err).Msg("failed to get DA block timestamp for subscription event")
435-
blockTime = time.Now()
436-
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
531+
blockTime = time.Time{}
437532
}
438533
}
439534
select {

block/internal/da/client_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,133 @@ func TestClient_Retrieve_TimestampFetchRetry(t *testing.T) {
228228
}
229229
}
230230

231+
func TestClient_RetrieveBlobs_TimestampBehavior(t *testing.T) {
232+
ns := share.MustNewV0Namespace([]byte("ns"))
233+
nsBz := ns.Bytes()
234+
fixedTime := time.Date(2024, 1, 3, 12, 0, 0, 0, time.UTC)
235+
236+
specs := map[string]struct {
237+
primeCache bool
238+
getAllErr error
239+
wantStatus datypes.StatusCode
240+
wantTimestamp time.Time
241+
wantHeaderCalls int
242+
}{
243+
"uncached_skips_header_fetch": {
244+
wantStatus: datypes.StatusSuccess,
245+
wantTimestamp: time.Time{},
246+
wantHeaderCalls: 0,
247+
},
248+
"cached_reuses_timestamp": {
249+
primeCache: true,
250+
getAllErr: datypes.ErrBlobNotFound,
251+
wantStatus: datypes.StatusNotFound,
252+
wantTimestamp: fixedTime,
253+
wantHeaderCalls: 1,
254+
},
255+
}
256+
257+
for name, spec := range specs {
258+
t.Run(name, func(t *testing.T) {
259+
blobModule := mocks.NewMockBlobModule(t)
260+
headerModule := mocks.NewMockHeaderModule(t)
261+
262+
payloadBlob, err := blobrpc.NewBlobV0(ns, []byte("payload"))
263+
require.NoError(t, err)
264+
265+
headerCalls := 0
266+
headerModule.EXPECT().GetByHeight(mock.Anything, uint64(7)).RunAndReturn(func(context.Context, uint64) (*blobrpc.Header, error) {
267+
headerCalls++
268+
return &blobrpc.Header{Header: blobrpc.RawHeader{Time: fixedTime}}, nil
269+
}).Maybe()
270+
271+
cl := NewClient(Config{
272+
DA: makeBlobRPCClient(blobModule, headerModule),
273+
Logger: zerolog.Nop(),
274+
Namespace: "ns",
275+
DataNamespace: "ns",
276+
})
277+
278+
if spec.primeCache {
279+
blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob{payloadBlob}, nil).Once()
280+
res := cl.Retrieve(context.Background(), 7, nsBz)
281+
require.Equal(t, datypes.StatusSuccess, res.Code)
282+
require.Equal(t, fixedTime, res.Timestamp)
283+
}
284+
285+
if spec.getAllErr != nil {
286+
blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob(nil), spec.getAllErr).Once()
287+
} else {
288+
blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob{payloadBlob}, nil).Once()
289+
}
290+
291+
res := cl.RetrieveBlobs(context.Background(), 7, nsBz)
292+
require.Equal(t, spec.wantStatus, res.Code)
293+
require.Equal(t, spec.wantTimestamp, res.Timestamp)
294+
require.Equal(t, spec.wantHeaderCalls, headerCalls)
295+
})
296+
}
297+
}
298+
299+
func TestBlockTimestampCache_Bounded(t *testing.T) {
300+
cache := newBlockTimestampCache(2)
301+
t1 := time.Date(2024, 1, 4, 12, 0, 0, 0, time.UTC)
302+
t2 := t1.Add(time.Second)
303+
t3 := t2.Add(time.Second)
304+
305+
cache.put(10, t1)
306+
cache.put(11, t2)
307+
cache.put(12, t3)
308+
309+
_, ok := cache.get(10)
310+
require.False(t, ok)
311+
312+
got, ok := cache.get(11)
313+
require.True(t, ok)
314+
require.Equal(t, t2, got)
315+
316+
got, ok = cache.get(12)
317+
require.True(t, ok)
318+
require.Equal(t, t3, got)
319+
}
320+
321+
func TestClient_Subscribe_PrimesTimestampCache(t *testing.T) {
322+
ns := share.MustNewV0Namespace([]byte("ns"))
323+
nsBz := ns.Bytes()
324+
fixedTime := time.Date(2024, 1, 5, 12, 0, 0, 0, time.UTC)
325+
326+
blobModule := mocks.NewMockBlobModule(t)
327+
subCh := make(chan *blobrpc.SubscriptionResponse, 1)
328+
blobModule.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan *blobrpc.SubscriptionResponse)(subCh), nil).Once()
329+
blobModule.On("GetAll", mock.Anything, uint64(10), mock.Anything).Return([]*blobrpc.Blob(nil), datypes.ErrBlobNotFound).Once()
330+
331+
cl := NewClient(Config{
332+
DA: makeBlobRPCClient(blobModule, nil),
333+
Logger: zerolog.Nop(),
334+
Namespace: "ns",
335+
DataNamespace: "ns",
336+
})
337+
338+
ctx, cancel := context.WithCancel(t.Context())
339+
defer cancel()
340+
341+
events, err := cl.Subscribe(ctx, nsBz, false)
342+
require.NoError(t, err)
343+
344+
subCh <- &blobrpc.SubscriptionResponse{
345+
Height: 10,
346+
Header: &blobrpc.RawHeader{Time: fixedTime},
347+
}
348+
349+
ev := <-events
350+
require.Equal(t, uint64(10), ev.Height)
351+
require.Equal(t, fixedTime, ev.Timestamp)
352+
353+
res := cl.RetrieveBlobs(t.Context(), 10, nsBz)
354+
require.Equal(t, datypes.StatusNotFound, res.Code)
355+
require.Equal(t, fixedTime, res.Timestamp)
356+
}
357+
231358
func TestClient_SubmitOptionsMerge(t *testing.T) {
232359
ns := share.MustNewV0Namespace([]byte("ns")).Bytes()
233360
blobModule := mocks.NewMockBlobModule(t)

block/internal/da/interface.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ type Client interface {
1414
// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
1515
Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve
1616

17+
// RetrieveBlobs retrieves blobs from the DA layer at the specified height and namespace
18+
// without requiring a DA header timestamp. Callers that need deterministic DA time should
19+
// use Retrieve instead.
20+
RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve
21+
1722
// Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs.
1823
Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
1924

block/internal/da/tracing.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,26 @@ func (t *tracedClient) Retrieve(ctx context.Context, height uint64, namespace []
6767
return res
6868
}
6969

70+
func (t *tracedClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
71+
ctx, span := t.tracer.Start(ctx, "DA.RetrieveBlobs",
72+
trace.WithAttributes(
73+
attribute.Int("ns.length", len(namespace)),
74+
attribute.String("da.namespace", hex.EncodeToString(namespace)),
75+
),
76+
)
77+
defer span.End()
78+
79+
res := t.inner.RetrieveBlobs(ctx, height, namespace)
80+
81+
if res.Code != datypes.StatusSuccess && res.Code != datypes.StatusNotFound {
82+
span.RecordError(&submitError{msg: res.Message})
83+
span.SetStatus(codes.Error, res.Message)
84+
} else {
85+
span.SetAttributes(attribute.Int("blob.count", len(res.Data)))
86+
}
87+
return res
88+
}
89+
7090
func (t *tracedClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
7191
ctx, span := t.tracer.Start(ctx, "DA.Get",
7292
trace.WithAttributes(

0 commit comments

Comments
 (0)