Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/Nethermind/Nethermind.Db/SnapshotableMemColumnsDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Threading;
using Nethermind.Core;

namespace Nethermind.Db
Expand All @@ -16,6 +17,9 @@ public class SnapshotableMemColumnsDb<TKey> : IColumnsDb<TKey> where TKey : stru
private readonly Dictionary<TKey, SnapshotableMemDb> _columnDbs = new();
private readonly bool _neverPrune;

// Cross-column atomicity guard so CreateSnapshot can't observe a half-applied multi-column writeBatch dispose. RocksDB has this for free; this matches it.
private readonly Lock _atomicityLock = new();

private SnapshotableMemColumnsDb(TKey[] keys, bool neverPrune)
{
_neverPrune = neverPrune;
Expand Down Expand Up @@ -55,10 +59,11 @@ public IDb GetColumnDb(TKey key)

public IReadOnlyColumnDb<TKey> CreateReadOnly(bool createInMemWriteStore) => new ReadOnlyColumnsDb<TKey>(this, createInMemWriteStore);

public IColumnsWriteBatch<TKey> StartWriteBatch() => new InMemoryColumnWriteBatch<TKey>(this);
public IColumnsWriteBatch<TKey> StartWriteBatch() => new AtomicColumnsWriteBatch(this);

public IColumnDbSnapshot<TKey> CreateSnapshot()
{
using Lock.Scope _ = _atomicityLock.EnterScope();
Dictionary<TKey, IKeyValueStoreSnapshot> snapshots = new();
foreach (KeyValuePair<TKey, SnapshotableMemDb> kvp in _columnDbs)
{
Expand All @@ -67,6 +72,26 @@ public IColumnDbSnapshot<TKey> CreateSnapshot()
return new ColumnSnapshot(snapshots);
}

/// <summary>
/// Wraps <see cref="InMemoryColumnWriteBatch{TKey}"/> so the per-column commit phase
/// happens under the columns DB's write lock, making the multi-column commit atomic
/// w.r.t. <see cref="CreateSnapshot"/>.
/// </summary>
private sealed class AtomicColumnsWriteBatch(SnapshotableMemColumnsDb<TKey> db) : IColumnsWriteBatch<TKey>
{
private readonly InMemoryColumnWriteBatch<TKey> _inner = new(db);

public IWriteBatch GetColumnBatch(TKey key) => _inner.GetColumnBatch(key);

public void Clear() => _inner.Clear();

public void Dispose()
{
using Lock.Scope _ = db._atomicityLock.EnterScope();
_inner.Dispose();
}
}

public void Dispose()
{
foreach (SnapshotableMemDb db in _columnDbs.Values)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
using Nethermind.Core.Extensions;
using Nethermind.Core.Test.Builders;
using Nethermind.Db;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.State.Flat.Persistence;
using Nethermind.State.Flat.Sync;
using Nethermind.State.Flat.Sync.Snap;
using NSubstitute;
using NUnit.Framework;

Expand Down Expand Up @@ -72,7 +74,8 @@ public void EnsureStorageEmpty_deletes_all_storage_entries()

Assert.That(HasStorageEntries(address), Is.True, "Storage entries should exist before cleanup");

FlatTreeSyncStore store = new(_persistence, Substitute.For<IPersistenceManager>(), LimboLogs.Instance);
FlatSnapTrieFactory snapTrieFactory = new(_persistence, Substitute.For<ISyncConfig>(), LimboLogs.Instance);
FlatTreeSyncStore store = new(_persistence, Substitute.For<IPersistenceManager>(), snapTrieFactory, LimboLogs.Instance);
store.EnsureStorageEmpty(Keccak.Compute(address.Bytes));

Assert.That(HasStorageEntries(address), Is.False, "Storage entries should be deleted after EnsureStorageEmpty");
Expand Down
12 changes: 11 additions & 1 deletion src/Nethermind/Nethermind.State.Flat/Sync/FlatTreeSyncStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@
using Nethermind.Serialization.Rlp;
using Nethermind.State.Flat.Persistence;
using Nethermind.State.Flat.ScopeProvider;
using Nethermind.State.Flat.Sync.Snap;
using Nethermind.Synchronization.FastSync;
using Nethermind.Trie;
using Nethermind.Trie.Pruning;

namespace Nethermind.State.Flat.Sync;

public class FlatTreeSyncStore(IPersistence persistence, IPersistenceManager persistenceManager, ILogManager logManager) : ITreeSyncStore
public class FlatTreeSyncStore(IPersistence persistence, IPersistenceManager persistenceManager, FlatSnapTrieFactory snapTrieFactory, ILogManager logManager) : ITreeSyncStore
{
// For flat, one cannot continue syncing after finalization as it will corrupt existing state.
private bool _wasFinalized = false;
Expand Down Expand Up @@ -236,6 +237,15 @@ public void FinalizeSync(BlockHeader pivotHeader)
{
if (Interlocked.CompareExchange(ref _wasFinalized, true, false)) throw new InvalidOperationException("Db was finalized");

// Wait for any in-flight ISnapTree disposals to complete. Phase-1 / phase-2 trees
// hold per-tree IWriteBatch instances; until each tree's Dispose returns, its writes
// aren't yet committed to the underlying persistence. Without this drain a caller that
// observes "sync finished" can read state that's missing accounts from late-disposed
// trees (manifests as E2ESyncTests.SnapSync "Verification failed: N missing in flat").
// WaitForInFlightTreesDrained logs its own warning on timeout; we still proceed so a
// stuck tree doesn't permanently block FinalizeSync.
snapTrieFactory.WaitForInFlightTreesDrained(TimeSpan.FromSeconds(30));

using IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync);
StateId from = reader.CurrentState;
StateId to = new(pivotHeader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ public class FlatSnapStateTree : ISnapTree<PathWithAccount>
private readonly bool _enableDoubleWriteCheck;
private readonly SnapUpperBoundAdapter _adapter;
private readonly StateTree _tree;
private readonly FlatSnapTrieFactory? _ownerFactory;
private IReadOnlyList<PathWithAccount>? _pendingEntries;

public FlatSnapStateTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, bool enableDoubleWriteCheck, ILogManager logManager)
public FlatSnapStateTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, bool enableDoubleWriteCheck, ILogManager logManager, FlatSnapTrieFactory? ownerFactory = null)
{
_reader = reader;
_writeBatch = writeBatch;
_enableDoubleWriteCheck = enableDoubleWriteCheck;
_ownerFactory = ownerFactory;
_adapter = new SnapUpperBoundAdapter(new PersistenceTrieStoreAdapter(reader, writeBatch, enableDoubleWriteCheck));
_tree = new StateTree(_adapter, logManager);
}
Expand Down Expand Up @@ -74,8 +76,15 @@ public void Commit(ValueHash256 upperBound)

public void Dispose()
{
_writeBatch.Dispose();
_reader.Dispose();
try
{
_writeBatch.Dispose();
_reader.Dispose();
}
finally
{
_ownerFactory?.OnTreeDisposed();
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ public class FlatSnapStorageTree : ISnapTree<PathWithStorageSlot>
private readonly StorageTree _tree;
private readonly Hash256 _addressHash;
private readonly SnapUpperBoundAdapter _adapter;
private readonly FlatSnapTrieFactory? _ownerFactory;
private IReadOnlyList<PathWithStorageSlot>? _pendingEntries;

public FlatSnapStorageTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, Hash256 addressHash, bool enableDoubleWriteCheck, ILogManager logManager)
public FlatSnapStorageTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, Hash256 addressHash, bool enableDoubleWriteCheck, ILogManager logManager, FlatSnapTrieFactory? ownerFactory = null)
{
_reader = reader;
_writeBatch = writeBatch;
_enableDoubleWriteCheck = enableDoubleWriteCheck;
_addressHash = addressHash;
_ownerFactory = ownerFactory;
_adapter = new SnapUpperBoundAdapter(new PersistenceStorageTrieStoreAdapter(reader, writeBatch, addressHash, enableDoubleWriteCheck));
_tree = new StorageTree(_adapter, logManager);
}
Expand Down Expand Up @@ -82,8 +84,15 @@ public void Commit(ValueHash256 upperBound)

public void Dispose()
{
_writeBatch.Dispose();
_reader.Dispose();
try
{
_writeBatch.Dispose();
_reader.Dispose();
}
finally
{
_ownerFactory?.OnTreeDisposed();
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;

Check warning on line 4 in src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapTrieFactory.cs

View workflow job for this annotation

GitHub Actions / Check code lint

Using directive is unnecessary. (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/style-rules/ide0005) [/home/runner/work/nethermind/nethermind/src/Nethermind/Nethermind.State.Flat/Nethermind.State.Flat.csproj]
using System.Diagnostics;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Core;
using Nethermind.Core.Crypto;
Expand All @@ -14,30 +16,93 @@
/// <summary>
/// ISnapTrieFactory implementation for flat state storage.
/// Uses IPersistence to create reader/writeBatch per tree for proper resource management.
/// Tracks in-flight trees so callers can wait for all writeBatches to drain before relying
/// on the persisted state being complete.
/// </summary>
public class FlatSnapTrieFactory(IPersistence persistence, ISyncConfig syncConfig, ILogManager logManager) : ISnapTrieFactory
{
private readonly ILogger _logger = logManager.GetClassLogger<FlatSnapTrieFactory>();
private readonly Lock _lock = new();

private bool _initialized = false;
private volatile bool _initialized;

// Tracks ISnapTree instances created via this factory that haven't been disposed yet.
// The dispose path commits the per-tree IWriteBatch; until that completes, the tree's
// accumulated writes aren't yet visible to subsequent IPersistence readers. Callers
// that need a consistent post-sync view (e.g. FlatTreeSyncStore.FinalizeSync) drain via
// <see cref="WaitForInFlightTreesDrained"/>.
private long _inFlightTrees;

internal void OnTreeCreated() => Interlocked.Increment(ref _inFlightTrees);

internal void OnTreeDisposed() => Interlocked.Decrement(ref _inFlightTrees);

/// <summary>
/// Spin-wait until all trees handed out by this factory have been disposed. Returns true
/// if drained within <paramref name="timeout"/>, false on timeout.
/// </summary>
public bool WaitForInFlightTreesDrained(TimeSpan timeout)
{
long deadline = Stopwatch.GetTimestamp() + (long)(timeout.TotalSeconds * Stopwatch.Frequency);
SpinWait spin = new();
while (Interlocked.Read(ref _inFlightTrees) > 0)
{
if (Stopwatch.GetTimestamp() > deadline)
{
if (_logger.IsWarn) _logger.Warn($"FlatSnapTrieFactory: {Interlocked.Read(ref _inFlightTrees)} trees still in flight after {timeout}");
return false;
}
spin.SpinOnce();
}
return true;
}

public ISnapTree<PathWithAccount> CreateStateTree()
{
EnsureDatabaseCleared();

IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync);
IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
return new FlatSnapStateTree(reader, writeBatch, syncConfig.EnableSnapDoubleWriteCheck, logManager);
IPersistence.IPersistenceReader? reader = null;
IPersistence.IWriteBatch? writeBatch = null;
bool treeTracked = false;
try
{
reader = persistence.CreateReader(ReaderFlags.Sync);
writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
OnTreeCreated();
treeTracked = true;
return new FlatSnapStateTree(reader, writeBatch, syncConfig.EnableSnapDoubleWriteCheck, logManager, this);
}
catch
{
if (treeTracked) OnTreeDisposed();
writeBatch?.Dispose();
reader?.Dispose();
throw;
}
}

public ISnapTree<PathWithStorageSlot> CreateStorageTree(in ValueHash256 accountPath)
{
EnsureDatabaseCleared();

IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync);
IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
return new FlatSnapStorageTree(reader, writeBatch, accountPath.ToCommitment(), syncConfig.EnableSnapDoubleWriteCheck, logManager);
IPersistence.IPersistenceReader? reader = null;
IPersistence.IWriteBatch? writeBatch = null;
bool treeTracked = false;
try
{
reader = persistence.CreateReader(ReaderFlags.Sync);
writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL);
OnTreeCreated();
treeTracked = true;
return new FlatSnapStorageTree(reader, writeBatch, accountPath.ToCommitment(), syncConfig.EnableSnapDoubleWriteCheck, logManager, this);
}
catch
{
if (treeTracked) OnTreeDisposed();
writeBatch?.Dispose();
reader?.Dispose();
throw;
}
}

private void EnsureDatabaseCleared()
Expand All @@ -47,10 +112,15 @@
using (_lock.EnterScope())
{
if (_initialized) return;
_initialized = true;

_logger.Info("Clearing database");
persistence.Clear();

// Set _initialized AFTER Clear completes so a concurrent caller can't see _initialized=true
// and proceed to write accounts while Clear is still iterating GetAllKeys / queueing
// Removes in its write batch — that race deletes freshly-written accounts when Clear's
// batch finally disposes. Volatile (via the field declaration) gives the publish ordering.
_initialized = true;
}
}
}
23 changes: 21 additions & 2 deletions src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -408,15 +408,31 @@ public async Task SnapSync()
if (dbMode == DbMode.Hash) Assert.Ignore("Hash db does not support snap sync");

using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout);
await RunSnapSyncOnce(cancellationTokenSource.Token);
}

// Stress reproducer for SnapSync Windows flake — run manually; see PR #11443 for context.
[Test, Explicit("Stress reproducer for SnapSync Windows flake — run manually")]
[TestCaseSource(nameof(StressIterations))]
public async Task SnapSync_StressRepro(int iteration)
{
if (dbMode != DbMode.Flat) Assert.Ignore("Stress repro only targets the Flat dbMode where the flake was observed");
_ = iteration; // index is purely to give NUnit a unique case per attempt

using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout);
await RunSnapSyncOnce(cancellationTokenSource.Token);
}

private async Task RunSnapSyncOnce(CancellationToken cancellationToken)
{
PrivateKey clientKey = TestItem.PrivateKeyD;
await using IContainer client = await CreateNode(clientKey, async (cfg, spec) =>
{
SyncConfig syncConfig = (SyncConfig)cfg.GetConfig<ISyncConfig>();
syncConfig.FastSync = true;
syncConfig.SnapSync = true;

await SetPivot(syncConfig, cancellationTokenSource.Token);
await SetPivot(syncConfig, cancellationToken);

INetworkConfig networkConfig = cfg.GetConfig<INetworkConfig>();
networkConfig.P2PPort = AllocatePort();
Expand All @@ -425,9 +441,12 @@ public async Task SnapSync()
networkConfig.FilterDiscoveryNodesByRecentIp = false;
});

await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationTokenSource.Token);
await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationToken);
}
Comment on lines +428 to +445
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The body of this test is a verbatim copy of SnapSync(). Per the repo's test-infrastructure rules, duplicating test method bodies is not allowed — extract the shared logic into a private helper (e.g. RunSnapSyncClientOnce(CancellationToken)) and have both methods call it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still a verbatim copy of SnapSync() (lines 412–428). Per coding-style.md ("extract repeated blocks (5+ lines)") and test-infrastructure.md ("Do not duplicate test methods"), extract the shared body into a private helper and have both methods call it:

private async Task RunSnapSyncOnce(CancellationToken cancellationToken)
{
    PrivateKey clientKey = TestItem.PrivateKeyD;
    await using IContainer client = await CreateNode(clientKey, async (cfg, spec) =>
    {
        SyncConfig syncConfig = (SyncConfig)cfg.GetConfig<ISyncConfig>();
        syncConfig.FastSync = true;
        syncConfig.SnapSync = true;
        await SetPivot(syncConfig, cancellationToken);
        INetworkConfig networkConfig = cfg.GetConfig<INetworkConfig>();
        networkConfig.P2PPort = AllocatePort();
        networkConfig.FilterPeersByRecentIp = false;
        networkConfig.FilterDiscoveryNodesByRecentIp = false;
    });
    await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationToken);
}

Then both tests become:

await RunSnapSyncOnce(cancellationTokenSource.Token);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Medium (third occurrence — unaddressed): Lines 447–462 are still a verbatim copy of SnapSync() lines 412–428. coding-style.md requires extracting repeated blocks of 5+ lines into shared methods. Extract to a private helper:

private async Task RunSnapSyncOnce(CancellationToken cancellationToken)
{
    PrivateKey clientKey = TestItem.PrivateKeyD;
    await using IContainer client = await CreateNode(clientKey, async (cfg, spec) =>
    {
        SyncConfig syncConfig = (SyncConfig)cfg.GetConfig<ISyncConfig>();
        syncConfig.FastSync = true;
        syncConfig.SnapSync = true;
        await SetPivot(syncConfig, cancellationToken);
        INetworkConfig networkConfig = cfg.GetConfig<INetworkConfig>();
        networkConfig.P2PPort = AllocatePort();
        networkConfig.FilterPeersByRecentIp = false;
        networkConfig.FilterDiscoveryNodesByRecentIp = false;
    });
    await client.Resolve<SyncTestContext>().SyncFromServer(_server, cancellationToken);
}

Both SnapSync() and SnapSync_StressRepro() then become a one-liner call to this helper.


private const int StressIterationCount = 30;
private static IEnumerable<int> StressIterations() => Enumerable.Range(0, StressIterationCount);

[Test]
[Retry(5)]
public async Task SnapSync_HalfPathServer_HashClient()
Expand Down
Loading