Skip to content

Commit 2614471

Browse files
committed
Review feedback
1 parent 86359c3 commit 2614471

4 files changed

Lines changed: 126 additions & 4 deletions

File tree

block/internal/da/subscriber.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ type Subscriber struct {
8080
fetchBlockTimestamp bool
8181

8282
// lifecycle
83+
lifecycleMu sync.Mutex
8384
cancel context.CancelFunc
8485
wg sync.WaitGroup
8586
}
@@ -113,8 +114,17 @@ func (s *Subscriber) Start(ctx context.Context) error {
113114
return nil
114115
}
115116

116-
ctx, s.cancel = context.WithCancel(ctx)
117+
s.lifecycleMu.Lock()
118+
if s.cancel != nil {
119+
s.lifecycleMu.Unlock()
120+
return nil
121+
}
122+
123+
ctx, cancel := context.WithCancel(ctx)
124+
s.cancel = cancel
117125
s.wg.Add(2)
126+
s.lifecycleMu.Unlock()
127+
118128
go s.followLoop(ctx)
119129
go s.catchupLoop(ctx)
120130

@@ -123,8 +133,13 @@ func (s *Subscriber) Start(ctx context.Context) error {
123133

124134
// Stop gracefully stops the background goroutines.
125135
func (s *Subscriber) Stop() {
126-
if s.cancel != nil {
127-
s.cancel()
136+
s.lifecycleMu.Lock()
137+
cancel := s.cancel
138+
s.cancel = nil
139+
s.lifecycleMu.Unlock()
140+
141+
if cancel != nil {
142+
cancel()
128143
}
129144
s.wg.Wait()
130145
}
@@ -206,7 +221,12 @@ func (s *Subscriber) runSubscription(ctx context.Context) error {
206221
err = s.handler.HandleEvent(subCtx, ev, isInline)
207222
if isInline {
208223
if err == nil {
209-
s.headReached.Store(true)
224+
highest := s.highestSeenDAHeight.Load()
225+
// Mark head reached only if this inline event is at or beyond
226+
// the currently observed head; otherwise catchup is still pending.
227+
if ev.Height >= highest {
228+
s.headReached.Store(true)
229+
}
210230
} else {
211231
s.localDAHeight.Store(local)
212232
s.logger.Debug().Err(err).Uint64("da_height", ev.Height).

block/internal/da/subscriber_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,48 @@ func TestSubscriber_RunCatchup(t *testing.T) {
100100
assert.True(t, sub.HasReachedHead())
101101
})
102102
}
103+
104+
func TestSubscriber_RunSubscription_InlineDoesNotPrematurelyReachHead(t *testing.T) {
105+
ctx, cancel := context.WithCancel(t.Context())
106+
defer cancel()
107+
108+
mockHandler := new(MockSubscriberHandler)
109+
mockClient := testmocks.NewMockClient(t)
110+
111+
sub := NewSubscriber(SubscriberConfig{
112+
Client: mockClient,
113+
Logger: zerolog.Nop(),
114+
Handler: mockHandler,
115+
Namespaces: [][]byte{[]byte("ns")},
116+
StartHeight: 100,
117+
DABlockTime: time.Hour,
118+
})
119+
120+
subCh := make(chan datypes.SubscriptionEvent, 2)
121+
mockClient.EXPECT().
122+
Subscribe(mock.Anything, []byte("ns"), false).
123+
Return((<-chan datypes.SubscriptionEvent)(subCh), nil).
124+
Once()
125+
126+
mockHandler.On("HandleEvent", mock.Anything, datypes.SubscriptionEvent{
127+
Height: 101,
128+
Blobs: [][]byte{[]byte("h101")},
129+
}, false).Return(nil).Once()
130+
mockHandler.On("HandleEvent", mock.Anything, datypes.SubscriptionEvent{
131+
Height: 100,
132+
Blobs: [][]byte{[]byte("h100")},
133+
}, true).Return(nil).Once()
134+
135+
subCh <- datypes.SubscriptionEvent{Height: 101, Blobs: [][]byte{[]byte("h101")}}
136+
subCh <- datypes.SubscriptionEvent{Height: 100, Blobs: [][]byte{[]byte("h100")}}
137+
close(subCh)
138+
139+
err := sub.runSubscription(ctx)
140+
assert.Error(t, err)
141+
if err != nil {
142+
assert.Contains(t, err.Error(), "subscription channel closed")
143+
}
144+
assert.False(t, sub.HasReachedHead())
145+
assert.Equal(t, uint64(101), sub.LocalDAHeight())
146+
assert.Equal(t, uint64(101), sub.HighestSeenDAHeight())
147+
}

block/internal/syncing/da_follower.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ type daFollower struct {
3636
priorityHeights []uint64
3737
}
3838

39+
const maxPriorityHeights = 1024
40+
3941
// DAFollowerConfig holds configuration for creating a DAFollower.
4042
type DAFollowerConfig struct {
4143
Client da.Client
@@ -183,6 +185,16 @@ func (f *daFollower) QueuePriorityHeight(daHeight uint64) {
183185
if found {
184186
return
185187
}
188+
189+
// Keep the queue bounded. When full, prefer lower (sooner) heights.
190+
if len(f.priorityHeights) >= maxPriorityHeights {
191+
last := f.priorityHeights[len(f.priorityHeights)-1]
192+
if daHeight >= last {
193+
return
194+
}
195+
f.priorityHeights = f.priorityHeights[:len(f.priorityHeights)-1]
196+
}
197+
186198
f.priorityHeights = slices.Insert(f.priorityHeights, idx, daHeight)
187199
}
188200

block/internal/syncing/da_follower_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,48 @@ func TestDAFollower_HandleCatchup(t *testing.T) {
206206
})
207207
}
208208
}
209+
210+
func TestDAFollower_QueuePriorityHeight(t *testing.T) {
211+
specs := map[string]struct {
212+
initial []uint64
213+
queue []uint64
214+
want []uint64
215+
}{
216+
"sorts_and_deduplicates": {
217+
initial: []uint64{5, 10},
218+
queue: []uint64{7, 10, 3},
219+
want: []uint64{3, 5, 7, 10},
220+
},
221+
"bounded_drops_largest_when_smaller_arrives": {
222+
initial: makeRange(1, maxPriorityHeights),
223+
queue: []uint64{maxPriorityHeights + 1, 0},
224+
want: append([]uint64{0}, makeRange(1, maxPriorityHeights-1)...),
225+
},
226+
}
227+
228+
for name, spec := range specs {
229+
t.Run(name, func(t *testing.T) {
230+
follower := &daFollower{
231+
logger: zerolog.Nop(),
232+
priorityHeights: append([]uint64(nil), spec.initial...),
233+
}
234+
235+
for _, daHeight := range spec.queue {
236+
follower.QueuePriorityHeight(daHeight)
237+
}
238+
239+
assert.Equal(t, spec.want, follower.priorityHeights)
240+
})
241+
}
242+
}
243+
244+
func makeRange(start, end uint64) []uint64 {
245+
if end < start {
246+
return nil
247+
}
248+
out := make([]uint64, 0, end-start+1)
249+
for v := start; v <= end; v++ {
250+
out = append(out, v)
251+
}
252+
return out
253+
}

0 commit comments

Comments
 (0)