1+ import { decodeDockerContainerStats } from "./convert" ;
2+ import { DockerContainerStats } from "./types" ;
3+
4+
5+ export class DockerStatsStream extends ReadableStream < DockerContainerStats > {
6+ constructor ( source : ReadableStream < Uint8Array > ) {
7+ super ( {
8+ async start ( controller ) {
9+ let readerDone = false ;
10+ const reader = source . getReader ( ) ;
11+ const decoder = new TextDecoder ( 'utf-8' ) ;
12+ let curr = '' ;
13+ while ( ! reader . closed && ! readerDone ) {
14+ const { done, value } = await reader . read ( ) ;
15+ readerDone = readerDone || done ;
16+ if ( ! value ) continue ;
17+
18+ curr += decoder . decode ( value , { stream : true } ) ;
19+ let idx : number ;
20+ while ( ( idx = curr . indexOf ( '\n' ) ) >= 0 ) {
21+ const line = curr . slice ( 0 , idx ) ;
22+ curr = curr . slice ( idx + 1 ) ;
23+ const statsJson = JSON . parse ( line ) ;
24+ controller . enqueue ( decodeDockerContainerStats ( statsJson ) ) ;
25+ }
26+ }
27+ controller . close ( ) ;
28+ } ,
29+ cancel ( ) {
30+ source . cancel ( ) ;
31+ }
32+ } ) ;
33+ }
34+ }
35+
36+
37+ export type DockerLogStreamType = 'stderr' | 'stdin' | 'stdout' ;
38+
39+
40+ export type DockerLogStreamCallback = ( type : DockerLogStreamType , data : string ) => void ;
41+
42+
43+
44+ /*
45+ export class DockerLogStream {
46+ private rawStream: NodeJS.ReadableStream;
47+ private closed: boolean = false;
48+ private onClose: () => void;
49+
50+
51+ constructor(contentType: string, rawStream: NodeJS.ReadableStream, onClose: () => void, onReceive: DockerLogStreamCallback){
52+ this.rawStream = rawStream;
53+ this.onClose = onClose;
54+
55+ this.rawStream.on('end', () => {
56+ this.onClose();
57+ });
58+
59+
60+ // any other type of stream
61+ if(contentType !== 'application/vnd.docker.multiplexed-stream'){
62+ this.rawStream.on('data', (chunk: Buffer) => {
63+ onReceive('stdout', chunk.toString('utf8'));
64+ });
65+ return;
66+ }
67+
68+
69+ // Docker Multi-Stream
70+ let currType: DockerLogStreamType | null = null; // if null read header
71+ let currBuf: Buffer = Buffer.alloc(0);
72+ let currOffset = 0;
73+ let currPayloadSize: number = 0;
74+
75+ this.rawStream.on('data', (chunk: Buffer) => {
76+ if(currOffset < currBuf.length){
77+ currBuf = Buffer.concat([currBuf, chunk]);
78+ } else {
79+ currBuf = chunk;
80+ currOffset = 0;
81+ }
82+
83+ while(true){
84+ if(!currType){
85+ // read header
86+ if(currBuf.length - currOffset >= 8){ // header has 8 bytes
87+ switch(currBuf[currOffset]){
88+ case 0: currType = 'stdin'; break;
89+ case 1: currType = 'stdout'; break;
90+ case 2: currType = 'stderr'; break;
91+ default: currType = 'stdout'; break;
92+ }
93+ currPayloadSize = currBuf.readUInt32BE(currOffset + 4); // read length of payload
94+ currOffset += 8; // move past header
95+ } else {
96+ break;
97+ }
98+ }
99+
100+ if(currBuf.length - currOffset >= currPayloadSize){
101+ const end = currOffset + currPayloadSize;
102+ const payload = currBuf.toString('utf8', currOffset, end);
103+ onReceive(currType, payload);
104+ currBuf = (currBuf.length > end) ? currBuf.subarray(end) : Buffer.alloc(0);
105+ currType = null;
106+ currPayloadSize = 0;
107+ currOffset = 0;
108+ } else {
109+ break;
110+ }
111+ }
112+ });
113+ }
114+
115+ public close(): void {
116+ if(this.closed) return;
117+ this.closed = true;
118+ this.rawStream.pause();
119+ this.onClose();
120+ }
121+
122+ public isClosed(): boolean {
123+ return this.closed;
124+ }
125+ }*/
126+
127+ export type DockerLogStreamChunk = {
128+
129+ /** Type of the stream (stdout, stderr, stdin) */
130+ type : DockerLogStreamType ;
131+
132+ /** Data of the stream */
133+ data : string ;
134+ } ;
135+
136+ export class DockerLogStream extends ReadableStream < DockerLogStreamChunk > {
137+
138+ constructor ( contentType : string , source : ReadableStream < Uint8Array > ) {
139+ super ( {
140+ async start ( controller ) {
141+ // Initialize the stream
142+ let readerDone = false ;
143+ const reader = source . getReader ( ) ;
144+ const decoder = new TextDecoder ( 'utf-8' ) ;
145+
146+ // any other type of stream
147+ if ( contentType !== 'application/vnd.docker.multiplexed-stream' ) {
148+ while ( ! reader . closed && ! readerDone ) {
149+ const { done, value } = await reader . read ( ) ;
150+ readerDone = readerDone || done ;
151+ if ( ! value ) continue ;
152+
153+ controller . enqueue ( { type : 'stdout' , data : decoder . decode ( value , { stream : true } ) } ) ;
154+ }
155+ controller . close ( ) ;
156+ return ;
157+ }
158+
159+
160+ // Docker Multi-Stream
161+ let currType : DockerLogStreamType | null = null ; // if null read header
162+ let currBuf : Buffer = Buffer . alloc ( 0 ) ;
163+ let currOffset = 0 ;
164+ let currPayloadSize : number = 0 ;
165+
166+ while ( ! reader . closed && ! readerDone ) {
167+ const { done, value : chunk } = await reader . read ( ) ;
168+ readerDone = readerDone || done ;
169+ if ( ! chunk ) continue ;
170+
171+ if ( currOffset < currBuf . length ) {
172+ currBuf = Buffer . concat ( [ currBuf , chunk ] ) ;
173+ } else {
174+ currBuf = Buffer . from ( chunk ) ;
175+ currOffset = 0 ;
176+ }
177+
178+ while ( true ) {
179+ if ( ! currType ) {
180+ // read header
181+ if ( currBuf . length - currOffset >= 8 ) { // header has 8 bytes
182+ switch ( currBuf [ currOffset ] ) {
183+ case 0 : currType = 'stdin' ; break ;
184+ case 1 : currType = 'stdout' ; break ;
185+ case 2 : currType = 'stderr' ; break ;
186+ default : currType = 'stdout' ; break ;
187+ }
188+ currPayloadSize = currBuf . readUInt32BE ( currOffset + 4 ) ; // read length of payload
189+ currOffset += 8 ; // move past header
190+ } else {
191+ break ;
192+ }
193+ }
194+
195+ if ( currBuf . length - currOffset >= currPayloadSize ) {
196+ const end = currOffset + currPayloadSize ;
197+ const payload = currBuf . toString ( 'utf8' , currOffset , end ) ;
198+ controller . enqueue ( { type : currType , data : payload } ) ;
199+ currBuf = ( currBuf . length > end ) ? currBuf . subarray ( end ) : Buffer . alloc ( 0 ) ;
200+ currType = null ;
201+ currPayloadSize = 0 ;
202+ currOffset = 0 ;
203+ } else {
204+ break ;
205+ }
206+ }
207+ }
208+ controller . close ( ) ;
209+ } ,
210+ cancel ( ) {
211+ // Clean up the stream
212+ return source . cancel ( ) ;
213+ } ,
214+ } ) ;
215+ }
216+ }
0 commit comments