Skip to content

Commit 16bdb1b

Browse files
authored
Merge pull request #114 from albe/atomic-consumer
Update consumer state file atomically
2 parents 8d0a7c1 + 54b3eee commit 16bdb1b

2 files changed

Lines changed: 40 additions & 3 deletions

File tree

src/Consumer.js

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,27 @@ class Consumer extends stream.Readable {
4242
this.index = this.storage.openIndex(indexName);
4343
this.indexName = indexName;
4444
const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
45+
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
4546
if (!fs.existsSync(consumerDirectory)) {
4647
mkdirpSync(consumerDirectory);
48+
} else {
49+
this.cleanUpFailedWrites(consumerDirectory);
50+
}
51+
}
52+
53+
/**
54+
* Iterate over all files in the directory of this consumer and unlink any file that starts with the filename followed by a dot.
55+
* @private
56+
*/
57+
cleanUpFailedWrites() {
58+
const consumerNamePrefix = path.basename(this.fileName) + '.';
59+
const consumerDirectory = path.dirname(this.fileName);
60+
const files = fs.readdirSync(consumerDirectory);
61+
for (let file of files) {
62+
if (file.startsWith(consumerNamePrefix)) {
63+
fs.unlinkSync(path.join(consumerDirectory, file));
64+
}
4765
}
48-
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
4966
}
5067

5168
/**
@@ -124,9 +141,21 @@ class Consumer extends stream.Readable {
124141
const consumerData = Buffer.allocUnsafe(4 + consumerState.length);
125142
consumerData.writeInt32LE(this.position, 0);
126143
consumerData.write(consumerState, 4, consumerState.length, 'utf-8');
127-
fs.writeFileSync(this.fileName, consumerData);
144+
var tmpFile = this.fileName + '.' + this.position;
128145
this.persisting = null;
129-
this.emit('persisted');
146+
/* istanbul ignore if */
147+
if (fs.existsSync(tmpFile)) {
148+
throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`);
149+
}
150+
try {
151+
fs.writeFileSync(tmpFile, consumerData);
152+
// If the write fails (half-way), the consumer state file will not be corrupted
153+
fs.renameSync(tmpFile, this.fileName);
154+
this.emit('persisted');
155+
} catch (e) {
156+
/* istanbul ignore next */
157+
fs.unlinkSync(tmpFile);
158+
}
130159
});
131160
}
132161

test/Consumer.spec.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ describe('Consumer', function() {
4141
expect(fs.existsSync(dataDirectory + '/consumers')).to.be(true);
4242
});
4343

44+
it('cleans up failed write left-overs', function() {
45+
consumer = new Consumer(storage, 'foobar', 'consumer1');
46+
consumer.stop();
47+
fs.writeFileSync(consumer.fileName + '.1', 'failed write!');
48+
consumer = new Consumer(storage, 'foobar', 'consumer1');
49+
expect(fs.existsSync(consumer.fileName + '.1')).to.be(false);
50+
});
51+
4452
it('emits event when catching up', function(done){
4553
consumer = new Consumer(storage, 'foobar', 'consumer1');
4654
consumer.stop();

0 commit comments

Comments
 (0)