diff --git a/pysyncobj/syncobj.py b/pysyncobj/syncobj.py
index dddefb0..c4a3a30 100644
--- a/pysyncobj/syncobj.py
+++ b/pysyncobj/syncobj.py
@@ -1415,6 +1415,14 @@ def __loadDumpFile(self, clearJournal):
 
             self.__raftLastApplied = data[1][1]
 
+            if self.__raftCommitIndex > self.__getCurrentLogIndex():  # commit index points past the last journal entry
+                logger.warning('raftCommitIndex (%d) exceeds journal end (%d) after loading dump, '
+                               'likely due to unsynced journal entries lost on crash; '
+                               'clamping to journal end',
+                               self.__raftCommitIndex, self.__getCurrentLogIndex())
+                self.__raftCommitIndex = self.__getCurrentLogIndex()
+                self.__raftLog.setRaftCommitIndex(self.__raftCommitIndex)  # hand the clamped value to the journal as well
+
             if self.__conf.dynamicMembershipChange:
                 self.__updateClusterConfiguration([node for node in data[3] if node != self.__selfNode])
             self.__onSetCodeVersion(0)
diff --git a/test_syncobj.py b/test_syncobj.py
index 5d1dbf2..8dfdced 100755
--- a/test_syncobj.py
+++ b/test_syncobj.py
@@ -1215,6 +1215,90 @@ def test_applyJournalAfterRestart():
     removeFiles([e + '.meta' for e in journalFiles])
 
 
+def test_journalCrashCommitIndexAhead():
+    # Regression test for the case where the raftCommitIndex stored in .journal.meta exceeds the
+    # index of the last entry physically present in the journal file after an unclean shutdown.
+    #
+    # How this happens in production:
+    #   1. Log compaction runs and leaves the journal with 2 entries [prev, last_applied].
+    #   2. New entries (N+1 .. N+k) are written to the mmap journal (page cache only).
+    #   3. The once-per-second meta timer fires and persists raftCommitIndex=N+k to
+    #      .journal.meta via f.flush() + atomic rename.
+    #   4. The node crashes before msync() flushes the mmap dirty pages to disk.
+    #   5. On recovery: meta has N+k, journal ends at N — node deadlocks without this fix.
+    #
+    # We reproduce the corrupted state by:
+    #   - running a single-node cluster until counter==100,
+    #   - forcing log compaction (journal shrinks to 2 entries, dump written),
+    #   - committing one more entry (counter==150, journal grows to 3 entries),
+    #   - then re-opening the journal file directly and removing the last entry
+    #     while leaving the meta's raftCommitIndex at the stale high value.
+    dumpFile = getNextDumpFile()
+    journalFile = getNextJournalFile()
+    removeFiles([dumpFile])  # start from a clean slate: no dump/journal/meta left by an earlier run
+    removeFiles([journalFile, journalFile + '.meta'])
+
+    random.seed(42)
+
+    a = [getNextAddr()]  # one address only; TestObj below is given an empty partner list
+
+    # Phase 1: commit some state, force compaction, commit one more entry.
+    o1 = TestObj(a[0], [], TEST_TYPE.JOURNAL_1, dumpFile=dumpFile, journalFile=journalFile)
+    doTicks([o1], 10, stopFunc=lambda: o1._isReady())
+    assert o1._isReady()
+
+    o1.addValue(100)
+    doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 100)
+    assert o1.getCounter() == 100
+
+    # Force compaction: dump is written at counter=100, journal shrinks to 2 entries.
+    o1._forceLogCompaction()
+    doTicks([o1], 1.5)  # NOTE(review): presumably enough tick time for the forced compaction to finish — confirm
+    assert o1._getRaftLogSize() == 2
+
+    # One more committed entry — this is the entry that will be "lost on crash".
+    o1.addValue(50)
+    doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 150)
+    assert o1.getCounter() == 150
+
+    o1._destroy()
+
+    # Phase 2: simulate the crash state.
+    # After compaction the journal had 2 entries; addValue(50) added a 3rd.
+    # We remove that 3rd entry but leave the meta's raftCommitIndex pointing past it,
+    # as the once-per-second timer already flushed the high value before the crash.
+    journal = createJournal(journalFile)
+    assert len(journal) == 3
+    stale_commit_idx = journal[-1][1]  # idx of the entry we are about to lose
+    journal.deleteEntriesFrom(2)  # drop the entry at list index 2 (simulate page-cache loss)
+    assert len(journal) == 2
+    assert journal[-1][1] < stale_commit_idx
+    journal.setRaftCommitIndex(stale_commit_idx)  # stale meta value survives the crash
+    journal.onOneSecondTimer()  # mimic the once-per-second meta timer; presumably this persists the stale index — confirm
+    journal._destroy()
+
+    # Phase 3: restart — must recover without deadlock.
+    # Without the fix the node would get stuck: raftCommitIndex=stale_commit_idx but
+    # the journal only goes up to journal[-1][1], so applyLogEntries can never advance
+    # raftLastApplied to commitIndex and the leader commit loop never fires either.
+    o1 = TestObj(a[0], [], TEST_TYPE.JOURNAL_1, dumpFile=dumpFile, journalFile=journalFile)
+    doTicks([o1], 10, stopFunc=lambda: o1._isReady())
+    assert o1._isReady()
+
+    # State is restored from the dump (counter=100); addValue(50) was lost on crash.
+    assert o1.getCounter() == 100
+
+    # The node must be able to accept and commit new commands normally.
+    o1.addValue(25)
+    doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 125)
+    assert o1.getCounter() == 125
+
+    o1._destroy()
+
+    removeFiles([dumpFile])
+    removeFiles([journalFile, journalFile + '.meta'])
+
+
 def test_autoTick1():
     random.seed(42)
 