File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -46,12 +46,21 @@ class Consumer extends stream.Readable {
4646 if ( ! fs . existsSync ( consumerDirectory ) ) {
4747 mkdirpSync ( consumerDirectory ) ;
4848 } else {
49- // Clean up left over from failed writes
50- const files = fs . readdirSync ( consumerDirectory ) ;
51- for ( let file of files ) {
52- if ( file . startsWith ( this . fileName + '.' ) ) {
53- fs . unlinkSync ( file ) ;
54- }
49+ this . cleanUpFailedWrites ( consumerDirectory ) ;
50+ }
51+ }
52+
53+ /**
54+ * Iterate over all files in the directory of this consumer and unlink any file that starts with the filename followed by a dot.
55+ * @private
56+ */
57+ cleanUpFailedWrites ( ) {
58+ const consumerNamePrefix = path . basename ( this . fileName ) + '.' ;
59+ const consumerDirectory = path . dirname ( this . fileName ) ;
60+ const files = fs . readdirSync ( consumerDirectory ) ;
61+ for ( let file of files ) {
62+ if ( file . startsWith ( consumerNamePrefix ) ) {
63+ fs . unlinkSync ( path . join ( consumerDirectory , file ) ) ;
5564 }
5665 }
5766 }
Original file line number Diff line number Diff line change @@ -41,6 +41,14 @@ describe('Consumer', function() {
4141 expect ( fs . existsSync ( dataDirectory + '/consumers' ) ) . to . be ( true ) ;
4242 } ) ;
4343
44+ it ( 'cleans up failed write left-overs' , function ( ) {
45+ consumer = new Consumer ( storage , 'foobar' , 'consumer1' ) ;
46+ consumer . stop ( ) ;
47+ fs . writeFileSync ( consumer . fileName + '.1' , 'failed write!' ) ;
48+ consumer = new Consumer ( storage , 'foobar' , 'consumer1' ) ;
49+ expect ( fs . existsSync ( consumer . fileName + '.1' ) ) . to . be ( false ) ;
50+ } ) ;
51+
4452 it ( 'emits event when catching up' , function ( done ) {
4553 consumer = new Consumer ( storage , 'foobar' , 'consumer1' ) ;
4654 consumer . stop ( ) ;
You can’t perform that action at this time.
0 commit comments