Skip to content

Commit 2d0e747

Browse files
authored
Merge branch 'minhd-vu/sensor-broadcast' into minhd-vu/rpc-methods
2 parents 2f5585c + 3b82419 commit 2d0e747

4 files changed

Lines changed: 108 additions & 174 deletions

File tree

p2p/conns.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -318,19 +318,30 @@ func (c *Conns) PeerConnectedAt(peerID string) time.Time {
318318
return time.Time{}
319319
}
320320

321-
// AddTx adds a transaction to the shared cache for duplicate detection and serving.
322-
func (c *Conns) AddTx(hash common.Hash, tx *types.Transaction) {
323-
c.txs.Add(hash, tx)
321+
// AddTxs adds multiple transactions to the shared cache in a single lock operation.
322+
// Returns the computed hashes for reuse by the caller.
323+
func (c *Conns) AddTxs(txs []*types.Transaction) []common.Hash {
324+
if len(txs) == 0 {
325+
return nil
326+
}
327+
hashes := make([]common.Hash, len(txs))
328+
for i, tx := range txs {
329+
hashes[i] = tx.Hash()
330+
}
331+
c.txs.AddBatch(hashes, txs)
332+
return hashes
324333
}
325334

326-
// GetTx retrieves a transaction from the shared cache.
327-
func (c *Conns) GetTx(hash common.Hash) (*types.Transaction, bool) {
328-
return c.txs.Get(hash)
335+
// PeekTxs retrieves multiple transactions from the shared cache without updating LRU ordering.
336+
// Uses a single read lock for better concurrency when LRU ordering is not needed.
337+
func (c *Conns) PeekTxs(hashes []common.Hash) []*types.Transaction {
338+
return c.txs.PeekMany(hashes)
329339
}
330340

331-
// GetTxs retrieves multiple transactions from the shared cache in a single lock operation.
332-
func (c *Conns) GetTxs(hashes []common.Hash) []*types.Transaction {
333-
return c.txs.GetMany(hashes)
341+
// PeekTxsWithHashes retrieves multiple transactions with their hashes from the cache.
342+
// Returns parallel slices of found hashes and transactions. Uses a single read lock.
343+
func (c *Conns) PeekTxsWithHashes(hashes []common.Hash) ([]common.Hash, []*types.Transaction) {
344+
return c.txs.PeekManyWithKeys(hashes)
334345
}
335346

336347
// Blocks returns the global blocks cache.

p2p/datastructures/bloomset_test.go

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"github.com/ethereum/go-ethereum/crypto"
99
)
1010

11-
func TestBloomSet_AddAndContains(t *testing.T) {
11+
func TestBloomSetAddAndContains(t *testing.T) {
1212
b := NewBloomSet(DefaultBloomSetOptions())
1313

1414
hash1 := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
@@ -33,7 +33,7 @@ func TestBloomSet_AddAndContains(t *testing.T) {
3333
}
3434
}
3535

36-
func TestBloomSet_AddMany(t *testing.T) {
36+
func TestBloomSetAddMany(t *testing.T) {
3737
b := NewBloomSet(DefaultBloomSetOptions())
3838

3939
hashes := make([]common.Hash, 100)
@@ -51,7 +51,7 @@ func TestBloomSet_AddMany(t *testing.T) {
5151
}
5252
}
5353

54-
func TestBloomSet_FilterNotContained(t *testing.T) {
54+
func TestBloomSetFilterNotContained(t *testing.T) {
5555
b := NewBloomSet(DefaultBloomSetOptions())
5656

5757
// Add some hashes
@@ -82,7 +82,7 @@ func TestBloomSet_FilterNotContained(t *testing.T) {
8282
}
8383
}
8484

85-
func TestBloomSet_Rotate(t *testing.T) {
85+
func TestBloomSetRotate(t *testing.T) {
8686
b := NewBloomSet(DefaultBloomSetOptions())
8787

8888
hash1 := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
@@ -116,7 +116,7 @@ func TestBloomSet_Rotate(t *testing.T) {
116116
}
117117
}
118118

119-
func TestBloomSet_MemoryUsage(t *testing.T) {
119+
func TestBloomSetMemoryUsage(t *testing.T) {
120120
opts := DefaultBloomSetOptions()
121121
b := NewBloomSet(opts)
122122

@@ -136,13 +136,13 @@ func generateTestHash(seed uint64) common.Hash {
136136
return crypto.Keccak256Hash(buf[:])
137137
}
138138

139-
func TestBloomSet_FalsePositiveRate(t *testing.T) {
139+
func TestBloomSetFalsePositiveRate(t *testing.T) {
140140
// Test that false positive rate is approximately as expected
141141
b := NewBloomSet(DefaultBloomSetOptions())
142142

143143
// Add 32768 unique hashes (the design capacity)
144144
// Use keccak256 to generate properly distributed hashes
145-
for i := uint64(0); i < 32768; i++ {
145+
for i := range uint64(32768) {
146146
b.Add(generateTestHash(i))
147147
}
148148

@@ -162,7 +162,7 @@ func TestBloomSet_FalsePositiveRate(t *testing.T) {
162162
t.Logf("False positive rate: %.2f%%", rate*100)
163163
}
164164

165-
func BenchmarkBloomSet_Add(b *testing.B) {
165+
func BenchmarkBloomSetAdd(b *testing.B) {
166166
bloom := NewBloomSet(DefaultBloomSetOptions())
167167
hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
168168

@@ -171,7 +171,7 @@ func BenchmarkBloomSet_Add(b *testing.B) {
171171
}
172172
}
173173

174-
func BenchmarkBloomSet_Contains(b *testing.B) {
174+
func BenchmarkBloomSetContains(b *testing.B) {
175175
bloom := NewBloomSet(DefaultBloomSetOptions())
176176
hash := common.HexToHash("0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef")
177177
bloom.Add(hash)
@@ -181,7 +181,7 @@ func BenchmarkBloomSet_Contains(b *testing.B) {
181181
}
182182
}
183183

184-
func BenchmarkBloomSet_FilterNotContained(b *testing.B) {
184+
func BenchmarkBloomSetFilterNotContained(b *testing.B) {
185185
bloom := NewBloomSet(DefaultBloomSetOptions())
186186

187187
// Add 1000 hashes
@@ -196,29 +196,8 @@ func BenchmarkBloomSet_FilterNotContained(b *testing.B) {
196196
batch[i] = common.BytesToHash([]byte{byte(i >> 8), byte(i)})
197197
}
198198

199-
b.ResetTimer()
200-
for i := 0; i < b.N; i++ {
199+
for b.Loop() {
201200
bloom.FilterNotContained(batch)
202201
}
203202
}
204203

205-
func BenchmarkLRU_FilterNotContained(b *testing.B) {
206-
cache := NewLRU[common.Hash, struct{}](LRUOptions{MaxSize: 32768})
207-
208-
// Add 1000 hashes
209-
for i := range 1000 {
210-
hash := common.BytesToHash([]byte{byte(i >> 8), byte(i)})
211-
cache.Add(hash, struct{}{})
212-
}
213-
214-
// Create a batch of 100 hashes (mix of known and unknown)
215-
batch := make([]common.Hash, 100)
216-
for i := range batch {
217-
batch[i] = common.BytesToHash([]byte{byte(i >> 8), byte(i)})
218-
}
219-
220-
b.ResetTimer()
221-
for i := 0; i < b.N; i++ {
222-
cache.FilterNotContained(batch)
223-
}
224-
}

p2p/datastructures/lru.go

Lines changed: 61 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -100,16 +100,39 @@ func (c *LRU[K, V]) Get(key K) (V, bool) {
100100
return e.value, true
101101
}
102102

103-
// GetMany retrieves multiple values from the cache and updates LRU ordering.
104-
// Uses a single write lock for all lookups, reducing lock contention compared
105-
// to calling Get in a loop. Returns a slice of values for keys that were found.
106-
func (c *LRU[K, V]) GetMany(keys []K) []V {
103+
// Peek retrieves a value from the cache without updating LRU ordering.
104+
// Uses a read lock for better concurrency.
105+
func (c *LRU[K, V]) Peek(key K) (V, bool) {
106+
c.mu.RLock()
107+
defer c.mu.RUnlock()
108+
109+
elem, ok := c.items[key]
110+
if !ok {
111+
var zero V
112+
return zero, false
113+
}
114+
115+
e := elem.Value.(*entry[K, V])
116+
117+
if e.expiresAt != nil && time.Now().After(*e.expiresAt) {
118+
var zero V
119+
return zero, false
120+
}
121+
122+
return e.value, true
123+
}
124+
125+
// PeekMany retrieves multiple values from the cache without updating LRU ordering.
126+
// Uses a single read lock for all lookups, providing better concurrency than GetMany
127+
// when LRU ordering updates are not needed. Returns only found values (indices don't
128+
// correspond to input keys). Use PeekManyWithKeys if you need key-value pairs.
129+
func (c *LRU[K, V]) PeekMany(keys []K) []V {
107130
if len(keys) == 0 {
108131
return nil
109132
}
110133

111-
c.mu.Lock()
112-
defer c.mu.Unlock()
134+
c.mu.RLock()
135+
defer c.mu.RUnlock()
113136

114137
now := time.Now()
115138
result := make([]V, 0, len(keys))
@@ -123,38 +146,47 @@ func (c *LRU[K, V]) GetMany(keys []K) []V {
123146
e := elem.Value.(*entry[K, V])
124147

125148
if e.expiresAt != nil && now.After(*e.expiresAt) {
126-
c.list.Remove(elem)
127-
delete(c.items, key)
128149
continue
129150
}
130151

131-
c.list.MoveToFront(elem)
132152
result = append(result, e.value)
133153
}
134154

135155
return result
136156
}
137157

138-
// Peek retrieves a value from the cache without updating LRU ordering.
139-
// Uses a read lock for better concurrency.
140-
func (c *LRU[K, V]) Peek(key K) (V, bool) {
158+
// PeekManyWithKeys retrieves multiple key-value pairs from the cache without updating
159+
// LRU ordering. Returns parallel slices of found keys and values. Uses a single read
160+
// lock for all lookups.
161+
func (c *LRU[K, V]) PeekManyWithKeys(keys []K) ([]K, []V) {
162+
if len(keys) == 0 {
163+
return nil, nil
164+
}
165+
141166
c.mu.RLock()
142167
defer c.mu.RUnlock()
143168

144-
elem, ok := c.items[key]
145-
if !ok {
146-
var zero V
147-
return zero, false
148-
}
169+
now := time.Now()
170+
foundKeys := make([]K, 0, len(keys))
171+
foundValues := make([]V, 0, len(keys))
149172

150-
e := elem.Value.(*entry[K, V])
173+
for _, key := range keys {
174+
elem, ok := c.items[key]
175+
if !ok {
176+
continue
177+
}
151178

152-
if e.expiresAt != nil && time.Now().After(*e.expiresAt) {
153-
var zero V
154-
return zero, false
179+
e := elem.Value.(*entry[K, V])
180+
181+
if e.expiresAt != nil && now.After(*e.expiresAt) {
182+
continue
183+
}
184+
185+
foundKeys = append(foundKeys, key)
186+
foundValues = append(foundValues, e.value)
155187
}
156188

157-
return e.value, true
189+
return foundKeys, foundValues
158190
}
159191

160192
// Update atomically updates a value in the cache using the provided update function.
@@ -209,26 +241,6 @@ func (c *LRU[K, V]) Update(key K, updateFn func(V) V) {
209241
}
210242
}
211243

212-
// Contains checks if a key exists in the cache and is not expired.
213-
// Uses a read lock and doesn't update LRU ordering.
214-
func (c *LRU[K, V]) Contains(key K) bool {
215-
c.mu.RLock()
216-
defer c.mu.RUnlock()
217-
218-
elem, ok := c.items[key]
219-
if !ok {
220-
return false
221-
}
222-
223-
e := elem.Value.(*entry[K, V])
224-
225-
if e.expiresAt != nil && time.Now().After(*e.expiresAt) {
226-
return false
227-
}
228-
229-
return true
230-
}
231-
232244
// Remove removes a key from the cache and returns the value if it existed.
233245
func (c *LRU[K, V]) Remove(key K) (V, bool) {
234246
c.mu.Lock()
@@ -245,66 +257,11 @@ func (c *LRU[K, V]) Remove(key K) (V, bool) {
245257
return zero, false
246258
}
247259

248-
// Len returns the number of items in the cache.
249-
func (c *LRU[K, V]) Len() int {
250-
c.mu.RLock()
251-
defer c.mu.RUnlock()
252-
return c.list.Len()
253-
}
254-
255-
// Purge clears all items from the cache.
256-
func (c *LRU[K, V]) Purge() {
257-
c.mu.Lock()
258-
defer c.mu.Unlock()
259-
260-
c.items = make(map[K]*list.Element)
261-
c.list.Init()
262-
}
263-
264-
// Keys returns all keys in the cache.
265-
func (c *LRU[K, V]) Keys() []K {
266-
c.mu.RLock()
267-
defer c.mu.RUnlock()
268-
269-
keys := make([]K, 0, c.list.Len())
270-
for elem := c.list.Front(); elem != nil; elem = elem.Next() {
271-
e := elem.Value.(*entry[K, V])
272-
keys = append(keys, e.key)
273-
}
274-
return keys
275-
}
276-
277-
// FilterNotContained returns the subset of keys that are not in the cache.
278-
// Uses a single read lock for all lookups, reducing lock contention compared
279-
// to calling Contains in a loop.
280-
func (c *LRU[K, V]) FilterNotContained(keys []K) []K {
281-
c.mu.RLock()
282-
defer c.mu.RUnlock()
283-
284-
now := time.Now()
285-
result := make([]K, 0, len(keys))
286-
287-
for _, key := range keys {
288-
elem, ok := c.items[key]
289-
if !ok {
290-
result = append(result, key)
291-
continue
292-
}
293-
294-
e := elem.Value.(*entry[K, V])
295-
if e.expiresAt != nil && now.After(*e.expiresAt) {
296-
result = append(result, key)
297-
}
298-
}
299-
300-
return result
301-
}
302-
303-
// AddMany adds multiple keys with the same value to the cache.
260+
// AddBatch adds multiple key-value pairs to the cache.
304261
// Uses a single write lock for all additions, reducing lock contention
305-
// compared to calling Add in a loop.
306-
func (c *LRU[K, V]) AddMany(keys []K, value V) {
307-
if len(keys) == 0 {
262+
// compared to calling Add in a loop. Keys and values must have the same length.
263+
func (c *LRU[K, V]) AddBatch(keys []K, values []V) {
264+
if len(keys) == 0 || len(keys) != len(values) {
308265
return
309266
}
310267

@@ -317,7 +274,9 @@ func (c *LRU[K, V]) AddMany(keys []K, value V) {
317274
expiresAt = &t
318275
}
319276

320-
for _, key := range keys {
277+
for i, key := range keys {
278+
value := values[i]
279+
321280
if elem, ok := c.items[key]; ok {
322281
c.list.MoveToFront(elem)
323282
e := elem.Value.(*entry[K, V])

0 commit comments

Comments
 (0)