Skip to content

Commit 9ec6221

Browse files
Copilotjulienrbrt
andauthored
feat: add recovery history depth pruner (#3064)
* Initial plan * Add recovery history retention pruning Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * feat: add configurable recovery history retention Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Refactor recovery pruning into pruner component Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Address pruner review feedback Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Update evm test module dependency Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Address code review feedback Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Refine pruner checks and docs Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Clarify PruneExecMeta comment Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Rename recovery history setting Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * Adjust pruner interval and defaults Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> * fixes * updates * updates * updates * comment --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com> Co-authored-by: Julien Robert <julien@rbrt.fr>
1 parent c01d909 commit 9ec6221

20 files changed

Lines changed: 491 additions & 83 deletions

File tree

apps/evm/cmd/rollback.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/ethereum/go-ethereum/common"
1010
ds "github.com/ipfs/go-datastore"
11+
"github.com/rs/zerolog"
1112
"github.com/spf13/cobra"
1213

1314
"github.com/evstack/ev-node/execution/evm"
@@ -30,6 +31,7 @@ func NewRollbackCmd() *cobra.Command {
3031
if err != nil {
3132
return err
3233
}
34+
logger := rollcmd.SetupLogger(nodeConfig.Log)
3335

3436
goCtx := cmd.Context()
3537
if goCtx == nil {
@@ -69,7 +71,7 @@ func NewRollbackCmd() *cobra.Command {
6971
}
7072

7173
// rollback execution layer via EngineClient
72-
engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB)
74+
engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB, logger.With().Str("module", "engine_client").Logger())
7375
if err != nil {
7476
cmd.Printf("Warning: failed to create engine client, skipping EL rollback: %v\n", err)
7577
} else {
@@ -99,7 +101,7 @@ func NewRollbackCmd() *cobra.Command {
99101
return cmd
100102
}
101103

102-
func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.EngineClient, error) {
104+
func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching, logger zerolog.Logger) (*evm.EngineClient, error) {
103105
ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
104106
if err != nil {
105107
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEthURL, err)
@@ -128,5 +130,5 @@ func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.Engine
128130
return nil, fmt.Errorf("JWT secret file '%s' is empty", jwtSecretFile)
129131
}
130132

131-
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false)
133+
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false, logger)
132134
}

apps/evm/cmd/run.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var RunCmd = &cobra.Command{
5555
}
5656

5757
tracingEnabled := nodeConfig.Instrumentation.IsTracingEnabled()
58-
executor, err := createExecutionClient(cmd, datastore, tracingEnabled)
58+
executor, err := createExecutionClient(cmd, datastore, tracingEnabled, logger.With().Str("module", "engine_client").Logger())
5959
if err != nil {
6060
return err
6161
}
@@ -67,11 +67,6 @@ var RunCmd = &cobra.Command{
6767

6868
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
6969

70-
// Attach logger to the EVM engine client if available
71-
if ec, ok := executor.(*evm.EngineClient); ok {
72-
ec.SetLogger(logger.With().Str("module", "engine_client").Logger())
73-
}
74-
7570
headerNamespace := da.NamespaceFromString(nodeConfig.DA.GetNamespace())
7671
dataNamespace := da.NamespaceFromString(nodeConfig.DA.GetDataNamespace())
7772

@@ -192,7 +187,7 @@ func createSequencer(
192187
return sequencer, nil
193188
}
194189

195-
func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEnabled bool) (execution.Executor, error) {
190+
func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEnabled bool, logger zerolog.Logger) (execution.Executor, error) {
196191
// Read execution client parameters from flags
197192
ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
198193
if err != nil {
@@ -237,7 +232,7 @@ func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEna
237232
genesisHash := common.HexToHash(genesisHashStr)
238233
feeRecipient := common.HexToAddress(feeRecipientStr)
239234

240-
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, db, tracingEnabled)
235+
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, db, tracingEnabled, logger)
241236
}
242237

243238
// addFlags adds flags related to the EVM execution client

block/components.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/evstack/ev-node/block/internal/common"
1313
da "github.com/evstack/ev-node/block/internal/da"
1414
"github.com/evstack/ev-node/block/internal/executing"
15+
"github.com/evstack/ev-node/block/internal/pruner"
1516
"github.com/evstack/ev-node/block/internal/reaping"
1617
"github.com/evstack/ev-node/block/internal/submitting"
1718
"github.com/evstack/ev-node/block/internal/syncing"
@@ -29,6 +30,7 @@ import (
2930
// Components represents the block-related components
3031
type Components struct {
3132
Executor *executing.Executor
33+
Pruner *pruner.Pruner
3234
Reaper *reaping.Reaper
3335
Syncer *syncing.Syncer
3436
Submitter *submitting.Submitter
@@ -60,6 +62,11 @@ func (bc *Components) Start(ctx context.Context) error {
6062
return fmt.Errorf("failed to start executor: %w", err)
6163
}
6264
}
65+
if bc.Pruner != nil {
66+
if err := bc.Pruner.Start(ctxWithCancel); err != nil {
67+
return fmt.Errorf("failed to start pruner: %w", err)
68+
}
69+
}
6370
if bc.Reaper != nil {
6471
if err := bc.Reaper.Start(ctxWithCancel); err != nil {
6572
return fmt.Errorf("failed to start reaper: %w", err)
@@ -96,6 +103,11 @@ func (bc *Components) Stop() error {
96103
errs = errors.Join(errs, fmt.Errorf("failed to stop executor: %w", err))
97104
}
98105
}
106+
if bc.Pruner != nil {
107+
if err := bc.Pruner.Stop(); err != nil {
108+
errs = errors.Join(errs, fmt.Errorf("failed to stop pruner: %w", err))
109+
}
110+
}
99111
if bc.Reaper != nil {
100112
if err := bc.Reaper.Stop(); err != nil {
101113
errs = errors.Join(errs, fmt.Errorf("failed to stop reaper: %w", err))
@@ -166,6 +178,12 @@ func NewSyncComponents(
166178
syncer.SetBlockSyncer(syncing.WithTracingBlockSyncer(syncer))
167179
}
168180

181+
var execPruner coreexecutor.ExecPruner
182+
if p, ok := exec.(coreexecutor.ExecPruner); ok {
183+
execPruner = p
184+
}
185+
pruner := pruner.New(logger, store, execPruner, config.Node)
186+
169187
// Create submitter for sync nodes (no signer, only DA inclusion processing)
170188
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerDAHintAppender, dataDAHintAppender)
171189
if config.Instrumentation.IsTracingEnabled() {
@@ -189,6 +207,7 @@ func NewSyncComponents(
189207
Syncer: syncer,
190208
Submitter: submitter,
191209
Cache: cacheManager,
210+
Pruner: pruner,
192211
errorCh: errorCh,
193212
}, nil
194213
}
@@ -248,6 +267,12 @@ func NewAggregatorComponents(
248267
executor.SetBlockProducer(executing.WithTracingBlockProducer(executor))
249268
}
250269

270+
var execPruner coreexecutor.ExecPruner
271+
if p, ok := exec.(coreexecutor.ExecPruner); ok {
272+
execPruner = p
273+
}
274+
pruner := pruner.New(logger, store, execPruner, config.Node)
275+
251276
reaper, err := reaping.NewReaper(
252277
exec,
253278
sequencer,
@@ -264,6 +289,7 @@ func NewAggregatorComponents(
264289
if config.Node.BasedSequencer { // no submissions needed for bases sequencer
265290
return &Components{
266291
Executor: executor,
292+
Pruner: pruner,
267293
Reaper: reaper,
268294
Cache: cacheManager,
269295
errorCh: errorCh,
@@ -290,6 +316,7 @@ func NewAggregatorComponents(
290316

291317
return &Components{
292318
Executor: executor,
319+
Pruner: pruner,
293320
Reaper: reaper,
294321
Submitter: submitter,
295322
Cache: cacheManager,

block/components_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func TestNewSyncComponents_Creation(t *testing.T) {
127127
assert.NotNil(t, components.Syncer)
128128
assert.NotNil(t, components.Submitter)
129129
assert.NotNil(t, components.Cache)
130+
assert.NotNil(t, components.Pruner)
130131
assert.NotNil(t, components.errorCh)
131132
assert.Nil(t, components.Executor) // Sync nodes don't have executors
132133
}
@@ -183,6 +184,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
183184
assert.NotNil(t, components.Executor)
184185
assert.NotNil(t, components.Submitter)
185186
assert.NotNil(t, components.Cache)
187+
assert.NotNil(t, components.Pruner)
186188
assert.NotNil(t, components.errorCh)
187189
assert.Nil(t, components.Syncer) // Aggregator nodes currently don't create syncers in this constructor
188190
}

block/internal/pruner/pruner.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package pruner
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"errors"
7+
"fmt"
8+
"sync"
9+
"time"
10+
11+
ds "github.com/ipfs/go-datastore"
12+
"github.com/rs/zerolog"
13+
14+
coreexecutor "github.com/evstack/ev-node/core/execution"
15+
16+
"github.com/evstack/ev-node/pkg/config"
17+
"github.com/evstack/ev-node/pkg/store"
18+
)
19+
20+
const defaultPruneInterval = 15 * time.Minute
21+
22+
// Pruner periodically removes old state and execution metadata entries.
23+
type Pruner struct {
24+
store store.Store
25+
execPruner coreexecutor.ExecPruner
26+
cfg config.NodeConfig
27+
logger zerolog.Logger
28+
lastPruned uint64
29+
30+
// Lifecycle
31+
ctx context.Context
32+
wg sync.WaitGroup
33+
cancel context.CancelFunc
34+
}
35+
36+
// New creates a new Pruner instance.
37+
func New(
38+
logger zerolog.Logger,
39+
store store.Store,
40+
execPruner coreexecutor.ExecPruner,
41+
cfg config.NodeConfig,
42+
) *Pruner {
43+
return &Pruner{
44+
store: store,
45+
execPruner: execPruner,
46+
cfg: cfg,
47+
logger: logger.With().Str("component", "prune").Logger(),
48+
}
49+
}
50+
51+
// Start begins the pruning loop.
52+
func (p *Pruner) Start(ctx context.Context) error {
53+
p.ctx, p.cancel = context.WithCancel(ctx)
54+
55+
// Start pruner loop
56+
p.wg.Go(p.pruneLoop)
57+
58+
p.logger.Info().Msg("pruner started")
59+
return nil
60+
}
61+
62+
// Stop stops the pruning loop.
63+
func (p *Pruner) Stop() error {
64+
if p.cancel != nil {
65+
p.cancel()
66+
}
67+
p.wg.Wait()
68+
69+
p.logger.Info().Msg("pruner stopped")
70+
return nil
71+
}
72+
73+
func (p *Pruner) pruneLoop() {
74+
ticker := time.NewTicker(defaultPruneInterval)
75+
defer ticker.Stop()
76+
77+
for {
78+
select {
79+
case <-ticker.C:
80+
if err := p.pruneRecoveryHistory(p.ctx, p.cfg.RecoveryHistoryDepth); err != nil {
81+
p.logger.Error().Err(err).Msg("failed to prune recovery history")
82+
}
83+
84+
if err := p.pruneBlocks(); err != nil {
85+
p.logger.Error().Err(err).Msg("failed to prune old blocks")
86+
}
87+
88+
// TODO: add pruning of old blocks // https://github.com/evstack/ev-node/pull/2984
89+
case <-p.ctx.Done():
90+
return
91+
}
92+
}
93+
}
94+
95+
func (p *Pruner) pruneBlocks() error {
96+
if !p.cfg.PruningEnabled || p.cfg.PruningKeepRecent == 0 || p.cfg.PruningInterval == 0 {
97+
return nil
98+
}
99+
100+
var currentDAIncluded uint64
101+
currentDAIncludedBz, err := p.store.GetMetadata(p.ctx, store.DAIncludedHeightKey)
102+
if err == nil && len(currentDAIncludedBz) == 8 {
103+
currentDAIncluded = binary.LittleEndian.Uint64(currentDAIncludedBz)
104+
} else {
105+
// if we cannot get the current DA height, we cannot safely prune, so we skip pruning until we can get it.
106+
return nil
107+
}
108+
109+
var lastPruned uint64
110+
if bz, err := p.store.GetMetadata(p.ctx, store.LastPrunedBlockHeightKey); err == nil && len(bz) == 8 {
111+
lastPruned = binary.LittleEndian.Uint64(bz)
112+
}
113+
114+
storeHeight, err := p.store.Height(p.ctx)
115+
if err != nil {
116+
return fmt.Errorf("failed to get store height for pruning: %w", err)
117+
}
118+
if storeHeight <= lastPruned+p.cfg.PruningInterval {
119+
return nil
120+
}
121+
122+
// Never prune blocks that are not DA included
123+
upperBound := min(storeHeight, currentDAIncluded)
124+
if upperBound <= p.cfg.PruningKeepRecent {
125+
// Not enough fully included blocks to prune
126+
return nil
127+
}
128+
129+
targetHeight := upperBound - p.cfg.PruningKeepRecent
130+
131+
if err := p.store.PruneBlocks(p.ctx, targetHeight); err != nil {
132+
p.logger.Error().Err(err).Uint64("target_height", targetHeight).Msg("failed to prune old block data")
133+
}
134+
135+
if p.execPruner != nil {
136+
if err := p.execPruner.PruneExec(p.ctx, targetHeight); err != nil && !errors.Is(err, ds.ErrNotFound) {
137+
return err
138+
}
139+
}
140+
141+
return nil
142+
}
143+
144+
// pruneRecoveryHistory prunes old state and execution metadata entries based on the configured retention depth.
145+
// It does not prunes old blocks, as those are handled by the pruning logic.
146+
// Pruning old state does not lose history but limit the ability to recover (replay or rollback) to the last HEAD-N blocks, where N is the retention depth.
147+
func (p *Pruner) pruneRecoveryHistory(ctx context.Context, retention uint64) error {
148+
if p.cfg.RecoveryHistoryDepth == 0 {
149+
return nil
150+
}
151+
152+
height, err := p.store.Height(ctx)
153+
if err != nil {
154+
return err
155+
}
156+
157+
if height <= retention {
158+
return nil
159+
}
160+
161+
target := height - retention
162+
if target <= p.lastPruned {
163+
return nil
164+
}
165+
166+
// maxPruneBatch limits how many heights we prune per cycle to bound work.
167+
// it is callibrated to prune the last N blocks in one cycle, where N is the number of blocks produced in the defaultPruneInterval.
168+
blockTime := p.cfg.BlockTime.Duration
169+
if blockTime == 0 {
170+
blockTime = 1
171+
}
172+
173+
maxPruneBatch := max(uint64(defaultPruneInterval/blockTime), (target-p.lastPruned)/5)
174+
175+
start := p.lastPruned + 1
176+
end := target
177+
if end-start+1 > maxPruneBatch {
178+
end = start + maxPruneBatch - 1
179+
}
180+
181+
for h := start; h <= end; h++ {
182+
if err := p.store.DeleteStateAtHeight(ctx, h); err != nil && !errors.Is(err, ds.ErrNotFound) {
183+
return err
184+
}
185+
186+
if p.execPruner != nil {
187+
if err := p.execPruner.PruneExec(ctx, h); err != nil && !errors.Is(err, ds.ErrNotFound) {
188+
return err
189+
}
190+
}
191+
}
192+
193+
p.lastPruned = end
194+
return nil
195+
}

0 commit comments

Comments
 (0)