diff --git a/src/Nethermind/Nethermind.Db/SnapshotableMemColumnsDb.cs b/src/Nethermind/Nethermind.Db/SnapshotableMemColumnsDb.cs index 7646446db9db..f61327a53e76 100644 --- a/src/Nethermind/Nethermind.Db/SnapshotableMemColumnsDb.cs +++ b/src/Nethermind/Nethermind.Db/SnapshotableMemColumnsDb.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Threading; using Nethermind.Core; namespace Nethermind.Db @@ -16,6 +17,9 @@ public class SnapshotableMemColumnsDb : IColumnsDb where TKey : stru private readonly Dictionary _columnDbs = new(); private readonly bool _neverPrune; + // Cross-column atomicity guard so CreateSnapshot can't observe a half-applied multi-column writeBatch dispose. RocksDB has this for free; this matches it. + private readonly Lock _atomicityLock = new(); + private SnapshotableMemColumnsDb(TKey[] keys, bool neverPrune) { _neverPrune = neverPrune; @@ -55,10 +59,11 @@ public IDb GetColumnDb(TKey key) public IReadOnlyColumnDb CreateReadOnly(bool createInMemWriteStore) => new ReadOnlyColumnsDb(this, createInMemWriteStore); - public IColumnsWriteBatch StartWriteBatch() => new InMemoryColumnWriteBatch(this); + public IColumnsWriteBatch StartWriteBatch() => new AtomicColumnsWriteBatch(this); public IColumnDbSnapshot CreateSnapshot() { + using Lock.Scope _ = _atomicityLock.EnterScope(); Dictionary snapshots = new(); foreach (KeyValuePair kvp in _columnDbs) { @@ -67,6 +72,26 @@ public IColumnDbSnapshot CreateSnapshot() return new ColumnSnapshot(snapshots); } + /// + /// Wraps so the per-column commit phase + /// happens under the columns DB's write lock, making the multi-column commit atomic + /// w.r.t. . + /// + private sealed class AtomicColumnsWriteBatch(SnapshotableMemColumnsDb db) : IColumnsWriteBatch + { + private readonly InMemoryColumnWriteBatch _inner = new(db); + + public IWriteBatch GetColumnBatch(TKey key) => _inner.GetColumnBatch(key); + + public void Clear() => _inner.Clear(); + + public void Dispose() + { + using Lock.Scope _ = db._atomicityLock.EnterScope(); + _inner.Dispose(); + } + } + public void Dispose() { foreach (SnapshotableMemDb db in _columnDbs.Values) diff --git a/src/Nethermind/Nethermind.State.Flat.Test/Sync/FlatTreeSyncStoreTests.cs b/src/Nethermind/Nethermind.State.Flat.Test/Sync/FlatTreeSyncStoreTests.cs index d25c20344d1a..add8f747281d 100644 --- a/src/Nethermind/Nethermind.State.Flat.Test/Sync/FlatTreeSyncStoreTests.cs +++ b/src/Nethermind/Nethermind.State.Flat.Test/Sync/FlatTreeSyncStoreTests.cs @@ -7,10 +7,12 @@ using Nethermind.Core.Extensions; using Nethermind.Core.Test.Builders; using Nethermind.Db; +using Nethermind.Blockchain.Synchronization; using Nethermind.Int256; using Nethermind.Logging; using Nethermind.State.Flat.Persistence; using Nethermind.State.Flat.Sync; +using Nethermind.State.Flat.Sync.Snap; using NSubstitute; using NUnit.Framework; @@ -72,7 +74,8 @@ public void EnsureStorageEmpty_deletes_all_storage_entries() Assert.That(HasStorageEntries(address), Is.True, "Storage entries should exist before cleanup"); - FlatTreeSyncStore store = new(_persistence, Substitute.For(), LimboLogs.Instance); + FlatSnapTrieFactory snapTrieFactory = new(_persistence, Substitute.For(), LimboLogs.Instance); + FlatTreeSyncStore store = new(_persistence, Substitute.For(), snapTrieFactory, LimboLogs.Instance); store.EnsureStorageEmpty(Keccak.Compute(address.Bytes)); Assert.That(HasStorageEntries(address), Is.False, "Storage entries should be deleted after EnsureStorageEmpty"); diff --git a/src/Nethermind/Nethermind.State.Flat/Sync/FlatTreeSyncStore.cs b/src/Nethermind/Nethermind.State.Flat/Sync/FlatTreeSyncStore.cs index 650c95a33151..d630bbef75ff 100644 --- a/src/Nethermind/Nethermind.State.Flat/Sync/FlatTreeSyncStore.cs +++ b/src/Nethermind/Nethermind.State.Flat/Sync/FlatTreeSyncStore.cs @@ -9,13 +9,14 @@ using Nethermind.Serialization.Rlp; using Nethermind.State.Flat.Persistence; using Nethermind.State.Flat.ScopeProvider; +using Nethermind.State.Flat.Sync.Snap; using Nethermind.Synchronization.FastSync; using Nethermind.Trie; using Nethermind.Trie.Pruning; namespace Nethermind.State.Flat.Sync; -public class FlatTreeSyncStore(IPersistence persistence, IPersistenceManager persistenceManager, ILogManager logManager) : ITreeSyncStore +public class FlatTreeSyncStore(IPersistence persistence, IPersistenceManager persistenceManager, FlatSnapTrieFactory snapTrieFactory, ILogManager logManager) : ITreeSyncStore { // For flat, one cannot continue syncing after finalization as it will corrupt existing state. private bool _wasFinalized = false; @@ -236,6 +237,15 @@ public void FinalizeSync(BlockHeader pivotHeader) { if (Interlocked.CompareExchange(ref _wasFinalized, true, false)) throw new InvalidOperationException("Db was finalized"); + // Wait for any in-flight ISnapTree disposals to complete. Phase-1 / phase-2 trees + // hold per-tree IWriteBatch instances; until each tree's Dispose returns, its writes + // aren't yet committed to the underlying persistence. Without this drain a caller that + // observes "sync finished" can read state that's missing accounts from late-disposed + // trees (manifests as E2ESyncTests.SnapSync "Verification failed: N missing in flat"). + // WaitForInFlightTreesDrained logs its own warning on timeout; we still proceed so a + // stuck tree doesn't permanently block FinalizeSync. + snapTrieFactory.WaitForInFlightTreesDrained(TimeSpan.FromSeconds(30)); + using IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync); StateId from = reader.CurrentState; StateId to = new(pivotHeader); diff --git a/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStateTree.cs b/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStateTree.cs index 4dfc76185605..99b570972688 100644 --- a/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStateTree.cs +++ b/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStateTree.cs @@ -24,13 +24,15 @@ public class FlatSnapStateTree : ISnapTree private readonly bool _enableDoubleWriteCheck; private readonly SnapUpperBoundAdapter _adapter; private readonly StateTree _tree; + private readonly FlatSnapTrieFactory? _ownerFactory; private IReadOnlyList? _pendingEntries; - public FlatSnapStateTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, bool enableDoubleWriteCheck, ILogManager logManager) + public FlatSnapStateTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, bool enableDoubleWriteCheck, ILogManager logManager, FlatSnapTrieFactory? ownerFactory = null) { _reader = reader; _writeBatch = writeBatch; _enableDoubleWriteCheck = enableDoubleWriteCheck; + _ownerFactory = ownerFactory; _adapter = new SnapUpperBoundAdapter(new PersistenceTrieStoreAdapter(reader, writeBatch, enableDoubleWriteCheck)); _tree = new StateTree(_adapter, logManager); } @@ -74,8 +76,15 @@ public void Commit(ValueHash256 upperBound) public void Dispose() { - _writeBatch.Dispose(); - _reader.Dispose(); + try + { + _writeBatch.Dispose(); + _reader.Dispose(); + } + finally + { + _ownerFactory?.OnTreeDisposed(); + } } /// diff --git a/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStorageTree.cs b/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStorageTree.cs index 1110b21be574..d282e80ec42f 100644 --- a/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStorageTree.cs +++ b/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapStorageTree.cs @@ -26,14 +26,16 @@ public class FlatSnapStorageTree : ISnapTree private readonly StorageTree _tree; private readonly Hash256 _addressHash; private readonly SnapUpperBoundAdapter _adapter; + private readonly FlatSnapTrieFactory? _ownerFactory; private IReadOnlyList? _pendingEntries; - public FlatSnapStorageTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, Hash256 addressHash, bool enableDoubleWriteCheck, ILogManager logManager) + public FlatSnapStorageTree(IPersistence.IPersistenceReader reader, IPersistence.IWriteBatch writeBatch, Hash256 addressHash, bool enableDoubleWriteCheck, ILogManager logManager, FlatSnapTrieFactory? ownerFactory = null) { _reader = reader; _writeBatch = writeBatch; _enableDoubleWriteCheck = enableDoubleWriteCheck; _addressHash = addressHash; + _ownerFactory = ownerFactory; _adapter = new SnapUpperBoundAdapter(new PersistenceStorageTrieStoreAdapter(reader, writeBatch, addressHash, enableDoubleWriteCheck)); _tree = new StorageTree(_adapter, logManager); } @@ -82,8 +84,15 @@ public void Commit(ValueHash256 upperBound) public void Dispose() { - _writeBatch.Dispose(); - _reader.Dispose(); + try + { + _writeBatch.Dispose(); + _reader.Dispose(); + } + finally + { + _ownerFactory?.OnTreeDisposed(); + } } /// diff --git a/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapTrieFactory.cs b/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapTrieFactory.cs index 9bc92d2a69fb..44144662c83d 100644 --- a/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapTrieFactory.cs +++ b/src/Nethermind/Nethermind.State.Flat/Sync/Snap/FlatSnapTrieFactory.cs @@ -1,6 +1,8 @@ // SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited // SPDX-License-Identifier: LGPL-3.0-only +using System; +using System.Diagnostics; using Nethermind.Blockchain.Synchronization; using Nethermind.Core; using Nethermind.Core.Crypto; @@ -14,30 +16,93 @@ namespace Nethermind.State.Flat.Sync.Snap; /// /// ISnapTrieFactory implementation for flat state storage. /// Uses IPersistence to create reader/writeBatch per tree for proper resource management. +/// Tracks in-flight trees so callers can wait for all writeBatches to drain before relying +/// on the persisted state being complete. /// public class FlatSnapTrieFactory(IPersistence persistence, ISyncConfig syncConfig, ILogManager logManager) : ISnapTrieFactory { private readonly ILogger _logger = logManager.GetClassLogger(); private readonly Lock _lock = new(); - private bool _initialized = false; + private volatile bool _initialized; + + // Tracks ISnapTree instances created via this factory that haven't been disposed yet. + // The dispose path commits the per-tree IWriteBatch; until that completes, the tree's + // accumulated writes aren't yet visible to subsequent IPersistence readers. Callers + // that need a consistent post-sync view (e.g. FlatTreeSyncStore.FinalizeSync) drain via + // . + private long _inFlightTrees; + + internal void OnTreeCreated() => Interlocked.Increment(ref _inFlightTrees); + + internal void OnTreeDisposed() => Interlocked.Decrement(ref _inFlightTrees); + + /// + /// Spin-wait until all trees handed out by this factory have been disposed. Returns true + /// if drained within , false on timeout. + /// + public bool WaitForInFlightTreesDrained(TimeSpan timeout) + { + long deadline = Stopwatch.GetTimestamp() + (long)(timeout.TotalSeconds * Stopwatch.Frequency); + SpinWait spin = new(); + while (Interlocked.Read(ref _inFlightTrees) > 0) + { + if (Stopwatch.GetTimestamp() > deadline) + { + if (_logger.IsWarn) _logger.Warn($"FlatSnapTrieFactory: {Interlocked.Read(ref _inFlightTrees)} trees still in flight after {timeout}"); + return false; + } + spin.SpinOnce(); + } + return true; + } public ISnapTree CreateStateTree() { EnsureDatabaseCleared(); - IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync); - IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL); - return new FlatSnapStateTree(reader, writeBatch, syncConfig.EnableSnapDoubleWriteCheck, logManager); + IPersistence.IPersistenceReader? reader = null; + IPersistence.IWriteBatch? writeBatch = null; + bool treeTracked = false; + try + { + reader = persistence.CreateReader(ReaderFlags.Sync); + writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL); + OnTreeCreated(); + treeTracked = true; + return new FlatSnapStateTree(reader, writeBatch, syncConfig.EnableSnapDoubleWriteCheck, logManager, this); + } + catch + { + if (treeTracked) OnTreeDisposed(); + writeBatch?.Dispose(); + reader?.Dispose(); + throw; + } } public ISnapTree CreateStorageTree(in ValueHash256 accountPath) { EnsureDatabaseCleared(); - IPersistence.IPersistenceReader reader = persistence.CreateReader(ReaderFlags.Sync); - IPersistence.IWriteBatch writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL); - return new FlatSnapStorageTree(reader, writeBatch, accountPath.ToCommitment(), syncConfig.EnableSnapDoubleWriteCheck, logManager); + IPersistence.IPersistenceReader? reader = null; + IPersistence.IWriteBatch? writeBatch = null; + bool treeTracked = false; + try + { + reader = persistence.CreateReader(ReaderFlags.Sync); + writeBatch = persistence.CreateWriteBatch(StateId.Sync, StateId.Sync, WriteFlags.DisableWAL); + OnTreeCreated(); + treeTracked = true; + return new FlatSnapStorageTree(reader, writeBatch, accountPath.ToCommitment(), syncConfig.EnableSnapDoubleWriteCheck, logManager, this); + } + catch + { + if (treeTracked) OnTreeDisposed(); + writeBatch?.Dispose(); + reader?.Dispose(); + throw; + } } private void EnsureDatabaseCleared() @@ -47,10 +112,15 @@ private void EnsureDatabaseCleared() using (_lock.EnterScope()) { if (_initialized) return; - _initialized = true; _logger.Info("Clearing database"); persistence.Clear(); + + // Set _initialized AFTER Clear completes so a concurrent caller can't see _initialized=true + // and proceed to write accounts while Clear is still iterating GetAllKeys / queueing + // Removes in its write batch — that race deletes freshly-written accounts when Clear's + // batch finally disposes. Volatile (via the field declaration) gives the publish ordering. + _initialized = true; } } } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs index 988c80b54d09..7c5e563f7749 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/E2ESyncTests.cs @@ -408,7 +408,23 @@ public async Task SnapSync() if (dbMode == DbMode.Hash) Assert.Ignore("Hash db does not support snap sync"); using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout); + await RunSnapSyncOnce(cancellationTokenSource.Token); + } + + // Stress reproducer for SnapSync Windows flake — run manually; see PR #11443 for context. + [Test, Explicit("Stress reproducer for SnapSync Windows flake — run manually")] + [TestCaseSource(nameof(StressIterations))] + public async Task SnapSync_StressRepro(int iteration) + { + if (dbMode != DbMode.Flat) Assert.Ignore("Stress repro only targets the Flat dbMode where the flake was observed"); + _ = iteration; // index is purely to give NUnit a unique case per attempt + + using CancellationTokenSource cancellationTokenSource = new CancellationTokenSource().ThatCancelAfter(TestTimeout); + await RunSnapSyncOnce(cancellationTokenSource.Token); + } + private async Task RunSnapSyncOnce(CancellationToken cancellationToken) + { PrivateKey clientKey = TestItem.PrivateKeyD; await using IContainer client = await CreateNode(clientKey, async (cfg, spec) => { @@ -416,7 +432,7 @@ public async Task SnapSync() syncConfig.FastSync = true; syncConfig.SnapSync = true; - await SetPivot(syncConfig, cancellationTokenSource.Token); + await SetPivot(syncConfig, cancellationToken); INetworkConfig networkConfig = cfg.GetConfig(); networkConfig.P2PPort = AllocatePort(); @@ -425,9 +441,12 @@ public async Task SnapSync() networkConfig.FilterDiscoveryNodesByRecentIp = false; }); - await client.Resolve().SyncFromServer(_server, cancellationTokenSource.Token); + await client.Resolve().SyncFromServer(_server, cancellationToken); } + private const int StressIterationCount = 30; + private static IEnumerable StressIterations() => Enumerable.Range(0, StressIterationCount); + [Test] [Retry(5)] public async Task SnapSync_HalfPathServer_HashClient()