Skip to content

Commit 1ed69b3

Browse files
committed
chore: pr feedback
1 parent e221198 commit 1ed69b3

4 files changed

Lines changed: 43 additions & 24 deletions

File tree

block/components.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,6 @@ func newAggregatorComponents(
238238
// error channel for critical failures
239239
errorCh := make(chan error, 1)
240240

241-
// capture raw sequencer before tracing wrap for batch ack interface
242-
rawSequencer := sequencer
243-
244241
// wrap sequencer with tracing if enabled
245242
if config.Instrumentation.IsTracingEnabled() {
246243
sequencer = telemetry.WithTracingSequencer(sequencer)
@@ -288,18 +285,6 @@ func newAggregatorComponents(
288285
return nil, fmt.Errorf("failed to create reaper: %w", err)
289286
}
290287

291-
// wire batch ack callback so drained queue entries are committed after block commit
292-
type batchAcknowledger interface {
293-
AckBatch(ctx context.Context) error
294-
}
295-
if acker, ok := rawSequencer.(batchAcknowledger); ok {
296-
executor.SetOnBatchCommitted(acker.AckBatch)
297-
} else if !config.Node.BasedSequencer {
298-
// without an ack, drained queue entries are rolled back on every
299-
// retrieval and the same transactions would be re-included each block
300-
logger.Warn().Msg("sequencer does not implement AckBatch; drained batch entries will not be acknowledged after block commit")
301-
}
302-
303288
if config.Node.BasedSequencer { // no submissions needed for based sequencer
304289
return &Components{
305290
Executor: executor,

block/internal/executing/executor.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ type Executor struct {
100100
blockProducer BlockProducer
101101
}
102102

103+
// batchAcknowledger is implemented by sequencers whose drained queue
104+
// entries must be acknowledged after the block is durably committed.
105+
type batchAcknowledger interface {
106+
AckBatch(ctx context.Context) error
107+
}
108+
103109
// NewExecutor creates a new block executor.
104110
// The executor is responsible for:
105111
// - Block production from sequencer batches
@@ -162,6 +168,17 @@ func NewExecutor(
162168
logger: logger.With().Str("component", "executor").Logger(),
163169
}
164170
e.blockProducer = e
171+
172+
// wire the batch ack so drained queue entries are committed after block
173+
// commit. tracing wrappers forward AckBatch to the underlying sequencer.
174+
if acker, ok := sequencer.(batchAcknowledger); ok {
175+
e.onBatchCommitted = acker.AckBatch
176+
} else if !config.Node.BasedSequencer {
177+
// without an ack, drained queue entries are rolled back on every
178+
// retrieval and the same transactions would be re-included each block
179+
e.logger.Warn().Msg("sequencer does not implement AckBatch; drained batch entries will not be acknowledged after block commit")
180+
}
181+
165182
return e, nil
166183
}
167184

@@ -171,12 +188,6 @@ func (e *Executor) SetBlockProducer(bp BlockProducer) {
171188
e.blockProducer = bp
172189
}
173190

174-
// SetOnBatchCommitted registers a callback fired after each block commit.
175-
// If the callback fails, it is retried before the next block is produced.
176-
func (e *Executor) SetOnBatchCommitted(fn func(ctx context.Context) error) {
177-
e.onBatchCommitted = fn
178-
}
179-
180191
// ackCommittedBatch invokes the batch ack callback and tracks failures so
181192
// they can be retried before the next block is produced.
182193
func (e *Executor) ackCommittedBatch(ctx context.Context) error {

block/internal/executing/executor_logic_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,13 @@ func TestProduceBlock_RetriesFailedAckBeforeNextBlock(t *testing.T) {
127127
defer fx.Cancel()
128128

129129
var ackCalls int
130-
fx.Exec.SetOnBatchCommitted(func(ctx context.Context) error {
130+
fx.Exec.onBatchCommitted = func(ctx context.Context) error {
131131
ackCalls++
132132
if ackCalls <= 2 {
133133
return assert.AnError
134134
}
135135
return nil
136-
})
136+
}
137137

138138
fx.MockSeq.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")).
139139
RunAndReturn(func(ctx context.Context, req coreseq.GetNextBatchRequest) (*coreseq.GetNextBatchResponse, error) {

pkg/telemetry/sequencer_tracing.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,35 @@ type tracedSequencer struct {
2020
tracer trace.Tracer
2121
}
2222

23+
// batchAcknowledger is implemented by sequencers that require an ack
24+
// after drained queue entries are durably committed in a block.
25+
type batchAcknowledger interface {
26+
AckBatch(ctx context.Context) error
27+
}
28+
2329
// WithTracingSequencer decorates the provided Sequencer with tracing spans.
30+
// If the inner sequencer implements AckBatch, the returned sequencer
31+
// forwards it so consumers can still detect and use the ack capability.
2432
func WithTracingSequencer(inner coresequencer.Sequencer) coresequencer.Sequencer {
25-
return &tracedSequencer{
33+
ts := &tracedSequencer{
2634
inner: inner,
2735
tracer: otel.Tracer("ev-node/sequencer"),
2836
}
37+
if acker, ok := inner.(batchAcknowledger); ok {
38+
return &tracedAckSequencer{tracedSequencer: ts, acker: acker}
39+
}
40+
return ts
41+
}
42+
43+
// tracedAckSequencer is a tracedSequencer whose inner sequencer also
44+
// implements AckBatch.
45+
type tracedAckSequencer struct {
46+
*tracedSequencer
47+
acker batchAcknowledger
48+
}
49+
50+
func (t *tracedAckSequencer) AckBatch(ctx context.Context) error {
51+
return t.acker.AckBatch(ctx)
2952
}
3053

3154
func (t *tracedSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.SubmitBatchTxsRequest) (*coresequencer.SubmitBatchTxsResponse, error) {

0 commit comments

Comments
 (0)