Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pysyncobj/syncobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -1415,6 +1415,14 @@ def __loadDumpFile(self, clearJournal):

self.__raftLastApplied = data[1][1]

if self.__raftCommitIndex > self.__getCurrentLogIndex():
logger.warning('raftCommitIndex (%d) exceeds journal end (%d) after loading dump, '
'likely due to unsynced journal entries lost on crash; '
'clamping to journal end',
self.__raftCommitIndex, self.__getCurrentLogIndex())
self.__raftCommitIndex = self.__getCurrentLogIndex()
self.__raftLog.setRaftCommitIndex(self.__raftCommitIndex)

if self.__conf.dynamicMembershipChange:
self.__updateClusterConfiguration([node for node in data[3] if node != self.__selfNode])
self.__onSetCodeVersion(0)
Expand Down
84 changes: 84 additions & 0 deletions test_syncobj.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,90 @@ def test_applyJournalAfterRestart():
removeFiles([e + '.meta' for e in journalFiles])


def test_journalCrashCommitIndexAhead():
# Regression test for the case where raftCommitIndex in .journal.meta exceeds
# the last entry physically present in the journal file after an unclean shutdown.
#
# How this happens in production:
# 1. Log compaction runs and leaves the journal with 2 entries [prev, last_applied].
# 2. New entries (N+1 .. N+k) are written to the mmap journal (page cache only).
# 3. The once-per-second meta timer fires and persists raftCommitIndex=N+k to
# .journal.meta via f.flush() + atomic rename.
# 4. The node crashes before msync() flushes the mmap dirty pages to disk.
# 5. On recovery: meta has N+k, journal ends at N — node deadlocks without this fix.
#
# We reproduce the corrupted state by:
# - running a single-node cluster until counter==100,
# - forcing log compaction (journal shrinks to 2 entries, dump written),
# - committing one more entry (counter==150, journal grows to 3 entries),
# - then re-opening the journal file directly and removing the last entry
# while leaving the meta's raftCommitIndex at the stale high value.
dumpFile = getNextDumpFile()
journalFile = getNextJournalFile()
removeFiles([dumpFile])
removeFiles([journalFile, journalFile + '.meta'])

random.seed(42)

a = [getNextAddr()]

# Phase 1: commit some state, force compaction, commit one more entry.
o1 = TestObj(a[0], [], TEST_TYPE.JOURNAL_1, dumpFile=dumpFile, journalFile=journalFile)
doTicks([o1], 10, stopFunc=lambda: o1._isReady())
assert o1._isReady()

o1.addValue(100)
doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 100)
assert o1.getCounter() == 100

# Force compaction: dump is written at counter=100, journal shrinks to 2 entries.
o1._forceLogCompaction()
doTicks([o1], 1.5)
assert o1._getRaftLogSize() == 2

# One more committed entry — this is the entry that will be "lost on crash".
o1.addValue(50)
doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 150)
assert o1.getCounter() == 150

o1._destroy()

# Phase 2: simulate the crash state.
# After compaction the journal had 2 entries; addValue(50) added a 3rd.
# We remove that 3rd entry but leave the meta's raftCommitIndex pointing past it,
# as the once-per-second timer already flushed the high value before the crash.
journal = createJournal(journalFile)
assert len(journal) == 3
stale_commit_idx = journal[-1][1] # idx of the entry we are about to lose
journal.deleteEntriesFrom(2) # drop the entry at list index 2 (simulate page-cache loss)
assert len(journal) == 2
assert journal[-1][1] < stale_commit_idx
journal.setRaftCommitIndex(stale_commit_idx) # stale meta value survives the crash
journal.onOneSecondTimer()
journal._destroy()

# Phase 3: restart — must recover without deadlock.
# Without the fix the node would get stuck: raftCommitIndex=stale_commit_idx but
# the journal only goes up to journal[-1][1], so applyLogEntries can never advance
# raftLastApplied to commitIndex and the leader commit loop never fires either.
o1 = TestObj(a[0], [], TEST_TYPE.JOURNAL_1, dumpFile=dumpFile, journalFile=journalFile)
doTicks([o1], 10, stopFunc=lambda: o1._isReady())
assert o1._isReady()

# State is restored from the dump (counter=100); addValue(50) was lost on crash.
assert o1.getCounter() == 100

# The node must be able to accept and commit new commands normally.
o1.addValue(25)
doTicks([o1], 10, stopFunc=lambda: o1.getCounter() == 125)
assert o1.getCounter() == 125

o1._destroy()

removeFiles([dumpFile])
removeFiles([journalFile, journalFile + '.meta'])


def test_autoTick1():
random.seed(42)

Expand Down
Loading