Skip to content

Commit 0875dd2

Browse files
committed
fix: address review findings in batch queue drain/ack path
- defer postponed entry requeue until ack fully succeeds so a drain rollback after a failed ack neither loses nor duplicates postponed txs - replace fmt.Printf with structured logging in BatchQueue - use monotonic enqueue counter in reaper to detect new submissions race-free against concurrent drain/ack
1 parent bd4e754 commit 0875dd2

7 files changed

Lines changed: 184 additions & 92 deletions

File tree

block/components.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -276,15 +276,15 @@ func newAggregatorComponents(
276276
}
277277
pruner := pruner.New(logger, store, execPruner, config.Pruning, config.Node.BlockTime.Duration, config.DA.Address)
278278

279-
// expose the raw sequencer's pending batch count so the reaper can
279+
// expose the raw sequencer's monotonic enqueue count so the reaper can
280280
// distinguish duplicate scrapes from newly queued entries, even when
281281
// the sequencer is wrapped with tracing
282-
type pendingBatchCounter interface {
283-
PendingBatchCount() int
282+
type enqueueCounter interface {
283+
TotalEnqueuedBatches() uint64
284284
}
285-
var pendingBatchCount func() int
286-
if counter, ok := rawSequencer.(pendingBatchCounter); ok {
287-
pendingBatchCount = counter.PendingBatchCount
285+
var totalEnqueuedBatches func() uint64
286+
if counter, ok := rawSequencer.(enqueueCounter); ok {
287+
totalEnqueuedBatches = counter.TotalEnqueuedBatches
288288
}
289289

290290
reaper, err := reaping.NewReaper(
@@ -294,7 +294,7 @@ func newAggregatorComponents(
294294
logger,
295295
config.Node.ScrapeInterval.Duration,
296296
executor.NotifyNewTransactions,
297-
pendingBatchCount,
297+
totalEnqueuedBatches,
298298
)
299299
if err != nil {
300300
return nil, fmt.Errorf("failed to create reaper: %w", err)

block/internal/reaping/reaper.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ type Reaper struct {
2828
interval time.Duration
2929
onTxsSubmitted func()
3030

31-
// pendingBatchCount reports the sequencer's pending batch count, used to
32-
// detect whether a submission actually enqueued new entries. Optional.
33-
pendingBatchCount func() int
31+
// totalEnqueuedBatches reports the sequencer's monotonic enqueue count,
32+
// used to detect whether a submission actually enqueued new entries.
33+
// monotonicity makes the before/after comparison immune to concurrent
34+
// drains shrinking the queue. Optional.
35+
totalEnqueuedBatches func() uint64
3436

3537
logger zerolog.Logger
3638

@@ -46,20 +48,20 @@ func NewReaper(
4648
logger zerolog.Logger,
4749
scrapeInterval time.Duration,
4850
onTxsSubmitted func(),
49-
pendingBatchCount func() int,
51+
totalEnqueuedBatches func() uint64,
5052
) (*Reaper, error) {
5153
if scrapeInterval == 0 {
5254
return nil, errors.New("scrape interval cannot be empty")
5355
}
5456

5557
return &Reaper{
56-
exec: exec,
57-
sequencer: sequencer,
58-
chainID: genesis.ChainID,
59-
interval: scrapeInterval,
60-
logger: logger.With().Str("component", "reaper").Logger(),
61-
onTxsSubmitted: onTxsSubmitted,
62-
pendingBatchCount: pendingBatchCount,
58+
exec: exec,
59+
sequencer: sequencer,
60+
chainID: genesis.ChainID,
61+
interval: scrapeInterval,
62+
logger: logger.With().Str("component", "reaper").Logger(),
63+
onTxsSubmitted: onTxsSubmitted,
64+
totalEnqueuedBatches: totalEnqueuedBatches,
6365
}, nil
6466
}
6567

@@ -139,9 +141,9 @@ func (r *Reaper) drainMempool() error {
139141
return nil
140142
}
141143

142-
before := 0
143-
if r.pendingBatchCount != nil {
144-
before = r.pendingBatchCount()
144+
var before uint64
145+
if r.totalEnqueuedBatches != nil {
146+
before = r.totalEnqueuedBatches()
145147
}
146148

147149
_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
@@ -152,10 +154,10 @@ func (r *Reaper) drainMempool() error {
152154
return fmt.Errorf("failed to submit txs to sequencer: %w", err)
153155
}
154156

155-
// without a pending count we cannot tell duplicates apart, so assume queued
157+
// without an enqueue count we cannot tell duplicates apart, so assume queued
156158
queued := true
157-
if r.pendingBatchCount != nil {
158-
queued = r.pendingBatchCount() > before
159+
if r.totalEnqueuedBatches != nil {
160+
queued = r.totalEnqueuedBatches() > before
159161
}
160162

161163
if r.onTxsSubmitted != nil && queued {

block/internal/reaping/reaper_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,14 @@ func TestReaper_DuplicateScrape_NoNotify(t *testing.T) {
148148
mockSeq := testmocks.NewMockSequencer(t)
149149

150150
var notified atomic.Bool
151-
// pending batch count never changes — submission was fully deduped
151+
// enqueue count never changes — submission was fully deduped
152152
r, err := NewReaper(
153153
mockExec, mockSeq,
154154
genesis.Genesis{ChainID: "test-chain"},
155155
zerolog.Nop(),
156156
100*time.Millisecond,
157157
func() { notified.Store(true) },
158-
func() int { return 1 },
158+
func() uint64 { return 1 },
159159
)
160160
require.NoError(t, err)
161161

pkg/sequencers/single/queue.go

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
ds "github.com/ipfs/go-datastore"
1313
"github.com/ipfs/go-datastore/query"
14+
"github.com/rs/zerolog"
1415
"google.golang.org/protobuf/proto"
1516

1617
coresequencer "github.com/evstack/ev-node/core/sequencer"
@@ -45,26 +46,34 @@ type BatchQueue struct {
4546
// inFlightPostponed holds txs that should be requeued on Ack.
4647
// Set via SetPostponed between Drain and Ack. Cleared only on successful Ack.
4748
inFlightPostponed [][]byte
48-
// inFlightPostponedItem tracks a postponed batch already persisted during Ack
49-
// so retries do not append duplicate postponed entries.
50-
inFlightPostponedItem *queuedItem
49+
// postponedItem holds a postponed batch persisted to the WAL during Ack.
50+
// It is only prepended to the in-memory queue once Ack fully succeeds, so
51+
// a direct Ack retry does not persist a duplicate entry. If a Drain rolls
52+
// the in-flight state back instead, the entry is discarded again because
53+
// its txs are still covered by the rolled-back WAL entries.
54+
postponedItem *queuedItem
5155

5256
// txSeen is an in-memory dedup set keyed by sha256 hash of each tx.
5357
// hashes are added in AddBatch and removed on successful Ack.
5458
// prevents the reaper from enqueuing the same tx multiple scrape cycles.
5559
txSeen map[[32]byte]struct{}
5660

61+
// totalEnqueued counts batches ever enqueued via AddBatch. Monotonic,
62+
// never decremented, so callers can detect new enqueues race-free.
63+
totalEnqueued uint64
64+
5765
// Sequence numbers for generating new keys
5866
nextAddSeq uint64
5967
nextPrependSeq uint64
6068

61-
mu sync.Mutex
62-
db ds.Batching
69+
mu sync.Mutex
70+
db ds.Batching
71+
logger zerolog.Logger
6372
}
6473

6574
// NewBatchQueue creates a new BatchQueue with the specified maximum size.
6675
// If maxSize is 0, the queue will be unlimited.
67-
func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
76+
func NewBatchQueue(db ds.Batching, prefix string, maxSize int, logger zerolog.Logger) *BatchQueue {
6877
return &BatchQueue{
6978
queue: make([]queuedItem, 0),
7079
head: 0,
@@ -73,6 +82,7 @@ func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
7382
db: store.NewPrefixKVStore(db, prefix),
7483
nextAddSeq: initialSeqNum,
7584
nextPrependSeq: initialSeqNum - 1,
85+
logger: logger,
7686
}
7787
}
7888

@@ -129,6 +139,7 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
129139
bq.nextAddSeq++
130140

131141
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})
142+
bq.totalEnqueued++
132143

133144
return nil
134145
}
@@ -149,7 +160,7 @@ func (bq *BatchQueue) Drain(ctx context.Context, maxBytes uint64) (*coresequence
149160
bq.mu.Lock()
150161
defer bq.mu.Unlock()
151162

152-
bq.rollbackInFlightLocked()
163+
bq.rollbackInFlightLocked(ctx)
153164

154165
if bq.head >= len(bq.queue) {
155166
return &coresequencer.Batch{Transactions: nil}, nil
@@ -192,7 +203,7 @@ func (bq *BatchQueue) Drain(ctx context.Context, maxBytes uint64) (*coresequence
192203
func (bq *BatchQueue) SetPostponed(txs [][]byte) {
193204
bq.mu.Lock()
194205
defer bq.mu.Unlock()
195-
if bq.inFlightPostponedItem != nil {
206+
if bq.postponedItem != nil {
196207
return
197208
}
198209
bq.inFlightPostponed = txs
@@ -208,17 +219,16 @@ func (bq *BatchQueue) Ack(ctx context.Context) error {
208219

209220
// persist postponed txs BEFORE deleting source WAL entries.
210221
// if this fails the original entries still exist — no data loss.
211-
if len(bq.inFlightPostponed) > 0 && bq.inFlightPostponedItem == nil {
222+
// the item is only prepended to the in-memory queue after the WAL
223+
// deletes succeed, so a rollback never sees its txs twice.
224+
if len(bq.inFlightPostponed) > 0 && bq.postponedItem == nil {
212225
batch := coresequencer.Batch{Transactions: bq.inFlightPostponed}
213226
key := seqToKey(bq.nextPrependSeq)
214227
if err := bq.persistBatch(ctx, batch, key); err != nil {
215228
return fmt.Errorf("failed to persist postponed txs: %w", err)
216229
}
217230
bq.nextPrependSeq--
218-
219-
item := queuedItem{Batch: batch, Key: key}
220-
bq.prependItemLocked(item)
221-
bq.inFlightPostponedItem = &item
231+
bq.postponedItem = &queuedItem{Batch: batch, Key: key}
222232
}
223233

224234
// delete WAL entries for committed inFlight items in one batch.
@@ -259,10 +269,15 @@ func (bq *BatchQueue) Ack(ctx context.Context) error {
259269
}
260270
}
261271

272+
// requeue the persisted postponed entry now that the commit is durable
273+
if bq.postponedItem != nil {
274+
bq.prependItemLocked(*bq.postponedItem)
275+
}
276+
262277
clear(bq.inFlight)
263278
bq.inFlight = bq.inFlight[:0]
264279
bq.inFlightPostponed = nil
265-
bq.inFlightPostponedItem = nil
280+
bq.postponedItem = nil
266281

267282
return nil
268283
}
@@ -279,12 +294,25 @@ func (bq *BatchQueue) prependItemLocked(item queuedItem) {
279294
}
280295

281296
// rollbackInFlightLocked moves un-acked inFlight items back to the front of the queue.
282-
// Must be called with bq.mu held.
283-
func (bq *BatchQueue) rollbackInFlightLocked() {
297+
// Postponed state is discarded: the postponed txs are still covered by the
298+
// rolled-back WAL entries, so a persisted postponed entry would duplicate
299+
// them and is deleted (best-effort; Load dedups any leftover on restart).
300+
// The caller is expected to make a fresh SetPostponed decision after the
301+
// next Drain. Must be called with bq.mu held.
302+
func (bq *BatchQueue) rollbackInFlightLocked(ctx context.Context) {
284303
if len(bq.inFlight) == 0 {
285304
return
286305
}
287306

307+
if bq.postponedItem != nil {
308+
if err := bq.db.Delete(ctx, ds.NewKey(bq.postponedItem.Key)); err != nil {
309+
bq.logger.Warn().Err(err).Str("key", bq.postponedItem.Key).
310+
Msg("failed to delete rolled-back postponed WAL entry")
311+
}
312+
bq.postponedItem = nil
313+
}
314+
bq.inFlightPostponed = nil
315+
288316
if bq.head >= len(bq.inFlight) {
289317
// enough head slots — fill them directly
290318
for i := len(bq.inFlight) - 1; i >= 0; i-- {
@@ -337,13 +365,13 @@ func (bq *BatchQueue) dedupAndEnqueueLocked(ctx context.Context, batch coreseque
337365
switch {
338366
case len(filtered) == 0:
339367
if err := bq.db.Delete(ctx, ds.NewKey(key)); err != nil {
340-
fmt.Printf("Error deleting duplicate WAL entry %s: %v\n", key, err)
368+
bq.logger.Error().Err(err).Str("key", key).Msg("failed to delete duplicate WAL entry")
341369
}
342370
return
343371
case len(filtered) < len(batch.Transactions):
344372
batch = coresequencer.Batch{Transactions: filtered}
345373
if err := bq.persistBatch(ctx, batch, key); err != nil {
346-
fmt.Printf("Error rewriting partially duplicate WAL entry %s: %v\n", key, err)
374+
bq.logger.Error().Err(err).Str("key", key).Msg("failed to rewrite partially duplicate WAL entry")
347375
}
348376
}
349377

@@ -355,6 +383,8 @@ func (bq *BatchQueue) dedupAndEnqueueLocked(ctx context.Context, batch coreseque
355383
// may still hold entries whose txs were already committed in the last block.
356384
// Entries are rewritten in place (or deleted when emptied) so a subsequent
357385
// reload stays consistent. Returns the number of dropped transactions.
386+
// It must be called on a freshly loaded queue: only queued entries are
387+
// scanned, so any in-flight entries would be missed.
358388
func (bq *BatchQueue) DropIncluded(ctx context.Context, included [][]byte) (int, error) {
359389
bq.mu.Lock()
360390
defer bq.mu.Unlock()
@@ -410,7 +440,7 @@ func (bq *BatchQueue) Load(ctx context.Context) error {
410440
bq.txSeen = make(map[[32]byte]struct{})
411441
bq.inFlight = nil
412442
bq.inFlightPostponed = nil
413-
bq.inFlightPostponedItem = nil
443+
bq.postponedItem = nil
414444
bq.nextAddSeq = initialSeqNum
415445
bq.nextPrependSeq = initialSeqNum - 1
416446

@@ -426,7 +456,7 @@ func (bq *BatchQueue) Load(ctx context.Context) error {
426456
var legacyItems []queuedItem
427457
for result := range results.Next() {
428458
if result.Error != nil {
429-
fmt.Printf("Error reading entry from datastore: %v\n", result.Error)
459+
bq.logger.Error().Err(result.Error).Msg("failed to read entry from datastore")
430460
continue
431461
}
432462
// We care about the last part of the key (the sequence number)
@@ -436,7 +466,7 @@ func (bq *BatchQueue) Load(ctx context.Context) error {
436466
var pbBatch pb.Batch
437467
err := proto.Unmarshal(result.Value, &pbBatch)
438468
if err != nil {
439-
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", keyName, err)
469+
bq.logger.Error().Err(err).Str("key", keyName).Msg("failed to decode batch, skipping entry")
440470
continue
441471
}
442472

@@ -465,18 +495,18 @@ func (bq *BatchQueue) Load(ctx context.Context) error {
465495
if len(legacyItems) == 0 {
466496
return nil
467497
}
468-
fmt.Printf("Found %d legacy items to migrate...\n", len(legacyItems))
498+
bq.logger.Info().Int("count", len(legacyItems)).Msg("found legacy items to migrate")
469499

470500
for _, item := range legacyItems {
471501
newKeyName := seqToKey(bq.nextAddSeq)
472502

473503
if err := bq.persistBatch(ctx, item.Batch, newKeyName); err != nil {
474-
fmt.Printf("Failed to migrate legacy item %s: %v\n", item.Key, err)
504+
bq.logger.Error().Err(err).Str("key", item.Key).Msg("failed to migrate legacy item")
475505
continue
476506
}
477507

478508
if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
479-
fmt.Printf("Failed to delete legacy key %s after migration: %v\n", item.Key, err)
509+
bq.logger.Error().Err(err).Str("key", item.Key).Msg("failed to delete legacy key after migration")
480510
}
481511

482512
bq.dedupAndEnqueueLocked(ctx, item.Batch, newKeyName)
@@ -493,6 +523,15 @@ func (bq *BatchQueue) Size() int {
493523
return len(bq.queue) - bq.head + len(bq.inFlight)
494524
}
495525

526+
// TotalEnqueued returns a monotonic count of batches ever enqueued via
527+
// AddBatch. Unlike Size it never decreases, so comparing two snapshots
528+
// reliably detects whether new batches were enqueued in between.
529+
func (bq *BatchQueue) TotalEnqueued() uint64 {
530+
bq.mu.Lock()
531+
defer bq.mu.Unlock()
532+
return bq.totalEnqueued
533+
}
534+
496535
// persistBatch persists a batch to the datastore with the given key
497536
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
498537
pbBatch := &pb.Batch{

pkg/sequencers/single/queue_migration_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
ds "github.com/ipfs/go-datastore"
88
"github.com/ipfs/go-datastore/query"
9+
"github.com/rs/zerolog"
910
"github.com/stretchr/testify/require"
1011
"google.golang.org/protobuf/proto"
1112

@@ -53,7 +54,7 @@ func TestLoad_MigratesLegacyKeys(t *testing.T) {
5354
require.NoError(err)
5455

5556
// 2. Create Queue and call Load
56-
bq := NewBatchQueue(memdb, prefix, 0)
57+
bq := NewBatchQueue(memdb, prefix, 0, zerolog.Nop())
5758
err = bq.Load(ctx)
5859
require.NoError(err)
5960

@@ -129,7 +130,7 @@ func TestLoad_Migration_DBCheck(t *testing.T) {
129130
require.NoError(err)
130131

131132
// Load
132-
bq := NewBatchQueue(memdb, prefix, 0)
133+
bq := NewBatchQueue(memdb, prefix, 0, zerolog.Nop())
133134
require.NoError(bq.Load(ctx))
134135

135136
// Verify DB keys

0 commit comments

Comments
 (0)