11const xml2js = require ( 'xml2js' ) ;
22const { errors, errorInstances, jsutil } = require ( 'arsenal' ) ;
3- const { Readable } = require ( 'stream' ) ;
3+ const { Readable, pipeline : streamPipeline } = require ( 'stream' ) ;
4+ const { promisify } = require ( 'util' ) ;
45const collectResponseHeaders = require ( '../../utilities/collectResponseHeaders' ) ;
56const collectCorsHeaders = require ( '../../utilities/collectCorsHeaders' ) ;
67const crypto = require ( 'crypto' ) ;
78const { prepareStream } = require ( 'arsenal/build/lib/s3middleware/prepareStream' ) ;
89const UtilizationService = require ( '../../utilization/instance' ) ;
910const metadata = require ( '../../metadata/wrapper' ) ;
1011
12+ const pipeline = promisify ( streamPipeline ) ;
13+
1114/**
1215 * Decodes an URI and return the result.
1316 * Do the same decoding than in S3 server
@@ -109,23 +112,12 @@ function getResponseHeader(request, bucket, dataBuffer, lastModified, log) {
109112 * @param {BucketInfo } bucket - bucket info
110113 * @param {string } data - data to send
111114 * @param {date } [lastModified] - last modified date of the value
112- * @returns {undefined } -
115+ * @returns {Promise<void> } -
113116 */
114- function respondWithData ( request , response , log , bucket , data , lastModified ) {
117+ async function respondWithData ( request , response , log , bucket , data , lastModified ) {
115118 const dataBuffer = Buffer . from ( data ) ;
116119 const responseMetaHeaders = getResponseHeader ( request , bucket , dataBuffer , lastModified , log ) ;
117120
118- response . on ( 'finish' , ( ) => {
119- let contentLength = 0 ;
120- if ( responseMetaHeaders && responseMetaHeaders [ 'Content-Length' ] ) {
121- contentLength = responseMetaHeaders [ 'Content-Length' ] ;
122- }
123- log . end ( ) . addDefaultFields ( { contentLength } ) ;
124- log . end ( ) . info ( 'responded with streamed content' , {
125- httpCode : response . statusCode ,
126- } ) ;
127- } ) ;
128-
129121 if ( responseMetaHeaders && typeof responseMetaHeaders === 'object' ) {
130122 Object . keys ( responseMetaHeaders ) . forEach ( key => {
131123 if ( responseMetaHeaders [ key ] !== undefined ) {
@@ -143,13 +135,17 @@ function respondWithData(request, response, log, bucket, data, lastModified) {
143135 }
144136
145137 response . writeHead ( 200 ) ;
146- const stream = Readable . from ( dataBuffer ) ;
147- stream . pipe ( response ) ;
148- stream . on ( 'unpipe' , ( ) => {
149- response . end ( ) ;
150- } ) ;
151- stream . on ( 'error' , ( ) => {
152- response . end ( ) ;
138+ // Use a single-element array so the Buffer is sent as one chunk rather
139+ // than being iterated byte-by-byte by Readable.from.
140+ await pipeline ( Readable . from ( [ dataBuffer ] ) , response ) ;
141+
142+ let contentLength = 0 ;
143+ if ( responseMetaHeaders && responseMetaHeaders [ 'Content-Length' ] ) {
144+ contentLength = responseMetaHeaders [ 'Content-Length' ] ;
145+ }
146+ log . end ( ) . addDefaultFields ( { contentLength } ) ;
147+ log . end ( ) . info ( 'responded with streamed content' , {
148+ httpCode : response . statusCode ,
153149 } ) ;
154150}
155151
@@ -308,7 +304,6 @@ async function buildVeeamFileData(request, bucketMd, log, name) {
308304
309305module . exports = {
310306 _decodeURI,
311- // callbackify keeps backward compatibility with put.js which still uses the callback form
312307 receiveData,
313308 respondWithData,
314309 getResponseHeader,
0 commit comments