Skip to content

Commit 76c68e8

Browse files
committed
Bugfix streams
1 parent e94864d commit 76c68e8

File tree

3 files changed

+115
-82
lines changed

3 files changed

+115
-82
lines changed

src/__tests__/Docker.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import DockerClient from '../client';
22

3+
/*
34
test('getContainers', async () => {
45
const client = new DockerClient();
56
const containers = await client.getContainers();
@@ -16,3 +17,21 @@ test('createContainer', async () => {
1617
});
1718
console.log(JSON.stringify(container, null, 2));
1819
});
20+
*/
21+
22+
/*
23+
test('getContainerStats', async () => {
24+
const client = new DockerClient();
25+
const response = await client.getContainerStats('postgresql', { stream: true });
26+
if(response.success?.stream){
27+
const stream = response.success.stream;
28+
const reader = stream.getReader();
29+
for(let i=0; i < 5; i++){
30+
const { done, value: stats } = await reader.read();
31+
if(done) break;
32+
console.log(JSON.stringify(stats, null, 2));
33+
}
34+
await reader.cancel();
35+
}
36+
});
37+
*/

src/stream.ts

Lines changed: 93 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,39 @@ import { DockerContainerStats } from "./types";
44

55
export class DockerStatsStream extends ReadableStream<DockerContainerStats> {
66
constructor(source: ReadableStream<Uint8Array>) {
7+
let reader: ReadableStreamDefaultReader<Uint8Array<ArrayBufferLike>>;
8+
79
super({
10+
811
async start(controller){
9-
let readerDone = false;
10-
const reader = source.getReader();
11-
const decoder = new TextDecoder('utf-8');
12-
let curr = '';
13-
while(!reader.closed && !readerDone){
14-
const { done, value } = await reader.read();
15-
readerDone = readerDone || done;
16-
if(!value) continue;
17-
18-
curr += decoder.decode(value, { stream: true });
19-
let idx: number;
20-
while((idx = curr.indexOf('\n')) >= 0){
21-
const line = curr.slice(0, idx);
22-
curr = curr.slice(idx + 1);
23-
const statsJson = JSON.parse(line);
24-
controller.enqueue(decodeDockerContainerStats(statsJson));
12+
try {
13+
let readerDone = false;
14+
reader = source.getReader();
15+
const decoder = new TextDecoder('utf-8');
16+
let curr = '';
17+
while(!readerDone){
18+
const { done, value } = await reader.read();
19+
readerDone = readerDone || done;
20+
if(!value) continue;
21+
22+
curr += decoder.decode(value, { stream: true });
23+
let idx: number;
24+
while((idx = curr.indexOf('\n')) >= 0){
25+
const line = curr.slice(0, idx);
26+
curr = curr.slice(idx + 1);
27+
const statsJson = JSON.parse(line);
28+
controller.enqueue(decodeDockerContainerStats(statsJson));
29+
}
2530
}
31+
} finally {
32+
controller.close();
2633
}
27-
controller.close();
2834
},
29-
cancel(){
30-
source.cancel();
31-
}
35+
36+
async cancel(reason){
37+
if(reader) await reader.cancel(reason);
38+
},
39+
3240
});
3341
}
3442
}
@@ -136,81 +144,87 @@ export type DockerLogStreamChunk = {
136144
export class DockerLogStream extends ReadableStream<DockerLogStreamChunk> {
137145

138146
constructor(contentType: string, source: ReadableStream<Uint8Array>) {
147+
let reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
148+
139149
super({
140-
async start(controller) {
141-
// Initialize the stream
142-
let readerDone = false;
143-
const reader = source.getReader();
144-
const decoder = new TextDecoder('utf-8');
145-
146-
// any other type of stream
147-
if(contentType !== 'application/vnd.docker.multiplexed-stream'){
148-
while(!reader.closed && !readerDone){
149-
const { done, value } = await reader.read();
150-
readerDone = readerDone || done;
151-
if(!value) continue;
152150

153-
controller.enqueue({ type: 'stdout', data: decoder.decode(value, { stream: true }) });
151+
async start(controller) {
152+
try {
153+
let readerDone = false;
154+
reader = source.getReader();
155+
const decoder = new TextDecoder('utf-8');
156+
157+
// any other type of stream
158+
if(contentType !== 'application/vnd.docker.multiplexed-stream'){
159+
while(!readerDone){
160+
const { done, value } = await reader.read();
161+
readerDone = readerDone || done;
162+
if(!value) continue;
163+
164+
controller.enqueue({ type: 'stdout', data: decoder.decode(value, { stream: true }) });
165+
}
166+
controller.close();
167+
return;
154168
}
155-
controller.close();
156-
return;
157-
}
158169

159170

160-
// Docker Multi-Stream
161-
let currType: DockerLogStreamType | null = null; // if null read header
162-
let currBuf: Buffer = Buffer.alloc(0);
163-
let currOffset = 0;
164-
let currPayloadSize: number = 0;
171+
// Docker Multi-Stream
172+
let currType: DockerLogStreamType | null = null; // if null read header
173+
let currBuf: Buffer = Buffer.alloc(0);
174+
let currOffset = 0;
175+
let currPayloadSize: number = 0;
165176

166-
while(!reader.closed && !readerDone){
167-
const { done, value: chunk } = await reader.read();
168-
readerDone = readerDone || done;
169-
if(!chunk) continue;
170-
171-
if(currOffset < currBuf.length){
172-
currBuf = Buffer.concat([currBuf, chunk]);
173-
} else {
174-
currBuf = Buffer.from(chunk);
175-
currOffset = 0;
176-
}
177+
while(!readerDone){
178+
const { done, value: chunk } = await reader.read();
179+
readerDone = readerDone || done;
180+
if(!chunk) continue;
181+
182+
if(currOffset < currBuf.length){
183+
currBuf = Buffer.concat([currBuf, chunk]);
184+
} else {
185+
currBuf = Buffer.from(chunk);
186+
currOffset = 0;
187+
}
177188

178-
while(true){
179-
if(!currType){
180-
// read header
181-
if(currBuf.length - currOffset >= 8){ // header has 8 bytes
182-
switch(currBuf[currOffset]){
183-
case 0: currType = 'stdin'; break;
184-
case 1: currType = 'stdout'; break;
185-
case 2: currType = 'stderr'; break;
186-
default: currType = 'stdout'; break;
189+
while(true){
190+
if(!currType){
191+
// read header
192+
if(currBuf.length - currOffset >= 8){ // header has 8 bytes
193+
switch(currBuf[currOffset]){
194+
case 0: currType = 'stdin'; break;
195+
case 1: currType = 'stdout'; break;
196+
case 2: currType = 'stderr'; break;
197+
default: currType = 'stdout'; break;
198+
}
199+
currPayloadSize = currBuf.readUInt32BE(currOffset + 4); // read length of payload
200+
currOffset += 8; // move past header
201+
} else {
202+
break;
187203
}
188-
currPayloadSize = currBuf.readUInt32BE(currOffset + 4); // read length of payload
189-
currOffset += 8; // move past header
204+
}
205+
206+
if(currBuf.length - currOffset >= currPayloadSize){
207+
const end = currOffset + currPayloadSize;
208+
const payload = currBuf.toString('utf8', currOffset, end);
209+
controller.enqueue({ type: currType, data: payload });
210+
currBuf = (currBuf.length > end) ? currBuf.subarray(end) : Buffer.alloc(0);
211+
currType = null;
212+
currPayloadSize = 0;
213+
currOffset = 0;
190214
} else {
191215
break;
192216
}
193217
}
194-
195-
if(currBuf.length - currOffset >= currPayloadSize){
196-
const end = currOffset + currPayloadSize;
197-
const payload = currBuf.toString('utf8', currOffset, end);
198-
controller.enqueue({ type: currType, data: payload });
199-
currBuf = (currBuf.length > end) ? currBuf.subarray(end) : Buffer.alloc(0);
200-
currType = null;
201-
currPayloadSize = 0;
202-
currOffset = 0;
203-
} else {
204-
break;
205-
}
206218
}
219+
} finally {
220+
controller.close();
207221
}
208-
controller.close();
209222
},
210-
cancel() {
211-
// Clean up the stream
212-
return source.cancel();
223+
224+
async cancel(reason) {
225+
if(reader) await reader.cancel(reason);
213226
},
227+
214228
});
215229
}
216230
}

src/types.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@ import { DockerLogStream, DockerLogStreamCallback, DockerStatsStream } from "./s
33
export type DockerSuccess<T> = {
44

55
/** Object containing the successful response data. */
6-
success: T;
6+
success?: T;
77
};
88

99
export type DockerError<T extends object = {}> = {
1010

1111
/** Object containing the error response data. */
12-
error: {
12+
error?: {
1313

1414
/** DockerError message (always present in case of an error). */
1515
message: string;
@@ -26,7 +26,7 @@ export type DockerError<T extends object = {}> = {
2626
} & T;
2727
};
2828

29-
export type DockerResult<S, E extends object = {}> = DockerSuccess<S> | DockerError<E>;
29+
export type DockerResult<S, E extends object = {}> = DockerSuccess<S> & DockerError<E>;
3030

3131

3232

0 commit comments

Comments
 (0)