Skip to content

Commit 6854e1e

Browse files
committed
Added container stats stream reader
1 parent 9371e45 commit 6854e1e

File tree

4 files changed

+158
-125
lines changed

4 files changed

+158
-125
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "lup-docker",
3-
"version": "1.1.1",
3+
"version": "1.1.2",
44
"description": "NodeJS library to interact with the Docker engine.",
55
"main": "./lib/index",
66
"types": "./lib/index.d.ts",

src/client.ts

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import {
3333
DockerExportImagesOptions,
3434
DockerContainerStats,
3535
} from './types';
36-
import { DockerStatsStream, DockerLogStream } from './stream';
36+
import { DockerStatsStream, DockerLogStream, DockerStatsStreamReader } from './stream';
3737
import {
3838
decodeDockerContainer,
3939
decodeDockerContainerStats,
@@ -642,6 +642,25 @@ class DockerClient {
642642
);
643643
}
644644

645+
/**
646+
* Returns the utilization stats for a container as a continuous stream
647+
* tunneled into a stream reader.
648+
*
649+
* @warning The stream will continuously produce stats objects which accumulate over time if not consumed!
650+
*
651+
* @param containerIdOrName ID or name of the container.
652+
* @returns Stream of stat objects tunneled into a stream reader or an error.
653+
*/
654+
public async getContainerStatsReader(
655+
containerIdOrName: string,
656+
): Promise<DockerResult<DockerStatsStreamReader, DockerGetContainerStatsResponseError>> {
657+
return this.getContainerStatsStream(containerIdOrName).then<
658+
DockerResult<DockerStatsStreamReader, DockerGetContainerStatsResponseError>
659+
>((result) =>
660+
result.success ? { success: new DockerStatsStreamReader(result.success!) } : { error: result.error! },
661+
);
662+
}
663+
645664
/**
646665
* Returns the utilization stats for a container as a continuous stream.
647666
*

src/index.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import DockerClient from './client';
2-
import { DockerLogStream, DockerStatsStream } from './stream';
2+
import { DockerLogStream, DockerStatsStream, DockerStatsStreamReader } from './stream';
33
import {
44
DockerContainer,
55
DockerContainerStats,
@@ -155,6 +155,21 @@ const Docker = {
155155
return CLIENT.getContainerStats(containerIdOrName, options);
156156
},
157157

158+
/**
159+
* Returns the utilization stats for a container as a continuous stream
160+
* tunneled into a stream reader.
161+
*
162+
* @warning The stream will continuously produce stats objects which accumulate over time if not consumed!
163+
*
164+
* @param containerIdOrName ID or name of the container.
165+
* @returns Stream of stat objects tunneled into a stream reader or an error.
166+
*/
167+
async getContainerStatsReader(
168+
containerIdOrName: string,
169+
): Promise<DockerResult<DockerStatsStreamReader, DockerGetContainerStatsResponseError>> {
170+
return CLIENT.getContainerStatsReader(containerIdOrName);
171+
},
172+
158173
/**
159174
* Returns the utilization stats for a container as a continuous stream.
160175
*

src/stream.ts

Lines changed: 121 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -3,132 +3,10 @@
33
import { decodeDockerContainerStats } from './convert';
44
import { DockerContainerStats } from './types';
55

6-
export class DockerStatsStream extends ReadableStream<DockerContainerStats> {
7-
constructor(source: ReadableStream<Uint8Array>) {
8-
let reader: ReadableStreamDefaultReader<Uint8Array<ArrayBufferLike>>;
9-
10-
super({
11-
async start(controller) {
12-
try {
13-
let readerDone = false;
14-
reader = source.getReader();
15-
const decoder = new TextDecoder('utf-8');
16-
let curr = '';
17-
while (!readerDone) {
18-
const { done, value } = await reader.read();
19-
readerDone = readerDone || done;
20-
if (!value) continue;
21-
22-
curr += decoder.decode(value, { stream: true });
23-
let idx: number;
24-
while (true) {
25-
idx = curr.indexOf('\n');
26-
if (idx < 0) break;
27-
const line = curr.slice(0, idx);
28-
curr = curr.slice(idx + 1);
29-
const statsJson = JSON.parse(line);
30-
controller.enqueue(decodeDockerContainerStats(statsJson));
31-
}
32-
}
33-
} finally {
34-
controller.close();
35-
}
36-
},
37-
38-
async cancel(reason) {
39-
if (reader) await reader.cancel(reason);
40-
},
41-
});
42-
}
43-
}
44-
456
export type DockerLogStreamType = 'stderr' | 'stdin' | 'stdout';
467

478
export type DockerLogStreamCallback = (type: DockerLogStreamType, data: string) => void;
489

49-
/*
50-
export class DockerLogStream {
51-
private rawStream: NodeJS.ReadableStream;
52-
private closed: boolean = false;
53-
private onClose: () => void;
54-
55-
56-
constructor(contentType: string, rawStream: NodeJS.ReadableStream, onClose: () => void, onReceive: DockerLogStreamCallback){
57-
this.rawStream = rawStream;
58-
this.onClose = onClose;
59-
60-
this.rawStream.on('end', () => {
61-
this.onClose();
62-
});
63-
64-
65-
// any other type of stream
66-
if(contentType !== 'application/vnd.docker.multiplexed-stream'){
67-
this.rawStream.on('data', (chunk: Buffer) => {
68-
onReceive('stdout', chunk.toString('utf8'));
69-
});
70-
return;
71-
}
72-
73-
74-
// Docker Multi-Stream
75-
let currType: DockerLogStreamType | null = null; // if null read header
76-
let currBuf: Buffer = Buffer.alloc(0);
77-
let currOffset = 0;
78-
let currPayloadSize: number = 0;
79-
80-
this.rawStream.on('data', (chunk: Buffer) => {
81-
if(currOffset < currBuf.length){
82-
currBuf = Buffer.concat([currBuf, chunk]);
83-
} else {
84-
currBuf = chunk;
85-
currOffset = 0;
86-
}
87-
88-
while(true){
89-
if(!currType){
90-
// read header
91-
if(currBuf.length - currOffset >= 8){ // header has 8 bytes
92-
switch(currBuf[currOffset]){
93-
case 0: currType = 'stdin'; break;
94-
case 1: currType = 'stdout'; break;
95-
case 2: currType = 'stderr'; break;
96-
default: currType = 'stdout'; break;
97-
}
98-
currPayloadSize = currBuf.readUInt32BE(currOffset + 4); // read length of payload
99-
currOffset += 8; // move past header
100-
} else {
101-
break;
102-
}
103-
}
104-
105-
if(currBuf.length - currOffset >= currPayloadSize){
106-
const end = currOffset + currPayloadSize;
107-
const payload = currBuf.toString('utf8', currOffset, end);
108-
onReceive(currType, payload);
109-
currBuf = (currBuf.length > end) ? currBuf.subarray(end) : Buffer.alloc(0);
110-
currType = null;
111-
currPayloadSize = 0;
112-
currOffset = 0;
113-
} else {
114-
break;
115-
}
116-
}
117-
});
118-
}
119-
120-
public close(): void {
121-
if(this.closed) return;
122-
this.closed = true;
123-
this.rawStream.pause();
124-
this.onClose();
125-
}
126-
127-
public isClosed(): boolean {
128-
return this.closed;
129-
}
130-
}*/
131-
13210
export type DockerLogStreamChunk = {
13311
/** Type of the stream (stdout, stderr, stdin) */
13412
type: DockerLogStreamType;
@@ -229,3 +107,124 @@ export class DockerLogStream extends ReadableStream<DockerLogStreamChunk> {
229107
});
230108
}
231109
}
110+
111+
export class DockerStatsStream extends ReadableStream<DockerContainerStats> {
112+
constructor(source: ReadableStream<Uint8Array>) {
113+
let reader: ReadableStreamDefaultReader<Uint8Array<ArrayBufferLike>>;
114+
115+
super({
116+
async start(controller) {
117+
try {
118+
let readerDone = false;
119+
reader = source.getReader();
120+
const decoder = new TextDecoder('utf-8');
121+
let curr = '';
122+
while (!readerDone) {
123+
const { done, value } = await reader.read();
124+
readerDone = readerDone || done;
125+
if (!value) continue;
126+
127+
curr += decoder.decode(value, { stream: true });
128+
let idx: number;
129+
while (true) {
130+
idx = curr.indexOf('\n');
131+
if (idx < 0) break;
132+
const line = curr.slice(0, idx);
133+
curr = curr.slice(idx + 1);
134+
const statsJson = JSON.parse(line);
135+
controller.enqueue(decodeDockerContainerStats(statsJson));
136+
}
137+
}
138+
} finally {
139+
controller.close();
140+
}
141+
},
142+
143+
async cancel(reason) {
144+
if (reader) await reader.cancel(reason);
145+
},
146+
});
147+
}
148+
}
149+
150+
export class DockerStatsStreamReader {
151+
private reader: ReadableStreamDefaultReader<DockerContainerStats>;
152+
private closed: boolean = false;
153+
private callbacks: ((stats: DockerContainerStats) => void)[] = [];
154+
private cancelAutoReadCallbacks: (() => void)[] = [];
155+
156+
constructor(stream: DockerStatsStream) {
157+
this.reader = stream.getReader();
158+
}
159+
160+
private async autoReadLoop() {
161+
while (!this.closed && this.cancelAutoReadCallbacks.length === 0) {
162+
const { done } = await this.next(); // important to use own method because it invokes the callbacks
163+
if (done) break;
164+
}
165+
this.callbacks = [];
166+
this.cancelAutoReadCallbacks.forEach((callback) => callback());
167+
this.cancelAutoReadCallbacks = [];
168+
}
169+
170+
/**
171+
* Adds a callback to be called with each new stats object
172+
* and starts the auto-read mode if not already running.
173+
*
174+
* @param callback The callback to add.
175+
*/
176+
public addAutoReadCallback(callback: (stats: DockerContainerStats) => void): void {
177+
if (this.closed) throw new Error('Reader is already closed');
178+
this.callbacks.push(callback);
179+
if (this.callbacks.length === 1) this.autoReadLoop();
180+
}
181+
182+
/**
183+
* Cancels the auto-read mode and waits for any ongoing reads to complete.
184+
* Clears all callbacks added through the `addAutoReadCallback` method.
185+
*
186+
* @returns A promise that resolves when the auto-read mode is cancelled and all ongoing reads are complete.
187+
*/
188+
public async cancelAutoRead(): Promise<void> {
189+
if (this.callbacks.length === 0) return;
190+
return new Promise((resolve) => {
191+
this.cancelAutoReadCallbacks.push(resolve);
192+
});
193+
}
194+
195+
/**
196+
* Closes the stream and releases any resources.
197+
*/
198+
public async close(): Promise<void> {
199+
if (this.closed) return;
200+
this.closed = true;
201+
202+
this.callbacks = [];
203+
this.cancelAutoReadCallbacks.forEach((callback) => callback());
204+
this.cancelAutoReadCallbacks = [];
205+
206+
return this.reader.cancel();
207+
}
208+
209+
/**
210+
* Checks if the stream is closed.
211+
*
212+
* @returns True if the stream is closed, false otherwise.
213+
*/
214+
public isClosed(): boolean {
215+
return this.closed;
216+
}
217+
218+
/**
219+
* Reads the next stats object from the stream.
220+
*
221+
* @returns Object containing the done status and the next stats object, if available.
222+
*/
223+
public async next(): Promise<{ done: boolean; stats?: DockerContainerStats }> {
224+
if (this.closed) return { done: true };
225+
const { done, value } = await this.reader.read();
226+
if (done) this.close();
227+
if (value) this.callbacks.forEach((callback) => callback(value));
228+
return { done, stats: value };
229+
}
230+
}

0 commit comments

Comments
 (0)