Skip to content
294 changes: 294 additions & 0 deletions src/Nethermind/Nethermind.Blockchain.Test/BlockProcessorTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@
using NSubstitute;
using NUnit.Framework;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Blockchain.Tracing;
using Nethermind.Core.Threading;
using Nethermind.Evm;
using Nethermind.Evm.Tracing;
using Nethermind.Int256;

namespace Nethermind.Blockchain.Test;

Expand Down Expand Up @@ -337,4 +341,294 @@ public void ValidateBlockAccessList_storage_read_budget_uses_ItemCost(long gasRe
Assert.DoesNotThrow(() => balManager.ValidateBlockAccessList(block, 0));
}
}

[Test]
public void PrepareForProcessing_keeps_parallel_bal_execution_for_validated_eip8037_multi_tx_blocks()
{
BlockAccessListManager balManager = CreateAmsterdamBalManager();

Block block = Build.A.Block
.WithNumber(1)
.WithTransactions(2, Amsterdam.Instance)
.WithBlockAccessList(new BlockAccessList())
.TestObject;

balManager.PrepareForProcessing(block, Amsterdam.Instance, ProcessingOptions.None);

Assert.That(balManager.ParallelExecutionEnabled, Is.True);
}

[Test]
public void PrepareForProcessing_keeps_parallel_bal_execution_for_validated_eip8037_single_tx_blocks()
{
BlockAccessListManager balManager = CreateAmsterdamBalManager();

Block block = Build.A.Block
.WithNumber(1)
.WithTransactions(1, Amsterdam.Instance)
.WithBlockAccessList(new BlockAccessList())
.TestObject;

balManager.PrepareForProcessing(block, Amsterdam.Instance, ProcessingOptions.None);

Assert.That(balManager.ParallelExecutionEnabled, Is.True);
}

[Test]
public void IncrementalValidation_rejects_eip8037_tx_when_worst_case_exceeds_ordered_remaining_gas()
{
BlockAccessListManager balManager = CreateAmsterdamBalManager();
Transaction firstTx = Build.A.Transaction
.WithHash(TestItem.KeccakA)
.WithGasLimit(90_000)
.TestObject;
Transaction secondTx = Build.A.Transaction
.WithHash(TestItem.KeccakB)
.WithGasLimit(50_000)
.WithNonce(1)
.TestObject;
Block block = Build.A.Block
.WithNumber(1)
.WithGasLimit(100_000)
.WithTransactions(firstTx, secondTx)
.WithBlockAccessList(new BlockAccessList())
.TestObject;

balManager.PrepareForProcessing(block, Amsterdam.Instance, ProcessingOptions.None);
balManager.SetBlockExecutionContext(new(block.Header, Amsterdam.Instance));
balManager.Setup(block);

TaskCompletionSource<(long BlockGasUsed, long BlockStateGasUsed, InvalidBlockException? Exception)>[] gasResults =
[
new(),
new()
];
gasResults[0].SetResult((80_000, 0, null));
gasResults[1].SetResult((21_000, 0, null));

InvalidBlockException? exception = Assert.Throws<InvalidBlockException>(() =>
balManager.IncrementalValidation(block, gasResults, new BlockReceiptsTracer[2], null, CancellationToken.None));

Assert.That(exception!.Message, Does.Contain("Block gas limit exceeded"));
}

[TestCase(true)]
[TestCase(false)]
public void Parallel_validation_preserves_processing_thread_metric_scope_for_worker_transactions(bool isBlockProcessingThread)
{
Assume.That(Environment.ProcessorCount, Is.GreaterThan(1));

const int txCount = 64;
IWorldState stateProvider = TestWorldStateFactory.CreateForTest();
using IDisposable scope = stateProvider.BeginScope(IWorldState.PreGenesis);

Transaction[] transactions = new Transaction[txCount];
for (int i = 0; i < transactions.Length; i++)
{
transactions[i] = Build.A.Transaction
.WithNonce((UInt256)i)
.WithGasLimit(1)
.TestObject;
}

Block block = Build.A.Block
.WithNumber(1)
.WithGasLimit(txCount)
.WithTransactions(transactions)
.WithBlockAccessList(new BlockAccessList())
.TestObject;

using RecordingTransactionProcessorAdapter transactionProcessor = new();
IBlockAccessListManager balManager = Substitute.For<IBlockAccessListManager>();
balManager.Enabled.Returns(true);
balManager.ParallelExecutionEnabled.Returns(true);
balManager.GetTxProcessor(Arg.Any<int?>()).Returns(transactionProcessor);

IBlockProcessor.IBlockTransactionsExecutor inner = Substitute.For<IBlockProcessor.IBlockTransactionsExecutor>();
BlockProcessor.ParallelBlockValidationTransactionsExecutor executor = new(
inner,
stateProvider,
new TestSingleReleaseSpecProvider(Amsterdam.Instance),
balManager,
LimboLogs.Instance);

bool previousIsBlockProcessingThread = ProcessingThread.IsBlockProcessingThread;
ProcessingThread.IsBlockProcessingThread = isBlockProcessingThread;
try
{
TxReceipt[] receipts = executor.ProcessTransactions(
block,
ProcessingOptions.None,
new BlockReceiptsTracer(),
CancellationToken.None);
Assert.That(receipts, Has.Length.EqualTo(txCount));
}
finally
{
ProcessingThread.IsBlockProcessingThread = previousIsBlockProcessingThread;
}

Assert.That(transactionProcessor.ThreadIds.Count, Is.GreaterThan(1));
Assert.That(transactionProcessor.ObservedProcessingThreadFlags.Count, Is.EqualTo(txCount));
foreach (bool observedProcessingThreadFlag in transactionProcessor.ObservedProcessingThreadFlags)
{
Assert.That(observedProcessingThreadFlag, Is.EqualTo(isBlockProcessingThread));
}
}

[Test]
public void Parallel_validation_forwards_parallel_safe_block_tracer_to_worker_transactions()
{
Assume.That(Environment.ProcessorCount, Is.GreaterThan(1));

const int txCount = 64;
IWorldState stateProvider = TestWorldStateFactory.CreateForTest();
using IDisposable scope = stateProvider.BeginScope(IWorldState.PreGenesis);

Transaction[] transactions = new Transaction[txCount];
for (int i = 0; i < transactions.Length; i++)
{
transactions[i] = Build.A.Transaction
.WithNonce((UInt256)i)
.WithGasLimit(1)
.TestObject;
}

Block block = Build.A.Block
.WithNumber(1)
.WithGasLimit(txCount)
.WithTransactions(transactions)
.WithBlockAccessList(new BlockAccessList())
.TestObject;

using RecordingTransactionProcessorAdapter transactionProcessor = new(traceOperation: true);
IBlockAccessListManager balManager = Substitute.For<IBlockAccessListManager>();
balManager.Enabled.Returns(true);
balManager.ParallelExecutionEnabled.Returns(true);
balManager.GetTxProcessor(Arg.Any<int?>()).Returns(transactionProcessor);

IBlockProcessor.IBlockTransactionsExecutor inner = Substitute.For<IBlockProcessor.IBlockTransactionsExecutor>();
BlockProcessor.ParallelBlockValidationTransactionsExecutor executor = new(
inner,
stateProvider,
new TestSingleReleaseSpecProvider(Amsterdam.Instance),
balManager,
LimboLogs.Instance);
RecordingParallelSafeBlockTracer blockTracer = new();
BlockReceiptsTracer receiptsTracer = new();
receiptsTracer.SetOtherTracer(blockTracer);
receiptsTracer.StartNewBlockTrace(block);

TxReceipt[] receipts = executor.ProcessTransactions(
block,
ProcessingOptions.None,
receiptsTracer,
CancellationToken.None);
receiptsTracer.EndBlockTrace();

Assert.Multiple(() =>
{
Assert.That(receipts, Has.Length.EqualTo(txCount));
Assert.That(blockTracer.StartedTransactions, Is.EqualTo(txCount));
Assert.That(blockTracer.EndedTransactions, Is.EqualTo(txCount));
Assert.That(blockTracer.OpcodeCount, Is.EqualTo(txCount));
});
}

private static BlockAccessListManager CreateAmsterdamBalManager()
{
IWorldState stateProvider = TestWorldStateFactory.CreateForTest();
return new(
stateProvider,
new TestSingleReleaseSpecProvider(Amsterdam.Instance),
Substitute.For<IBlockhashProvider>(),
LimboLogs.Instance,
new BlocksConfig { ParallelExecution = true },
new WithdrawalProcessorFactory(LimboLogs.Instance));
}

private sealed class RecordingTransactionProcessorAdapter(bool traceOperation = false) : ITransactionProcessorAdapter, IDisposable
{
private readonly ManualResetEventSlim _parallelExecutionStarted = new();
private int _executedCount;

public ConcurrentBag<bool> ObservedProcessingThreadFlags { get; } = new();
public ConcurrentDictionary<int, byte> ThreadIds { get; } = new();

public TransactionResult Execute(Transaction transaction, ITxTracer txTracer)
{
ThreadIds.TryAdd(Environment.CurrentManagedThreadId, 0);
if (Interlocked.Increment(ref _executedCount) >= 2)
{
_parallelExecutionStarted.Set();
}

_parallelExecutionStarted.Wait(TimeSpan.FromSeconds(5));
ObservedProcessingThreadFlags.Add(ProcessingThread.IsBlockProcessingThread);
if (traceOperation && txTracer.IsTracingInstructions)
{
txTracer.StartOperation(0, Instruction.ADD, 0, null!);
}

transaction.BlockGasUsed = 1;
txTracer.MarkAsSuccess(Address.Zero, 1, [], []);

return TransactionResult.Ok;
}

public void SetBlockExecutionContext(in BlockExecutionContext blockExecutionContext)
{
}

public void Dispose() => _parallelExecutionStarted.Dispose();
}

private sealed class RecordingParallelSafeBlockTracer : IParallelSafeBlockTracer
{
private int _startedTransactions;
private int _endedTransactions;
private int _opcodeCount;

public int StartedTransactions => _startedTransactions;
public int EndedTransactions => _endedTransactions;
public int OpcodeCount => _opcodeCount;
public bool IsTracingRewards => false;

public void ReportReward(Address author, string rewardType, UInt256 rewardValue)
{
}

public void StartNewBlockTrace(Block block)
{
}

public ITxTracer StartNewTxTrace(Transaction? tx)
{
if (tx is null)
{
return NullTxTracer.Instance;
}

Interlocked.Increment(ref _startedTransactions);
return new RecordingInstructionTxTracer(this);
}

public void EndTxTrace() =>
Interlocked.Increment(ref _endedTransactions);

public void EndBlockTrace()
{
}

private void RecordOpcode() =>
Interlocked.Increment(ref _opcodeCount);

private sealed class RecordingInstructionTxTracer(RecordingParallelSafeBlockTracer blockTracer) : TxTracer
{
public override bool IsTracingInstructions => true;

public override void StartOperation(int pc, Instruction opcode, long gas, in ExecutionEnvironment env) =>
blockTracer.RecordOpcode();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ public void ReportFees(UInt256 fees, UInt256 burntFees)
protected Transaction? CurrentTx;
public ReadOnlySpan<TxReceipt> TxReceipts => CollectionsMarshal.AsSpan(_txReceipts);
public TxReceipt LastReceipt => _txReceipts[^1];
public IBlockTracer OtherTracer => _otherTracer;

/// <summary>
/// EIP-8037: cumulative state gas for the last tracked tx.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,45 @@ public IBlockTracer GetTracer() =>
1 => _childTracers[0],
_ => this
};

public IBlockTracer GetParallelSafeTracer()
{
List<IBlockTracer> parallelSafeTracers = [];
for (int index = 0; index < _childTracers.Count; index++)
{
IBlockTracer childTracer = _childTracers[index];
if (childTracer is IParallelSafeBlockTracer)
{
parallelSafeTracers.Add(childTracer);
}
else if (childTracer is CompositeBlockTracer compositeBlockTracer)
{
IBlockTracer parallelSafeTracer = compositeBlockTracer.GetParallelSafeTracer();
if (parallelSafeTracer != NullBlockTracer.Instance)
{
parallelSafeTracers.Add(parallelSafeTracer);
}
}
}

int parallelSafeTracerCount = parallelSafeTracers.Count;
if (parallelSafeTracerCount == 0)
{
return NullBlockTracer.Instance;
}

if (parallelSafeTracerCount == 1)
{
return parallelSafeTracers[0];
}

CompositeBlockTracer parallelSafeCompositeBlockTracer = new();
parallelSafeCompositeBlockTracer._childTracers.AddRange(parallelSafeTracers);
for (int index = 0; index < parallelSafeTracerCount; index++)
{
parallelSafeCompositeBlockTracer.IsTracingRewards |= parallelSafeTracers[index].IsTracingRewards;
}

return parallelSafeCompositeBlockTracer;
}
}
Loading
Loading