@@ -65,6 +65,11 @@ type Subscriber struct {
6565 // headReached tracks whether the subscriber has caught up to DA head.
6666 headReached atomic.Bool
6767
68+ // seenSubscriptionEvent tracks whether followLoop has observed at least one
69+ // subscription event in this run. Without this, startup catchup can stop
70+ // too early when highestSeen is still just the initial start height.
71+ seenSubscriptionEvent atomic.Bool
72+
6873 // catchupSignal wakes catchupLoop when a new height is seen above localDAHeight.
6974 catchupSignal chan struct {}
7075
@@ -192,6 +197,7 @@ func (s *Subscriber) runSubscription(ctx context.Context) error {
192197 if ! ok {
193198 return errors .New ("subscription channel closed" )
194199 }
200+ s .seenSubscriptionEvent .Store (true )
195201 s .updateHighest (ev .Height )
196202
197203 local := s .localDAHeight .Load ()
@@ -317,6 +323,13 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
317323 }
318324
319325 local := s .localDAHeight .Load ()
326+ highest := s .highestSeenDAHeight .Load ()
327+
328+ if s .seenSubscriptionEvent .Load () && local > highest {
329+ s .headReached .Store (true )
330+ return
331+ }
332+
320333 // CAS claims this height — prevents followLoop from inline-processing.
321334 if ! s .localDAHeight .CompareAndSwap (local , local + 1 ) {
322335 // followLoop already advanced past this height via inline processing.
@@ -326,7 +339,7 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
326339 if err := s .handler .HandleCatchup (ctx , local ); err != nil {
327340 // Roll back so we can retry after backoff.
328341 s .localDAHeight .Store (local )
329- if errors .Is (err , datypes .ErrHeightFromFuture ) {
342+ if errors .Is (err , datypes .ErrHeightFromFuture ) && local >= highest {
330343 s .headReached .Store (true )
331344 return
332345 }
0 commit comments