Skip to content

Commit d7a6e06

Browse files
authored
Merge pull request #2353 from CortexFoundation/dev
reduce api load
2 parents 3adef88 + 33c39b9 commit d7a6e06

8 files changed

Lines changed: 119 additions & 74 deletions

File tree

core/blockchain_reader.go

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -272,36 +272,18 @@ func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, max
272272
// A null will be returned in the transaction is not found and background
273273
// transaction indexing is already finished. The transaction is not existent
274274
// from the node's perspective
275-
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction, error) {
275+
func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLookupEntry, *types.Transaction) {
276276
bc.txLookupLock.RLock()
277277
defer bc.txLookupLock.RUnlock()
278278

279279
// Short circuit if the txlookup already in the cache, retrieve otherwise
280280
if item, exist := bc.txLookupCache.Get(hash); exist {
281-
return item.lookup, item.transaction, nil
281+
return item.lookup, item.transaction
282282
}
283283

284284
tx, blockHash, blockNumber, txIndex := rawdb.ReadTransaction(bc.db, hash)
285285
if tx == nil {
286-
progress, err := bc.TxIndexProgress()
287-
if err != nil {
288-
// No error is returned if the transaction indexing progress is unreachable
289-
// due to unexpected internal errors. In such cases, it is impossible to
290-
// determine whether the transaction does not exist or has simply not been
291-
// indexed yet without a progress marker.
292-
//
293-
// In such scenarios, the transaction is treated as unreachable, though
294-
// this is clearly an unintended and unexpected situation.
295-
return nil, nil, nil
296-
}
297-
// The transaction indexing is not finished yet, returning an
298-
// error to explicitly indicate it.
299-
if !progress.Done() {
300-
return nil, nil, errors.New("transaction indexing still in progress")
301-
}
302-
// The transaction is already indexed, the transaction is either
303-
// not existent or not in the range of index, returning null.
304-
return nil, nil, nil
286+
return nil, nil
305287
}
306288

307289
lookup := &rawdb.LegacyTxLookupEntry{
@@ -313,7 +295,23 @@ func (bc *BlockChain) GetTransactionLookup(hash common.Hash) (*rawdb.LegacyTxLoo
313295
lookup: lookup,
314296
transaction: tx,
315297
})
316-
return lookup, tx, nil
298+
return lookup, tx
299+
}
300+
301+
// TxIndexDone returns true if the transaction indexer has finished indexing.
302+
func (bc *BlockChain) TxIndexDone() bool {
303+
progress, err := bc.TxIndexProgress()
304+
if err != nil {
305+
// No error is returned if the transaction indexing progress is unreachable
306+
// due to unexpected internal errors. In such cases, it is impossible to
307+
// determine whether the transaction does not exist or has simply not been
308+
// indexed yet without a progress marker.
309+
//
310+
// In such scenarios, the transaction is treated as unreachable, though
311+
// this is clearly an unintended and unexpected situation.
312+
return true
313+
}
314+
return progress.Done()
317315
}
318316

319317
// GetTd retrieves a block's total difficulty in the canonical chain from the
@@ -416,7 +414,7 @@ func (bc *BlockChain) TxIndexProgress() (TxIndexProgress, error) {
416414
if bc.txIndexer == nil {
417415
return TxIndexProgress{}, errors.New("tx indexer is not enabled")
418416
}
419-
return bc.txIndexer.txIndexProgress()
417+
return bc.txIndexer.txIndexProgress(), nil
420418
}
421419

422420
// GetVMConfig returns the block chain VM config.

core/txindexer.go

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package core
1818

1919
import (
20-
"errors"
2120
"fmt"
21+
"sync/atomic"
2222

2323
"github.com/CortexFoundation/CortexTheseus/common"
2424
"github.com/CortexFoundation/CortexTheseus/core/rawdb"
@@ -46,27 +46,38 @@ type txIndexer struct {
4646
// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
4747
// and all others shouldn't.
4848
limit uint64
49+
// The current head of blockchain for transaction indexing. This field
50+
// is accessed by both the indexer and the indexing progress queries.
51+
head atomic.Uint64
52+
53+
// The current tail of the indexed transactions, null indicates
54+
// that no transactions have been indexed yet.
55+
//
56+
// This field is accessed by both the indexer and the indexing
57+
// progress queries.
58+
tail atomic.Pointer[uint64]
4959

5060
// cutoff denotes the block number before which the chain segment should
5161
// be pruned and not available locally.
52-
cutoff uint64
53-
db ctxcdb.Database
54-
progress chan chan TxIndexProgress
55-
term chan chan struct{}
56-
closed chan struct{}
62+
cutoff uint64
63+
db ctxcdb.Database
64+
term chan chan struct{}
65+
closed chan struct{}
5766
}
5867

5968
// newTxIndexer initializes the transaction indexer.
6069
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
6170
cutoff, _ := chain.HistoryPruningCutoff()
6271
indexer := &txIndexer{
63-
limit: limit,
64-
cutoff: cutoff,
65-
db: chain.db,
66-
progress: make(chan chan TxIndexProgress),
67-
term: make(chan chan struct{}),
68-
closed: make(chan struct{}),
72+
limit: limit,
73+
cutoff: cutoff,
74+
db: chain.db,
75+
term: make(chan chan struct{}),
76+
closed: make(chan struct{}),
6977
}
78+
indexer.head.Store(indexer.resolveHead())
79+
indexer.tail.Store(rawdb.ReadTxIndexTail(chain.db))
80+
7081
go indexer.loop(chain)
7182

7283
var msg string
@@ -154,6 +165,7 @@ func (indexer *txIndexer) repair(head uint64) {
154165
// A crash may occur between the two delete operations,
155166
// potentially leaving dangling indexes in the database.
156167
// However, this is considered acceptable.
168+
indexer.tail.Store(nil)
157169
rawdb.DeleteTxIndexTail(indexer.db)
158170
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
159171
log.Warn("Purge transaction indexes", "head", head, "tail", *tail)
@@ -174,6 +186,7 @@ func (indexer *txIndexer) repair(head uint64) {
174186
// Traversing the database directly within the transaction
175187
// index namespace might be slow and expensive, but we
176188
// have no choice.
189+
indexer.tail.Store(nil)
177190
rawdb.DeleteTxIndexTail(indexer.db)
178191
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
179192
log.Warn("Purge transaction indexes", "head", head, "cutoff", indexer.cutoff)
@@ -187,6 +200,7 @@ func (indexer *txIndexer) repair(head uint64) {
187200
// A crash may occur between the two delete operations,
188201
// potentially leaving dangling indexes in the database.
189202
// However, this is considered acceptable.
203+
indexer.tail.Store(&indexer.cutoff)
190204
rawdb.WriteTxIndexTail(indexer.db, indexer.cutoff)
191205
rawdb.DeleteAllTxLookupEntries(indexer.db, func(txhash common.Hash, blob []byte) bool {
192206
n := rawdb.DecodeTxLookupEntry(blob, indexer.db)
@@ -216,16 +230,16 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
216230

217231
// Listening to chain events and manipulate the transaction indexes.
218232
var (
219-
stop chan struct{} // Non-nil if background routine is active
220-
done chan struct{} // Non-nil if background routine is active
221-
head = indexer.resolveHead() // The latest announced chain head
233+
stop chan struct{} // Non-nil if background routine is active
234+
done chan struct{} // Non-nil if background routine is active
222235

223236
headCh = make(chan ChainHeadEvent)
224237
sub = chain.SubscribeChainHeadEvent(headCh)
225238
)
226239
defer sub.Unsubscribe()
227240

228241
// Validate the transaction indexes and repair if necessary
242+
head := indexer.head.Load()
229243
indexer.repair(head)
230244

231245
// Launch the initial processing if chain is not empty (head != genesis).
@@ -238,17 +252,16 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
238252
for {
239253
select {
240254
case h := <-headCh:
255+
indexer.head.Store(h.Header.Number.Uint64())
241256
if done == nil {
242257
stop = make(chan struct{})
243258
done = make(chan struct{})
244259
go indexer.run(h.Header.Number.Uint64(), stop, done)
245260
}
246-
head = h.Header.Number.Uint64()
247261
case <-done:
248262
stop = nil
249263
done = nil
250-
case ch := <-indexer.progress:
251-
ch <- indexer.report(head)
264+
indexer.tail.Store(rawdb.ReadTxIndexTail(indexer.db))
252265
case ch := <-indexer.term:
253266
if stop != nil {
254267
close(stop)
@@ -264,7 +277,7 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
264277
}
265278

266279
// report returns the tx indexing progress.
267-
func (indexer *txIndexer) report(head uint64) TxIndexProgress {
280+
func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
268281
// Special case if the head is even below the cutoff,
269282
// nothing to index.
270283
if head < indexer.cutoff {
@@ -284,7 +297,6 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
284297
}
285298
// Compute how many blocks have been indexed
286299
var indexed uint64
287-
tail := rawdb.ReadTxIndexTail(indexer.db)
288300
if tail != nil {
289301
indexed = head - *tail + 1
290302
}
@@ -302,14 +314,8 @@ func (indexer *txIndexer) report(head uint64) TxIndexProgress {
302314

303315
// txIndexProgress retrieves the tx indexing progress, or an error if the
304316
// background tx indexer is already stopped.
305-
func (indexer *txIndexer) txIndexProgress() (TxIndexProgress, error) {
306-
ch := make(chan TxIndexProgress, 1)
307-
select {
308-
case indexer.progress <- ch:
309-
return <-ch, nil
310-
case <-indexer.closed:
311-
return TxIndexProgress{}, errors.New("indexer is closed")
312-
}
317+
func (indexer *txIndexer) txIndexProgress() TxIndexProgress {
318+
return indexer.report(indexer.head.Load(), indexer.tail.Load())
313319
}
314320

315321
// close shutdown the indexer. Safe to be called for multiple times.

core/types/transaction.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,13 @@ func (tx *Transaction) EncodeRLP(w io.Writer) error {
142142
return rlp.Encode(w, &tx.data)
143143
}
144144

145+
// MarshalBinary returns the canonical encoding of the transaction.
146+
// For legacy transactions, it returns the RLP encoding. For EIP-2718 typed
147+
// transactions, it returns the type and payload.
148+
func (tx *Transaction) MarshalBinary() ([]byte, error) {
149+
return rlp.EncodeToBytes(tx)
150+
}
151+
145152
// DecodeRLP implements rlp.Decoder
146153
func (tx *Transaction) DecodeRLP(s *rlp.Stream) error {
147154
_, size, _ := s.Kind()

ctxc/api_backend.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -278,15 +278,17 @@ func (b *CortexAPIBackend) GetPoolTransaction(hash common.Hash) *types.Transacti
278278
return b.ctxc.txPool.Get(hash)
279279
}
280280

281-
func (b *CortexAPIBackend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
282-
lookup, tx, err := b.ctxc.blockchain.GetTransactionLookup(txHash)
283-
if err != nil {
284-
return nil, common.Hash{}, 0, 0, err
285-
}
281+
func (b *CortexAPIBackend) GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64) {
282+
lookup, tx := b.ctxc.blockchain.GetTransactionLookup(txHash)
286283
if lookup == nil || tx == nil {
287-
return nil, common.Hash{}, 0, 0, nil
284+
return false, nil, common.Hash{}, 0, 0
288285
}
289-
return tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index, nil
286+
return true, tx, lookup.BlockHash, lookup.BlockIndex, lookup.Index
287+
}
288+
289+
// TxIndexDone returns true if the transaction indexer has finished indexing.
290+
func (b *CortexAPIBackend) TxIndexDone() bool {
291+
return b.ctxc.blockchain.TxIndexDone()
290292
}
291293

292294
func (b *CortexAPIBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) {
@@ -313,8 +315,13 @@ func (b *CortexAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) even
313315
return b.ctxc.TxPool().SubscribeNewTxsEvent(ch)
314316
}
315317

316-
func (b *CortexAPIBackend) SyncProgress() cortex.SyncProgress {
317-
return b.ctxc.Downloader().Progress()
318+
func (b *CortexAPIBackend) SyncProgress(ctx context.Context) cortex.SyncProgress {
319+
prog := b.ctxc.Downloader().Progress()
320+
if txProg, err := b.ctxc.blockchain.TxIndexProgress(); err == nil {
321+
prog.TxIndexFinishedBlocks = txProg.Indexed
322+
prog.TxIndexRemainingBlocks = txProg.Remaining
323+
}
324+
return prog
318325
}
319326

320327
func (b *CortexAPIBackend) ProtocolVersion() int {

ctxc/tracers/api.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ type Backend interface {
7575
HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error)
7676
BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error)
7777
BlockByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Block, error)
78-
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
78+
GetTransaction(txHash common.Hash) (bool, *types.Transaction, common.Hash, uint64, uint64)
79+
TxIndexDone() bool
7980
RPCGasCap() uint64
8081
ChainConfig() *params.ChainConfig
8182
Engine() consensus.Engine
@@ -807,9 +808,13 @@ func containsTx(block *types.Block, hash common.Hash) bool {
807808
// TraceTransaction returns the structured logs created during the execution of CVM
808809
// and returns them as a JSON object.
809810
func (api *API) TraceTransaction(ctx context.Context, hash common.Hash, config *TraceConfig) (any, error) {
810-
tx, blockHash, blockNumber, index, err := api.backend.GetTransaction(ctx, hash)
811-
if err != nil {
812-
return nil, err
811+
found, tx, blockHash, blockNumber, index := api.backend.GetTransaction(hash)
812+
if !found {
813+
// Warn in case tx indexer is not done.
814+
if !api.backend.TxIndexDone() {
815+
return nil, errTxNotFound
816+
}
817+
return nil, errTxNotFound
813818
}
814819

815820
// Only mined txes are supported

interfaces.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,10 @@ type SyncProgress struct {
104104
HighestBlock uint64 // Highest alleged block number in the chain
105105
PulledStates uint64 // Number of state trie entries already downloaded
106106
KnownStates uint64 // Total number of state trie entries known about
107+
108+
// "transaction indexing" fields
109+
TxIndexFinishedBlocks uint64 // Number of blocks whose transactions are already indexed
110+
TxIndexRemainingBlocks uint64 // Number of blocks whose transactions are not indexed yet
107111
}
108112

109113
// ChainSyncReader wraps access to the node's current sync status. If there's no

internal/ctxcapi/api.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ const (
5050
defaultGasPrice = params.GWei
5151
)
5252

53+
var errTxNotFound = errors.New("transaction not found")
54+
5355
// PublicCortexAPI provides an API to access Cortex related information.
5456
// It offers only methods that operate on public data that is freely available to anyone.
5557
type PublicCortexAPI struct {
@@ -79,8 +81,8 @@ func (s *PublicCortexAPI) ProtocolVersion() hexutil.Uint {
7981
// - highestBlock: block number of the highest block header this node has received from peers
8082
// - pulledStates: number of state entries processed until now
8183
// - knownStates: number of known state entries that still need to be pulled
82-
func (s *PublicCortexAPI) Syncing() (any, error) {
83-
progress := s.b.SyncProgress()
84+
func (s *PublicCortexAPI) Syncing(ctx context.Context) (any, error) {
85+
progress := s.b.SyncProgress(ctx)
8486

8587
// Return not syncing if the synchronisation already completed
8688
if progress.CurrentBlock >= progress.HighestBlock {
@@ -1328,9 +1330,18 @@ func (s *PublicTransactionPoolAPI) GetTransactionCount(ctx context.Context, addr
13281330
// GetTransactionByHash returns the transaction for the given hash
13291331
func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
13301332
// Try to return an already finalized transaction
1331-
tx, blockHash, blockNumber, index, err := s.b.GetTransaction(ctx, hash)
1332-
if err != nil {
1333-
return nil, err
1333+
found, tx, blockHash, blockNumber, index := s.b.GetTransaction(hash)
1334+
if !found {
1335+
// No finalized transaction, try to retrieve it from the pool
1336+
if tx := s.b.GetPoolTransaction(hash); tx != nil {
1337+
return newRPCPendingTransaction(tx), nil
1338+
}
1339+
// If also not in the pool there is a chance the tx indexer is still in progress.
1340+
if !s.b.TxIndexDone() {
1341+
return nil, errTxNotFound
1342+
}
1343+
// If the transaction is not found in the pool and the indexer is done, return nil
1344+
return nil, nil
13341345
}
13351346
if tx != nil {
13361347
return newRPCTransaction(tx, blockHash, blockNumber, index), nil
@@ -1347,9 +1358,15 @@ func (s *PublicTransactionPoolAPI) GetTransactionByHash(ctx context.Context, has
13471358
// GetRawTransactionByHash returns the bytes of the transaction for the given hash.
13481359
func (s *PublicTransactionPoolAPI) GetRawTransactionByHash(ctx context.Context, hash common.Hash) (hexutil.Bytes, error) {
13491360
// Retrieve a finalized transaction, or a pooled otherwise
1350-
tx, _, _, _, err := s.b.GetTransaction(ctx, hash)
1351-
if err != nil {
1352-
return nil, err
1361+
found, tx, _, _, _ := s.b.GetTransaction(hash)
1362+
if !found {
1363+
if tx = s.b.GetPoolTransaction(hash); tx != nil {
1364+
return tx.MarshalBinary()
1365+
}
1366+
if !s.b.TxIndexDone() {
1367+
return nil, errTxNotFound
1368+
}
1369+
return nil, nil
13531370
}
13541371
if tx == nil {
13551372
if tx = s.b.GetPoolTransaction(hash); tx == nil {

0 commit comments

Comments
 (0)