diff --git a/__tests__/allowHalfOpen.test.js b/__tests__/allowHalfOpen.test.js new file mode 100644 index 0000000..eced7a5 --- /dev/null +++ b/__tests__/allowHalfOpen.test.js @@ -0,0 +1,109 @@ +import { expect, test, jest } from '@jest/globals'; +import net from '../src/index'; +import { nativeEventEmitter } from '../src/Globals'; +import { NativeModules } from 'react-native'; + +const Sockets = NativeModules.TcpSockets; + +jest.mock('../src/Globals', () => { + const { EventEmitter } = require('events'); + const emitter = new EventEmitter(); + const originalAddListener = emitter.addListener.bind(emitter); + // @ts-ignore + emitter.addListener = (event, listener) => { + originalAddListener(event, listener); + return { remove: () => emitter.removeListener(event, listener) }; + }; + + let idCounter = 1000; + return { + __esModule: true, + nativeEventEmitter: emitter, + getNextId: () => idCounter++, + }; +}); + +test('allowHalfOpen: false (default) should call Sockets.end() on end event', () => { + return new Promise((resolve, reject) => { + // Reset mocks + Sockets.end.mockClear(); + + const server = net.createServer(); // allowHalfOpen default false + // @ts-ignore + const serverId = server._id; + server.listen(12345); + + server.on('connection', (socket) => { + socket.on('end', () => { + try { + // When we receive 'end', if allowHalfOpen is false, socket.end() should be called + // which calls Sockets.end(id) + expect(Sockets.end).toHaveBeenCalled(); + resolve(undefined); + } catch (e) { + reject(e); + } + }); + }); + + // Simulate connection + nativeEventEmitter.emit('connection', { + id: serverId, + info: { + id: 456, + connection: { + localAddress: '127.0.0.1', + localPort: 12345, + remoteAddress: '127.0.0.1', + remotePort: 54321, + remoteFamily: 'IPv4', + }, + }, + }); + + // Simulate 'end' event from native for socket 456 + nativeEventEmitter.emit('end', { id: 456 }); + }); +}); + +test('allowHalfOpen: true should NOT call Sockets.end() on end event', () => { + return new Promise((resolve, reject) => { + // Reset mocks + Sockets.end.mockClear(); + + const server = net.createServer({ allowHalfOpen: true }); + // @ts-ignore + const serverId = server._id; + server.listen(12346); + + server.on('connection', (socket) => { + socket.on('end', () => { + try { + // When we receive 'end', if allowHalfOpen is true, socket.end() should NOT be called + expect(Sockets.end).not.toHaveBeenCalled(); + resolve(undefined); + } catch (e) { + reject(e); + } + }); + }); + + // Simulate connection + nativeEventEmitter.emit('connection', { + id: serverId, + info: { + id: 457, + connection: { + localAddress: '127.0.0.1', + localPort: 12346, + remoteAddress: '127.0.0.1', + remotePort: 54321, + remoteFamily: 'IPv4', + }, + }, + }); + + // Simulate 'end' event from native for socket 457 + nativeEventEmitter.emit('end', { id: 457 }); + }); +}); diff --git a/__tests__/server_options.test.js b/__tests__/server_options.test.js new file mode 100644 index 0000000..e343132 --- /dev/null +++ b/__tests__/server_options.test.js @@ -0,0 +1,53 @@ +import { expect, test, jest } from '@jest/globals'; + +jest.mock('../src/Globals', () => { + const { EventEmitter } = require('events'); + const emitter = new EventEmitter(); + const originalAddListener = emitter.addListener.bind(emitter); + // @ts-ignore + emitter.addListener = (event, listener) => { + originalAddListener(event, listener); + return { remove: () => emitter.removeListener(event, listener) }; + }; + return { + __esModule: true, + nativeEventEmitter: emitter, + getNextId: () => 123, + }; +}); + +import net from '../src/index'; +import { nativeEventEmitter } from '../src/Globals'; + +test('server option pauseOnConnect should pause the socket', () => { + return new Promise((resolve, reject) => { + const server = net.createServer({ pauseOnConnect: true }); + + server.listen(12345); + + server.on('connection', (socket) => { + try { + // Check if socket is paused + // @ts-ignore + expect(socket._paused).toBe(true); + resolve(undefined); + } catch (error) { + reject(error); + } + }); + + nativeEventEmitter.emit('connection', { + id: 123, + info: { + id: 456, + connection: { + localAddress: '127.0.0.1', + localPort: 12345, + remoteAddress: '127.0.0.1', + remotePort: 54321, + remoteFamily: 'IPv4', + }, + }, + }); + }); +}); diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java index aed1561..7c1c6ee 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java @@ -92,6 +92,12 @@ public void onData(int id, byte[] data) { sendEvent("data", eventParams); } + public void onEnd(int id) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + sendEvent("end", eventParams); + } + public void onWritten(int id, int msgId, @Nullable Exception e) { String error = null; if (e != null) { diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java index 8ec32a2..1f214f1 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java @@ -266,7 +266,8 @@ public void run() { if (bufferCount > 0) { receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount)); } else if (bufferCount == -1) { - clientSocket.destroy(); + receiverListener.onEnd(socketId); + break; } } } catch (IOException | InterruptedException ioe) { diff --git a/ios/TcpSocketClient.h b/ios/TcpSocketClient.h index aa74c26..ab17fc9 100644 --- a/ios/TcpSocketClient.h +++ b/ios/TcpSocketClient.h @@ -40,6 +40,7 @@ typedef enum RCTTCPError RCTTCPError; - (void)onSecureConnection:(TcpSocketClient *)client toClient:(NSNumber *)clientID; - (void)onData:(NSNumber *)clientID data:(NSData *)data; +- (void)onEnd:(NSNumber *)clientID; - (void)onClose:(TcpSocketClient *)client withError:(NSError *)err; - (void)onError:(TcpSocketClient *)client withError:(NSError *)err; - (void)onWrittenData:(TcpSocketClient *)client msgId:(NSNumber *)msgId; diff --git a/ios/TcpSocketClient.m b/ios/TcpSocketClient.m index ad52d6c..f1903cc 100644 --- a/ios/TcpSocketClient.m +++ b/ios/TcpSocketClient.m @@ -579,9 +579,9 @@ - (void)socket:(GCDAsyncSocket *)sock } - (void)socketDidCloseReadStream:(GCDAsyncSocket *)sock { - // TODO : investigate for half-closed sockets - // for now close the stream completely - [sock disconnect]; + if (_clientDelegate) { + [_clientDelegate onEnd:[sock userData]]; + } } - (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err { diff --git a/ios/TcpSockets.m b/ios/TcpSockets.m index fc99b73..0fe15d2 100644 --- a/ios/TcpSockets.m +++ b/ios/TcpSockets.m @@ -22,7 +22,7 @@ @implementation TcpSockets { - (NSArray *)supportedEvents { return @[ @"connect", @"listening", @"connection", @"secureConnection", @"data", - @"close", @"error", @"written" + @"close", @"error", @"written", @"end" ]; } @@ -294,7 +294,17 @@ - (void)onSocketConnection:(TcpSocketClient *)client - (void)onData:(NSNumber *)clientID data:(NSData *)data { NSString *base64String = [data base64EncodedStringWithOptions:0]; [self sendEventWithName:@"data" - body:@{@"id" : clientID, @"data" : base64String}]; + body:@{ + @"id" : clientID, + @"data" : base64String + }]; +} + +- (void)onEnd:(NSNumber *)clientID { + [self sendEventWithName:@"end" + body:@{ + @"id" : clientID + }]; } - (void)onClose:(NSNumber *)clientID withError:(NSError *)err { diff --git a/jest.setup.js b/jest.setup.js index ebd524f..025f3b8 100644 --- a/jest.setup.js +++ b/jest.setup.js @@ -14,6 +14,10 @@ jest.mock('react-native', () => { destroy: jest.fn(), write: jest.fn(), listen: jest.fn(), + pause: jest.fn(), + resume: jest.fn(), + setKeepAlive: jest.fn(), + setNoDelay: jest.fn(), }, }, NativeEventEmitter: NativeEventEmitter, diff --git a/src/Server.js b/src/Server.js index af746cc..a9884cc 100644 --- a/src/Server.js +++ b/src/Server.js @@ -38,6 +38,7 @@ export default class Server extends EventEmitter { this._id = getNextId(); /** @protected @readonly */ this._eventEmitter = nativeEventEmitter; + // console.log('Server eventEmitter:', this._eventEmitter); /** @private @type {Set} */ this._connections = new Set(); /** @private */ @@ -249,6 +250,14 @@ export default class Server extends EventEmitter { const keepAliveDelay = this._serverOptions.keepAliveInitialDelay || 0; newSocket.setKeepAlive(this._serverOptions.keepAlive, keepAliveDelay); } + + if (this._serverOptions.allowHalfOpen !== undefined) { + newSocket.allowHalfOpen = this._serverOptions.allowHalfOpen; + } + + if (this._serverOptions.pauseOnConnect) { + newSocket.pause(); + } } return newSocket; diff --git a/src/Socket.js b/src/Socket.js index 6ed2141..2cc862b 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -3,7 +3,6 @@ import { NativeModules } from 'react-native'; import EventEmitter from 'eventemitter3'; import { Buffer } from 'buffer'; -const Sockets = NativeModules.TcpSockets; import { nativeEventEmitter, getNextId } from './Globals'; /** @@ -30,6 +29,7 @@ import { nativeEventEmitter, getNextId } from './Globals'; * @typedef {object} ReadableEvents * @property {() => void} pause * @property {() => void} resume + * @property {() => void} end * * @typedef {object} SocketEvents * @property {(had_error: boolean) => void} close @@ -95,6 +95,7 @@ export default class Socket extends EventEmitter { this.remoteAddress = undefined; this.remotePort = undefined; this.remoteFamily = undefined; + this.allowHalfOpen = false; this._registerEvents(); } @@ -164,7 +165,12 @@ export default class Socket extends EventEmitter { }); this._connecting = true; this._readyState = 'opening'; - Sockets.connect(this._id, customOptions.host, customOptions.port, customOptions); + NativeModules.TcpSockets.connect( + this._id, + customOptions.host, + customOptions.port, + customOptions + ); return this; } @@ -243,7 +249,7 @@ export default class Socket extends EventEmitter { this.once('connect', () => this.setNoDelay(noDelay)); return this; } - Sockets.setNoDelay(this._id, noDelay); + NativeModules.TcpSockets.setNoDelay(this._id, noDelay); return this; } @@ -267,7 +273,7 @@ export default class Socket extends EventEmitter { ); } - Sockets.setKeepAlive(this._id, enable, Math.floor(initialDelay)); + NativeModules.TcpSockets.setKeepAlive(this._id, enable, Math.floor(initialDelay)); return this; } @@ -291,14 +297,14 @@ export default class Socket extends EventEmitter { end(data, encoding) { if (data) { this.write(data, encoding, () => { - Sockets.end(this._id); + NativeModules.TcpSockets.end(this._id); }); return this; } if (this._pending || this._destroyed) return this; this._clearTimeout(); - Sockets.end(this._id); + NativeModules.TcpSockets.end(this._id); return this; } @@ -309,7 +315,7 @@ export default class Socket extends EventEmitter { if (this._destroyed) return this; this._destroyed = true; this._clearTimeout(); - Sockets.destroy(this._id); + NativeModules.TcpSockets.destroy(this._id); return this; } @@ -357,7 +363,7 @@ export default class Socket extends EventEmitter { if (!ok) this.writableNeedDrain = true; this._lastSentMsgId = currentMsgId; this._bytesWritten += generatedBuffer.byteLength; - Sockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId); + NativeModules.TcpSockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId); return ok; } @@ -365,10 +371,11 @@ export default class Socket extends EventEmitter { * Pauses the reading of data. That is, `'data'` events will not be emitted. Useful to throttle back an upload. */ pause() { - if (this._paused) return; + if (this._paused) return this; this._paused = true; - Sockets.pause(this._id); + NativeModules.TcpSockets.pause(this._id); this.emit('pause'); + return this; } /** @@ -426,7 +433,7 @@ export default class Socket extends EventEmitter { } } this._resuming = false; - Sockets.resume(this._id); + NativeModules.TcpSockets.resume(this._id); } /** @@ -462,6 +469,13 @@ export default class Socket extends EventEmitter { this._setDisconnected(); this.emit('close', evt.error); }); + this._endListener = this._eventEmitter.addListener('end', (evt) => { + if (evt.id !== this._id) return; + if (!this.allowHalfOpen) { + this.end(); + } + this.emit('end'); + }); this._connectListener = this._eventEmitter.addListener('connect', (evt) => { if (evt.id !== this._id) return; this._setConnected(evt.connection); @@ -480,6 +494,7 @@ export default class Socket extends EventEmitter { this._dataListener?.remove(); this._errorListener?.remove(); this._closeListener?.remove(); + this._endListener?.remove(); this._connectListener?.remove(); this._writtenListener?.remove(); }