diff --git a/db.go b/db.go index 70bdc4774..8a1021308 100644 --- a/db.go +++ b/db.go @@ -378,7 +378,7 @@ func Open(opt Options) (*DB, error) { db.closers.writes = z.NewCloser(1) go db.doWrites(db.closers.writes) - if !db.opt.InMemory { + if !db.opt.InMemory && !db.opt.ReadOnly { db.closers.valueGC = z.NewCloser(1) go db.vlog.waitOnGC(db.closers.valueGC) } @@ -538,7 +538,7 @@ func (db *DB) close() (err error) { db.blockWrites.Store(1) db.isClosed.Store(1) - if !db.opt.InMemory { + if db.closers.valueGC != nil { // Stop value GC first. db.closers.valueGC.SignalAndWait() } @@ -671,8 +671,8 @@ const ( // Sync syncs database content to disk. This function provides // more control to user to sync data whenever required. func (db *DB) Sync() error { - if db.opt.InMemory { - // InMemory mode does not use WAL/vlog files, so Sync is a no-op. + if db.opt.InMemory || db.opt.ReadOnly { + // InMemory and read-only modes do not use WAL/vlog files, so Sync is a no-op. return nil } @@ -1235,6 +1235,9 @@ func (db *DB) RunValueLogGC(discardRatio float64) error { if db.opt.InMemory { return ErrGCInMemoryMode } + if db.opt.ReadOnly { + return ErrGCInReadOnlyMode + } if discardRatio >= 1.0 || discardRatio <= 0.0 { return ErrInvalidRequest } @@ -1355,6 +1358,9 @@ func (db *DB) GetSequence(key []byte, bandwidth uint64) (*Sequence, error) { if db.opt.managedTxns { panic("Cannot use GetSequence with managedDB=true.") } + if db.opt.ReadOnly { + panic("Cannot use GetSequence in read-only mode.") + } switch { case len(key) == 0: @@ -1563,6 +1569,9 @@ func (db *DB) startMemoryFlush() { // stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition // between flattening the tree and new tables being created at level zero. func (db *DB) Flatten(workers int) error { + if db.opt.ReadOnly { + panic("Cannot flatten in read-only mode.") + } db.stopCompactions() defer db.startCompactions() @@ -1851,6 +1860,9 @@ func (db *DB) BanNamespace(ns uint64) error { if db.opt.NamespaceOffset < 0 { return ErrNamespaceMode } + if db.opt.ReadOnly { + panic("Cannot ban namespace in read-only mode.") + } db.opt.Infof("Banning namespace: %d", ns) // First set the banned namespaces in DB and then update the in-memory structure. key := y.KeyWithTs(append(bannedNsKey, y.U64ToBytes(ns)...), 1) diff --git a/db_test.go b/db_test.go index 1771a80b3..df6a383dd 100644 --- a/db_test.go +++ b/db_test.go @@ -1742,8 +1742,6 @@ func TestTestSequence2(t *testing.T) { } func TestReadOnly(t *testing.T) { - t.Skipf("TODO: ReadOnly needs truncation, so this fails") - dir, err := os.MkdirTemp("", "badger-test") require.NoError(t, err) defer removeDir(dir) @@ -1771,12 +1769,10 @@ func TestReadOnly(t *testing.T) { opts.ReadOnly = true kv1, err := Open(opts) require.NoError(t, err) - defer kv1.Close() // Open another read-only kv2, err := Open(opts) require.NoError(t, err) - defer kv2.Close() // Attempt a read-write open while it's open for read-only opts.ReadOnly = false @@ -1793,6 +1789,10 @@ func TestReadOnly(t *testing.T) { require.Equal(t, b1, []byte("value1")) err = txn1.Commit() require.NoError(t, err) + err = kv1.RunValueLogGC(0.5) + require.Error(t, err, ErrGCInReadOnlyMode) + err = kv1.Sync() + require.NoError(t, err) // Get a thing from the DB via the other connection txn2 := kv2.NewTransaction(true) @@ -1811,6 +1811,26 @@ func TestReadOnly(t *testing.T) { require.Contains(t, err.Error(), "No sets or deletes are allowed in a read-only transaction") err = txn.Commit() require.NoError(t, err) + + // Close + require.NoError(t, kv1.Close()) + require.NoError(t, kv2.Close()) + + // Test os permission read-only open + // We don't directly chmod the directory to still be able to aquire the lock + err = os.Chmod(dir, 0o500) + require.NoError(t, err) + opts.ReadOnly = true + kv3, err := Open(opts) + require.NoError(t, err) + txn3 := kv3.NewTransaction(true) + _, err = txn3.Get([]byte("key1")) + require.NoError(t, err) + require.NoError(t, kv3.Close()) + require.NoError(t, txn3.Commit()) + + // Restore permissions for cleanup + require.NoError(t, os.Chmod(dir, 0o700)) } func TestLSMOnly(t *testing.T) { diff --git a/errors.go b/errors.go index aa2ebc656..dcf0d12ae 100644 --- a/errors.go +++ b/errors.go @@ -109,6 +109,9 @@ var ( // ErrGCInMemoryMode is returned when db.RunValueLogGC is called in in-memory mode. ErrGCInMemoryMode = stderrors.New("Cannot run value log GC when DB is opened in InMemory mode") + // ErrGCInReadOnlyMode is returned when db.RunValueLogGC is called in read-only mode. + ErrGCInReadOnlyMode = stderrors.New("Cannot run value log GC when DB is opened in ReadOnly mode") + // ErrDBClosed is returned when a get operation is performed after closing the DB. ErrDBClosed = stderrors.New("DB Closed") ) diff --git a/value.go b/value.go index 77a023265..47023144f 100644 --- a/value.go +++ b/value.go @@ -540,6 +540,10 @@ func (vlog *valueLog) init(db *DB) { } vlog.dirPath = vlog.opt.ValueDir + if vlog.opt.ReadOnly { + return + } + vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time. lf, err := InitDiscardStats(vlog.opt) y.Check(err) @@ -571,14 +575,17 @@ func (vlog *valueLog) open(db *DB) error { lf, ok := vlog.filesMap[fid] y.AssertTrue(ok) - // Just open in RDWR mode. This should not create a new log file. lf.opt = vlog.opt - if err := lf.open(vlog.fpath(fid), os.O_RDWR, + flags := os.O_RDWR + if vlog.opt.ReadOnly { + flags = os.O_RDONLY + } + if err := lf.open(vlog.fpath(fid), flags, 2*vlog.opt.ValueLogFileSize); err != nil { return y.Wrapf(err, "Open existing file: %q", lf.path) } // We shouldn't delete the maxFid file. - if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid { + if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid && !vlog.opt.ReadOnly { vlog.opt.Infof("Deleting empty file: %s", lf.path) if err := lf.Delete(); err != nil { return y.Wrapf(err, "while trying to delete empty file: %s", lf.path) @@ -1092,7 +1099,7 @@ func (vlog *valueLog) runGC(discardRatio float64) error { } func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) { - if vlog.opt.InMemory { + if vlog.opt.InMemory || vlog.opt.ReadOnly { return } for fid, discard := range stats {