Skip to content

Commit 7a27b32

Browse files
committed
Update consumer state file atomically
1 parent 8d0a7c1 commit 7a27b32

1 file changed

Lines changed: 21 additions & 3 deletions

File tree

src/Consumer.js

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,18 @@ class Consumer extends stream.Readable {
4242
this.index = this.storage.openIndex(indexName);
4343
this.indexName = indexName;
4444
const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
45+
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
4546
if (!fs.existsSync(consumerDirectory)) {
4647
mkdirpSync(consumerDirectory);
48+
} else {
49+
// Clean up left over from failed writes
50+
const files = fs.readdirSync(consumerDirectory);
51+
for (let file of files) {
52+
if (file.startsWith(this.fileName + '.')) {
53+
fs.unlinkSync(file);
54+
}
55+
}
4756
}
48-
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
4957
}
5058

5159
/**
@@ -124,9 +132,19 @@ class Consumer extends stream.Readable {
124132
const consumerData = Buffer.allocUnsafe(4 + consumerState.length);
125133
consumerData.writeInt32LE(this.position, 0);
126134
consumerData.write(consumerState, 4, consumerState.length, 'utf-8');
127-
fs.writeFileSync(this.fileName, consumerData);
135+
var tmpFile = this.fileName + '.' + this.position;
128136
this.persisting = null;
129-
this.emit('persisted');
137+
if (fs.existsSync(tmpFile)) {
138+
throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`);
139+
}
140+
try {
141+
fs.writeFileSync(tmpFile, consumerData);
142+
// If the write fails (half-way), the consumer state file will not be corrupted
143+
fs.renameSync(tmpFile, this.fileName);
144+
this.emit('persisted');
145+
} catch (e) {
146+
fs.unlinkSync(tmpFile);
147+
}
130148
});
131149
}
132150

0 commit comments

Comments
 (0)