Skip to content

Commit 8f83faf

Browse files
authored
Merge pull request #155 from albe/repair-torn-writes
Automatically repair torn writes on opening
2 parents c4876f3 + 03da3a7 commit 8f83faf

10 files changed

Lines changed: 126 additions & 30 deletions

File tree

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,18 @@ leads to every single document being flushed directly.
413413

414414
#### Consistency
415415

416-
Since the storage is append-only, consistency is automatically guaranteed.
416+
Since the storage is append-only, consistency is automatically guaranteed for all successful writes. Writes that fail in
417+
the middle, e.g. because the machine crashes before the full write buffer is flushed, will lead to a torn write. This is
418+
a partial invalid write. To recover from such a state, the storage will detect torn writes and truncate them when an existing
419+
lock is reclaimed. This can be done by instantiating the store with the following option:
420+
421+
```javascript
422+
const eventstore = new EventStore('my-event-store', { storageConfig: { lock: EventStore.LOCK_RECLAIM } });
423+
```
424+
425+
Note that this option will effectively bypass the lock that prevents multiple instances from being created, so you should
426+
not use this carelessly. Having multiple instances write to the same files will lead to inconsistent data that can not be
427+
easily recovered from.
417428

418429
#### Isolation
419430

src/Consumer.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
const stream = require('stream');
22
const fs = require('fs');
33
const path = require('path');
4-
const mkdirpSync = require('mkdirp').sync;
5-
const { assert } = require('./util');
4+
const { assert, ensureDirectory } = require('./util');
65

76
const Storage = require('./Storage/ReadableStorage');
87
const MAX_CATCHUP_BATCH = 10;
@@ -59,9 +58,7 @@ class Consumer extends stream.Readable {
5958
this.indexName = indexName;
6059
const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
6160
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
62-
if (!fs.existsSync(consumerDirectory)) {
63-
mkdirpSync(consumerDirectory);
64-
} else {
61+
if (ensureDirectory(consumerDirectory)) {
6562
this.cleanUpFailedWrites();
6663
}
6764
}

src/EventStore.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,4 +398,6 @@ class EventStore extends events.EventEmitter {
398398

399399
module.exports = EventStore;
400400
module.exports.ExpectedVersion = ExpectedVersion;
401-
module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError;
401+
module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError;
402+
module.exports.LOCK_THROW = Storage.LOCK_THROW;
403+
module.exports.LOCK_RECLAIM = Storage.LOCK_RECLAIM;

src/Index/WritableIndex.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
const fs = require('fs');
2-
const mkdirpSync = require('mkdirp').sync;
32
const ReadableIndex = require('./ReadableIndex');
4-
const { assertEqual, buildMetadataHeader } = require('../util');
3+
const { assertEqual, buildMetadataHeader, ensureDirectory } = require('../util');
54

65
/**
76
* An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position
@@ -45,9 +44,7 @@ class WritableIndex extends ReadableIndex {
4544
*/
4645
initialize(options) {
4746
super.initialize(options);
48-
if (!fs.existsSync(options.dataDirectory)) {
49-
mkdirpSync(options.dataDirectory);
50-
}
47+
ensureDirectory(options.dataDirectory);
5148

5249
this.fileMode = 'a+';
5350
this.writeBuffer = Buffer.allocUnsafe(options.writeBufferSize >>> 0); // jshint ignore:line

src/Partition/ReadablePartition.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,21 @@ class ReadablePartition extends events.EventEmitter {
123123
return true;
124124
}
125125

126+
/**
127+
* @returns {number} -1 if the partition is ok and the sequence number of the broken document if a torn write was detected.
128+
*/
129+
checkTornWrite() {
130+
const reader = this.prepareReadBufferBackwards(this.size);
131+
const separator = reader.buffer.toString('ascii', reader.cursor - DOCUMENT_SEPARATOR.length, reader.cursor);
132+
if (separator !== DOCUMENT_SEPARATOR) {
133+
const position = this.findDocumentPositionBefore(this.size);
134+
const reader = this.prepareReadBuffer(position);
135+
const { sequenceNumber } = this.readDocumentHeader(reader.buffer, reader.cursor, position);
136+
return sequenceNumber;
137+
}
138+
return -1;
139+
}
140+
126141
/**
127142
* Read the partition metadata from the file.
128143
*
@@ -348,7 +363,7 @@ class ReadablePartition extends events.EventEmitter {
348363
}
349364
position -= this.readBufferLength;
350365
} while (position > 0);
351-
return position;
366+
return Math.max(0, position);
352367
}
353368

354369
/**

src/Partition/WritablePartition.js

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
const fs = require('fs');
2-
const mkdirpSync = require('mkdirp').sync;
32
const ReadablePartition = require('./ReadablePartition');
4-
const { assert, buildMetadataHeader, alignTo } = require('../util');
3+
const { assert, buildMetadataHeader, alignTo, ensureDirectory } = require('../util');
54
const Clock = require('../Clock');
65

76
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
@@ -40,9 +39,7 @@ class WritablePartition extends ReadablePartition {
4039
config.metadata = Object.assign(defaults.metadata, config.metadata);
4140
config = Object.assign(defaults, config);
4241
super(name, config);
43-
if (!fs.existsSync(this.dataDirectory)) {
44-
mkdirpSync(this.dataDirectory);
45-
}
42+
ensureDirectory(this.dataDirectory);
4643
this.fileMode = 'a+';
4744
this.writeBufferSize = config.writeBufferSize >>> 0; // jshint ignore:line
4845
this.maxWriteBufferDocuments = config.maxWriteBufferDocuments >>> 0; // jshint ignore:line

src/Storage/ReadableStorage.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ class ReadableStorage extends events.EventEmitter {
138138
for (let file of files) {
139139
if (file.substr(-6) === '.index') continue;
140140
if (file.substr(-7) === '.branch') continue;
141+
if (file.substr(-5) === '.lock') continue;
141142
if (file.substr(0, this.storageFile.length) !== this.storageFile) continue;
142143

143144
const partition = this.createPartition(file, this.partitionConfig);

src/Storage/WritableStorage.js

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
const fs = require('fs');
2-
const mkdirpSync = require('mkdirp').sync;
32
const path = require('path');
43
const WritablePartition = require('../Partition/WritablePartition');
54
const WritableIndex = require('../Index/WritableIndex');
65
const ReadableStorage = require('./ReadableStorage');
7-
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata } = require('../util');
6+
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util');
87

98
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
109

10+
const LOCK_RECLAIM = 0x1;
11+
const LOCK_THROW = 0x2;
12+
1113
class StorageLockedError extends Error {}
1214

1315
/**
@@ -37,6 +39,7 @@ class WritableStorage extends ReadableStorage {
3739
* @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition.
3840
* @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
3941
* @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
42+
* @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled.
4043
*/
4144
constructor(storageName = 'storage', config = {}) {
4245
if (typeof storageName !== 'string') {
@@ -53,13 +56,13 @@ class WritableStorage extends ReadableStorage {
5356
};
5457
config = Object.assign(defaults, config);
5558
config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions);
56-
if (!fs.existsSync(config.dataDirectory)) {
57-
try {
58-
mkdirpSync(config.dataDirectory);
59-
} catch (e) {
60-
}
61-
}
59+
ensureDirectory(config.dataDirectory);
6260
super(storageName, config);
61+
62+
this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
63+
if (config.lock === LOCK_RECLAIM) {
64+
this.unlock();
65+
}
6366
this.partitioner = config.partitioner;
6467
}
6568

@@ -75,6 +78,26 @@ class WritableStorage extends ReadableStorage {
7578
return super.open();
7679
}
7780

81+
/**
82+
* Check all partitions torn writes and truncate the storage to the position before the first torn write.
83+
* This might delete correctly written events in partitions, if their sequence number is higher than the
84+
* torn write in another partition.
85+
*/
86+
checkTornWrites() {
87+
let lastValidSequenceNumber = Number.MAX_SAFE_INTEGER;
88+
this.forEachPartition(partition => {
89+
partition.open();
90+
const tornSequenceNumber = partition.checkTornWrite();
91+
if (tornSequenceNumber >= 0) {
92+
lastValidSequenceNumber = Math.min(lastValidSequenceNumber, tornSequenceNumber);
93+
}
94+
});
95+
if (lastValidSequenceNumber < Number.MAX_SAFE_INTEGER) {
96+
this.truncate(lastValidSequenceNumber);
97+
}
98+
this.forEachPartition(partition => partition.close());
99+
}
100+
78101
/**
79102
* Attempt to lock this storage by means of a lock directory.
80103
* @returns {boolean} True if the lock was created or false if the lock is already in place.
@@ -85,7 +108,6 @@ class WritableStorage extends ReadableStorage {
85108
if (this.locked) {
86109
return false;
87110
}
88-
this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
89111
try {
90112
fs.mkdirSync(this.lockFile);
91113
this.locked = true;
@@ -105,7 +127,12 @@ class WritableStorage extends ReadableStorage {
105127
* Current implementation just deletes a lock file that is named like the storage.
106128
*/
107129
unlock() {
108-
fs.rmdirSync(this.lockFile);
130+
if (fs.existsSync(this.lockFile)) {
131+
if (!this.locked) {
132+
this.checkTornWrites();
133+
}
134+
fs.rmdirSync(this.lockFile);
135+
}
109136
this.locked = false;
110137
}
111138

@@ -370,4 +397,6 @@ class WritableStorage extends ReadableStorage {
370397
}
371398

372399
module.exports = WritableStorage;
373-
module.exports.StorageLockedError = StorageLockedError;
400+
module.exports.StorageLockedError = StorageLockedError;
401+
module.exports.LOCK_THROW = LOCK_THROW;
402+
module.exports.LOCK_RECLAIM = LOCK_RECLAIM;

src/util.js

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
const crypto = require('crypto');
2+
const fs = require('fs');
3+
const mkdirpSync = require('mkdirp').sync;
24

35
/**
46
* Assert that actual and expected match or throw an Error with the given message appended by information about expected and actual value.
@@ -183,6 +185,23 @@ function wrapAndCheck(index, length) {
183185
return index;
184186
}
185187

188+
/**
189+
* Ensure that the given directory exists.
190+
* @param {string} dirName
191+
* @return {boolean} true if the directory existed already
192+
*/
193+
function ensureDirectory(dirName) {
194+
if (!fs.existsSync(dirName)) {
195+
try {
196+
mkdirpSync(dirName);
197+
} catch (e) {
198+
}
199+
return false;
200+
}
201+
return true;
202+
}
203+
204+
186205
module.exports = {
187206
assert,
188207
assertEqual,
@@ -193,5 +212,6 @@ module.exports = {
193212
buildMetadataForMatcher,
194213
buildMatcherFromMetadata,
195214
buildMetadataHeader,
196-
alignTo
215+
alignTo,
216+
ensureDirectory
197217
};

test/EventStore.spec.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,33 @@ describe('EventStore', function() {
6060
fs.readdir = originalReaddir;
6161
});
6262

63+
it('repairs torn writes', function(done) {
64+
eventstore = new EventStore({
65+
storageDirectory
66+
});
67+
68+
const events = [{foo: 'bar'.repeat(500)}];
69+
eventstore.on('ready', () => {
70+
eventstore.commit('foo-bar', events, () => {
71+
// Simulate a torn write (but indexes are still written)
72+
fs.truncateSync(eventstore.storage.getPartition('foo-bar').fileName, 512);
73+
74+
// The previous instance was not closed, so the lock still exists
75+
eventstore = new EventStore({
76+
storageDirectory,
77+
storageConfig: {
78+
lock: EventStore.LOCK_RECLAIM
79+
}
80+
});
81+
eventstore.on('ready', () => {
82+
expect(eventstore.length).to.be(0);
83+
expect(eventstore.getStreamVersion('foo-bar')).to.be(0);
84+
done();
85+
});
86+
});
87+
});
88+
});
89+
6390
it('throws when trying to open non-existing store read-only', function() {
6491
expect(() => new EventStore({
6592
storageDirectory,

0 commit comments

Comments
 (0)