diff --git a/src/Nethermind/Nethermind.Blockchain.Test/BlockProcessorTests.cs b/src/Nethermind/Nethermind.Blockchain.Test/BlockProcessorTests.cs index 6da2c6bfd99c..9c04f95a893d 100644 --- a/src/Nethermind/Nethermind.Blockchain.Test/BlockProcessorTests.cs +++ b/src/Nethermind/Nethermind.Blockchain.Test/BlockProcessorTests.cs @@ -30,12 +30,16 @@ using NSubstitute; using NUnit.Framework; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Security; using System.Threading; using System.Threading.Tasks; using Nethermind.Blockchain.Tracing; +using Nethermind.Core.Threading; using Nethermind.Evm; +using Nethermind.Evm.Tracing; +using Nethermind.Int256; namespace Nethermind.Blockchain.Test; @@ -337,4 +341,294 @@ public void ValidateBlockAccessList_storage_read_budget_uses_ItemCost(long gasRe Assert.DoesNotThrow(() => balManager.ValidateBlockAccessList(block, 0)); } } + + [Test] + public void PrepareForProcessing_keeps_parallel_bal_execution_for_validated_eip8037_multi_tx_blocks() + { + BlockAccessListManager balManager = CreateAmsterdamBalManager(); + + Block block = Build.A.Block + .WithNumber(1) + .WithTransactions(2, Amsterdam.Instance) + .WithBlockAccessList(new BlockAccessList()) + .TestObject; + + balManager.PrepareForProcessing(block, Amsterdam.Instance, ProcessingOptions.None); + + Assert.That(balManager.ParallelExecutionEnabled, Is.True); + } + + [Test] + public void PrepareForProcessing_keeps_parallel_bal_execution_for_validated_eip8037_single_tx_blocks() + { + BlockAccessListManager balManager = CreateAmsterdamBalManager(); + + Block block = Build.A.Block + .WithNumber(1) + .WithTransactions(1, Amsterdam.Instance) + .WithBlockAccessList(new BlockAccessList()) + .TestObject; + + balManager.PrepareForProcessing(block, Amsterdam.Instance, ProcessingOptions.None); + + Assert.That(balManager.ParallelExecutionEnabled, Is.True); + } + + [Test] + public void IncrementalValidation_rejects_eip8037_tx_when_worst_case_exceeds_ordered_remaining_gas() + { + BlockAccessListManager balManager = CreateAmsterdamBalManager(); + Transaction firstTx = Build.A.Transaction + .WithHash(TestItem.KeccakA) + .WithGasLimit(90_000) + .TestObject; + Transaction secondTx = Build.A.Transaction + .WithHash(TestItem.KeccakB) + .WithGasLimit(50_000) + .WithNonce(1) + .TestObject; + Block block = Build.A.Block + .WithNumber(1) + .WithGasLimit(100_000) + .WithTransactions(firstTx, secondTx) + .WithBlockAccessList(new BlockAccessList()) + .TestObject; + + balManager.PrepareForProcessing(block, Amsterdam.Instance, ProcessingOptions.None); + balManager.SetBlockExecutionContext(new(block.Header, Amsterdam.Instance)); + balManager.Setup(block); + + TaskCompletionSource<(long BlockGasUsed, long BlockStateGasUsed, InvalidBlockException? Exception)>[] gasResults = + [ + new(), + new() + ]; + gasResults[0].SetResult((80_000, 0, null)); + gasResults[1].SetResult((21_000, 0, null)); + + InvalidBlockException? exception = Assert.Throws(() => + balManager.IncrementalValidation(block, gasResults, new BlockReceiptsTracer[2], null, CancellationToken.None)); + + Assert.That(exception!.Message, Does.Contain("Block gas limit exceeded")); + } + + [TestCase(true)] + [TestCase(false)] + public void Parallel_validation_preserves_processing_thread_metric_scope_for_worker_transactions(bool isBlockProcessingThread) + { + Assume.That(Environment.ProcessorCount, Is.GreaterThan(1)); + + const int txCount = 64; + IWorldState stateProvider = TestWorldStateFactory.CreateForTest(); + using IDisposable scope = stateProvider.BeginScope(IWorldState.PreGenesis); + + Transaction[] transactions = new Transaction[txCount]; + for (int i = 0; i < transactions.Length; i++) + { + transactions[i] = Build.A.Transaction + .WithNonce((UInt256)i) + .WithGasLimit(1) + .TestObject; + } + + Block block = Build.A.Block + .WithNumber(1) + .WithGasLimit(txCount) + .WithTransactions(transactions) + .WithBlockAccessList(new BlockAccessList()) + .TestObject; + + using RecordingTransactionProcessorAdapter transactionProcessor = new(); + IBlockAccessListManager balManager = Substitute.For(); + balManager.Enabled.Returns(true); + balManager.ParallelExecutionEnabled.Returns(true); + balManager.GetTxProcessor(Arg.Any()).Returns(transactionProcessor); + + IBlockProcessor.IBlockTransactionsExecutor inner = Substitute.For(); + BlockProcessor.ParallelBlockValidationTransactionsExecutor executor = new( + inner, + stateProvider, + new TestSingleReleaseSpecProvider(Amsterdam.Instance), + balManager, + LimboLogs.Instance); + + bool previousIsBlockProcessingThread = ProcessingThread.IsBlockProcessingThread; + ProcessingThread.IsBlockProcessingThread = isBlockProcessingThread; + try + { + TxReceipt[] receipts = executor.ProcessTransactions( + block, + ProcessingOptions.None, + new BlockReceiptsTracer(), + CancellationToken.None); + Assert.That(receipts, Has.Length.EqualTo(txCount)); + } + finally + { + ProcessingThread.IsBlockProcessingThread = previousIsBlockProcessingThread; + } + + Assert.That(transactionProcessor.ThreadIds.Count, Is.GreaterThan(1)); + Assert.That(transactionProcessor.ObservedProcessingThreadFlags.Count, Is.EqualTo(txCount)); + foreach (bool observedProcessingThreadFlag in transactionProcessor.ObservedProcessingThreadFlags) + { + Assert.That(observedProcessingThreadFlag, Is.EqualTo(isBlockProcessingThread)); + } + } + + [Test] + public void Parallel_validation_forwards_parallel_safe_block_tracer_to_worker_transactions() + { + Assume.That(Environment.ProcessorCount, Is.GreaterThan(1)); + + const int txCount = 64; + IWorldState stateProvider = TestWorldStateFactory.CreateForTest(); + using IDisposable scope = stateProvider.BeginScope(IWorldState.PreGenesis); + + Transaction[] transactions = new Transaction[txCount]; + for (int i = 0; i < transactions.Length; i++) + { + transactions[i] = Build.A.Transaction + .WithNonce((UInt256)i) + .WithGasLimit(1) + .TestObject; + } + + Block block = Build.A.Block + .WithNumber(1) + .WithGasLimit(txCount) + .WithTransactions(transactions) + .WithBlockAccessList(new BlockAccessList()) + .TestObject; + + using RecordingTransactionProcessorAdapter transactionProcessor = new(traceOperation: true); + IBlockAccessListManager balManager = Substitute.For(); + balManager.Enabled.Returns(true); + balManager.ParallelExecutionEnabled.Returns(true); + balManager.GetTxProcessor(Arg.Any()).Returns(transactionProcessor); + + IBlockProcessor.IBlockTransactionsExecutor inner = Substitute.For(); + BlockProcessor.ParallelBlockValidationTransactionsExecutor executor = new( + inner, + stateProvider, + new TestSingleReleaseSpecProvider(Amsterdam.Instance), + balManager, + LimboLogs.Instance); + RecordingParallelSafeBlockTracer blockTracer = new(); + BlockReceiptsTracer receiptsTracer = new(); + receiptsTracer.SetOtherTracer(blockTracer); + receiptsTracer.StartNewBlockTrace(block); + + TxReceipt[] receipts = executor.ProcessTransactions( + block, + ProcessingOptions.None, + receiptsTracer, + CancellationToken.None); + receiptsTracer.EndBlockTrace(); + + Assert.Multiple(() => + { + Assert.That(receipts, Has.Length.EqualTo(txCount)); + Assert.That(blockTracer.StartedTransactions, Is.EqualTo(txCount)); + Assert.That(blockTracer.EndedTransactions, Is.EqualTo(txCount)); + Assert.That(blockTracer.OpcodeCount, Is.EqualTo(txCount)); + }); + } + + private static BlockAccessListManager CreateAmsterdamBalManager() + { + IWorldState stateProvider = TestWorldStateFactory.CreateForTest(); + return new( + stateProvider, + new TestSingleReleaseSpecProvider(Amsterdam.Instance), + Substitute.For(), + LimboLogs.Instance, + new BlocksConfig { ParallelExecution = true }, + new WithdrawalProcessorFactory(LimboLogs.Instance)); + } + + private sealed class RecordingTransactionProcessorAdapter(bool traceOperation = false) : ITransactionProcessorAdapter, IDisposable + { + private readonly ManualResetEventSlim _parallelExecutionStarted = new(); + private int _executedCount; + + public ConcurrentBag ObservedProcessingThreadFlags { get; } = new(); + public ConcurrentDictionary ThreadIds { get; } = new(); + + public TransactionResult Execute(Transaction transaction, ITxTracer txTracer) + { + ThreadIds.TryAdd(Environment.CurrentManagedThreadId, 0); + if (Interlocked.Increment(ref _executedCount) >= 2) + { + _parallelExecutionStarted.Set(); + } + + _parallelExecutionStarted.Wait(TimeSpan.FromSeconds(5)); + ObservedProcessingThreadFlags.Add(ProcessingThread.IsBlockProcessingThread); + if (traceOperation && txTracer.IsTracingInstructions) + { + txTracer.StartOperation(0, Instruction.ADD, 0, null!); + } + + transaction.BlockGasUsed = 1; + txTracer.MarkAsSuccess(Address.Zero, 1, [], []); + + return TransactionResult.Ok; + } + + public void SetBlockExecutionContext(in BlockExecutionContext blockExecutionContext) + { + } + + public void Dispose() => _parallelExecutionStarted.Dispose(); + } + + private sealed class RecordingParallelSafeBlockTracer : IParallelSafeBlockTracer + { + private int _startedTransactions; + private int _endedTransactions; + private int _opcodeCount; + + public int StartedTransactions => _startedTransactions; + public int EndedTransactions => _endedTransactions; + public int OpcodeCount => _opcodeCount; + public bool IsTracingRewards => false; + + public void ReportReward(Address author, string rewardType, UInt256 rewardValue) + { + } + + public void StartNewBlockTrace(Block block) + { + } + + public ITxTracer StartNewTxTrace(Transaction? tx) + { + if (tx is null) + { + return NullTxTracer.Instance; + } + + Interlocked.Increment(ref _startedTransactions); + return new RecordingInstructionTxTracer(this); + } + + public void EndTxTrace() => + Interlocked.Increment(ref _endedTransactions); + + public void EndBlockTrace() + { + } + + private void RecordOpcode() => + Interlocked.Increment(ref _opcodeCount); + + private sealed class RecordingInstructionTxTracer(RecordingParallelSafeBlockTracer blockTracer) : TxTracer + { + public override bool IsTracingInstructions => true; + + public override void StartOperation(int pc, Instruction opcode, long gas, in ExecutionEnvironment env) => + blockTracer.RecordOpcode(); + } + } } diff --git a/src/Nethermind/Nethermind.Blockchain/Tracing/BlockReceiptsTracer.cs b/src/Nethermind/Nethermind.Blockchain/Tracing/BlockReceiptsTracer.cs index d6c2e645bf76..9a9c097846d7 100644 --- a/src/Nethermind/Nethermind.Blockchain/Tracing/BlockReceiptsTracer.cs +++ b/src/Nethermind/Nethermind.Blockchain/Tracing/BlockReceiptsTracer.cs @@ -235,6 +235,7 @@ public void ReportFees(UInt256 fees, UInt256 burntFees) protected Transaction? CurrentTx; public ReadOnlySpan TxReceipts => CollectionsMarshal.AsSpan(_txReceipts); public TxReceipt LastReceipt => _txReceipts[^1]; + public IBlockTracer OtherTracer => _otherTracer; /// /// EIP-8037: cumulative state gas for the last tracked tx. diff --git a/src/Nethermind/Nethermind.Blockchain/Tracing/CompositeBlockTracer.cs b/src/Nethermind/Nethermind.Blockchain/Tracing/CompositeBlockTracer.cs index b9f9138fd310..23d153fbc1a7 100644 --- a/src/Nethermind/Nethermind.Blockchain/Tracing/CompositeBlockTracer.cs +++ b/src/Nethermind/Nethermind.Blockchain/Tracing/CompositeBlockTracer.cs @@ -99,4 +99,45 @@ public IBlockTracer GetTracer() => 1 => _childTracers[0], _ => this }; + + public IBlockTracer GetParallelSafeTracer() + { + List parallelSafeTracers = []; + for (int index = 0; index < _childTracers.Count; index++) + { + IBlockTracer childTracer = _childTracers[index]; + if (childTracer is IParallelSafeBlockTracer) + { + parallelSafeTracers.Add(childTracer); + } + else if (childTracer is CompositeBlockTracer compositeBlockTracer) + { + IBlockTracer parallelSafeTracer = compositeBlockTracer.GetParallelSafeTracer(); + if (parallelSafeTracer != NullBlockTracer.Instance) + { + parallelSafeTracers.Add(parallelSafeTracer); + } + } + } + + int parallelSafeTracerCount = parallelSafeTracers.Count; + if (parallelSafeTracerCount == 0) + { + return NullBlockTracer.Instance; + } + + if (parallelSafeTracerCount == 1) + { + return parallelSafeTracers[0]; + } + + CompositeBlockTracer parallelSafeCompositeBlockTracer = new(); + parallelSafeCompositeBlockTracer._childTracers.AddRange(parallelSafeTracers); + for (int index = 0; index < parallelSafeTracerCount; index++) + { + parallelSafeCompositeBlockTracer.IsTracingRewards |= parallelSafeTracers[index].IsTracingRewards; + } + + return parallelSafeCompositeBlockTracer; + } } diff --git a/src/Nethermind/Nethermind.Consensus.Test/ProcessingStatsTests.cs b/src/Nethermind/Nethermind.Consensus.Test/ProcessingStatsTests.cs new file mode 100644 index 000000000000..37f7c5baefdd --- /dev/null +++ b/src/Nethermind/Nethermind.Consensus.Test/ProcessingStatsTests.cs @@ -0,0 +1,180 @@ +// SPDX-FileCopyrightText: 2026 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading.Tasks; +using Nethermind.Consensus.Processing; +using Nethermind.Core; +using Nethermind.Core.Test.Builders; +using Nethermind.Int256; +using Nethermind.Logging; +using Nethermind.State; +using NSubstitute; +using NUnit.Framework; +using BlockchainMetrics = Nethermind.Blockchain.Metrics; + +namespace Nethermind.Consensus.Test; + +[NonParallelizable] +public class ProcessingStatsTests +{ + [Test] + public async Task UpdateStats_aggregates_block_totals_for_processing_window() => + await WithRestoredBlockchainMetrics(async () => + { + ProcessingStats processingStats = CreateProcessingStats(out TaskCompletionSource completion); + + BlockHeader baseBlock = Build.A.BlockHeader + .WithNumber(0) + .WithStateRoot(TestItem.KeccakA) + .TestObject; + Block block1 = Build.A.Block + .WithParent(baseBlock) + .WithGasUsed(2_000_000) + .WithStateRoot(TestItem.KeccakB) + .WithTransactions(new Transaction(), new Transaction()) + .TestObject; + Block block2 = Build.A.Block + .WithParent(block1) + .WithGasUsed(3_000_000) + .WithStateRoot(TestItem.KeccakC) + .WithTransactions(new Transaction()) + .TestObject; + + processingStats.UpdateStats([block1, block2], baseBlock, 500_000); + + BlockStatistics stats = await completion.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Multiple(() => + { + Assert.That(stats.BlockCount, Is.EqualTo(2)); + Assert.That(stats.BlockFrom, Is.EqualTo(1)); + Assert.That(stats.BlockTo, Is.EqualTo(2)); + Assert.That(stats.ProcessingMs, Is.EqualTo(500)); + Assert.That(stats.MGasPerSecond, Is.EqualTo(10)); + }); + }); + + [Test] + public async Task UpdateStats_aggregates_multiple_updates_until_report_window() => + await WithRestoredBlockchainMetrics(async () => + { +#if DEBUG + Assert.Ignore("ProcessingStats emits every report when debug logging is enabled."); +#endif + TestProcessingStats processingStats = CreateTestProcessingStats(out TaskCompletionSource completion); + processingStats.Start(); + + BlockHeader baseBlock = Build.A.BlockHeader + .WithNumber(0) + .WithStateRoot(TestItem.KeccakA) + .TestObject; + Block block1 = Build.A.Block + .WithParent(baseBlock) + .WithGasUsed(2_000_000) + .WithStateRoot(TestItem.KeccakB) + .WithTransactions(new Transaction(), new Transaction()) + .TestObject; + Block block2 = Build.A.Block + .WithParent(block1) + .WithGasUsed(3_000_000) + .WithStateRoot(TestItem.KeccakC) + .WithTransactions(new Transaction()) + .TestObject; + + processingStats.GenerateReportForTest(block1, baseBlock, blockCount: 1, gasUsed: 2_000_000, transactionCount: 2, processingMicroseconds: 200_000); + processingStats.AdvanceReportWindow(); + processingStats.GenerateReportForTest(block2, block1.Header, blockCount: 1, gasUsed: 3_000_000, transactionCount: 1, processingMicroseconds: 300_000); + + BlockStatistics stats = await completion.Task.WaitAsync(TimeSpan.FromSeconds(5)); + + Assert.Multiple(() => + { + Assert.That(stats.BlockCount, Is.EqualTo(2)); + Assert.That(stats.BlockFrom, Is.EqualTo(1)); + Assert.That(stats.BlockTo, Is.EqualTo(2)); + Assert.That(stats.ProcessingMs, Is.EqualTo(500)); + Assert.That(stats.MGasPerSecond, Is.EqualTo(10)); + }); + }); + + private static ProcessingStats CreateProcessingStats(out TaskCompletionSource completion) + { + IStateReader stateReader = Substitute.For(); + stateReader.HasStateForBlock(Arg.Any()).Returns(true); + ProcessingStats processingStats = new(stateReader, new TestLogManager(LogLevel.Warn)); + TaskCompletionSource statsCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously); + processingStats.NewProcessingStatistics += (_, stats) => statsCompletion.TrySetResult(stats); + completion = statsCompletion; + return processingStats; + } + + private static TestProcessingStats CreateTestProcessingStats(out TaskCompletionSource completion) + { + IStateReader stateReader = Substitute.For(); + stateReader.HasStateForBlock(Arg.Any()).Returns(true); + TestProcessingStats processingStats = new(stateReader); + TaskCompletionSource statsCompletion = new(TaskCreationOptions.RunContinuationsAsynchronously); + processingStats.NewProcessingStatistics += (_, stats) => statsCompletion.TrySetResult(stats); + completion = statsCompletion; + return processingStats; + } + + private static async Task WithRestoredBlockchainMetrics(Func test) + { + double originalMgas = BlockchainMetrics.Mgas; + double originalMgasPerSec = BlockchainMetrics.MgasPerSec; + long originalTransactions = BlockchainMetrics.Transactions; + long originalBlocks = BlockchainMetrics.Blocks; + long originalBlockchainHeight = BlockchainMetrics.BlockchainHeight; + UInt256 originalTotalDifficulty = BlockchainMetrics.TotalDifficulty; + UInt256 originalLastDifficulty = BlockchainMetrics.LastDifficulty; + long originalGasUsed = BlockchainMetrics.GasUsed; + long originalGasLimit = BlockchainMetrics.GasLimit; + + try + { + await test(); + } + finally + { + BlockchainMetrics.Mgas = originalMgas; + BlockchainMetrics.MgasPerSec = originalMgasPerSec; + BlockchainMetrics.Transactions = originalTransactions; + BlockchainMetrics.Blocks = originalBlocks; + BlockchainMetrics.BlockchainHeight = originalBlockchainHeight; + BlockchainMetrics.TotalDifficulty = originalTotalDifficulty; + BlockchainMetrics.LastDifficulty = originalLastDifficulty; + BlockchainMetrics.GasUsed = originalGasUsed; + BlockchainMetrics.GasLimit = originalGasLimit; + } + } + + private sealed class TestProcessingStats(IStateReader stateReader) + : ProcessingStats(stateReader, new TestLogManager(LogLevel.Warn)) + { + private long _reportMs; + + public void AdvanceReportWindow() => _reportMs += 1_001; + + public void GenerateReportForTest( + Block block, + BlockHeader baseBlock, + long blockCount, + long gasUsed, + long transactionCount, + long processingMicroseconds) => + GenerateReport(new BlockData + { + Block = block, + BaseBlock = baseBlock, + BlockCount = blockCount, + FirstBlockNumber = block.Number, + GasUsed = gasUsed, + TransactionCount = transactionCount, + ProcessingMicroseconds = processingMicroseconds + }); + + protected override long GetReportMs() => _reportMs; + } +} diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.ParallelBlockValidationTransactionsExecutor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.ParallelBlockValidationTransactionsExecutor.cs index 5ef78806ecda..28b152eaa424 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.ParallelBlockValidationTransactionsExecutor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.ParallelBlockValidationTransactionsExecutor.cs @@ -10,6 +10,7 @@ using Nethermind.Core.Threading; using Nethermind.Evm; using Nethermind.Evm.State; +using Nethermind.Evm.Tracing; using Nethermind.Evm.TransactionProcessing; using Nethermind.Logging; @@ -44,7 +45,7 @@ public TxReceipt[] ProcessTransactions(Block block, ProcessingOptions processing Metrics.ResetBlockStats(); return !block.IsGenesis && balManager.ParallelExecutionEnabled - ? ProcessTransactionsParallel(block, processingOptions, token) + ? ProcessTransactionsParallel(block, processingOptions, receiptsTracer, token) : ProcessTransactionsSequential(block, processingOptions, receiptsTracer, token); } @@ -66,9 +67,11 @@ private TxReceipt[] ProcessTransactionsSequential(Block block, ProcessingOptions return [.. receiptsTracer.TxReceipts]; } - private TxReceipt[] ProcessTransactionsParallel(Block block, ProcessingOptions processingOptions, CancellationToken token) + private TxReceipt[] ProcessTransactionsParallel(Block block, ProcessingOptions processingOptions, BlockReceiptsTracer receiptsTracer, CancellationToken token) { int len = block.Transactions.Length; + bool isBlockProcessingThread = ProcessingThread.IsBlockProcessingThread; + IBlockTracer parallelSafeTracer = GetParallelSafeTracer(receiptsTracer.OtherTracer); BlockReceiptsTracer[] receiptsTracers = new BlockReceiptsTracer[len]; TaskCompletionSource<(long BlockGasUsed, long BlockStateGasUsed, InvalidBlockException? Exception)>[] gasResults = new TaskCompletionSource<(long BlockGasUsed, long BlockStateGasUsed, InvalidBlockException? Exception)>[len]; @@ -76,6 +79,10 @@ private TxReceipt[] ProcessTransactionsParallel(Block block, ProcessingOptions p { BlockReceiptsTracer tracer = new(true); tracer.StartNewBlockTrace(block); + if (parallelSafeTracer != NullBlockTracer.Instance) + { + tracer.SetOtherTracer(parallelSafeTracer); + } receiptsTracers[i] = tracer; gasResults[i] = new TaskCompletionSource<(long BlockGasUsed, long BlockStateGasUsed, InvalidBlockException? Exception)>(); } @@ -89,46 +96,60 @@ private TxReceipt[] ProcessTransactionsParallel(Block block, ProcessingOptions p 0, len + 1, ParallelUnbalancedWork.DefaultOptions, - (block, processingOptions, stateProvider, balManager, receiptsTracers, gasResults, specProvider, txs: block.Transactions), + (block, processingOptions, stateProvider, balManager, receiptsTracers, gasResults, specProvider, + txs: block.Transactions, isBlockProcessingThread), static (i, state) => { - if (i == 0) - { - // ApplyStateChanges mutates the shared stateProvider so runs inside - // the parallel loop (slot 0) rather than via Task.Run. Parallel tx - // workers read from BAL-backed world states, not stateProvider. - BlockAccessListManager.ApplyStateChanges(state.block.BlockAccessList, state.stateProvider, state.specProvider.GetSpec(state.block.Header), !state.block.Header.IsGenesis || !state.specProvider.GenesisStateUnavailable); - return state; - } - - int txIndex = i - 1; + bool previousIsBlockProcessingThread = ProcessingThread.IsBlockProcessingThread; + ProcessingThread.IsBlockProcessingThread = state.isBlockProcessingThread; try { - Transaction tx = state.txs[txIndex]; - ProcessTransaction( - state.balManager.GetTxProcessor(i), - state.stateProvider, - state.block, - tx, - txIndex, - state.receiptsTracers[txIndex], - state.processingOptions); - state.gasResults[txIndex].SetResult((tx.BlockGasUsed, state.receiptsTracers[txIndex].BlockStateGasUsed, null)); - } - catch (InvalidBlockException ex) - { - state.gasResults[txIndex].SetResult((state.txs[txIndex].GasLimit, 0, ex)); + if (i == 0) + { + // ApplyStateChanges mutates the shared stateProvider so runs inside + // the parallel loop (slot 0) rather than via Task.Run. Parallel tx + // workers read from BAL-backed world states, not stateProvider. + BlockAccessListManager.ApplyStateChanges( + state.block.BlockAccessList, + state.stateProvider, + state.specProvider.GetSpec(state.block.Header), + !state.block.Header.IsGenesis || !state.specProvider.GenesisStateUnavailable); + return state; + } + + int txIndex = i - 1; + try + { + Transaction tx = state.txs[txIndex]; + ProcessTransaction( + state.balManager.GetTxProcessor(i), + state.stateProvider, + state.block, + tx, + txIndex, + state.receiptsTracers[txIndex], + state.processingOptions); + state.gasResults[txIndex].SetResult((tx.BlockGasUsed, state.receiptsTracers[txIndex].BlockStateGasUsed, null)); + } + catch (InvalidBlockException ex) + { + state.gasResults[txIndex].SetResult((state.txs[txIndex].GasLimit, 0, ex)); + } + catch + { + // Ensure IncrementalValidation is not permanently blocked on gasResults[j] + // if an unexpected exception escapes the worker (e.g. NRE, OCE). + // SetCanceled unblocks the inner GetAwaiter().GetResult() loop. + state.gasResults[txIndex].TrySetCanceled(); + throw; + } + + return state; } - catch + finally { - // Ensure IncrementalValidation is not permanently blocked on gasResults[j] - // if an unexpected exception escapes the worker (e.g. NRE, OCE). - // SetCanceled unblocks the inner GetAwaiter().GetResult() loop. - state.gasResults[txIndex].TrySetCanceled(); - throw; + ProcessingThread.IsBlockProcessingThread = previousIsBlockProcessingThread; } - - return state; }); } catch @@ -158,6 +179,14 @@ private TxReceipt[] ProcessTransactionsParallel(Block block, ProcessingOptions p return CombineReceipts(receiptsTracers, len, block); } + private static IBlockTracer GetParallelSafeTracer(IBlockTracer tracer) => + tracer switch + { + IParallelSafeBlockTracer => tracer, + CompositeBlockTracer compositeBlockTracer => compositeBlockTracer.GetParallelSafeTracer(), + _ => NullBlockTracer.Instance + }; + private static TxReceipt[] CombineReceipts(BlockReceiptsTracer[] receiptsTracers, int len, Block block) { TxReceipt[] result = new TxReceipt[len]; diff --git a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs index 021fb0c97013..678871da735f 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs @@ -490,7 +490,7 @@ private void FireProcessingQueueEmpty() int blockQueueCount = _blockQueue.Reader.Count; Metrics.RecoveryQueueSize = Math.Max(_queueCount - blockQueueCount - (IsProcessingBlock ? 1 : 0), 0); Metrics.ProcessingQueueSize = blockQueueCount; - _stats.UpdateStats(lastProcessed, processingBranch.BaseBlock, blockProcessingTimeInMicrosecs); + _stats.UpdateStats(processedBlocks, processingBranch.BaseBlock, blockProcessingTimeInMicrosecs); } bool updateHead = !options.ContainsFlag(ProcessingOptions.DoNotUpdateHead); diff --git a/src/Nethermind/Nethermind.Consensus/Processing/IProcessingStats.cs b/src/Nethermind/Nethermind.Consensus/Processing/IProcessingStats.cs index 7d7b65723e69..b16ff7e58730 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/IProcessingStats.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/IProcessingStats.cs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: LGPL-3.0-only using System; +using System.Collections.Generic; using Nethermind.Core; namespace Nethermind.Consensus.Processing; @@ -27,10 +28,10 @@ public interface IProcessingStats void CaptureStartStats(); /// - /// Update statistics after a block has been processed. + /// Update statistics after blocks have been processed. /// - /// The processed block. + /// The processed blocks. /// The parent block header. /// Processing time in microseconds. - void UpdateStats(Block? block, BlockHeader? baseBlock, long blockProcessingTimeInMicros); + void UpdateStats(IReadOnlyList blocks, BlockHeader? baseBlock, long blockProcessingTimeInMicros); } diff --git a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs index 1ea1057036b5..b8731f8e8226 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/ProcessingStats.cs @@ -59,6 +59,7 @@ public class ProcessingStats : IProcessingStats private long _chunkTx; private long _chunkBlobs; private long _chunkBlocks; + private long _chunkFirstBlockNumber = -1; private long _opCodes; private long _callOps; private long _emptyCalls; @@ -96,13 +97,34 @@ public void CaptureStartStats() _startOpCodes = Evm.Metrics.MainThreadOpCodes; } - public void UpdateStats(Block? block, BlockHeader? baseBlock, long blockProcessingTimeInMicros) + public void UpdateStats(IReadOnlyList blocks, BlockHeader? baseBlock, long blockProcessingTimeInMicros) { - if (block is null) return; + if (blocks.Count == 0) return; + + Block lastBlock = blocks[^1]; + long gasUsed = 0; + long transactionCount = 0; + long blobCount = 0; + for (int i = 0; i < blocks.Count; i++) + { + Block block = blocks[i]; + gasUsed += block.GasUsed; + Transaction[] transactions = block.Transactions; + transactionCount += transactions.Length; + for (int j = 0; j < transactions.Length; j++) + { + blobCount += transactions[j].GetBlobCount(); + } + } BlockData blockData = _dataPool.Get(); - blockData.Block = block; + blockData.Block = lastBlock; blockData.BaseBlock = baseBlock; + blockData.BlockCount = blocks.Count; + blockData.FirstBlockNumber = blocks[0].Number; + blockData.GasUsed = gasUsed; + blockData.TransactionCount = transactionCount; + blockData.BlobCount = blobCount; blockData.RunningMicroseconds = _runStopwatch.ElapsedMicroseconds(); blockData.RunMicroseconds = (_runStopwatch.ElapsedMicroseconds() - _lastElapsedRunningMicroseconds); blockData.StartOpCodes = _startOpCodes; @@ -169,27 +191,35 @@ protected virtual void GenerateReport(BlockData data) if (block is null) return; long blockNumber = data.Block.Number; - double chunkMGas = (_chunkMGas += block.GasUsed / 1_000_000.0); + double chunkMGas = (_chunkMGas += data.GasUsed / 1_000_000.0); // We want the rate here - double mgas = block.GasUsed / 1_000_000.0; + double mgas = data.GasUsed / 1_000_000.0; double timeSec = data.ProcessingMicroseconds / 1_000_000.0; Metrics.BlockMGasPerSec.Observe(mgas / timeSec); Metrics.BlockProcessingTimeMicros.Observe(data.ProcessingMicroseconds); - Metrics.Mgas += block.GasUsed / 1_000_000.0; + Metrics.Mgas += data.GasUsed / 1_000_000.0; Transaction[] txs = block.Transactions; double chunkMicroseconds = (_chunkProcessingMicroseconds += data.ProcessingMicroseconds); - double chunkTx = (_chunkTx += txs.Length); + double chunkTx = (_chunkTx += data.TransactionCount); - long chunkBlocks = (++_chunkBlocks); + long chunkFirstBlockNumber = _chunkFirstBlockNumber; + if (chunkFirstBlockNumber == -1) + { + chunkFirstBlockNumber = data.FirstBlockNumber; + _chunkFirstBlockNumber = chunkFirstBlockNumber; + } + + long chunkBlocks = (_chunkBlocks += data.BlockCount); Metrics.Blocks = blockNumber; Metrics.BlockchainHeight = blockNumber; - Metrics.Transactions += txs.Length; + Metrics.Transactions += data.TransactionCount; Metrics.TotalDifficulty = block.TotalDifficulty ?? UInt256.Zero; Metrics.LastDifficulty = block.Difficulty; + // These gauges describe the latest processed block; chunk totals above use data.GasUsed and data.TransactionCount. Metrics.GasUsed = block.GasUsed; Metrics.GasLimit = block.GasLimit; @@ -212,6 +242,13 @@ protected virtual void GenerateReport(BlockData data) isMev = true; } + _chunkBlobs += data.BlobCount; + long blobs = _chunkBlobs; + if (blobs > 0) + { + _showBlobs = true; + } + if (data.BaseBlock is null || !_stateReader.HasStateForBlock(data.BaseBlock) || block.StateRoot is null || !_stateReader.HasStateForBlock(block.Header)) return; @@ -238,17 +275,7 @@ protected virtual void GenerateReport(BlockData data) if (_logger.IsError) _logger.Error("Error when calculating block rewards", ex); } - foreach (Transaction tx in txs) - { - _chunkBlobs += tx.GetBlobCount(); - } - long blobs = _chunkBlobs; - if (blobs > 0) - { - _showBlobs = true; - } - - long reportMs = Environment.TickCount64; + long reportMs = GetReportMs(); if (reportMs - _lastReportMs > 1000 || _logger.IsDebug) { _lastReportMs = reportMs; @@ -260,6 +287,7 @@ protected virtual void GenerateReport(BlockData data) _chunkBlobs = 0; _chunkBlocks = 0; + _chunkFirstBlockNumber = -1; _chunkMGas = 0; _chunkTx = 0; _chunkProcessingMicroseconds = 0; @@ -298,7 +326,7 @@ protected virtual void GenerateReport(BlockData data) NewProcessingStatistics?.Invoke(this, new BlockStatistics() { BlockCount = chunkBlocks, - BlockFrom = block.Number - chunkBlocks + 1, + BlockFrom = chunkFirstBlockNumber, BlockTo = block.Number, ProcessingMs = chunkMs, @@ -317,7 +345,7 @@ protected virtual void GenerateReport(BlockData data) { if (chunkBlocks > 1) { - _logger.Info($"Processed {block.Number - chunkBlocks + 1,10}...{block.Number,9} | {chunkMs,10:N1} ms | slot {runMs,11:N0} ms |{blockGas}"); + _logger.Info($"Processed {chunkFirstBlockNumber,10}...{block.Number,9} | {chunkMs,10:N1} ms | slot {runMs,11:N0} ms |{blockGas}"); } else { @@ -413,11 +441,13 @@ UInt256 CalculateBalanceChange(BlockHeader? startBlock, BlockHeader endBlock, Ad } } + protected virtual long GetReportMs() => Environment.TickCount64; + public void Start() { if (!_runStopwatch.IsRunning) { - _lastReportMs = Environment.TickCount64; + _lastReportMs = GetReportMs(); _runStopwatch.Start(); } } @@ -438,6 +468,11 @@ public bool Return(BlockData data) // Release the object references so we don't hold them from being GC'd data.Block = null; data.BaseBlock = null; + data.BlockCount = 0; + data.FirstBlockNumber = 0; + data.GasUsed = 0; + data.TransactionCount = 0; + data.BlobCount = 0; return true; } @@ -447,6 +482,11 @@ protected class BlockData { public Block Block; public BlockHeader? BaseBlock; + public long BlockCount; + public long FirstBlockNumber; + public long GasUsed; + public long TransactionCount; + public long BlobCount; public long CurrentOpCodes; public long CurrentSLoadOps; public long CurrentSStoreOps; diff --git a/src/Nethermind/Nethermind.Core.Test/Threading/ParallelUnbalancedWorkTests.cs b/src/Nethermind/Nethermind.Core.Test/Threading/ParallelUnbalancedWorkTests.cs index e9a128291cb1..efca86fe4a68 100644 --- a/src/Nethermind/Nethermind.Core.Test/Threading/ParallelUnbalancedWorkTests.cs +++ b/src/Nethermind/Nethermind.Core.Test/Threading/ParallelUnbalancedWorkTests.cs @@ -122,12 +122,16 @@ public void For_WhenWorkerFaults_OtherWorkersStopFetchingWork() // Once one worker captures an exception, others must stop pulling new indices — otherwise // we burn CPU and run side effects after the operation is already faulted. int actionCalls = 0; + int hasThrown = 0; const int range = 100_000; Action act = () => ParallelUnbalancedWork.For(0, range, FourThreads, i => { Interlocked.Increment(ref actionCalls); - if (i == 0) throw new InvalidOperationException(); + if (Interlocked.CompareExchange(ref hasThrown, 1, 0) == 0) + { + throw new InvalidOperationException(); + } }); act.Should().Throw(); diff --git a/src/Nethermind/Nethermind.Evm/Tracing/IBlockTracer.cs b/src/Nethermind/Nethermind.Evm/Tracing/IBlockTracer.cs index 09dc03e4bcf6..9f22136bf4b8 100644 --- a/src/Nethermind/Nethermind.Evm/Tracing/IBlockTracer.cs +++ b/src/Nethermind/Nethermind.Evm/Tracing/IBlockTracer.cs @@ -61,4 +61,11 @@ public interface IBlockTracer : IBlockTracer { IReadOnlyCollection BuildResult(); } + + /// + /// Marker for block tracers that support concurrent transaction tracing callbacks for one active block. + /// + public interface IParallelSafeBlockTracer : IBlockTracer + { + } } diff --git a/src/Nethermind/Nethermind.Merge.Plugin.Test/BaseEngineModuleTests.cs b/src/Nethermind/Nethermind.Merge.Plugin.Test/BaseEngineModuleTests.cs index 1104fa681659..6640927d3ffb 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin.Test/BaseEngineModuleTests.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin.Test/BaseEngineModuleTests.cs @@ -203,6 +203,8 @@ public MergeTestBlockchain(IMergeConfig? mergeConfig = null) { MergeConfig.NewPayloadBlockProcessingTimeout = 60_000; } + + TestTimeout = Math.Max(TestTimeout, MergeConfig.NewPayloadBlockProcessingTimeout); } protected override Task AddBlocksOnStart() => Task.CompletedTask; diff --git a/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.RelayBuilder.cs b/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.RelayBuilder.cs index 7044964d85a6..d5fb017d0b84 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.RelayBuilder.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.RelayBuilder.cs @@ -155,7 +155,8 @@ public virtual async Task forkchoiceUpdatedV1_should_communicate_with_boost_rela mockHttp .Expect(HttpMethod.Post, relayUrl + BoostRelay.SendPayloadPath) - .WithContent(serializer.Serialize(expected)); + .WithContent(serializer.Serialize(expected)) + .Respond("application/json", "{}"); DefaultHttpClient defaultHttpClient = new(mockHttp.ToHttpClient(), serializer, logManager, 1, 100); diff --git a/src/Nethermind/Nethermind.OpcodeTracing.Plugin.Test/OpcodeBlockTracerTests.cs b/src/Nethermind/Nethermind.OpcodeTracing.Plugin.Test/OpcodeBlockTracerTests.cs new file mode 100644 index 000000000000..68b00403b011 --- /dev/null +++ b/src/Nethermind/Nethermind.OpcodeTracing.Plugin.Test/OpcodeBlockTracerTests.cs @@ -0,0 +1,80 @@ +// SPDX-FileCopyrightText: 2026 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using Nethermind.Core; +using Nethermind.Core.Crypto; +using Nethermind.Evm; +using Nethermind.Evm.Tracing; +using Nethermind.Int256; +using Nethermind.OpcodeTracing.Plugin.Tracing; +using NUnit.Framework; + +namespace Nethermind.OpcodeTracing.Plugin.Test; + +public class OpcodeBlockTracerTests +{ + [Test] + public void EndTxTrace_accumulates_transaction_trace() + { + OpcodeBlockTrace? completedTrace = null; + OpcodeBlockTracer blockTracer = new(trace => completedTrace = trace); + Transaction transaction = new() { GasLimit = 1 }; + + blockTracer.StartNewBlockTrace(new Block(CreateHeader(), [transaction], [])); + ITxTracer txTracer = blockTracer.StartNewTxTrace(transaction); + txTracer.StartOperation(0, Instruction.ADD, 0, null!); + blockTracer.EndTxTrace(); + blockTracer.EndBlockTrace(); + + Assert.That(completedTrace, Is.Not.Null); + Assert.Multiple(() => + { + Assert.That(completedTrace!.TransactionCount, Is.EqualTo(1)); + Assert.That(completedTrace.Opcodes[(byte)Instruction.ADD], Is.EqualTo(1)); + }); + } + + [Test] + public void Counts_all_transaction_traces_when_completed_in_parallel() + { + const int txCount = 128; + OpcodeBlockTrace? completedTrace = null; + OpcodeBlockTracer blockTracer = new(trace => completedTrace = trace); + Transaction[] transactions = new Transaction[txCount]; + for (int i = 0; i < transactions.Length; i++) + { + transactions[i] = new Transaction { GasLimit = 1 }; + } + + Block block = new(CreateHeader(), transactions, []); + blockTracer.StartNewBlockTrace(block); + + Parallel.For(0, txCount, i => + { + using ITxTracer txTracer = blockTracer.StartNewTxTrace(transactions[i]); + txTracer.StartOperation(0, Instruction.ADD, 0, null!); + txTracer.StartOperation(1, Instruction.STOP, 0, null!); + blockTracer.EndTxTrace(); + }); + blockTracer.EndBlockTrace(); + + Assert.That(completedTrace, Is.Not.Null); + Assert.Multiple(() => + { + Assert.That(completedTrace!.TransactionCount, Is.EqualTo(txCount)); + Assert.That(completedTrace.Opcodes[(byte)Instruction.ADD], Is.EqualTo(txCount)); + Assert.That(completedTrace.Opcodes[(byte)Instruction.STOP], Is.EqualTo(txCount)); + }); + } + + private static BlockHeader CreateHeader() => + new( + Keccak.Zero, + Keccak.OfAnEmptySequenceRlp, + Address.Zero, + UInt256.Zero, + number: 1, + gasLimit: 1_000_000, + timestamp: 1, + extraData: []); +} diff --git a/src/Nethermind/Nethermind.OpcodeTracing.Plugin/InternalsVisibility.cs b/src/Nethermind/Nethermind.OpcodeTracing.Plugin/InternalsVisibility.cs new file mode 100644 index 000000000000..051434d27185 --- /dev/null +++ b/src/Nethermind/Nethermind.OpcodeTracing.Plugin/InternalsVisibility.cs @@ -0,0 +1,6 @@ +// SPDX-FileCopyrightText: 2026 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Nethermind.OpcodeTracing.Plugin.Test")] diff --git a/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeBlockTracer.cs b/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeBlockTracer.cs index ebb955b4c0cb..c260c8ce0986 100644 --- a/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeBlockTracer.cs +++ b/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeBlockTracer.cs @@ -8,11 +8,19 @@ namespace Nethermind.OpcodeTracing.Plugin.Tracing; -internal sealed class OpcodeBlockTracer(Action onBlockCompleted) : IBlockTracer +internal sealed class OpcodeBlockTracer(Action onBlockCompleted) : IParallelSafeBlockTracer { private readonly Action _onBlockCompleted = onBlockCompleted ?? throw new ArgumentNullException(nameof(onBlockCompleted)); private OpcodeTraceBuilder? _builder; - private OpcodeCountingTxTracer? _currentTxTracer; + + // Transaction execution is synchronous; the standard processing path ends + // the trace on the same worker thread that started it. Keep this out of + // AsyncLocal because it sits on the per-transaction hot path. + [ThreadStatic] + private static OpcodeBlockTracer? t_currentTracerOwner; + + [ThreadStatic] + private static OpcodeCountingTxTracer? t_currentTxTracer; public bool IsTracingRewards => false; @@ -21,44 +29,65 @@ public void ReportReward(Address author, string rewardType, UInt256 rewardValue) // Rewards do not execute opcodes, so nothing to capture here. } - public void StartNewBlockTrace(Block block) => - _builder = new OpcodeTraceBuilder(block ?? throw new ArgumentNullException(nameof(block))); + public void StartNewBlockTrace(Block block) + { + ArgumentNullException.ThrowIfNull(block); + + Volatile.Write(ref _builder, new OpcodeTraceBuilder(block)); + } public ITxTracer StartNewTxTrace(Transaction? tx) { - if (_builder is null || tx is null) + if (tx is null) { - _currentTxTracer = null; + ClearCurrentTxTracer(); return NullTxTracer.Instance; } - OpcodeCountingTxTracer tracer = new(); - _currentTxTracer = tracer; + OpcodeTraceBuilder? builder = Volatile.Read(ref _builder); + if (builder is null) + { + ClearCurrentTxTracer(); + return NullTxTracer.Instance; + } + + OpcodeCountingTxTracer tracer = new(builder); + t_currentTracerOwner = this; + t_currentTxTracer = tracer; return tracer; } public void EndTxTrace() { - if (_builder is null || _currentTxTracer is null) + if (!ReferenceEquals(t_currentTracerOwner, this)) { - _currentTxTracer = null; return; } - _builder.Accumulate(_currentTxTracer); - _currentTxTracer = null; + OpcodeCountingTxTracer? currentTxTracer = t_currentTxTracer; + t_currentTracerOwner = null; + t_currentTxTracer = null; + currentTxTracer?.Dispose(); } public void EndBlockTrace() { - if (_builder is null) + OpcodeTraceBuilder? builder = Interlocked.Exchange(ref _builder, null); + if (builder is null) { return; } - OpcodeBlockTrace trace = _builder.Build(); - _builder = null; - _onBlockCompleted(trace); + _onBlockCompleted(builder.Build()); + } + + private void ClearCurrentTxTracer() + { + if (ReferenceEquals(t_currentTracerOwner, this)) + { + t_currentTracerOwner = null; + t_currentTxTracer = null; + } } } @@ -80,8 +109,8 @@ internal sealed class OpcodeTraceBuilder(Block block) public void Accumulate(OpcodeCountingTxTracer tracer) { - _transactions++; - tracer.AccumulateInto(_opcodeCounters); + Interlocked.Increment(ref _transactions); + tracer.AccumulateIntoThreadSafe(_opcodeCounters); } public OpcodeBlockTrace Build() @@ -104,9 +133,8 @@ public OpcodeBlockTrace Build() ParentHash = _block.ParentHash ?? Keccak.Zero, BlockNumber = _block.Number, Timestamp = _block.Timestamp, - TransactionCount = _transactions, + TransactionCount = Volatile.Read(ref _transactions), Opcodes = opcodeMap }; } } - diff --git a/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeCountingTxTracer.cs b/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeCountingTxTracer.cs index 9f8b28ba22d2..e1bc55a6bc6f 100644 --- a/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeCountingTxTracer.cs +++ b/src/Nethermind/Nethermind.OpcodeTracing.Plugin/Tracing/OpcodeCountingTxTracer.cs @@ -6,9 +6,10 @@ namespace Nethermind.OpcodeTracing.Plugin.Tracing; -internal sealed class OpcodeCountingTxTracer : TxTracer +internal sealed class OpcodeCountingTxTracer(OpcodeTraceBuilder? builder = null) : TxTracer { private const int OpcodeSpace = 256; + private OpcodeTraceBuilder? _builder = builder; private readonly long[] _opcodeCounters = new long[OpcodeSpace]; public override bool IsTracingInstructions => true; @@ -45,4 +46,24 @@ public void AccumulateInto(long[] aggregate) aggregate[i] += value; } } + + public void AccumulateIntoThreadSafe(long[] aggregate) + { + for (int i = 0; i < OpcodeSpace; i++) + { + long value = _opcodeCounters[i]; + if (value == 0) + { + continue; + } + + Interlocked.Add(ref aggregate[i], value); + } + } + + public override void Dispose() + { + OpcodeTraceBuilder? builder = Interlocked.Exchange(ref _builder, null); + builder?.Accumulate(this); + } } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs index 988c80b54d09..361a982d883f 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs @@ -393,6 +393,7 @@ private async Task SetPivot(SyncConfig syncConfig, CancellationToken cancellatio { IBlockProcessingQueue blockProcessingQueue = _server.Resolve(); await blockProcessingQueue.WaitForBlockProcessing(cancellationToken); + _server.Resolve().FlushCache(cancellationToken); IBlockTree serverBlockTree = _server.Resolve(); long serverHeadNumber = serverBlockTree.Head!.Number; BlockHeader pivot = serverBlockTree.FindHeader(serverHeadNumber - HeadPivotDistance)!;