Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -14,43 +14,32 @@ namespace Nethermind.State.Flat.Sync.Snap;
/// <summary>
/// ISnapTrieFactory implementation for flat state storage.
/// Uses IPersistence to create reader/writeBatch per tree for proper resource management.
/// EnsureInitialize/FinalizeSync are driven by the snap-sync runner at start/end of the run,
/// so they don't need internal locking — they're never called concurrently with CreateXxxTree.
/// </summary>
public class FlatSnapTrieFactory(IPersistence persistence, ISyncConfig syncConfig, ILogManager logManager) : ISnapTrieFactory
{
private readonly ILogger _logger = logManager.GetClassLogger<FlatSnapTrieFactory>();
private readonly Lock _lock = new();

private bool _initialized = false;
public void EnsureInitialize()
{
if (_logger.IsInfo) _logger.Info("Clearing database");
persistence.Clear();
}

public void FinalizeSync() => persistence.Flush();

public ISnapTree<PathWithAccount> CreateStateTree()
{
EnsureDatabaseCleared();

IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync);
IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
return new FlatSnapStateTree(reader, writeBatch, syncConfig.EnableSnapDoubleWriteCheck, logManager);
}

public ISnapTree<PathWithStorageSlot> CreateStorageTree(in ValueHash256 accountPath)
{
EnsureDatabaseCleared();

IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync);
IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
return new FlatSnapStorageTree(reader, writeBatch, accountPath.ToCommitment(), syncConfig.EnableSnapDoubleWriteCheck, logManager);
}

private void EnsureDatabaseCleared()
{
if (_initialized) return;

using (_lock.EnterScope())
{
if (_initialized) return;
_initialized = true;

_logger.Info("Clearing database");
persistence.Clear();
}
}
}
23 changes: 21 additions & 2 deletions src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,31 @@ public async Task SnapSync()
if (dbMode == DbMode.Hash) Assert.Ignore("Hash db does not support snap sync");

using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout);
await RunSnapSyncOnce(cancellationTokenSource.Token);
}

// Stress reproducer for SnapSync Windows flake — run manually; see PR #11443 for context.
[Test, Explicit("Stress reproducer for SnapSync Windows flake — run manually")]
[TestCaseSource(nameof(StressIterations))]
public async Task SnapSync_StressRepro(int iteration)
{
if (dbMode != DbMode.Flat) Assert.Ignore("Stress repro only targets the Flat dbMode where the flake was observed");
_ = iteration; // index is purely to give NUnit a unique case per attempt

using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout);
await RunSnapSyncOnce(cancellationTokenSource.Token);
}

private async Task RunSnapSyncOnce(CancellationToken cancellationToken)
{
PrivateKey clientKey = TestItem.PrivateKeyD;
await using IContainer client = await CreateNode(clientKey, async (cfg, spec) =>
{
SyncConfig syncConfig = (SyncConfig)cfg.GetConfig<ISyncConfig>();
syncConfig.FastSync = true;
syncConfig.SnapSync = true;

await SetPivot(syncConfig, cancellationTokenSource.Token);
await SetPivot(syncConfig, cancellationToken);

INetworkConfig networkConfig = cfg.GetConfig<INetworkConfig>();
networkConfig.P2PPort = AllocatePort();
Expand All @@ -420,9 +436,12 @@ public async Task SnapSync()
networkConfig.FilterDiscoveryNodesByRecentIp = false;
});

await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationTokenSource.Token);
await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationToken);
}

private const int StressIterationCount = 30;
private static IEnumerable<int> StressIterations() => Enumerable.Range(0, StressIterationCount);

[Test]
[Retry(5)]
public async Task SnapSync_HalfPathServer_HashClient()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// SPDX-FileCopyrightText: 2026 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Synchronization.SnapSync;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Synchronization.Test.SnapSync;

[TestFixture]
public class SnapSyncRunnerTests
{
public enum DispatcherOutcome { Completes, Throws, Cancels }

[TestCase(DispatcherOutcome.Completes, null)]
[TestCase(DispatcherOutcome.Throws, typeof(InvalidOperationException))]
[TestCase(DispatcherOutcome.Cancels, typeof(OperationCanceledException))]
public async Task Run_invokes_lifecycle_in_order(DispatcherOutcome outcome, Type? expectedException)
{
List<string> calls = new();
ISnapTrieFactory factory = Substitute.For<ISnapTrieFactory>();
factory.When(f => f.EnsureInitialize()).Do(_ => calls.Add("EnsureInitialize"));
factory.When(f => f.FinalizeSync()).Do(_ => calls.Add("FinalizeSync"));

using CancellationTokenSource cts = new();
if (outcome == DispatcherOutcome.Cancels) cts.Cancel();

SnapSyncRunner runner = new(token =>
{
calls.Add("dispatcher");
return outcome switch
{
DispatcherOutcome.Throws => throw new InvalidOperationException("boom"),
DispatcherOutcome.Cancels => throw new OperationCanceledException(token),
_ => Task.CompletedTask,
};
}, factory);

Func<Task> act = () => runner.Run(cts.Token);
if (expectedException is null)
await act.Should().NotThrowAsync();
else
await act.Should().ThrowAsync<Exception>().Where(e => expectedException.IsInstanceOfType(e));

calls.Should().Equal("EnsureInitialize", "dispatcher", "FinalizeSync");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ namespace Nethermind.Synchronization.SnapSync;

public interface ISnapTrieFactory
{
// Called once at the start/end of a snap-sync run from SnapSyncRunner.Run — sequential, no concurrent invocations.
void EnsureInitialize() { }
void FinalizeSync() { }

ISnapTree<PathWithAccount> CreateStateTree();
ISnapTree<PathWithStorageSlot> CreateStorageTree(in ValueHash256 accountPath);
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
// SPDX-FileCopyrightText: 2026 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Synchronization.ParallelSync;

namespace Nethermind.Synchronization.SnapSync;

public class SnapSyncRunner(SimpleDispatcher<SnapSyncBatch> dispatcher) : ISnapSyncRunner
public class SnapSyncRunner(Func<CancellationToken, Task> runDispatcher, ISnapTrieFactory snapTrieFactory) : ISnapSyncRunner
{
public Task Run(CancellationToken token) => dispatcher.Run(token);
public SnapSyncRunner(SimpleDispatcher<SnapSyncBatch> dispatcher, ISnapTrieFactory snapTrieFactory)
: this(dispatcher.Run, snapTrieFactory) { }

public async Task Run(CancellationToken token)
{
snapTrieFactory.EnsureInitialize();
try
{
await runDispatcher(token);
}
finally
{
// Always finalize — FinalizeSync is idempotent and we want partial writes flushed on cancel/throw.
snapTrieFactory.FinalizeSync();
}
}
}
Loading