Skip to content

Commit 3fb6943

Browse files
update api spec
1 parent ea39020 commit 3fb6943

7 files changed

Lines changed: 114 additions & 39 deletions

File tree

client/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,20 @@ const client = new Client({
3333

3434
client.connect();
3535

36-
client.on('open', () => {
36+
client.onOpen(() => {
3737
console.log('Connected to server');
3838

3939
// Create a new stream
4040
const stream = client.createStream('my-stream-1');
4141

42-
stream.on('data', (data) => {
42+
stream.onData((data) => {
4343
console.log('Received data:', data);
4444
});
4545

4646
stream.send({ message: 'Hello World' });
4747
});
4848

49-
client.on('close', () => {
49+
client.onClose(() => {
5050
console.log('Disconnected from server');
5151
});
5252
```

client/src/client.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { EventEmitter } from 'events';
21
import WebSocket from 'ws';
32
import { Message, MessageType } from './protocol';
43
import { Stream } from './stream';
@@ -10,7 +9,7 @@ export interface ClientOptions {
109
maxRetries?: number;
1110
}
1211

13-
export class Client extends EventEmitter {
12+
export class Client {
1413
private ws: WebSocket | null = null;
1514
private options: ClientOptions;
1615
public streams: Map<string, Stream> = new Map();
@@ -19,15 +18,30 @@ export class Client extends EventEmitter {
1918
private shouldReconnect: boolean = true;
2019
private retryTimer: NodeJS.Timeout | null = null;
2120

21+
private openCallbacks: (() => void)[] = [];
22+
private closeCallbacks: (() => void)[] = [];
23+
private errorCallbacks: ((error: Error) => void)[] = [];
24+
2225
constructor(options: ClientOptions) {
23-
super();
2426
this.options = {
2527
retryInterval: 1000,
2628
maxRetries: Infinity,
2729
...options
2830
};
2931
}
3032

33+
public onOpen(cb: () => void) {
34+
this.openCallbacks.push(cb);
35+
}
36+
37+
public onClose(cb: () => void) {
38+
this.closeCallbacks.push(cb);
39+
}
40+
41+
public onError(cb: (error: Error) => void) {
42+
this.errorCallbacks.push(cb);
43+
}
44+
3145
public connect(): void {
3246
this.shouldReconnect = true;
3347
this.initiateConnection();
@@ -44,7 +58,7 @@ export class Client extends EventEmitter {
4458
this.ws.onopen = () => {
4559
console.log('Connected');
4660
this.isConnected = true;
47-
this.emit('open');
61+
this.openCallbacks.forEach(cb => cb());
4862
this.flushQueue();
4963

5064
// Clear any existing Retry timer
@@ -69,6 +83,7 @@ export class Client extends EventEmitter {
6983
this.ws.onerror = (err) => {
7084
console.error('WebSocket Error:', err);
7185
// creating error usually triggers close, so cleanup logic runs there.
86+
this.errorCallbacks.forEach(cb => cb(new Error(err.message)));
7287
};
7388
}
7489

@@ -129,7 +144,7 @@ export class Client extends EventEmitter {
129144
const stream = this.streams.get(header.streamId);
130145
if (stream) {
131146
if (header.type === MessageType.CLOSE_STREAM) {
132-
stream.emit('close');
147+
stream.dispatchClose();
133148
this.streams.delete(header.streamId);
134149
} else if (header.type === MessageType.DATA) {
135150
stream.handleMessage(payload);
@@ -156,9 +171,9 @@ export class Client extends EventEmitter {
156171

157172
private cleanup(): void {
158173
this.isConnected = false;
159-
this.emit('close');
174+
this.closeCallbacks.forEach(cb => cb());
160175
// Close all streams
161-
this.streams.forEach(stream => stream.emit('close'));
176+
this.streams.forEach(stream => stream.dispatchClose());
162177
this.streams.clear();
163178
}
164179

client/src/stream.ts

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
1-
import { EventEmitter } from 'events';
21
import { MessageType } from './protocol';
32
import { Client } from './client';
43

5-
export class Stream extends EventEmitter {
4+
export class Stream {
65
public id: string;
76
private client: Client;
87
public isOpen: boolean;
98

9+
private dataCallbacks: ((data: any) => void)[] = [];
10+
private closeCallbacks: (() => void)[] = [];
11+
1012
constructor(id: string, client: Client) {
11-
super();
1213
this.id = id;
1314
this.client = client;
1415
this.isOpen = true;
1516
}
1617

18+
public onData(cb: (data: any) => void) {
19+
this.dataCallbacks.push(cb);
20+
}
21+
22+
public onClose(cb: () => void) {
23+
this.closeCallbacks.push(cb);
24+
}
25+
1726
public send(data: any): void {
1827
if (!this.isOpen) {
1928
throw new Error('Stream is closed');
@@ -25,12 +34,19 @@ export class Stream extends EventEmitter {
2534
if (this.isOpen) {
2635
this.isOpen = false;
2736
this.client.send(this.id, MessageType.CLOSE_STREAM, null);
28-
this.emit('close');
37+
this.dispatchClose();
2938
}
3039
}
3140

3241
// Called internally by Client
3342
public handleMessage(payload: any): void {
34-
this.emit('data', payload);
43+
this.dataCallbacks.forEach(cb => cb(payload));
44+
}
45+
46+
public dispatchClose(): void {
47+
if (this.isOpen) {
48+
this.isOpen = false;
49+
}
50+
this.closeCallbacks.forEach(cb => cb());
3551
}
3652
}

server/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,30 @@ import { LinkServer } from '@marketrix-ai/link-server';
3131

3232
const server = new LinkServer({ port: 8080 });
3333

34-
server.on('listening', () => {
34+
server.onListening(() => {
3535
console.log('Server is listening on port 8080');
3636
});
3737

38-
server.on('connection', (connection) => {
38+
server.onConnection((connection) => {
3939
console.log('Client connected:', connection.id);
4040

4141
// Handle new streams created by the client
42-
connection.on('stream', (stream) => {
42+
connection.onStream((stream) => {
4343
console.log(`Stream ${stream.id} created`);
4444

45-
stream.on('data', (data) => {
45+
stream.onData((data) => {
4646
console.log(`Received data on stream ${stream.id}:`, data);
4747

4848
// Example: sending data back
4949
// stream.send({ message: 'Hello from server' });
5050
});
5151

52-
stream.on('close', () => {
52+
stream.onClose(() => {
5353
console.log(`Stream ${stream.id} closed`);
5454
});
5555
});
5656

57-
connection.on('close', () => {
57+
connection.onClose(() => {
5858
console.log('Client disconnected');
5959
});
6060
});

server/src/connection.ts

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
1-
import { EventEmitter } from 'events';
21
import WebSocket from 'ws';
32
import { MessageType, Message } from './protocol';
43
import { Stream } from './stream';
54

6-
export class Connection extends EventEmitter {
5+
export class Connection {
76
private socket: WebSocket;
87
public id: string | null;
98
public streams: Map<string, Stream>;
109

10+
private streamCallbacks: ((stream: Stream) => void)[] = [];
11+
private closeCallbacks: (() => void)[] = [];
12+
private errorCallbacks: ((error: Error) => void)[] = [];
13+
1114
private pingTimer: NodeJS.Timeout | null = null;
1215
private pongTimeoutTimer: NodeJS.Timeout | null = null;
1316
private readonly PING_INTERVAL = 2000; // 2 seconds (TEST)
1417
private readonly PONG_TIMEOUT = 2000; // 2 seconds (TEST)
1518

1619
constructor(socket: WebSocket) {
17-
super();
1820
this.socket = socket;
1921
this.id = null; // Will be set upon first message
2022
this.streams = new Map();
@@ -26,6 +28,18 @@ export class Connection extends EventEmitter {
2628
this.startPingLoop();
2729
}
2830

31+
public onStream(cb: (stream: Stream) => void) {
32+
this.streamCallbacks.push(cb);
33+
}
34+
35+
public onClose(cb: () => void) {
36+
this.closeCallbacks.push(cb);
37+
}
38+
39+
public onError(cb: (error: Error) => void) {
40+
this.errorCallbacks.push(cb);
41+
}
42+
2943
private startPingLoop(): void {
3044
this.pingTimer = setInterval(() => {
3145
if (this.socket.readyState === WebSocket.OPEN) {
@@ -84,14 +98,14 @@ export class Connection extends EventEmitter {
8498
if (!this.streams.has(streamId)) {
8599
const stream = new Stream(streamId, this);
86100
this.streams.set(streamId, stream);
87-
this.emit('stream', stream);
101+
this.streamCallbacks.forEach(cb => cb(stream));
88102
}
89103
}
90104

91105
const stream = this.streams.get(streamId);
92106
if (stream) {
93107
if (type === MessageType.CLOSE_STREAM) {
94-
stream.emit('close');
108+
stream.dispatchClose();
95109
this.streams.delete(streamId);
96110
} else if (type === MessageType.DATA) {
97111
stream.handleMessage(payload);
@@ -133,8 +147,8 @@ export class Connection extends EventEmitter {
133147
if (this.pingTimer) clearInterval(this.pingTimer);
134148
if (this.pongTimeoutTimer) clearTimeout(this.pongTimeoutTimer);
135149

136-
this.streams.forEach(stream => stream.emit('close'));
150+
this.streams.forEach(stream => stream.dispatchClose());
137151
this.streams.clear();
138-
this.emit('close');
152+
this.closeCallbacks.forEach(cb => cb());
139153
}
140154
}

server/src/server.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { EventEmitter } from 'events';
21
import WebSocket from 'ws';
32
import { Connection } from './connection';
43

@@ -7,30 +6,44 @@ export interface LinkServerOptions extends WebSocket.ServerOptions {
76
// for now we just extend standard WS server options
87
}
98

10-
export class LinkServer extends EventEmitter {
9+
export class LinkServer {
1110
private wss: WebSocket.Server;
11+
private connectionCallbacks: ((connection: Connection) => void)[] = [];
12+
private errorCallbacks: ((error: Error) => void)[] = [];
13+
private listeningCallbacks: (() => void)[] = [];
1214

1315
constructor(options: LinkServerOptions) {
14-
super();
1516
this.wss = new WebSocket.Server(options);
1617
this.init();
1718
}
1819

1920
private init() {
2021
this.wss.on('connection', (ws: WebSocket) => {
2122
const connection = new Connection(ws);
22-
this.emit('connection', connection);
23+
this.connectionCallbacks.forEach(cb => cb(connection));
2324
});
2425

2526
this.wss.on('error', (error) => {
26-
this.emit('error', error);
27+
this.errorCallbacks.forEach(cb => cb(error));
2728
});
2829

2930
this.wss.on('listening', () => {
30-
this.emit('listening');
31+
this.listeningCallbacks.forEach(cb => cb());
3132
});
3233
}
3334

35+
public onConnection(cb: (connection: Connection) => void) {
36+
this.connectionCallbacks.push(cb);
37+
}
38+
39+
public onError(cb: (error: Error) => void) {
40+
this.errorCallbacks.push(cb);
41+
}
42+
43+
public onListening(cb: () => void) {
44+
this.listeningCallbacks.push(cb);
45+
}
46+
3447
public close(cb?: (err?: Error) => void) {
3548
this.wss.close(cb);
3649
}

server/src/stream.ts

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,28 @@
1-
import { EventEmitter } from 'events';
21
import { MessageType } from './protocol';
32
import { Connection } from './connection';
43

5-
export class Stream extends EventEmitter {
4+
export class Stream {
65
public id: string;
76
private connection: Connection;
87
public isOpen: boolean;
98

9+
private dataCallbacks: ((data: any) => void)[] = [];
10+
private closeCallbacks: (() => void)[] = [];
11+
1012
constructor(id: string, connection: Connection) {
11-
super();
1213
this.id = id;
1314
this.connection = connection;
1415
this.isOpen = true;
1516
}
1617

18+
public onData(cb: (data: any) => void) {
19+
this.dataCallbacks.push(cb);
20+
}
21+
22+
public onClose(cb: () => void) {
23+
this.closeCallbacks.push(cb);
24+
}
25+
1726
public send(data: any): void {
1827
if (!this.isOpen) {
1928
throw new Error('Stream is closed');
@@ -25,12 +34,20 @@ export class Stream extends EventEmitter {
2534
if (this.isOpen) {
2635
this.isOpen = false;
2736
this.connection.send(this.id, MessageType.CLOSE_STREAM, null);
28-
this.emit('close');
37+
this.dispatchClose();
2938
}
3039
}
3140

3241
// Called when data is received from the connection for this stream
3342
public handleMessage(payload: any): void {
34-
this.emit('data', payload);
43+
this.dataCallbacks.forEach(cb => cb(payload));
44+
}
45+
46+
// Called internally or by Connection when stream is closed remotely
47+
public dispatchClose(): void {
48+
if (this.isOpen) {
49+
this.isOpen = false;
50+
}
51+
this.closeCallbacks.forEach(cb => cb());
3552
}
3653
}

0 commit comments

Comments
 (0)