@@ -2,114 +2,115 @@ const Duplex = require('stream').Duplex
22const Writable = require ( 'stream' ) . Writable
33const util = require ( 'util' )
44
5- const CopyStream = ( module . exports = function ( pq , options ) {
6- Duplex . call ( this , options )
7- this . pq = pq
8- this . _reading = false
9- } )
105
11- util . inherits ( CopyStream , Duplex )
12-
13- // writer methods
14- CopyStream . prototype . _write = function ( chunk , encoding , cb ) {
15- const result = this . pq . putCopyData ( chunk )
16-
17- // sent successfully
18- if ( result === 1 ) return cb ( )
6+ class CopyStream extends Duplex {
7+ constructor ( pq , options ) {
8+ super ( options ) ;
9+ this . pq = pq ;
10+ this . _reading = false ;
11+ }
1912
20- // error
21- if ( result === - 1 ) return cb ( new Error ( this . pq . errorMessage ( ) ) )
2213
23- // command would block. wait for writable and call again.
24- const self = this
25- this . pq . writable ( function ( ) {
26- self . _write ( chunk , encoding , cb )
27- } )
28- }
14+ // writer methods
15+ _write ( chunk , encoding , cb ) {
16+ const result = this . pq . putCopyData ( chunk )
2917
30- CopyStream . prototype . end = function ( ) {
31- const args = Array . prototype . slice . call ( arguments , 0 )
32- const self = this
18+ // sent successfully
19+ if ( result === 1 ) return cb ( )
3320
34- const callback = args . pop ( )
21+ // error
22+ if ( result === - 1 ) return cb ( new Error ( this . pq . errorMessage ( ) ) )
3523
36- if ( args . length ) {
37- this . write ( args [ 0 ] )
38- }
39- const result = this . pq . putCopyEnd ( )
40-
41- // sent successfully
42- if ( result === 1 ) {
43- // consume our results and then call 'end' on the
44- // "parent" writable class so we can emit 'finish' and
45- // all that jazz
46- return consumeResults ( this . pq , function ( err , res ) {
47- Writable . prototype . end . call ( self )
48-
49- // handle possible passing of callback to end method
50- if ( callback ) {
51- callback ( err )
52- }
24+ // command would block. wait for writable and call again.
25+ const self = this
26+ this . pq . writable ( function ( ) {
27+ self . _write ( chunk , encoding , cb )
5328 } )
5429 }
5530
56- // error
57- if ( result === - 1 ) {
58- const err = new Error ( this . pq . errorMessage ( ) )
59- return this . emit ( 'error' , err )
60- }
31+ end ( ) {
32+ const args = Array . prototype . slice . call ( arguments , 0 )
33+ const self = this
6134
62- // command would block. wait for writable and call end again
63- // don't pass any buffers to end on the second call because
64- // we already sent them to possible this.write the first time
65- // we called end
66- return this . pq . writable ( function ( ) {
67- return self . end . apply ( self , callback )
68- } )
69- }
35+ const callback = args . pop ( )
36+
37+ if ( args . length ) {
38+ this . write ( args [ 0 ] )
39+ }
40+ const result = this . pq . putCopyEnd ( )
41+
42+ // sent successfully
43+ if ( result === 1 ) {
44+ // consume our results and then call 'end' on the
45+ // "parent" writable class so we can emit 'finish' and
46+ // all that jazz
47+ return consumeResults ( this . pq , function ( err , res ) {
48+ Writable . prototype . end . call ( self )
49+
50+ // handle possible passing of callback to end method
51+ if ( callback ) {
52+ callback ( err )
53+ }
54+ } )
55+ }
56+
57+ // error
58+ if ( result === - 1 ) {
59+ const err = new Error ( this . pq . errorMessage ( ) )
60+ return this . emit ( 'error' , err )
61+ }
7062
71- // reader methods
72- CopyStream . prototype . _consumeBuffer = function ( cb ) {
73- const result = this . pq . getCopyData ( true )
74- if ( result instanceof Buffer ) {
75- return setImmediate ( function ( ) {
76- cb ( null , result )
63+ // command would block. wait for writable and call end again
64+ // don't pass any buffers to end on the second call because
65+ // we already sent them to possible this.write the first time
66+ // we called end
67+ return this . pq . writable ( function ( ) {
68+ return self . end . apply ( self , callback )
7769 } )
7870 }
79- if ( result === - 1 ) {
80- // end of stream
81- return cb ( null , null )
71+
72+ // reader methods
73+ _consumeBuffer ( cb ) {
74+ const result = this . pq . getCopyData ( true )
75+ if ( result instanceof Buffer ) {
76+ return setImmediate ( function ( ) {
77+ cb ( null , result )
78+ } )
79+ }
80+ if ( result === - 1 ) {
81+ // end of stream
82+ return cb ( null , null )
83+ }
84+ if ( result === 0 ) {
85+ const self = this
86+ this . pq . once ( 'readable' , function ( ) {
87+ self . pq . stopReader ( )
88+ self . pq . consumeInput ( )
89+ self . _consumeBuffer ( cb )
90+ } )
91+ return this . pq . startReader ( )
92+ }
93+ cb ( new Error ( 'Unrecognized read status: ' + result ) )
8294 }
83- if ( result === 0 ) {
95+
96+ _read ( size ) {
97+ if ( this . _reading ) return
98+ this . _reading = true
99+ // console.log('read begin');
84100 const self = this
85- this . pq . once ( 'readable' , function ( ) {
86- self . pq . stopReader ( )
87- self . pq . consumeInput ( )
88- self . _consumeBuffer ( cb )
101+ this . _consumeBuffer ( function ( err , buffer ) {
102+ self . _reading = false
103+ if ( err ) {
104+ return self . emit ( 'error' , err )
105+ }
106+ if ( buffer === false ) {
107+ // nothing to read for now, return
108+ return
109+ }
110+ self . push ( buffer )
89111 } )
90- return this . pq . startReader ( )
91112 }
92- cb ( new Error ( 'Unrecognized read status: ' + result ) )
93113}
94-
95- CopyStream . prototype . _read = function ( size ) {
96- if ( this . _reading ) return
97- this . _reading = true
98- // console.log('read begin');
99- const self = this
100- this . _consumeBuffer ( function ( err , buffer ) {
101- self . _reading = false
102- if ( err ) {
103- return self . emit ( 'error' , err )
104- }
105- if ( buffer === false ) {
106- // nothing to read for now, return
107- return
108- }
109- self . push ( buffer )
110- } )
111- }
112-
113114const consumeResults = function ( pq , cb ) {
114115 const cleanup = function ( ) {
115116 pq . removeListener ( 'readable' , onReadable )
0 commit comments