Skip to content

Commit 7f87f82

Browse files
authored
refactor: split reaper (#2660)
1 parent fe2dce2 commit 7f87f82

5 files changed

Lines changed: 132 additions & 75 deletions

File tree

block/components.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/evstack/ev-node/block/internal/cache"
1212
"github.com/evstack/ev-node/block/internal/executing"
13+
"github.com/evstack/ev-node/block/internal/reaping"
1314
"github.com/evstack/ev-node/block/internal/submitting"
1415
"github.com/evstack/ev-node/block/internal/syncing"
1516
coreda "github.com/evstack/ev-node/core/da"
@@ -25,6 +26,7 @@ import (
2526
// Components represents the block-related components
2627
type Components struct {
2728
Executor *executing.Executor
29+
Reaper *reaping.Reaper
2830
Syncer *syncing.Syncer
2931
Submitter *submitting.Submitter
3032
Cache cache.Manager
@@ -65,6 +67,11 @@ func (bc *Components) Start(ctx context.Context) error {
6567
return fmt.Errorf("failed to start executor: %w", err)
6668
}
6769
}
70+
if bc.Reaper != nil {
71+
if err := bc.Reaper.Start(ctxWithCancel); err != nil {
72+
return fmt.Errorf("failed to start reaper: %w", err)
73+
}
74+
}
6875
if bc.Syncer != nil {
6976
if err := bc.Syncer.Start(ctxWithCancel); err != nil {
7077
return fmt.Errorf("failed to start syncer: %w", err)
@@ -96,6 +103,11 @@ func (bc *Components) Stop() error {
96103
errs = errors.Join(errs, fmt.Errorf("failed to stop executor: %w", err))
97104
}
98105
}
106+
if bc.Reaper != nil {
107+
if err := bc.Reaper.Stop(); err != nil {
108+
errs = errors.Join(errs, fmt.Errorf("failed to stop reaper: %w", err))
109+
}
110+
}
99111
if bc.Syncer != nil {
100112
if err := bc.Syncer.Stop(); err != nil {
101113
errs = errors.Join(errs, fmt.Errorf("failed to stop syncer: %w", err))
@@ -220,6 +232,17 @@ func NewAggregatorComponents(
220232
return nil, fmt.Errorf("failed to create executor: %w", err)
221233
}
222234

235+
reaper, err := reaping.NewReaper(
236+
exec,
237+
sequencer,
238+
genesis,
239+
logger,
240+
executor,
241+
)
242+
if err != nil {
243+
return nil, fmt.Errorf("failed to create reaper: %w", err)
244+
}
245+
223246
// Create DA submitter for aggregator nodes (with signer for submission)
224247
daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, logger)
225248
submitter := submitting.NewSubmitter(
@@ -237,6 +260,7 @@ func NewAggregatorComponents(
237260

238261
return &Components{
239262
Executor: executor,
263+
Reaper: reaper,
240264
Submitter: submitter,
241265
Cache: cacheManager,
242266
errorCh: errorCh,

block/internal/executing/executor.go

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,6 @@ import (
2222
"github.com/evstack/ev-node/types"
2323
)
2424

25-
const (
26-
// DefaultInterval is the default reaper interval
27-
DefaultInterval = 1 * time.Second
28-
)
29-
3025
// broadcaster interface for P2P broadcasting
3126
type broadcaster[T any] interface {
3227
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
@@ -61,9 +56,6 @@ type Executor struct {
6156
txNotifyCh chan struct{}
6257
errorCh chan<- error // Channel to report critical execution client failures
6358

64-
// Reaper for transaction processing
65-
reaper *Reaper
66-
6759
// Logging
6860
logger zerolog.Logger
6961

@@ -126,27 +118,13 @@ func (e *Executor) Start(ctx context.Context) error {
126118
return fmt.Errorf("failed to initialize state: %w", err)
127119
}
128120

129-
// Initialize reaper
130-
reaperStore, err := store.NewDefaultInMemoryKVStore()
131-
if err != nil {
132-
return fmt.Errorf("failed to create reaper store: %w", err)
133-
}
134-
e.reaper = NewReaper(e.ctx, e.exec, e.sequencer, e.genesis.ChainID, DefaultInterval, e.logger, reaperStore, e)
135-
136121
// Start execution loop
137122
e.wg.Add(1)
138123
go func() {
139124
defer e.wg.Done()
140125
e.executionLoop()
141126
}()
142127

143-
// Start reaper
144-
e.wg.Add(1)
145-
go func() {
146-
defer e.wg.Done()
147-
e.reaper.Start(e.ctx)
148-
}()
149-
150128
e.logger.Info().Msg("executor started")
151129
return nil
152130
}
@@ -157,6 +135,7 @@ func (e *Executor) Stop() error {
157135
e.cancel()
158136
}
159137
e.wg.Wait()
138+
160139
e.logger.Info().Msg("executor stopped")
161140
return nil
162141
}
@@ -657,11 +636,6 @@ func (e *Executor) recordBlockMetrics(data *types.Data) {
657636
e.metrics.CommittedHeight.Set(float64(data.Metadata.Height))
658637
}
659638

660-
// GetCoreExecutor returns the underlying core executor for testing purposes
661-
func (e *Executor) GetCoreExecutor() coreexecutor.Executor {
662-
return e.exec
663-
}
664-
665639
// BatchData represents batch data from sequencer
666640
type BatchData struct {
667641
*coresequencer.Batch

block/internal/executing/utils.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package executing
2+
3+
import coreexecutor "github.com/evstack/ev-node/core/execution"
4+
5+
// GetCoreExecutor returns the underlying core executor for testing purposes
6+
func (e *Executor) GetCoreExecutor() coreexecutor.Executor {
7+
return e.exec
8+
}
9+
10+
// HasPendingTxNotification checks if there's a pending transaction notification
11+
// It is used for testing purposes
12+
func (e *Executor) HasPendingTxNotification() bool {
13+
select {
14+
case <-e.txNotifyCh:
15+
return true
16+
default:
17+
return false
18+
}
19+
}
Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,108 @@
1-
package executing
1+
package reaping
22

33
import (
44
"context"
55
"crypto/sha256"
66
"encoding/hex"
7+
"errors"
8+
"fmt"
9+
"sync"
710
"time"
811

912
ds "github.com/ipfs/go-datastore"
1013
"github.com/rs/zerolog"
1114

15+
"github.com/evstack/ev-node/block/internal/executing"
1216
coreexecutor "github.com/evstack/ev-node/core/execution"
1317
coresequencer "github.com/evstack/ev-node/core/sequencer"
18+
"github.com/evstack/ev-node/pkg/genesis"
19+
"github.com/evstack/ev-node/pkg/store"
1420
)
1521

22+
// DefaultInterval is the default reaper interval
23+
const DefaultInterval = 1 * time.Second
24+
1625
// Reaper is responsible for periodically retrieving transactions from the executor,
1726
// filtering out already seen transactions, and submitting new transactions to the sequencer.
1827
type Reaper struct {
1928
exec coreexecutor.Executor
2029
sequencer coresequencer.Sequencer
2130
chainID string
2231
interval time.Duration
23-
logger zerolog.Logger
24-
ctx context.Context
2532
seenStore ds.Batching
26-
executor *Executor
33+
executor *executing.Executor
34+
35+
// shared components
36+
logger zerolog.Logger
37+
38+
// Lifecycle
39+
ctx context.Context
40+
cancel context.CancelFunc
41+
wg sync.WaitGroup
2742
}
2843

2944
// NewReaper creates a new Reaper instance with persistent seenTx storage.
30-
func NewReaper(ctx context.Context, exec coreexecutor.Executor, sequencer coresequencer.Sequencer, chainID string, interval time.Duration, logger zerolog.Logger, store ds.Batching, executor *Executor) *Reaper {
31-
if interval <= 0 {
32-
interval = DefaultInterval
45+
func NewReaper(exec coreexecutor.Executor, sequencer coresequencer.Sequencer, genesis genesis.Genesis, logger zerolog.Logger, executor *executing.Executor) (*Reaper, error) {
46+
if executor == nil {
47+
return nil, errors.New("executor cannot be nil")
48+
}
49+
50+
store, err := store.NewDefaultInMemoryKVStore()
51+
if err != nil {
52+
return nil, fmt.Errorf("failed to create reaper store: %w", err)
3353
}
54+
3455
return &Reaper{
3556
exec: exec,
3657
sequencer: sequencer,
37-
chainID: chainID,
38-
interval: interval,
58+
chainID: genesis.ChainID,
59+
interval: DefaultInterval, // eventually this can be made configurable via config.Node
3960
logger: logger.With().Str("component", "reaper").Logger(),
40-
ctx: ctx,
4161
seenStore: store,
4262
executor: executor,
43-
}
63+
}, nil
4464
}
4565

46-
// Start begins the reaping process at the specified interval.
47-
func (r *Reaper) Start(ctx context.Context) {
48-
r.ctx = ctx
49-
ticker := time.NewTicker(r.interval)
50-
defer ticker.Stop()
66+
// Start begins the execution component
67+
func (r *Reaper) Start(ctx context.Context) error {
68+
r.ctx, r.cancel = context.WithCancel(ctx)
69+
70+
// Start repear loop
71+
r.wg.Add(1)
72+
go func() {
73+
defer r.wg.Done()
74+
r.reaperLoop()
75+
}()
5176

5277
r.logger.Info().Dur("interval", r.interval).Msg("reaper started")
78+
return nil
79+
}
80+
81+
func (r *Reaper) reaperLoop() {
82+
ticker := time.NewTicker(r.interval)
83+
defer ticker.Stop()
5384

5485
for {
5586
select {
56-
case <-ctx.Done():
57-
r.logger.Info().Msg("reaper stopped")
87+
case <-r.ctx.Done():
5888
return
5989
case <-ticker.C:
6090
r.SubmitTxs()
6191
}
6292
}
6393
}
6494

95+
// Stop shuts down the reaper component
96+
func (r *Reaper) Stop() error {
97+
if r.cancel != nil {
98+
r.cancel()
99+
}
100+
r.wg.Wait()
101+
102+
r.logger.Info().Msg("reaper stopped")
103+
return nil
104+
}
105+
65106
// SubmitTxs retrieves transactions from the executor and submits them to the sequencer.
66107
func (r *Reaper) SubmitTxs() {
67108
txs, err := r.exec.GetTxs(r.ctx)
@@ -109,14 +150,19 @@ func (r *Reaper) SubmitTxs() {
109150
}
110151

111152
// Notify the executor that new transactions are available
112-
if r.executor != nil && len(newTxs) > 0 {
153+
if len(newTxs) > 0 {
113154
r.logger.Debug().Msg("notifying executor of new transactions")
114155
r.executor.NotifyNewTransactions()
115156
}
116157

117158
r.logger.Debug().Msg("successfully submitted txs")
118159
}
119160

161+
// SeenStore returns the datastore used to track seen transactions.
162+
func (r *Reaper) SeenStore() ds.Datastore {
163+
return r.seenStore
164+
}
165+
120166
func hashTx(tx []byte) string {
121167
hash := sha256.Sum256(tx)
122168
return hex.EncodeToString(hash[:])

0 commit comments

Comments
 (0)