|
65 | 65 | headFinalizedBlockGauge = metrics.NewRegisteredGauge("chain/head/finalized", nil) |
66 | 66 | headSafeBlockGauge = metrics.NewRegisteredGauge("chain/head/safe", nil) |
67 | 67 |
|
68 | | - chainInfoGauge = metrics.NewRegisteredGaugeInfo("chain/info", nil) |
| 68 | + chainInfoGauge = metrics.NewRegisteredGaugeInfo("chain/info", nil) |
| 69 | + chainMgaspsGauge = metrics.NewRegisteredGauge("chain/mgasps", nil) |
69 | 70 |
|
70 | 71 | accountReadTimer = metrics.NewRegisteredResettingTimer("chain/account/reads", nil) |
71 | 72 | accountHashTimer = metrics.NewRegisteredResettingTimer("chain/account/hashes", nil) |
|
92 | 93 | blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) |
93 | 94 | blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) |
94 | 95 |
|
95 | | - blockPrefetchExecuteTimer = metrics.NewRegisteredTimer("chain/prefetch/executes", nil) |
96 | | - blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) |
| 96 | + blockPrefetchExecuteTimer = metrics.NewRegisteredResettingTimer("chain/prefetch/executes", nil) |
| 97 | + blockPrefetchInterruptMeter = metrics.NewRegisteredMeter("chain/prefetch/interrupts", nil) |
| 98 | + blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil) |
| 99 | + blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil) |
97 | 100 |
|
98 | 101 | errInsertionInterrupted = errors.New("insertion is interrupted") |
99 | 102 | errChainStopped = errors.New("blockchain is stopped") |
@@ -979,17 +982,16 @@ func (bc *BlockChain) setHeadBeyondRoot(head uint64, time uint64, root common.Ha |
979 | 982 | // Ignore the error here since light client won't hit this path |
980 | 983 | frozen, _ := bc.db.Ancients() |
981 | 984 | if num+1 <= frozen { |
982 | | - // Truncate all relative data(header, total difficulty, body, receipt |
983 | | - // and canonical hash) from ancient store. |
984 | | - if _, err := bc.db.TruncateHead(num); err != nil { |
985 | | - log.Crit("Failed to truncate ancient data", "number", num, "err", err) |
986 | | - } |
987 | | - // Remove the hash <-> number mapping from the active store. |
988 | | - rawdb.DeleteHeaderNumber(db, hash) |
| 985 | + // The chain segment, such as the block header, canonical hash, |
| 986 | + // body, and receipt, will be removed from the ancient store |
| 987 | + // in one go. |
| 988 | + // |
| 989 | + // The hash-to-number mapping in the key-value store will be |
| 990 | + // removed by the hc.SetHead function. |
989 | 991 | } else { |
990 | | - // Remove relative body and receipts from the active store. |
991 | | - // The header, total difficulty and canonical hash will be |
992 | | - // removed in the hc.SetHead function. |
| 992 | + // Remove the associated body and receipts from the key-value store. |
| 993 | + // The header, hash-to-number mapping, and canonical hash will be |
| 994 | + // removed by the hc.SetHead function. |
993 | 995 | rawdb.DeleteBody(db, hash, num) |
994 | 996 | rawdb.DeleteReceipts(db, hash, num) |
995 | 997 | } |
@@ -1361,7 +1363,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ |
1361 | 1363 | size += writeSize |
1362 | 1364 |
|
1363 | 1365 | // Sync the ancient store explicitly to ensure all data has been flushed to disk. |
1364 | | - if err := bc.db.Sync(); err != nil { |
| 1366 | + if err := bc.db.SyncAncient(); err != nil { |
1365 | 1367 | return 0, err |
1366 | 1368 | } |
1367 | 1369 | // Write hash to number mappings |
@@ -1759,18 +1761,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness |
1759 | 1761 | bc.reportBlock(block, nil, err) |
1760 | 1762 | return nil, it.index, err |
1761 | 1763 | } |
1762 | | - // No validation errors for the first block (or chain prefix skipped) |
1763 | | - var activeState *state.StateDB |
1764 | | - defer func() { |
1765 | | - // The chain importer is starting and stopping trie prefetchers. If a bad |
1766 | | - // block or other error is hit however, an early return may not properly |
1767 | | - // terminate the background threads. This defer ensures that we clean up |
1768 | | - // and dangling prefetcher, without deferring each and holding on live refs. |
1769 | | - if activeState != nil { |
1770 | | - activeState.StopPrefetcher() |
1771 | | - } |
1772 | | - }() |
1773 | | - |
1774 | 1764 | // Track the singleton witness from this chain insertion (if any) |
1775 | 1765 | var witness *stateless.Witness |
1776 | 1766 |
|
@@ -1826,63 +1816,20 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool, makeWitness |
1826 | 1816 | continue |
1827 | 1817 | } |
1828 | 1818 | // Retrieve the parent block and it's state to execute on top |
1829 | | - start := time.Now() |
1830 | 1819 | parent := it.previous() |
1831 | 1820 | if parent == nil { |
1832 | 1821 | parent = bc.GetHeader(block.ParentHash(), block.NumberU64()-1) |
1833 | 1822 | } |
1834 | | - statedb, err := state.New(parent.Root, bc.statedb) |
1835 | | - if err != nil { |
1836 | | - return nil, it.index, err |
1837 | | - } |
1838 | | - |
1839 | | - // If we are past Byzantium, enable prefetching to pull in trie node paths |
1840 | | - // while processing transactions. Before Byzantium the prefetcher is mostly |
1841 | | - // useless due to the intermediate root hashing after each transaction. |
1842 | | - if bc.chainConfig.IsByzantium(block.Number()) { |
1843 | | - // Generate witnesses either if we're self-testing, or if it's the |
1844 | | - // only block being inserted. A bit crude, but witnesses are huge, |
1845 | | - // so we refuse to make an entire chain of them. |
1846 | | - if bc.vmConfig.StatelessSelfValidation || (makeWitness && len(chain) == 1) { |
1847 | | - witness, err = stateless.NewWitness(block.Header(), bc) |
1848 | | - if err != nil { |
1849 | | - return nil, it.index, err |
1850 | | - } |
1851 | | - } |
1852 | | - statedb.StartPrefetcher("chain", witness) |
1853 | | - } |
1854 | | - activeState = statedb |
1855 | | - |
1856 | | - // If we have a followup block, run that against the current state to pre-cache |
1857 | | - // transactions and probabilistically some of the account/storage trie nodes. |
1858 | | - var followupInterrupt atomic.Bool |
1859 | | - if !bc.cacheConfig.TrieCleanNoPrefetch { |
1860 | | - if followup, err := it.peek(); followup != nil && err == nil { |
1861 | | - throwaway, _ := state.New(parent.Root, bc.statedb) |
1862 | | - |
1863 | | - go func(start time.Time, followup *types.Block, throwaway *state.StateDB) { |
1864 | | - // Disable tracing for prefetcher executions. |
1865 | | - vmCfg := bc.vmConfig |
1866 | | - vmCfg.Tracer = nil |
1867 | | - bc.prefetcher.Prefetch(followup, throwaway, vmCfg, &followupInterrupt) |
1868 | | - |
1869 | | - blockPrefetchExecuteTimer.Update(time.Since(start)) |
1870 | | - if followupInterrupt.Load() { |
1871 | | - blockPrefetchInterruptMeter.Mark(1) |
1872 | | - } |
1873 | | - }(time.Now(), followup, throwaway) |
1874 | | - } |
1875 | | - } |
1876 | | - |
1877 | 1823 | // The traced section of block import. |
1878 | | - res, err := bc.processBlock(block, statedb, start, setHead) |
1879 | | - followupInterrupt.Store(true) |
| 1824 | + start := time.Now() |
| 1825 | + res, err := bc.processBlock(parent.Root, block, setHead, makeWitness && len(chain) == 1) |
1880 | 1826 | if err != nil { |
1881 | 1827 | return nil, it.index, err |
1882 | 1828 | } |
1883 | 1829 | // Report the import stats before returning the various results |
1884 | 1830 | stats.processed++ |
1885 | 1831 | stats.usedGas += res.usedGas |
| 1832 | + witness = res.witness |
1886 | 1833 |
|
1887 | 1834 | var snapDiffItems, snapBufItems common.StorageSize |
1888 | 1835 | if bc.snaps != nil { |
@@ -1938,11 +1885,74 @@ type blockProcessingResult struct { |
1938 | 1885 | usedGas uint64 |
1939 | 1886 | procTime time.Duration |
1940 | 1887 | status WriteStatus |
| 1888 | + witness *stateless.Witness |
1941 | 1889 | } |
1942 | 1890 |
|
1943 | 1891 | // processBlock executes and validates the given block. If there was no error |
1944 | 1892 | // it writes the block and associated state to database. |
1945 | | -func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, start time.Time, setHead bool) (_ *blockProcessingResult, blockEndErr error) { |
| 1893 | +func (bc *BlockChain) processBlock(parentRoot common.Hash, block *types.Block, setHead bool, makeWitness bool) (_ *blockProcessingResult, blockEndErr error) { |
| 1894 | + var ( |
| 1895 | + err error |
| 1896 | + startTime = time.Now() |
| 1897 | + statedb *state.StateDB |
| 1898 | + interrupt atomic.Bool |
| 1899 | + ) |
| 1900 | + defer interrupt.Store(true) // terminate the prefetch at the end |
| 1901 | + |
| 1902 | + if bc.cacheConfig.TrieCleanNoPrefetch { |
| 1903 | + statedb, err = state.New(parentRoot, bc.statedb) |
| 1904 | + if err != nil { |
| 1905 | + return nil, err |
| 1906 | + } |
| 1907 | + } else { |
| 1908 | + // If prefetching is enabled, run that against the current state to pre-cache |
| 1909 | + // transactions and probabilistically some of the account/storage trie nodes. |
| 1910 | + // |
| 1911 | + // Note: the main processor and prefetcher share the same reader with a local |
| 1912 | + // cache for mitigating the overhead of state access. |
| 1913 | + reader, err := bc.statedb.ReaderWithCache(parentRoot) |
| 1914 | + if err != nil { |
| 1915 | + return nil, err |
| 1916 | + } |
| 1917 | + throwaway, err := state.NewWithReader(parentRoot, bc.statedb, reader) |
| 1918 | + if err != nil { |
| 1919 | + return nil, err |
| 1920 | + } |
| 1921 | + statedb, err = state.NewWithReader(parentRoot, bc.statedb, reader) |
| 1922 | + if err != nil { |
| 1923 | + return nil, err |
| 1924 | + } |
| 1925 | + go func(start time.Time, throwaway *state.StateDB, block *types.Block) { |
| 1926 | + // Disable tracing for prefetcher executions. |
| 1927 | + vmCfg := bc.vmConfig |
| 1928 | + vmCfg.Tracer = nil |
| 1929 | + bc.prefetcher.Prefetch(block, throwaway, vmCfg, &interrupt) |
| 1930 | + |
| 1931 | + blockPrefetchExecuteTimer.Update(time.Since(start)) |
| 1932 | + if interrupt.Load() { |
| 1933 | + blockPrefetchInterruptMeter.Mark(1) |
| 1934 | + } |
| 1935 | + }(time.Now(), throwaway, block) |
| 1936 | + } |
| 1937 | + |
| 1938 | + // If we are past Byzantium, enable prefetching to pull in trie node paths |
| 1939 | + // while processing transactions. Before Byzantium the prefetcher is mostly |
| 1940 | + // useless due to the intermediate root hashing after each transaction. |
| 1941 | + var witness *stateless.Witness |
| 1942 | + if bc.chainConfig.IsByzantium(block.Number()) { |
| 1943 | + // Generate witnesses either if we're self-testing, or if it's the |
| 1944 | + // only block being inserted. A bit crude, but witnesses are huge, |
| 1945 | + // so we refuse to make an entire chain of them. |
| 1946 | + if bc.vmConfig.StatelessSelfValidation || makeWitness { |
| 1947 | + witness, err = stateless.NewWitness(block.Header(), bc) |
| 1948 | + if err != nil { |
| 1949 | + return nil, err |
| 1950 | + } |
| 1951 | + } |
| 1952 | + statedb.StartPrefetcher("chain", witness) |
| 1953 | + defer statedb.StopPrefetcher() |
| 1954 | + } |
| 1955 | + |
1946 | 1956 | if bc.logger != nil && bc.logger.OnBlockStart != nil { |
1947 | 1957 | bc.logger.OnBlockStart(tracing.BlockEvent{ |
1948 | 1958 | Block: block, |
@@ -2001,7 +2011,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s |
2001 | 2011 | } |
2002 | 2012 | } |
2003 | 2013 | xvtime := time.Since(xvstart) |
2004 | | - proctime := time.Since(start) // processing + validation + cross validation |
| 2014 | + proctime := time.Since(startTime) // processing + validation + cross validation |
2005 | 2015 |
|
2006 | 2016 | // Update the metrics touched during block processing and validation |
2007 | 2017 | accountReadTimer.Update(statedb.AccountReads) // Account reads are complete(in processing) |
@@ -2042,9 +2052,14 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s |
2042 | 2052 | triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them |
2043 | 2053 |
|
2044 | 2054 | blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits) |
2045 | | - blockInsertTimer.UpdateSince(start) |
2046 | | - |
2047 | | - return &blockProcessingResult{usedGas: res.GasUsed, procTime: proctime, status: status}, nil |
| 2055 | + blockInsertTimer.UpdateSince(startTime) |
| 2056 | + |
| 2057 | + return &blockProcessingResult{ |
| 2058 | + usedGas: res.GasUsed, |
| 2059 | + procTime: proctime, |
| 2060 | + status: status, |
| 2061 | + witness: witness, |
| 2062 | + }, nil |
2048 | 2063 | } |
2049 | 2064 |
|
2050 | 2065 | // insertSideChain is called when an import batch hits upon a pruned ancestor |
@@ -2627,7 +2642,8 @@ func (bc *BlockChain) InsertHeadersBeforeCutoff(headers []*types.Header) (int, e |
2627 | 2642 | if err != nil { |
2628 | 2643 | return 0, err |
2629 | 2644 | } |
2630 | | - if err := bc.db.Sync(); err != nil { |
| 2645 | + // Sync the ancient store explicitly to ensure all data has been flushed to disk. |
| 2646 | + if err := bc.db.SyncAncient(); err != nil { |
2631 | 2647 | return 0, err |
2632 | 2648 | } |
2633 | 2649 | // Write hash to number mappings |
|
0 commit comments