-
Notifications
You must be signed in to change notification settings - Fork 3
fix: partial line handling in docker events listener #8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
068e717
a57a5c5
06168b2
221cc6c
fa01831
5ca7181
5a1266d
56179e4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,96 @@ | ||
| import * as assert from "node:assert"; | ||
|
|
||
| import { JsonlStream } from "../utils/jsonl-stream.js"; | ||
|
|
||
| const setup = () => { | ||
| const jsonlStream = new JsonlStream(); | ||
|
|
||
| const callbackCalls: unknown[] = []; | ||
| jsonlStream.onJson((data: unknown) => { | ||
| callbackCalls.push(data); | ||
| }); | ||
|
|
||
| return { | ||
| jsonlStream, | ||
| callbackCalls, | ||
| }; | ||
| }; | ||
|
|
||
| suite("JSONL Streams", () => { | ||
| test("should parse and emit complete JSONL messages", () => { | ||
| const { jsonlStream, callbackCalls } = setup(); | ||
|
|
||
| // Test with multiple JSON objects in a single write | ||
| const testData = Buffer.from('{"key1":"value1"}\n{"key2":"value2"}\n'); | ||
|
|
||
| jsonlStream.write(testData); | ||
|
|
||
| assert.strictEqual(callbackCalls.length, 2); | ||
| assert.deepStrictEqual(callbackCalls[0], { key1: "value1" }); | ||
| assert.deepStrictEqual(callbackCalls[1], { key2: "value2" }); | ||
| }); | ||
|
|
||
| test("should handle incomplete JSONL messages across multiple writes", () => { | ||
| const { jsonlStream, callbackCalls } = setup(); | ||
|
|
||
| // First write with partial message | ||
| const firstChunk = Buffer.from('{"key":"value'); | ||
| jsonlStream.write(firstChunk); | ||
|
|
||
| // Shouldn't emit anything yet | ||
| assert.strictEqual(callbackCalls.length, 0); | ||
|
|
||
| // Complete the message in second write | ||
| const secondChunk = Buffer.from('1"}\n'); | ||
| jsonlStream.write(secondChunk); | ||
|
|
||
| // Now it should emit the complete message | ||
| assert.strictEqual(callbackCalls.length, 1); | ||
| assert.deepStrictEqual(callbackCalls[0], { key: "value1" }); | ||
| }); | ||
|
|
||
| test("should handle multiple messages in chunks", () => { | ||
| const { jsonlStream, callbackCalls } = setup(); | ||
|
|
||
| // Write first message and part of second | ||
| const firstChunk = Buffer.from('{"first":1}\n{"second":'); | ||
| jsonlStream.write(firstChunk); | ||
|
|
||
| // First message should be emitted | ||
| assert.strictEqual(callbackCalls.length, 1); | ||
| assert.deepStrictEqual(callbackCalls[0], { first: 1 }); | ||
|
|
||
| // Complete second message and add third | ||
| const secondChunk = Buffer.from('2}\n{"third":3}\n'); | ||
| jsonlStream.write(secondChunk); | ||
|
|
||
| // Should have all three messages now | ||
| assert.strictEqual(callbackCalls.length, 3); | ||
| assert.deepStrictEqual(callbackCalls[1], { second: 2 }); | ||
| assert.deepStrictEqual(callbackCalls[2], { third: 3 }); | ||
| }); | ||
|
|
||
| test("should ignore invalid JSON lines", () => { | ||
| const { jsonlStream, callbackCalls } = setup(); | ||
|
|
||
| const testData = Buffer.from('not json\n{"valid":true}\n{invalid}\n'); | ||
|
|
||
| jsonlStream.write(testData); | ||
|
|
||
| // Should only emit the valid JSON object | ||
| assert.strictEqual(callbackCalls.length, 1); | ||
| assert.deepStrictEqual(callbackCalls[0], { valid: true }); | ||
| }); | ||
|
|
||
| test("should handle empty lines", () => { | ||
| const { jsonlStream, callbackCalls } = setup(); | ||
|
|
||
| const testData = Buffer.from('\n\n{"key":"value"}\n\n'); | ||
|
|
||
| jsonlStream.write(testData); | ||
|
|
||
| // Should only emit the valid JSON object | ||
| assert.strictEqual(callbackCalls.length, 1); | ||
| assert.deepStrictEqual(callbackCalls[0], { key: "value" }); | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,52 @@ | ||||||
| import { Writable } from "node:stream"; | ||||||
|
|
||||||
| /** | ||||||
| * Safely parses a JSON string, returning null if parsing fails. | ||||||
| * @param str - The JSON string to parse. | ||||||
| * @returns The parsed object or null if invalid. | ||||||
| */ | ||||||
| export function safeJsonParse(str: string): unknown { | ||||||
| try { | ||||||
| return JSON.parse(str); | ||||||
| } catch { | ||||||
| return undefined; | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Writable stream that buffers data until a newline, | ||||||
| * parses each line as JSON, and emits the parsed object. | ||||||
| */ | ||||||
| export class JsonlStream extends Writable { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: may be it's just for me, I find it hard to read this class name - lowercase
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm also torn between JsonLineStream and JsonLinesStream, but I believe the latter is more correct. Asked AI for an opinion :)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does AI have opinion on readability and accidental ligatures? :D I do, and:
Thanks for updating this!
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed that it's awkward having an |
||||||
| constructor() { | ||||||
| let buffer = ""; | ||||||
| super({ | ||||||
| write: (chunk, _encoding, callback) => { | ||||||
| buffer += String(chunk); | ||||||
|
|
||||||
| let newlineIndex = buffer.indexOf("\n"); | ||||||
| while (newlineIndex !== -1) { | ||||||
| const line = buffer.substring(0, newlineIndex).trim(); | ||||||
| buffer = buffer.substring(newlineIndex + 1); | ||||||
|
|
||||||
| const json = safeJsonParse(line); | ||||||
| if (json !== undefined) { | ||||||
| this.emit("json", json); | ||||||
| } | ||||||
|
|
||||||
| newlineIndex = buffer.indexOf("\n"); | ||||||
| } | ||||||
|
|
||||||
| callback(); | ||||||
| }, | ||||||
| }); | ||||||
| } | ||||||
|
|
||||||
| /** | ||||||
| * Registers a listener for parsed JSON objects. | ||||||
| * @param listener - Function called with each parsed object. | ||||||
| */ | ||||||
| onJson(callback: (json: unknown) => void) { | ||||||
| this.on("json", callback); | ||||||
| } | ||||||
| } | ||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding this test case, it is exactly the doubt I had when looking at the code!