Skip to content

Commit 0dc11ce

Browse files
committed
Bound pending job cursor scans
1 parent 7917a77 commit 0dc11ce

2 files changed

Lines changed: 129 additions & 5 deletions

File tree

lib/remote_share/pending_jobs.js

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,12 @@ function createLmdbStorage(database) {
7373
if (!dbi) return;
7474
const cursor = new database.lmdb.Cursor(txn, dbi);
7575
try {
76-
for (let found = cursor.goToFirst(); found; found = cursor.goToNext()) {
77-
cursor.getCurrentString(callback); // jshint ignore:line
76+
for (let found = cursor.goToFirst(); found !== null; found = cursor.goToNext()) {
77+
let shouldContinue = true;
78+
cursor.getCurrentString(function onCurrentEntry(key, value) { // jshint ignore:line
79+
if (callback(key, value) === false) shouldContinue = false;
80+
});
81+
if (!shouldContinue) break;
7882
}
7983
} finally {
8084
cursor.close();
@@ -111,13 +115,15 @@ function createLmdbStorage(database) {
111115
const txn = database.env.beginTxn({ readOnly: true });
112116
try {
113117
function collectJobs(key, value) {
114-
if (jobsByKey.size >= limit || jobsByKey.has(key)) return;
118+
if (jobsByKey.size >= limit) return false;
119+
if (jobsByKey.has(key)) return true;
115120
try {
116121
const job = JSON.parse(value);
117122
if (job.nextAttemptAt <= timeNow) jobsByKey.set(key, job);
118123
} catch (_error) {
119124
jobsByKey.set(key, { key, type: "invalid", invalid: true });
120125
}
126+
return jobsByKey.size < limit;
121127
}
122128

123129
withCursorEntries(txn, getPendingJobDb(), collectJobs);

tests/remote_share/block_jobs.js

Lines changed: 120 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,13 @@ function createMapStorage() {
161161
function createPendingJobDatabase() {
162162
const stores = {
163163
blockDB: new Map(),
164-
altblockDB: new Map()
164+
altblockDB: new Map(),
165+
namedDbs: new Map()
166+
};
167+
const dbHandles = new Map();
168+
const lmdbState = {
169+
cursorReads: 0,
170+
cursorCloses: 0
165171
};
166172
const resets = [];
167173

@@ -171,10 +177,64 @@ function createPendingJobDatabase() {
171177
throw new Error("Unknown DB handle");
172178
}
173179

180+
function getStringStore(db) {
181+
if (db && stores.namedDbs.has(db.name)) return stores.namedDbs.get(db.name);
182+
throw new Error("Unknown string DB handle");
183+
}
184+
185+
function sortEntries(entries) {
186+
return Array.from(entries).sort(function byKey(left, right) {
187+
return String(left[0]).localeCompare(String(right[0]));
188+
});
189+
}
190+
191+
class Cursor {
192+
constructor(_txn, db) {
193+
this.entries = sortEntries(getStringStore(db).entries());
194+
this.index = -1;
195+
}
196+
197+
goToFirst() {
198+
this.index = this.entries.length > 0 ? 0 : -1;
199+
return this.index === -1 ? null : this.entries[this.index][0];
200+
}
201+
202+
goToNext() {
203+
if (this.index === -1 || this.index + 1 >= this.entries.length) {
204+
this.index = -1;
205+
return null;
206+
}
207+
this.index += 1;
208+
return this.entries[this.index][0];
209+
}
210+
211+
getCurrentString(callback) {
212+
if (this.index === -1) return null;
213+
const entry = this.entries[this.index];
214+
lmdbState.cursorReads += 1;
215+
callback(entry[0], entry[1]);
216+
return entry[1];
217+
}
218+
219+
close() {
220+
lmdbState.cursorCloses += 1;
221+
}
222+
}
223+
174224
const database = {
175225
blockDB: { name: "blockDB" },
176226
altblockDB: { name: "altblockDB" },
227+
lmdb: { Cursor },
177228
env: {
229+
openDbi(options) {
230+
const name = options.name || "";
231+
if (!stores.namedDbs.has(name)) {
232+
if (options.create === false) throw new Error("DB not found");
233+
stores.namedDbs.set(name, new Map());
234+
dbHandles.set(name, { name, close() {} });
235+
}
236+
return dbHandles.get(name);
237+
},
178238
beginTxn() {
179239
return {
180240
getBinary(db, key) {
@@ -183,6 +243,12 @@ function createPendingJobDatabase() {
183243
putBinary(db, key, value) {
184244
getStore(db).set(key, Buffer.from(value));
185245
},
246+
putString(db, key, value) {
247+
getStringStore(db).set(key, value);
248+
},
249+
del(db, key) {
250+
getStringStore(db).delete(key);
251+
},
186252
abort() {},
187253
commit() {}
188254
};
@@ -204,7 +270,7 @@ function createPendingJobDatabase() {
204270
}
205271
};
206272

207-
return { database, resets, stores };
273+
return { database, resets, stores, lmdbState };
208274
}
209275

210276
test.describe("remote share block jobs", { concurrency: false }, () => {
@@ -611,6 +677,58 @@ test("pending altblock jobs do not orphan wallet reward lookup misses", () => {
611677
}
612678
});
613679

680+
test("lmdb pending job polling stops cursor after due job limit", () => {
681+
const restore = installRemoteShareGlobals();
682+
const { database, stores, lmdbState } = createPendingJobDatabase();
683+
684+
global.coinFuncs = {
685+
getBlockHeaderByHash(_hash, callback) {
686+
callback(null, { reward: 25 });
687+
},
688+
getPoolProfile() {
689+
return { pool: {} };
690+
},
691+
PORT2COIN() {
692+
return "XMR";
693+
},
694+
PORT2COIN_FULL() {
695+
return "XMR";
696+
}
697+
};
698+
699+
const pendingJobs = createPendingJobs({
700+
database,
701+
logger: { log() {} }
702+
});
703+
704+
try {
705+
for (let index = 0; index < 150; index += 1) {
706+
const hash = index.toString(16).padStart(64, "0");
707+
const block = {
708+
hash,
709+
difficulty: 100,
710+
shares: 0,
711+
timestamp: Date.now(),
712+
poolType: PROTOS.POOLTYPE.PPLNS,
713+
unlocked: false,
714+
valid: true
715+
};
716+
pendingJobs.enqueueBlock(1000 + index, PROTOS.Block.encode(block), block);
717+
}
718+
719+
lmdbState.cursorReads = 0;
720+
pendingJobs.processDueJobs();
721+
722+
assert.equal(lmdbState.cursorReads, 100);
723+
assert.equal(lmdbState.cursorCloses, 1);
724+
assert.equal(stores.namedDbs.get("pending_blocks").size, 50);
725+
assert.equal(stores.blockDB.size, 100);
726+
} finally {
727+
pendingJobs.close();
728+
restore();
729+
}
730+
});
731+
614732
test("pending block jobs back off retries and cap the delay", () => {
615733
const restore = installRemoteShareGlobals();
616734
const { database } = createPendingJobDatabase();

0 commit comments

Comments
 (0)