Skip to content

Commit 8528913

Browse files
authored
Merge pull request #427 from bootjp/copilot/sub-pr-423
store: fix lock ordering in ScanAt/ReverseScanAt/Compact and atomicize restore metadata writes
2 parents 11a506a + 77771df commit 8528913

1 file changed

Lines changed: 54 additions & 46 deletions

File tree

store/lsm_store.go

Lines changed: 54 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,27 @@ func writePebbleUint64(db *pebble.DB, key []byte, value uint64, opts *pebble.Wri
252252
return errors.WithStack(db.Set(key, buf[:], opts))
253253
}
254254

255+
// writeTempDBMetadata writes lastCommitTS and minRetainedTS atomically in a
256+
// single synced batch so that both values are either fully durable or fully
257+
// absent after a crash. This is critical for restore paths that swap a
258+
// temporary Pebble directory into place: losing lastCommitTS could allow
259+
// future commits to reuse timestamps, violating monotonic ordering.
260+
func writeTempDBMetadata(db *pebble.DB, lastCommitTS, minRetainedTS uint64) error {
261+
batch := db.NewBatch()
262+
defer func() { _ = batch.Close() }()
263+
264+
var buf [timestampSize]byte
265+
binary.LittleEndian.PutUint64(buf[:], lastCommitTS)
266+
if err := batch.Set(metaLastCommitTSBytes, buf[:], nil); err != nil {
267+
return errors.WithStack(err)
268+
}
269+
binary.LittleEndian.PutUint64(buf[:], minRetainedTS)
270+
if err := batch.Set(metaMinRetainedTSBytes, buf[:], nil); err != nil {
271+
return errors.WithStack(err)
272+
}
273+
return errors.WithStack(batch.Commit(pebble.Sync))
274+
}
275+
255276
func isPebbleMetaKey(rawKey []byte) bool {
256277
return bytes.Equal(rawKey, metaLastCommitTSBytes) ||
257278
bytes.Equal(rawKey, metaMinRetainedTSBytes) ||
@@ -619,17 +640,19 @@ func (s *pebbleStore) collectScanResults(iter *pebble.Iterator, start, end []byt
619640
}
620641

621642
func (s *pebbleStore) ScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error) {
622-
if readTSCompacted(ts, s.effectiveMinRetainedTS()) {
623-
return nil, ErrReadTSCompacted
624-
}
625-
626643
if limit <= 0 {
627644
return []*KVPair{}, nil
628645
}
629646

647+
// Acquire dbMu.RLock before reading effectiveMinRetainedTS (which takes
648+
// mtx.RLock) to preserve the global lock ordering: dbMu before mtx.
630649
s.dbMu.RLock()
631650
defer s.dbMu.RUnlock()
632651

652+
if readTSCompacted(ts, s.effectiveMinRetainedTS()) {
653+
return nil, ErrReadTSCompacted
654+
}
655+
633656
iter, err := s.db.NewIter(&pebble.IterOptions{
634657
LowerBound: encodeKey(start, math.MaxUint64),
635658
})
@@ -704,17 +727,19 @@ func (s *pebbleStore) nextReverseScanKV(
704727
}
705728

706729
func (s *pebbleStore) ReverseScanAt(ctx context.Context, start []byte, end []byte, limit int, ts uint64) ([]*KVPair, error) {
707-
if readTSCompacted(ts, s.effectiveMinRetainedTS()) {
708-
return nil, ErrReadTSCompacted
709-
}
710-
711730
if limit <= 0 {
712731
return []*KVPair{}, nil
713732
}
714733

734+
// Acquire dbMu.RLock before reading effectiveMinRetainedTS (which takes
735+
// mtx.RLock) to preserve the global lock ordering: dbMu before mtx.
715736
s.dbMu.RLock()
716737
defer s.dbMu.RUnlock()
717738

739+
if readTSCompacted(ts, s.effectiveMinRetainedTS()) {
740+
return nil, ErrReadTSCompacted
741+
}
742+
718743
opts := &pebble.IterOptions{
719744
LowerBound: encodeKey(start, math.MaxUint64),
720745
}
@@ -1042,45 +1067,37 @@ func (s *pebbleStore) runCompactionGC(ctx context.Context, minTS uint64) (pebble
10421067

10431068
func (s *pebbleStore) Compact(ctx context.Context, minTS uint64) error {
10441069
ctx = ensureContext(ctx)
1070+
1071+
// Acquire locks in canonical order (maintenanceMu → dbMu → mtx) before
1072+
// reading the effective watermark. Taking maintenanceMu and dbMu.RLock
1073+
// first prevents a deadlock with Restore, which takes maintenanceMu →
1074+
// dbMu.Lock → mtx.Lock: if Compact read effectiveMinRetainedTS (mtx.RLock)
1075+
// before acquiring maintenanceMu, Go's sync.RWMutex could block the
1076+
// RLock when Restore has queued a pending Lock(), causing both goroutines
1077+
// to wait on each other indefinitely.
1078+
s.maintenanceMu.Lock()
1079+
defer s.maintenanceMu.Unlock()
1080+
1081+
// Hold dbMu.RLock() for the entire compaction so that concurrent Restore
1082+
// cannot swap out s.db while the GC scan and metadata commits are in
1083+
// progress. Lock ordering: dbMu before mtx.
1084+
s.dbMu.RLock()
1085+
defer s.dbMu.RUnlock()
1086+
10451087
if minTS <= s.effectiveMinRetainedTS() {
10461088
// Fast path: the requested watermark is already covered. However, a
10471089
// pending watermark may have survived a previous crash (GC ran and
10481090
// deleted versions but minRetainedTS was never committed). Finalize it
10491091
// now so reads < pending are unblocked and the pending key is not leaked.
1050-
//
1051-
// The initial read below is a lock-free optimization to skip heavy lock
1052-
// acquisition when clearly nothing needs to be done. Correctness is
1053-
// guaranteed by the re-check under maintenanceMu+dbMu below.
10541092
s.mtx.RLock()
10551093
pending, committed := s.pendingMinRetainedTS, s.minRetainedTS
10561094
s.mtx.RUnlock()
10571095
if pending <= committed {
10581096
return nil
10591097
}
1060-
// Acquire locks in the canonical order (maintenanceMu → dbMu → mtx)
1061-
// and re-check to handle any concurrent modifications before committing.
1062-
s.maintenanceMu.Lock()
1063-
defer s.maintenanceMu.Unlock()
1064-
s.dbMu.RLock()
1065-
defer s.dbMu.RUnlock()
1066-
s.mtx.RLock()
1067-
pending, committed = s.pendingMinRetainedTS, s.minRetainedTS
1068-
s.mtx.RUnlock()
1069-
if pending <= committed {
1070-
return nil
1071-
}
10721098
return s.commitCompactionMinRetainedTS(pending)
10731099
}
10741100

1075-
s.maintenanceMu.Lock()
1076-
defer s.maintenanceMu.Unlock()
1077-
1078-
// Hold dbMu.RLock() for the entire compaction so that concurrent Restore
1079-
// cannot swap out s.db while the GC scan and metadata commits are in
1080-
// progress. Lock ordering: dbMu before mtx.
1081-
s.dbMu.RLock()
1082-
defer s.dbMu.RUnlock()
1083-
10841101
shouldRun, err := s.beginCompaction(minTS)
10851102
if err != nil {
10861103
return err
@@ -1504,13 +1521,9 @@ func writeStreamingMVCCRestoreTempDB(dir string, body io.Reader, hash hash.Hash3
15041521
cleanupTmp()
15051522
return "", errors.WithStack(ErrInvalidChecksum)
15061523
}
1507-
if err := writePebbleUint64(tmpDB, metaLastCommitTSBytes, lastCommitTS, pebble.NoSync); err != nil {
1524+
if err := writeTempDBMetadata(tmpDB, lastCommitTS, minRetainedTS); err != nil {
15081525
cleanupTmp()
1509-
return "", errors.WithStack(err)
1510-
}
1511-
if err := writePebbleUint64(tmpDB, metaMinRetainedTSBytes, minRetainedTS, pebble.Sync); err != nil {
1512-
cleanupTmp()
1513-
return "", errors.WithStack(err)
1526+
return "", err
15141527
}
15151528
if err := tmpDB.Close(); err != nil {
15161529
_ = os.RemoveAll(tmpDir)
@@ -1622,15 +1635,10 @@ func (s *pebbleStore) restoreLegacyGobToTempDB(entries []mvccSnapshotEntry, last
16221635
return err
16231636
}
16241637

1625-
if err := writePebbleUint64(tmpDB, metaLastCommitTSBytes, lastCommitTS, pebble.NoSync); err != nil {
1638+
if err := writeTempDBMetadata(tmpDB, lastCommitTS, minRetainedTS); err != nil {
16261639
_ = tmpDB.Close()
16271640
_ = os.RemoveAll(tmpDir)
1628-
return errors.WithStack(err)
1629-
}
1630-
if err := writePebbleUint64(tmpDB, metaMinRetainedTSBytes, minRetainedTS, pebble.Sync); err != nil {
1631-
_ = tmpDB.Close()
1632-
_ = os.RemoveAll(tmpDir)
1633-
return errors.WithStack(err)
1641+
return err
16341642
}
16351643

16361644
if err := tmpDB.Close(); err != nil {

0 commit comments

Comments
 (0)