Skip to content

Commit 656944d

Browse files
authored
Merge branch 'main' into marko/key_rotation
2 parents 6b2db4b + 49ef5c9 commit 656944d

5 files changed

Lines changed: 79 additions & 45 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
## [Unreleased]
1111

12+
### Changes
13+
14+
- Optimization of mutex usage in cache for reaper [#3286](https://github.com/evstack/ev-node/pull/3286)
15+
1216
## v1.1.1
1317

1418
### Changes

block/internal/cache/generic_cache.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,19 @@ func (c *Cache) isSeen(hash string) bool {
5858
return c.hashes[hash]
5959
}
6060

61+
// areSeen checks which hashes have been seen. Returns a boolean slice
62+
// parallel to the input where result[i] is true if hashes[i] is in the
63+
// cache. Acquires the read lock once for the entire batch.
64+
func (c *Cache) areSeen(hashes []string) []bool {
65+
c.mu.RLock()
66+
defer c.mu.RUnlock()
67+
result := make([]bool, len(hashes))
68+
for i, h := range hashes {
69+
result[i] = c.hashes[h]
70+
}
71+
return result
72+
}
73+
6174
func (c *Cache) setSeen(hash string, height uint64) {
6275
c.mu.Lock()
6376
defer c.mu.Unlock()
@@ -75,6 +88,31 @@ func (c *Cache) removeSeen(hash string) {
7588
delete(c.hashes, hash)
7689
}
7790

91+
// setSeenBatch marks all hashes as seen under a single write lock.
92+
// For height 0 (transactions), the hashByHeight bookkeeping is skipped
93+
// since all txs share the same sentinel height — the map lookup and
94+
// overwrite on every entry is pure overhead with no benefit.
95+
func (c *Cache) setSeenBatch(hashes []string, height uint64) {
96+
c.mu.Lock()
97+
defer c.mu.Unlock()
98+
if height == 0 {
99+
for _, h := range hashes {
100+
c.hashes[h] = true
101+
}
102+
return
103+
}
104+
105+
// currently not used, but there for compleness against setSeen
106+
for _, h := range hashes {
107+
if existing, ok := c.hashByHeight[height]; ok && existing == h {
108+
c.hashes[existing] = true
109+
continue
110+
}
111+
c.hashes[h] = true
112+
c.hashByHeight[height] = h
113+
}
114+
}
115+
78116
func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
79117
c.mu.RLock()
80118
defer c.mu.RUnlock()

block/internal/cache/manager.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type CacheManager interface {
5252

5353
// Transaction operations
5454
IsTxSeen(hash string) bool
55+
AreTxsSeen(hashes []string) []bool
5556
SetTxSeen(hash string)
5657
SetTxsSeen(hashes []string)
5758
CleanupOldTxs(olderThan time.Duration) int
@@ -204,6 +205,10 @@ func (m *implementation) IsTxSeen(hash string) bool {
204205
return m.txCache.isSeen(hash)
205206
}
206207

208+
func (m *implementation) AreTxsSeen(hashes []string) []bool {
209+
return m.txCache.areSeen(hashes)
210+
}
211+
207212
func (m *implementation) SetTxSeen(hash string) {
208213
// Use 0 as height since transactions don't have a block height yet
209214
m.txCache.setSeen(hash, 0)
@@ -212,9 +217,9 @@ func (m *implementation) SetTxSeen(hash string) {
212217
}
213218

214219
func (m *implementation) SetTxsSeen(hashes []string) {
220+
m.txCache.setSeenBatch(hashes, 0)
215221
now := time.Now()
216222
for _, hash := range hashes {
217-
m.txCache.setSeen(hash, 0)
218223
m.txTimestamps.Store(hash, now)
219224
}
220225
}

block/internal/reaping/reaper.go

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,6 @@ func (r *Reaper) Stop() error {
143143
return nil
144144
}
145145

146-
type pendingTx struct {
147-
tx []byte
148-
hash string
149-
}
150-
151146
func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
152147
var totalSubmitted int
153148

@@ -175,16 +170,32 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
175170
break
176171
}
177172

178-
filtered := r.filterNewTxs(txs)
179-
if len(filtered) == 0 {
173+
hashes := hashTxs(txs)
174+
seen := r.cache.AreTxsSeen(hashes)
175+
176+
newTxs := make([][]byte, 0, len(txs))
177+
newHashes := make([]string, 0, len(txs))
178+
for i, tx := range txs {
179+
if !seen[i] {
180+
newTxs = append(newTxs, tx)
181+
newHashes = append(newHashes, hashes[i])
182+
}
183+
}
184+
185+
if len(newTxs) == 0 {
180186
break
181187
}
182188

183-
n, err := r.submitFiltered(filtered)
189+
_, err = r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
190+
Id: []byte(r.chainID),
191+
Batch: &coresequencer.Batch{Transactions: newTxs},
192+
})
184193
if err != nil {
185-
return totalSubmitted > 0, err
194+
return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
186195
}
187-
totalSubmitted += n
196+
197+
r.cache.SetTxsSeen(newHashes)
198+
totalSubmitted += len(newTxs)
188199
}
189200

190201
if totalSubmitted > 0 {
@@ -194,38 +205,16 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
194205
return totalSubmitted > 0, nil
195206
}
196207

197-
func (r *Reaper) filterNewTxs(txs [][]byte) []pendingTx {
198-
pending := make([]pendingTx, 0, len(txs))
199-
for _, tx := range txs {
200-
h := hashTx(tx)
201-
if !r.cache.IsTxSeen(h) {
202-
pending = append(pending, pendingTx{tx: tx, hash: h})
203-
}
204-
}
205-
return pending
206-
}
207-
208-
func (r *Reaper) submitFiltered(batch []pendingTx) (int, error) {
209-
txs := make([][]byte, len(batch))
210-
hashes := make([]string, len(batch))
211-
for i, p := range batch {
212-
txs[i] = p.tx
213-
hashes[i] = p.hash
208+
func hashTxs(txs [][]byte) []string {
209+
hashes := make([]string, len(txs))
210+
for i, tx := range txs {
211+
h := sha256.Sum256(tx)
212+
hashes[i] = hex.EncodeToString(h[:])
214213
}
215-
216-
_, err := r.sequencer.SubmitBatchTxs(r.ctx, coresequencer.SubmitBatchTxsRequest{
217-
Id: []byte(r.chainID),
218-
Batch: &coresequencer.Batch{Transactions: txs},
219-
})
220-
if err != nil {
221-
return 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
222-
}
223-
224-
r.cache.SetTxsSeen(hashes)
225-
return len(txs), nil
214+
return hashes
226215
}
227216

228217
func hashTx(tx []byte) string {
229-
hash := sha256.Sum256(tx)
230-
return hex.EncodeToString(hash[:])
218+
h := sha256.Sum256(tx)
219+
return hex.EncodeToString(h[:])
231220
}

block/internal/submitting/submitter_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,7 @@ func TestSubmitter_initializeDAIncludedHeight(t *testing.T) {
235235
}
236236

237237
func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) {
238-
ctx, cancel := context.WithCancel(context.Background())
239-
defer cancel()
238+
ctx := t.Context()
240239

241240
// Clean up any existing visualization server
242241
defer server.SetDAVisualizationServer(nil)
@@ -448,8 +447,7 @@ func (f *fakeSigner) GetPublic() (crypto.PubKey, error) { return nil, nil }
448447
func (f *fakeSigner) GetAddress() ([]byte, error) { return []byte("addr"), nil }
449448

450449
func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) {
451-
ctx, cancel := context.WithCancel(context.Background())
452-
defer cancel()
450+
ctx := t.Context()
453451

454452
cm, st := newTestCacheAndStore(t)
455453

0 commit comments

Comments
 (0)