File tree Expand file tree Collapse file tree 1 file changed +57
-0
lines changed
Expand file tree Collapse file tree 1 file changed +57
-0
lines changed Original file line number Diff line number Diff line change 1+
2+ 'use strict' ;
3+
4+ const pipeline = require ( 'internal/streams/pipeline' ) ;
5+ const Duplex = require ( 'internal/streams/duplex' ) ;
6+
7+ module . exports = function pipe ( ...streams ) {
8+ let cb ;
9+ let ret ;
10+
11+ const r = pipeline ( streams , function ( err ) {
12+ if ( cb ) {
13+ cb ( err ) ;
14+ } else {
15+ ret . destroy ( err ) ;
16+ }
17+ } ) ;
18+ const w = streams [ 0 ] ;
19+
20+ ret = new Duplex ( {
21+ writable : ! ! w ?. writable ,
22+ readable : ! ! r ?. readable ,
23+ objectMode : streams [ 0 ] . readableObjectMode ,
24+ highWaterMark : 1
25+ } ) ;
26+
27+ if ( ret . writable ) {
28+ ret . _write = function ( chunk , encoding , callback ) {
29+ w . write ( chunk , encoding , callback ) ;
30+ } ;
31+
32+ ret . _final = function ( chunk , encoding , callback ) {
33+ w . end ( chunk , encoding , callback ) ;
34+ } ;
35+ }
36+
37+ if ( ret . readable ) {
38+ ret . _read = function ( ) {
39+ r . resume ( ) ;
40+ } ;
41+
42+ r
43+ . on ( 'data' , function ( buf ) {
44+ if ( ! ret . push ( buf ) ) {
45+ this . pause ( ) ;
46+ }
47+ } )
48+ . on ( 'end' , function ( ) {
49+ ret . push ( null ) ;
50+ } ) ;
51+ }
52+
53+ ret . _destroy = function ( err , callback ) {
54+ cb = callback ;
55+ streams [ 0 ] . destroy ( err ) ;
56+ } ;
57+ }
You can’t perform that action at this time.
0 commit comments