Skip to content

Commit d712c85

Browse files
committed
Add tests for readEvents.
1 parent 6b3e3c1 commit d712c85

17 files changed

Lines changed: 374 additions & 108 deletions

package-lock.json

Lines changed: 1 addition & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
},
1515
"types": "./dist/index.d.ts",
1616
"dependencies": {
17-
"axios": "1.8.4",
18-
"stream-to-async-iterator": "1.0.0"
17+
"axios": "1.8.4"
1918
},
2019
"devDependencies": {
2120
"@biomejs/biome": "1.9.4",

src/Client.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,22 +125,29 @@ class Client {
125125
const apiToken = this.#apiToken;
126126

127127
return (async function* () {
128-
const response = await axios({
129-
url,
128+
const response = await fetch(url, {
130129
method: 'post',
131130
headers: {
132131
authorization: `Bearer ${apiToken}`,
133132
'content-type': 'application/json',
134133
},
135-
data: {
134+
body: JSON.stringify({
136135
subject,
137136
options,
138-
},
139-
responseType: 'stream',
137+
}),
140138
signal: abortController.signal,
141139
});
142140

143-
for await (const line of readNdJsonStream(response.data)) {
141+
if (response.status !== 200) {
142+
throw new Error(
143+
`Failed to read events, got HTTP status code '${response.status}', expected '200'.`,
144+
);
145+
}
146+
if (!response.body) {
147+
throw new Error('Failed to read events.');
148+
}
149+
150+
for await (const line of readNdJsonStream(response.body)) {
144151
if (isStreamHeartbeat(line)) {
145152
continue;
146153
}

src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { Client } from './Client.js';
2+
import type { Event } from './Event.js';
23
import type { EventCandidate } from './EventCandidate.js';
34
import type { ObserveEventsOptions } from './ObserveEventsOptions.js';
45
import type { ReadEventsOptions } from './ReadEventsOptions.js';
56
import { isSubjectOnEventId } from './isSubjectOnEventId.js';
67
import { isSubjectPristine } from './isSubjectPristine.js';
78

89
export { Client, isSubjectPristine, isSubjectOnEventId };
9-
export type { EventCandidate, ReadEventsOptions, ObserveEventsOptions };
10+
export type { Event, EventCandidate, ReadEventsOptions, ObserveEventsOptions };

src/ndjson/LinesDecoder.test.ts

Lines changed: 0 additions & 22 deletions
This file was deleted.

src/ndjson/LinesDecoder.ts

Lines changed: 0 additions & 37 deletions
This file was deleted.

src/ndjson/readNdJsonStream.test.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,26 @@ import { Readable } from 'node:stream';
33
import { suite, test } from 'node:test';
44
import { readNdJsonStream } from './readNdJsonStream.js';
55

6+
function convertStreamToWebStream(stream: Readable): ReadableStream<Uint8Array> {
7+
return new ReadableStream({
8+
start(controller) {
9+
stream.on('data', chunk => {
10+
controller.enqueue(new Uint8Array(chunk));
11+
});
12+
stream.on('end', () => {
13+
controller.close();
14+
});
15+
stream.on('error', err => {
16+
controller.error(err);
17+
});
18+
},
19+
});
20+
}
21+
622
suite('readNdJsonStream', (): void => {
723
test('returns an async generator that yields parsed json objects.', async (): Promise<void> => {
8-
const stream = Readable.from(
9-
Buffer.from('{"foo":"bar"}\n{"bar":"baz"}\n{"incomplete', 'utf-8'),
24+
const stream = convertStreamToWebStream(
25+
Readable.from(Buffer.from('{"foo":"bar"}\n{"bar":"baz"}\n{"incomplete', 'utf-8')),
1026
);
1127

1228
const values: Record<string, unknown>[] = [];

src/ndjson/readNdJsonStream.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,31 @@
1-
import type { Readable } from 'node:stream';
2-
import StreamToAsyncIterator from 'stream-to-async-iterator';
3-
import { LinesDecoder } from './LinesDecoder.js';
4-
51
const readNdJsonStream = async function* (
6-
stream: Readable,
2+
stream: ReadableStream<Uint8Array>,
73
): AsyncGenerator<Record<string, unknown>, void, void> {
8-
const decoder = new LinesDecoder('utf-8');
4+
const reader = stream.getReader();
5+
const decoder = new TextDecoder('utf-8');
6+
let buffer = '';
7+
8+
while (true) {
9+
const { done, value } = await reader.read();
10+
if (done) {
11+
break;
12+
}
13+
14+
buffer += decoder.decode(value, { stream: true });
915

10-
for await (const chunk of new StreamToAsyncIterator<Buffer>(stream)) {
11-
const lines = decoder.write(chunk);
16+
let index: number;
17+
while (true) {
18+
index = buffer.indexOf('\n');
19+
if (index < 0) {
20+
break;
21+
}
1222

13-
for (const line of lines) {
14-
const parsedLine = JSON.parse(line);
23+
const line = buffer.slice(0, index).trim();
24+
buffer = buffer.slice(index + 1);
1525

16-
yield parsedLine;
26+
if (line) {
27+
yield JSON.parse(line);
28+
}
1729
}
1830
}
1931
};

src/stream/isStreamCloudEvent.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,14 @@ const blueprint: StreamCloudEvent = {
1818
};
1919

2020
const isStreamCloudEvent = (line: unknown): line is StreamCloudEvent => {
21-
return hasShapeOf(line, blueprint);
21+
if (!hasShapeOf(line, blueprint)) {
22+
return false;
23+
}
24+
if (line.type !== 'event') {
25+
return false;
26+
}
27+
28+
return true;
2229
};
2330

2431
export { isStreamCloudEvent };

src/stream/isStreamError.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,14 @@ const blueprint: StreamError = {
99
};
1010

1111
const isStreamError = (line: unknown): line is StreamError => {
12-
return hasShapeOf(line, blueprint);
12+
if (!hasShapeOf(line, blueprint)) {
13+
return false;
14+
}
15+
if (line.type !== 'error') {
16+
return false;
17+
}
18+
19+
return true;
1320
};
1421

1522
export { isStreamError };

0 commit comments

Comments
 (0)