Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions src/test/jsonl-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import * as assert from "node:assert";

import { JsonlStream } from "../utils/jsonl-stream.js";

const setup = () => {
const jsonlStream = new JsonlStream();

const callbackCalls: unknown[] = [];
jsonlStream.onJson((data: unknown) => {
callbackCalls.push(data);
});

return {
jsonlStream,
callbackCalls,
};
};

suite("JSONL Streams", () => {
test("should parse and emit complete JSONL messages", () => {
const { jsonlStream, callbackCalls } = setup();

// Test with multiple JSON objects in a single write
const testData = Buffer.from('{"key1":"value1"}\n{"key2":"value2"}\n');

jsonlStream.write(testData);

assert.strictEqual(callbackCalls.length, 2);
assert.deepStrictEqual(callbackCalls[0], { key1: "value1" });
assert.deepStrictEqual(callbackCalls[1], { key2: "value2" });
});

test("should handle incomplete JSONL messages across multiple writes", () => {
const { jsonlStream, callbackCalls } = setup();

// First write with partial message
const firstChunk = Buffer.from('{"key":"value');
jsonlStream.write(firstChunk);

// Shouldn't emit anything yet
assert.strictEqual(callbackCalls.length, 0);

// Complete the message in second write
const secondChunk = Buffer.from('1"}\n');
jsonlStream.write(secondChunk);

// Now it should emit the complete message
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { key: "value1" });
});

test("should handle multiple messages in chunks", () => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding this test case, it is exactly the doubt I had when looking at the code!

const { jsonlStream, callbackCalls } = setup();

// Write first message and part of second
const firstChunk = Buffer.from('{"first":1}\n{"second":');
jsonlStream.write(firstChunk);

// First message should be emitted
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { first: 1 });

// Complete second message and add third
const secondChunk = Buffer.from('2}\n{"third":3}\n');
jsonlStream.write(secondChunk);

// Should have all three messages now
assert.strictEqual(callbackCalls.length, 3);
assert.deepStrictEqual(callbackCalls[1], { second: 2 });
assert.deepStrictEqual(callbackCalls[2], { third: 3 });
});

test("should ignore invalid JSON lines", () => {
const { jsonlStream, callbackCalls } = setup();

const testData = Buffer.from('not json\n{"valid":true}\n{invalid}\n');

jsonlStream.write(testData);

// Should only emit the valid JSON object
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { valid: true });
});

test("should handle empty lines", () => {
const { jsonlStream, callbackCalls } = setup();

const testData = Buffer.from('\n\n{"key":"value"}\n\n');

jsonlStream.write(testData);

// Should only emit the valid JSON object
assert.strictEqual(callbackCalls.length, 1);
assert.deepStrictEqual(callbackCalls[0], { key: "value" });
});
});
54 changes: 24 additions & 30 deletions src/utils/container-status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Disposable, LogOutputChannel } from "vscode";
import * as z from "zod/v4-mini";

import { createEmitter } from "./emitter.ts";
import { JsonlStream } from "./jsonl-stream.ts";

export type ContainerStatus = "running" | "stopping" | "stopped";

Expand Down Expand Up @@ -63,14 +64,6 @@ const DockerEventsSchema = z.object({
}),
});

function safeJsonParse(text: string): unknown {
try {
return JSON.parse(text);
} catch {
return undefined;
}
}

function listenToContainerStatus(
containerName: string,
outputChannel: LogOutputChannel,
Expand Down Expand Up @@ -130,32 +123,33 @@ function listenToContainerStatus(
throw new Error("Failed to get stdout from docker events process");
}

dockerEvents.stdout.on("data", (data: Buffer) => {
const lines = data.toString().split("\n").filter(Boolean);
for (const line of lines) {
const json = safeJsonParse(line);
const parsed = DockerEventsSchema.safeParse(json);
if (!parsed.success) {
continue;
}
const jsonlStream = new JsonlStream();
jsonlStream.onJson((json) => {
const parsed = DockerEventsSchema.safeParse(json);
if (!parsed.success) {
return;
}

if (parsed.data.Actor.Attributes.name !== containerName) {
continue;
}
if (parsed.data.Actor.Attributes.name !== containerName) {
return;
}

switch (parsed.data.Action) {
case "start":
onStatusChange("running");
break;
case "kill":
onStatusChange("stopping");
break;
case "die":
onStatusChange("stopped");
break;
}
outputChannel.debug(`[container.status]: ${parsed.data.Action}`);

switch (parsed.data.Action) {
case "start":
onStatusChange("running");
break;
case "kill":
onStatusChange("stopping");
break;
case "die":
onStatusChange("stopped");
break;
}
});

dockerEvents.stdout.pipe(jsonlStream);
} catch (error) {
// If we can't spawn the process, try again after a delay
scheduleRestart();
Expand Down
52 changes: 52 additions & 0 deletions src/utils/jsonl-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Writable } from "node:stream";

/**
* Safely parses a JSON string, returning null if parsing fails.
* @param str - The JSON string to parse.
* @returns The parsed object or null if invalid.
*/
export function safeJsonParse(str: string): unknown {
try {
return JSON.parse(str);
} catch {
return undefined;
}
}

/**
* Writable stream that buffers data until a newline,
* parses each line as JSON, and emits the parsed object.
*/
export class JsonlStream extends Writable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: may be it's just for me, I find it hard to read this class name - lowercase l before uppercase S is a bit lost and indeed may look like a typo. :) Moreover, jsonl is not fully standardized yet (both JSONL and JSON Lines are used). How about JsonLineStream for both class and variable name? Again, had to use singular "Line" to avoid double S in JsonLinesStream, but I'm fine with both. WDYT?

Suggested change
export class JsonlStream extends Writable {
export class JsonLineStream extends Writable {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also torn between JsonLineStream and JsonLinesStream, but I believe the latter is more correct. Asked AI for an opinion :)

JsonLinesStream is the better choice.
 • The format is commonly called “JSON Lines” (see: jsonlines.org ↗), not “JSON Line”.
 • The class processes multiple lines, not just a single line.
 • “JsonLinesStream” clearly communicates that it deals with a stream of JSON lines.

So: JsonLinesStream is correct and idiomatic.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does AI have opinion on readability and accidental ligatures? :D

I do, and:

  • I agree that plural is better semantically 👍
  • sS is more difficult to pronounce but easier to read than lS, lowercase s is not lost.

Thanks for updating this!

Copy link
Copy Markdown
Contributor Author

@skyrpex skyrpex Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that it's awkward having an sS, although when it comes to pronouncing you can just omit it and say Json Line Stream, I won't notice the difference 😆

constructor() {
let buffer = "";
super({
write: (chunk, _encoding, callback) => {
buffer += String(chunk);

let newlineIndex = buffer.indexOf("\n");
while (newlineIndex !== -1) {
const line = buffer.substring(0, newlineIndex).trim();
buffer = buffer.substring(newlineIndex + 1);

const json = safeJsonParse(line);
if (json !== undefined) {
this.emit("json", json);
}

newlineIndex = buffer.indexOf("\n");
}

callback();
},
});
}

/**
* Registers a listener for parsed JSON objects.
* @param listener - Function called with each parsed object.
*/
onJson(callback: (json: unknown) => void) {
this.on("json", callback);
}
}
Loading