-
Notifications
You must be signed in to change notification settings - Fork 264
refactor: reaper to drain mempool #3236
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 9 commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
066544c
refactor: reaper to drain mempool
julienrbrt bc016bd
feedback
julienrbrt 6815a24
fix partial drain
julienrbrt 0940b1b
cleanup old readme
julienrbrt 8d68835
Merge branch 'main' into julien/reaper
julienrbrt 28033e8
Prevent multiple Start() calls across components
julienrbrt 2e58f04
fix unwanted log
julienrbrt 067a769
lock
julienrbrt 6909b30
updates
julienrbrt d28180e
remove redundant
julienrbrt 70c8b89
changelog
julienrbrt 6c44381
feedback
julienrbrt 485cff2
feedback
julienrbrt efb43cb
feedback
julienrbrt 2a071a2
add bench
julienrbrt File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,7 +12,6 @@ import ( | |
| "github.com/rs/zerolog" | ||
|
|
||
| "github.com/evstack/ev-node/block/internal/cache" | ||
| "github.com/evstack/ev-node/block/internal/executing" | ||
| coreexecutor "github.com/evstack/ev-node/core/execution" | ||
| coresequencer "github.com/evstack/ev-node/core/sequencer" | ||
| "github.com/evstack/ev-node/pkg/genesis" | ||
|
|
@@ -21,40 +20,35 @@ import ( | |
| const ( | ||
| // MaxBackoffInterval is the maximum backoff interval for retries | ||
| MaxBackoffInterval = 30 * time.Second | ||
| CleanupInterval = 1 * time.Hour | ||
| ) | ||
|
|
||
| // Reaper is responsible for periodically retrieving transactions from the executor, | ||
| // filtering out already seen transactions, and submitting new transactions to the sequencer. | ||
| type Reaper struct { | ||
| exec coreexecutor.Executor | ||
| sequencer coresequencer.Sequencer | ||
| chainID string | ||
| interval time.Duration | ||
| cache cache.CacheManager | ||
| executor *executing.Executor | ||
|
|
||
| // shared components | ||
| exec coreexecutor.Executor | ||
| sequencer coresequencer.Sequencer | ||
| chainID string | ||
| interval time.Duration | ||
| cache cache.CacheManager | ||
| onTxsSubmitted func() | ||
|
|
||
| logger zerolog.Logger | ||
|
|
||
| // Lifecycle | ||
| ctx context.Context | ||
| cancel context.CancelFunc | ||
| wg sync.WaitGroup | ||
| } | ||
|
|
||
| // NewReaper creates a new Reaper instance. | ||
| func NewReaper( | ||
| exec coreexecutor.Executor, | ||
| sequencer coresequencer.Sequencer, | ||
| genesis genesis.Genesis, | ||
| logger zerolog.Logger, | ||
| executor *executing.Executor, | ||
| cache cache.CacheManager, | ||
| scrapeInterval time.Duration, | ||
| onTxsSubmitted func(), | ||
| ) (*Reaper, error) { | ||
| if executor == nil { | ||
| return nil, errors.New("executor cannot be nil") | ||
| } | ||
| if cache == nil { | ||
| return nil, errors.New("cache cannot be nil") | ||
| } | ||
|
|
@@ -63,71 +57,84 @@ func NewReaper( | |
| } | ||
|
|
||
| return &Reaper{ | ||
| exec: exec, | ||
| sequencer: sequencer, | ||
| chainID: genesis.ChainID, | ||
| interval: scrapeInterval, | ||
| logger: logger.With().Str("component", "reaper").Logger(), | ||
| cache: cache, | ||
| executor: executor, | ||
| exec: exec, | ||
| sequencer: sequencer, | ||
| chainID: genesis.ChainID, | ||
| interval: scrapeInterval, | ||
| logger: logger.With().Str("component", "reaper").Logger(), | ||
| cache: cache, | ||
| onTxsSubmitted: onTxsSubmitted, | ||
| }, nil | ||
| } | ||
|
|
||
| // Start begins the execution component | ||
| func (r *Reaper) Start(ctx context.Context) error { | ||
| if r.cancel != nil { | ||
| return errors.New("reaper already started") | ||
| } | ||
| r.ctx, r.cancel = context.WithCancel(ctx) | ||
|
|
||
| // Start reaper loop | ||
| r.wg.Go(r.reaperLoop) | ||
|
|
||
| r.logger.Info().Dur("interval", r.interval).Msg("reaper started") | ||
| r.logger.Info().Dur("idle_interval", r.interval).Msg("reaper started") | ||
| return nil | ||
| } | ||
|
|
||
| func (r *Reaper) reaperLoop() { | ||
| ticker := time.NewTicker(r.interval) | ||
| defer ticker.Stop() | ||
|
|
||
| cleanupTicker := time.NewTicker(1 * time.Hour) | ||
| cleanupTicker := time.NewTicker(CleanupInterval) | ||
| defer cleanupTicker.Stop() | ||
|
|
||
| consecutiveFailures := 0 | ||
|
|
||
| for { | ||
| select { | ||
| case <-r.ctx.Done(): | ||
| return | ||
| case <-ticker.C: | ||
| err := r.SubmitTxs() | ||
| if err != nil { | ||
| // Increment failure counter and apply exponential backoff | ||
| consecutiveFailures++ | ||
| backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5)) // Cap at 2^5 = 32x | ||
| backoff = min(backoff, MaxBackoffInterval) | ||
| r.logger.Warn(). | ||
| Err(err). | ||
| Int("consecutive_failures", consecutiveFailures). | ||
| Dur("next_retry_in", backoff). | ||
| Msg("reaper encountered error, applying backoff") | ||
|
|
||
| // Reset ticker with backoff interval | ||
| ticker.Reset(backoff) | ||
| } else { | ||
| // Reset failure counter and backoff on success | ||
| if consecutiveFailures > 0 { | ||
| r.logger.Info().Msg("reaper recovered from errors, resetting backoff") | ||
| consecutiveFailures = 0 | ||
| ticker.Reset(r.interval) | ||
| } | ||
| } | ||
| case <-cleanupTicker.C: | ||
| // Clean up transaction hashes older than 24 hours | ||
| // This prevents unbounded growth of the transaction seen cache | ||
| removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention) | ||
| if removed > 0 { | ||
| r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") | ||
| submitted, err := r.drainMempool() | ||
|
|
||
| if err != nil && !errors.Is(err, context.Canceled) { | ||
| consecutiveFailures++ | ||
| backoff := r.interval * time.Duration(1<<min(consecutiveFailures, 5)) | ||
| backoff = min(backoff, MaxBackoffInterval) | ||
| r.logger.Warn(). | ||
| Err(err). | ||
| Int("consecutive_failures", consecutiveFailures). | ||
| Dur("backoff", backoff). | ||
| Msg("reaper error, backing off") | ||
| if r.wait(backoff, nil) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the main change, if we get txs we retry immediately. if we do not we wait for the interval |
||
| return | ||
| } | ||
| continue | ||
| } | ||
|
|
||
| if consecutiveFailures > 0 { | ||
| r.logger.Info().Msg("reaper recovered from errors") | ||
| consecutiveFailures = 0 | ||
| } | ||
|
|
||
| if submitted { | ||
| continue | ||
| } | ||
|
|
||
| if r.wait(r.interval, cleanupTicker.C) { | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // wait blocks for the given duration. Returns true if the context was cancelled. | ||
| // When cleanupCh is non-nil, processes cache cleanup if that channel fires first. | ||
| func (r *Reaper) wait(d time.Duration, cleanupCh <-chan time.Time) bool { | ||
| timer := time.NewTimer(d) | ||
| defer timer.Stop() | ||
| select { | ||
| case <-r.ctx.Done(): | ||
| return true | ||
| case <-cleanupCh: | ||
| removed := r.cache.CleanupOldTxs(cache.DefaultTxCacheRetention) | ||
| if removed > 0 { | ||
| r.logger.Info().Int("removed", removed).Msg("cleaned up old transaction hashes") | ||
| } | ||
| return false | ||
| case <-timer.C: | ||
| return false | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -137,60 +144,83 @@ func (r *Reaper) Stop() error { | |
| r.cancel() | ||
| } | ||
| r.wg.Wait() | ||
|
|
||
| r.logger.Info().Msg("reaper stopped") | ||
| return nil | ||
| } | ||
|
|
||
| // SubmitTxs retrieves transactions from the executor and submits them to the sequencer. | ||
| // Returns an error if any critical operation fails. | ||
| func (r *Reaper) SubmitTxs() error { | ||
| txs, err := r.exec.GetTxs(r.ctx) | ||
| if err != nil { | ||
| r.logger.Error().Err(err).Msg("failed to get txs from executor") | ||
| return fmt.Errorf("failed to get txs from executor: %w", err) | ||
| type pendingTx struct { | ||
| tx []byte | ||
| hash string | ||
| } | ||
|
|
||
| func (r *Reaper) drainMempool() (bool, error) { | ||
| var totalSubmitted int | ||
| submitted := false | ||
|
|
||
| defer func() { | ||
| if submitted && r.onTxsSubmitted != nil { | ||
| r.onTxsSubmitted() | ||
| } | ||
| }() | ||
|
|
||
| for { | ||
| txs, err := r.exec.GetTxs(r.ctx) | ||
| if err != nil { | ||
| return totalSubmitted > 0, fmt.Errorf("failed to get txs from executor: %w", err) | ||
| } | ||
| if len(txs) == 0 { | ||
| break | ||
| } | ||
|
|
||
| filtered := r.filterNewTxs(txs) | ||
| if len(filtered) == 0 { | ||
| break | ||
| } | ||
|
|
||
| n, err := r.submitFiltered(filtered) | ||
| if err != nil { | ||
| return totalSubmitted > 0, err | ||
| } | ||
| totalSubmitted += n | ||
| submitted = true | ||
| } | ||
| if len(txs) == 0 { | ||
| r.logger.Debug().Msg("no new txs") | ||
| return nil | ||
|
|
||
| if totalSubmitted > 0 { | ||
| r.logger.Debug().Int("total_txs", totalSubmitted).Msg("drained mempool") | ||
| } | ||
|
|
||
| var newTxs [][]byte | ||
| return totalSubmitted > 0, nil | ||
| } | ||
|
|
||
| func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx { | ||
| pending := make([]pendingTx, 0, len(txs)) | ||
| for _, tx := range txs { | ||
| txHash := hashTx(tx) | ||
| if !r.cache.IsTxSeen(txHash) { | ||
| newTxs = append(newTxs, tx) | ||
| h := hashTx(tx) | ||
| if !r.cache.IsTxSeen(h) { | ||
| pending = append(pending, pendingTx{tx: tx, hash: h}) | ||
| } | ||
| } | ||
| return pending | ||
| } | ||
|
|
||
| if len(newTxs) == 0 { | ||
| r.logger.Debug().Msg("no new txs to submit") | ||
| return nil | ||
| func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) { | ||
| txs := make([][]byte, len(batch)) | ||
| hashes := make([]string, len(batch)) | ||
| for i, p := range batch { | ||
| txs[i] = p.tx | ||
| hashes[i] = p.hash | ||
| } | ||
|
|
||
| r.logger.Debug().Int("txCount", len(newTxs)).Msg("submitting txs to sequencer") | ||
|
|
||
| _, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ | ||
| _, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{ | ||
| Id: []byte(r.chainID), | ||
| Batch: &coresequencer.Batch{Transactions: newTxs}, | ||
| Batch: &coresequencer.Batch{Transactions: txs}, | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to submit txs to sequencer: %w", err) | ||
| return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err) | ||
| } | ||
|
|
||
| for _, tx := range newTxs { | ||
| txHash := hashTx(tx) | ||
| r.cache.SetTxSeen(txHash) | ||
| } | ||
|
|
||
| // Notify the executor that new transactions are available | ||
| if len(newTxs) > 0 { | ||
| r.logger.Debug().Msg("notifying executor of new transactions") | ||
| r.executor.NotifyNewTransactions() | ||
| } | ||
|
|
||
| r.logger.Debug().Msg("successfully submitted txs") | ||
| return nil | ||
| r.cache.SetTxsSeen(hashes) | ||
| return len(txs), nil | ||
| } | ||
|
|
||
| func hashTx(tx []byte) string { | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.