From 2efdecbdd7c44763208421f9dba1d1fb9c6fc310 Mon Sep 17 00:00:00 2001
From: George Melikov
Date: Sat, 13 Jun 2026 17:05:01 +0000
Subject: [PATCH] fix startup deadlock when raftCommitIndex exceeds journal after crash
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

On an unclean shutdown (power loss, OOM kill) the node can start with
raftCommitIndex from .journal.meta pointing past the last entry in the
journal file. This happens because journal entries are written to an
mmap (page cache, no msync per write) while the meta is saved via
f.flush() + atomic rename once per second — two paths the OS can flush
to disk in any order.

Concretely: after log compaction leaves the journal at [prev, N], new
entries N+1..N+k are appended to the mmap. The meta timer fires and
persists raftCommitIndex=N+k. The node crashes. The post-compaction
pages had already been written back by the OS; the newer dirty pages
had not. On recovery: meta=N+k, journal ends at N.

Without the fix the node deadlocks: raftCommitIndex >
getCurrentLogIndex() so applyLogEntries finds nothing to apply, and the
leader commit loop condition `while commitIdx < getCurrentLogIndex()`
is immediately false, so commitIndex can never advance either.

Fix: in __loadDumpFile, clamp raftCommitIndex down to
getCurrentLogIndex() if it overshoots. The lost entries are
unrecoverable; rolling the pointer back is the only safe option. The
corrected value is logged as a warning and re-persisted to .journal.meta
on the next onOneSecondTimer tick.
--- pysyncobj/syncobj.py | 8 +++++ test_syncobj.py | 84 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py index dddefb0..c4a3a30 100644 --- a/pysyncobj/syncobj.py +++ b/pysyncobj/syncobj.py @@ -1415,6 +1415,14 @@ def __loadDumpFile(self, clearJournal): self.__raftLastApplied = data[1][1] + if self.__raftCommitIndex > self.__getCurrentLogIndex(): + logger.warning('raftCommitIndex (%d) exceeds journal end (%d) after loading dump, ' + 'likely due to unsynced journal entries lost on crash; ' + 'clamping to journal end', + self.__raftCommitIndex, self.__getCurrentLogIndex()) + self.__raftCommitIndex = self.__getCurrentLogIndex() + self.__raftLog.setRaftCommitIndex(self.__raftCommitIndex) + if self.__conf.dynamicMembershipChange: self.__updateClusterConfiguration([node for node in data[3] if node != self.__selfNode]) self.__onSetCodeVersion(0) diff --git a/test_syncobj.py b/test_syncobj.py index 5d1dbf2..8dfdced 100755 --- a/test_syncobj.py +++ b/test_syncobj.py @@ -1215,6 +1215,90 @@ def test_applyJournalAfterRestart(): removeFiles([e + '.meta' for e in journalFiles]) +def test_journalCrashCommitIndexAhead(): + # Regression test for the case where raftCommitIndex in .journal.meta exceeds + # the last entry physically present in the journal file after an unclean shutdown. + # + # How this happens in production: + # 1. Log compaction runs and leaves the journal with 2 entries [prev, last_applied]. + # 2. New entries (N+1 .. N+k) are written to the mmap journal (page cache only). + # 3. The once-per-second meta timer fires and persists raftCommitIndex=N+k to + # .journal.meta via f.flush() + atomic rename. + # 4. The node crashes before msync() flushes the mmap dirty pages to disk. + # 5. On recovery: meta has N+k, journal ends at N — node deadlocks without this fix. 
+ # + # We reproduce the corrupted state by: + # - running a single-node cluster until counter==100, + # - forcing log compaction (journal shrinks to 2 entries, dump written), + # - committing one more entry (counter==150, journal grows to 3 entries), + # - then re-opening the journal file directly and removing the last entry + # while leaving the meta's raftCommitIndex at the stale high value. + dumpFile = getNextDumpFile() + journalFile = getNextJournalFile() + removeFiles([dumpFile]) + removeFiles([journalFile, journalFile + '.meta']) + + random.seed(42) + + a = [getNextAddr()] + + # Phase 1: commit some state, force compaction, commit one more entry. + o1 = TestObj(a[0], [], TEST_TYPE.JOURNAL_1, dumpFile=dumpFile, journalFile=journalFile) + doTicks([o1], 10, stopFunc=lambda: o1._isReady()) + assert o1._isReady() + + o1.addValue(100) + doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 100) + assert o1.getCounter() == 100 + + # Force compaction: dump is written at counter=100, journal shrinks to 2 entries. + o1._forceLogCompaction() + doTicks([o1], 1.5) + assert o1._getRaftLogSize() == 2 + + # One more committed entry — this is the entry that will be "lost on crash". + o1.addValue(50) + doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 150) + assert o1.getCounter() == 150 + + o1._destroy() + + # Phase 2: simulate the crash state. + # After compaction the journal had 2 entries; addValue(50) added a 3rd. + # We remove that 3rd entry but leave the meta's raftCommitIndex pointing past it, + # as the once-per-second timer already flushed the high value before the crash. 
+ journal = createJournal(journalFile) + assert len(journal) == 3 + stale_commit_idx = journal[-1][1] # idx of the entry we are about to lose + journal.deleteEntriesFrom(2) # drop the entry at list index 2 (simulate page-cache loss) + assert len(journal) == 2 + assert journal[-1][1] < stale_commit_idx + journal.setRaftCommitIndex(stale_commit_idx) # stale meta value survives the crash + journal.onOneSecondTimer() + journal._destroy() + + # Phase 3: restart — must recover without deadlock. + # Without the fix the node would get stuck: raftCommitIndex=stale_commit_idx but + # the journal only goes up to journal[-1][1], so applyLogEntries can never advance + # raftLastApplied to commitIndex and the leader commit loop never fires either. + o1 = TestObj(a[0], [], TEST_TYPE.JOURNAL_1, dumpFile=dumpFile, journalFile=journalFile) + doTicks([o1], 10, stopFunc=lambda: o1._isReady()) + assert o1._isReady() + + # State is restored from the dump (counter=100); addValue(50) was lost on crash. + assert o1.getCounter() == 100 + + # The node must be able to accept and commit new commands normally. + o1.addValue(25) + doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 125) + assert o1.getCounter() == 125 + + o1._destroy() + + removeFiles([dumpFile]) + removeFiles([journalFile, journalFile + '.meta']) + + def test_autoTick1(): random.seed(42)