Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/datastore/LMDBStoreFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ export class LMDBStoreFactory implements DataStoreFactory {
if (this.closed) return;

try {
const staleLocks = this.env.readerCheck();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this work?

Copy link
Copy Markdown
Collaborator Author

@satyakigh satyakigh Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readerCheck clears out stale reader transaction entries from the reader lock table.

LMDB uses a shared lock table to track active read transactions across processes/threads. If a process crashes or exits without properly closing its read transaction, its entry remains in the table as a stale reader. Stale readers are problematic because LMDB can't reclaim database pages that were in use at the time of that reader's snapshot - this causes the database file to grow continuously since old pages can never be freed.

env.readerCheck() scans the reader lock table, detects entries belonging to processes/PIDs that no longer exist, and clears them. It returns the number of stale entries that were removed.

if (staleLocks > 0) {
this.log.info(`Removed ${staleLocks} stale reader locks for LMDB`);
}
const envStat = stats(this.env);
this.telemetry.histogram('version', VersionNumber);
this.telemetry.histogram('env.size.bytes', envStat.totalSize, { unit: 'By' });
Expand Down Expand Up @@ -271,6 +275,11 @@ function createEnv(lmdbDir: string) {
mapSize: TotalMaxDbSize,
encoding: Encoding,
encryptionKey: encryptionStrategy(VersionNumber),
// Forces use of the last safely flushed transaction on open, rather than the last committed
// (but possibly unflushed) one. Prevents corruption when the process is killed mid-flush.
// https://github.com/kriszyp/lmdb-js#readme ("safeRestore")
// https://github.com/kriszyp/lmdb-js/blob/master/open.js#L188 (flag 0x800)
...({ safeRestore: true } as Record<string, unknown>),
};

if (isWindows) {
Expand Down
20 changes: 12 additions & 8 deletions src/datastore/lmdb/LMDBStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ export class LMDBStore implements DataStore {
constructor(
public readonly name: StoreName,
private store: Database<unknown, string>,
private readonly onError?: ErrorHandler,
private readonly validateDatabase?: () => void,
private readonly onError: ErrorHandler,
private readonly validateDatabase: () => void,
) {
this.telemetry = TelemetryService.instance.get(`LMDB.${name}`);
}
Expand All @@ -27,11 +27,13 @@ export class LMDBStore implements DataStore {
op,
() => {
try {
this.validateDatabase?.();
this.validateDatabase();
return fn();
} catch (e) {
this.onError?.(e);
throw e;
this.onError(e);
this.telemetry.count(`retry.${op}`, 1);
this.validateDatabase();
return fn();
}
},
{ captureErrorAttributes: true },
Expand All @@ -43,11 +45,13 @@ export class LMDBStore implements DataStore {
op,
async () => {
try {
this.validateDatabase?.();
this.validateDatabase();
return await fn();
} catch (e) {
this.onError?.(e);
throw e;
this.onError(e);
this.telemetry.count(`retry.${op}`, 1);
this.validateDatabase();
return await fn();
}
},
{ captureErrorAttributes: true },
Expand Down
143 changes: 143 additions & 0 deletions tst/unit/datastore/LMDB.retry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { randomUUID as v4 } from 'crypto';
import fs from 'fs';
import { join } from 'path';
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { StoreName } from '../../../src/datastore/DataStore';
import { LMDBStoreFactory } from '../../../src/datastore/LMDBStoreFactory';

describe('LMDB retry after recovery', () => {
  let testDir: string;
  let factory: LMDBStoreFactory;

  beforeEach(() => {
    // Unique directory per test so LMDB environments never collide between runs.
    testDir = join(process.cwd(), 'node_modules', '.cache', 'lmdb-retry-test', v4());
    fs.mkdirSync(testDir, { recursive: true });
    factory = new LMDBStoreFactory(testDir);
  });

  afterEach(async () => {
    // Undo every vi.spyOn mock even when an assertion failed mid-test (the
    // original manual swap-and-restore leaked the mock on failure), then
    // release the LMDB environment so the temp directory can be reclaimed.
    vi.restoreAllMocks();
    await factory.close();
  });

  /**
   * Replaces the factory's private `handleError` with a no-op, simulating a
   * recovery that does NOT swap in a fresh store handle. Cleaned up by the
   * `vi.restoreAllMocks()` call in `afterEach`, so it is safe even if the
   * test body throws before completing.
   */
  const disableRecovery = () =>
    vi.spyOn(factory as any, 'handleError').mockImplementation(() => {
      /* no-op: simulates recovery failure */
    });

  describe('sync operations retry on transient failure', () => {
    it('should return data after transient get() failure triggers recovery and retry', async () => {
      const store = factory.get(StoreName.public_schemas);
      await store.put('key', 'value');

      const lmdbStore = store as any;
      const realStore = lmdbStore.store;

      // The stale handle always throws; only a replaced handle can succeed.
      realStore.get = () => {
        throw new Error('MDB_CORRUPTED: Located page was wrong type');
      };

      // Recovery replaces the store handle, retry succeeds on the new handle
      const result = store.get<string>('key');
      expect(result).toBe('value');
    });

    it('should throw if recovery fails and retry also fails', () => {
      const store = factory.get(StoreName.public_schemas);
      const lmdbStore = store as any;

      // Spy-based mock (rather than manual reassignment) so afterEach
      // restores the real handleError even when the assertion below fails.
      disableRecovery();

      const realStore = lmdbStore.store;
      realStore.get = () => {
        throw new Error('MDB_CORRUPTED: permanent');
      };

      expect(() => store.get<string>('key')).toThrow('MDB_CORRUPTED: permanent');
    });

    it('should retry keys() after transient failure', async () => {
      const store = factory.get(StoreName.public_schemas);
      await store.put('k1', 'v1');

      const lmdbStore = store as any;
      const realStore = lmdbStore.store;
      const originalGetKeys = realStore.getKeys.bind(realStore);
      let callCount = 0;

      // Fail only the first call; the retry path must reach the real getKeys.
      realStore.getKeys = (opts: any) => {
        callCount++;
        if (callCount === 1) throw new Error('MDB_BAD_TXN: Transaction must abort');
        return originalGetKeys(opts);
      };

      const keys = store.keys(10);
      expect(keys).toContain('k1');
    });
  });

  describe('async operations retry on transient failure', () => {
    it('should succeed after transient put() failure', async () => {
      const store = factory.get(StoreName.public_schemas);
      const lmdbStore = store as any;
      const realStore = lmdbStore.store;

      // Stale handle always fails; recovery must swap it before the retry.
      realStore.put = () => {
        throw new Error('MDB_PAGE_NOTFOUND: Requested page not found');
      };

      await store.put('key', 'value');
      expect(store.get<string>('key')).toBe('value');
    });

    it('should throw on async if retry also fails', async () => {
      const store = factory.get(StoreName.public_schemas);
      const lmdbStore = store as any;

      // Previously this mock was never restored; the shared helper plus
      // vi.restoreAllMocks() in afterEach now guarantees cleanup.
      disableRecovery();

      const realStore = lmdbStore.store;
      realStore.put = () => {
        throw new Error('MDB_PANIC: unrecoverable');
      };

      await expect(store.put('key', 'value')).rejects.toThrow('MDB_PANIC: unrecoverable');
    });
  });

  describe('recovery is called exactly once per failure', () => {
    it('should call handleError once on transient failure', async () => {
      const store = factory.get(StoreName.public_schemas);
      await store.put('key', 'value');

      // Pass-through spy: recovery still runs, we only count invocations.
      const handleErrorSpy = vi.spyOn(factory as any, 'handleError');
      const lmdbStore = store as any;
      const realStore = lmdbStore.store;

      realStore.get = () => {
        throw new Error('MDB_CORRUPTED: test');
      };

      store.get<string>('key');
      expect(handleErrorSpy).toHaveBeenCalledTimes(1);
      // Spy restored by afterEach's vi.restoreAllMocks().
    });

    it('should work normally after a successful retry', async () => {
      const store = factory.get(StoreName.public_schemas);
      await store.put('key1', 'value1');

      const lmdbStore = store as any;
      const realStore = lmdbStore.store;

      realStore.get = () => {
        throw new Error('MDB_BAD_TXN: abort');
      };

      // First read recovers and retries; subsequent operations use the new
      // handle and must behave as if nothing ever failed.
      expect(store.get<string>('key1')).toBe('value1');
      await store.put('key2', 'value2');
      expect(store.get<string>('key2')).toBe('value2');
    });
  });
});
Loading