@@ -16,8 +16,8 @@ class TrailingChecksumTransform extends Transform {
1616 this . log = log ;
1717 this . errCb = errCb ;
1818 this . chunkSizeBuffer = Buffer . alloc ( 0 ) ;
19- this . outputBuffer = Buffer . alloc ( 0 ) ;
20- this . toFlush = Buffer . alloc ( 0 ) ;
19+ this . outputBuffer = [ ] ;
20+ this . bytesToDiscard = 0 ; // when a trailing \r\n is present we discard it, but its two bytes can arrive in different chunks
2121 this . bytesToRead = 0 ; // when a chunk is advertised, the size is put here and we forward all bytes
2222 this . streamClosed = false ;
2323 }
@@ -32,64 +32,66 @@ class TrailingChecksumTransform extends Transform {
3232 */
3333 _transform ( chunkInput , encoding , callback ) {
3434 let chunk = chunkInput ;
35- for ( ; ; ) {
36- if ( chunk . byteLength === 0 ) {
37- break ;
38- }
39- if ( this . streamClosed ) {
40- return callback ( null , '' , encoding ) ;
35+ while ( chunk . byteLength > 0 && ! this . streamClosed ) {
36+ if ( this . bytesToDiscard > 0 ) {
37+ const toDiscard = Math . min ( this . bytesToDiscard , chunk . byteLength ) ;
38+ chunk = chunk . subarray ( toDiscard ) ;
39+ this . bytesToDiscard -= toDiscard ;
40+ continue ;
4141 }
42+ // forward up to bytesToRead bytes from the chunk, restart processing on leftover
4243 if ( this . bytesToRead > 0 ) {
43- if ( chunk . byteLength <= this . bytesToRead ) {
44- // chunk is smaller than the advertised size
45- // forward the whole chunk
46- this . bytesToRead -= chunk . byteLength ;
47- this . outputBuffer = Buffer . concat ( [ this . outputBuffer , chunk ] ) ;
48- break ;
44+ const toRead = Math . min ( this . bytesToRead , chunk . byteLength ) ;
45+ this . outputBuffer . push ( chunk . subarray ( 0 , toRead ) ) ;
46+ chunk = chunk . subarray ( toRead ) ;
47+ this . bytesToRead -= toRead ;
48+ if ( this . bytesToRead === 0 ) {
49+ this . bytesToDiscard = 2 ;
4950 }
50- // chunk is bigger than the advertised size
51- this . log . info ( 'concatenating chunk' , { first : this . outputBuffer . toString ( ) } ) ;
52- this . log . info ( 'concatenating chunk' , { second : chunk . subarray ( 0 , this . bytesToRead ) . toString ( ) } ) ;
53- this . outputBuffer = Buffer . concat ( [ this . outputBuffer , chunk . subarray ( 0 , this . bytesToRead ) ] ) ;
54- chunk = chunk . subarray ( this . bytesToRead ) ;
55- this . bytesToRead = 0 ;
5651 continue ;
5752 }
58- const lineBreakIndex = chunk . indexOf ( '\r\n' ) ;
59- if ( lineBreakIndex === - 1 ) {
60- this . chunkSizeBuffer = Buffer . concat ( [ this . chunkSizeBuffer , chunk ] ) ;
61- if ( this . chunkSizeBuffer . byteLength > 8 ) {
53+
54+ const lineBreakIndex2 = chunk . indexOf ( '\r' ) ;
55+ if ( lineBreakIndex2 === - 1 ) {
56+ if ( this . chunkSizeBuffer . byteLength + chunk . byteLength > 10 ) {
57+ this . log . debug ( 'chunk size field too big' , {
58+ chunkSizeBuffer : this . chunkSizeBuffer . toString ( 'hex' ) ,
59+ truncatedChunk : chunk . subarray ( 0 , 8 ) . toString ( 'hex' ) ,
60+ } ) ;
6261 // if bigger, the chunk would be over 5 GB
6362 // returning early to avoid a DoS by memory exhaustion
6463 return callback ( errors . InvalidArgument ) ;
6564 }
66- return callback ( null , '' , encoding ) ;
67- }
68- if ( lineBreakIndex === 0 ) {
69- chunk = chunk . subarray ( lineBreakIndex + 2 ) ;
70- continue ;
65+ // no delimiter, we'll keep the chunk for later
66+ this . chunkSizeBuffer = Buffer . concat ( [ this . chunkSizeBuffer , chunk ] ) ;
67+ break ;
7168 }
72- this . chunkSizeBuffer = Buffer . concat ( [ this . chunkSizeBuffer , chunk . subarray ( 0 , lineBreakIndex ) ] ) ;
73- chunk = chunk . subarray ( lineBreakIndex + 2 ) ;
69+
70+ this . chunkSizeBuffer = Buffer . concat ( [ this . chunkSizeBuffer , chunk . subarray ( 0 , lineBreakIndex2 ) ] ) ;
71+ chunk = chunk . subarray ( lineBreakIndex2 ) ;
72+
7473 // chunk-size is sent in hex
7574 const dataSize = Number . parseInt ( this . chunkSizeBuffer . toString ( ) , 16 ) ;
7675 if ( Number . isNaN ( dataSize ) ) {
76+ this . log . debug ( 'unable to parse chunk size' , {
77+ chunkSizeBuffer : this . chunkSizeBuffer . toString ( 'hex' ) ,
78+ } ) ;
7779 return callback ( errors . InvalidArgument ) ;
7880 }
7981 this . chunkSizeBuffer = Buffer . alloc ( 0 ) ;
8082 if ( dataSize === 0 ) {
83+ // TODO: check if the checksum is correct
8184 // last chunk, no more data to read, the stream is closed
8285 this . streamClosed = true ;
83- break ;
8486 }
8587 this . bytesToRead = dataSize ;
86- this . log . info ( 'chunk size' , { dataSize } ) ;
88+ this . bytesToDiscard = 2 ;
8789 }
8890
89- this . toFlush = this . outputBuffer ;
90- this . outputBuffer = Buffer . alloc ( 0 ) ;
91- this . log . info ( 'toFlush' , { toFlush : this . toFlush . toString ( ) } ) ;
92- return callback ( null , this . toFlush , encoding ) ;
91+ // TODO: push the stream into a checksum accumulator
92+ const toFlush = Buffer . concat ( this . outputBuffer ) ;
93+ this . outputBuffer = [ ] ;
94+ return callback ( null , toFlush , encoding ) ;
9395 }
9496}
9597
0 commit comments