Skip to content

Commit b058ae2

Browse files
committed
Add lock option to configure how an existing lock should be handled
1 parent 1a970f8 commit b058ae2

4 files changed

Lines changed: 59 additions & 7 deletions

File tree

README.md

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,18 @@ leads to every single document being flushed directly.
413413

414414
#### Consistency
415415

416-
Since the storage is append-only, consistency is automatically guaranteed.
416+
Since the storage is append-only, consistency is automatically guaranteed for all successful writes. Writes that fail in
417+
the middle, e.g. because the machine crashes before the full write buffer is flushed, will lead to a torn write. This is
418+
a partial invalid write. To recover from such a state, the storage will detect torn writes and truncate them when an existing
419+
lock is reclaimed. This can be done by instantiating the store with the following option:
420+
421+
```javascript
422+
const eventstore = new EventStore('my-event-store', { storageConfig: { lock: EventStore.LOCK_RECLAIM } });
423+
```
424+
425+
Note that this option will effectively bypass the lock that prevents multiple instances from being created, so you should
426+
not use this carelessly. Having multiple instances write to the same files will lead to inconsistent data that can not be
427+
easily recovered from.
417428

418429
#### Isolation
419430

src/EventStore.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,4 +397,6 @@ class EventStore extends events.EventEmitter {
397397

398398
module.exports = EventStore;
399399
module.exports.ExpectedVersion = ExpectedVersion;
400-
module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError;
400+
module.exports.OptimisticConcurrencyError = OptimisticConcurrencyError;
401+
module.exports.LOCK_THROW = Storage.LOCK_THROW;
402+
module.exports.LOCK_RECLAIM = Storage.LOCK_RECLAIM;

src/Storage/WritableStorage.js

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata } = r
88

99
const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
1010

11+
const LOCK_RECLAIM = 0x1;
12+
const LOCK_THROW = 0x2;
13+
1114
class StorageLockedError extends Error {}
1215

1316
/**
@@ -37,6 +40,7 @@ class WritableStorage extends ReadableStorage {
3740
* @param {function(object, number): string} [config.partitioner] A function that takes a document and sequence number and returns a partition name that the document should be stored in. Defaults to write all documents to the primary partition.
3841
* @param {object} [config.indexOptions] An options object that should be passed to all indexes on construction.
3942
* @param {string} [config.hmacSecret] A private key that is used to verify matchers retrieved from indexes.
43+
* @param {number} [config.lock] One of LOCK_* constants that defines how an existing lock should be handled.
4044
*/
4145
constructor(storageName = 'storage', config = {}) {
4246
if (typeof storageName !== 'string') {
@@ -60,6 +64,11 @@ class WritableStorage extends ReadableStorage {
6064
}
6165
}
6266
super(storageName, config);
67+
68+
this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
69+
if (config.lock === LOCK_RECLAIM) {
70+
this.unlock();
71+
}
6372
this.partitioner = config.partitioner;
6473
}
6574

@@ -105,7 +114,6 @@ class WritableStorage extends ReadableStorage {
105114
if (this.locked) {
106115
return false;
107116
}
108-
this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
109117
try {
110118
fs.mkdirSync(this.lockFile);
111119
this.locked = true;
@@ -125,10 +133,12 @@ class WritableStorage extends ReadableStorage {
125133
* Current implementation just deletes a lock file that is named like the storage.
126134
*/
127135
unlock() {
128-
if (!this.locked && fs.existsSync(this.lockFile)) {
129-
this.checkTornWrites();
136+
if (fs.existsSync(this.lockFile)) {
137+
if (!this.locked) {
138+
this.checkTornWrites();
139+
}
140+
fs.rmdirSync(this.lockFile);
130141
}
131-
fs.rmdirSync(this.lockFile);
132142
this.locked = false;
133143
}
134144

@@ -393,4 +403,6 @@ class WritableStorage extends ReadableStorage {
393403
}
394404

395405
module.exports = WritableStorage;
396-
module.exports.StorageLockedError = StorageLockedError;
406+
module.exports.StorageLockedError = StorageLockedError;
407+
module.exports.LOCK_THROW = LOCK_THROW;
408+
module.exports.LOCK_RECLAIM = LOCK_RECLAIM;

test/EventStore.spec.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,33 @@ describe('EventStore', function() {
6060
fs.readdir = originalReaddir;
6161
});
6262

63+
it('repairs torn writes', function(done) {
64+
eventstore = new EventStore({
65+
storageDirectory
66+
});
67+
68+
const events = [{foo: 'bar'.repeat(500)}];
69+
eventstore.on('ready', () => {
70+
eventstore.commit('foo-bar', events, () => {
71+
// Simulate a torn write (but indexes are still written)
72+
fs.truncateSync(eventstore.storage.getPartition('foo-bar').fileName, 512);
73+
74+
// The previous instance was not closed, so the lock still exists
75+
eventstore = new EventStore({
76+
storageDirectory,
77+
storageConfig: {
78+
lock: EventStore.LOCK_RECLAIM
79+
}
80+
});
81+
eventstore.on('ready', () => {
82+
expect(eventstore.length).to.be(0);
83+
expect(eventstore.getStreamVersion('foo-bar')).to.be(0);
84+
done();
85+
});
86+
});
87+
});
88+
});
89+
6390
it('throws when trying to open non-existing store read-only', function() {
6491
expect(() => new EventStore({
6592
storageDirectory,

0 commit comments

Comments
 (0)