
Commit 60e7e76

asdacap, claude, and LukaszRozmej authored
refactor: simplify snap+state sync dispatcher (#11102)
* refactor: replace snap+state sync dispatcher with SimpleDispatcher

  - Introduce SimpleDispatcher that runs feeds sequentially (snap → state), replacing the old parallel SyncDispatcher<T> for snap+state sync.
  - Merge SyncMode.SnapSync into SyncMode.StateNodes; sequencing is now handled by StateSyncRunner rather than the mode selector.
  - Move the isCloseToHead gate and DB tuning from the mode selector / SyncDbTuner into StateSyncRunner. DB tune reset wrapped in try/finally so it runs on cancellation.
  - StateSyncRunner early-returns when FindBestFullState != 0 (state already present); Run wraps RunRound (reset pivot, run feed, verify trie) plus the production-only waits.
  - StateSyncFeed / SnapSyncFeed now implement ISimpleSyncFeed; returning null signals the round is finished, and the dispatcher drains in-flight requests with CancellationToken.None so peer allocations are always freed.
  - StateSyncFeed / Downloader / AllocationStrategyFactory registered via DI and resolved by StateSyncRunner instead of being new'd per round.
  - TreeSync.ValidatePrepareRequest collapsed to bool IsSyncRoundFinished; the SyncMode parameter and _syncMode field are gone.
  - Remove IsSnapGetRangesFinished from ISyncProgressResolver.
  - Test harness SafeContext.RunFeed now calls IStateSyncRunner.RunRound; feed-level tests resolve StateSyncFeed / StateSyncDownloader via DI.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat: yield state sync between rounds to MultiSyncModeSelector

  Restore the old "pause when mode leaves StateNodes" behaviour at the round boundary: StateSyncRunner.RunRound now awaits syncModeSelector.WaitUntilMode(StateNodes) at the top of each iteration, so beacon control / UpdatingPivot / fast-sync re-entry can claim peers between rounds. Pivot changes mid-round are still handled by the feed returning null via IsSyncRoundFinished.

  Test infra substitutes ISyncModeSelector with one that reports StateNodes immediately — the tests drive RunRound directly and shouldn't depend on the real MultiSyncModeSelector settling.

  Mid-round yielding (within a single RunFeed call) is not addressed here; the feed's PrepareRequest doesn't consult the mode selector, to avoid complicating the test harness. The cost is bounded: pivot moves still end the round naturally, and all currently-observed mode transitions that matter (beacon reorg, pivot updates) also move the pivot.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: make SimpleDispatcher<T> generic, bind feed/downloader at construction

  Push T to the class level and move the feed, downloader, allocation strategy factory, and AllocationContexts to the constructor — each dispatcher is now bound to the single feed it drives, and its entry point is Run(CancellationToken) rather than a generic RunFeed<T>(...).

  Register SimpleDispatcher<StateSyncBatch?> and SimpleDispatcher<SnapSyncBatch?> as singletons via factory delegates in the SynchronizerModule (feed + downloader + strategy factory are also DI-resolved). State/SnapSyncRunner now inject the dispatcher instead of constructing it per call.

  Also fix the stale class summary (no more WaitForMode) and the stale cref on ISimpleSyncFeed.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
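
A minimal sketch of the sequential dispatcher shape described above, assuming simplified stand-ins for the in-tree types (the real ISimpleSyncFeed<T>, downloader and peer-allocation plumbing carry more state than shown here):

```csharp
// Illustrative only: names and signatures approximate the in-tree SimpleDispatcher<T>;
// they are not copied from it.
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface ISimpleSyncFeed<T> where T : class
{
    // Returning null signals that the current sync round is finished.
    Task<T?> PrepareRequest(CancellationToken token);
    Task HandleResponse(T batch, CancellationToken token);
}

public class SimpleDispatcherSketch<T>(
    ISimpleSyncFeed<T> feed,
    Func<T, CancellationToken, Task> download, // stand-in for downloader + allocated peer
    int maxInFlight) where T : class
{
    public async Task Run(CancellationToken token)
    {
        SemaphoreSlim slots = new(maxInFlight);
        List<Task> inFlight = new();

        while (true)
        {
            T? batch = await feed.PrepareRequest(token);
            if (batch is null) break; // feed says the round is done

            await slots.WaitAsync(token);
            // No token passed to Task.Run: if cancellation fires between WaitAsync and
            // scheduling, the work still runs, so the slot and the peer are always freed.
            inFlight.Add(Task.Run(async () =>
            {
                try
                {
                    await download(batch, token);
                    // Responses are drained with CancellationToken.None so peer
                    // allocations are returned even during shutdown.
                    await feed.HandleResponse(batch, CancellationToken.None);
                }
                finally
                {
                    slots.Release();
                }
            }));
        }

        // Run only returns once every in-flight batch has been handled.
        await Task.WhenAll(inFlight);
    }
}
```

Binding one feed (plus its downloader and allocation strategy) per dispatcher at construction is what allows each dispatcher to be registered as a singleton and injected into the runners.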
* refactor: register and resolve sync feed deps via interfaces

  Register StateSyncFeed / SnapSyncFeed as ISimpleSyncFeed<T>, downloaders as ISyncDownloader<T>, strategy factories as IPeerAllocationStrategyFactory<T>, and resolve the same interfaces from the SimpleDispatcher factory. Tests that directly touched the feed likewise resolve ISimpleSyncFeed<StateSyncBatch> and ISyncDownloader<StateSyncBatch> instead of the concrete types.

  Normalize the generic parameter to the non-nullable batch type (StateSyncBatch / SnapSyncBatch) and add `where T : class` to ISimpleSyncFeed<T> / SimpleDispatcher<T> so the interface and implementation generics line up (previously ISimpleSyncFeed<StateSyncBatch?> mismatched ISyncDownloader<StateSyncBatch>, forcing concrete resolution).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: remove unused StateSync using in StateSyncFeedTests

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address claude review findings on snap/state sync dispatcher

  - SnapSyncFeed: catch OperationCanceledException before the general Exception handler so clean shutdown doesn't log ERROR
  - SimpleDispatcher: remove the token from Task.Run to prevent a semaphore leak if cancellation fires between WaitAsync and task scheduling
  - StateSyncRunner: replace LINQ with foreach in WaitForCloseToHead
  - SafeContext: add a Feed property so tests use ctx.Feed instead of container.Resolve<ISimpleSyncFeed<StateSyncBatch>>

* fix: swallow OCE on shutdown; retry transient SnapSyncFeed errors

  - StateSyncRunner.Run wraps the body in try/catch (OperationCanceledException) when token.IsCancellationRequested, so clean shutdown no longer propagates OCE to Synchronizer.StartSnapAndStateSyncComponents' ContinueWith handler, which logged "State sync failed" at ERROR.
  - SnapSyncFeed.PrepareRequest moves try/catch inside the while loop with a dedicated catch (OperationCanceledException) returning null, matching StateSyncFeed's shape. Transient exceptions from _snapProvider.IsFinished() now retry on the next iteration instead of permanently aborting snap sync by signalling done.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
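
A self-contained sketch of the PrepareRequest shape those two fixes describe; IsSnapSyncFinished, TryGetNextBatch and the delay value are hypothetical stand-ins for the real snap-provider interaction:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class SnapSyncBatch { }

public sealed class SnapFeedSketch
{
    private static readonly TimeSpan EmptyRequestDelay = TimeSpan.FromMilliseconds(25);

    // Hypothetical stand-ins for the real snap-provider queries.
    private bool IsSnapSyncFinished() => false;
    private SnapSyncBatch? TryGetNextBatch() => new();

    public async Task<SnapSyncBatch?> PrepareRequest(CancellationToken token)
    {
        while (true)
        {
            try
            {
                if (IsSnapSyncFinished()) return null; // null signals the round is finished

                SnapSyncBatch? batch = TryGetNextBatch();
                if (batch is not null) return batch;

                await Task.Delay(EmptyRequestDelay, token);
            }
            catch (OperationCanceledException)
            {
                // Caught before the general handler: a clean shutdown ends the round
                // quietly instead of being logged at ERROR.
                return null;
            }
            catch (Exception e)
            {
                // Transient failures are logged and retried on the next iteration instead
                // of permanently aborting snap sync by signalling "done".
                Console.Error.WriteLine($"Transient error preparing snap request: {e.Message}");
            }
        }
    }
}
```

StateSyncRunner.Run applies the same idea one level up: an OperationCanceledException observed while the token is already cancelled is swallowed rather than surfaced to the ContinueWith handler.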
* refactor: rename RunRound -> RunStateSyncRounds; extract StateSyncPrecursorWait

  - Rename `IStateSyncRunner.RunRound` to `RunStateSyncRounds` (multiple rounds, plural).
  - Combine the WaitUntilMode(StateNodes) and WaitForCloseToHead calls into a single private method `StateSyncPrecursorWait` and use it both at the top of `Run` and at the top of every `RunStateSyncRounds` iteration. Yields between rounds when MSMS leaves StateNodes or the node drifts away from head.
  - Test harness: also substitute ISyncProgressResolver and IBeaconSyncStrategy so the close-to-head check returns immediately (default test peers have HeadNumber=0, which would never satisfy the real predicate).

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* refactor: drop unused ITreeSync interface and SyncCompleted event

  The SyncCompleted event was fired by TreeSync but no longer had any subscribers after VerifyStateOnStateSyncFinished was deleted and the verify-trie call moved into StateSyncRunner.RunStateSyncRounds. ITreeSync existed only to expose this event, so remove the whole surface.

  - Delete ITreeSync.cs and SyncCompletedEventArgs.
  - Drop the event field and its invocation in TreeSync.VerifyPostSyncCleanUp.
  - DI: AddSingleton<ITreeSync, TreeSync>() -> AddSingleton<TreeSync>().
  - Drop the now-unused Substitute.For<ITreeSync>() in SynchronizerModuleTests.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(snap-sync): drive FlatSnapTrieFactory init/finalize from SnapSyncRunner.Run (#11472)

* test(snap-sync): add Windows-flake stress reproducer

  Adds a 30-iteration stress reproducer for the Windows-only E2ESyncTests.SnapSync flake observed in CI (run #25139586200, job #73686059011 on PR #11422). Each iteration is its own NUnit case so a single failure does not terminate the rest, no [Retry] so every flake surfaces, and [Explicit] so it does not run in normal CI. Scoped to DbMode.Flat where the flake has been observed.

  Local reproduction on Windows (~1m runtime, 60 iterations across the two Flat fixtures): 9-19 failures with both observed CI patterns — "Verification failed: ... N missing in flat" and "TrieNodeException: Failed to load key X from root hash Y" during post-snap-sync block processing.

  Run: dotnet test --project Nethermind.Synchronization.Test \
         -c release -- --filter "FullyQualifiedName~SnapSync_StressRepro"

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(snap-sync): drive FlatSnapTrieFactory init/finalize from SnapSyncRunner.Run

  Replaces the lazy-init lock + `_initialized` flag in `FlatSnapTrieFactory` (which raced against concurrent `CreateStateTree`/`CreateStorageTree` calls and caused `Clear()` to wipe freshly-written accounts) with explicit sequential lifecycle hooks driven by `SnapSyncRunner.Run`:

  - `ISnapTrieFactory.EnsureInitialize()` (default no-op) — called once before any tree is created. `FlatSnapTrieFactory` clears the flat DB here.
  - `ISnapTrieFactory.FinalizeSync()` (default no-op) — called once after the dispatcher has finished. `FlatSnapTrieFactory` flushes the persistence here.
  - `SnapSyncRunner.Run` brackets `dispatcher.Run` with both calls.

  Because both halves run from a single sequential entry point — and the new `SimpleDispatcher.Run` only returns once every batch handler (and therefore every `ISnapTree.Dispose`) has completed — the implementation needs no internal locking and the `_initialized` flag goes away.

  This is the in-tree equivalent of the master-branch fix in #11443; the runner-driven shape suggested by @asdacap is cleaner because it doesn't depend on the factory guessing when "the first call" happens. `PatriciaSnapTrieFactory` and the test factories pick up the empty default implementations of both new methods.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(snap-sync): extract RunSnapSyncOnce shared helper

  `SnapSync` and `SnapSync_StressRepro` had verbatim-copied bodies; extract a private `RunSnapSyncOnce(CancellationToken)` helper and have both call it. Trim the multi-line stress-repro comment to a single line per coding-style.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(snap-sync): SnapSyncRunner lifecycle ordering test + cancel/throw cases

  Adds an internal-visible Func<CancellationToken, Task> ctor on SnapSyncRunner so the lifecycle can be exercised without standing up a real SimpleDispatcher, and three NSubstitute-based tests:

  - happy path: EnsureInitialize → dispatcher → FinalizeSync
  - dispatcher throws: FinalizeSync still runs
  - dispatcher cancelled: FinalizeSync still runs

  Trim the doc comments on the new ISnapTrieFactory hooks to a single line per the repo's coding-style rule.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
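
Roughly the runner-driven lifecycle the #11472 commits describe, sketched against the Func<CancellationToken, Task> constructor overload; in the real interface both hooks have empty default implementations so only FlatSnapTrieFactory overrides them, and the names below are illustrative rather than the exact in-tree signatures:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public interface ISnapTrieFactory
{
    void EnsureInitialize(); // FlatSnapTrieFactory clears the flat DB here
    void FinalizeSync();     // FlatSnapTrieFactory flushes the persistence here
}

public class SnapSyncRunnerSketch(ISnapTrieFactory trieFactory, Func<CancellationToken, Task> runDispatcher)
{
    public async Task Run(CancellationToken token)
    {
        // Called once, before any snap tree is created and before any batch is dispatched.
        trieFactory.EnsureInitialize();
        try
        {
            await runDispatcher(token);
        }
        finally
        {
            // Runs even when the dispatcher throws or the run is cancelled.
            trieFactory.FinalizeSync();
        }
    }
}
```

Because EnsureInitialize runs before any snap tree is created and FinalizeSync only runs after the dispatcher has returned, the factory no longer needs the lazy-init lock or the _initialized flag.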
* refactor(snap-sync): make SnapSyncRunner Func ctor public

  The Func<CancellationToken, Task> overload was internal so the test could reach it via InternalsVisibleTo. Make it public — it's a legitimate way to construct the runner, the surface is small, and DI still resolves the SimpleDispatcher overload because Func<CancellationToken, Task> isn't a registered service.

  Promotes the public ctor to a primary constructor (matches the rest of the codebase) and drops the assembly-level InternalsVisibleTo that was added solely for this class.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(snap-sync): collapse SnapSyncRunner ordering tests into a single TestCase

  Three near-identical tests (happy / throws / cancels) folded into one parameterized method with a DispatcherOutcome enum.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(snap-sync): switch expression for SnapSyncRunner dispatcher outcome

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Lukasz Rozmej <lukasz.rozmej@gmail.com>
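
The collapsed, parameterized lifecycle test might look roughly like the sketch below; it reuses the ISnapTrieFactory and SnapSyncRunnerSketch shapes from the earlier sketch rather than the actual SnapSyncRunner and test code:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using NSubstitute;
using NUnit.Framework;

public enum DispatcherOutcome { Completes, Throws, Cancels }

[TestFixture]
public class SnapSyncRunnerLifecycleSketchTests
{
    [TestCase(DispatcherOutcome.Completes)]
    [TestCase(DispatcherOutcome.Throws)]
    [TestCase(DispatcherOutcome.Cancels)]
    public async Task FinalizeSync_always_runs_after_EnsureInitialize(DispatcherOutcome outcome)
    {
        ISnapTrieFactory trieFactory = Substitute.For<ISnapTrieFactory>();

        // Switch expression picks the dispatcher behaviour for this case.
        Task dispatcherResult = outcome switch
        {
            DispatcherOutcome.Completes => Task.CompletedTask,
            DispatcherOutcome.Throws => Task.FromException(new InvalidOperationException("boom")),
            DispatcherOutcome.Cancels => Task.FromCanceled(new CancellationToken(canceled: true)),
            _ => throw new ArgumentOutOfRangeException(nameof(outcome)),
        };

        SnapSyncRunnerSketch runner = new(trieFactory, _ => dispatcherResult);

        try
        {
            await runner.Run(CancellationToken.None);
        }
        catch (Exception)
        {
            // Throw / cancel outcomes surface here; the lifecycle ordering is what we assert.
        }

        Received.InOrder(() =>
        {
            trieFactory.EnsureInitialize();
            trieFactory.FinalizeSync();
        });
    }
}
```

The switch expression keeps the three dispatcher outcomes in one place, while Received.InOrder asserts the EnsureInitialize/FinalizeSync ordering for every case.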
1 parent 484537f commit 60e7e76

39 files changed

Lines changed: 664 additions & 502 deletions

src/Nethermind/Nethermind.Merge.Plugin.Test/EngineModuleTests.Synchronization.cs

Lines changed: 1 addition & 3 deletions
@@ -25,7 +25,6 @@
 using Nethermind.Synchronization.ParallelSync;
 using Nethermind.Synchronization.Peers;
 using Nethermind.Synchronization.Peers.AllocationStrategies;
-using Nethermind.Synchronization.SnapSync;
 using NSubstitute;
 using NSubstitute.ExceptionExtensions;
 using NUnit.Framework;
@@ -1052,8 +1051,7 @@ private MultiSyncModeSelector CreateMultiSyncModeSelector(MergeTestBlockchain ch
     new SyncConfig(),
     Substitute.For<ISyncFeed<HeadersSyncBatch?>>(),
     Substitute.For<ISyncFeed<BodiesSyncBatch?>>(),
-    Substitute.For<ISyncFeed<ReceiptsSyncBatch?>>(),
-    Substitute.For<ISyncFeed<SnapSyncBatch?>>());
+    Substitute.For<ISyncFeed<ReceiptsSyncBatch?>>());

 MultiSyncModeSelector multiSyncModeSelector = new(syncProgressResolver,
     syncPeerPool, new SyncConfig(), No.BeaconSync,

src/Nethermind/Nethermind.Network.Test/SnapCapabilitySwitcherTests.cs

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ public void Dispose_ShouldUnsubscribeFromSyncModeChanges()

     snapCapabilitySwitcher.EnableSnapCapabilityUntilSynced();
     snapCapabilitySwitcher.Dispose();
-    syncModeSelector.Changed += Raise.EventWith(this, new SyncModeChangedEventArgs(SyncMode.SnapSync, SyncMode.Full));
+    syncModeSelector.Changed += Raise.EventWith(this, new SyncModeChangedEventArgs(SyncMode.StateNodes, SyncMode.Full));

     protocolsManager.Received(1).RemoveSupportedCapability(new Capability(Protocol.Snap, 1));
 }

src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapTrieFactory.cs

Lines changed: 9 additions & 20 deletions
@@ -14,43 +14,32 @@ namespace Nethermind.State.Flat.Sync.Snap;
 /// <summary>
 /// ISnapTrieFactory implementation for flat state storage.
 /// Uses IPersistence to create reader/writeBatch per tree for proper resource management.
+/// EnsureInitialize/FinalizeSync are driven by the snap-sync runner at start/end of the run,
+/// so they don't need internal locking — they're never called concurrently with CreateXxxTree.
 /// </summary>
 public class FlatSnapTrieFactory(IPersistence persistence, ISyncConfig syncConfig, ILogManager logManager) : ISnapTrieFactory
 {
     private readonly ILogger _logger = logManager.GetClassLogger<FlatSnapTrieFactory>();
-    private readonly Lock _lock = new();

-    private bool _initialized = false;
+    public void EnsureInitialize()
+    {
+        if (_logger.IsInfo) _logger.Info("Clearing database");
+        persistence.Clear();
+    }
+
+    public void FinalizeSync() => persistence.Flush();

     public ISnapTree<PathWithAccount> CreateStateTree()
     {
-        EnsureDatabaseCleared();
-
         IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync);
         IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
         return new FlatSnapStateTree(reader, writeBatch, syncConfig.EnableSnapDoubleWriteCheck, logManager);
     }

     public ISnapTree<PathWithStorageSlot> CreateStorageTree(in ValueHash256 accountPath)
     {
-        EnsureDatabaseCleared();
-
         IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync);
         IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
         return new FlatSnapStorageTree(reader, writeBatch, accountPath.ToCommitment(), syncConfig.EnableSnapDoubleWriteCheck, logManager);
     }
-
-    private void EnsureDatabaseCleared()
-    {
-        if (_initialized) return;
-
-        using (_lock.EnterScope())
-        {
-            if (_initialized) return;
-            _initialized = true;
-
-            _logger.Info("Clearing database");
-            persistence.Clear();
-        }
-    }
 }

src/Nethermind/Nethermind.Synchronization.Test/DbTuner/SyncDbTunerTests.cs

Lines changed: 2 additions & 22 deletions
@@ -6,7 +6,6 @@
 using Nethermind.Synchronization.DbTuner;
 using Nethermind.Synchronization.FastBlocks;
 using Nethermind.Synchronization.ParallelSync;
-using Nethermind.Synchronization.SnapSync;
 using NSubstitute;
 using NUnit.Framework;

@@ -17,11 +16,8 @@ public class SyncDbTunerTests
     private ITunableDb.TuneType _tuneType = ITunableDb.TuneType.HeavyWrite;
     private readonly ITunableDb.TuneType _blocksTuneType = ITunableDb.TuneType.AggressiveHeavyWrite;
     private SyncConfig _syncConfig = null!;
-    private ISyncFeed<SnapSyncBatch> _snapSyncFeed = null!;
     private ISyncFeed<BodiesSyncBatch> _bodiesSyncFeed = null!;
     private ISyncFeed<ReceiptsSyncBatch> _receiptSyncFeed = null!;
-    private ITunableDb _stateDb = null!;
-    private ITunableDb _codeDb = null!;
     private ITunableDb _blockDb = null!;
     private ITunableDb _receiptDb = null!;

@@ -34,40 +30,24 @@ public void Setup()
         TuneDbMode = _tuneType,
         BlocksDbTuneDbMode = _blocksTuneType,
     };
-    _snapSyncFeed = Substitute.For<ISyncFeed<SnapSyncBatch>>();
     _bodiesSyncFeed = Substitute.For<ISyncFeed<BodiesSyncBatch>>();
     _receiptSyncFeed = Substitute.For<ISyncFeed<ReceiptsSyncBatch>>();
-    _stateDb = Substitute.For<ITunableDb>();
-    _codeDb = Substitute.For<ITunableDb>();
     _blockDb = Substitute.For<ITunableDb>();
     _receiptDb = Substitute.For<ITunableDb>();

     SyncDbTuner _ = new(
         _syncConfig,
-        _snapSyncFeed,
         _bodiesSyncFeed,
         _receiptSyncFeed,
-        _stateDb,
-        _codeDb,
         _blockDb,
         _receiptDb);
 }

 [Test]
-public void WhenSnapIsOn_TriggerStateDbTune() =>
-    TestFeedAndDbTune(_snapSyncFeed, _stateDb);
+public void WhenBodiesIsOn_TriggerBlocksDbTune() => TestFeedAndDbTune(_bodiesSyncFeed, _blockDb, _blocksTuneType);

 [Test]
-public void WhenSnapIsOn_TriggerCodeDbTune() =>
-    TestFeedAndDbTune(_snapSyncFeed, _codeDb);
-
-[Test]
-public void WhenBodiesIsOn_TriggerBlocksDbTune() =>
-    TestFeedAndDbTune(_bodiesSyncFeed, _blockDb, _blocksTuneType);
-
-[Test]
-public void WhenReceiptsIsOn_TriggerReceiptsDbTune() =>
-    TestFeedAndDbTune(_receiptSyncFeed, _receiptDb);
+public void WhenReceiptsIsOn_TriggerReceiptsDbTune() => TestFeedAndDbTune(_receiptSyncFeed, _receiptDb);

 private void TestFeedAndDbTune<T>(ISyncFeed<T> feed, ITunableDb db, ITunableDb.TuneType? tuneType = null)
 {

src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs

Lines changed: 21 additions & 2 deletions
@@ -408,15 +408,31 @@ public async Task SnapSync()
     if (dbMode == DbMode.Hash) Assert.Ignore("Hash db does not support snap sync");

     using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout);
+    await RunSnapSyncOnce(cancellationTokenSource.Token);
+}
+
+// Stress reproducer for SnapSync Windows flake — run manually; see PR #11443 for context.
+[Test, Explicit("Stress reproducer for SnapSync Windows flake — run manually")]
+[TestCaseSource(nameof(StressIterations))]
+public async Task SnapSync_StressRepro(int iteration)
+{
+    if (dbMode != DbMode.Flat) Assert.Ignore("Stress repro only targets the Flat dbMode where the flake was observed");
+    _ = iteration; // index is purely to give NUnit a unique case per attempt
+
+    using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout);
+    await RunSnapSyncOnce(cancellationTokenSource.Token);
+}

+private async Task RunSnapSyncOnce(CancellationToken cancellationToken)
+{
     PrivateKey clientKey = TestItem.PrivateKeyD;
     await using IContainer client = await CreateNode(clientKey, async (cfg, spec) =>
     {
         SyncConfig syncConfig = (SyncConfig)cfg.GetConfig<ISyncConfig>();
         syncConfig.FastSync = true;
         syncConfig.SnapSync = true;

-        await SetPivot(syncConfig, cancellationTokenSource.Token);
+        await SetPivot(syncConfig, cancellationToken);

         INetworkConfig networkConfig = cfg.GetConfig<INetworkConfig>();
         networkConfig.P2PPort = AllocatePort();
@@ -425,9 +441,12 @@ public async Task SnapSync()
         networkConfig.FilterDiscoveryNodesByRecentIp = false;
     });

-    await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationTokenSource.Token);
+    await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationToken);
 }

+private const int StressIterationCount = 30;
+private static IEnumerable<int> StressIterations() => Enumerable.Range(0, StressIterationCount);
+
 [Test]
 [Retry(5)]
 public async Task SnapSync_HalfPathServer_HashClient()
Lines changed: 12 additions & 8 deletions
@@ -1,33 +1,37 @@
 // SPDX-FileCopyrightText: 2022 Demerzel Solutions Limited
 // SPDX-License-Identifier: LGPL-3.0-only

+using System;
 using System.Threading;
 using System.Threading.Tasks;
 using Nethermind.Blockchain.Synchronization;
-using Nethermind.Logging;
 using Nethermind.Synchronization.FastSync;
 using Nethermind.Synchronization.ParallelSync;
 using Nethermind.Synchronization.Peers;
+using Nethermind.Synchronization.StateSync;

 namespace Nethermind.Synchronization.Test.FastSync.SnapProtocolTests
 {
     public class StateSyncDispatcherTester(
-        ISyncFeed<StateSyncBatch> syncFeed,
         ISyncDownloader<StateSyncBatch> downloader,
-        ISyncPeerPool syncPeerPool,
-        IPeerAllocationStrategyFactory<StateSyncBatch> peerAllocationStrategy,
-        ILogManager logManager) : SyncDispatcher<StateSyncBatch>(new SyncConfig() { SyncDispatcherEmptyRequestDelayMs = 1, SyncDispatcherAllocateTimeoutMs = 1 }, syncFeed, downloader, syncPeerPool, peerAllocationStrategy, logManager)
+        ISyncPeerPool syncPeerPool) : IAsyncDisposable
     {
-        private readonly ISyncDownloader<StateSyncBatch> _downloader = downloader;
+        private readonly ISyncConfig _syncConfig = new SyncConfig() { SyncDispatcherEmptyRequestDelayMs = 1, SyncDispatcherAllocateTimeoutMs = 1 };

         public async Task ExecuteDispatch(StateSyncBatch batch, int times)
         {
-            SyncPeerAllocation allocation = await Allocate(batch, default);
+            StateSyncAllocationStrategyFactory strategyFactory = new();
+            SyncPeerAllocation allocation = await syncPeerPool.Allocate(
+                strategyFactory.Create(batch), AllocationContexts.State, _syncConfig.SyncDispatcherAllocateTimeoutMs);

             for (int i = 0; i < times; i++)
             {
-                await _downloader.Dispatch(allocation.Current!, batch, CancellationToken.None);
+                await downloader.Dispatch(allocation.Current!, batch, CancellationToken.None);
             }
+
+            syncPeerPool.Free(allocation);
         }
+
+        public ValueTask DisposeAsync() => ValueTask.CompletedTask;
     }
 }

src/Nethermind/Nethermind.Synchronization.Test/FastSync/SnapProtocolTests/StateSyncDispatcherTests.cs

Lines changed: 1 addition & 6 deletions
@@ -9,7 +9,6 @@
 using Nethermind.Logging;
 using Nethermind.Stats;
 using Nethermind.Synchronization.FastSync;
-using Nethermind.Synchronization.ParallelSync;
 using Nethermind.Synchronization.Peers;
 using Nethermind.Synchronization.StateSync;
 using NSubstitute;
@@ -18,7 +17,6 @@
 using Nethermind.Core.Crypto;
 using System.Net;
 using FluentAssertions;
-using Nethermind.Consensus;
 using Nethermind.Core.Test;
 using Nethermind.Trie;
 using Nethermind.Core.Collections;
@@ -52,11 +50,8 @@ public void Setup()
     _pool = new SyncPeerPool(blockTree, new NodeStatsManager(timerFactory, LimboLogs.Instance), new TotalDifficultyBetterPeerStrategy(LimboLogs.Instance), LimboLogs.Instance, 25);
     _pool.Start();

-    ISyncFeed<StateSyncBatch>? feed = Substitute.For<ISyncFeed<StateSyncBatch>>();
-    IPoSSwitcher poSSwitcher = Substitute.For<IPoSSwitcher>();
-    poSSwitcher.TransitionFinished.Returns(false);
     _dispatcher =
-        new StateSyncDispatcherTester(feed, new StateSyncDownloader(_logManager), _pool, new StateSyncAllocationStrategyFactory(), _logManager);
+        new StateSyncDispatcherTester(new StateSyncDownloader(_logManager), _pool);
 }

 [TearDown]