Skip to content

Commit 9147560

Browse files
ApoorvApoorv
authored andcommitted
fix: resolve allowHalfOpen behavior and refactor NativeModules usage
1 parent bf026d3 commit 9147560

10 files changed

Lines changed: 211 additions & 17 deletions

File tree

__tests__/allowHalfOpen.test.js

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
2+
import { expect, test, jest } from '@jest/globals';
3+
import net from '../src/index';
4+
import { nativeEventEmitter } from '../src/Globals';
5+
import { NativeModules } from 'react-native';
6+
7+
const Sockets = NativeModules.TcpSockets;
8+
9+
jest.mock('../src/Globals', () => {
10+
const { EventEmitter } = require('events');
11+
const emitter = new EventEmitter();
12+
const originalAddListener = emitter.addListener.bind(emitter);
13+
emitter.addListener = (event, listener) => {
14+
originalAddListener(event, listener);
15+
return { remove: () => emitter.removeListener(event, listener) };
16+
};
17+
18+
let idCounter = 1000;
19+
return {
20+
__esModule: true,
21+
nativeEventEmitter: emitter,
22+
getNextId: () => idCounter++,
23+
};
24+
});
25+
26+
test('allowHalfOpen: false (default) should call Sockets.end() on end event', (done) => {
27+
// Reset mocks
28+
Sockets.end.mockClear();
29+
30+
const server = net.createServer(); // allowHalfOpen default false
31+
const serverId = server._id;
32+
server.listen(12345);
33+
34+
server.on('connection', (socket) => {
35+
socket.on('end', () => {
36+
try {
37+
// When we receive 'end', if allowHalfOpen is false, socket.end() should be called
38+
// which calls Sockets.end(id)
39+
expect(Sockets.end).toHaveBeenCalled();
40+
done();
41+
} catch (e) {
42+
done(e);
43+
}
44+
});
45+
});
46+
47+
// Simulate connection
48+
nativeEventEmitter.emit('connection', {
49+
id: serverId,
50+
info: {
51+
id: 456,
52+
connection: {
53+
localAddress: '127.0.0.1',
54+
localPort: 12345,
55+
remoteAddress: '127.0.0.1',
56+
remotePort: 54321,
57+
remoteFamily: 'IPv4'
58+
}
59+
}
60+
});
61+
62+
// Simulate 'end' event from native for socket 456
63+
nativeEventEmitter.emit('end', { id: 456 });
64+
});
65+
66+
test('allowHalfOpen: true should NOT call Sockets.end() on end event', (done) => {
67+
// Reset mocks
68+
Sockets.end.mockClear();
69+
70+
const server = net.createServer({ allowHalfOpen: true });
71+
const serverId = server._id;
72+
server.listen(12346);
73+
74+
server.on('connection', (socket) => {
75+
socket.on('end', () => {
76+
try {
77+
// When we receive 'end', if allowHalfOpen is true, socket.end() should NOT be called
78+
expect(Sockets.end).not.toHaveBeenCalled();
79+
done();
80+
} catch (e) {
81+
done(e);
82+
}
83+
});
84+
});
85+
86+
// Simulate connection
87+
nativeEventEmitter.emit('connection', {
88+
id: serverId,
89+
info: {
90+
id: 457,
91+
connection: {
92+
localAddress: '127.0.0.1',
93+
localPort: 12346,
94+
remoteAddress: '127.0.0.1',
95+
remotePort: 54321,
96+
remoteFamily: 'IPv4'
97+
}
98+
}
99+
});
100+
101+
// Simulate 'end' event from native for socket 457
102+
nativeEventEmitter.emit('end', { id: 457 });
103+
});

__tests__/server_options.test.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
2+
import { expect, test, jest } from '@jest/globals';
3+
4+
jest.mock('../src/Globals', () => {
5+
const { EventEmitter } = require('events');
6+
const emitter = new EventEmitter();
7+
const originalAddListener = emitter.addListener.bind(emitter);
8+
emitter.addListener = (event, listener) => {
9+
originalAddListener(event, listener);
10+
return { remove: () => emitter.removeListener(event, listener) };
11+
};
12+
return {
13+
__esModule: true,
14+
nativeEventEmitter: emitter,
15+
getNextId: () => 123,
16+
};
17+
});
18+
19+
import net from '../src/index';
20+
import { nativeEventEmitter } from '../src/Globals';
21+
22+
test('server option pauseOnConnect should pause the socket', (done) => {
23+
const server = net.createServer({ pauseOnConnect: true });
24+
25+
server.listen(12345);
26+
27+
server.on('connection', (socket) => {
28+
try {
29+
// Check if socket is paused
30+
expect(socket._paused).toBe(true);
31+
done();
32+
} catch (error) {
33+
done(error);
34+
}
35+
});
36+
37+
nativeEventEmitter.emit('connection', {
38+
id: 123,
39+
info: {
40+
id: 456,
41+
connection: {
42+
localAddress: '127.0.0.1',
43+
localPort: 12345,
44+
remoteAddress: '127.0.0.1',
45+
remotePort: 54321,
46+
remoteFamily: 'IPv4'
47+
}
48+
}
49+
});
50+
});

android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@ public void onData(int id, byte[] data) {
9292
sendEvent("data", eventParams);
9393
}
9494

95+
public void onEnd(int id) {
96+
WritableMap eventParams = Arguments.createMap();
97+
eventParams.putInt("id", id);
98+
sendEvent("end", eventParams);
99+
}
100+
95101
public void onWritten(int id, int msgId, @Nullable Exception e) {
96102
String error = null;
97103
if (e != null) {

android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ public void run() {
266266
if (bufferCount > 0) {
267267
receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount));
268268
} else if (bufferCount == -1) {
269-
clientSocket.destroy();
269+
receiverListener.onEnd(socketId);
270+
break;
270271
}
271272
}
272273
} catch (IOException | InterruptedException ioe) {

ios/TcpSocketClient.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ typedef enum RCTTCPError RCTTCPError;
4040
- (void)onSecureConnection:(TcpSocketClient *)client
4141
toClient:(NSNumber *)clientID;
4242
- (void)onData:(NSNumber *)clientID data:(NSData *)data;
43+
- (void)onEnd:(NSNumber *)clientID;
4344
- (void)onClose:(TcpSocketClient *)client withError:(NSError *)err;
4445
- (void)onError:(TcpSocketClient *)client withError:(NSError *)err;
4546
- (void)onWrittenData:(TcpSocketClient *)client msgId:(NSNumber *)msgId;

ios/TcpSocketClient.m

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -579,9 +579,9 @@ - (void)socket:(GCDAsyncSocket *)sock
579579
}
580580

581581
- (void)socketDidCloseReadStream:(GCDAsyncSocket *)sock {
582-
// TODO : investigate for half-closed sockets
583-
// for now close the stream completely
584-
[sock disconnect];
582+
if (_clientDelegate) {
583+
[_clientDelegate onEnd:[sock userData]];
584+
}
585585
}
586586

587587
- (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err {

ios/TcpSockets.m

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ @implementation TcpSockets {
2222
- (NSArray<NSString *> *)supportedEvents {
2323
return @[
2424
@"connect", @"listening", @"connection", @"secureConnection", @"data",
25-
@"close", @"error", @"written"
25+
@"close", @"error", @"written", @"end"
2626
];
2727
}
2828

@@ -294,7 +294,17 @@ - (void)onSocketConnection:(TcpSocketClient *)client
294294
- (void)onData:(NSNumber *)clientID data:(NSData *)data {
295295
NSString *base64String = [data base64EncodedStringWithOptions:0];
296296
[self sendEventWithName:@"data"
297-
body:@{@"id" : clientID, @"data" : base64String}];
297+
body:@{
298+
@"id" : clientID,
299+
@"data" : base64String
300+
}];
301+
}
302+
303+
- (void)onEnd:(NSNumber *)clientID {
304+
[self sendEventWithName:@"end"
305+
body:@{
306+
@"id" : clientID
307+
}];
298308
}
299309

300310
- (void)onClose:(NSNumber *)clientID withError:(NSError *)err {

jest.setup.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ jest.mock('react-native', () => {
1414
destroy: jest.fn(),
1515
write: jest.fn(),
1616
listen: jest.fn(),
17+
pause: jest.fn(),
18+
resume: jest.fn(),
19+
setKeepAlive: jest.fn(),
20+
setNoDelay: jest.fn(),
1721
},
1822
},
1923
NativeEventEmitter: NativeEventEmitter,

src/Server.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ export default class Server extends EventEmitter {
3838
this._id = getNextId();
3939
/** @protected @readonly */
4040
this._eventEmitter = nativeEventEmitter;
41+
// console.log('Server eventEmitter:', this._eventEmitter);
4142
/** @private @type {Set<Socket>} */
4243
this._connections = new Set();
4344
/** @private */
@@ -249,6 +250,14 @@ export default class Server extends EventEmitter {
249250
const keepAliveDelay = this._serverOptions.keepAliveInitialDelay || 0;
250251
newSocket.setKeepAlive(this._serverOptions.keepAlive, keepAliveDelay);
251252
}
253+
254+
if (this._serverOptions.allowHalfOpen !== undefined) {
255+
newSocket.allowHalfOpen = this._serverOptions.allowHalfOpen;
256+
}
257+
258+
if (this._serverOptions.pauseOnConnect) {
259+
newSocket.pause();
260+
}
252261
}
253262

254263
return newSocket;

src/Socket.js

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import { NativeModules } from 'react-native';
44
import EventEmitter from 'eventemitter3';
55
import { Buffer } from 'buffer';
6-
const Sockets = NativeModules.TcpSockets;
76
import { nativeEventEmitter, getNextId } from './Globals';
87

98
/**
@@ -30,6 +29,7 @@ import { nativeEventEmitter, getNextId } from './Globals';
3029
* @typedef {object} ReadableEvents
3130
* @property {() => void} pause
3231
* @property {() => void} resume
32+
* @property {() => void} end
3333
*
3434
* @typedef {object} SocketEvents
3535
* @property {(had_error: boolean) => void} close
@@ -95,6 +95,7 @@ export default class Socket extends EventEmitter {
9595
this.remoteAddress = undefined;
9696
this.remotePort = undefined;
9797
this.remoteFamily = undefined;
98+
this.allowHalfOpen = false;
9899
this._registerEvents();
99100
}
100101

@@ -164,7 +165,7 @@ export default class Socket extends EventEmitter {
164165
});
165166
this._connecting = true;
166167
this._readyState = 'opening';
167-
Sockets.connect(this._id, customOptions.host, customOptions.port, customOptions);
168+
NativeModules.TcpSockets.connect(this._id, customOptions.host, customOptions.port, customOptions);
168169
return this;
169170
}
170171

@@ -243,7 +244,7 @@ export default class Socket extends EventEmitter {
243244
this.once('connect', () => this.setNoDelay(noDelay));
244245
return this;
245246
}
246-
Sockets.setNoDelay(this._id, noDelay);
247+
NativeModules.TcpSockets.setNoDelay(this._id, noDelay);
247248
return this;
248249
}
249250

@@ -267,7 +268,7 @@ export default class Socket extends EventEmitter {
267268
);
268269
}
269270

270-
Sockets.setKeepAlive(this._id, enable, Math.floor(initialDelay));
271+
NativeModules.TcpSockets.setKeepAlive(this._id, enable, Math.floor(initialDelay));
271272
return this;
272273
}
273274

@@ -291,14 +292,14 @@ export default class Socket extends EventEmitter {
291292
end(data, encoding) {
292293
if (data) {
293294
this.write(data, encoding, () => {
294-
Sockets.end(this._id);
295+
NativeModules.TcpSockets.end(this._id);
295296
});
296297
return this;
297298
}
298299
if (this._pending || this._destroyed) return this;
299300

300301
this._clearTimeout();
301-
Sockets.end(this._id);
302+
NativeModules.TcpSockets.end(this._id);
302303
return this;
303304
}
304305

@@ -309,7 +310,7 @@ export default class Socket extends EventEmitter {
309310
if (this._destroyed) return this;
310311
this._destroyed = true;
311312
this._clearTimeout();
312-
Sockets.destroy(this._id);
313+
NativeModules.TcpSockets.destroy(this._id);
313314
return this;
314315
}
315316

@@ -357,18 +358,19 @@ export default class Socket extends EventEmitter {
357358
if (!ok) this.writableNeedDrain = true;
358359
this._lastSentMsgId = currentMsgId;
359360
this._bytesWritten += generatedBuffer.byteLength;
360-
Sockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId);
361+
NativeModules.TcpSockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId);
361362
return ok;
362363
}
363364

364365
/**
365366
* Pauses the reading of data. That is, `'data'` events will not be emitted. Useful to throttle back an upload.
366367
*/
367368
pause() {
368-
if (this._paused) return;
369+
if (this._paused) return this;
369370
this._paused = true;
370-
Sockets.pause(this._id);
371+
NativeModules.TcpSockets.pause(this._id);
371372
this.emit('pause');
373+
return this;
372374
}
373375

374376
/**
@@ -426,7 +428,7 @@ export default class Socket extends EventEmitter {
426428
}
427429
}
428430
this._resuming = false;
429-
Sockets.resume(this._id);
431+
NativeModules.TcpSockets.resume(this._id);
430432
}
431433

432434
/**
@@ -462,6 +464,13 @@ export default class Socket extends EventEmitter {
462464
this._setDisconnected();
463465
this.emit('close', evt.error);
464466
});
467+
this._endListener = this._eventEmitter.addListener('end', (evt) => {
468+
if (evt.id !== this._id) return;
469+
if (!this.allowHalfOpen) {
470+
this.end();
471+
}
472+
this.emit('end');
473+
});
465474
this._connectListener = this._eventEmitter.addListener('connect', (evt) => {
466475
if (evt.id !== this._id) return;
467476
this._setConnected(evt.connection);
@@ -480,6 +489,7 @@ export default class Socket extends EventEmitter {
480489
this._dataListener?.remove();
481490
this._errorListener?.remove();
482491
this._closeListener?.remove();
492+
this._endListener?.remove();
483493
this._connectListener?.remove();
484494
this._writtenListener?.remove();
485495
}

0 commit comments

Comments
 (0)