1717package core
1818
1919import (
20- "errors"
2120 "fmt"
21+ "sync/atomic"
2222
2323 "github.com/ethereum/go-ethereum/common"
2424 "github.com/ethereum/go-ethereum/core/rawdb"
@@ -47,26 +47,38 @@ type txIndexer struct {
4747 // and all others shouldn't.
4848 limit uint64
4949
50+ // The current head of blockchain for transaction indexing. This field
51+ // is accessed by both the indexer and the indexing progress queries.
52+ head atomic.Uint64
53+
54+ // The current tail of the indexed transactions, null indicates
55+ // that no transactions have been indexed yet.
56+ //
57+ // This field is accessed by both the indexer and the indexing
58+ // progress queries.
59+ tail atomic.Pointer [uint64 ]
60+
5061 // cutoff denotes the block number before which the chain segment should
5162 // be pruned and not available locally.
52- cutoff uint64
53- db ethdb.Database
54- progress chan chan TxIndexProgress
55- term chan chan struct {}
56- closed chan struct {}
63+ cutoff uint64
64+ db ethdb.Database
65+ term chan chan struct {}
66+ closed chan struct {}
5767}
5868
5969// newTxIndexer initializes the transaction indexer.
6070func newTxIndexer (limit uint64 , chain * BlockChain ) * txIndexer {
6171 cutoff , _ := chain .HistoryPruningCutoff ()
6272 indexer := & txIndexer {
63- limit : limit ,
64- cutoff : cutoff ,
65- db : chain .db ,
66- progress : make (chan chan TxIndexProgress ),
67- term : make (chan chan struct {}),
68- closed : make (chan struct {}),
73+ limit : limit ,
74+ cutoff : cutoff ,
75+ db : chain .db ,
76+ term : make (chan chan struct {}),
77+ closed : make (chan struct {}),
6978 }
79+ indexer .head .Store (indexer .resolveHead ())
80+ indexer .tail .Store (rawdb .ReadTxIndexTail (chain .db ))
81+
7082 go indexer .loop (chain )
7183
7284 var msg string
@@ -154,6 +166,7 @@ func (indexer *txIndexer) repair(head uint64) {
154166 // A crash may occur between the two delete operations,
155167 // potentially leaving dangling indexes in the database.
156168 // However, this is considered acceptable.
169+ indexer .tail .Store (nil )
157170 rawdb .DeleteTxIndexTail (indexer .db )
158171 rawdb .DeleteAllTxLookupEntries (indexer .db , nil )
159172 log .Warn ("Purge transaction indexes" , "head" , head , "tail" , * tail )
@@ -174,6 +187,7 @@ func (indexer *txIndexer) repair(head uint64) {
174187 // Traversing the database directly within the transaction
175188 // index namespace might be slow and expensive, but we
176189 // have no choice.
190+ indexer .tail .Store (nil )
177191 rawdb .DeleteTxIndexTail (indexer .db )
178192 rawdb .DeleteAllTxLookupEntries (indexer .db , nil )
179193 log .Warn ("Purge transaction indexes" , "head" , head , "cutoff" , indexer .cutoff )
@@ -187,6 +201,7 @@ func (indexer *txIndexer) repair(head uint64) {
187201 // A crash may occur between the two delete operations,
188202 // potentially leaving dangling indexes in the database.
189203 // However, this is considered acceptable.
204+ indexer .tail .Store (& indexer .cutoff )
190205 rawdb .WriteTxIndexTail (indexer .db , indexer .cutoff )
191206 rawdb .DeleteAllTxLookupEntries (indexer .db , func (txhash common.Hash , blob []byte ) bool {
192207 n := rawdb .DecodeTxLookupEntry (blob , indexer .db )
@@ -216,16 +231,15 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
216231
217232 // Listening to chain events and manipulate the transaction indexes.
218233 var (
219- stop chan struct {} // Non-nil if background routine is active
220- done chan struct {} // Non-nil if background routine is active
221- head = indexer .resolveHead () // The latest announced chain head
222-
234+ stop chan struct {} // Non-nil if background routine is active
235+ done chan struct {} // Non-nil if background routine is active
223236 headCh = make (chan ChainHeadEvent )
224237 sub = chain .SubscribeChainHeadEvent (headCh )
225238 )
226239 defer sub .Unsubscribe ()
227240
228241 // Validate the transaction indexes and repair if necessary
242+ head := indexer .head .Load ()
229243 indexer .repair (head )
230244
231245 // Launch the initial processing if chain is not empty (head != genesis).
@@ -238,17 +252,18 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
238252 for {
239253 select {
240254 case h := <- headCh :
255+ indexer .head .Store (h .Header .Number .Uint64 ())
241256 if done == nil {
242257 stop = make (chan struct {})
243258 done = make (chan struct {})
244259 go indexer .run (h .Header .Number .Uint64 (), stop , done )
245260 }
246- head = h . Header . Number . Uint64 ()
261+
247262 case <- done :
248263 stop = nil
249264 done = nil
250- case ch := <- indexer .progress :
251- ch <- indexer . report ( head )
265+ indexer .tail . Store ( rawdb . ReadTxIndexTail ( indexer . db ))
266+
252267 case ch := <- indexer .term :
253268 if stop != nil {
254269 close (stop )
@@ -264,7 +279,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
264279}
265280
266281// report returns the tx indexing progress.
267- func (indexer * txIndexer ) report (head uint64 ) TxIndexProgress {
282+ func (indexer * txIndexer ) report (head uint64 , tail * uint64 ) TxIndexProgress {
268283 // Special case if the head is even below the cutoff,
269284 // nothing to index.
270285 if head < indexer .cutoff {
@@ -284,7 +299,6 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
284299 }
285300 // Compute how many blocks have been indexed
286301 var indexed uint64
287- tail := rawdb .ReadTxIndexTail (indexer .db )
288302 if tail != nil {
289303 indexed = head - * tail + 1
290304 }
@@ -300,16 +314,12 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
300314 }
301315}
302316
303- // txIndexProgress retrieves the tx indexing progress, or an error if the
304- // background tx indexer is already stopped.
305- func (indexer * txIndexer ) txIndexProgress () (TxIndexProgress , error ) {
306- ch := make (chan TxIndexProgress , 1 )
307- select {
308- case indexer .progress <- ch :
309- return <- ch , nil
310- case <- indexer .closed :
311- return TxIndexProgress {}, errors .New ("indexer is closed" )
312- }
317+ // txIndexProgress retrieves the transaction indexing progress. The reported
318+ // progress may slightly lag behind the actual indexing state, as the tail is
319+ // only updated at the end of each indexing operation. However, this delay is
320+ // considered acceptable.
321+ func (indexer * txIndexer ) txIndexProgress () TxIndexProgress {
322+ return indexer .report (indexer .head .Load (), indexer .tail .Load ())
313323}
314324
315325// close shutdown the indexer. Safe to be called for multiple times.
0 commit comments