-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathReadableIndex.js
More file actions
453 lines (401 loc) · 16.7 KB
/
ReadableIndex.js
File metadata and controls
453 lines (401 loc) · 16.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
const fs = require('fs');
const path = require('path');
const events = require('events');
const Entry = require('../IndexEntry');
const { assert, wrapAndCheck, binarySearch } = require('../util');
const RingBuffer = require('./RingBuffer');
// node-event-store-index V01
const HEADER_MAGIC = "nesidx01";
/**
 * Error type used to signal a corrupted index file.
 * Instances created through CorruptedIndexErrorFactory additionally carry a `size` property.
 */
class CorruptedIndexError extends Error {
    /**
     * @param {...*} args The arguments to forward to the Error constructor (message, options).
     */
    constructor(...args) {
        super(...args);
        // Without this the inherited name stays 'Error', which hides the error type in logs and stack traces.
        this.name = 'CorruptedIndexError';
    }
}
/**
 * Build an error constructor that is bound to a fixed `size` value.
 *
 * The returned function creates a CorruptedIndexError from whatever arguments it receives and
 * attaches `size` to it before handing it back. Because it explicitly returns that object, the
 * result is the same error whether the function is invoked with or without `new`.
 *
 * @param {number} size The value to expose as the `size` property of every created error.
 * @returns {function(...*): CorruptedIndexError} A constructor-like function producing sized errors.
 */
function CorruptedIndexErrorFactory(size) {
    // Must remain a regular function expression: an arrow function could not be called with `new`.
    return function (...errorArguments) {
        return Object.assign(new CorruptedIndexError(...errorArguments), { size });
    };
}
/**
 * An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position
 * where the matching document is found in the storage file.
 * It does not provide a key-value lookup and hence only allows random and range positional reads.
 * This is highly optimized for the usage as an index into an event store, where it's only necessary to query sequentially
 * within a version (sequence number) range inside a single stream.
 * It allows to store additional metadata about the index in the header on creation, which is verified to be unchanged on later
 * access.
 *
 * The index basically functions like a simplified LSM list.
 *
 * This class only ever opens the file with mode 'r'. All public positions are 1-based, while the cache and the
 * file offsets are addressed with 0-based positions internally.
 */
class ReadableIndex extends events.EventEmitter {

    /**
     * @param {string} [name] The name of the file to use for storing the index.
     * @param {object} [options] An object with additional index options.
     * @param {typeof EntryInterface} [options.EntryClass] The entry class to use for index items. Must implement the EntryInterface methods.
     * @param {string} [options.dataDirectory] The directory to store the index file in. Default '.'.
     * @param {number} [options.cacheSize] The number of most-recent index entries to keep in memory. Default 1024.
     * @param {number} [options.writeBufferSize] The number of bytes to use for the write buffer. Default 4096. Not read by this class itself.
     * @param {number} [options.flushDelay] How many ms to delay the write buffer flush to optimize throughput. Default 100. Not read by this class itself.
     * @param {object} [options.metadata] An object containing the metadata information for this index. Will be written on initial creation and checked on subsequent openings.
     */
    constructor(name = '.index', options = {}) {
        super();
        // Allow calling with the options object as the only argument.
        if (typeof name !== 'string') {
            options = name;
            name = '.index';
        }
        const defaults = {
            dataDirectory: '.',
            EntryClass: Entry
        };
        options = Object.assign(defaults, options);
        Entry.assertValidEntryClass(options.EntryClass);

        this.name = name;
        this.initialize(options);
        this.open();
    }

    /**
     * Set up the instance state from the (already defaulted) options. Does not touch the file system.
     *
     * @protected
     * @param {object} options
     */
    initialize(options) {
        const cacheSize = options.cacheSize !== undefined ? options.cacheSize : 1024;
        this.cache = new RingBuffer(cacheSize);
        this.fd = null;
        this.fileMode = 'r';
        this.EntryClass = options.EntryClass;
        this.dataDirectory = options.dataDirectory;
        this.fileName = path.resolve(options.dataDirectory, this.name);
        // Scratch buffer reused by every single-entry read().
        this.readBuffer = Buffer.allocUnsafe(options.EntryClass.size);
        if (options.metadata) {
            // Caller supplied metadata is merged over the entry class description.
            this.metadata = Object.assign({entryClass: options.EntryClass.name, entrySize: options.EntryClass.size}, options.metadata);
        }
    }

    /**
     * Return the last entry in the index.
     *
     * @api
     * @returns {Entry|boolean} The last entry in the index or false if the index is empty.
     */
    get lastEntry() {
        if (this.length > 0) {
            return this.get(this.length);
        }
        return false;
    }

    /**
     * Return the amount of items in the index.
     *
     * @api
     * @returns {number}
     */
    get length() {
        return this.cache.length;
    }

    /**
     * Check if the index is opened and ready for access.
     *
     * @api
     * @returns {boolean}
     */
    isOpen() {
        return !!this.fd;
    }

    /**
     * Check if the index file is still intact.
     *
     * @protected
     * @returns {number} The amount of entries in the file. -1 is returned if the index file is empty.
     * @throws {Error} If the file is corrupt or can not be read correctly.
     */
    checkFile() {
        const stat = fs.fstatSync(this.fd);
        if (stat.size === 0) {
            return -1;
        }
        // Size of the entry area, i.e. everything behind the header.
        const dataSize = stat.size - this.readMetadata();
        assert(dataSize >= 0, 'Invalid index file!');

        const length = Math.floor(dataSize / this.EntryClass.size);
        // A trailing partial entry means the file is corrupt; the error carries the number of complete entries.
        assert(dataSize === length * this.EntryClass.size, 'Index file is corrupt!', CorruptedIndexErrorFactory(length));
        return length;
    }

    /**
     * Open the index if it is not already open.
     * This will open a file handle with the configured file mode, read back the header and verify its metadata
     * against the metadata provided in the constructor options. An empty file is rejected by readFileLength().
     *
     * @api
     * @returns {boolean} True if the index was opened or false if it was already open.
     * @throws {Error} if the file can not be opened.
     */
    open() {
        if (this.fd) {
            return false;
        }
        this.fd = fs.openSync(this.fileName, this.fileMode);
        // Nothing has been read sequentially from the start yet.
        this.readUntil = -1;

        const length = this.readFileLength();
        if (length > 0) {
            this.cache.truncate(length);
            // Read last item to get the index started
            this.read(length);
        }
        return true;
    }

    /**
     * Determine the number of entries in the file and close the index again if that fails.
     *
     * @protected
     * @returns {number} The amount of entries in the file.
     * @throws {Error} If the file is corrupt or can not be read correctly.
     */
    readFileLength() {
        let length;
        try {
            length = this.checkFile();
            assert(length >= 0, 'Index file was truncated to empty!');
        } catch (e) {
            // Do not leave a half-opened index behind.
            this.close();
            throw e;
        }
        return length;
    }

    /**
     * Verify the metadata block read from the file against the expected metadata and set it.
     *
     * @private
     * @param {string} metadata Stringified metadata read from the file.
     * @throws {Error} if metadata is set and the read metadata does not match.
     * @throws {Error} if the read metadata is not valid JSON.
     */
    verifyAndSetMetadata(metadata) {
        // Verify metadata if it was set in constructor
        if (this.metadata && JSON.stringify(this.metadata) !== metadata) {
            throw new Error('Index metadata mismatch! ' + metadata);
        }
        try {
            this.metadata = JSON.parse(metadata);
        } catch (e) {
            throw new Error('Invalid metadata.');
        }
    }

    /**
     * Read the index metadata from the file.
     *
     * The header consists of the 8 byte magic value, a 4 byte big-endian unsigned metadata size and the
     * metadata block itself. Only `metadataSize - 1` bytes of the block are decoded; the last byte is
     * presumably a terminator written by the index writer (not visible in this class).
     *
     * @private
     * @returns {number} The size of the metadata header.
     * @throws {Error} if the file is too short to contain the fixed header or the header magic value is invalid.
     * @throws {Error} if the metadata size in the header is invalid.
     */
    readMetadata() {
        const fixedHeaderSize = 8 + 4;
        const headerBuffer = Buffer.allocUnsafe(fixedHeaderSize);
        const headerBytesRead = fs.readSync(this.fd, headerBuffer, 0, fixedHeaderSize, 0);
        // The buffer is uninitialized memory: a short read must never be interpreted as header data.
        assert(headerBytesRead === fixedHeaderSize, 'Invalid file header.');

        const headerMagic = headerBuffer.toString('utf8', 0, 8);
        // The first six characters identify the file type, the remaining two the format version.
        assert(headerMagic.slice(0, 6) === HEADER_MAGIC.slice(0, 6), 'Invalid file header.');
        assert(headerMagic === HEADER_MAGIC, `Invalid file version. The index ${this.fileName} was created with a different library version (${headerMagic.slice(6)}).`);

        const metadataSize = headerBuffer.readUInt32BE(8);
        assert(metadataSize >= 3, 'Invalid metadata size.');

        const metadataBuffer = Buffer.allocUnsafe(metadataSize - 1);
        // Pre-fill with spaces so that bytes missing after a short read are trimmed away below.
        metadataBuffer.fill(" ");
        fs.readSync(this.fd, metadataBuffer, 0, metadataSize - 1, fixedHeaderSize);
        const metadata = metadataBuffer.toString('utf8').trim();
        this.verifyAndSetMetadata(metadata);

        this.headerSize = fixedHeaderSize + metadataSize;
        return this.headerSize;
    }

    /**
     * Close the index and release the file handle. Also resets the cache and the read state.
     * @api
     */
    close() {
        this.cache.reset();
        this.readUntil = -1;
        this.readBuffer.fill(0);
        if (this.fd) {
            fs.closeSync(this.fd);
            this.fd = null;
        }
    }

    /**
     * Read a single index entry from the given index position and store it in the cache.
     * Advances the sequential read marker if the entry directly follows the last sequentially read one.
     *
     * @private
     * @param {number} index The 1-based index position to read.
     * @returns {Entry} The index entry at the given position.
     */
    read(index) {
        const zeroBasedIndex = Number(index) - 1;
        fs.readSync(this.fd, this.readBuffer, 0, this.EntryClass.size, this.headerSize + zeroBasedIndex * this.EntryClass.size);
        if (zeroBasedIndex === this.readUntil + 1) {
            this.readUntil++;
        }
        const entry = this.EntryClass.fromBuffer(this.readBuffer);
        this.cache.set(zeroBasedIndex, entry);
        return entry;
    }

    /**
     * Read a range of entries from disk. This method will not do any range checks.
     * It will however optimize to prevent reading entries that have already been read sequentially from start.
     *
     * Entries within the cache window are stored in the cache; entries outside the window
     * (older than cacheSize) are read from disk for the return value but not cached.
     *
     * @private
     * @param {number} from The 1-based index position from where to read from (inclusive).
     * @param {number} until The 1-based index position until which to read to (inclusive).
     * @returns {Array<Entry>} An array of the index entries in the given range. Positions that could not be read
     *   from disk (short read) are left undefined.
     */
    readRange(from, until) {
        if (until === from) {
            return [this.read(from)];
        }

        const entrySize = this.EntryClass.size;
        const zeroBasedFrom = from - 1;
        const zeroBasedUntil = until - 1;
        const cacheStart = this.cache.windowStart;

        // Build the result array up front
        const result = new Array(zeroBasedUntil - zeroBasedFrom + 1);

        // Part 1: Out-of-window entries [zeroBasedFrom, min(cacheStart-1, zeroBasedUntil)] — read from disk, do not cache
        const outEnd = Math.min(cacheStart - 1, zeroBasedUntil);
        if (zeroBasedFrom < cacheStart && outEnd >= zeroBasedFrom) {
            const count = outEnd - zeroBasedFrom + 1;
            const outBuf = Buffer.allocUnsafe(count * entrySize);
            const bytesRead = fs.readSync(this.fd, outBuf, 0, outBuf.byteLength, this.headerSize + zeroBasedFrom * entrySize);
            // Only decode complete entries; the buffer tail is uninitialized after a short read.
            const entries = Math.floor(bytesRead / entrySize);
            for (let idx = 0; idx < entries; idx++) {
                result[idx] = this.EntryClass.fromBuffer(outBuf, idx * entrySize);
            }
        }

        // Part 2: In-window entries [max(cacheStart, zeroBasedFrom), zeroBasedUntil] — use cache + disk for uncached ones
        const inStart = Math.max(cacheStart, zeroBasedFrom);
        if (inStart <= zeroBasedUntil) {
            // Optimisation: skip entries already loaded sequentially into the cache
            const readFrom = Math.max(this.readUntil + 1, inStart);

            // Trim trailing entries already present in the cache
            let readUntilPos = zeroBasedUntil;
            while (readUntilPos >= readFrom && this.cache.get(readUntilPos)) {
                readUntilPos--;
            }

            if (readFrom <= readUntilPos) {
                const count = readUntilPos - readFrom + 1;
                const inBuf = Buffer.allocUnsafe(count * entrySize);
                const bytesRead = fs.readSync(this.fd, inBuf, 0, inBuf.byteLength, this.headerSize + readFrom * entrySize);
                const entries = Math.floor(bytesRead / entrySize);
                for (let idx = 0; idx < entries; idx++) {
                    this.cache.set(readFrom + idx, this.EntryClass.fromBuffer(inBuf, idx * entrySize));
                }
                if (inStart <= this.readUntil + 1) {
                    // Advance only by what was actually read. Marking unread positions as sequentially read
                    // would make every later range read skip them and return holes from the cache.
                    this.readUntil = Math.max(this.readUntil, readFrom + entries - 1);
                }
            }

            // Fill the result from the cache for the in-window portion
            for (let index = inStart; index <= zeroBasedUntil; index++) {
                result[index - zeroBasedFrom] = this.cache.get(index);
            }
        }

        return result;
    }

    /**
     * Read all index entries. Equal to range(1, index.length) with the exception that this returns
     * an empty array if the index is empty.
     *
     * @api
     * @returns {Array<EntryInterface>} An array of all index entries.
     */
    all() {
        if (this.length === 0) {
            return [];
        }
        return this.range(1, this.length);
    }

    /**
     * Get a single index entry at given position, checking the boundaries.
     * The entry is served from the cache if present and read from disk otherwise.
     *
     * @api
     * @param {number} index The 1-based index position to get the entry for.
     * @returns {Entry|boolean} The entry at the given index position or false if out of bounds.
     */
    get(index) {
        index = wrapAndCheck(index, this.cache.length);
        if (index <= 0) {
            return false;
        }

        const cached = this.cache.get(index - 1);
        if (cached) {
            return cached;
        }
        return this.read(index);
    }

    /**
     * Check if the given range is within the bounds of the index.
     *
     * @api
     * @param {number} from The 1-based index position from where to get entries from (inclusive).
     * @param {number} until The 1-based index position until where to get entries to (inclusive).
     * @returns {boolean}
     */
    validRange(from, until) {
        if (from < 1 || from > this.length) {
            return false;
        }
        return (until >= from && until <= this.length);
    }

    /**
     * Get a range of index entries.
     * Falls back to reading from disk as soon as any requested entry is outside the cache window or not cached yet.
     *
     * @api
     * @param {number} from The 1-based index position from where to get entries from (inclusive). If < 0 will start at that position from end.
     * @param {number} [until] The 1-based index position until where to get entries to (inclusive). If < 0 will get until that position from the end. Defaults to this.length.
     * @returns {Array<Entry>|boolean} An array of entries for the given range or false on error.
     */
    range(from, until = -1) {
        from = wrapAndCheck(from, this.cache.length);
        until = wrapAndCheck(until, this.cache.length);

        if (from <= 0 || until < from) {
            return false;
        }

        const zeroBasedFrom = from - 1;
        const zeroBasedUntil = until - 1;
        const cacheStart = this.cache.windowStart;

        // Determine if any disk reads are required
        const hasOutOfWindow = zeroBasedFrom < cacheStart;
        const inStart = Math.max(cacheStart, zeroBasedFrom);
        const readFrom = Math.max(this.readUntil + 1, inStart);

        let needsDiskRead = hasOutOfWindow;
        if (!needsDiskRead && inStart <= zeroBasedUntil) {
            // Scan backwards for uncached in-window tail entries
            let scanUntil = zeroBasedUntil;
            while (scanUntil >= readFrom && this.cache.get(scanUntil)) {
                scanUntil--;
            }
            needsDiskRead = readFrom <= scanUntil;
        }

        if (needsDiskRead) {
            return this.readRange(from, until);
        }

        // All required entries are already in the cache — return a slice directly
        return this.cache.slice(zeroBasedFrom, zeroBasedUntil);
    }

    /**
     * Find the given global sequence number inside this index and return the last entry position with a sequence
     * number lower than or equal to `number`. This is equal to the `high` value in the binary search.
     * If the parameter `min` is set to true, it will search for the first entry position that is at least equal
     * to `number`. This is equal to the `low` value in the binary search.
     *
     * Complexity: O(log `number`) - because we only need to search up to the `number`-th element maximum.
     *
     * @api
     * @param {number} number The sequence number to search for.
     * @param {boolean} [min] If set to true, will return the first entry that has a sequence number greater than or equal to `number`.
     * @returns {number} The last index entry position that is lower than or equal to the `number`. Returns 0 if no index matches.
     */
    find(number, min = false) {
        if (this.length < 1) {
            return 0;
        }
        // We only need to search until the searched number because entry.number is always >= position
        const [low, high] = binarySearch(number, Math.min(this.length, number), index => this.get(index).number);
        return min ? low : high;
    }

}
// The class itself is the module export; the names below are attached to it as additional properties.
module.exports = ReadableIndex;
// The entry class that is used when no `options.EntryClass` is given.
module.exports.Entry = Entry;
// The magic value expected in the first 8 bytes of an index file.
module.exports.HEADER_MAGIC = HEADER_MAGIC;
// Exposed so that callers can identify corruption errors, e.g. with `instanceof`.
module.exports.CorruptedIndexError = CorruptedIndexError;