@@ -26,6 +26,7 @@ import (
2626 "github.com/CortexFoundation/CortexTheseus/common"
2727 "github.com/CortexFoundation/CortexTheseus/common/lru"
2828 "github.com/CortexFoundation/CortexTheseus/common/mclock"
29+ "github.com/CortexFoundation/CortexTheseus/core"
2930 "github.com/CortexFoundation/CortexTheseus/core/txpool"
3031 "github.com/CortexFoundation/CortexTheseus/core/types"
3132 "github.com/CortexFoundation/CortexTheseus/log"
@@ -64,6 +65,11 @@ const (
6465
6566 // addTxsBatchSize it the max number of transactions to add in a single batch from a peer.
6667 addTxsBatchSize = 128
68+
69+ // txOnChainCacheLimit is number of on-chain transactions to keep in a cache to avoid
70+ // re-fetching them soon after they are mined.
71+ // Approx 1MB for 30 minutes of transactions at 18 tps
72+ txOnChainCacheLimit = 32768
6773)
6874
6975var (
@@ -141,6 +147,9 @@ type TxFetcher struct {
141147
142148 underpriced * lru.Cache [common.Hash , time.Time ] // Transactions discarded as too cheap (don't re-fetch)
143149
150+ chain * core.BlockChain // Blockchain interface for on-chain checks
151+ txOnChainCache * lru.Cache [common.Hash , struct {}] // Cache to avoid fetching once the tx gets on chain
152+
144153 // Stage 1: Waiting lists for newly discovered transactions that might be
145154 // broadcast without needing explicit request/reply round trips.
146155 waitlist map [common.Hash ]map [string ]struct {} // Transactions waiting for an potential broadcast
@@ -171,34 +180,37 @@ type TxFetcher struct {
171180
172181// NewTxFetcher creates a transaction fetcher to retrieve transaction
173182// based on hash announcements.
174- func NewTxFetcher (hasTx func (common.Hash ) bool , addTxs func ([]* types.Transaction ) []error , fetchTxs func (string , []common.Hash ) error ) * TxFetcher {
175- return NewTxFetcherForTests (hasTx , addTxs , fetchTxs , mclock.System {}, nil )
183+ func NewTxFetcher (chain * core. BlockChain , hasTx func (common.Hash ) bool , addTxs func ([]* types.Transaction ) []error , fetchTxs func (string , []common.Hash ) error ) * TxFetcher {
184+ return NewTxFetcherForTests (chain , hasTx , addTxs , fetchTxs , mclock.System {}, nil )
176185}
177186
178187// NewTxFetcherForTests is a testing method to mock out the realtime clock with
179188// a simulated version and the internal randomness with a deterministic one.
180189func NewTxFetcherForTests (
190+ chain * core.BlockChain ,
181191 hasTx func (common.Hash ) bool , addTxs func ([]* types.Transaction ) []error , fetchTxs func (string , []common.Hash ) error ,
182192 clock mclock.Clock , rand * mrand.Rand ) * TxFetcher {
183193 return & TxFetcher {
184- notify : make (chan * txAnnounce ),
185- cleanup : make (chan * txDelivery ),
186- drop : make (chan * txDrop ),
187- quit : make (chan struct {}),
188- waitlist : make (map [common.Hash ]map [string ]struct {}),
189- waittime : make (map [common.Hash ]mclock.AbsTime ),
190- waitslots : make (map [string ]map [common.Hash ]* txMetadataWithSeq ),
191- announces : make (map [string ]map [common.Hash ]* txMetadataWithSeq ),
192- announced : make (map [common.Hash ]map [string ]struct {}),
193- fetching : make (map [common.Hash ]string ),
194- requests : make (map [string ]* txRequest ),
195- alternates : make (map [common.Hash ]map [string ]struct {}),
196- underpriced : lru.NewCache [common.Hash , time.Time ](maxTxUnderpricedSetSize ),
197- hasTx : hasTx ,
198- addTxs : addTxs ,
199- fetchTxs : fetchTxs ,
200- clock : clock ,
201- rand : rand ,
194+ notify : make (chan * txAnnounce ),
195+ cleanup : make (chan * txDelivery ),
196+ drop : make (chan * txDrop ),
197+ quit : make (chan struct {}),
198+ waitlist : make (map [common.Hash ]map [string ]struct {}),
199+ waittime : make (map [common.Hash ]mclock.AbsTime ),
200+ waitslots : make (map [string ]map [common.Hash ]* txMetadataWithSeq ),
201+ announces : make (map [string ]map [common.Hash ]* txMetadataWithSeq ),
202+ announced : make (map [common.Hash ]map [string ]struct {}),
203+ fetching : make (map [common.Hash ]string ),
204+ requests : make (map [string ]* txRequest ),
205+ alternates : make (map [common.Hash ]map [string ]struct {}),
206+ underpriced : lru.NewCache [common.Hash , time.Time ](maxTxUnderpricedSetSize ),
207+ txOnChainCache : lru.NewCache [common.Hash , struct {}](txOnChainCacheLimit ),
208+ chain : chain ,
209+ hasTx : hasTx ,
210+ addTxs : addTxs ,
211+ fetchTxs : fetchTxs ,
212+ clock : clock ,
213+ rand : rand ,
202214 }
203215}
204216
@@ -217,23 +229,31 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
217229 unknownHashes = make ([]common.Hash , 0 , len (hashes ))
218230 unknownMetas = make ([]txMetadata , 0 , len (hashes ))
219231 duplicate int64
232+ onchain int64
220233 underpriced int64
221234 )
222235 for _ , hash := range hashes {
223- switch {
224- case f .hasTx (hash ):
236+ if f .hasTx (hash ) {
225237 duplicate ++
238+ continue
239+ }
226240
227- case f .isKnownUnderpriced (hash ):
228- underpriced ++
241+ if _ , exist := f .txOnChainCache .Get (hash ); exist {
242+ onchain ++
243+ continue
244+ }
229245
230- default :
231- unknownHashes = append ( unknownHashes , hash )
232- unknownMetas = append ( unknownMetas , txMetadata { kind : types . LegacyTxType , size : 0 })
246+ if f . isKnownUnderpriced ( hash ) {
247+ underpriced ++
248+ continue
233249 }
250+
251+ unknownHashes = append (unknownHashes , hash )
252+ unknownMetas = append (unknownMetas , txMetadata {kind : types .LegacyTxType , size : 0 })
234253 }
235254 txAnnounceKnownMeter .Mark (duplicate )
236255 txAnnounceUnderpricedMeter .Mark (underpriced )
256+ txAnnounceOnchainMeter .Mark (onchain )
237257
238258 // If anything's left to announce, push it into the internal loop
239259 if len (unknownHashes ) == 0 {
@@ -365,7 +385,16 @@ func (f *TxFetcher) loop() {
365385
366386 waitTrigger = make (chan struct {}, 1 )
367387 timeoutTrigger = make (chan struct {}, 1 )
388+
389+ oldHead * types.Header
368390 )
391+ // Subscribe to chain events to know when transactions are added to chain
392+ var headEventCh chan core.ChainEvent
393+ if f .chain != nil {
394+ headEventCh = make (chan core.ChainEvent , 10 )
395+ sub := f .chain .SubscribeChainEvent (headEventCh )
396+ defer sub .Unsubscribe ()
397+ }
369398 for {
370399 select {
371400 case ann := <- f .notify :
@@ -742,6 +771,20 @@ func (f *TxFetcher) loop() {
742771 f .scheduleFetches (timeoutTimer , timeoutTrigger , nil )
743772 f .rescheduleTimeout (timeoutTimer , timeoutTrigger )
744773 }
774+ case ev := <- headEventCh :
775+ // New head(s) added
776+ newHead := ev .Header
777+ if oldHead != nil && newHead .ParentHash != oldHead .Hash () {
778+ // Reorg or setHead detected, clear the cache. We could be smarter here and
779+ // only remove/add the diff, but this is simpler and not being exact here
780+ // only results in a few more fetches.
781+ f .txOnChainCache .Purge ()
782+ }
783+ oldHead = newHead
784+ // Add all transactions from the new block to the on-chain cache
785+ for _ , tx := range ev .Transactions {
786+ f .txOnChainCache .Add (tx .Hash (), struct {}{})
787+ }
745788
746789 case <- f .quit :
747790 return
0 commit comments