Skip to content

Commit 97c67cb

Browse files
committed
fix wrong epoch check and improve time check
1 parent f43cb63 commit 97c67cb

5 files changed

Lines changed: 68 additions & 47 deletions

File tree

block/internal/syncing/syncer.go

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -753,7 +753,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
753753
// P2P nodes aren't actually able to verify forced inclusions txs as DA inclusion happens later (so DA hints are not available) and DA hints cannot be trusted. This is a known limitation described in the ADR.
754754
// In the future we should verify force inclusion txs completely asynchronously, while still waiting for block n-1 execution.
755755
if event.Source == common.SourceDA {
756-
if err := s.VerifyForcedInclusionTxs(ctx, currentState.DAHeight, data); err != nil {
756+
if err := s.VerifyForcedInclusionTxs(ctx, event.DaHeight, data); err != nil {
757757
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
758758
if errors.Is(err, errMaliciousProposer) {
759759
// remove header as da included from cache
@@ -772,25 +772,16 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
772772
}
773773

774774
// Update DA height if needed
775-
// This height is only updated when a height is processed from DA as P2P
776-
// events do not contain DA height information.
775+
// Update state.DAHeight to track where we are in DA.
776+
// state.DAHeight is used for state persistence and restart recovery, not for
777+
// forced inclusion verification (which now uses event.DaHeight directly).
777778
//
778-
// When a sequencer restarts after extended downtime, it produces "catch-up"
779-
// blocks containing forced inclusion transactions from missed DA epochs and
780-
// submits them to DA at the current (much higher) DA height. This creates a
781-
// gap between the state's DAHeight (tracking forced inclusion epoch progress)
782-
// and event.DaHeight (the DA submission height).
783-
//
784-
// If we jump state.DAHeight directly to event.DaHeight, subsequent calls to
785-
// VerifyForcedInclusionTxs would check the wrong epoch (the submission epoch
786-
// instead of the next forced-inclusion epoch), causing valid catch-up blocks
787-
// to be incorrectly flagged as malicious.
788-
//
789-
// To handle this, when the gap exceeds one DA epoch, we compute the epoch
790-
// start that contains event.DaHeight and set DAHeight to that epoch start.
791-
// This ensures all blocks from the same DA epoch verify against the same
792-
// epoch. Once the sequencer finishes catching up and the gap closes,
793-
// DAHeight converges to event.DaHeight.
779+
// When a sequencer restarts after extended downtime it submits catch-up blocks
780+
// all at the current (much higher) DA height, creating a large gap between
781+
// state.DAHeight and event.DaHeight. Jumping straight to event.DaHeight would
782+
// cause the next VerifyForcedInclusionTxs call to skip over intermediate epochs.
783+
// Instead we advance state.DAHeight to the epoch start that contains
784+
// event.DaHeight, so it converges gradually as catch-up completes.
794785
if event.DaHeight > newState.DAHeight {
795786
epochSize := s.genesis.DAEpochForcedInclusion
796787
gap := event.DaHeight - newState.DAHeight

pkg/sequencers/based/sequencer.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type BasedSequencer struct {
3737
currentBatchTxs [][]byte
3838
// DA epoch end time for timestamp calculation
3939
currentDAEndTime time.Time
40+
// Total number of transactions in the current DA epoch (used for timestamp jitter)
41+
currentEpochTxCount uint64
4042
}
4143

4244
// NewBasedSequencer creates a new based sequencer instance
@@ -113,6 +115,7 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
113115
if daEndTime.After(s.currentDAEndTime) {
114116
s.currentDAEndTime = daEndTime
115117
}
118+
s.currentEpochTxCount = uint64(len(s.currentBatchTxs))
116119
}
117120

118121
// Get remaining transactions from checkpoint position
@@ -156,9 +159,13 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
156159
}
157160
doneProcessing:
158161

159-
// Update checkpoint based on consumed transactions
162+
// Update checkpoint based on consumed transactions.
163+
// txIndexForTimestamp is captured before the epoch-boundary reset so the
164+
// final block of an epoch lands exactly on daEndTime.
165+
var txIndexForTimestamp uint64
160166
if daHeight > 0 || len(batchTxs) > 0 {
161167
s.checkpoint.TxIndex += consumedCount
168+
txIndexForTimestamp = s.checkpoint.TxIndex
162169

163170
// If we've consumed all transactions from this DA epoch, move to next DA epoch
164171
if s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
@@ -179,14 +186,13 @@ doneProcessing:
179186
Msg("updated checkpoint after processing batch")
180187
}
181188

182-
// Calculate timestamp based on remaining transactions after this batch
183-
// timestamp corresponds to the last block time of a DA epoch, based on the remaining transactions to be executed
184-
// this is done in order to handle the case where a DA epoch must fit in multiple blocks
185-
var remainingTxs uint64
186-
if len(s.currentBatchTxs) > 0 {
187-
remainingTxs = uint64(len(s.currentBatchTxs)) - s.checkpoint.TxIndex
188-
}
189-
timestamp := s.currentDAEndTime.Add(-time.Duration(remainingTxs) * time.Millisecond)
189+
// Spread blocks across the DA epoch window to produce monotonically increasing timestamps:
190+
// epochStart = daEndTime - totalEpochTxs * 1ms
191+
// blockTimestamp = epochStart + txIndexForTimestamp * 1ms
192+
// The last block of an epoch lands exactly on daEndTime; the first block of
193+
// the next epoch starts at nextDaEndTime - N*1ms >= prevDaEndTime.
194+
epochStart := s.currentDAEndTime.Add(-time.Duration(s.currentEpochTxCount) * time.Millisecond)
195+
timestamp := epochStart.Add(time.Duration(txIndexForTimestamp) * time.Millisecond)
190196

191197
return &coresequencer.GetNextBatchResponse{
192198
Batch: &coresequencer.Batch{Transactions: validTxs},

pkg/sequencers/based/sequencer_test.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -693,8 +693,10 @@ func TestBasedSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T
693693
}
694694

695695
func TestBasedSequencer_GetNextBatch_TimestampAdjustment(t *testing.T) {
696-
// Test that timestamp is adjusted based on the number of transactions in the batch
697-
// The timestamp should be: daEndTime - (len(batch.Transactions) * 1ms)
696+
// Test that timestamp is adjusted based on the position within the DA epoch.
697+
// The formula is: epochStart + txIndexForTimestamp * 1ms
698+
// where epochStart = daEndTime - totalEpochTxs * 1ms
699+
// and txIndexForTimestamp is captured before any checkpoint reset.
698700

699701
testBlobs := [][]byte{[]byte("tx1"), []byte("tx2"), []byte("tx3")}
700702
daEndTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
@@ -726,15 +728,18 @@ func TestBasedSequencer_GetNextBatch_TimestampAdjustment(t *testing.T) {
726728
require.NotNil(t, resp.Batch)
727729
assert.Equal(t, 3, len(resp.Batch.Transactions))
728730

729-
// After taking all 3 txs, there are 0 remaining, so timestamp = daEndTime - 0ms = daEndTime
731+
// epochStart = T - 3ms; all 3 txs consumed → txIndexForTimestamp=3 (pre-reset)
732+
// timestamp = T - 3ms + 3ms = daEndTime
730733
expectedTimestamp := daEndTime
731734
assert.Equal(t, expectedTimestamp, resp.Timestamp)
732735

733736
mockRetriever.AssertExpectations(t)
734737
}
735738

736739
func TestBasedSequencer_GetNextBatch_TimestampAdjustment_PartialBatch(t *testing.T) {
737-
// Test timestamp adjustment when MaxBytes limits the batch size
740+
// Test timestamp adjustment when filtering limits the batch size.
741+
// Formula: epochStart + txIndexForTimestamp * 1ms
742+
// where epochStart = daEndTime - totalEpochTxs * 1ms
738743
tx1 := make([]byte, 100)
739744
tx2 := make([]byte, 150)
740745
tx3 := make([]byte, 200)
@@ -769,7 +774,8 @@ func TestBasedSequencer_GetNextBatch_TimestampAdjustment_PartialBatch(t *testing
769774
require.NotNil(t, resp.Batch)
770775
assert.Equal(t, 2, len(resp.Batch.Transactions))
771776

772-
// After taking 2 txs, there is 1 remaining, so timestamp = daEndTime - 1ms
777+
// epochStart = T - 3ms; 2 txs consumed → txIndexForTimestamp=2
778+
// timestamp = T - 3ms + 2ms = daEndTime - 1ms
773779
expectedTimestamp := daEndTime.Add(-1 * time.Millisecond)
774780
assert.Equal(t, expectedTimestamp, resp.Timestamp)
775781

@@ -786,7 +792,8 @@ func TestBasedSequencer_GetNextBatch_TimestampAdjustment_PartialBatch(t *testing
786792
require.NotNil(t, resp.Batch)
787793
assert.Equal(t, 1, len(resp.Batch.Transactions))
788794

789-
// After taking this 1 tx, there are 0 remaining, so timestamp = daEndTime - 0ms = daEndTime
795+
// epochStart = T - 3ms; 3 txs consumed total → txIndexForTimestamp=3 (pre-reset)
796+
// timestamp = T - 3ms + 3ms = daEndTime
790797
expectedTimestamp2 := daEndTime
791798
assert.Equal(t, expectedTimestamp2, resp.Timestamp)
792799

pkg/sequencers/single/sequencer.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type Sequencer struct {
6464
catchUpState atomic.Int32
6565
// currentDAEndTime is the DA epoch end timestamp, used during catch-up
6666
currentDAEndTime time.Time
67+
// currentEpochTxCount is the total number of txs in the current DA epoch (used for timestamp jitter)
68+
currentEpochTxCount uint64
6769
}
6870

6971
// NewSequencer creates a new Single Sequencer
@@ -311,10 +313,14 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
311313
}
312314
}
313315

314-
// Update checkpoint after consuming forced inclusion transactions
316+
// Update checkpoint after consuming forced inclusion transactions.
317+
// txIndexForTimestamp is captured before the epoch-boundary reset so the
318+
// final block of an epoch lands exactly on daEndTime.
319+
var txIndexForTimestamp uint64
315320
if daHeight > 0 || len(forcedTxs) > 0 {
316321
// Advance TxIndex by the number of consumed forced transactions
317322
c.checkpoint.TxIndex += forcedTxConsumedCount
323+
txIndexForTimestamp = c.checkpoint.TxIndex
318324

319325
if c.checkpoint.TxIndex >= uint64(len(c.cachedForcedInclusionTxs)) {
320326
// All forced txs were consumed (OK or Remove), move to next DA epoch
@@ -341,14 +347,16 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
341347
batchTxs = append(batchTxs, validForcedTxs...)
342348
batchTxs = append(batchTxs, validMempoolTxs...)
343349

344-
// Use DA epoch timestamp during catch-up
350+
// Spread catch-up blocks across the DA epoch window for monotonically increasing timestamps:
351+
// epochStart = daEndTime - totalEpochTxs * 1ms
352+
// blockTimestamp = epochStart + txIndexForTimestamp * 1ms
353+
// The last block of an epoch lands exactly on daEndTime; the first block of
354+
// the next epoch starts at nextDaEndTime - N*1ms >= prevDaEndTime.
355+
// During normal operation, use wall-clock time instead.
345356
timestamp := time.Now()
346357
if c.catchUpState.Load() == catchUpInProgress && !c.currentDAEndTime.IsZero() {
347-
var remainingForcedTxs uint64
348-
if len(c.cachedForcedInclusionTxs) > 0 {
349-
remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex
350-
}
351-
timestamp = c.currentDAEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond)
358+
epochStart := c.currentDAEndTime.Add(-time.Duration(c.currentEpochTxCount) * time.Millisecond)
359+
timestamp = epochStart.Add(time.Duration(txIndexForTimestamp) * time.Millisecond)
352360
}
353361

354362
return &coresequencer.GetNextBatchResponse{
@@ -447,6 +455,8 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint
447455
if !forcedTxsEvent.Timestamp.IsZero() {
448456
c.currentDAEndTime = forcedTxsEvent.Timestamp.UTC()
449457
}
458+
// Record total tx count for the epoch so the timestamp jitter can be computed
459+
// after oversized txs are filtered out below.
450460

451461
// Filter out oversized transactions
452462
validTxs := make([][]byte, 0, len(forcedTxsEvent.Txs))
@@ -473,6 +483,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint
473483
Msg("fetched forced inclusion transactions from DA")
474484

475485
c.cachedForcedInclusionTxs = validTxs
486+
c.currentEpochTxCount = uint64(len(validTxs))
476487

477488
return forcedTxsEvent.EndDaHeight, nil
478489
}

pkg/sequencers/single/sequencer_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1912,8 +1912,12 @@ func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) {
19121912
func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) {
19131913
// When a single DA epoch has more forced txs than fit in one block,
19141914
// catch-up must produce strictly monotonic timestamps across the
1915-
// resulting blocks. This uses the same jitter scheme as the based
1916-
// sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms).
1915+
// resulting blocks. The jitter scheme is:
1916+
// epochStart = daEndTime - totalEpochTxs * 1ms
1917+
// blockTimestamp = epochStart + txIndexForTimestamp * 1ms
1918+
// where txIndexForTimestamp is the cumulative consumed-tx count
1919+
// captured *before* the checkpoint resets at an epoch boundary.
1920+
// The final block of an epoch therefore lands exactly on daEndTime.
19171921
ctx := context.Background()
19181922
logger := zerolog.New(zerolog.NewTestWriter(t))
19191923

@@ -2016,10 +2020,11 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) {
20162020
i, timestamps[i], i-1, timestamps[i-1])
20172021
}
20182022

2019-
// Verify exact jitter values:
2020-
// Block 0: 3 txs total, 1 consumed → 2 remaining → T - 2ms
2021-
// Block 1: 1 consumed → 1 remaining → T - 1ms
2022-
// Block 2: 1 consumed → 0 remaining → T
2023+
// Verify exact jitter values using epochStart + txIndexForTimestamp formula:
2024+
// epochStart = T - 3ms (3 total txs in epoch)
2025+
// Block 0: 1 consumed → txIndex=1 → epochStart + 1ms = T - 2ms
2026+
// Block 1: 1 consumed → txIndex=2 → epochStart + 2ms = T - 1ms
2027+
// Block 2: 1 consumed → txIndex=3 (pre-reset) → epochStart + 3ms = T
20232028
assert.Equal(t, epochTimestamp.Add(-2*time.Millisecond), timestamps[0], "block 0: T - 2ms")
20242029
assert.Equal(t, epochTimestamp.Add(-1*time.Millisecond), timestamps[1], "block 1: T - 1ms")
20252030
assert.Equal(t, epochTimestamp, timestamps[2], "block 2: T (exact epoch end time)")
@@ -2032,6 +2037,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) {
20322037
assert.True(t, resp4.Timestamp.After(timestamps[2]),
20332038
"epoch 101 timestamp (%v) must be after epoch 100 last timestamp (%v)",
20342039
resp4.Timestamp, timestamps[2])
2040+
// epoch 101 has 1 tx: epochStart = T2 - 1ms, txIndexForTimestamp=1 → T2 - 1ms + 1ms = T2
20352041
assert.Equal(t, epoch2Timestamp, resp4.Timestamp, "single-tx epoch gets exact DA end time")
20362042
}
20372043

0 commit comments

Comments
 (0)