Skip to content

Commit 20bb79d

Browse files
committed
Review feedback
1 parent aca762f commit 20bb79d

17 files changed

Lines changed: 265 additions & 215 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1515

1616
- Persist cache snapshot only once at shutdown to avoid Badger vlog increase. [#3153](https://github.com/evstack/ev-node/pull/3153)
1717

18+
### Changes
19+
20+
- Subscribe to forced inclusion namespace events [#3146](https://github.com/evstack/ev-node/pull/3146)
21+
1822
## v1.0.0-rc.5
1923

2024
### Added

apps/evm/cmd/run.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/evstack/ev-node/pkg/config"
2424
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
2525
da "github.com/evstack/ev-node/pkg/da/types"
26-
"github.com/evstack/ev-node/pkg/genesis"
2726
genesispkg "github.com/evstack/ev-node/pkg/genesis"
2827
"github.com/evstack/ev-node/pkg/p2p/key"
2928
"github.com/evstack/ev-node/pkg/sequencers/based"
@@ -84,7 +83,7 @@ var RunCmd = &cobra.Command{
8483
}
8584

8685
// Create sequencer based on configuration
87-
sequencer, err := createSequencer(logger, datastore, nodeConfig, genesis, daClient, executor)
86+
sequencer, err := createSequencer(cmd.Context(), logger, datastore, nodeConfig, genesis, daClient, executor)
8887
if err != nil {
8988
return err
9089
}
@@ -145,10 +144,11 @@ func init() {
145144
// If BasedSequencer is enabled, it creates a based sequencer that fetches transactions from DA.
146145
// Otherwise, it creates a single (traditional) sequencer.
147146
func createSequencer(
147+
ctx context.Context,
148148
logger zerolog.Logger,
149149
datastore datastore.Batching,
150150
nodeConfig config.Config,
151-
genesis genesis.Genesis,
151+
genesis genesispkg.Genesis,
152152
daClient block.FullDAClient,
153153
executor execution.Executor,
154154
) (coresequencer.Sequencer, error) {
@@ -158,7 +158,7 @@ func createSequencer(
158158
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
159159
}
160160

161-
basedSeq, err := based.NewBasedSequencer(daClient, nodeConfig, datastore, genesis, logger, executor)
161+
basedSeq, err := based.NewBasedSequencer(ctx, daClient, nodeConfig, datastore, genesis, logger, executor)
162162
if err != nil {
163163
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
164164
}

apps/grpc/cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ func createSequencer(
121121
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
122122
}
123123

124-
basedSeq, err := based.NewBasedSequencer(daClient, nodeConfig, datastore, genesis, logger, executor)
124+
basedSeq, err := based.NewBasedSequencer(ctx, daClient, nodeConfig, datastore, genesis, logger, executor)
125125
if err != nil {
126126
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
127127
}

apps/testapp/cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func createSequencer(
124124
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
125125
}
126126

127-
basedSeq, err := based.NewBasedSequencer(daClient, nodeConfig, datastore, genesis, logger, executor)
127+
basedSeq, err := based.NewBasedSequencer(ctx, daClient, nodeConfig, datastore, genesis, logger, executor)
128128
if err != nil {
129129
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
130130
}

block/internal/da/async_block_retriever.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ func (f *asyncBlockRetriever) GetCachedBlock(ctx context.Context, daHeight uint6
174174
// HandleEvent caches blobs from the subscription inline, even empty ones,
175175
// to record that the DA height was seen and has 0 blobs.
176176
func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent, isInline bool) error {
177+
if err := ctx.Err(); err != nil {
178+
return err
179+
}
177180
f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
178181
if isInline {
179182
return errors.New("async block retriever relies on catchup state machine")
@@ -184,6 +187,10 @@ func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.Subscr
184187
// HandleCatchup fetches a single height via Retrieve and caches it.
185188
// Also applies the prefetch window for speculative forward fetching.
186189
func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64) error {
190+
if err := ctx.Err(); err != nil {
191+
return err
192+
}
193+
187194
if _, err := f.cache.Get(ctx, newBlockDataKey(daHeight)); err != nil {
188195
if err := f.fetchAndCacheBlock(ctx, daHeight); err != nil {
189196
return err
@@ -208,8 +215,10 @@ func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, daHeight uint64
208215

209216
// fetchAndCacheBlock fetches a block via Retrieve and caches it.
210217
func (f *asyncBlockRetriever) fetchAndCacheBlock(ctx context.Context, height uint64) error {
218+
if err := ctx.Err(); err != nil {
219+
return err
220+
}
211221
f.logger.Debug().Uint64("height", height).Msg("prefetching block")
212-
213222
result := f.client.Retrieve(ctx, height, f.namespace)
214223

215224
switch result.Code {

block/internal/da/forced_inclusion_retriever.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ type ForcedInclusionEvent struct {
4242
// NewForcedInclusionRetriever creates a new forced inclusion retriever.
4343
// It internally creates and manages an AsyncBlockRetriever for background prefetching.
4444
func NewForcedInclusionRetriever(
45-
ctx context.Context,
4645
client Client,
4746
logger zerolog.Logger,
4847
daBlockTime time.Duration,

block/internal/da/forced_inclusion_retriever_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func TestNewForcedInclusionRetriever(t *testing.T) {
2424
DAEpochForcedInclusion: 10,
2525
}
2626

27-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
27+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
2828
assert.Assert(t, retriever != nil)
2929
retriever.Stop()
3030
}
@@ -39,7 +39,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoNamespace(t *testi
3939
DAEpochForcedInclusion: 10,
4040
}
4141

42-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
42+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
4343
defer retriever.Stop()
4444
ctx := context.Background()
4545

@@ -60,7 +60,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NotAtEpochStart(t *t
6060
DAEpochForcedInclusion: 10,
6161
}
6262

63-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
63+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
6464
defer retriever.Stop()
6565
ctx := context.Background()
6666

@@ -95,7 +95,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartSuccess(t
9595
DAEpochForcedInclusion: 1, // Single height epoch
9696
}
9797

98-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
98+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
9999
defer retriever.Stop()
100100
ctx := context.Background()
101101

@@ -126,7 +126,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailab
126126
DAEpochForcedInclusion: 10,
127127
}
128128

129-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
129+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
130130
defer retriever.Stop()
131131
ctx := context.Background()
132132

@@ -151,7 +151,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoBlobsAtHeight(t *t
151151
DAEpochForcedInclusion: 1, // Single height epoch
152152
}
153153

154-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
154+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
155155
defer retriever.Stop()
156156
ctx := context.Background()
157157

@@ -191,7 +191,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
191191
DAEpochForcedInclusion: 3, // Epoch: 100-102
192192
}
193193

194-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
194+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
195195
defer retriever.Stop()
196196
ctx := context.Background()
197197

@@ -226,7 +226,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_ErrorHandling(t *tes
226226
DAEpochForcedInclusion: 1, // Single height epoch
227227
}
228228

229-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
229+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
230230
defer retriever.Stop()
231231
ctx := context.Background()
232232

@@ -253,7 +253,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EmptyBlobsSkipped(t
253253
DAEpochForcedInclusion: 1, // Single height epoch
254254
}
255255

256-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
256+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
257257
defer retriever.Stop()
258258
ctx := context.Background()
259259

@@ -299,7 +299,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_OrderPreserved(t *te
299299
DAEpochForcedInclusion: 3, // Epoch: 100-102
300300
}
301301

302-
retriever := NewForcedInclusionRetriever(context.Background(), client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
302+
retriever := NewForcedInclusionRetriever(client, zerolog.Nop(), 100*time.Millisecond, false, gen.DAStartHeight, gen.DAEpochForcedInclusion)
303303
defer retriever.Stop()
304304
ctx := context.Background()
305305

block/internal/da/subscriber.go

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ type SubscriberHandler interface {
2424

2525
// HandleCatchup is called for each height during sequential catchup.
2626
// The subscriber advances localDAHeight only after this returns (true, nil).
27-
// Returning (false, nil) rolls back localDAHeight without triggering a backoff.
2827
// Returning an error rolls back localDAHeight and triggers a backoff retry.
2928
HandleCatchup(ctx context.Context, height uint64) error
3029
}
@@ -93,7 +92,9 @@ func NewSubscriber(cfg SubscriberConfig) *Subscriber {
9392
}
9493
s.localDAHeight.Store(cfg.StartHeight)
9594
s.highestSeenDAHeight.Store(cfg.StartHeight)
96-
s.catchupSignal <- struct{}{}
95+
if cfg.StartHeight != 0 {
96+
s.catchupSignal <- struct{}{}
97+
}
9798

9899
if len(s.namespaces) == 0 {
99100
s.logger.Warn().Msg("no namespaces configured, subscriber will stay idle")
@@ -138,10 +139,6 @@ func (s *Subscriber) HasReachedHead() bool {
138139
return s.headReached.Load()
139140
}
140141

141-
// ---------------------------------------------------------------------------
142-
// Follow loop
143-
// ---------------------------------------------------------------------------
144-
145142
// signalCatchup sends a non-blocking signal to wake catchupLoop.
146143
func (s *Subscriber) signalCatchup() {
147144
select {

block/internal/syncing/da_follower.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,6 @@ func (f *daFollower) HasReachedHead() bool {
8989
return f.subscriber.HasReachedHead()
9090
}
9191

92-
// ---------------------------------------------------------------------------
93-
// SubscriberHandler implementation
94-
// ---------------------------------------------------------------------------
95-
9692
// HandleEvent processes a subscription event. When the follower is
9793
// caught up (ev.Height == localDAHeight) and blobs are available, it processes
9894
// them inline — avoiding a DA re-fetch round trip. Otherwise, it just lets
@@ -178,10 +174,6 @@ func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) er
178174
return nil
179175
}
180176

181-
// ---------------------------------------------------------------------------
182-
// Priority queue (absorbed from DARetriever — refactoring #2)
183-
// ---------------------------------------------------------------------------
184-
185177
// QueuePriorityHeight queues a DA height for priority retrieval.
186178
func (f *daFollower) QueuePriorityHeight(daHeight uint64) {
187179
f.priorityMu.Lock()

block/internal/syncing/da_retriever.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -292,27 +292,22 @@ func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH
292292
r.logger.Warn().Msg("strict mode: rejecting block that is not a fully valid envelope")
293293
return nil
294294
}
295-
// Mode Switch Logic
296-
if isValidEnvelope && !r.strictMode {
297-
r.logger.Info().Uint64("height", header.Height()).Msg("valid DA envelope detected, switching to STRICT MODE")
298-
r.strictMode = true
299-
}
300-
301-
// Legacy blob support implies: strictMode == false AND (!isValidEnvelope).
302-
// We fall through here.
303295

304-
// Basic validation
305296
if err := header.Header.ValidateBasic(); err != nil {
306297
r.logger.Debug().Err(err).Msg("invalid header structure")
307298
return nil
308299
}
309300

310-
// Check proposer
311301
if err := r.assertExpectedProposer(header.ProposerAddress); err != nil {
312302
r.logger.Debug().Err(err).Msg("unexpected proposer")
313303
return nil
314304
}
315305

306+
if isValidEnvelope && !r.strictMode {
307+
r.logger.Info().Uint64("height", header.Height()).Msg("valid DA envelope detected, switching to STRICT MODE")
308+
r.strictMode = true
309+
}
310+
316311
// Optimistically mark as DA included
317312
// This has to be done for all fetched DA headers prior to validation because P2P does not confirm
318313
// da inclusion. This is not an issue, as an invalid header will be rejected. There cannot be hash collisions.

0 commit comments

Comments
 (0)