Skip to content

Commit e221198

Browse files
committed
chore: addressing PR feedback
1 parent 5a41d98 commit e221198

7 files changed

Lines changed: 14 additions & 91 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
## [Unreleased]
1111

12+
### Fixed
13+
14+
- Drain the pending tx queue in merged batches with a durable WAL-backed ack, fixing severe queue backlog under heavy tx load. Tx dedup moved from the reaper cache into the sequencer queue [#3351](https://github.com/evstack/ev-node/pull/3351)
15+
1216
## v1.1.2
1317

1418
### Changes

block/components.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -276,25 +276,13 @@ func newAggregatorComponents(
276276
}
277277
pruner := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
278278

279-
// expose the raw sequencer's monotonic enqueue count so the reaper can
280-
// distinguish duplicate scrapes from newly queued entries, even when
281-
// the sequencer is wrapped with tracing
282-
type enqueueCounter interface {
283-
TotalEnqueuedBatches() uint64
284-
}
285-
var totalEnqueuedBatches func() uint64
286-
if counter, ok := rawSequencer.(enqueueCounter); ok {
287-
totalEnqueuedBatches = counter.TotalEnqueuedBatches
288-
}
289-
290279
reaper, err := reaping.NewReaper(
291280
exec,
292281
sequencer,
293282
genesis,
294283
logger,
295284
config.Node.ScrapeInterval.Duration,
296285
executor.NotifyNewTransactions,
297-
totalEnqueuedBatches,
298286
)
299287
if err != nil {
300288
return nil, fmt.Errorf("failed to create reaper: %w", err)

block/internal/reaping/bench_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@ func benchmarkReaperFlow(b *testing.B, batchSize int, txSize int, feedInterval t
9797
zerolog.Nop(),
9898
50*time.Millisecond,
9999
func() { notified.Add(1) },
100-
nil,
101100
)
102101
if err != nil {
103102
b.Fatal(err)
@@ -181,7 +180,6 @@ func BenchmarkReaperFlow_Sustained(b *testing.B) {
181180
zerolog.Nop(),
182181
10*time.Millisecond,
183182
func() { notified.Add(1) },
184-
nil,
185183
)
186184
if err != nil {
187185
b.Fatal(err)
@@ -240,7 +238,6 @@ func BenchmarkReaperFlow_StartStop(b *testing.B) {
240238
zerolog.Nop(),
241239
100*time.Millisecond,
242240
func() {},
243-
nil,
244241
)
245242
if err != nil {
246243
b.Fatal(err)
@@ -281,7 +278,6 @@ func BenchmarkReaperFlow_StartStop(b *testing.B) {
281278
zerolog.Nop(),
282279
10*time.Millisecond,
283280
func() {},
284-
nil,
285281
)
286282
if err != nil {
287283
b.Fatal(err)

block/internal/reaping/reaper.go

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,6 @@ type Reaper struct {
2828
interval time.Duration
2929
onTxsSubmitted func()
3030

31-
// totalEnqueuedBatches reports the sequencer's monotonic enqueue count,
32-
// used to detect whether a submission actually enqueued new entries.
33-
// monotonicity makes the before/after comparison immune to concurrent
34-
// drains shrinking the queue. Optional.
35-
totalEnqueuedBatches func() uint64
36-
3731
logger zerolog.Logger
3832

3933
ctx context.Context
@@ -48,20 +42,18 @@ func NewReaper(
4842
logger zerolog.Logger,
4943
scrapeInterval time.Duration,
5044
onTxsSubmitted func(),
51-
totalEnqueuedBatches func() uint64,
5245
) (*Reaper, error) {
5346
if scrapeInterval == 0 {
5447
return nil, errors.New("scrape interval cannot be empty")
5548
}
5649

5750
return &Reaper{
58-
exec: exec,
59-
sequencer: sequencer,
60-
chainID: genesis.ChainID,
61-
interval: scrapeInterval,
62-
logger: logger.With().Str("component", "reaper").Logger(),
63-
onTxsSubmitted: onTxsSubmitted,
64-
totalEnqueuedBatches: totalEnqueuedBatches,
51+
exec: exec,
52+
sequencer: sequencer,
53+
chainID: genesis.ChainID,
54+
interval: scrapeInterval,
55+
logger: logger.With().Str("component", "reaper").Logger(),
56+
onTxsSubmitted: onTxsSubmitted,
6557
}, nil
6658
}
6759

@@ -141,11 +133,6 @@ func (r *Reaper) drainMempool() error {
141133
return nil
142134
}
143135

144-
var before uint64
145-
if r.totalEnqueuedBatches != nil {
146-
before = r.totalEnqueuedBatches()
147-
}
148-
149136
_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
150137
Id: []byte(r.chainID),
151138
Batch: &coresequencer.Batch{Transactions: txs},
@@ -154,19 +141,15 @@ func (r *Reaper) drainMempool() error {
154141
return fmt.Errorf("failed to submit txs to sequencer: %w", err)
155142
}
156143

157-
// without an enqueue count we cannot tell duplicates apart, so assume queued
158-
queued := true
159-
if r.totalEnqueuedBatches != nil {
160-
queued = r.totalEnqueuedBatches() > before
161-
}
162-
163-
if r.onTxsSubmitted != nil && queued {
144+
// the sequencer dedups resubmitted txs, so this may notify for txs that
145+
// were already queued. at worst this triggers an unnecessary (possibly
146+
// empty) block in lazy mode.
147+
if r.onTxsSubmitted != nil {
164148
r.onTxsSubmitted()
165149
}
166150

167151
r.logger.Debug().
168152
Int("seen_txs", len(txs)).
169-
Bool("queued", queued).
170153
Msg("drained mempool")
171154

172155
return nil

block/internal/reaping/reaper_test.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ func newTestEnv(t *testing.T) *testEnv {
3939
zerolog.Nop(),
4040
100*time.Millisecond,
4141
env.notify,
42-
nil,
4342
)
4443
require.NoError(t, err)
4544
env.reaper = r
@@ -130,7 +129,6 @@ func TestReaper_NilCallback_NoPanic(t *testing.T) {
130129
zerolog.Nop(),
131130
100*time.Millisecond,
132131
nil,
133-
nil,
134132
)
135133
require.NoError(t, err)
136134

@@ -143,31 +141,6 @@ func TestReaper_NilCallback_NoPanic(t *testing.T) {
143141
assert.NoError(t, err)
144142
}
145143

146-
func TestReaper_DuplicateScrape_NoNotify(t *testing.T) {
147-
mockExec := testmocks.NewMockExecutor(t)
148-
mockSeq := testmocks.NewMockSequencer(t)
149-
150-
var notified atomic.Bool
151-
// enqueue count never changes — submission was fully deduped
152-
r, err := NewReaper(
153-
mockExec, mockSeq,
154-
genesis.Genesis{ChainID: "test-chain"},
155-
zerolog.Nop(),
156-
100*time.Millisecond,
157-
func() { notified.Store(true) },
158-
func() uint64 { return 1 },
159-
)
160-
require.NoError(t, err)
161-
162-
tx := []byte("dup-tx")
163-
mockExec.EXPECT().GetTxs(mock.Anything).Return([][]byte{tx}, nil).Once()
164-
mockSeq.EXPECT().SubmitBatchTxs(mock.Anything, mock.AnythingOfType("sequencer.SubmitBatchTxsRequest")).
165-
Return(&coresequencer.SubmitBatchTxsResponse{}, nil).Once()
166-
167-
require.NoError(t, r.drainMempool())
168-
assert.False(t, notified.Load())
169-
}
170-
171144
func TestReaper_StopTerminates(t *testing.T) {
172145
env := newTestEnv(t)
173146
env.execMock.EXPECT().GetTxs(mock.Anything).Return(nil, nil).Maybe()

pkg/sequencers/single/queue.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,6 @@ type BatchQueue struct {
6161
// by maxQueueSize (unbounded when maxQueueSize is 0).
6262
txSeen map[[32]byte]struct{}
6363

64-
// totalEnqueued counts batches ever enqueued via AddBatch. Monotonic,
65-
// never decremented, so callers can detect new enqueues race-free.
66-
totalEnqueued uint64
67-
6864
// Sequence numbers for generating new keys
6965
nextAddSeq uint64
7066
nextPrependSeq uint64
@@ -142,7 +138,6 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
142138
bq.nextAddSeq++
143139

144140
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})
145-
bq.totalEnqueued++
146141

147142
return nil
148143
}
@@ -537,15 +532,6 @@ func (bq *BatchQueue) Size() int {
537532
return len(bq.queue) - bq.head + len(bq.inFlight)
538533
}
539534

540-
// TotalEnqueued returns a monotonic count of batches ever enqueued via
541-
// AddBatch. Unlike Size it never decreases, so comparing two snapshots
542-
// reliably detects whether new batches were enqueued in between.
543-
func (bq *BatchQueue) TotalEnqueued() uint64 {
544-
bq.mu.Lock()
545-
defer bq.mu.Unlock()
546-
return bq.totalEnqueued
547-
}
548-
549535
// persistBatch persists a batch to the datastore with the given key
550536
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
551537
pbBatch := &pb.Batch{

pkg/sequencers/single/sequencer.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -425,13 +425,6 @@ func (c *Sequencer) AckBatch(ctx context.Context) error {
425425
return c.queue.Ack(ctx)
426426
}
427427

428-
// TotalEnqueuedBatches reports a monotonic count of batches enqueued via
429-
// SubmitBatchTxs. It never decreases, so two snapshots reliably detect
430-
// whether new batches were enqueued in between.
431-
func (c *Sequencer) TotalEnqueuedBatches() uint64 {
432-
return c.queue.TotalEnqueued()
433-
}
434-
435428
// VerifyBatch implements sequencing.Sequencer.
436429
func (c *Sequencer) VerifyBatch(ctx context.Context, req coresequencer.VerifyBatchRequest) (*coresequencer.VerifyBatchResponse, error) {
437430
if !c.isValid(req.Id) {

0 commit comments

Comments
 (0)