Skip to content

Commit 48b3ce5

Browse files
committed
Add reset & chainable repeat ws endpoints
1 parent 83593e9 commit 48b3ce5

5 files changed

Lines changed: 239 additions & 0 deletions

File tree

src/endpoints/ws-index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,5 @@ export * from './ws/echo.js';
1515
export * from './ws/delay.js';
1616
export * from './ws/close.js';
1717
export * from './ws/message.js';
18+
export * from './ws/reset.js';
19+
export * from './ws/repeat.js';

src/endpoints/ws/repeat.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { StatusError } from '@httptoolkit/util';
2+
import { WebSocketEndpoint } from '../ws-index.js';
3+
4+
const REPEAT_PREFIX = '/ws/repeat/';
5+
6+
const getRemainingPath = (path: string): string | undefined => {
7+
// /ws/repeat/$message/$freq/...
8+
const afterPrefix = path.slice(REPEAT_PREFIX.length);
9+
const firstSlash = afterPrefix.indexOf('/');
10+
if (firstSlash === -1) return undefined;
11+
const afterMessage = afterPrefix.slice(firstSlash + 1);
12+
const secondSlash = afterMessage.indexOf('/');
13+
return secondSlash !== -1 ? '/ws' + afterMessage.slice(secondSlash) : undefined;
14+
};
15+
16+
const parseParams = (path: string): { message: string; freqMs: number } => {
17+
const afterPrefix = path.slice(REPEAT_PREFIX.length);
18+
const firstSlash = afterPrefix.indexOf('/');
19+
if (firstSlash === -1) {
20+
return { message: '', freqMs: NaN };
21+
}
22+
const message = decodeURIComponent(afterPrefix.slice(0, firstSlash));
23+
const afterMessage = afterPrefix.slice(firstSlash + 1);
24+
const secondSlash = afterMessage.indexOf('/');
25+
const freqStr = secondSlash !== -1 ? afterMessage.slice(0, secondSlash) : afterMessage;
26+
return { message, freqMs: parseInt(freqStr, 10) };
27+
};
28+
29+
export const wsRepeatEndpoint: WebSocketEndpoint = {
30+
matchPath: (path) => {
31+
if (!path.startsWith(REPEAT_PREFIX)) return false;
32+
const { freqMs } = parseParams(path);
33+
if (isNaN(freqMs) || freqMs <= 0) {
34+
throw new StatusError(400, `Invalid repeat frequency in ${path}`);
35+
}
36+
return true;
37+
},
38+
getRemainingPath,
39+
handle: (ws, req, { path }) => {
40+
const { message, freqMs } = parseParams(path);
41+
42+
const interval = setInterval(() => {
43+
if (ws.readyState === ws.OPEN) {
44+
ws.send(message);
45+
} else {
46+
clearInterval(interval);
47+
}
48+
}, freqMs);
49+
50+
ws.on('close', () => clearInterval(interval));
51+
}
52+
};

src/endpoints/ws/reset.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { WebSocketEndpoint } from '../ws-index.js';
2+
3+
export const wsResetEndpoint: WebSocketEndpoint = {
4+
matchPath: (path) => path === '/ws/error/reset',
5+
handle: (ws) => {
6+
// @ts-ignore - accessing internal socket
7+
const socket = ws._socket;
8+
socket?.destroy();
9+
}
10+
};

test/ws-repeat.spec.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import * as net from 'net';
2+
import { expect } from 'chai';
3+
import { WebSocket } from 'ws';
4+
import { DestroyableServer, makeDestroyable } from 'destroyable-server';
5+
6+
import { createServer } from '../src/server.js';
7+
8+
describe("WebSocket Repeat endpoint", () => {
9+
10+
let server: DestroyableServer;
11+
let serverPort: number;
12+
13+
beforeEach(async () => {
14+
server = makeDestroyable(await createServer({
15+
domain: 'localhost'
16+
}));
17+
await new Promise<void>((resolve) => server.listen(resolve));
18+
serverPort = (server.address() as net.AddressInfo).port;
19+
});
20+
21+
afterEach(async () => {
22+
await server.destroy();
23+
});
24+
25+
it("sends repeated messages at specified interval", async () => {
26+
const ws = new WebSocket(`ws://localhost:${serverPort}/ws/repeat/ping/50`);
27+
28+
const messages: { text: string; time: number }[] = [];
29+
const startTime = Date.now();
30+
31+
ws.on('message', (data) => {
32+
messages.push({ text: data.toString(), time: Date.now() - startTime });
33+
});
34+
35+
await new Promise<void>((resolve) => {
36+
ws.on('open', () => {
37+
setTimeout(() => {
38+
ws.close();
39+
resolve();
40+
}, 180);
41+
});
42+
});
43+
44+
// Should have received ~3-4 messages in 180ms at 50ms intervals
45+
expect(messages.length).to.be.greaterThanOrEqual(3);
46+
expect(messages.length).to.be.lessThanOrEqual(4);
47+
expect(messages.every(m => m.text === 'ping')).to.be.true;
48+
});
49+
50+
it("decodes URL-encoded messages", async () => {
51+
const ws = new WebSocket(`ws://localhost:${serverPort}/ws/repeat/hello%20world/50`);
52+
53+
const message = await new Promise<string>((resolve) => {
54+
ws.on('message', (data) => {
55+
resolve(data.toString());
56+
ws.close();
57+
});
58+
});
59+
60+
expect(message).to.equal('hello world');
61+
});
62+
63+
it("rejects invalid frequency", async () => {
64+
const ws = new WebSocket(`ws://localhost:${serverPort}/ws/repeat/msg/notanumber`);
65+
66+
const result = await new Promise<{ statusCode: number }>((resolve, reject) => {
67+
ws.on('open', () => {
68+
reject(new Error('Should not have connected'));
69+
});
70+
ws.on('unexpected-response', (req, res) => {
71+
resolve({ statusCode: res.statusCode! });
72+
});
73+
ws.on('error', reject);
74+
});
75+
76+
expect(result.statusCode).to.equal(400);
77+
});
78+
79+
it("rejects zero frequency", async () => {
80+
const ws = new WebSocket(`ws://localhost:${serverPort}/ws/repeat/msg/0`);
81+
82+
const result = await new Promise<{ statusCode: number }>((resolve, reject) => {
83+
ws.on('open', () => {
84+
reject(new Error('Should not have connected'));
85+
});
86+
ws.on('unexpected-response', (req, res) => {
87+
resolve({ statusCode: res.statusCode! });
88+
});
89+
ws.on('error', reject);
90+
});
91+
92+
expect(result.statusCode).to.equal(400);
93+
});
94+
95+
it("chains with close", async () => {
96+
const ws = new WebSocket(`ws://localhost:${serverPort}/ws/repeat/tick/30/delay/0.1/close/1000`);
97+
98+
const messages: string[] = [];
99+
ws.on('message', (data) => {
100+
messages.push(data.toString());
101+
});
102+
103+
const result = await new Promise<{ code: number }>((resolve) => {
104+
ws.on('close', (code) => {
105+
resolve({ code });
106+
});
107+
});
108+
109+
// Should have received messages during the 100ms delay
110+
expect(messages.length).to.be.greaterThanOrEqual(2);
111+
expect(messages.every(m => m === 'tick')).to.be.true;
112+
expect(result.code).to.equal(1000);
113+
});
114+
115+
});

test/ws-reset.spec.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import * as net from 'net';
2+
import { expect } from 'chai';
3+
import { WebSocket } from 'ws';
4+
import { DestroyableServer, makeDestroyable } from 'destroyable-server';
5+
6+
import { createServer } from '../src/server.js';
7+
8+
describe("WebSocket Reset endpoint", () => {
9+
10+
let server: DestroyableServer;
11+
let serverPort: number;
12+
13+
beforeEach(async () => {
14+
server = makeDestroyable(await createServer({
15+
domain: 'localhost'
16+
}));
17+
await new Promise<void>((resolve) => server.listen(resolve));
18+
serverPort = (server.address() as net.AddressInfo).port;
19+
});
20+
21+
afterEach(async () => {
22+
await server.destroy();
23+
});
24+
25+
it("resets the connection abruptly", async () => {
26+
const ws = new WebSocket(`ws://localhost:${serverPort}/ws/error/reset`);
27+
28+
const result = await new Promise<{ event: string; code?: number }>((resolve) => {
29+
ws.on('open', () => {
30+
// Connection opened, reset should happen immediately
31+
});
32+
ws.on('close', (code) => {
33+
resolve({ event: 'close', code });
34+
});
35+
ws.on('error', () => {
36+
resolve({ event: 'error' });
37+
});
38+
});
39+
40+
// Connection should be terminated abnormally
41+
expect(result.event).to.equal('close');
42+
expect(result.code).to.equal(1006); // Abnormal closure
43+
});
44+
45+
it("works in a chain after delay", async () => {
46+
const startTime = Date.now();
47+
const ws = new WebSocket(`ws://localhost:${serverPort}/ws/delay/0.1/error/reset`);
48+
49+
const result = await new Promise<{ code: number }>((resolve) => {
50+
ws.on('close', (code) => {
51+
resolve({ code });
52+
});
53+
});
54+
55+
const elapsed = Date.now() - startTime;
56+
expect(elapsed).to.be.greaterThan(90);
57+
expect(result.code).to.equal(1006);
58+
});
59+
60+
});

0 commit comments

Comments
 (0)