Skip to content

Commit f9e0333

Browse files
author
tac0turtle
committed
different approach
1 parent 7e7a1af commit f9e0333

8 files changed

Lines changed: 100 additions & 83 deletions

File tree

block/manager.go

Lines changed: 43 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ type Manager struct {
102102
signer signer.Signer
103103

104104
daHeight *atomic.Uint64
105-
// lastPersistedDAHeight tracks the last DA height that was persisted to disk
106-
lastPersistedDAHeight *atomic.Uint64
107105

108106
headerBroadcaster broadcaster[*types.SignedHeader]
109107
dataBroadcaster broadcaster[*types.Data]
@@ -319,9 +317,28 @@ func NewManager(
319317
return nil, err
320318
}
321319

322-
// Only use config.DA.StartHeight for fresh initialization (when DAHeight is 0)
323-
// This allows resuming from the last queried DA height on restart
324-
if s.DAHeight == 0 && config.DA.StartHeight > 0 {
320+
// Determine the DA height to start from based on the last applied block's DA inclusion
321+
if s.LastBlockHeight > 0 {
322+
// Try to find where the last applied block was included in DA
323+
headerKey := fmt.Sprintf("%s/%d/h", storepkg.HeightToDAHeightKey, s.LastBlockHeight)
324+
if daHeightBytes, err := store.GetMetadata(ctx, headerKey); err == nil && len(daHeightBytes) == 8 {
325+
lastBlockDAHeight := binary.LittleEndian.Uint64(daHeightBytes)
326+
// Start scanning from the next DA height after the last included block
327+
s.DAHeight = lastBlockDAHeight + 1
328+
logger.Info().
329+
Uint64("lastBlockHeight", s.LastBlockHeight).
330+
Uint64("lastBlockDAHeight", lastBlockDAHeight).
331+
Uint64("startingDAHeight", s.DAHeight).
332+
Msg("resuming DA scan from last applied block's DA inclusion height")
333+
} else {
334+
// Fallback: if we can't find DA inclusion info, use the persisted DA height
335+
logger.Info().
336+
Uint64("lastBlockHeight", s.LastBlockHeight).
337+
Uint64("daHeight", s.DAHeight).
338+
Msg("no DA inclusion metadata found for last block, using persisted DA height")
339+
}
340+
} else if s.DAHeight < config.DA.StartHeight {
341+
// For fresh chains, use the configured start height
325342
s.DAHeight = config.DA.StartHeight
326343
}
327344

@@ -369,18 +386,13 @@ func NewManager(
369386
daH := atomic.Uint64{}
370387
daH.Store(s.DAHeight)
371388

372-
// Initialize last persisted DA height to match current DA height
373-
lastPersistedDAH := atomic.Uint64{}
374-
lastPersistedDAH.Store(s.DAHeight)
375-
376389
m := &Manager{
377390
signer: signer,
378391
config: config,
379392
genesis: genesis,
380393
lastState: s,
381394
store: store,
382395
daHeight: &daH,
383-
lastPersistedDAHeight: &lastPersistedDAH,
384396
headerBroadcaster: headerBroadcaster,
385397
dataBroadcaster: dataBroadcaster,
386398
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
@@ -468,53 +480,6 @@ func (m *Manager) GetLastState() types.State {
468480
return m.lastState
469481
}
470482

471-
// shouldPersistDAHeight determines if we should persist the DA height based on the configured interval.
472-
// Returns true if we should persist, false otherwise.
473-
func (m *Manager) shouldPersistDAHeight(newDAHeight uint64) bool {
474-
lastPersisted := m.lastPersistedDAHeight.Load()
475-
interval := m.config.DA.PersistInterval
476-
477-
// Always persist if interval is 0 (backward compatibility) or 1 (persist every block)
478-
if interval <= 1 {
479-
return true
480-
}
481-
482-
// Persist if we've reached the interval threshold
483-
return newDAHeight-lastPersisted >= interval
484-
}
485-
486-
// persistDAHeight updates the DAHeight in the persistent state.
487-
// This ensures that the last queried DA height is preserved across restarts.
488-
func (m *Manager) persistDAHeight(ctx context.Context, newDAHeight uint64) error {
489-
m.lastStateMtx.Lock()
490-
defer m.lastStateMtx.Unlock()
491-
492-
// Create an updated state with the new DA height
493-
updatedState := m.lastState
494-
updatedState.DAHeight = newDAHeight
495-
496-
// Persist the updated state
497-
err := m.store.UpdateState(ctx, updatedState)
498-
if err != nil {
499-
return fmt.Errorf("failed to update state with new DA height %d: %w", newDAHeight, err)
500-
}
501-
502-
// Update the in-memory state and last persisted height
503-
m.lastState = updatedState
504-
m.lastPersistedDAHeight.Store(newDAHeight)
505-
return nil
506-
}
507-
508-
// maybePersistDAHeight persists the DA height only if the configured interval has been reached.
509-
// This reduces disk writes while maintaining reasonable restart recovery.
510-
func (m *Manager) maybePersistDAHeight(ctx context.Context, newDAHeight uint64) {
511-
if m.shouldPersistDAHeight(newDAHeight) {
512-
if err := m.persistDAHeight(ctx, newDAHeight); err != nil {
513-
m.logger.Error().Err(err).Uint64("newDAHeight", newDAHeight).Msg("failed to persist DA height")
514-
}
515-
}
516-
}
517-
518483
// GetDAIncludedHeight returns the height at which all blocks have been
519484
// included in the DA
520485
func (m *Manager) GetDAIncludedHeight() uint64 {
@@ -575,6 +540,27 @@ func (m *Manager) IsDAIncluded(ctx context.Context, height uint64) (bool, error)
575540
return isIncluded, nil
576541
}
577542

543+
// storeDAInclusionMetadata stores the DA height where a block was included.
544+
// This is used by both aggregators (via SetSequencerHeightToDAHeight) and
545+
// non-aggregator nodes during sync to track where blocks came from in DA.
546+
func (m *Manager) storeDAInclusionMetadata(ctx context.Context, blockHeight uint64, daHeight uint64) error {
547+
// Store header DA height
548+
headerHeightBytes := make([]byte, 8)
549+
binary.LittleEndian.PutUint64(headerHeightBytes, daHeight)
550+
headerKey := fmt.Sprintf("%s/%d/h", storepkg.HeightToDAHeightKey, blockHeight)
551+
if err := m.store.SetMetadata(ctx, headerKey, headerHeightBytes); err != nil {
552+
return fmt.Errorf("failed to store header DA height: %w", err)
553+
}
554+
555+
// Store data DA height (same as header for synced blocks)
556+
dataKey := fmt.Sprintf("%s/%d/d", storepkg.HeightToDAHeightKey, blockHeight)
557+
if err := m.store.SetMetadata(ctx, dataKey, headerHeightBytes); err != nil {
558+
return fmt.Errorf("failed to store data DA height: %w", err)
559+
}
560+
561+
return nil
562+
}
563+
578564
// SetSequencerHeightToDAHeight stores the mapping from a Evolve block height to the corresponding
579565
// DA (Data Availability) layer heights where the block's header and data were included.
580566
// This mapping is persisted in the store metadata and is used to track which DA heights

block/retriever.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,6 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
2929
for {
3030
select {
3131
case <-ctx.Done():
32-
// On shutdown, persist the current DA height to minimize data loss on restart
33-
currentDAHeight := m.daHeight.Load()
34-
if currentDAHeight > m.lastPersistedDAHeight.Load() {
35-
if err := m.persistDAHeight(context.Background(), currentDAHeight); err != nil {
36-
m.logger.Error().Err(err).Uint64("currentDAHeight", currentDAHeight).Msg("failed to persist DA height on shutdown")
37-
}
38-
}
3932
return
4033
case <-m.retrieveCh:
4134
case <-blobsFoundCh:
@@ -54,11 +47,7 @@ func (m *Manager) RetrieveLoop(ctx context.Context) {
5447
case blobsFoundCh <- struct{}{}:
5548
default:
5649
}
57-
newDAHeight := daHeight + 1
58-
m.daHeight.Store(newDAHeight)
59-
60-
// Persist the updated DA height based on configured interval
61-
m.maybePersistDAHeight(ctx, newDAHeight)
50+
m.daHeight.Store(daHeight + 1)
6251
}
6352
}
6453

block/retriever_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ func setupManagerForRetrieverTest(t *testing.T, initialDAHeight uint64) (*Manage
6161
config: config.Config{DA: config.DAConfig{BlockTime: config.DurationWrapper{Duration: 1 * time.Second}}},
6262
genesis: genesis.Genesis{ProposerAddress: addr},
6363
daHeight: &atomic.Uint64{},
64-
lastPersistedDAHeight: &atomic.Uint64{},
6564
daIncludedHeight: atomic.Uint64{},
6665
headerInCh: make(chan NewHeaderEvent, eventInChLength),
6766
headerStore: headerStore,
@@ -82,7 +81,6 @@ func setupManagerForRetrieverTest(t *testing.T, initialDAHeight uint64) (*Manage
8281
}
8382
manager.daIncludedHeight.Store(0)
8483
manager.daHeight.Store(initialDAHeight)
85-
manager.lastPersistedDAHeight.Store(initialDAHeight)
8684

8785
t.Cleanup(cancel)
8886

block/sync.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,16 @@ func (m *Manager) trySyncNextBlock(ctx context.Context, daHeight uint64) error {
172172
return err
173173
}
174174

175+
// Store the DA inclusion metadata for this block
176+
// This is crucial for non-aggregator nodes to resume from the correct DA height on restart
177+
if err = m.storeDAInclusionMetadata(ctx, hHeight, daHeight); err != nil {
178+
m.logger.Error().Err(err).
179+
Uint64("blockHeight", hHeight).
180+
Uint64("daHeight", daHeight).
181+
Msg("failed to store DA inclusion metadata during sync")
182+
// Don't fail the sync, just log the error
183+
}
184+
175185
// Record sync metrics
176186
m.recordSyncMetrics("block_applied")
177187

block/sync_test.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,13 @@ func setupManagerForSyncLoopTest(t *testing.T, initialState types.State) (
6868
dataStoreCh: dataStoreCh,
6969
retrieveCh: retrieveCh,
7070
daHeight: &atomic.Uint64{},
71-
lastPersistedDAHeight: &atomic.Uint64{},
7271
metrics: NopMetrics(),
7372
headerStore: &goheaderstore.Store[*types.SignedHeader]{},
7473
dataStore: &goheaderstore.Store[*types.Data]{},
7574
signaturePayloadProvider: types.DefaultSignaturePayloadProvider,
7675
validatorHasherProvider: types.DefaultValidatorHasherProvider,
7776
}
7877
m.daHeight.Store(initialState.DAHeight)
79-
m.lastPersistedDAHeight.Store(initialState.DAHeight)
8078

8179
ctx, cancel := context.WithCancel(context.Background())
8280

@@ -135,6 +133,13 @@ func TestSyncLoop_ProcessSingleBlock_HeaderFirst(t *testing.T) {
135133
mockStore.On("UpdateState", mock.Anything, expectedNewState).Return(nil).Run(func(args mock.Arguments) { close(syncChan) }).Once()
136134

137135
mockStore.On("SetHeight", mock.Anything, newHeight).Return(nil).Once()
136+
137+
// Add expectations for DA inclusion metadata storage
138+
// These calls happen in storeDAInclusionMetadata when syncing blocks
139+
headerKey := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, newHeight)
140+
dataKey := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, newHeight)
141+
mockStore.On("SetMetadata", mock.Anything, headerKey, mock.Anything).Return(nil).Once()
142+
mockStore.On("SetMetadata", mock.Anything, dataKey, mock.Anything).Return(nil).Once()
138143

139144
ctx, loopCancel := context.WithTimeout(context.Background(), 1*time.Second)
140145
defer loopCancel()
@@ -220,6 +225,13 @@ func TestSyncLoop_ProcessSingleBlock_DataFirst(t *testing.T) {
220225
mockStore.On("SaveBlockData", mock.Anything, header, data, &header.Signature).Return(nil).Once()
221226
mockStore.On("UpdateState", mock.Anything, expectedNewState).Return(nil).Run(func(args mock.Arguments) { close(syncChan) }).Once()
222227
mockStore.On("SetHeight", mock.Anything, newHeight).Return(nil).Once()
228+
229+
// Add expectations for DA inclusion metadata storage
230+
// These calls happen in storeDAInclusionMetadata when syncing blocks
231+
headerKey := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, newHeight)
232+
dataKey := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, newHeight)
233+
mockStore.On("SetMetadata", mock.Anything, headerKey, mock.Anything).Return(nil).Once()
234+
mockStore.On("SetMetadata", mock.Anything, dataKey, mock.Anything).Return(nil).Once()
223235

224236
ctx, loopCancel := context.WithTimeout(context.Background(), 1*time.Second)
225237
defer loopCancel()
@@ -331,6 +343,12 @@ func TestSyncLoop_ProcessMultipleBlocks_Sequentially(t *testing.T) {
331343
t.Logf("Mock SetHeight called for H+1, updated mock height to %d", newHeight)
332344
}).
333345
Once()
346+
347+
// Add expectations for DA inclusion metadata storage for H+1
348+
headerKeyH1 := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, heightH1)
349+
dataKeyH1 := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, heightH1)
350+
mockStore.On("SetMetadata", mock.Anything, headerKeyH1, mock.Anything).Return(nil).Once()
351+
mockStore.On("SetMetadata", mock.Anything, dataKeyH1, mock.Anything).Return(nil).Once()
334352

335353
// --- Mock Expectations for H+2 ---
336354
mockExec.On("ExecuteTxs", mock.Anything, txsH2, heightH2, headerH2.Time(), expectedNewAppHashH1).
@@ -344,6 +362,12 @@ func TestSyncLoop_ProcessMultipleBlocks_Sequentially(t *testing.T) {
344362
t.Logf("Mock SetHeight called for H+2, updated mock height to %d", newHeight)
345363
}).
346364
Once()
365+
366+
// Add expectations for DA inclusion metadata storage for H+2
367+
headerKeyH2 := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, heightH2)
368+
dataKeyH2 := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, heightH2)
369+
mockStore.On("SetMetadata", mock.Anything, headerKeyH2, mock.Anything).Return(nil).Once()
370+
mockStore.On("SetMetadata", mock.Anything, dataKeyH2, mock.Anything).Return(nil).Once()
347371

348372
ctx, loopCancel := context.WithTimeout(context.Background(), 1*time.Second)
349373
defer loopCancel()
@@ -469,9 +493,15 @@ func TestSyncLoop_ProcessBlocks_OutOfOrderArrival(t *testing.T) {
469493
Run(func(args mock.Arguments) {
470494
newHeight := args.Get(1).(uint64)
471495
*heightPtr = newHeight // Update the mocked height
472-
t.Logf("Mock SetHeight called for H+2, updated mock height to %d", newHeight)
496+
t.Logf("Mock SetHeight called for H+1, updated mock height to %d", newHeight)
473497
}).
474498
Once()
499+
500+
// Add expectations for DA inclusion metadata storage for H+1
501+
headerKeyH1 := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, heightH1)
502+
dataKeyH1 := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, heightH1)
503+
mockStore.On("SetMetadata", mock.Anything, headerKeyH1, mock.Anything).Return(nil).Once()
504+
mockStore.On("SetMetadata", mock.Anything, dataKeyH1, mock.Anything).Return(nil).Once()
475505

476506
// --- Mock Expectations for H+2 (will be called second) ---
477507
mockStore.On("Height", mock.Anything).Return(heightH1, nil).Maybe()
@@ -489,6 +519,12 @@ func TestSyncLoop_ProcessBlocks_OutOfOrderArrival(t *testing.T) {
489519
mockStore.On("UpdateState", mock.Anything, expectedStateH2).Return(nil).
490520
Run(func(args mock.Arguments) { close(syncChanH2) }).
491521
Once()
522+
523+
// Add expectations for DA inclusion metadata storage for H+2
524+
headerKeyH2 := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, heightH2)
525+
dataKeyH2 := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, heightH2)
526+
mockStore.On("SetMetadata", mock.Anything, headerKeyH2, mock.Anything).Return(nil).Once()
527+
mockStore.On("SetMetadata", mock.Anything, dataKeyH2, mock.Anything).Return(nil).Once()
492528

493529
ctx, loopCancel := context.WithTimeout(context.Background(), 1*time.Second)
494530
defer loopCancel()
@@ -598,6 +634,13 @@ func TestSyncLoop_IgnoreDuplicateEvents(t *testing.T) {
598634
mockStore.On("UpdateState", mock.Anything, expectedStateH1).Return(nil).
599635
Run(func(args mock.Arguments) { close(syncChanH1) }).
600636
Once()
637+
638+
// Add expectations for DA inclusion metadata storage
639+
// These calls happen in storeDAInclusionMetadata when syncing blocks
640+
headerKey := fmt.Sprintf("%s/%d/h", store.HeightToDAHeightKey, heightH1)
641+
dataKey := fmt.Sprintf("%s/%d/d", store.HeightToDAHeightKey, heightH1)
642+
mockStore.On("SetMetadata", mock.Anything, headerKey, mock.Anything).Return(nil).Once()
643+
mockStore.On("SetMetadata", mock.Anything, dataKey, mock.Anything).Return(nil).Once()
601644

602645
ctx, loopCancel := context.WithTimeout(context.Background(), 1*time.Second)
603646
defer loopCancel()
@@ -875,13 +918,11 @@ func TestSyncLoop_MultipleHeadersArriveFirst_ThenData(t *testing.T) {
875918
headerInCh: headerInCh,
876919
dataInCh: dataInCh,
877920
daHeight: &atomic.Uint64{},
878-
lastPersistedDAHeight: &atomic.Uint64{},
879921
metrics: NopMetrics(),
880922
signaturePayloadProvider: types.DefaultSignaturePayloadProvider,
881923
validatorHasherProvider: types.DefaultValidatorHasherProvider,
882924
}
883925
m.daHeight.Store(initialState.DAHeight)
884-
m.lastPersistedDAHeight.Store(initialState.DAHeight)
885926

886927
ctx, loopCancel := context.WithTimeout(context.Background(), 2*time.Second)
887928
defer loopCancel()

pkg/config/config.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ const (
6767
FlagDAMempoolTTL = "rollkit.da.mempool_ttl"
6868
// FlagDAMaxSubmitAttempts is a flag for specifying the maximum DA submit attempts
6969
FlagDAMaxSubmitAttempts = "rollkit.da.max_submit_attempts"
70-
// FlagDAPersistInterval is a flag for specifying how often to persist DA height to disk
71-
FlagDAPersistInterval = "rollkit.da.persist_interval"
7270

7371
// P2P configuration flags
7472

@@ -161,7 +159,6 @@ type DAConfig struct {
161159
StartHeight uint64 `mapstructure:"start_height" yaml:"start_height" comment:"Starting block height on the DA layer from which to begin syncing. Useful when deploying a new chain on an existing DA chain."`
162160
MempoolTTL uint64 `mapstructure:"mempool_ttl" yaml:"mempool_ttl" comment:"Number of DA blocks after which a transaction is considered expired and dropped from the mempool. Controls retry backoff timing."`
163161
MaxSubmitAttempts int `mapstructure:"max_submit_attempts" yaml:"max_submit_attempts" comment:"Maximum number of attempts to submit data to the DA layer before giving up. Higher values provide more resilience but can delay error reporting."`
164-
PersistInterval uint64 `mapstructure:"persist_interval" yaml:"persist_interval" comment:"Number of DA blocks after which to persist DA height to disk. Higher values reduce disk writes but increase potential rollback on restart. Recommended: 50-100."`
165162
}
166163

167164
// GetHeaderNamespace returns the namespace for header submissions, falling back to the legacy namespace if not set
@@ -287,7 +284,6 @@ func AddFlags(cmd *cobra.Command) {
287284
cmd.Flags().String(FlagDASubmitOptions, def.DA.SubmitOptions, "DA submit options")
288285
cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool")
289286
cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up")
290-
cmd.Flags().Uint64(FlagDAPersistInterval, def.DA.PersistInterval, "number of DA blocks after which to persist DA height to disk")
291287

292288
// P2P configuration flags
293289
cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)")

pkg/config/config_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ func TestDefaultConfig(t *testing.T) {
3030
assert.Equal(t, 6*time.Second, def.DA.BlockTime.Duration)
3131
assert.Equal(t, uint64(0), def.DA.StartHeight)
3232
assert.Equal(t, uint64(0), def.DA.MempoolTTL)
33-
assert.Equal(t, uint64(100), def.DA.PersistInterval)
3433
assert.Equal(t, uint64(0), def.Node.MaxPendingHeadersAndData)
3534
assert.Equal(t, false, def.Node.LazyMode)
3635
assert.Equal(t, 60*time.Second, def.Node.LazyBlockInterval.Duration)
@@ -73,7 +72,6 @@ func TestAddFlags(t *testing.T) {
7372
assertFlagValue(t, flags, FlagDASubmitOptions, DefaultConfig.DA.SubmitOptions)
7473
assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig.DA.MempoolTTL)
7574
assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig.DA.MaxSubmitAttempts)
76-
assertFlagValue(t, flags, FlagDAPersistInterval, DefaultConfig.DA.PersistInterval)
7775

7876
// P2P flags
7977
assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig.P2P.ListenAddress)

pkg/config/defaults.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ var DefaultConfig = Config{
6060
Namespace: "",
6161
HeaderNamespace: "rollkit-headers",
6262
DataNamespace: "rollkit-data",
63-
PersistInterval: 100, // Persist DA height every 100 blocks
6463
},
6564
Instrumentation: DefaultInstrumentationConfig(),
6665
Log: LogConfig{

0 commit comments

Comments
 (0)