Skip to content

Commit 30fe7b1

Browse files
committed
fix: LMDB stale readers and automatic compactions
closes #460
1 parent a1efd5e commit 30fe7b1

12 files changed

Lines changed: 604 additions & 281 deletions

File tree

Makefile

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,17 @@ test-setup:
5858
test-rust:
5959
cargo test --workspace --features zlob --exclude fff-nvim
6060

61+
# neovim instance swallows internal crashes and doesn't rise the the error exiting silently
62+
# so check the stdout in case the sigsegv coming out of fff was printed (actual regression)
6163
test-lua: test-setup build
62-
nvim --headless -u tests/minimal_init.lua \
63-
-c "PlenaryBustedDirectory tests/ {minimal_init = 'tests/minimal_init.lua'}" 2>&1
64+
@output=$$(nvim --headless -u tests/minimal_init.lua \
65+
-c "PlenaryBustedDirectory tests/ {minimal_init = 'tests/minimal_init.lua'}" 2>&1); \
66+
echo "$$output"; \
67+
if echo "$$output" | grep -qE "SIG(SEGV|ABRT|BUS|FPE|ILL)"; then \
68+
echo ""; \
69+
echo "FAIL: native crash detected during lua tests"; \
70+
exit 1; \
71+
fi
6472

6573
test-version: test-setup
6674
nvim --headless -u tests/minimal_init.lua \

crates/fff-c/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,6 @@ pub unsafe extern "C" fn fff_create_instance2(
226226
if let Err(e) = shared_frecency.init(tracker) {
227227
return FffResult::err(&format!("Failed to acquire frecency lock: {}", e));
228228
}
229-
let _ = shared_frecency.spawn_gc(frecency_path.clone());
230229
}
231230
Err(e) => return FffResult::err(&format!("Failed to init frecency db: {}", e)),
232231
}

crates/fff-core/src/dbs/db_healthcheck.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,29 @@ pub struct DbHealth {
99
pub disk_size: u64,
1010
/// Entry counts by table name
1111
pub entry_counts: Vec<(&'static str, u64)>,
12+
/// Set to `false` if can not acquire the write lock
13+
pub healthy: bool,
1214
}
1315

1416
pub trait DbHealthChecker {
1517
fn get_env(&self) -> &heed::Env;
18+
fn is_healthy(&self) -> bool;
19+
/// Entries per database, each group has a static string label
1620
fn count_entries(&self) -> Result<Vec<(&'static str, u64)>>;
1721

22+
/// Health summary of the database, returns summary struct
1823
fn get_health(&self) -> Result<DbHealth> {
1924
let env = self.get_env();
2025

21-
let size = env.real_disk_size().map_err(crate::error::Error::EnvOpen)?;
26+
let size = env.real_disk_size().map_err(crate::error::Error::GenericDbError)?;
2227
let path = env.path().to_string_lossy().to_string();
2328
let entry_counts = self.count_entries()?;
2429

2530
Ok(DbHealth {
2631
path,
2732
disk_size: size,
2833
entry_counts,
34+
healthy: self.is_healthy(),
2935
})
3036
}
3137
}

crates/fff-core/src/dbs/frecency.rs

Lines changed: 115 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
11
use super::db_healthcheck::DbHealthChecker;
2-
use super::lmdb::{LmdbStore, is_map_full};
2+
use super::lmdb::{DbHealth, LmdbStore, is_map_full};
33
use crate::error::{Error, Result};
44
use crate::file_picker::FFFMode;
55
use crate::git::is_modified_status;
6-
use crate::shared::SharedFrecency;
76
use heed::types::{Bytes, SerdeBincode};
87
use heed::{Database, Env};
9-
use std::fs;
10-
use std::path::PathBuf;
118
use std::time::{SystemTime, UNIX_EPOCH};
129
use std::{collections::VecDeque, path::Path};
1310

@@ -24,6 +21,7 @@ const AI_MAX_HISTORY_DAYS: f64 = 7.0; // Only consider accesses within 7 days
2421
pub struct FrecencyTracker {
2522
env: Env,
2623
db: Database<Bytes, SerdeBincode<VecDeque<u64>>>,
24+
health: DbHealth,
2725
}
2826

2927
const MODIFICATION_THRESHOLDS: [(i64, u64); 5] = [
@@ -48,22 +46,53 @@ impl DbHealthChecker for FrecencyTracker {
4846
&self.env
4947
}
5048

49+
fn is_healthy(&self) -> bool {
50+
self.health.is_healthy()
51+
}
52+
5153
fn count_entries(&self) -> Result<Vec<(&'static str, u64)>> {
52-
let rtxn = self.env.read_txn().map_err(Error::DbStartReadTxn)?;
53-
let count = self.db.len(&rtxn).map_err(Error::DbRead)?;
54+
let rtxn = self
55+
.env
56+
.read_txn()
57+
.map_err(|source| Error::DbStartReadTxn {
58+
db: Self::LABEL,
59+
source,
60+
})?;
61+
let count = self.db.len(&rtxn).map_err(|source| Error::DbRead {
62+
db: Self::LABEL,
63+
source,
64+
})?;
5465

5566
Ok(vec![("absolute_frecency_entries", count)])
5667
}
5768
}
5869

5970
impl LmdbStore for FrecencyTracker {
60-
const MAX_DBS: u32 = 0;
71+
const LABEL: &'static str = "frecency";
6172
// 10 MiB hard ceiling. Owner's db after years of use is ~560 KiB, so this
6273
// leaves ~18× headroom while capping runaway growth (see GH issue #437).
6374
const MAP_SIZE: usize = 10 * 1024 * 1024;
75+
const MAX_DBS: u32 = 0;
6476
// Nuke the db when it exceeds 8 MiB on disk — leaves a small margin under
6577
// MAP_SIZE so we don't hit MDB_MAP_FULL before the open-time erase fires.
66-
const SIZE_CAP_BYTES: u64 = 8 * 1024 * 1024;
78+
const SIZE_CAP_BYTES: u64 = 12 * 1024 * 1024;
79+
80+
fn env(&self) -> &Env {
81+
&self.env
82+
}
83+
84+
fn health(&self) -> &DbHealth {
85+
&self.health
86+
}
87+
88+
fn purge_stale_data(env: &Env) -> Result<()> {
89+
let (deleted, pruned) = Self::purge_stale_entries(env)?;
90+
if deleted > 0 || pruned > 0 {
91+
tracing::info!(deleted, pruned, "Frecency GC purged entries");
92+
}
93+
94+
Ok(())
95+
}
6796
}
6897

6998
impl FrecencyTracker {
@@ -74,11 +103,10 @@ impl FrecencyTracker {
74103

75104
pub fn open(db_path: impl AsRef<Path>) -> Result<Self> {
76105
let db_path = db_path.as_ref();
77-
let env = Self::open_env(db_path)?;
106+
let (env, health) = Self::open_env(db_path)?;
78107

79108
let db = Self::open_database_safe(&env, None)?;
80-
81-
Ok(FrecencyTracker { db, env })
109+
Ok(FrecencyTracker { db, env, health })
82110
}
83111

84112
#[deprecated(
@@ -90,102 +118,41 @@ impl FrecencyTracker {
90118
Self::open(db_path)
91119
}
92120

93-
/// Spawns a background thread to purge stale frecency entries and compact the database.
94-
/// Run it once in a while to purge old pages and keep DB file size reasonable.
95-
///
96-
/// It's okay to not join this thread since it acquires locks for the db access
97-
///
98-
/// ```
99-
/// use fff_search::frecency::FrecencyTracker;
100-
/// use fff_search::SharedFrecency;
101-
/// let shared_frecency: SharedFrecency = Default::default();
102-
/// let _ = FrecencyTracker::spawn_gc(shared_frecency, "/path/to/frecency_db".into()).ok();
103-
/// ```
104-
pub fn spawn_gc(
105-
shared: SharedFrecency,
106-
db_path: String,
107-
) -> Result<std::thread::JoinHandle<()>> {
108-
Ok(std::thread::Builder::new()
109-
.name("fff-frecency-gc".into())
110-
.spawn(move || Self::run_frecency_gc(shared, db_path))?)
111-
}
112-
113-
#[tracing::instrument(skip(shared), fields(db_path = %db_path))]
114-
fn run_frecency_gc(shared: SharedFrecency, db_path: String) {
115-
let start = std::time::Instant::now();
116-
117-
let (deleted, pruned) = {
118-
let guard = match shared.read() {
119-
Ok(g) => g,
120-
Err(e) => {
121-
tracing::debug!("Failed to acquire read lock: {e}");
122-
return;
123-
}
124-
};
125-
let Some(ref tracker) = *guard else {
126-
return;
127-
};
128-
129-
// Clear stale readers here (on a background thread) rather than in
130-
// open_env — clear_stale_readers needs the writer mutex which can
131-
// block indefinitely on a stuck lock if called on the main thread.
132-
if let Err(e) = tracker.env.clear_stale_readers() {
133-
tracing::debug!("clear_stale_readers failed: {e}");
134-
}
135-
136-
match tracker.purge_stale_entries() {
137-
Ok(result) => result,
138-
Err(e) => {
139-
tracing::debug!("Purge failed: {e}");
140-
return;
141-
}
142-
}
143-
};
144-
145-
if deleted > 0 || pruned > 0 {
146-
tracing::info!(deleted, pruned, elapsed = ?start.elapsed(), "Frecency GC purged entries");
147-
}
148-
149-
let data_path = PathBuf::from(&db_path).join("data.mdb");
150-
let file_size = fs::metadata(&data_path).map(|m| m.len()).unwrap_or(0);
151-
if file_size > <Self as LmdbStore>::SIZE_CAP_BYTES {
152-
tracing::warn!(
153-
size = file_size,
154-
cap = <Self as LmdbStore>::SIZE_CAP_BYTES,
155-
"Frecency DB exceeds size cap — will be erased on next open"
156-
);
157-
}
158-
}
159-
160121
/// Removes entries where all timestamps are older than MAX_HISTORY_DAYS,
161122
/// and prunes stale timestamps from entries that still have recent ones.
162123
/// Returns (deleted_count, pruned_count).
163-
fn purge_stale_entries(&self) -> Result<(usize, usize)> {
164-
let now = self.get_now();
124+
fn purge_stale_entries(env: &Env) -> Result<(usize, usize)> {
125+
let now = SystemTime::now()
126+
.duration_since(UNIX_EPOCH)
127+
.unwrap()
128+
.as_secs();
165129
let cutoff_time = now.saturating_sub((MAX_HISTORY_DAYS * SECONDS_PER_DAY) as u64);
166130

167-
// Collect entries to delete or update
168-
let rtxn = self.env.read_txn().map_err(Error::DbStartReadTxn)?;
131+
let db: Database<Bytes, SerdeBincode<VecDeque<u64>>> = Self::open_database_safe(env, None)?;
132+
133+
let rtxn = env.read_txn().map_err(|source| Error::DbStartReadTxn {
134+
db: Self::LABEL,
135+
source,
136+
})?;
169137
let mut to_delete: Vec<Vec<u8>> = Vec::new();
170138
let mut to_update: Vec<(Vec<u8>, VecDeque<u64>)> = Vec::new();
171139

172-
let iter = self.db.iter(&rtxn).map_err(Error::DbRead)?;
140+
let iter = db.iter(&rtxn).map_err(|source| Error::DbRead {
141+
db: Self::LABEL,
142+
source,
143+
})?;
173144
for result in iter {
174-
let (key, accesses) = result.map_err(Error::DbRead)?;
145+
let (key, accesses) = result.map_err(|source| Error::DbRead {
146+
db: Self::LABEL,
147+
source,
148+
})?;
175149

176-
// Timestamps are chronologically ordered (oldest at front).
177-
// Find the first timestamp that is still within the retention window.
150+
// Timestamps chronologically ordered (oldest at front).
178151
let fresh_start = accesses.iter().position(|&ts| ts >= cutoff_time);
179152
match fresh_start {
180-
None => {
181-
// All timestamps are stale — delete the entire entry
182-
to_delete.push(key.to_vec());
183-
}
184-
Some(0) => {
185-
// All timestamps are fresh — nothing to do
186-
}
153+
None => to_delete.push(key.to_vec()),
154+
Some(0) => {}
187155
Some(start) => {
188-
// Some timestamps are stale — keep only the fresh ones
189156
let pruned: VecDeque<u64> = accesses.iter().skip(start).copied().collect();
190157
to_update.push((key.to_vec(), pruned));
191158
}
@@ -197,27 +164,54 @@ impl FrecencyTracker {
197164
return Ok((0, 0));
198165
}
199166

200-
// Apply all changes in a single write transaction
201-
let mut wtxn = self.env.write_txn().map_err(Error::DbStartWriteTxn)?;
167+
let mut wtxn = env.write_txn().map_err(|source| Error::DbStartWriteTxn {
168+
db: Self::LABEL,
169+
source,
170+
})?;
171+
202172
for key in &to_delete {
203-
self.db.delete(&mut wtxn, key).map_err(Error::DbWrite)?;
173+
db.delete(&mut wtxn, key).map_err(|source| Error::DbWrite {
174+
db: Self::LABEL,
175+
source,
176+
})?;
204177
}
178+
205179
for (key, accesses) in &to_update {
206-
self.db
207-
.put(&mut wtxn, key, accesses)
208-
.map_err(Error::DbWrite)?;
180+
db.put(&mut wtxn, key, accesses)
181+
.map_err(|source| Error::DbWrite {
182+
db: Self::LABEL,
183+
source,
184+
})?;
209185
}
210-
wtxn.commit().map_err(Error::DbCommit)?;
186+
wtxn.commit().map_err(|source| Error::DbCommit {
187+
db: Self::LABEL,
188+
source,
189+
})?;
211190

212191
Ok((to_delete.len(), to_update.len()))
213192
}
214193

215194
fn get_accesses(&self, path: &Path) -> Result<Option<VecDeque<u64>>> {
216195
let key_hash = Self::path_to_hash_bytes(path)?;
217196

218-
let rtxn = self.env.read_txn().map_err(Error::DbStartReadTxn)?;
219-
let result = self.db.get(&rtxn, &key_hash).map_err(Error::DbRead)?;
220-
rtxn.commit().map_err(Error::DbCommit)?;
197+
let rtxn = self
198+
.env
199+
.read_txn()
200+
.map_err(|source| Error::DbStartReadTxn {
201+
db: Self::LABEL,
202+
source,
203+
})?;
204+
let result = self
205+
.db
206+
.get(&rtxn, &key_hash)
207+
.map_err(|source| Error::DbRead {
208+
db: Self::LABEL,
209+
source,
210+
})?;
211+
rtxn.commit().map_err(|source| Error::DbCommit {
212+
db: Self::LABEL,
213+
source,
214+
})?;
221215

222216
Ok(result)
223217
}
@@ -265,29 +259,43 @@ impl FrecencyTracker {
265259
accesses.push_back(now);
266260
tracing::debug!(?path, accesses = accesses.len(), "Tracking access");
267261

268-
let mut wtxn = self.env.write_txn().map_err(Error::DbStartWriteTxn)?;
262+
let mut wtxn = self
263+
.env
264+
.write_txn()
265+
.map_err(|source| Error::DbStartWriteTxn {
266+
db: Self::LABEL,
267+
source,
268+
})?;
269269
if let Err(e) = self.db.put(&mut wtxn, &key_hash, &accesses) {
270270
if is_map_full(&e) {
271+
self.health.mark_unhealthy("MDB_MAP_FULL on put");
271272
tracing::error!(
272273
?path,
273274
"Frecency DB hit MDB_MAP_FULL; dropping write — db will be \
274275
erased on next open via LmdbStore::erase_if_oversized"
275276
);
276277
return Ok(());
277278
}
278-
return Err(Error::DbWrite(e));
279+
return Err(Error::DbWrite {
280+
db: Self::LABEL,
281+
source: e,
282+
});
279283
}
280284

281285
wtxn.commit()
282286
.inspect_err(|e| {
283287
if is_map_full(e) {
288+
self.health.mark_unhealthy("MDB_MAP_FULL on commit");
284289
tracing::error!(
285290
?path,
286291
"Frecency DB hit MDB_MAP_FULL on commit; dropping write"
287292
);
288293
}
289294
})
290-
.map_err(Error::DbCommit)
295+
.map_err(|source| Error::DbCommit {
296+
db: Self::LABEL,
297+
source,
298+
})
291299
}
292300

293301
pub fn get_access_score(&self, file_path: &Path, mode: FFFMode) -> i64 {

0 commit comments

Comments
 (0)