Skip to content

Commit 864385d

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/fix-node-restart
2 parents b12cdc5 + 265ff50 commit 864385d

26 files changed

Lines changed: 1035 additions & 146 deletions

block/da_speed_test.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -118,22 +118,23 @@ func setupManagerForTest(t *testing.T, initialDAHeight uint64) (*Manager, *rollm
118118
Node: config.NodeConfig{BlockTime: config.DurationWrapper{Duration: blockTime}},
119119
DA: config.DAConfig{BlockTime: config.DurationWrapper{Duration: blockTime}},
120120
},
121-
genesis: genesis.Genesis{ProposerAddress: addr},
122-
daHeight: new(atomic.Uint64),
123-
headerInCh: make(chan NewHeaderEvent),
124-
headerStore: headerStore,
125-
dataInCh: make(chan NewDataEvent),
126-
dataStore: dataStore,
127-
headerCache: cache.NewCache[types.SignedHeader](),
128-
dataCache: cache.NewCache[types.Data](),
129-
headerStoreCh: make(chan struct{}),
130-
dataStoreCh: make(chan struct{}),
131-
retrieveCh: make(chan struct{}),
132-
logger: logger,
133-
lastStateMtx: new(sync.RWMutex),
134-
da: mockDAClient,
135-
signer: noopSigner,
136-
metrics: NopMetrics(),
121+
genesis: genesis.Genesis{ProposerAddress: addr},
122+
daHeight: new(atomic.Uint64),
123+
headerInCh: make(chan NewHeaderEvent),
124+
headerStore: headerStore,
125+
dataInCh: make(chan NewDataEvent),
126+
dataStore: dataStore,
127+
headerCache: cache.NewCache[types.SignedHeader](),
128+
dataCache: cache.NewCache[types.Data](),
129+
headerStoreCh: make(chan struct{}),
130+
dataStoreCh: make(chan struct{}),
131+
retrieveCh: make(chan struct{}),
132+
logger: logger,
133+
lastStateMtx: new(sync.RWMutex),
134+
da: mockDAClient,
135+
namespaceMigrationCompleted: &atomic.Bool{},
136+
signer: noopSigner,
137+
metrics: NopMetrics(),
137138
}
138139
manager.daIncludedHeight.Store(0)
139140
manager.daHeight.Store(initialDAHeight)

block/manager.go

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ const (
4343
// defaultMempoolTTL is the number of blocks until transaction is dropped from mempool
4444
defaultMempoolTTL = 25
4545

46+
// Key for storing namespace migration state in the store
47+
namespaceMigrationKey = "namespace_migration_completed"
48+
4649
// Applies to the headerInCh and dataInCh, 10000 is a large enough number for headers per DA block.
4750
eventInChLength = 10000
4851
)
@@ -161,6 +164,10 @@ type Manager struct {
161164
// validatorHasherProvider is used to provide the validator hash for the header.
162165
// It is used to set the validator hash in the header.
163166
validatorHasherProvider types.ValidatorHasherProvider
167+
168+
// namespaceMigrationCompleted tracks whether we have completed the migration
169+
// from legacy namespace to separate header/data namespaces
170+
namespaceMigrationCompleted *atomic.Bool
164171
}
165172

166173
// getInitialState tries to load lastState from Store, and if it's not available it reads genesis.
@@ -368,38 +375,44 @@ func NewManager(
368375
headerBroadcaster: headerBroadcaster,
369376
dataBroadcaster: dataBroadcaster,
370377
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
371-
headerInCh: make(chan NewHeaderEvent, eventInChLength),
372-
dataInCh: make(chan NewDataEvent, eventInChLength),
373-
headerStoreCh: make(chan struct{}, 1),
374-
dataStoreCh: make(chan struct{}, 1),
375-
headerStore: headerStore,
376-
dataStore: dataStore,
377-
lastStateMtx: new(sync.RWMutex),
378-
lastBatchData: lastBatchData,
379-
headerCache: cache.NewCache[types.SignedHeader](),
380-
dataCache: cache.NewCache[types.Data](),
381-
retrieveCh: make(chan struct{}, 1),
382-
daIncluderCh: make(chan struct{}, 1),
383-
logger: logger,
384-
txsAvailable: false,
385-
pendingHeaders: pendingHeaders,
386-
pendingData: pendingData,
387-
metrics: seqMetrics,
388-
sequencer: sequencer,
389-
exec: exec,
390-
da: da,
391-
gasPrice: gasPrice,
392-
gasMultiplier: gasMultiplier,
393-
txNotifyCh: make(chan struct{}, 1), // Non-blocking channel
394-
signaturePayloadProvider: managerOpts.SignaturePayloadProvider,
395-
validatorHasherProvider: managerOpts.ValidatorHasherProvider,
378+
headerInCh: make(chan NewHeaderEvent, eventInChLength),
379+
dataInCh: make(chan NewDataEvent, eventInChLength),
380+
headerStoreCh: make(chan struct{}, 1),
381+
dataStoreCh: make(chan struct{}, 1),
382+
headerStore: headerStore,
383+
dataStore: dataStore,
384+
lastStateMtx: new(sync.RWMutex),
385+
lastBatchData: lastBatchData,
386+
headerCache: cache.NewCache[types.SignedHeader](),
387+
dataCache: cache.NewCache[types.Data](),
388+
retrieveCh: make(chan struct{}, 1),
389+
daIncluderCh: make(chan struct{}, 1),
390+
logger: logger,
391+
txsAvailable: false,
392+
pendingHeaders: pendingHeaders,
393+
pendingData: pendingData,
394+
metrics: seqMetrics,
395+
sequencer: sequencer,
396+
exec: exec,
397+
da: da,
398+
gasPrice: gasPrice,
399+
gasMultiplier: gasMultiplier,
400+
txNotifyCh: make(chan struct{}, 1), // Non-blocking channel
401+
signaturePayloadProvider: managerOpts.SignaturePayloadProvider,
402+
validatorHasherProvider: managerOpts.ValidatorHasherProvider,
403+
namespaceMigrationCompleted: &atomic.Bool{},
396404
}
397405

398406
// initialize da included height
399407
if height, err := m.store.GetMetadata(ctx, storepkg.DAIncludedHeightKey); err == nil && len(height) == 8 {
400408
m.daIncludedHeight.Store(binary.LittleEndian.Uint64(height))
401409
}
402410

411+
// initialize namespace migration state
412+
if migrationData, err := m.store.GetMetadata(ctx, namespaceMigrationKey); err == nil && len(migrationData) > 0 {
413+
m.namespaceMigrationCompleted.Store(migrationData[0] == 1)
414+
}
415+
403416
// Set the default publishBlock implementation
404417
m.publishBlock = m.publishBlockInternal
405418

@@ -411,6 +424,24 @@ func NewManager(
411424
return m, nil
412425
}
413426

427+
// setNamespaceMigrationCompleted marks the namespace migration as completed and persists it to disk
428+
func (m *Manager) setNamespaceMigrationCompleted(ctx context.Context) error {
429+
m.namespaceMigrationCompleted.Store(true)
430+
return m.store.SetMetadata(ctx, namespaceMigrationKey, []byte{1})
431+
}
432+
433+
// loadNamespaceMigrationState loads the namespace migration state from persistent storage
434+
func (m *Manager) loadNamespaceMigrationState(ctx context.Context) (bool, error) {
435+
migrationData, err := m.store.GetMetadata(ctx, namespaceMigrationKey)
436+
if err != nil {
437+
if errors.Is(err, ds.ErrNotFound) {
438+
return false, nil // Migration not completed
439+
}
440+
return false, fmt.Errorf("failed to load migration state: %w", err)
441+
}
442+
return len(migrationData) > 0 && migrationData[0] == 1, nil
443+
}
444+
414445
// PendingHeaders returns the pending headers.
415446
func (m *Manager) PendingHeaders() *PendingHeaders {
416447
return m.pendingHeaders

0 commit comments

Comments
 (0)