diff --git a/src/datastore/LMDBStoreFactory.ts b/src/datastore/LMDBStoreFactory.ts index 41c07999..1b5033d2 100644 --- a/src/datastore/LMDBStoreFactory.ts +++ b/src/datastore/LMDBStoreFactory.ts @@ -235,6 +235,10 @@ export class LMDBStoreFactory implements DataStoreFactory { if (this.closed) return; try { + const staleLocks = this.env.readerCheck(); + if (staleLocks > 0) { + this.log.info(`Removed ${staleLocks} stale reader locks for LMDB`); + } const envStat = stats(this.env); this.telemetry.histogram('version', VersionNumber); this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' }); @@ -271,6 +275,11 @@ function createEnv(lmdbDir: string) { mapSize: TotalMaxDbSize, encoding: Encoding, encryptionKey: encryptionStrategy(VersionNumber), + // Forces use of the last safely flushed transaction on open, rather than the last committed + // (but possibly unflushed) one. Prevents corruption when the process is killed mid-flush. + // https://github.com/kriszyp/lmdb-js#readme ("safeRestore") + // https://github.com/kriszyp/lmdb-js/blob/master/open.js#L188 (flag 0x800) + ...({ safeRestore: true } as Record), }; if (isWindows) { diff --git a/src/datastore/lmdb/LMDBStore.ts b/src/datastore/lmdb/LMDBStore.ts index ead3c983..55160c26 100644 --- a/src/datastore/lmdb/LMDBStore.ts +++ b/src/datastore/lmdb/LMDBStore.ts @@ -12,8 +12,8 @@ export class LMDBStore implements DataStore { constructor( public readonly name: StoreName, private store: Database, - private readonly onError?: ErrorHandler, - private readonly validateDatabase?: () => void, + private readonly onError: ErrorHandler, + private readonly validateDatabase: () => void, ) { this.telemetry = TelemetryService.instance.get(`LMDB.${name}`); } @@ -27,11 +27,13 @@ export class LMDBStore implements DataStore { op, () => { try { - this.validateDatabase?.(); + this.validateDatabase(); return fn(); } catch (e) { - this.onError?.(e); - throw e; + this.onError(e); + this.telemetry.count(`retry.${op}`, 1); + this.validateDatabase(); + return fn(); } }, { captureErrorAttributes: true }, @@ -43,11 +45,13 @@ export class LMDBStore implements DataStore { op, async () => { try { - this.validateDatabase?.(); + this.validateDatabase(); return await fn(); } catch (e) { - this.onError?.(e); - throw e; + this.onError(e); + this.telemetry.count(`retry.${op}`, 1); + this.validateDatabase(); + return await fn(); } }, { captureErrorAttributes: true }, diff --git a/tst/unit/datastore/LMDB.retry.test.ts b/tst/unit/datastore/LMDB.retry.test.ts new file mode 100644 index 00000000..c1185e15 --- /dev/null +++ b/tst/unit/datastore/LMDB.retry.test.ts @@ -0,0 +1,143 @@ +import { randomUUID as v4 } from 'crypto'; +import fs from 'fs'; +import { join } from 'path'; +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { StoreName } from '../../../src/datastore/DataStore'; +import { LMDBStoreFactory } from '../../../src/datastore/LMDBStoreFactory'; + +describe('LMDB retry after recovery', () => { + let testDir: string; + let factory: LMDBStoreFactory; + + beforeEach(() => { + testDir = join(process.cwd(), 'node_modules', '.cache', 'lmdb-retry-test', v4()); + fs.mkdirSync(testDir, { recursive: true }); + factory = new LMDBStoreFactory(testDir); + }); + + afterEach(async () => { + await factory.close(); + }); + + describe('sync operations retry on transient failure', () => { + it('should return data after transient get() failure triggers recovery and retry', async () => { + const store = factory.get(StoreName.public_schemas); + await store.put('key', 'value'); + + const lmdbStore = store as any; + const realStore = lmdbStore.store; + + realStore.get = () => { + throw new Error('MDB_CORRUPTED: Located page was wrong type'); + }; + + // Recovery replaces the store handle, retry succeeds on the new handle + const result = store.get('key'); + expect(result).toBe('value'); + }); + + it('should throw if recovery fails and retry also fails', () => { + const store = factory.get(StoreName.public_schemas); + const lmdbStore = store as any; + + // Mock handleError to not actually recover + const originalHandleError = (factory as any).handleError.bind(factory); + (factory as any).handleError = () => { + /* no-op: simulates recovery failure */ + }; + + const realStore = lmdbStore.store; + realStore.get = () => { + throw new Error('MDB_CORRUPTED: permanent'); + }; + + expect(() => store.get('key')).toThrow('MDB_CORRUPTED: permanent'); + (factory as any).handleError = originalHandleError; + }); + + it('should retry keys() after transient failure', async () => { + const store = factory.get(StoreName.public_schemas); + await store.put('k1', 'v1'); + + const lmdbStore = store as any; + const realStore = lmdbStore.store; + const originalGetKeys = realStore.getKeys.bind(realStore); + let callCount = 0; + + realStore.getKeys = (opts: any) => { + callCount++; + if (callCount === 1) throw new Error('MDB_BAD_TXN: Transaction must abort'); + return originalGetKeys(opts); + }; + + const keys = store.keys(10); + expect(keys).toContain('k1'); + }); + }); + + describe('async operations retry on transient failure', () => { + it('should succeed after transient put() failure', async () => { + const store = factory.get(StoreName.public_schemas); + const lmdbStore = store as any; + const realStore = lmdbStore.store; + + realStore.put = () => { + throw new Error('MDB_PAGE_NOTFOUND: Requested page not found'); + }; + + await store.put('key', 'value'); + expect(store.get('key')).toBe('value'); + }); + + it('should throw on async if retry also fails', async () => { + const store = factory.get(StoreName.public_schemas); + const lmdbStore = store as any; + + (factory as any).handleError = () => { + /* no-op */ + }; + + const realStore = lmdbStore.store; + realStore.put = () => { + throw new Error('MDB_PANIC: unrecoverable'); + }; + + await expect(store.put('key', 'value')).rejects.toThrow('MDB_PANIC: unrecoverable'); + }); + }); + + describe('recovery is called exactly once per failure', () => { + it('should call handleError once on transient failure', async () => { + const store = factory.get(StoreName.public_schemas); + await store.put('key', 'value'); + + const handleErrorSpy = vi.spyOn(factory as any, 'handleError'); + const lmdbStore = store as any; + const realStore = lmdbStore.store; + + realStore.get = () => { + throw new Error('MDB_CORRUPTED: test'); + }; + + store.get('key'); + expect(handleErrorSpy).toHaveBeenCalledTimes(1); + handleErrorSpy.mockRestore(); + }); + + it('should work normally after a successful retry', async () => { + const store = factory.get(StoreName.public_schemas); + await store.put('key1', 'value1'); + + const lmdbStore = store as any; + const realStore = lmdbStore.store; + + realStore.get = () => { + throw new Error('MDB_BAD_TXN: abort'); + }; + + expect(store.get('key1')).toBe('value1'); + await store.put('key2', 'value2'); + expect(store.get('key2')).toBe('value2'); + }); + }); +});